Skip to content

Commit

Permalink
ReactorUtils added.
Browse files Browse the repository at this point in the history
  • Loading branch information
marco-brandizi committed Jun 29, 2024
1 parent ad64b20 commit 280ba79
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 7 deletions.
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@
</dependency>
-->

<!--
Required by uk.ac.ebi.utils.opt.runcontrol.ReactorUtils
-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.10</version>
<optional>true</optional>
</dependency>


</dependencies>

<build>
Expand Down
3 changes: 2 additions & 1 deletion revision-history.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Revision History

*This file was last revised on 2024-06-07*. **Please keep this note updated**.
*This file was last revised on 2024-06-30*. **Please keep this note updated**.

## 14.1-SNAPSHOT
* `ChainExecutor.andThen()` added (to be tested).
* `StreamUtils.sampleStream()` added.
* `uk.ac.ebi.utils.statistics.FisherExact` imported from the USeq project.
* `CollectionsUtils`, adding functions to copy collections and making them read-only (to be tested, but used in dependants).
* `JacksonJsUtils` added.
* `ReactorUtils` added.


## 14.0.1
Expand Down
163 changes: 163 additions & 0 deletions src/main/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package uk.ac.ebi.utils.opt.runcontrol;

import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
import static reactor.core.scheduler.Schedulers.newBoundedElastic;

import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* Utilities based on the Project Reactor library.
*
* @author Marco Brandizi
* <dl><dt>Date:</dt><dd>29 Jun 2024</dd></dl>
*
*/
public class ReactorUtils
{
/**
* {@link Schedulers#newBoundedElastic(int, int, String)} with the {@link Schedulers#DEFAULT_BOUNDED_ELASTIC_SIZE default threadCap}
* and a low limit for queuedTaskCap. This is suitable for cases where the source is
* much faster than the downstream processing and hence there is little point with queueing
* too much stuff.
*
*/
public static final Scheduler DEFAULT_FLUX_SCHEDULER = newBoundedElastic (
DEFAULT_BOUNDED_ELASTIC_SIZE, 100,
"jutils.batchSched"
);

/**
* This has been tested in tasks like saving data on a database.
*/
public static final int DEFAULT_BATCH_SIZE = 2500;

/**
* Little helper to build a common {@link ParallelFlux} to process a source of items
* in parallel batches.
*
* @author Marco Brandizi
* <dl><dt>Date:</dt><dd>30 Jun 2024</dd></dl>
*
*/
public static class ParallelBatchFluxBuilder<T, B extends Collection<T>>
{
private Flux<T> flux;
private Scheduler scheduler = DEFAULT_FLUX_SCHEDULER;
private int batchSize = DEFAULT_BATCH_SIZE;
private Supplier<B> batchSupplier;

@SuppressWarnings ( "unchecked" )
public ParallelBatchFluxBuilder ( Flux<? extends T> flux )
{
this.flux = (Flux<T>) flux;
}

public ParallelBatchFluxBuilder ( Stream<? extends T> stream )
{
this ( Flux.fromStream ( stream ) );
}

public ParallelBatchFluxBuilder ( Collection<? extends T> collection )
{
this ( collection.stream () );
}

public ParallelFlux<B> build ()
{
@SuppressWarnings ( "unchecked" )
Flux<B> result = this.batchSupplier == null
? (Flux<B>) flux.buffer ( batchSize ) : flux.buffer ( batchSize, batchSupplier );

return result
.parallel ()
.runOn ( scheduler );
}

/**
* Default is {@link ReactorUtils#DEFAULT_FLUX_SCHEDULER}.
*/
public ParallelBatchFluxBuilder<T, B> withScheduler ( Scheduler scheduler )
{
this.scheduler = scheduler;
return this;
}

/**
* Default it {@link ReactorUtils#DEFAULT_BATCH_SIZE}.
*/
public ParallelBatchFluxBuilder<T, B> withBatchSize ( int batchSize )
{
this.batchSize = batchSize;
return this;
}

/**
* Default is null, which fallback to {@link Flux#buffer(int)}, usually a {@link List} supplier.
*/
@SuppressWarnings ( "unchecked" )
public ParallelBatchFluxBuilder<T, B> withBatchSupplier ( Supplier<? extends Collection<? super T>> batchSupplier )
{
this.batchSupplier = (Supplier<B>) batchSupplier;
return this;
}
} // class ParallelBatchFluxBuilder


/**
* Uses {@link ParallelBatchFluxBuilder} to process a source of batches.
*/
public static <T, B extends Collection<? super T>> void batchProcessing (
ParallelFlux<B> parallelFlux, Consumer<B> task
)
{
parallelFlux.doOnNext ( task )
.sequential ()
.blockLast ();
}

/**
* Uses {@link ParallelBatchFluxBuilder} with default options and
* {@link #batchProcessing(ParallelFlux, Consumer)} to batch a source of items and
* process them in parallel batches.
*
*/
public static <T> void batchProcessing (
Flux<T> flux, Consumer<List<T>> task
)
{
ParallelFlux<List<T>> parFlux = new ParallelBatchFluxBuilder<T,List<T>> ( flux ).build ();
batchProcessing ( parFlux, task );
}

/**
* Variant of {@link #batchProcessing(Flux, Consumer)}
*/
public static <T> void batchProcessing (
Stream<T> stream, Consumer<List<T>> task
)
{
ParallelFlux<List<T>> parFlux = new ParallelBatchFluxBuilder<T, List<T>> ( stream ).build ();
batchProcessing ( parFlux, task );
}

/**
* Variant of {@link #batchProcessing(Flux, Consumer)}
*/
public static <T> void batchProcessing (
Collection<T> collection, Consumer<List<T>> task
)
{
ParallelFlux<List<T>> parFlux = new ParallelBatchFluxBuilder<T,List<T>> ( collection ).build ();
batchProcessing ( parFlux, task );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.function.Supplier;
import java.util.stream.Collector;

import uk.ac.ebi.utils.opt.runcontrol.ReactorUtils;

/**
* <h1>The Batch Collector</h1>
*
Expand All @@ -23,7 +25,7 @@
* <dl><dt>Date:</dt><dd>23 Nov 2019</dd></dl>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. TODO: helpers and examples about batching via Reactor.
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public interface BatchCollector<B>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.slf4j.LoggerFactory;

import uk.ac.ebi.utils.exceptions.UnexpectedEventException;
import uk.ac.ebi.utils.opt.runcontrol.ReactorUtils;
import uk.ac.ebi.utils.threading.HackedBlockingQueue;
import uk.ac.ebi.utils.threading.ThreadUtils;
import uk.ac.ebi.utils.threading.batchproc.collectors.CollectionBatchCollector;
Expand Down Expand Up @@ -66,7 +67,7 @@
* <dl><dt>Date:</dt><dd>1 Dec 2017</dd></dl>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. TODO: helpers and examples about batching via Reactor.
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public abstract class BatchProcessor<B, BC extends BatchCollector<B>, BJ extends Consumer<B>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* <dl><dt>Date:</dt><dd>23 Nov 2019</dd></dl>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. TODO: helpers and examples about batching via Reactor.
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public interface ItemizedBatchCollector<B,E> extends BatchCollector<B>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* <dl><dt>Date:</dt><dd>23 Nov 2019</dd></dl>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. TODO: helpers and examples about batching via Reactor.
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public abstract class ItemizedBatchProcessor<E, B, BC extends ItemizedBatchCollector<B,E>, BJ extends Consumer<B>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* <dl><dt>Date:</dt><dd>23 Nov 2019</dd></dl>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. TODO: helpers and examples about batching via Reactor.
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public interface ItemizedSizedBatchCollector<B,E>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* <dl><dt>Date:</dt><dd>23 Nov 2019</dd></dl>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. TODO: helpers and examples about batching via Reactor.
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public interface SizedBatchCollector<B> extends BatchCollector<B>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
*
* @param <C>
* @param <E>
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public abstract class CollectionBatchCollector<C extends Collection<E>, E>
extends AbstractSizedBatchCollector<C>
implements ItemizedSizedBatchCollector<C, E>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
* <dl><dt>Date:</dt><dd>25 Nov 2019</dd></dl>
*
* @param <E>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public class ListBatchCollector<E> extends CollectionBatchCollector<List<E>, E>
{
public ListBatchCollector ( Supplier<List<E>> batchFactory, int maxBatchSize )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
* <dl><dt>Date:</dt><dd>25 Nov 2019</dd></dl>
*
* @param <E>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public class SetBatchCollector<E> extends CollectionBatchCollector<Set<E>, E>
{
public SetBatchCollector ( Supplier<Set<E>> batchFactory, int maxBatchSize )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* @author brandizi
* <dl><dt>Date:</dt><dd>25 Nov 2019</dd></dl>
*
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public abstract class CollectionBasedBatchProcessor
<E, B extends Collection<E>, BC extends CollectionBatchCollector<B,E>, BJ extends Consumer<B>>
extends ItemizedBatchProcessor<E, B, BC, BJ>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

import uk.ac.ebi.utils.threading.batchproc.collectors.ListBatchCollector;

/**
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public abstract class ListBasedBatchProcessor<E, BJ extends Consumer<List<E>>>
extends CollectionBasedBatchProcessor<E, List<E>, ListBatchCollector<E>, BJ>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

import uk.ac.ebi.utils.threading.batchproc.collectors.SetBatchCollector;

/**
* @deprecated the functionality available in this package is provided by project
* Reactor and we recommend to switch to that. @see ReactorUtils
*/
@Deprecated
public abstract class SetBasedBatchProcessor<E, BJ extends Consumer<Set<E>>>
extends CollectionBasedBatchProcessor<E, Set<E>, SetBatchCollector<E>, BJ>
{
Expand Down
Loading

0 comments on commit 280ba79

Please sign in to comment.