记 fofa 爬虫工具开发篇

记 fofa 爬虫工具开发篇

前沿

开发这个工具的原因单纯是在 大一(目前已经大三了)的时候 ,在用 fofa 搜索时,只能 搜索到 前 5 页,而且 github 上也找不到一个好点的爬虫,全是需要

会员的,我就想着弄一个 在不同城市之前收集目标,这样不仅仅 只有 50 条了 (如下图),每个国家 的每个城市分别爬取 50 条那么 一条查询语句至少,得有 500 条吧 也是受到一款工具的启发(那个大哥的博客找不到了)而且 fofa 的域名更改后,就找不到一个像样的爬虫工具了,要么需要手动 要么 只能爬取 5 页,真 low 所以基于这种现实问题开发了一个爬虫工具(现在高级会员可以直接下载 10000 条哈哈哈哈,噶,嗯 这个工具如果开注册会员的话 完美代替高级会员,如果你钱多当我没说,哈哈哈)那么接下来就开搞

image-20220619002905702** **

爬取的平台

目前有两个 fofaapp、fofaapi(这个写的太 low 了。。。。还是用别人写好了的吧 我这里给个链接 fofa_viewer还是挺牛逼了别的不说)fofaapp 这个呢 就是面向 没钱的小朋友哈,我就属于其中的小朋友,所以我就有的摆了,来来来 下面说重点了

步骤

1. 爬取个人网页

我的思路跟上面前沿介绍的需求差不多嘛,先 请求一次 检查一下 cookies 的正确性,请求https://fofa.info/personalData把相关的信息拿到 主要的信息就是用户组(没钱的组)

 def getUserinfo(self):
         """
         :return: 个人信息
         """
         userinfo = {}
         requrl = self.url + '/personalData'
         soup = self.getDriverSoup(requrl, True)
         tags = soup.find_all(name="div", attrs={"class": "personList"})
         apiTag = soup.find(name="span", attrs={"class": "apikeynumber"})
         try:
             if tags and apiTag:
                 userinfo['username'] = tags[1].contents[2].string
                 userinfo['useremail'] = tags[2].contents[2].string
                 userinfo['usergroup'] = tags[4].contents[2].string
                 userinfo['userApikey'] = apiTag.string.strip()
             if (tags is None) or \
                     (userinfo['username'] is None) or \
                     (userinfo['useremail'] is None) or \
                     (userinfo['usergroup'] is None) or \
                     (userinfo['userApikey'] is None):
                 raise
         except:
             loguru.logger.warning("登录失败,检查cookies")
             userinfo['usergroup'] = "未登录用户"
             userinfo['username'] = "未登录用户"
             userinfo['useremail'] = "未登录用户"
             userinfo['userApikey'] = "未登录用户"
         finally:
             printfUserinfo(userinfo['username'], userinfo['useremail'], userinfo['usergroup'], userinfo['userApikey'])
             if userinfo['usergroup'] == "未登录用户":
                 self.driver.close()
                 sys.exit()
             return userinfo

getDriverSoup 这个方法就是为了得到 Beautisoup4(简称 bs4)的 soup 反正就是请求 到/personalData的内容,然后分别提取出来,如果你不会 bs4 的话,我之前也有写过一篇文章(抄的别人的 唉 不懂还是要学到的呗 链接:BS4- 爬虫利器学习 好好读很快就学会了)tags = soup.find_all(name="div", attrs={"class": "personList"}) apiTag = soup.find(name="span", attrs={"class": "apikeynumber"}) 得到 信息对应的 tag ,然后就是 错误判断了(我的错误判断错的很差,应该写统一抛出异常然后在一个文件下统一处理,如果你们看到了这里并且能个有心解决 欢迎来找我,我们一起制作 v4.0 嘿嘿)判断错误 就是 cookies 错误

printfUserinfo打印一下没别的功能

self.driver.close()这个是selenium 为了爬取 网页上动态参数(有些数据 是网页上经过动态计算得来的,requests 爬取不到的) ,如果你不知道的话 额。。以后再说吧,我也没有写 文章,可以找一下其他的学习,不过不影响你接下来的观看

2. 得到城市信息

就是图里面的信息 耶。里面的每一个标签都好找

 def getCityUrls(soup):
     """
     获得所有城市的url
     :param soup: 第一次请求后的soup对象
     :return:
     """
     Cityurls = {}
     countryLi = soup.find_all(name="li", attrs={"class": "countryLi"})
     for tag in countryLi:
         temp = {}
         countryName = tag.find(name="div", attrs={"class": "titleLeft"}).a.string
         countryName = str(countryName).strip()
         cityTags = tag.find_all(name="div", attrs={"class": "tbDivList"})
         for city in cityTags:
             tempCity = {}
             cityName = city.find(name="div", attrs={"class": "listCont table-label table-label-left"}).a.string
             cityName = str(cityName).strip()
             cityNum = city.find(name="div", attrs={"listCont table-label table-label-right"}).span.string
             cityNum = str(cityNum).replace(',', '')
             tempCity['href'] = city.find(name="div", attrs={"class": "listCont table-label table-label-left"}).a['href']
             tempCity['Num'] = int(cityNum)
             temp[cityName] = tempCity
         Cityurls[countryName] = temp
     return Cityurls

soup :第一次请求的结果 text ( 这里必须用 selenium 的技术 动态请求 ),这里圈圈里面的参数都是动态获取的,所以动态请求一次就行,本篇文章主要讲思路,你不会的话,以后慢慢学 先掌握思路嘛

找到每个国家所在的 tag - countryLi,从每个国家里面提取城市的名字 和城市城市中对应的数量 图二图二 代码在上面,最后在返回得到的城市信息以及城市请求的 url

image-20220619010122198

image-20220619011230733

3. 处理城市的请求页数

得到城市的 url 就可以开始请求了,不过这里,如果你不想默认请求 想指定,请求多少页和每页的大小 就是这个(图里面),可以写个方法在 每次请求时修改这两个就行啦,然后请求第几页时就不会超过这个值了就不会报错了

 def getNumPage(self, num):
     """
     得到本次请求的 num page
     :param num: 每个城市的num
     :return: 是否获取成功
     """
     self.num = int(num)
     if self.num == 0:
         return False
     if 0 < self.num <= self.size:
         self.page = 1
     if self.num >= self.size:
         self.page = int(self.num / self.size)
     if (self.userGroup == "注册用户") & (self.page > 5):
         self.size = 10
         self.page = 5
     return True

num 就是第二步骤中获取的 cityNum

self.num就是每个城市的总 IP 数量(这个参数没啥用就是为了约束 self.page 的)

self.page这个就是每个城市的最大页数

逻辑都在代码里面了慢慢看吧,逻辑太简单了 但是要注意的时,如果是注册用户,就只能请求 50 条啦,(有条件可以开个注册会员! 注册会员就够了!!!!这个工具足够让你剩下那 700 了 )

return嗯 ~ 当语句没有结果时返回 False 有结果就返回 True

image-20220619011701735

4. 处理请求

前面的工作做完了的话,就可以开始请求了 写个 for 循环挨个请求每个城市的 url 以及 page 我这里写的多线程,其实可以不用写多线程的,因为 fofa 禁爬虫,请求多了容易被 fofa ban 掉,不过写都写了 那就将就用呗,也是为了 优化嘛 多线程还是好一点的,大不了线程设置低一点(我设置的是 2 线程。。。。试过,最优了)

 def futureThree(OK, maxWorkers, cookies, headers, reqUrl, size, page, reqCode, fileName, timeout):
     """
     多线程函数 将 请求的url整理好 放入线程队列中
     :param OK: 标志 请求
     :param maxWorkers: 线程中最大线程
     :param cookies: cookies值
     :param headers: 请求头
     :param reqUrl: 请求url
     :param page: 页数
     :param reqCode: 请求代码
     :param fileName: 保存的文件名
     :param timeout: 延时
     :return: 成功请求多少页
     """
     num = 0
     if OK:
         reqUrlList = [reqUrl + f"&page={pa + 1}&page_size={size}" for pa in
                       range(int(page))]
     else:
         reqUrlList = [reqUrl + f"/result?page={pa + 1}&page_size={size}&qbase64={Global.deBase64code(reqCode)}" for pa
                       in
                       range(int(page))]
 
     pool = pocReq.ThreadPool(maxWorkers)
     for url in reqUrlList:
         pool.setTask(getReq, (url, cookies, headers, timeout,))
     futures = pool.thread()
     for future in futures:
         a = future.result()
         if not a:
             continue
         b = getUrlIp(soup=a, filename=fileName)
         if not b:
             continue
         num = num + b
     return num
 def getUrlIp(soup, filename):
     """
      文件IO操作
     :param soup:
     :param filename:
     :return:
     """
 
     tags = soup.find_all(name="div", attrs={"class": "addrLeft"}, limit=10)
     urls = []
     ips = []
     try:
         for tag in tags:
             tag = tag.find(name="span", attrs={"class": "aSpan"})
             t = tag.find(name="a", attrs={"target": "_blank"})
             ip = ""
             url = ""
             if t and t.has_attr('href'):
                 if t['href'][0:4] != 'http':
                     ip = "ip: " + t['href'][0:4] + "\n"
                 else:
                     url = "url: " + t['href'] + "\n"
                 if ip != "" and ip not in ips:
                     ips.append(ip)
                 if url != "" and url not in urls:
                     urls.append(url)
         with open(filename, "a+", encoding="utf-8")as f:
             if ips:
                 f.writelines(ips)
             if urls:
                 f.writelines(urls)
         return len(ips) + len(urls)
     except Exception as e:
         loguru.logger.error(e)
         return False
 def getReq(url, cookies, headers, timeout):
     """
     请求 页数
     :param url:
     :param cookies:
     :param headers:
     :param timeout:
     :return:
     """
     while 1:
         try:
             time.sleep(timeout)
             html = requests.get(url=url, cookies=cookies, headers=headers)
             text = html.text
             if 'Retry later' in text:
                 loguru.logger.warning("忙碌")
                 time.sleep(10)
                 continue
             if html.status_code != 200:
                 time.sleep(timeout)
                 continue
             return BeautifulSoup(text, "html.parser")
         except Exception as e:
             loguru.logger.error(e)
             return False
 class ThreadPool:
     def __init__(self, maxWorkers):
         self.pool = ThreadPoolExecutor(max_workers=maxWorkers)
         self.taskQueue = queue.Queue()
         self.futures = {}
 
     def setTask(self, poc, target):
         newTask = (poc, target)
         self.taskQueue.put(newTask)
 
     def thread(self):
         while self.taskQueue.qsize() != 0:
             currentTask = self.taskQueue.get()
             currentPoc = currentTask[0]
             currentTarget = currentTask[1]
             future = self.pool.submit(currentPoc, *currentTarget)
             self.futures[future] = currentTarget
         self.futures = concurrent.futures.as_completed(self.futures)
         return self.futures

ThreadPool我这里是对原线程池函数进行了封装,任务多了也会出现 bug 就是上万条的那种,我们这里不会遇到,所以讲究写了( 多线程不懂的话,我就真没法帮你了)

getReq这个就是用请求到 每一页的结果,得到 soup

getUrlIp这个就是得到每页的 url 咯,详细逻辑 自己就慢慢看吧 也简单无非就是找到 url 所在 tag ,然后在保存到文件夹里面,也可以返回出来都行随便你

futureThree这个就是多线程函数啦,讲上述得到的 page,制作 reqUrl 然后批量请求 得到结果,保存 url。

5. 输出样式

最后就是输出的样式啦 两个文件 第一个是 都在 outputs 里面了

image-20220619020504030

image-20220619020530695

看法

对于这次工具 开发看法的话,就是让我获得到爬虫的大量知识,让我学习到了多线程,多协程(这个在 v2.0 里面有体现。。协程太快了所以我就弃用了,那一部分的代码 也是开源的 链接在下面),requuest 的使用,selenium 的使用,配置文件的提取,click 包的使用,loguru 的使用 ,这些也是爬虫的基本知识了吧,还是收获不错的.

  • requests :get 、post、session
  • selenium:driver
  • 配置文件:configparser 的使用
  • 参数:click 的使用
  • 输出:loguru

这次涉及的知识点就那么多了,还是很简单的,用心开发的话 700 行代码 大约两三天就完成了,至于 我为啥一直在更新,是因为 额 对工具的执着吧,让更多功能呈现出来,没有做出 GUI 也是我的一大遗憾,不过哪位朋友来帮帮我做一下 ,也是不错滴 (有兴趣访问我的个人博客!!!里面有我的联系方式)

总结

里面还有很多很多不足的地方,我能想到的也就那么多 ,唉,我也只是一名普通的大三学生耶,其实有一个人 跟我一起开发 那这个工具不就更加完美了!到时候代码就开源啦这多好呀,是吧。

代码没有开源(理由 就是 不想让人白嫖呗,反正二进制包是有的 - windows10 windows 7 windows 11 都适用的 只要是 amd64 架构的都行 amd32 我不知道没试过),工具是有的(链接在下面)

再说一次吧 就是 找人!!!找 个一起开发的小伙伴 取长补短 啦

链接

工具:https://github.com/wjlin0/Fofaapi

博客:https://wjlin0.com/