Skip to content

Commit

Permalink
EVA-3543 check if rs with same hash already exist before merging (#441)
Browse files Browse the repository at this point in the history
* check if rs with same hash already exist before merging
  • Loading branch information
nitin-ebi authored May 1, 2024
1 parent 1348569 commit 0170913
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import uk.ac.ebi.eva.accession.clustering.metric.ClusteringMetric;
import uk.ac.ebi.eva.accession.core.model.IClusteredVariant;
import uk.ac.ebi.eva.accession.core.model.ISubmittedVariant;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpClusteredVariantEntity;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantEntity;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantOperationEntity;
import uk.ac.ebi.eva.accession.core.model.eva.ClusteredVariantEntity;
Expand Down Expand Up @@ -191,12 +192,39 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation)
ImmutablePair<ClusteredVariantEntity, List<ClusteredVariantEntity>> mergeDestinationAndMergees =
getMergeDestinationAndMergees(mergeCandidates);
ClusteredVariantEntity mergeDestination = mergeDestinationAndMergees.getLeft();

List<ClusteredVariantEntity> mergees = mergeDestinationAndMergees.getRight();

removeMergeesAndInsertMergeDestination(mergeDestination, mergees);

for (ClusteredVariantEntity mergee: mergees) {
logger.info("RS merge operation: Merging rs{} to rs{} due to hash collision...",
mergee.getAccession(), mergeDestination.getAccession());
merge(mergeDestination, mergee, currentOperation);
recordMergeOperations(mergeDestination, mergee, currentOperation);
}
}

private void removeMergeesAndInsertMergeDestination(ClusteredVariantEntity mergeDestination,
List<ClusteredVariantEntity> mergeeList) {
Query queryForCheckingExistingCVE = query(where(ID_ATTRIBUTE).is(mergeDestination.getHashedMessage()));
List<ClusteredVariantEntity> existingCVEList = mongoTemplate.find(queryForCheckingExistingCVE, ClusteredVariantEntity.class);
List<DbsnpClusteredVariantEntity> existingDbsnpCVEList = mongoTemplate.find(queryForCheckingExistingCVE, DbsnpClusteredVariantEntity.class);
ClusteredVariantEntity existingCVE = null;
if (existingCVEList != null && !existingCVEList.isEmpty()) {
existingCVE = existingCVEList.get(0);
}
if (existingDbsnpCVEList != null && !existingDbsnpCVEList.isEmpty()) {
existingCVE = existingDbsnpCVEList.get(0);
}

if (existingCVE == null) {
insertRSRecordForMergeDestination(mergeDestination);
} else {
if (!existingCVE.getAccession().equals(mergeDestination.getAccession())) {
mongoTemplate.remove(query(where(ID_ATTRIBUTE).is(existingCVE.getHashedMessage())),
clusteringWriter.getClusteredVariantCollection(existingCVE.getAccession()));
metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, 1);
insertRSRecordForMergeDestination(mergeDestination);
}
}
}

Expand Down Expand Up @@ -259,40 +287,11 @@ private ImmutablePair<ClusteredVariantEntity, List<ClusteredVariantEntity>> getM
return new ImmutablePair<>(targetRS, mergees);
}

protected void merge(ClusteredVariantEntity mergeDestination, ClusteredVariantEntity mergee,
protected void recordMergeOperations(ClusteredVariantEntity mergeDestination, ClusteredVariantEntity mergee,
SubmittedVariantOperationEntity currentOperation) {
Long accessionToBeMerged = mergee.getAccession();
Long accessionToKeep = mergeDestination.getAccession();

//Confine merge updates to the particular assembly where clustering is being performed
Query queryForMergee = query(where(ACCESSION_ATTRIBUTE).is(accessionToBeMerged))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
Query queryForMergeTarget = query(where(ACCESSION_ATTRIBUTE).is(accessionToKeep))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));

List<? extends ClusteredVariantEntity> clusteredVariantToMerge =
mongoTemplate.find(queryForMergee,
clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));

List<? extends ClusteredVariantEntity> clusteredVariantToKeep =
mongoTemplate.find(queryForMergeTarget, clusteringWriter.getClusteredVariantCollection(
accessionToKeep));

if (clusteringWriter.isMultimap(clusteredVariantToMerge) || clusteringWriter.isMultimap(clusteredVariantToKeep)) {
// multimap! don't merge. see isMultimap() below for more details
return;
}

// Mergee is no longer valid to be present in the main clustered variant collection
mongoTemplate.remove(queryForMergee, clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));
metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, clusteredVariantToMerge.size());

if (clusteredVariantToKeep.isEmpty()) {
// Insert RS record for destination RS
insertRSRecordForMergeDestination(mergeDestination);
}
// Record merge operation
insertMergeOperation(mergeDestination, mergee);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,82 @@ public void testMultiLevelRSMerges() throws Exception {
assertRSAssociatedWithSS(1L, ss4);
}

@Test
@DirtiesContext
public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithHigherAccession() throws Exception {
ss1 = createSS(1L, 1L, 100L, "C", "T");
ss2 = createSS(2L, 2L, 100L, "C", "A");
ss3 = createSS(3L, 3L, 100L, "A", "G");
this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION);

DbsnpClusteredVariantEntity rs2 = this.createRS(ss2);
this.mongoTemplate.insert(rs2);

SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity();
mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE,
ss1.getAccession(), null, "Different RS with matching loci",
Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList()));
mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1));
this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION);

List<SubmittedVariantOperationEntity> submittedVariantOperationEntities = new ArrayList<>();
SubmittedVariantOperationEntity temp;
rsMergeCandidatesReader.open(new ExecutionContext());
while ((temp = rsMergeCandidatesReader.read()) != null) {
submittedVariantOperationEntities.add(temp);
}

//Perform merge
rsMergeWriter.write(submittedVariantOperationEntities);

// Check rs2,rs3 merged into to rs1
assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);
assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);

// Check rs1 to rs3 all have same rs
assertRSAssociatedWithSS(1L, ss1);
assertRSAssociatedWithSS(1L, ss2);
assertRSAssociatedWithSS(1L, ss3);
}

@Test
@DirtiesContext
public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithLowerAccession() throws Exception {
ss1 = createSS(1L, 1L, 100L, "C", "T");
ss2 = createSS(2L, 2L, 100L, "C", "A");
ss3 = createSS(3L, 3L, 100L, "A", "G");
this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION);

DbsnpClusteredVariantEntity rs1 = this.createRS(ss1);
this.mongoTemplate.insert(rs1);

SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity();
mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE,
ss1.getAccession(), null, "Different RS with matching loci",
Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList()));
mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1));
this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION);

List<SubmittedVariantOperationEntity> submittedVariantOperationEntities = new ArrayList<>();
SubmittedVariantOperationEntity temp;
rsMergeCandidatesReader.open(new ExecutionContext());
while ((temp = rsMergeCandidatesReader.read()) != null) {
submittedVariantOperationEntities.add(temp);
}

//Perform merge
rsMergeWriter.write(submittedVariantOperationEntities);

// Check rs2,rs3 merged into to rs1
assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);
assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);

// Check rs1 to rs3 all have same rs
assertRSAssociatedWithSS(1L, ss1);
assertRSAssociatedWithSS(1L, ss2);
assertRSAssociatedWithSS(1L, ss3);
}

private void assertMergeOp(Long mergee, Long merger, String assemblyToUse) {
assertEquals(0, this.clusteredVariantAccessioningService
.getAllActiveByAssemblyAndAccessionIn(assemblyToUse, Arrays.asList(mergee)).size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,87 +498,6 @@ public void do_not_merge_remapped_multimap_variants() throws Exception {
assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void do_not_merge_multimap_variants_into_remapped_variants() throws Exception {
// given
Long rs1 = 3000000000L;
Long rs2 = 3100000000L;
Long ssToRemap = 5000000000L;
Long ss2 = 5100000000L;
Long ss3 = 5200000000L;
String asm1 = "asm1";
String asm2 = "asm2";
assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0);

mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1));

mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED),
getSubmittedCollection(ssToRemap));


// NOTE: rs2 won't be merged into rs1 because rs2 is multimap
mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2));
mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, null), getClusteredTable(rs2));

mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED, 3),
getSubmittedCollection(ss2));
mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED),
getSubmittedCollection(ss2));

assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

// when
SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1);
this.clusterVariants(Collections.singletonList(sve1Remapped));

// then
assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void do_not_merge_remapped_variant_into_multimap_variants() throws Exception {
// given
Long rs1 = 3100000000L;
Long rs2 = 3000000000L;
Long ssToRemap = 5500000000L;
Long ss2 = 5100000000L;
Long ss3 = 5200000000L;
String asm1 = "asm1";
String asm2 = "asm2";
assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0);

// NOTE rs1 won't be merged into rs2 because rs2 is multimap
mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1));

mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED), getSubmittedCollection(ssToRemap));


mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2));
mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, 3), getClusteredTable(rs2));

mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED), getSubmittedCollection(ss2));
mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED), getSubmittedCollection(ss2));

assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

// when
SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1);
this.clusterVariants(Collections.singletonList(sve1Remapped));

// then
assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void merge_into_remapped_multimap_variants_if_single_mapping_per_assembly()
Expand Down

0 comments on commit 0170913

Please sign in to comment.