diff --git a/src/main/java/org/dataloader/orchestration/CF.java b/src/main/java/org/dataloader/orchestration/CF.java new file mode 100644 index 0000000..4a5e177 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/CF.java @@ -0,0 +1,160 @@ +package org.dataloader.orchestration; + +import java.util.Iterator; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Supplier; + +public class CF { + + private AtomicReference result = new AtomicReference<>(); + + private static final Object NULL = new Object(); + + private static class ExceptionWrapper { + private final Throwable throwable; + + private ExceptionWrapper(Throwable throwable) { + this.throwable = throwable; + } + } + + private final LinkedBlockingDeque> dependedActions = new LinkedBlockingDeque<>(); + + private static class CompleteAction implements Runnable { + Executor executor; + CF toComplete; + CF src; + BiFunction mapperFn; + + public CompleteAction(CF toComplete, CF src, BiFunction mapperFn, Executor executor) { + this.toComplete = toComplete; + this.src = src; + this.mapperFn = mapperFn; + this.executor = executor; + } + + public void execute() { + if (executor != null) { + executor.execute(this); + } else { + toComplete.completeViaMapper(mapperFn, src.result.get()); + } + + } + + @Override + public void run() { + toComplete.completeViaMapper(mapperFn, src.result.get()); + src.dependedActions.remove(this); + } + } + + private CF() { + + } + + public CF newIncomplete() { + return new CF(); + } + + public static CF newComplete(T completed) { + CF result = new CF<>(); + result.encodeAndSetResult(completed); + return result; + } + + public static CF newExceptionally(Throwable e) { + CF result = new CF<>(); + result.encodeAndSetResult(e); + return result; + } + + public static CF supplyAsync(Supplier supplier, + Executor executor) { + + CF result = new CF<>(); + executor.execute(() -> { + try { + result.encodeAndSetResult(supplier.get()); + } catch (Throwable ex) { + result.encodeAndSetResult(ex); + } + }); + return result; + } + + + public CF map(BiFunction fn) { + CF newCF = new CF<>(); + if (result.get() != null) { + newCF.completeViaMapper(fn, result.get()); + } else { + dependedActions.push(new CompleteAction<>(newCF, this, fn, null)); + } + return newCF; + } + + public CF mapAsync(BiFunction fn, Executor executor) { + CF newResult = new CF<>(); + dependedActions.push(new CompleteAction<>(newResult, this, fn, executor)); + return newResult; + } + + + public CF compose(BiFunction fn, Executor executor) { + CF newResult = new CF<>(); + dependedActions.push(new CompleteAction<>(newResult, this, fn, executor)); + return newResult; + } + + + public boolean complete(T value) { + boolean success = result.compareAndSet(null, value); + fireDependentActions(); + return success; + } + + private boolean encodeAndSetResult(Object rawValue) { + if (rawValue == null) { + return result.compareAndSet(null, NULL); + } else if (rawValue instanceof Throwable) { + return result.compareAndSet(null, new ExceptionWrapper((Throwable) rawValue)); + } else { + return result.compareAndSet(null, rawValue); + } + } + + private Object decodeResult(Object rawValue) { + if (rawValue instanceof ExceptionWrapper) { + return ((ExceptionWrapper) rawValue).throwable; + } else if (rawValue == NULL) { + return null; + } else { + return rawValue; + } + } + + private void fireDependentActions() { + Iterator> iterator = dependedActions.iterator(); + while (iterator.hasNext()) { + iterator.next().execute(); + } + } + + private void completeViaMapper(BiFunction fn, Object encodedResult) { + try { + Object decodedResult = decodeResult(encodedResult); + Object mappedResult = fn.apply( + (T) decodedResult, + decodedResult instanceof Throwable ? (Throwable) decodedResult : null + ); + this.result.compareAndSet(null, mappedResult); + } catch (Throwable t) { + encodeAndSetResult(t); + } + } + +} diff --git a/src/main/java/org/dataloader/orchestration/Orchestrator.java b/src/main/java/org/dataloader/orchestration/Orchestrator.java new file mode 100644 index 0000000..f0338dd --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/Orchestrator.java @@ -0,0 +1,142 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; +import org.dataloader.impl.Assertions; +import org.dataloader.orchestration.executors.ImmediateExecutor; +import org.dataloader.orchestration.observation.Tracker; +import org.dataloader.orchestration.observation.TrackingObserver; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Function; + +public class Orchestrator { + + private final Executor executor; + private final Tracker tracker; + private final DataLoader startingDL; + private final List> steps = new ArrayList<>(); + + /** + * This will create a new {@link Orchestrator} that can allow multiple calls to multiple data-loader's + * to be orchestrated so they all run optimally. + * + * @param dataLoader the data loader to start with + * @param the key type + * @param the value type + * @return a new {@link Orchestrator} + */ + public static Builder orchestrate(DataLoader dataLoader) { + return new Builder<>(dataLoader); + } + + public Orchestrator(Builder builder) { + this.tracker = new Tracker(builder.trackingObserver); + this.executor = builder.executor; + this.startingDL = builder.dataLoader; + } + + public Tracker getTracker() { + return tracker; + } + + public Executor getExecutor() { + return executor; + } + + + public Step load(K key) { + return load(key, null); + } + + public Step load(K key, Object keyContext) { + return Step.loadImpl(this, castAs(startingDL), key, keyContext); + } + + static T castAs(Object o) { + //noinspection unchecked + return (T) o; + } + + + void record(Step step) { + steps.add(step); + tracker.incrementStepCount(); + } + + /** + * This is the callback point saying to start the DataLoader loading process. + *

+ * The type of object returned here depends on the value type of the last Step. We cant be truly generic + * here and must be case. + * + * @param the value type + * @return the final composed value + */ + CompletableFuture execute() { + Assertions.assertState(!steps.isEmpty(), () -> "How can the steps to run be empty??"); + + // tell the tracker we are under way + getTracker().startingExecution(); + + int index = 0; + Step firstStep = steps.get(index); + + CompletableFuture currentCF = castAs(firstStep.codeToRun().apply(null)); // first load uses variable capture + whenComplete(index, firstStep, currentCF); + + for (index++; index < steps.size(); index++) { + Step nextStep = steps.get(index); + Function> codeToRun = castAs(nextStep.codeToRun()); + CompletableFuture nextCF = currentCF.thenCompose(value -> castAs(codeToRun.apply(value))); + currentCF = nextCF; + + // side effect when this step is complete + whenComplete(index, nextStep, nextCF); + } + + return castAs(currentCF); + + } + + private void whenComplete(int stepIndex, Step step, CompletableFuture cf) { + cf.whenComplete((v, throwable) -> { + if (throwable != null) { + // TODO - should we be cancelling future steps here - no need for dispatch tracking if they will never run + System.out.println("A throwable has been thrown on step " + stepIndex + ": " + throwable.getMessage()); + throwable.printStackTrace(System.out); + } else { + System.out.println("step " + stepIndex + " returned : " + v); + } + getTracker().loadCallComplete(stepIndex, step.dataLoader(), throwable); + }); + } + + + public static class Builder { + private Executor executor = ImmediateExecutor.INSTANCE; + private DataLoader dataLoader; + private TrackingObserver trackingObserver; + + Builder(DataLoader dataLoader) { + this.dataLoader = dataLoader; + } + + public Builder executor(Executor executor) { + this.executor = executor; + return this; + } + + public Builder observer(TrackingObserver trackingObserver) { + this.trackingObserver = trackingObserver; + return this; + } + + public Orchestrator build() { + return new Orchestrator<>(this); + } + } + +} diff --git a/src/main/java/org/dataloader/orchestration/Step.java b/src/main/java/org/dataloader/orchestration/Step.java new file mode 100644 index 0000000..34de341 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/Step.java @@ -0,0 +1,106 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; +import org.dataloader.orchestration.executors.ObservingExecutor; +import org.dataloader.orchestration.observation.Tracker; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.dataloader.orchestration.Orchestrator.castAs; + +public class Step { + private final Orchestrator orchestrator; + private final DataLoader dl; + private final Function> codeToRun; + + Step(Orchestrator orchestrator, DataLoader dataLoader, Function> codeToRun) { + this.orchestrator = orchestrator; + this.dl = castAs(dataLoader); + this.codeToRun = codeToRun; + } + + DataLoader dataLoader() { + return dl; + } + + public Function> codeToRun() { + return codeToRun; + } + + public With with(DataLoader dataLoader) { + return new With<>(orchestrator, dataLoader); + } + + public Step load(K key, Object keyContext) { + return loadImpl(orchestrator, dl, key, keyContext); + } + + public Step thenLoad(Function codeToRun) { + return thenLoadImpl(orchestrator, dl, codeToRun, false); + } + + public Step thenLoadAsync(Function codeToRun) { + return thenLoadImpl(orchestrator, dl, codeToRun, true); + } + + static Step loadImpl(Orchestrator orchestrator, DataLoader dl, K key, Object keyContext) { + Tracker tracker = orchestrator.getTracker(); + int stepIndex = tracker.getStepCount(); + Function> codeToRun = k -> { + CompletableFuture cf = castAs(dl.load(key, keyContext)); + orchestrator.getTracker().loadCall(stepIndex, dl); + return cf; + }; + Step step = new Step<>(orchestrator, dl, codeToRun); + orchestrator.record(step); + return step; + } + + static Step thenLoadImpl(Orchestrator orchestrator, DataLoader dl, Function codeToRun, boolean async) { + Tracker tracker = orchestrator.getTracker(); + Function> actualCodeToRun; + if (async) { + actualCodeToRun = mkAsyncLoadLambda(orchestrator, dl, codeToRun, tracker); + } else { + actualCodeToRun = mkSyncLoadLambda(dl, codeToRun, tracker); + } + Step step = new Step<>(orchestrator, dl, actualCodeToRun); + orchestrator.record(step); + return step; + } + + private static Function> mkSyncLoadLambda(DataLoader dl, Function codeToRun, Tracker tracker) { + int stepIndex = tracker.getStepCount(); + return v -> { + K key = codeToRun.apply(v); + CompletableFuture cf = castAs(dl.load(key)); + tracker.loadCall(stepIndex, dl); + return cf; + }; + } + + private static Function> mkAsyncLoadLambda(Orchestrator orchestrator, DataLoader dl, Function codeToRun, Tracker tracker) { + int stepIndex = tracker.getStepCount(); + return v -> { + Executor executor = orchestrator.getExecutor(); + Consumer callback = atSomePointWeNeedMoreStateButUsingStringForNowToMakeItCompile -> { + tracker.loadCall(stepIndex, dl); + }; + ObservingExecutor observingExecutor = new ObservingExecutor<>(executor, "state", callback); + Supplier> dataLoaderCall = () -> { + K key = codeToRun.apply(v); + return castAs(dl.load(key)); + }; + return CompletableFuture.supplyAsync(dataLoaderCall, observingExecutor) + .thenCompose(Function.identity()); + }; + } + + public CompletableFuture toCompletableFuture() { + return orchestrator.execute(); + } +} diff --git a/src/main/java/org/dataloader/orchestration/With.java b/src/main/java/org/dataloader/orchestration/With.java new file mode 100644 index 0000000..bf647bc --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/With.java @@ -0,0 +1,40 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; + +import java.util.function.Function; + +import static org.dataloader.orchestration.Orchestrator.castAs; +import static org.dataloader.orchestration.Step.loadImpl; + +/** + * A transitional step that allows a new step to be started with a new data loader in play + * + * @param the key type + * @param the value type + */ +public class With { + private final Orchestrator orchestrator; + private final DataLoader dl; + + public With(Orchestrator orchestrator, DataLoader dl) { + this.orchestrator = orchestrator; + this.dl = dl; + } + + public Step load(K key) { + return load(key, null); + } + + public Step load(K key, Object keyContext) { + return loadImpl(orchestrator, castAs(dl), key,keyContext); + } + + public Step thenLoad(Function codeToRun) { + return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun, false); + } + + public Step thenLoadAsync(Function codeToRun) { + return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun, true); + } +} diff --git a/src/main/java/org/dataloader/orchestration/executors/ImmediateExecutor.java b/src/main/java/org/dataloader/orchestration/executors/ImmediateExecutor.java new file mode 100644 index 0000000..e7dc2b5 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/executors/ImmediateExecutor.java @@ -0,0 +1,15 @@ +package org.dataloader.orchestration.executors; + +import org.dataloader.annotations.Internal; + +import java.util.concurrent.Executor; + +@Internal +public class ImmediateExecutor implements Executor { + public static final ImmediateExecutor INSTANCE = new ImmediateExecutor(); + + @Override + public void execute(Runnable command) { + command.run(); + } +} diff --git a/src/main/java/org/dataloader/orchestration/executors/ObservingExecutor.java b/src/main/java/org/dataloader/orchestration/executors/ObservingExecutor.java new file mode 100644 index 0000000..c3c3773 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/executors/ObservingExecutor.java @@ -0,0 +1,26 @@ +package org.dataloader.orchestration.executors; + +import org.dataloader.annotations.Internal; + +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +@Internal +public class ObservingExecutor implements Executor { + + private final Executor delegate; + private final T state; + private final Consumer callback; + + public ObservingExecutor(Executor delegate, T state, Consumer callback) { + this.delegate = delegate; + this.state = state; + this.callback = callback; + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + callback.accept(state); + } +} diff --git a/src/main/java/org/dataloader/orchestration/observation/Tracker.java b/src/main/java/org/dataloader/orchestration/observation/Tracker.java new file mode 100644 index 0000000..8a01a70 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/observation/Tracker.java @@ -0,0 +1,68 @@ +package org.dataloader.orchestration.observation; + +import org.dataloader.DataLoader; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This needs HEAPS more work - heaps more - I am not sure if its just counts of call backs or what. + *

+ * This is just POC stuff for now + */ +public class Tracker { + private final AtomicInteger stepCount = new AtomicInteger(); + private final Map, AtomicInteger> counters = new HashMap<>(); + private final TrackingObserver trackingObserver; + + public Tracker(TrackingObserver trackingObserver) { + this.trackingObserver = trackingObserver; + } + + public int getOutstandingLoadCount(DataLoader dl) { + synchronized (this) { + return getDLCounter(dl).intValue(); + } + } + + public int getOutstandingLoadCount() { + int count = 0; + synchronized (this) { + for (AtomicInteger atomicInteger : counters.values()) { + count += atomicInteger.get(); + } + } + return count; + } + + public int getStepCount() { + return stepCount.get(); + } + + public void incrementStepCount() { + this.stepCount.incrementAndGet(); + } + + public void startingExecution() { + trackingObserver.onStart(this); + } + + public void loadCall(int stepIndex, DataLoader dl) { + synchronized (this) { + getDLCounter(dl).incrementAndGet(); + trackingObserver.onLoad(this, stepIndex, dl); + } + } + + public void loadCallComplete(int stepIndex, DataLoader dl, Throwable throwable) { + synchronized (this) { + getDLCounter(dl).decrementAndGet(); + trackingObserver.onLoadComplete(this,stepIndex,dl, throwable); + } + } + + private AtomicInteger getDLCounter(DataLoader dl) { + return counters.computeIfAbsent(dl, key -> new AtomicInteger()); + } +} diff --git a/src/main/java/org/dataloader/orchestration/observation/TrackingObserver.java b/src/main/java/org/dataloader/orchestration/observation/TrackingObserver.java new file mode 100644 index 0000000..cf1031e --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/observation/TrackingObserver.java @@ -0,0 +1,18 @@ +package org.dataloader.orchestration.observation; + +import org.dataloader.DataLoader; +import org.dataloader.annotations.PublicSpi; + +/** + * This callback is invoked when the {@link org.dataloader.orchestration.Orchestrator} starts execution and then + * as each {@link DataLoader} is invoked and then again when it completes + */ +@PublicSpi +public interface TrackingObserver { + void onStart(Tracker tracker); + + void onLoad(Tracker tracker, int stepIndex, DataLoader dl); + + // TODO - should this have an exception should it fail in CF terms ??? + void onLoadComplete(Tracker tracker, int stepIndex, DataLoader dl, Throwable throwable); +} diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 04ec5e5..a5d8568 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -11,8 +11,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedHashSet; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +60,43 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static BatchLoader upperCaseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(String::toUpperCase).collect(toList())); + } + + public static BatchLoader lowerCaseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(String::toLowerCase).collect(toList())); + } + + public static BatchLoader reverseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::reverse).collect(toList())); + } + + public static BatchLoader alternateCaseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::alternateCase).collect(toList())); + } + + public static String reverse(String s) { + StringBuilder sb = new StringBuilder(); + for (int i = s.length() - 1; i >= 0; i--) { + sb.append(s.charAt(i)); + } + return sb.toString(); + } + + public static String alternateCase(String s) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i <= s.length()-1; i++) { + char c = s.charAt(i); + if (i % 2 == 0) { + sb.append(Character.toLowerCase(c)); + } else { + sb.append(Character.toUpperCase(c)); + } + } + return sb.toString(); + } + public static DataLoader idLoader() { return idLoader(null, new ArrayList<>()); } @@ -104,7 +141,7 @@ public static Set asSet(Collection elements) { public static boolean areAllDone(CompletableFuture... cfs) { for (CompletableFuture cf : cfs) { - if (! cf.isDone()) { + if (!cf.isDone()) { return false; } } diff --git a/src/test/java/org/dataloader/orchestration/OrchestratorTest.java b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java new file mode 100644 index 0000000..b9f40ff --- /dev/null +++ b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java @@ -0,0 +1,135 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.DataLoaderRegistry; +import org.dataloader.fixtures.TestKit; +import org.dataloader.orchestration.observation.Tracker; +import org.dataloader.orchestration.observation.TrackingObserver; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.fixtures.TestKit.alternateCaseBatchLoader; +import static org.dataloader.fixtures.TestKit.lowerCaseBatchLoader; +import static org.dataloader.fixtures.TestKit.reverseBatchLoader; +import static org.dataloader.fixtures.TestKit.upperCaseBatchLoader; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +class OrchestratorTest { + + DataLoaderOptions cachingAndBatchingOptions = DataLoaderOptions.newOptions().setBatchingEnabled(true).setCachingEnabled(true); + + DataLoader dlUpper = newDataLoader(upperCaseBatchLoader(), cachingAndBatchingOptions); + DataLoader dlLower = newDataLoader(lowerCaseBatchLoader(), cachingAndBatchingOptions); + DataLoader dlReverse = newDataLoader(reverseBatchLoader(), cachingAndBatchingOptions); + DataLoader dlAlternateCase = newDataLoader(alternateCaseBatchLoader(), cachingAndBatchingOptions); + + @Test + void canOrchestrate() { + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("upper", dlUpper) + .register("lower", dlLower) + .register("reverse", dlReverse) + .register("alternateCase", dlAlternateCase) + .build(); + + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper).build(); + Step step1 = orchestrator.load("aBc", null); + With with1 = step1.with(dlLower); + Step step2 = with1.thenLoad(key -> key); + With with2 = step2.with(dlReverse); + Step step3 = with2.thenLoad(key -> key); + CompletableFuture cf = step3.toCompletableFuture(); + + // because all the dls are dispatched in "perfect order" here they all end up dispatching + // at JUST the right time. A change in order would be different + registry.dispatchAll(); + + await().until(cf::isDone); + + assertThat(cf.join(), equalTo("cba")); + } + + @Test + void canOrchestrateWhenNotInPerfectOrder() { + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("reverse", dlReverse) + .register("lower", dlLower) + .register("upper", dlUpper) + .register("alternateCase", dlAlternateCase) + .build(); + + ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper).executor(forkJoinPool).build(); + CompletableFuture cf = orchestrator.load("aBc", null) + .with(dlLower).thenLoad(key1 -> key1) + .with(dlReverse).thenLoad(key -> key) + .with(dlAlternateCase).thenLoadAsync(key -> key) + .toCompletableFuture(); + + for (int i = 0; i < 10; i++) { + TestKit.snooze(50); // TODO - hack or now + registry.dispatchAll(); + System.out.println("Waiting for " + i + " to complete..."); + } + + await().until(cf::isDone); + + assertThat(cf.join(), equalTo("cBa")); + } + + @Test + void can_observe_orchestration_happening() { + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("upper", dlUpper) + .register("lower", dlLower) + .register("reverse", dlReverse) + .register("alternateCase", dlAlternateCase) + .build(); + + AtomicInteger stepCount = new AtomicInteger(); + TrackingObserver observer = new TrackingObserver() { + @Override + public void onStart(Tracker tracker) { + System.out.println("starting - step count : " + tracker.getStepCount()); + stepCount.set(tracker.getStepCount()); + } + + @Override + public void onLoad(Tracker tracker, int stepIndex, DataLoader dl) { + System.out.println("onLoad : " + stepIndex); + } + + @Override + public void onLoadComplete(Tracker tracker, int stepIndex, DataLoader dl, Throwable throwable) { + System.out.println("onLoadComplete : " + stepIndex); + } + }; + + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper).observer(observer).build(); + Step step1 = orchestrator.load("aBc", null); + With with1 = step1.with(dlLower); + Step step2 = with1.thenLoad(key -> key); + With with2 = step2.with(dlReverse); + Step step3 = with2.thenLoad(key -> key); + CompletableFuture cf = step3.toCompletableFuture(); + + // because all the dls are dispatched in "perfect order" here they all end up dispatching + // at JUST the right time. A change in order would be different + registry.dispatchAll(); + + await().until(cf::isDone); + + assertThat(cf.join(), equalTo("cba")); + assertThat(stepCount.get(), equalTo(3)); + } +} \ No newline at end of file