-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnanodirb.py
executable file
·74 lines (62 loc) · 2.3 KB
/
nanodirb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/env python3
"""
Copyright (c) 2023 Oliver Lau, Heise Medien GmbH & Co. KG
Diese Software wurde zu Lehr- und Demonstrationszwecken
geschaffen und ist nicht für den produktiven Einsatz vorgesehen.
Heise Medien und der Autor haften daher nicht für Schäden, die
aus der Nutzung der Software entstehen, und übernehmen keine
Gewähr für ihre Vollständigkeit, Fehlerfreiheit und Eignung für
einen bestimmten Zweck.
"""
import asyncio
from typing import Iterable, Any
from tornado.httpclient import AsyncHTTPClient, HTTPClientError
class NanoDirb:
def __init__(self, **kwargs) -> None:
self.queue = asyncio.Queue()
self.num_workers = kwargs.get('num_workers', 10)
self.result_callback = kwargs.get('result_callback')
async def run(self, urls: Iterable[str]) -> None:
workers = [asyncio.create_task(self.worker())
for _ in range(self.num_workers)]
for url in urls:
await self.queue.put(url)
await self.queue.join()
for worker in workers:
worker.cancel()
async def worker(self) -> None:
while True:
url = await self.queue.get()
http_client = AsyncHTTPClient()
try:
response = await http_client.fetch(url)
if callable(self.result_callback):
self.result_callback({
'url': url,
'status_code': response.code,
})
except HTTPClientError as e:
if callable(self.result_callback):
self.result_callback({
'url': url,
'status_code': e.code,
})
except asyncio.CancelledError:
return
finally:
self.queue.task_done()
async def main(**kwargs) -> None:
def result_hook(result: Any) -> None:
print(f"{result['status_code']}\t"
f"{result['url']}")
kwargs['result_callback'] = result_hook
dirb = NanoDirb(**kwargs)
urls = [
'http://example.com/admin',
'http://example.com/admin.php',
'http://example.com/auth',
'http://example.com/auth.php',
]
await dirb.run(urls)
if __name__ == '__main__':
asyncio.run(main(num_workers=10))