python的五大特点:1.简单易学,开发程序时,专注的是解决问题,而不是搞明白语言本身。2.面向对象,与其他主要的语言如C++和Java相比, Python以一种非常强大又简单的方式实现面向对象编程。3.可移植性,Python程序无需修改就可以在各种平台上运行。4.解释性,Python语言写的程序不需要编译成二进制代码,可以直接从源代码运行程序。5.开源,Python是 FLOSS(自由/开放源码软件)之一。
此端口扫描器的源码,文档及详细调用方法见Github PythonPortScanner by Yaokai。
具体实现I. 利用TCP握手连接扫描一个给定的(ip,port)地址对
为了实现端口扫描,我们首先明白如何使用python socket
与给定的(ip, port)
进行TCP握手。为了完成TCP握手,我们需要先初始化一个TCP socket。在python
中新建一个TCP socket的代码如下:
TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #(1) TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) #(2) TCP_sock.settimeout(delay) #(3)
参数表示IPv4 socket
参数表示TCP socket
try: result = TCP_sock.connect_ex((ip, int(port_number))) # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE if result == 0: output[port_number] = 'OPEN' else: output[port_number] = 'CLOSE' TCP_sock.close() except socket.error as e: output[port_number] = 'CLOSE' pass
方法来关闭与远程端口之间的TCP连接。否则的话我们的扫描操作可能会引起所谓的TCP连接悬挂问题(Hanging TCP connection)。
""" Perform status checking for a given port on a given ip address using TCP handshake Keyword arguments: ip -- the ip address that is being scanned port_number -- the port that is going to be checked delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') """ def __TCP_connect(ip, port_number, delay, output): # Initilize the TCP socket object TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) TCP_sock.settimeout(delay) try: result = TCP_sock.connect_ex((ip, int(port_number))) # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE if result == 0: output[port_number] = 'OPEN' else: output[port_number] = 'CLOSE' TCP_sock.close() except socket.error as e: output[port_number] = 'CLOSE' pass
II. 多线程扫描端口
复制代码 代码如下:
__port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563,...]
1. 取出一个端口
2. 新建一条线程,利用__TCP_connect()
3. 调用thread.start()
4. 重复这个过程直到所有的端口都被扫描过。
""" Open multiple threads to perform port scanning Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') """ def __scan_ports_helper(ip, delay, output): ''' Multithreading port scanning ''' port_index = 0 while port_index < len(__port_list): # Ensure that the number of cocurrently running threads does not exceed the thread limit while threading.activeCount() < __thread_limit and port_index < len(__port_list): # Start threads thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output)) thread.start() # lock the thread until all threads complete thread.join() port_index = port_index + 1
其中__thread_limit参数是用来限制线程数目的。output是一个字典,以(port: status)
III. 多线程扫描多个网站
def __scan_ports_helper(ip, delay, output): ''' Multithreading port scanning ''' port_index = 0 while port_index < len(__port_list): # Ensure that the number of cocurrently running threads does not exceed the thread limit while threading.activeCount() < __thread_limit and port_index < len(__port_list): # Start threads thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output)) thread.start() port_index = port_index + 1 while (len(output) < len(self.target_ports)): continue
""" Controller of the __scan_ports_helper() function Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout """ def __scan_ports(websites, output_ip, delay): scan_result = {} for website in websites: website = str(website) scan_result[website] = {} thread = threading.Thread(target = __scan_ports_helper, args = (ip, delay, scan_result[website])) thread.start() # lock the script until all threads complete thread.join() return scan_result
IV. 总结!利用这些代码扫描给定网站并输出结果
import sys import subprocess import socket import threading import time class PortScanner: # default ports to be scanned # or put any ports you want to scan here! __port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563] # default thread number limit __thread_limit = 1000 # default connection timeout time inseconds __delay = 10 """ Constructor of a PortScanner object Keyword arguments: target_ports -- the list of ports that is going to be scanned (default self.__port_list) """ def __init__(self, target_ports = None): # If target ports not given in the arguments, use default ports # If target ports is given in the arguments, use given port lists if target_ports is None: self.target_ports = self.__port_list else: self.target_ports = target_ports """ Return the usage information for invalid input host name. """ def __usage(self): print('python Port Scanner v0.1') print('please make sure the input host name is in the form of "something.com" or "http://something.com!"\n') """ This is the function need to be called to perform port scanning Keyword arguments: host_name -- the hostname that is going to be scanned message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def scan(self, host_name, message = ''): if 'http://' in host_name or 'https://' in host_name: host_name = host_name[host_name.find('://') + 3 : ] print('*' * 60 + '\n') print('start scanning website: ' + str(host_name)) try: server_ip = socket.gethostbyname(str(host_name)) print('server ip is: ' + str(server_ip)) except socket.error as e: # If the DNS resolution of a website cannot be finished, abort that website. #print(e) print('hostname %s unknown!!!' % host_name) self.__usage() return {} # May need to return specificed values to the DB in the future start_time = time.time() output = self.__scan_ports(server_ip, self.__delay, message) stop_time = time.time() print('host %s scanned in %f seconds' %(host_name, stop_time - start_time)) print('finish scanning!\n') return output """ Set the maximum number of thread for port scanning Keyword argument: num -- the maximum number of thread running concurrently (default 1000) """ def set_thread_limit(self, num): num = int(num) if num <= 0 or num > 50000: print('Warning: Invalid thread number limit! Please make sure the thread limit is within the range of (1, 50,000)!') print('The scanning process will use default thread limit!') return self.__thread_limit = num """ Set the time out delay for port scanning in seconds Keyword argument: delay -- the time in seconds that a TCP socket waits until timeout (default 10) """ def set_delay(self, delay): delay = int(delay) if delay <= 0 or delay > 100: print('Warning: Invalid delay value! Please make sure the input delay is within the range of (1, 100)') print('The scanning process will use the default delay time') return self.__delay = delay """ Print out the list of ports being scanned """ def show_target_ports(self): print ('Current port list is:') print (self.target_ports) """ Print out the delay in seconds that a TCP socket waits until timeout """ def show_delay(self): print ('Current timeout delay is :%d' %(int(self.__delay))) """ Open multiple threads to perform port scanning Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def __scan_ports_helper(self, ip, delay, output, message): ''' Multithreading port scanning ''' port_index = 0 while port_index < len(self.target_ports): # Ensure that the number of cocurrently running threads does not exceed the thread limit while threading.activeCount() < self.__thread_limit and port_index < len(self.target_ports): # Start threads thread = threading.Thread(target = self.__TCP_connect, args = (ip, self.target_ports[port_index], delay, output, message)) thread.start() port_index = port_index + 1 """ Controller of the __scan_ports_helper() function Keyword arguments: ip -- the ip address that is being scanned delay -- the time in seconds that a TCP socket waits until timeout message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def __scan_ports(self, ip, delay, message): output = {} thread = threading.Thread(target = self.__scan_ports_helper, args = (ip, delay, output, message)) thread.start() # Wait until all port scanning threads finished while (len(output) < len(self.target_ports)): continue # Print openning ports from small to large for port in self.target_ports: if output[port] == 'OPEN': print(str(port) + ': ' + output[port] + '\n') return output """ Perform status checking for a given port on a given ip address using TCP handshake Keyword arguments: ip -- the ip address that is being scanned port_number -- the port that is going to be checked delay -- the time in seconds that a TCP socket waits until timeout output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE') message -- the message that is going to be included in the scanning packets, in order to prevent ethical problem (default: '') """ def __TCP_connect(self, ip, port_number, delay, output, message): # Initilize the TCP socket object TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) TCP_sock.settimeout(delay) # Initilize a UDP socket to send scanning alert message if there exists an non-empty message if message != '': UDP_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) UDP_sock.sendto(str(message), (ip, int(port_number))) try: result = TCP_sock.connect_ex((ip, int(port_number))) if message != '': TCP_sock.sendall(str(message)) # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE if result == 0: output[port_number] = 'OPEN' else: output[port_number] = 'CLOSE' TCP_sock.close() except socket.error as e: output[port_number] = 'CLOSE' pass