-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetcher.py
121 lines (105 loc) · 3.59 KB
/
fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# coding: utf-8
import simplejson
import threading
import urllib2
class Fetcher:
    """Background poller that copies a user's public activities to storage.

    A worker thread is started by the constructor.  It sleeps on an event
    and wakes either when a refresh is requested (``post_fetch``,
    ``maybe_post_fetch``, ``maybe_single_fetch``) or after ``TIMEOUT``
    seconds, then downloads from the API rooted at ``BASE_URL`` and hands
    the decoded items to ``storage.storePosts``.

    The storage object must provide ``storePosts(list_of_posts)`` and
    ``getLatestPosts()``; the latter must yield mappings with an ``'id'``
    key.  Call ``finish_thread`` to stop the worker.
    """

    BASE_URL = 'https://www.googleapis.com/plus/v1/'
    TIMEOUT = 3600  # sec; longest the worker sleeps before refreshing anyway
    FETCHER_COUNT = 1000  # maybe_* calls tolerated between forced refreshes

    def __init__(self, user_id, key, storage):
        """Remember the credentials and start the worker thread.

        Args:
            user_id: id of the person whose public activities are fetched.
            key: API key appended to every request URL.
            storage: object exposing ``storePosts`` and ``getLatestPosts``.
        """
        self._user_id = user_id
        self._key = key
        self._storage = storage
        # A freshly created Event is already cleared.
        self._event = threading.Event()
        self._fetch_count = 0
        self._fetch_counts = {}
        self._fetch_activity_id = None
        self._fetch_etag = None
        self._thread = threading.Thread(
            target=self._fetcher_thread, name="Fetcher")
        # Must be set before start(): the worker reads it on every wake-up.
        self._should_run = True
        self._thread.start()

    def _activities_url(self):
        """Return the request for the first page of public activities.

        When an ETag from an earlier successful refresh is known it is sent
        as ``If-None-Match`` so an unchanged feed need not be re-sent.
        """
        request = urllib2.Request(
            self.BASE_URL + 'people/' + self._user_id +
            '/activities/public?num=100&key=' + self._key)
        if self._fetch_etag:
            request.add_header('If-None-Match', self._fetch_etag)
        return request

    def _paged_activities_url(self, page_token):
        """Return the request for the activities page named by page_token."""
        return urllib2.Request(
            self.BASE_URL + 'people/' + self._user_id +
            '/activities/public?num=100&pageToken=%s&key=%s' % (
                page_token, self._key))

    def _single_post_url(self, activity_id):
        """Return the URL (a plain string) of one activity."""
        return (self.BASE_URL + 'activities/%s?num=100&key=%s' % (
            activity_id, self._key))

    def _load_json(self, request):
        """Open request and return ``(decoded_json, etag_header)``.

        The response is closed even when decoding fails, so a failed
        download does not leak its connection.  ``etag_header`` is None when
        the server sent no ETag.  Exceptions from urlopen and from the JSON
        decoder propagate to the caller.
        """
        response = urllib2.urlopen(request)
        try:
            return simplejson.load(response), response.info().getheader('ETag')
        finally:
            response.close()

    @staticmethod
    def _unseen_prefix(items, known_ids):
        """Split off the items that precede the first already-known one.

        Args:
            items: sequence of mappings with an ``'id'`` key.
            known_ids: container of ids that are already stored.

        Returns:
            ``(fresh, reached_known)`` where ``fresh`` is the list of items
            before the first one whose id is in ``known_ids`` and
            ``reached_known`` tells whether such an item was found.
        """
        fresh = []
        for item in items:
            if item['id'] in known_ids:
                return fresh, True
            fresh.append(item)
        return fresh, False

    def _fetch(self):
        """Download the first activities page and store its items."""
        try:
            data, etag = self._load_json(self._activities_url())
            self._storage.storePosts(data['items'])
            # Remember the ETag only after the items were stored; otherwise
            # a failed refresh would be hidden behind later "not modified"
            # replies.
            self._fetch_etag = etag
        except urllib2.HTTPError as e:
            # fetcher errors. Including 30x.
            # Single-argument print() emits the same text whether print is
            # a statement or a function.
            print('fetcher error:  %s' % e.code)

    def _fetch_a_post(self, activity_id):
        """Download one activity and store it as a one-element list."""
        try:
            data, _ = self._load_json(self._single_post_url(activity_id))
            self._storage.storePosts([data])
        except urllib2.HTTPError as e:
            print('fetcher error:  %s' % e.code)

    def _fetcher_thread(self):
        """Worker loop: wait for a request or the timeout, then fetch.

        Runs until ``_should_run`` is false at a wake-up.
        """
        while True:
            self._event.wait(self.TIMEOUT)
            if not self._should_run:
                break
            self._event.clear()
            # Claim the pending id before the slow download.  Resetting the
            # slot afterwards would erase an id queued while the download
            # was in progress.
            activity_id = self._fetch_activity_id
            self._fetch_activity_id = None
            try:
                if activity_id:
                    self._fetch_a_post(activity_id)
                else:
                    self._fetch()
            except Exception as e:
                # Boundary of the worker thread: report and keep polling so
                # one failed download (network down, malformed body, ...)
                # does not silently stop every later refresh.
                print('fetcher error:  %r' % (e,))

    def fetch_all_posts(self):
        """Page through the activities feed and store every unseen post.

        Paging stops at the first post whose id is among
        ``storage.getLatestPosts()``, at an empty page, or when the response
        carries no ``nextPageToken``.  Posts listed before the first known
        one on the final page are kept.  Everything collected is stored with
        a single ``storePosts`` call; on an HTTP error nothing is stored and
        the status code is printed.
        """
        try:
            latest_ids = set(
                post['id'] for post in self._storage.getLatestPosts())
            print('fetch the latest...')
            data, _ = self._load_json(self._activities_url())
            all_items = []
            while data['items']:
                fresh, reached_known = self._unseen_prefix(
                    data['items'], latest_ids)
                all_items.extend(fresh)
                if reached_known or 'nextPageToken' not in data:
                    break
                data, _ = self._load_json(
                    self._paged_activities_url(data['nextPageToken']))
            self._storage.storePosts(all_items)
        except urllib2.HTTPError as e:
            print('fetcher error:  %s' % e.code)

    def post_fetch(self):
        """Wake the worker thread so it fetches now."""
        self._event.set()

    def maybe_post_fetch(self):
        """Request a refresh once per FETCHER_COUNT + 1 calls.

        Probably not usable in our current traffic though...
        """
        self._fetch_count += 1
        if self._fetch_count > self.FETCHER_COUNT:
            self._fetch_count = 0
            self.post_fetch()

    def maybe_single_fetch(self, activity_id):
        """Request a fetch of one activity, rate-limited per activity id.

        The fetch is requested the first time an id is seen and again each
        time its call counter exceeds FETCHER_COUNT.  Only one id can be
        pending at a time; a newer request replaces an unclaimed older one.
        """
        do_fetch = False
        if activity_id not in self._fetch_counts:
            do_fetch = True
            self._fetch_counts[activity_id] = 0
        self._fetch_counts[activity_id] += 1
        if self._fetch_counts[activity_id] > self.FETCHER_COUNT:
            self._fetch_counts[activity_id] = 0
            do_fetch = True
        if do_fetch:
            self._fetch_activity_id = activity_id
            self._event.set()

    def finish_thread(self):
        """Ask the worker to stop and block until it has exited."""
        self._should_run = False
        self._event.set()
        self._thread.join()