#!/usr/bin/env python3
"""
Script for scraping GitHub repositories based on a keyword, intended as a pipeline into running MOSS.
Previous queries and downloaded repositories are saved, and it is possible to extract
only assignment folders in preparation for MOSS. This matching is done based on the
files an assignment is required to contain.
This script should have execute permissions if you wish to run it as ./github_scraper.
However, it can also be run with python github_scraper.
Version: 8.16.2024
See https://github.com/DALPy-Developers/Github-Scraping for the most up-to-date version.
Authors: Chami Lamelas (slamel01), Eitan Joseph
Date: Fall 2022 - Present
"""
from enum import Enum
import zipfile
import base64
import time
import sys
import logging
import json
import requests
from pytz import timezone
from datetime import datetime, timedelta
import urllib.parse
import csv
import os
import argparse
from pathlib import Path
import importlib
import inspect
from dataclasses import dataclass, field, fields
import traceback
from math import ceil
import magic
# toml module (from PyPI) is used in place of tomllib (Python standard library) as tomllib
# will only be available in Python 3.11.x and Tufts server runs Python 3.9.2.
import toml
# Script descriptions ...
DESCRIPTION = (
"This script is used for scraping GitHub repositories based on a provided query."
)
EPILOG = "Please visit https://github.com/DALPy-Developers/Github-Scraping/blob/main/README.md to learn how to use this script."
@dataclass
class GitHubScraperConfig:
"""Stores configuration for this code"""
token: str = None
collection_root: str = None
raise_issue: bool = False
issue_title: str = "TUFTS COMP15 IMPORTANT"
issue_body: str = (
"We noticed you have publicly posted the solution to an assignment in Tufts University course CS15. Please either delete this repository or make it private ASAP. In the past, we've had issues with students plagiarizing code they find on GitHub."
)
issue_contact: str = (
"If you have additional questions, please send an email to the current course instructor listed on the CS 15 website: https://www.cs.tufts.edu/comp/15/. Do not send an email to the email address associated with this issue."
)
language: str = "c++"
extra_directory: str = None
api_timeout: float = 5.0
custom_file: str = None
file_filter: str = None
extra_work: str = None
extra_work_args: dict = field(default_factory=dict)
def __repr__(self):
"""
Displays configuration like:
token:
blah ...
collection_root:
blah ...
etc.
"""
return "\n".join(
f"{f.name}:\n\t{getattr(self, f.name)}" for f in fields(GitHubScraperConfig)
)
def __str__(self):
return repr(self)
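# A minimal illustrative configuration file (all values are hypothetical; only token and
# collection_root are required, everything else falls back to the defaults above):
#
#   token = "ghp_xxxxxxxxxxxxxxxx"
#   collection_root = "~/github_scrapes"
#   language = "c++"
#   api_timeout = 5.0
#   custom_file = "~/scraper_custom.py"
#   file_filter = "my_file_filter"
#   extra_work = "my_extra_work"
#
#   [extra_work_args]
#   required_files = ["main.cpp", "Makefile"]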
# Basenames of various files that will be placed in ROOT
RECORDS_BASENAME = "records.txt"
QUERIES_BASENAME = "queries.txt"
LOG_BASENAME = "github_scraper.log"
TMP_ARCHIVE_BASENAME = ".github_scraper.zip"
# Formatting for records file
RECORDS_HEADER = ["owner", "repository", "repository_url", "download_timestamp"]
# UI related constants
QUIT_QUERYING = "@q"
INITIAL_PREVIEW_ROWS = int(os.get_terminal_size()[1] / 2)
"""I chose half the terminal size because I thought it looked better"""
# GitHub API constants
ISSUE_CREATED = 201
ISSUES_DISABLED = 410
ITEMS_PER_PAGE = 30
GET_OK = 200
def is_importable_python_file(filepath):
"""Checks if file is a Python file one can import from"""
if "Python" not in magic.from_file(filepath):
raise ScraperError(f"{filepath} is not a Python file")
if not filepath.endswith(".py"):
raise ScraperError(f"To import from {filepath}, it must end with .py")
def convert_user_specified_path(path):
"""Converts path specified by user (e.g. including ~) into a real path"""
return os.path.expanduser(path).rstrip(os.sep)
def alwaysfalse(_):
"""Function that always returns false"""
return False
def doesnothing(x, y, z, **kwargs):
    """Function that does nothing but accepts the same arguments as an extra_work hook"""
pass
def file_exists(filepath, errmsg=None):
"""Enforces that filepath is a file"""
if errmsg is None:
errmsg = f"{filepath} is not a file!"
if not os.path.isfile(filepath):
raise ScraperError(errmsg)
def get_config_filename():
"""Gets the configuration filename from the command line (a required argument) and sets up help info."""
parser = argparse.ArgumentParser(description=DESCRIPTION, epilog=EPILOG)
parser.add_argument("config", type=str, help="config file")
args = parser.parse_args()
file_exists(args.config, f"Configuration file {args.config} does not exist!")
return args.config
def check_and_make(dir_path):
"""If dir_path is not a directory, make it"""
Path(dir_path).mkdir(exist_ok=True, parents=True)
def get_first_line(path):
"""Gets 1st line of a text file (with no newline)"""
with open(path, "r", encoding="utf-8") as f:
return next(iter(f)).rstrip("\n")
def get_subdir_basenames(dir):
"""Get set of basenames of subdirectories of a directory"""
return {e.name for e in os.scandir(dir) if e.is_dir()}
def make_urlstr(s):
"""Turns a string into a proper component of a URL - needed for github api"""
return urllib.parse.quote(s)
def preview_line(line, query, index):
"""Previews a line in green if query is in it (case-insensitive), else terminal default"""
line = line.lower()
query = query.lower()
print(
("\033[92m" + line + "\033[0m") if query in line else line,
end=("" if index >= INITIAL_PREVIEW_ROWS else "\n"),
)
def add_line(path, line):
"""Adds a line to the end of a text file"""
with open(path, "a", encoding="utf-8") as f:
f.write(line + "\n")
def make_nice_time(sec):
"""Converts a time in seconds into a nicer str time that will include minutes if secs is too big"""
return str(timedelta(seconds=ceil(sec)))
def get_func_byname(module, func_name, substitute):
"""Gets function object by name from module object, errors are thrown if a function with that name cannot be created"""
if func_name is None or not hasattr(module, func_name):
return substitute
func = getattr(module, func_name)
if not inspect.isfunction(func):
return substitute
    # Note this does not check whether the function takes the right number of parameters,
    # but we assume a user comfortable enough with Python to write a custom function to
    # pass in here could figure that out from the traceback
return func
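# Sketch of a custom_file module supplying the two optional hooks. The function names are
# whatever the file_filter and extra_work settings name; the signatures below only reflect
# how this script invokes the hooks, and the bodies are purely illustrative:
#
#   def my_file_filter(filepath):
#       """Return True for files to delete after a repository is extracted."""
#       return not filepath.endswith((".cpp", ".h", "Makefile"))
#
#   def my_extra_work(download_path, extra_dir_path, query, **extra_work_args):
#       """Called after each download with the repo path, the extra directory, and the query."""
#       pass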
class ScraperError(Exception):
"""Error class raised by our code"""
pass
class PreviewResult(str, Enum):
"""Represents the 4 states of a user's preview of a file"""
YES_RESPONSE = "y"
MAYBE_RESPONSE = "m"
NO_RESPONSE = "n"
QUIT_QUERY = "q"
@staticmethod
def stdin_to_result(prompt=""):
read = input(prompt).strip().lower()
try:
return PreviewResult(read)
except ValueError:
return None
@staticmethod
def wait_for_result_from_stdin():
prompt = f"Enter ({'/'.join(e.value for e in PreviewResult)}): "
result = None
while result is None:
result = PreviewResult.stdin_to_result(prompt)
return result
def __repr__(self):
return self.value
def __str__(self):
return repr(self)
class APIResult:
"""Represents a result from searching the API."""
def __init__(self, owner, repo, path, repo_url):
"""Initializes an APIResult from with repository owner, name, search result path, and repo URL"""
self.owner = owner
self.repo = repo
self.path = path
self.repo_url = repo_url
def __eq__(self, other):
"""Determines if two results are equal - necessary to make a set of APIResults"""
return self.owner == other.owner and self.repo == other.repo
def __hash__(self):
"""Provides hash code for a result - necessary to make a set of APIResults"""
return hash(self.owner + self.repo)
def get_dirbasename(self):
"""Gets a directory basename that, within the parent directory, is a valid (database) key for a record"""
return self.owner + "_" + self.repo
def get_download_path(self, download_root):
"""Gets the download path of a record"""
return os.path.join(download_root, self.get_dirbasename())
def __repr__(self):
"""String representation of result - used in logging"""
return f"APIResult(owner={self.owner},repo={self.repo},path={self.path},repo_url={self.repo_url})"
def __str__(self):
return repr(self)
def nick(self):
"""Nickname that serves as unique ID for repo"""
return f"{self.owner}/{self.repo}"
class GithubScraper:
"""This is the main class of this file: initializing a GitHub scraper runs the program."""
def __init__(self, config_filename):
"""Initializes the scraper given a configuration filename."""
self.config_file = config_filename
self.__parse_toml()
if self.config.token is None:
raise ScraperError("token is a required configuration setting")
if self.config.collection_root is None:
raise ScraperError("collection_root is a required configuration setting")
self.config.collection_root = convert_user_specified_path(
self.config.collection_root
)
if self.config.extra_directory is not None:
self.config.extra_directory = convert_user_specified_path(
self.config.extra_directory
)
check_and_make(self.config.collection_root)
self.__load_custom()
self.__setup_logger()
# Set of records (repos) that user has declined in the current
        # session (by saying n). These records aren't saved anywhere;
        # they just spare the user from repeatedly rejecting results
        # from the same repo during previewing
self.declined_records = set()
def run(self):
"""Runs the scraper"""
self.__init_records()
self.__load_previous_queries()
# Runs query prompting input loop
done = False
while not done:
self.query = input(f"Query (or {QUIT_QUERYING} to quit)? ")
if self.query == QUIT_QUERYING:
done = True
elif len(self.query.strip()) == 0:
continue
else:
self.__process()
def __process(self):
"""
        Processes a query - collects query results to preview, previews each one, downloads the ones
        the user selects and updates the saved records, raising issues and creating extra dirs if appropriate
"""
# here we maintain that self.queries reflects the contents of the queries file (assuming
# the file is not edited independent of this program)
if self.query not in self.queries:
add_line(self.queries_path, self.query)
self.queries.add(self.query)
records_to_preview = self.__collect_results_to_preview()
for i, record in enumerate(records_to_preview):
# Don't preview any result that user has downloaded in
# the current scrape (note self.records is updated by
# self.__add_record)
if record in self.records:
self.logger.debug(
f"Skipping record (already downloaded repo {record.nick()}): {record}"
)
# Don't preview any result that user has said no to in
# the current scrape
elif record in self.declined_records:
self.logger.debug(
f"Skipping record (user previously said no to {record.nick()}): {record}"
)
else:
self.logger.info(
f"Previewing file {i + 1}/{len(records_to_preview)}:\n"
)
preview_result = self.__preview_record(record)
self.logger.debug(f"Preview result: {preview_result}")
if preview_result == PreviewResult.QUIT_QUERY:
return
elif preview_result == PreviewResult.YES_RESPONSE:
self.__download_record(record)
self.__add_record(record)
self.__raise_issue(record)
self.__make_extra_dir(record)
elif preview_result == PreviewResult.NO_RESPONSE:
self.logger.info(
f"Will not preview any more results from {record.nick()}"
)
self.declined_records.add(record)
# else it's a maybe response - we do nothing
if len(records_to_preview) == 0:
print(f"No records to preview matching {self.query} -- try a more broad query.")
# Helper functions for __process() ...
def __collect_results_to_preview(self):
"""Collects query results to be previewed, checking saved records"""
self.logger.info(
f"Beginning to collect records to preview, this may take awhile..."
)
records = list()
page = 1
done = False
        # Go page by page, make an API request, and collect the results
while not done:
json_results = self.__search(page)
for result in json_results:
record = APIResult(
result["repository"]["owner"]["login"],
result["repository"]["name"],
result["path"],
result["repository"]["html_url"],
)
# Don't collect records that were downloaded in previous scrapes
if record not in self.records:
records.append(record)
            # the last page will have fewer than ITEMS_PER_PAGE results
done = len(json_results) < ITEMS_PER_PAGE
page += 1
return records
def __preview_record(self, record):
"""
Previews a record, return PreviewResult indicating user decision
Relevant API: https://docs.github.com/en/rest/repos/contents?apiVersion=2022-11-28
"""
resp_json = self.__get_json(
f"https://api.github.com/repos/{record.owner}/{record.repo}/contents/{record.path}"
)
        # GitHub returns the file content base64-encoded, so we decode it and then read line by line
if resp_json["encoding"] != "base64":
raise ScraperError("Unable to preview the contents of search result.")
file_content = base64.b64decode(resp_json["content"]).decode("utf-8")
print("\n" + ("=" * int(os.get_terminal_size()[0] / 2)))
self.logger.info(f"Username/Repository is {record.nick()}:")
for index, line in enumerate(file_content.splitlines()):
preview_line(line, self.query, index)
            # Once we show the first few rows of the file, we ask the user to either move
            # forward line by line with enter (like unix less) or respond
if index >= INITIAL_PREVIEW_ROWS:
result = PreviewResult.stdin_to_result()
if result is not None:
return result
return PreviewResult.wait_for_result_from_stdin()
def __download_record(self, record):
"""
Downloads a record
Relevant guide: https://stackoverflow.com/questions/67962757/python-how-to-download-repository-zip-file-from-github-using-github-api
"""
self.logger.info(f"Commencing download into {record.get_dirbasename()}\n")
url = f"https://api.github.com/repos/{record.owner}/{record.repo}/zipball/"
# For downloading a repo, we get it as a zip and then extract it
response = self.__get(url)
archive = os.path.join(self.config.collection_root, TMP_ARCHIVE_BASENAME)
with open(archive, "wb") as f:
f.write(response.content)
try:
with zipfile.ZipFile(archive, "r") as zip_ref:
zip_ref.extractall(
record.get_download_path(self.config.collection_root)
)
self.__filter_files(
record.get_download_path(self.config.collection_root)
)
self.logger.debug(f"Completed download into {record.get_dirbasename()}")
except FileNotFoundError as e:
raise RuntimeError(
f"Something went wrong extracting the repository, check it manually - {record.repo_url} (it may be too big).\n{str(e)}"
)
finally:
            # Remove the temporary archive whether or not extraction succeeded
os.remove(archive)
def __make_extra_dir(self, record):
"""Makes an extra directory corresponding to record"""
if self.config.extra_directory is not None:
extra_dir_path = os.path.join(
self.config.extra_directory, record.get_dirbasename()
)
Path(extra_dir_path).mkdir(parents=True, exist_ok=True)
self.extra_work(
record.get_download_path(self.config.collection_root),
extra_dir_path,
self.query,
**self.config.extra_work_args
)
def __add_record(self, record):
"""Adds a record to the records file"""
# adds a record to maintained set and records file (inc. timestamp)
self.records.add(record)
timestamp = datetime.now(timezone("US/Eastern")).strftime("%Y/%m/%d %H:%M:%S")
add_line(
self.records_path,
",".join((record.owner, record.repo, record.repo_url, timestamp)),
)
def __raise_issue(self, record):
"""
Raises an issue on a particular record
Relevant API: https://docs.github.com/en/rest/issues/issues?apiVersion=2022-11-28#create-an-issue
"""
if not self.config.raise_issue:
return
headers = {"Authorization": f"Token {self.config.token}"}
data = {
"title": self.config.issue_title,
"body": self.config.issue_body + "\n\n" + self.config.issue_contact,
}
url = f"https://api.github.com/repos/{record.owner}/{record.repo}/issues"
self.logger.debug(f"Making authorized POST request to {url} with data:\n{data}")
response = requests.post(url, data=json.dumps(data), headers=headers)
if response.status_code == ISSUE_CREATED:
self.logger.info(
f"Issue raised successfully on the repository {record.repo} belonging to {record.owner}."
)
elif response.status_code == ISSUES_DISABLED:
            self.logger.warning(
f"{record.owner} has disabled issues on the repository {record.repo}."
)
else:
            self.logger.warning(
                f"Something went wrong raising an issue on the repository {record.repo} belonging to {record.owner}. Response code: {response.status_code}"
)
# Wrappers over GET requests ...
def __get(self, url):
"""
        Hits URL with a GET request and returns the Response; a failed GET causes program exit
Note all GitHub API GET requests are done with the provided token for authorization
in the hope that this will allow for nicer rate limits. However, it is only technically
necessary for the code search API (for the same reason why you have to be logged in to
search code).
"""
headers = {"Authorization": f"Token {self.config.token}"}
self.logger.debug("Sending authorized GET request to: " + url)
response = requests.get(url, headers=headers)
if response.status_code != GET_OK:
# For failed requests, log the entire response to see if user can diagnose
self.logger.error("GET request failed, response: " + response.text)
sys.exit(1)
return response
def __get_json(self, url):
"""Gets JSON response from a GET at provided URL"""
response = self.__get(url)
resp_json = response.json()
self.logger.debug(
f"{len(str(resp_json))} character JSON response, top level keys = {resp_json.keys()}"
)
return resp_json
# Set up __init__ helper functions (before scraping begins) ...
def __setup_logger(self):
"""Sets up logger - basically copy pasted from the Python logging cookbook"""
self.logger = logging.getLogger("github_scraping")
self.logger.setLevel(logging.DEBUG)
self.log = os.path.join(self.config.collection_root, LOG_BASENAME)
fh = logging.FileHandler(self.log)
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
file_formatter = logging.Formatter(
"%(asctime)s | [%(levelname)s] : %(message)s"
)
fh.setFormatter(file_formatter)
console_formatter = logging.Formatter("[%(levelname)s] : %(message)s")
ch.setFormatter(console_formatter)
self.logger.addHandler(fh)
self.logger.addHandler(ch)
def __parse_toml(self):
"""Load in config into dataclass object"""
try:
self.config = GitHubScraperConfig(**toml.load(self.config_file))
except TypeError as e:
errmsg = str(e)
setting = errmsg[errmsg.index("'") : errmsg.rindex("'") + 1]
raise ScraperError(f"{setting} is not a valid configuration setting")
def __load_custom(self):
"""Loads custom functions from provided file into class attributes to be used later"""
if self.config.custom_file is not None:
self.config.custom_file = convert_user_specified_path(
self.config.custom_file
)
file_exists(self.config.custom_file)
is_importable_python_file(self.config.custom_file)
            # Add the Python file's directory to sys.path and then import the module (named x if custom_file is PATH/TO/x.py)
sys.path.append(os.path.dirname(self.config.custom_file))
module_name = os.path.basename(self.config.custom_file)[: -len(".py")]
custom_module = importlib.import_module(module_name)
            # Now, load the functions themselves using the helper -- substitute defaults if a function
            # can't be found (remember this branch can be entered if only one of file_filter, extra_work is specified)
self.file_filter = get_func_byname(
custom_module, self.config.file_filter, alwaysfalse
)
self.extra_work = get_func_byname(
custom_module, self.config.extra_work, doesnothing
)
else:
self.file_filter = alwaysfalse
self.extra_work = doesnothing
def __init_records(self):
"""
        If the collection root has no subdirectories and no records file, a records file is created.
        If the root has subdirectories, it must have a valid records file that matches them
"""
subdirs = get_subdir_basenames(self.config.collection_root)
self.records_path = os.path.join(self.config.collection_root, RECORDS_BASENAME)
if os.path.isfile(self.records_path):
# Validate records in records file match the directories in ROOT
with open(self.records_path, "r", encoding="utf-8") as f:
reader = csv.reader(f)
header = next(reader)
if header != RECORDS_HEADER:
raise ScraperError(
f"{self.records_path} header does not match required header {','.join(RECORDS_HEADER)}."
)
# Load up records from lines
self.records = {APIResult(*row) for row in reader}
# Do a set equals to make sure directories match with list of records in file
if {record.get_dirbasename() for record in self.records} != subdirs:
raise ScraperError(
f"Directories in {self.config.collection_root} should match the records in {self.records_path}."
)
else:
# If no records file exists, make one after making sure there are no directories
self.records = set()
if len(subdirs) != 0:
raise ScraperError(
f"Records file {self.records_path} does not exist, but directories do in {self.config.collection_root}."
)
Path(self.records_path).write_text(",".join(RECORDS_HEADER) + "\n")
def __load_previous_queries(self):
"""Prints previous queries if a queries file exists in ROOT"""
self.queries_path = os.path.join(self.config.collection_root, QUERIES_BASENAME)
if os.path.isfile(self.queries_path):
# Load up set of queries as lines of queries file and print if any exist
with open(self.queries_path, "r", encoding="utf-8") as f:
self.queries = set(line.rstrip("\n") for line in f)
if len(self.queries) == 0:
self.logger.info(
f"No previous queries identified in {self.queries_path}"
)
else:
query_str = "\n".join(("\t" + query) for query in self.queries)
self.logger.info(
f"Previous queries identified in {self.queries_path}:\n{query_str}\n"
)
else:
self.queries = set()
# Helper functions for __collect_results_to_preview() ...
def __estimate_collection_time(self, total_records):
"""Estimates the amount of time it will take to collect a certain number of records"""
num_pulls = int(total_records / ITEMS_PER_PAGE)
if total_records % ITEMS_PER_PAGE == 0:
            # Note if not divisible, we don't need to subtract 1 because
            # there is really one more pull that is not being counted
num_pulls -= 1
return num_pulls * self.config.api_timeout
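    # Worked example of the estimate above (numbers are illustrative): with total_records = 95
    # and api_timeout = 5.0, int(95 / 30) = 3 counted pulls (not evenly divisible, so no
    # subtraction), giving an estimated 3 * 5.0 = 15 seconds of sleeping between pages.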
def __search(self, page):
"""
        Makes a code search request; the JSON 'items' of the response are returned
Relevant API: https://docs.github.com/en/rest/search?apiVersion=2022-11-28#search-code
"""
        # Sleep between these code search hits; based on experience, this is where we've seen rate limits
if page > 1:
time.sleep(self.config.api_timeout)
resp_json = self.__get_json(
f"https://api.github.com/search/code?q={make_urlstr(self.query)}+in:file+language:{make_urlstr(self.config.language)}&page={page}"
)
# 'items' is what we're interested in - so that needs to be there
if "items" not in resp_json:
raise ScraperError(
"Your token's rate limit has likely been exceeded. Try either (1) increasing your api_timeout configuration option or (2) trying a more refined query."
)
        # Each response includes the total record count; after the first one, estimate the remaining collection time
if page == 1:
self.logger.info(
f"Estimated collection time = {make_nice_time(self.__estimate_collection_time(resp_json['total_count']))}"
)
return resp_json["items"]
# Helper functions for __download_record() ...
def __filter_files(self, extraction_path):
"""Deletes all files who self.file_filter( ) says to delete"""
files_to_delete = list()
for dirpath, _, filenames in os.walk(extraction_path):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
if self.file_filter(filepath):
files_to_delete.append(filepath)
for filepath in files_to_delete:
os.remove(filepath)
def main():
scraper = None
try:
scraper = GithubScraper(get_config_filename())
except ScraperError as e:
print(f"scraper error in setup: {e}", file=sys.stderr)
except Exception as e:
print(f"unexpected error in setup: {e}", file=sys.stderr)
if scraper is not None:
try:
scraper.run()
except ScraperError as e:
print(
f"scraper error: {e}\nCheck {scraper.log} for more details.",
file=sys.stderr,
)
except Exception as e:
print(
f"unexpected error: {e}\nCheck {scraper.log} for more details.\nscraper traceback:\n{traceback.format_exc()}",
file=sys.stderr,
)
if __name__ == "__main__":
main()