端口扫描工具
发布于 2 年前 阅读权限 无需登录 作者 bing 4755 次浏览 来自 开源项目

使用协程并发任务

使用200协程扫描端口,大约需要35秒扫完65535
#!/user/bin python
# -*- coding:utf-8 -*- 
# Author:Bing
# Contact:[email protected]
# DateTime: 2017-01-17 19:06:06
# Description:  coding 


import sys
sys.path.append("..")

import gevent
from gevent import monkey
from gevent.pool import Pool
monkey.patch_all()
import socket,os,time

#判断是否为域名
def is_domain(domain):
    domain_regex = re.compile(
        r'(?:[A-Z0-9_](?:[A-Z0-9-_]{0,247}[A-Z0-9])?\.)+(?:[A-Z]{2,6}|[A-Z0-9-]{2,}(?<!-))\Z', re.IGNORECASE)
    return True if domain_regex.match(domain) else False

#判断是否为ip
def is_host(host):
    ip_regex = re.compile(r'(^(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])$)', re.IGNORECASE)
    return True if ip_regex.match(host) else False
	
def nessus_target_check(test):
	if is_domain(test) or is_host(test) :
		return str(test)
	else :
		return False


class Work(object):
    def __init__(self, scan_id = "", scan_target = "", scan_type = "" ,scan_args = "", back_fn = None):
        self.pool = Pool(200)
        self.timeout = 0.1

        self.scan_id = scan_id
        self.target = scan_target
        self.scan_type = scan_type
        self.args = scan_args
        self.back_fn = back_fn
        self.result = []        

    def get_port_service(self,text):
        service_path = "nmap-services.txt"		#这个地方需要修改为你自己nmap指纹的地址
        port_server = str(text)+"/tcp"
        with open(service_path,"r") as server:
            for finger in server.readlines():
                port = finger.strip().split(";")[1]
                if port == port_server:
                    fingers = str(finger.strip().split(";")[0])
                    return (port_server,fingers)
            return (port_server,"unknown")


    def port_scan(self,port):
        target = nessus_target_check(self.target)
        if target == False :
            return { "status" : 2 , "data" : "NMAP >>>> :格式错误" }

        try:
            sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sd.settimeout(self.timeout)
            try:
                sd.connect((target,int(port)))
                self.result.append(self.get_port_service(port))
            except socket.error:
                pass    
            sd.close()
        except:
            pass

    def run(self):
        res = []
        for port in range(65535):
            res.append(port)
        self.pool.map(self.port_scan,res)
        data = []
        for line in self.result:
            data.append({ "bug_name" : str(line[0]) ,"bug_summary" : str(line[1]) }) 
        result = { "status" : 1 , "data" : data , "scan_id": self.scan_id , "scan_type": "nmap" }
        self.back_fn(result)
		
def save(nmap_result):
    print nmap_result,"----------------"

t = Work(scan_target = "127.0.0.1",back_fn = save)
t.run()

使用多进程

使用200进程扫描端口,大约需要160多秒扫完65535
#!/user/bin python
# -*- coding:utf-8 -*- 
# Author:Bing
# Contact:[email protected]
# DateTime: 2017-01-17 19:06:06
# Description:  coding 

import socket
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool


remote_server_ip = "www.baidu.com"
ports = []
 
socket.setdefaulttimeout(0.5)
 
def scan_port(port):
    try:
        s = socket.socket(2,1)
        res = s.connect_ex((remote_server_ip,port))
        if res == 0: # 如果端口开启 发送 hello 获取banner
            print 'Port {}: OPEN'.format(port)
        s.close()
    except Exception,e:
        print str(e.message)
 
 
 
for i in range(1,65535):
    ports.append(i)
 
# Check what time the scan started
t1 = datetime.now()
 
 
pool = ThreadPool(processes = 200)
results = pool.map(scan_port,ports)
pool.close()
pool.join()
 
print 'Multiprocess Scanning Completed in  ', datetime.now() - t1

使用多线程

使用200线程扫描端口,大约需要2多秒扫完65535
import sys
sys.path.append("..")

import threading, socket, sys, cmd, os, Queue
from core.settings import *

#线程锁
lock = threading.Lock()

#制作扫描端口队列
def GetQueue(host):
    PortQueue = Queue.Queue()
    for port in range(1,65535):
        PortQueue.put((host,port))
    return PortQueue

class ScanThread(threading.Thread):
    def __init__(self,SingleQueue,outip):
        threading.Thread.__init__(self)
        self.setDaemon(True)		#设置后台运行,让join结束
        self.SingleQueue = SingleQueue
        self.outip = outip

    def get_port_service(self,text):
        service_path = "nmap-services.txt"		#这个地方需要修改为你自己nmap指纹的地址
        port_server = str(text)+"/tcp"
        with open(service_path,"r") as server:
            for finger in server.readlines():
                port = finger.strip().split(";")[1]
                if port == port_server:
                    fingers = str(finger.strip().split(";")[0])
                    return (port_server,fingers)
            return (port_server,"unknown")

    def Ping(self,scanIP, Port):
        global OpenPort, lock
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(0.1)
        address = (scanIP, Port)
        try:
            sock.connect(address)
        except:
            sock.close()
            return False
        sock.close()
        if lock.acquire():
            #print "IP:%s  Port:%d" % (scanIP, Port)
            self.outip.put(self.get_port_service(Port))
            lock.release()
        return True

    def run(self):
        while not self.SingleQueue.empty():
        	#获取扫描队列,并扫描
            host,port = self.SingleQueue.get()
            self.Ping(host,port)


class Work(object):
	def __init__(self, scan_id = "", scan_target = "", scan_type = "" ,scan_args = "", back_fn = None):
	    self.scan_id = scan_id
	    self.target = scan_target
	    self.scan_type = scan_type
	    self.args = scan_args
	    self.back_fn = back_fn
	    self.result = []        

	def run(self):
	    ThreadList = []
	    #扫描队列
	    SingleQueue = GetQueue(self.target)
	    #存储结果队列
	    resultQueue = Queue.Queue()
	    #启动200线程并发
	    for i in range(0, 200):
	        t = ScanThread(SingleQueue,resultQueue)
	        ThreadList.append(t)
	    for t in ThreadList:
	        t.start()
	    for t in ThreadList:
	        #需要设置线程为后台,然后没法结束;join等待结束后台线程
	        t.join(0.1)

	    data = []
	    while not resultQueue.empty():
	        line = resultQueue.get() 
	        data.append({ "bug_name" : str(line[0]) ,"bug_summary" : str(line[1]) }) 
	    result = { "status" : 1 , "data" : data , "scan_id": self.scan_id , "scan_type": "nmap" }
	    self.back_fn(result)

def save(nmap_result):
    print nmap_result,"----------------"

t = Work(scan_target = "127.0.0.1",back_fn = save)
t.run()
26 回复

多线程的用着好爽!

分享是一种美德

社区有专门的开源项目的账号,大家觉得不错的话可以 @admin 添加到社区的开源项目里面。

这个在内网里面用怕是要被发现哦

为何多线程比协程还快?并发还是golang比较快。

我多线程扫一个C段,几分钟后网就爆炸了。

感谢楼主分享

代码乍一看还行,仔细一看问题还是很大啊, 你把任务timeout 设置成了0.1 什么鬼, 协程 你用socket句柄设置timeout 意义不大,正确应该用gevent的timeout, 而且pool.map 不是协程。应该用pool.spawn

快是挺快的,就是不太准确

好快的速度,需要控制适当的速率

感谢分享,mark

效率和准确性果然很难兼顾啊

恩,是挺快的

多线程 容易爆炸啊

动作太大了。

谢谢分享,更喜欢直观的gui版

是个学习python多线程,多进程,协程并发的好模板。

python多线程,学习了

学习了感谢

学习了感谢

学习了,感谢

学习了,感谢

200线程?一般的路由器或交换机直接会卡死的。 还是建议自己调整线程吧

@longxiaowu 其实根本没扫完65535个端口,难怪那么快。。。

回到顶部