#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Read (delay, url) tuples from the central queue and issue each request after waiting delay seconds
Requires: urllib3 - https://github.com/shazow/urllib3
pip install urllib3
"""
from multiprocessing.managers import BaseManager
from datetime import datetime
import time
import json
import urllib3
from Config import HOST, PORT, AUTHKEY, BASE_URL, DELAY_IN_PRODUCER
__author__ = 'Aaron Daubman <[email protected]>'
__date__ = '9/7/12 3:25 PM'
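
# The docstring above refers to a "central queue"; the sketch below is an illustrative
# assumption (not part of the original project) of what that queue server could look like.
# It must expose 'get_work_queue' and 'get_result_queue' on the same HOST/PORT/AUTHKEY
# from Config so that the remote-queue branch of RequestGenerator.__init__ can connect.
def _example_queue_server(host=HOST, port=PORT, authkey=AUTHKEY):
    import Queue

    work_queue = Queue.Queue()
    result_queue = Queue.Queue()

    class _QueueServerManager(BaseManager):
        pass

    _QueueServerManager.register('get_work_queue', callable=lambda: work_queue)
    _QueueServerManager.register('get_result_queue', callable=lambda: result_queue)
    m = _QueueServerManager(address=(host, port), authkey=authkey)
    server = m.get_server()
    server.serve_forever()  # blocks; run in its own process alongside the generators
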
class RequestGenerator(object):
    def __init__(self, host=None, port=None, authkey=None, baseurl=None, name='1', wq=None, rq=None, delinprod=True):
        self.name = name
        self.baseurl = baseurl
        self.delinprod = delinprod
        self.m = None
        if wq is None or rq is None:
            # No local queues supplied: connect to the central manager and use remote queue proxies
            print 'Initializing RequestGenerator: ' + self.name + ' as BaseManager(address=(' + host + ', ' + str(port) + '), authkey=' + authkey + ') with remote queues'
            BaseManager.register('get_work_queue')
            BaseManager.register('get_result_queue')
            self.m = BaseManager(address=(host, port), authkey=authkey)
            self.m.connect()
            self.work_queue = self.m.get_work_queue()
            self.result_queue = self.m.get_result_queue()
        else:
            print 'Initializing RequestGenerator: ' + self.name + ' with shared local queues'
            self.work_queue = wq
            self.result_queue = rq
        #self.work_queue.cancel_join_thread()
        #self.result_queue.cancel_join_thread()
        self.http = urllib3.PoolManager()

    def run(self):
        self.running = True
        print 'Running RequestGenerator: ' + self.name
        while self.running:
            try:
                # Each work item is a (query, original-enqueue-time) tuple;
                # see the illustrative producer sketch near the end of this module.
                query, oqt = self.work_queue.get()
                try:
                    ts, taken, qt, nf, sz = self.request_url(query, self.baseurl)
                except ValueError:
                    # Non-200 response; skip this query and keep consuming
                    continue
                self.result_queue.put((ts, taken, qt, nf, sz, oqt))
            except KeyboardInterrupt:
                print '\nKeyboardInterrupt detected in RequestGenerator: ' + self.name + ', attempting to exit...'
                #sys.exc_clear()
                self.stop()
                return

    def stop(self):
        self.running = False
        #self.work_queue.close()
        #self.result_queue.close()
        #if self.m is not None:
        #    self.m.shutdown()

    def request_url(self, query, baseurl):
        #myreq = urllib2.Request(baseurl)
        #myreq.add_data(query)
        req_start = datetime.now()
        ts = time.time()
        #with contextlib.closing(urllib2.urlopen(myreq)) as c:
        #    r = json.load(c)
        r = self.http.request('GET', baseurl + query)
        if r.status != 200:
            print 'Received status: {0} for request: {1}\n{2}'.format(r.status, query, r.data)
            raise ValueError
        d = r.data
        td = datetime.now() - req_start
        # Wall-clock time for the request, in milliseconds
        taken = int(round((td.microseconds + (td.seconds * 1000000.0)) / 1000))
        sz = len(str(d))
        qt = -1
        nf = -1
        try:
            j = json.loads(d)
            #qt = r[r.find('QTime:') + 7:r.find('}')]  # earlier attempt to parse QTime from the raw response
            qt = j.get('responseHeader', {}).get('QTime', -1)
            nf = j.get('response', {}).get('numFound', -1)
        except ValueError:
            print 'ValueError raised attempting to parse json: ' + repr(d)
        return ts, taken, qt, nf, sz
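
# A hedged usage sketch (added for illustration, not original code): a producer can connect
# to the same manager, enqueue (query, enqueue-timestamp) tuples matching what run()
# expects, and then drain the (ts, taken, qt, nf, sz, oqt) result tuples.
def _example_producer(queries, host=HOST, port=PORT, authkey=AUTHKEY):
    BaseManager.register('get_work_queue')
    BaseManager.register('get_result_queue')
    m = BaseManager(address=(host, port), authkey=authkey)
    m.connect()
    work_queue = m.get_work_queue()
    result_queue = m.get_result_queue()
    for q in queries:
        work_queue.put((q, time.time()))  # (query, oqt) as consumed by RequestGenerator.run()
    # Collect one result tuple per query; note run() skips failed requests without
    # putting a result, so this blocks indefinitely unless every request succeeds.
    return [result_queue.get() for _ in queries]
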

if __name__ == '__main__':
    p = RequestGenerator(host=HOST, port=PORT, authkey=AUTHKEY, baseurl=BASE_URL, delinprod=DELAY_IN_PRODUCER)
    p.run()