ProcessingService
apete committed Nov 10, 2024
1 parent ebd2196 commit f3b0c6a
Showing 2 changed files with 82 additions and 32 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,12 @@ Added / Changed / Deprecated / Fixed / Removed / Security

> Corresponds to changes in the `develop` branch since the last release
### Added

#### org.ojalgo.concurrent

- Addition to `ProcessingService` that simplifies concurrently taking items from a `BlockingQueue`.

### Changed

#### org.ojalgo.random
108 changes: 76 additions & 32 deletions src/main/java/org/ojalgo/concurrent/ProcessingService.java
@@ -6,12 +6,14 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
@@ -20,10 +22,10 @@
import org.ojalgo.type.function.TwoStepMapper;

/**
* A simple wrapper around an {@link ExecutorService} that makes it easier to process collections of items in parallel.
* The work items are processed by a {@link Consumer}, {@link Function} or {@link TwoStepMapper}. In particular the
* {@link TwoStepMapper} can be used to aggregate/reduce data from the work items and then combine the collected data
* into a final result.
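* <p>
* A minimal usage sketch (illustrative only; the {@code words} collection is an assumed example):
*
* <pre>{@code
* Collection<String> words = List.of("alpha", "beta", "gamma");
* ProcessingService.INSTANCE.process(words, word -> System.out.println(word));
* }</pre>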
*/
public final class ProcessingService {

@@ -98,12 +100,12 @@ public <W, R> Map<W, R> compute(final Collection<W> work, final Function<W, R> c
}

/**
* Compute an output item for each (unique) input item, and return the results as a {@link Map}. If the input
* contains duplicates, the output will have fewer items. It is therefore vital that the input type implements
* {@link Object#hashCode()} and {@link Object#equals(Object)} properly.
* <p>
* Will create at most {@code parallelism} tasks to work through the {@code work} items, processing them with
* {@code computer} and collecting the results in a {@link Map}.
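* <p>
* A minimal usage sketch (illustrative only), using the convenience overload without an explicit parallelism
* argument; the {@code words} collection is an assumed example:
*
* <pre>{@code
* Collection<String> words = List.of("alpha", "beta", "gamma");
* Map<String, Integer> lengths = ProcessingService.INSTANCE.compute(words, String::length);
* }</pre>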
*
* @param <W> The work item type
* @param <R> The function return type
@@ -145,9 +147,8 @@ public <W, R> Collection<R> map(final Collection<W> work, final Function<W, R> m

/**
* Simply map each (unique) input item to an output item - a {@link Collection} of input results in a
* {@link Collection} of output. If the input contains duplicates, the output will have fewer items. It is therefore
* vital that the input type implements {@link Object#hashCode()} and {@link Object#equals(Object)} properly.
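* <p>
* A minimal usage sketch (illustrative only; the {@code inputs} collection is an assumed example):
*
* <pre>{@code
* Collection<String> inputs = List.of("1", "2", "3");
* Collection<Integer> parsed = ProcessingService.INSTANCE.map(inputs, Integer::valueOf);
* }</pre>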
*
* @param <W> The input item type
* @param <R> The output item type
@@ -177,8 +178,8 @@ public <W> void process(final Collection<? extends W> work, final Consumer<W> pr
}

/**
* Will create at most {@code parallelism} tasks to work through the {@code work} items, processing them with
* {@code processor}.
*
* @param <W> The work item type
* @param work The collection of work items
@@ -231,8 +232,8 @@ public <W> void processTriplet(final W work1, final W work2, final W work3, fina
}

/**
* @deprecated v54 Use {@link #reduceMergeable(Collection<W>,int,Supplier<? extends TwoStepMapper.Mergeable<W, R>>)}
* instead
*/
@Deprecated
public <W, R> R reduce(final Collection<W> work, final int parallelism, final Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer) {
@@ -249,22 +250,22 @@ public <W, R> R reduce(final Collection<W> work, final IntSupplier parallelism,
}

/**
* @deprecated v54 Use {@link #reduceMergeable(Collection<W>,Supplier<? extends TwoStepMapper.Mergeable<W, R>>)}
* instead
*/
@Deprecated
public <W, R> R reduce(final Collection<W> work, final Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer) {
return this.reduceMergeable(work, reducer);
}

/**
* Will create at most {@code parallelism} tasks to work through the {@code work} items, processing them with
* {@code reducer}. The state of each task's {@code reducer} will be combined into a single instance, and the
* results of that instance will be returned.
* <p>
* Each {@link TwoStepMapper.Combineable} is only worked on by a single thread, and the results are combined into a
* single instance. The instances are not reused.
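* <p>
* A minimal sketch (illustrative only; {@code WordCounter} is an assumed user class, not part of ojAlgo):
*
* <pre>{@code
* // WordCounter implements TwoStepMapper.Combineable<String, Map<String, Long>, WordCounter>
* Collection<String> words = List.of("a", "b", "a");
* Map<String, Long> counts = ProcessingService.INSTANCE.reduceCombineable(words, Parallelism.CORES, WordCounter::new);
* }</pre>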
*
* @param work The collection of work items
* @param parallelism The maximum number of concurrent workers that will process the work items
* @param reducer A {@link TwoStepMapper.Combineable} implementation that does what you want.
@@ -307,21 +308,21 @@ public <W, R, A extends TwoStepMapper.Combineable<W, R, A>> R reduceCombineable(

/**
* Using parallelism {@link Parallelism#CORES}.
*
* @see ProcessingService#reduceCombineable(Collection, int, Supplier)
*/
public <W, R, A extends TwoStepMapper.Combineable<W, R, A>> R reduceCombineable(final Collection<W> work, final Supplier<A> reducer) {
return this.reduceCombineable(work, Parallelism.CORES, reducer);
}

/**
* Will create at most {@code parallelism} tasks to work through the {@code work} items, processing them with
* {@code reducer}. The results of each task's {@code reducer} will be merged into a single instance, and the
* results of that instance will be returned.
* <p>
* Each {@link TwoStepMapper.Mergeable} is only worked on by a single thread, and the results are combined into a
* single instance. The instances are not reused.
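* <p>
* A minimal sketch (illustrative only; {@code WordCounter} is an assumed user class, not part of ojAlgo):
*
* <pre>{@code
* // WordCounter implements TwoStepMapper.Mergeable<String, Map<String, Long>>
* Collection<String> words = List.of("a", "b", "a");
* Map<String, Long> counts = ProcessingService.INSTANCE.reduceMergeable(words, 2, WordCounter::new);
* }</pre>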
*
* @param work The collection of work items
* @param parallelism The maximum number of concurrent workers that will process the work items
* @param reducer A {@link TwoStepMapper.Mergeable} implementation that does what you want.
@@ -363,7 +364,7 @@ public <W, R, A extends TwoStepMapper.Mergeable<W, R>> R reduceMergeable(final C

/**
* Using parallelism {@link Parallelism#CORES}.
*
* @see ProcessingService#reduceMergeable(Collection, int, Supplier)
*/
public <W, R, A extends TwoStepMapper.Mergeable<W, R>> R reduceMergeable(final Collection<W> work, final Supplier<A> reducer) {
@@ -399,4 +400,47 @@ public void run(final IntSupplier parallelism, final Runnable processor) {
this.run(parallelism.getAsInt(), processor);
}

/**
* Will submit precisely {@code parallelism} tasks that each take from the {@code queue} feeding the items to the
* {@code processor}. The tasks will continue to run until the returned {@link AtomicBoolean} is set to
* {@code false} (or the thread is interrupted).
* <p>
* If the threads of the underlying {@link ExecutorService} are daemon threads, the JVM will not wait for them to
* finish before it exits. The default behaviour, using {@link #INSTANCE} or {@link #newInstance(String)}, is to
* make use of ojAlgo's {@link DaemonPoolExecutor}.
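* <p>
* A minimal usage sketch (illustrative only; the queue and items are assumed examples):
*
* <pre>{@code
* BlockingQueue<String> queue = new LinkedBlockingQueue<>();
* AtomicBoolean active = ProcessingService.INSTANCE.take(queue, 4, item -> System.out.println(item));
* queue.offer("work item");
* // ... later: signal the workers to stop (each exits when it next checks the flag)
* active.set(false);
* }</pre>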
*
* @param <T> The work item type
* @param queue The queue to take from
* @param parallelism How many parallel workers to create
* @param processor What to do with each of the work items
* @return A flag that can be used to signal the tasks to stop
*/
@SuppressWarnings("unused")
public <T> AtomicBoolean take(final BlockingQueue<T> queue, final int parallelism, final Consumer<T> processor) {

AtomicBoolean active = new AtomicBoolean(true);

for (int i = 0; i < parallelism; i++) {
myExecutor.submit(() -> {
while (active.get()) {
try {
processor.accept(queue.take());
} catch (InterruptedException cause) {
Thread.currentThread().interrupt();
break;
}
}
});
}

return active;
}

/**
* @see ProcessingService#take(BlockingQueue, int, Consumer)
*/
public <T> AtomicBoolean take(final BlockingQueue<T> queue, final IntSupplier parallelism, final Consumer<T> processor) {
return this.take(queue, parallelism.getAsInt(), processor);
}

}
