Skip to content

Commit

Permalink
Prototype - wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cbioportal import user committed Jan 10, 2025
1 parent 6d05b44 commit a79d4b0
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 24 deletions.
4 changes: 2 additions & 2 deletions docker/web-and-data/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ WORKDIR /cbioportal
RUN mvn dependency:go-offline --fail-never

COPY $PWD /cbioportal
RUN mvn install package -DskipTests -Dfed.mode=FEDERATOR -q
RUN mvn install package -DskipTests -q
RUN mkdir -p target/dependency && (cd target/dependency; jar -xf ../*-exec.jar)

FROM eclipse-temurin:21
Expand Down Expand Up @@ -60,7 +60,7 @@ RUN find /core/scripts/ -type f -executable \! -name '*.pl' -print0 | xargs -0 -
ENV PORTAL_WEB_HOME=/cbioportal-webapp
ENV PORTAL_HOME=/cbioportal
ENV JAVA_OPTS="-Xms2G -Xmx8G"
ENV WEBAPP_OPTS=
ENV WEBAPP_OPTS="-Dfedapi.mode=FEDERATOR"

# add exploded Spring Boot jar contents to image
# See: https://spring.io/guides/topicals/spring-boot-docker/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public CompletableFuture<List<ClinicalAttribute>> fetchClinicalAttributes() {

@Override
public CompletableFuture<List<ClinicalDataCountItem>> fetchClinicalDataCounts(ClinicalDataCountFilter filter) {
filter.getStudyViewFilter().setStudyIds(null);
return POST(
"/clinical-data-counts/fetch",
Map.of(),
Expand All @@ -67,6 +68,7 @@ public CompletableFuture<List<ClinicalDataCountItem>> fetchClinicalDataCounts(Cl

@Override
public CompletableFuture<List<ClinicalDataBin>> fetchClinicalDataBinCounts(ClinicalDataBinCountFilter filter) {
filter.getStudyViewFilter().setStudyIds(null);
return POST(
"/clinical-data-bin-counts/fetch",
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@ConditionalOnProperty(name = "fedapi.mode", havingValue = "DATASOURCE")
public class FederatedDataSourceService implements FederatedService {

@Value("fedapi.datasource.display-name")
@Value("${fedapi.datasource.display-name}")
private String displayName;

@Value("#{'${fedapi.datasource.visible-studies}'.split(',')}")
Expand Down
101 changes: 82 additions & 19 deletions src/main/java/org/cbioportal/service/impl/FederatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,31 @@

import org.cbioportal.model.ClinicalAttribute;
import org.cbioportal.model.ClinicalDataBin;
import org.cbioportal.model.ClinicalDataCount;
import org.cbioportal.model.ClinicalDataCountItem;
import org.cbioportal.persistence.fedapi.FederatedDataSource;
import org.cbioportal.persistence.fedapi.FederatedDataSourceImpl;
import org.cbioportal.persistence.fedapi.FederatorConfig;
import org.cbioportal.service.ClinicalAttributeService;
import org.cbioportal.service.ClinicalDataService;
import org.cbioportal.service.FederatedService;
import org.cbioportal.service.exception.FederationException;
import org.cbioportal.utils.config.annotation.ConditionalOnProperty;
import org.cbioportal.web.parameter.*;
import org.cbioportal.web.util.ClinicalDataBinUtil;
import org.cbioportal.web.util.StudyViewFilterApplier;
import org.cbioportal.web.util.StudyViewFilterUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
@ConditionalOnProperty(name = "fedapi.mode", havingValue = "FEDERATOR")
@ConditionalOnProperty(name = "fedapi.mode", havingValue = "FEDERATOR", matchIfMissing = true) // TODO undo
@Primary // TODO undo
public class FederatorService implements FederatedService {

@Autowired
Expand All @@ -35,16 +35,21 @@ public class FederatorService implements FederatedService {
@Override
public List<ClinicalAttribute> fetchClinicalAttributes() throws FederationException {
try {
return aggResultsFromDifferentSources(
List<List<ClinicalAttribute>> results = collectResultsFromDifferentSources(
s -> s.fetchClinicalAttributes()
);
return results.stream()
.flatMap(List::stream)
// not necessary to do a distinctBy(attributeId) -- attributes are specific to a particular study,
// so if 2 studies have the same attribute we should leave it be.
.collect(Collectors.toList());
} catch (Exception e) {
throw new FederationException("Failed to fetch clinical attributes", e);
}
}

private <T> List<T> aggResultsFromDifferentSources(
Function<FederatedDataSource, CompletableFuture<List<T>>> apiFunc
private <TResult> List<TResult> collectResultsFromDifferentSources(
Function<FederatedDataSource, CompletableFuture<TResult>> apiFunc
) throws Exception {
// Each list = results from one data source
// WhenAll's over the list of tasks to get a CF<List<List<CA>>>
Expand All @@ -56,46 +61,104 @@ private <T> List<T> aggResultsFromDifferentSources(
.<FederatedDataSource>map(inf -> new FederatedDataSourceImpl(inf))
.toList();

List<CompletableFuture<List<T>>> futures = sources.stream()
List<CompletableFuture<TResult>> futures = sources.stream()
.map(apiFunc::apply)
.toList();

CompletableFuture<List<List<T>>> combinedFuture =
CompletableFuture<List<TResult>> combinedFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // Join each future result
.collect(Collectors.toList()) // Collect into List<List<ClinicalAttribute>>
);

List<List<T>> results = combinedFuture.get();
List<T> flattened = results.stream()
.flatMap(List::stream) // Flatten each sublist
.collect(Collectors.toList());
return flattened;
List<TResult> results = combinedFuture.get();
return results;
}

@Override
public List<ClinicalDataCountItem> fetchClinicalDataCounts(
ClinicalDataCountFilter filter
) throws FederationException {
try {
return aggResultsFromDifferentSources(
List<List<ClinicalDataCountItem>> results = collectResultsFromDifferentSources(
s -> s.fetchClinicalDataCounts(filter)
);
return flattenClinicalDataCountItems(results);
} catch (Exception e) {
throw new FederationException("Failed to fetch clinical data counts", e);
}
}

private List<ClinicalDataCountItem> flattenClinicalDataCountItems(
List<List<ClinicalDataCountItem>> results
) {
BinaryOperator<ClinicalDataCount> mergeClinicalDataCounts = (left, right) -> {
if (!left.getValue().equals(right.getValue())) {
throw new IllegalArgumentException("attempting to merge counts for different values");
}

var res = new ClinicalDataCount();
res.setValue(left.getValue());
res.setCount(left.getCount() + right.getCount());
return res;
};

BinaryOperator<ClinicalDataCountItem> mergeClinicalDataCountItems = (left, right) -> {
if (!left.getAttributeId().equals(right.getAttributeId())) {
throw new IllegalArgumentException("attempting to merge counts for different attributes");
}

List<ClinicalDataCount> leftCounts = left.getCounts();
List<ClinicalDataCount> rightCounts = right.getCounts();
Stream<ClinicalDataCount> combinedCounts = Stream.concat(leftCounts.stream(), rightCounts.stream());
Map<String, ClinicalDataCount> groupedByValue = combinedCounts.collect(
Collectors.toMap(
ct -> ct.getValue(),
ct -> ct,
mergeClinicalDataCounts
)
);
List<ClinicalDataCount> mergedCounts = new ArrayList<>(groupedByValue.values());

var res = new ClinicalDataCountItem();
res.setAttributeId(left.getAttributeId());
res.setCounts(mergedCounts);
return res;
};

// Flatten the results
List<ClinicalDataCountItem> flattened = results.stream()
.flatMap(List::stream)
.collect(Collectors.toList());
// Group by the clinical attribute ID for each count item
// Merge ones the same ID
Map<String, ClinicalDataCountItem> groupedByAttr = flattened.stream()
.collect(
Collectors.toMap(
it -> it.getAttributeId(),
it -> it,
mergeClinicalDataCountItems
)
);

List<ClinicalDataCountItem> aggResults = new ArrayList<>(groupedByAttr.values());
return aggResults;
}

@Override
public List<ClinicalDataBin> fetchClinicalDataBinCounts(
ClinicalDataBinCountFilter filter
) throws FederationException {
try {
return aggResultsFromDifferentSources(
List<List<ClinicalDataBin>> results = collectResultsFromDifferentSources(
s -> s.fetchClinicalDataBinCounts(filter)
);
// TODO merge bins for the same range?
return results.stream()
.flatMap(List::stream)
.collect(Collectors.toList());
} catch (Exception e) {
throw new FederationException("Failed to fetch clinical data bin counts", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import java.util.List;

@Service
@ConditionalOnProperty(name = "fedapi.mode", havingValue = "NONE", matchIfMissing = true)
@Primary // default implementation
@ConditionalOnProperty(name = "fedapi.mode", havingValue = "NONE") // TODO matchIfMissing = true
// @Primary // default implementation
public class NullFederatedService implements FederatedService {
@Override
public List<ClinicalAttribute> fetchClinicalAttributes() throws FederationException {
Expand Down

0 comments on commit a79d4b0

Please sign in to comment.