allow sending records to alternate collections
aaccomazzi authored and marblestation committed Jan 6, 2021
1 parent 8f5ab4d commit b69d894
Showing 1 changed file with 24 additions and 14 deletions.
run.py: 38 changes (24 additions & 14 deletions)
@@ -86,7 +86,7 @@ def print_kvs():
 
 
 def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True, update_metrics=True,
-            update_links=True, force_processing=False, ignore_checksums=False):
+            update_links=True, force_processing=False, ignore_checksums=False, solr_targets=None):
     """
     Initiates routing of the records (everything that was updated)
     since point in time T.
@@ -157,17 +157,17 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
                 batch.append(rec.bibcode)
                 tasks.task_index_records.delay(batch, force=force_indexing, update_solr=update_solr,
                                                update_metrics=update_metrics, update_links=update_links,
-                                               ignore_checksums=ignore_checksums)
+                                               ignore_checksums=ignore_checksums, solr_targets=solr_targets)
                 batch = []
             last_bibcode = rec.bibcode
 
         if len(batch) > 0:
             tasks.task_index_records.delay(batch, force=force_indexing, update_solr=update_solr, update_metrics=update_metrics,
-                                           commit=force_indexing, ignore_checksums=ignore_checksums)
+                                           commit=force_indexing, ignore_checksums=ignore_checksums, solr_targets=solr_targets)
         elif force_indexing and last_bibcode:
             # issue one extra call with the commit
             tasks.task_index_records.delay([last_bibcode], force=force_indexing, update_solr=update_solr, update_metrics=update_metrics,
-                                           commit=force_indexing, ignore_checksums=ignore_checksums)
+                                           commit=force_indexing, ignore_checksums=ignore_checksums, solr_targets=solr_targets)
 
         logger.info('Done processing %s records', sent)
     except Exception, e:
@@ -182,6 +182,16 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
         raise e
 
 
+def collection_to_urls(collection_name):
+    solr_urls = []
+    urls = app.conf['SOLR_URLS']
+    for u in urls:
+        parts = u.split('/')
+        parts[-2] = collection_name
+        solr_urls.append('/'.join(parts))
+    return solr_urls
+
+
 def rebuild_collection(collection_name):
     """
     Will grab all recs from the database and send them to solr
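The new collection_to_urls() helper rewrites each configured Solr update URL so it points at the requested collection. For reference, a standalone sketch of that logic; the SOLR_URLS value below is a made-up example of the usual http://host:port/solr/<collection>/update form, not the pipeline's actual configuration:

# Standalone illustration of the collection_to_urls() logic above.
# The URL is hypothetical; in run.py the list comes from app.conf['SOLR_URLS'].
SOLR_URLS = ['http://localhost:9983/solr/collection1/update']

def collection_to_urls(collection_name, urls=SOLR_URLS):
    solr_urls = []
    for u in urls:
        parts = u.split('/')          # ['http:', '', 'localhost:9983', 'solr', 'collection1', 'update']
        parts[-2] = collection_name   # swap the collection segment, keep the trailing 'update'
        solr_urls.append('/'.join(parts))
    return solr_urls

print(collection_to_urls('collection2'))
# ['http://localhost:9983/solr/collection2/update']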
@@ -197,12 +207,7 @@ def rebuild_collection(collection_name):
     if collection_name.startswith('http'):
         solr_urls = [collection_name]
     else:
-        solr_urls = []
-        urls = app.conf['SOLR_URLS']
-        for u in urls:
-            parts = u.split('/')
-            parts[-2] = collection_name
-            solr_urls.append('/'.join(parts))
+        solr_urls = collection_to_urls(collection_name)
 
     logger.info('Sending all records to: %s', ';'.join(solr_urls))
     sent = 0
@@ -373,7 +378,7 @@ def reindex_failed(app):
                         help='sends bibcodes to augment affilation pipeline, works with --filename')
     parser.add_argument('--solr-collection',
                         dest='solr_collection',
-                        default='collection2',
+                        default='collection1',
                         action='store',
                         help='name of solr collection, currently only used by rebuild collection')
     parser.add_argument('-x',
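The default for --solr-collection changes from 'collection2' to 'collection1', so the option now falls back to collection1 when not given. A minimal, hypothetical parser containing only this option illustrates the behavior (the real run.py defines many more arguments):

import argparse

# Only the option touched by this commit; everything else in run.py's parser is omitted.
parser = argparse.ArgumentParser()
parser.add_argument('--solr-collection',
                    dest='solr_collection',
                    default='collection1',
                    action='store',
                    help='name of solr collection')

print(parser.parse_args([]).solr_collection)
# collection1  (the new default)
print(parser.parse_args(['--solr-collection', 'collection2']).solr_collection)
# collection2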
@@ -449,6 +454,8 @@ def reindex_failed(app):
         update_solr = 's' in args.reindex.lower()
         update_metrics = 'm' in args.reindex.lower()
         update_links = 'l' in args.reindex.lower()
+        solr_urls = collection_to_urls(args.solr_collection)
+        print 'reindexing to solr url ' + str(solr_urls)
 
         if args.filename:
             print 'sending bibcodes from file to the queue for reindexing'
@@ -461,18 +468,21 @@
                 if len(bibs) >= 100:
                     tasks.task_index_records.delay(bibs, force=True,
                                                    update_solr=update_solr, update_metrics=update_metrics,
-                                                   update_links = update_links, ignore_checksums=args.ignore_checksums)
+                                                   update_links = update_links, ignore_checksums=args.ignore_checksums,
+                                                   solr_targets=solr_urls)
                     bibs = []
             if len(bibs) > 0:
                 tasks.task_index_records.delay(bibs, force=True,
                                                update_solr=update_solr, update_metrics=update_metrics,
-                                               update_links = update_links, ignore_checksums=args.ignore_checksums)
+                                               update_links = update_links, ignore_checksums=args.ignore_checksums,
+                                               solr_targets=solr_urls)
                 bibs = []
         else:
             print 'sending bibcode since date to the queue for reindexing'
             reindex(since=args.since, batch_size=args.batch_size, force_indexing=args.force_indexing,
                     update_solr=update_solr, update_metrics=update_metrics,
-                    update_links = update_links, force_processing=args.force_processing, ignore_checksums=args.ignore_checksums)
+                    update_links = update_links, force_processing=args.force_processing, ignore_checksums=args.ignore_checksums,
+                    solr_targets=solr_urls)
 
     elif args.reindex_failed:
         reindex_failed(app)
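Finally, a runnable sketch of the file-driven batching pattern shown above, with tasks.task_index_records.delay replaced by a local stub and with hypothetical bibcodes and Solr URL, to show how solr_targets rides along with every batch:

# Local stand-in for the Celery task used in run.py; it only reports what it received.
def task_index_records_delay(bibs, force=True, update_solr=True, update_metrics=True,
                             update_links=True, ignore_checksums=False, solr_targets=None):
    print('indexing %d bibcodes against %s' % (len(bibs), solr_targets))

# Hypothetical inputs: run.py reads bibcodes from --filename and builds
# solr_urls with collection_to_urls(args.solr_collection).
bibcodes = ['2021arXiv210100001A', '2021arXiv210100002B', '2021arXiv210100003C']
solr_urls = ['http://localhost:9983/solr/collection2/update']

# Same batching pattern as the diff: flush every 100 bibcodes, then flush the remainder.
bibs = []
for bibcode in bibcodes:
    bibs.append(bibcode)
    if len(bibs) >= 100:
        task_index_records_delay(bibs, force=True, solr_targets=solr_urls)
        bibs = []
if len(bibs) > 0:
    task_index_records_delay(bibs, force=True, solr_targets=solr_urls)
    bibs = []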
