Skip to content

Commit

Permalink
log partition id
Browse files Browse the repository at this point in the history
  • Loading branch information
Cristian Goina committed Apr 2, 2024
1 parent 962c419 commit 7da8590
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,41 +143,48 @@ private void calculateAllGradientScores() {
// partition matches and process all partitions concurrently
ItemsHandling.partitionCollection(matchesMasksToProcess, args.processingPartitionSize).entrySet().stream().parallel()
.forEach(indexedPartition -> {
int partitionId = indexedPartition.getKey(); // unbox it
List<String> partionMasks = indexedPartition.getValue();
LOG.info("Start processing partition {} ({} items)",
indexedPartition.getKey(),
indexedPartition.getValue().size());
partitionId,
partionMasks.size());
long startProcessingPartitionTime = System.currentTimeMillis();
// process each item from the current partition sequentially
indexedPartition.getValue().forEach(maskIdToProcess -> {
partionMasks.forEach(maskIdToProcess -> {
// read all matches for the current mask
List<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesForMask = getCDMatchesForMask(cdMatchesReader, maskIdToProcess);
long nPublishedLines = cdMatchesForMask.stream()
.map(cdm -> cdm.getMatchedImage().getPublishedName())
.distinct()
.count();
// calculate the grad scores
LOG.info("Calculate grad scores for {} matches ({} lines) of {} - memory usage {}M out of {}M",
LOG.info("Partition {} - calculate grad scores for {} matches ({} lines) of {} - memory usage {}M out of {}M",
partitionId,
cdMatchesForMask.size(), nPublishedLines, maskIdToProcess,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
List<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesWithGradScores = calculateGradientScores(
gradScoreAlgorithmProvider,
cdMatchesForMask,
executor);
LOG.info("Completed grad scores for {} matches of {} - memory usage {}M out of {}M",
LOG.info("Partition {} - completed grad scores for {} matches of {} - memory usage {}M out of {}M",
partitionId,
cdMatchesWithGradScores.size(), maskIdToProcess,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
long writtenUpdates = updateCDMatches(cdMatchesWithGradScores);
LOG.info("Updated {} grad scores for {} matches of {}", writtenUpdates, cdMatchesWithGradScores.size(), maskIdToProcess);
LOG.info("Partition {} - updated {} grad scores for {} matches of {}",
partitionId,
writtenUpdates, cdMatchesWithGradScores.size(), maskIdToProcess);
if (StringUtils.isNotBlank(args.processingTag)) {
long updatesWithProcessedTag = updateProcessingTag(cdMatchesForMask);
LOG.info("Set processing tag {} for {} mips", args.getProcessingTag(), updatesWithProcessedTag);
LOG.info("Partition {} - set processing tag {} for {} mips",
partitionId, args.getProcessingTag(), updatesWithProcessedTag);
}
});
LOG.info("Finished partition {} ({} items) in {}s - memory usage {}M out of {}M",
indexedPartition.getKey(),
indexedPartition.getValue().size(),
partitionId,
partionMasks.size(),
(System.currentTimeMillis() - startProcessingPartitionTime) / 1000.,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
Expand Down

0 comments on commit 7da8590

Please sign in to comment.