From a5b2bbae7c0c47f8ab65765fb887f3a2135959c9 Mon Sep 17 00:00:00 2001 From: bbaker Date: Thu, 20 Mar 2025 12:52:35 +1100 Subject: [PATCH 1/7] POC- Orchestration of DLs --- .../orchestration/Orchestrator.java | 104 ++++++++++++++++++ .../org/dataloader/orchestration/Step.java | 67 +++++++++++ .../org/dataloader/orchestration/Tracker.java | 57 ++++++++++ .../org/dataloader/orchestration/With.java | 36 ++++++ .../java/org/dataloader/fixtures/TestKit.java | 24 +++- .../orchestration/OrchestratorTest.java | 77 +++++++++++++ 6 files changed, 363 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/dataloader/orchestration/Orchestrator.java create mode 100644 src/main/java/org/dataloader/orchestration/Step.java create mode 100644 src/main/java/org/dataloader/orchestration/Tracker.java create mode 100644 src/main/java/org/dataloader/orchestration/With.java create mode 100644 src/test/java/org/dataloader/orchestration/OrchestratorTest.java diff --git a/src/main/java/org/dataloader/orchestration/Orchestrator.java b/src/main/java/org/dataloader/orchestration/Orchestrator.java new file mode 100644 index 0000000..3419f28 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/Orchestrator.java @@ -0,0 +1,104 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; +import org.dataloader.impl.Assertions; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class Orchestrator { + + private final Tracker tracker; + private final DataLoader startingDL; + private final List> steps = new ArrayList<>(); + + /** + * This will create a new {@link Orchestrator} that can allow multiple calls to multiple data-loader's + * to be orchestrated so they all run optimally. + * + * @param dataLoader the data loader to start with + * @param the key type + * @param the value type + * @return a new {@link Orchestrator} + */ + public static Orchestrator orchestrate(DataLoader dataLoader) { + return new Orchestrator<>(new Tracker(), dataLoader); + } + + public Tracker getTracker() { + return tracker; + } + + private Orchestrator(Tracker tracker, DataLoader dataLoader) { + this.tracker = tracker; + this.startingDL = dataLoader; + } + + + public Step load(K key) { + return load(key, null); + } + + public Step load(K key, Object keyContext) { + return Step.loadImpl(this, castAs(startingDL), key, keyContext); + } + + static T castAs(Object o) { + //noinspection unchecked + return (T) o; + } + + + void record(Step step) { + steps.add(step); + tracker.incrementStepCount(); + } + + /** + * This is the callback point saying to start the DataLoader loading process. + *

+ * The type of object returned here depends on the value type of the last Step. We cant be truly generic + * here and must be case. + * + * @param the value type + * @return the final composed value + */ + CompletableFuture execute() { + Assertions.assertState(!steps.isEmpty(), () -> "How can the steps to run be empty??"); + int index = 0; + Step firstStep = steps.get(index); + + CompletableFuture currentCF = castAs(firstStep.codeToRun().apply(null)); // first load uses variable capture + whenComplete(index, firstStep, currentCF); + + for (index++; index < steps.size(); index++) { + Step nextStep = steps.get(index); + Function> codeToRun = castAs(nextStep.codeToRun()); + CompletableFuture nextCF = currentCF.thenCompose(value -> castAs(codeToRun.apply(value))); + currentCF = nextCF; + + // side effect when this step is complete + whenComplete(index, nextStep, nextCF); + } + return castAs(currentCF); + + } + + private void whenComplete(int index, Step step, CompletableFuture cf) { + cf.whenComplete((v, throwable) -> { + getTracker().loadCallComplete(step.dataLoader()); + // replace with instrumentation code + if (throwable != null) { + // TODO - should we be cancelling future steps here - no need for dispatch tracking if they will never run + System.out.println("A throwable has been thrown on step " + index + ": " + throwable.getMessage()); + throwable.printStackTrace(System.out); + } else { + System.out.println("step " + index + " returned : " + v); + } + }); + } + + +} diff --git a/src/main/java/org/dataloader/orchestration/Step.java b/src/main/java/org/dataloader/orchestration/Step.java new file mode 100644 index 0000000..b86c4a8 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/Step.java @@ -0,0 +1,67 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.dataloader.orchestration.Orchestrator.castAs; + +public class Step { + private final Orchestrator orchestrator; + private final DataLoader dl; + private final Function> codeToRun; + + Step(Orchestrator orchestrator, DataLoader dataLoader, Function> codeToRun) { + this.orchestrator = orchestrator; + this.dl = castAs(dataLoader); + this.codeToRun = codeToRun; + } + + DataLoader dataLoader() { + return dl; + } + + public Function> codeToRun() { + return codeToRun; + } + + public With with(DataLoader dataLoader) { + return new With<>(orchestrator, dataLoader); + } + + public Step load(K key, Object keyContext) { + return loadImpl(orchestrator, dl, key, keyContext); + } + + public Step thenLoad(Function codeToRun) { + return thenLoadImpl(orchestrator, dl, codeToRun); + } + + static Step loadImpl(Orchestrator orchestrator, DataLoader dl, K key, Object keyContext) { + Function> codeToRun = k -> { + CompletableFuture cf = castAs(dl.load(key, keyContext)); + orchestrator.getTracker().loadCall(dl); + return cf; + }; + Step step = new Step<>(orchestrator, dl, codeToRun); + orchestrator.record(step); + return step; + } + + static Step thenLoadImpl(Orchestrator orchestrator, DataLoader dl, Function codeToRun) { + Function> actualCodeToRun = v -> { + K key = codeToRun.apply(v); + CompletableFuture cf = castAs(dl.load(key)); + orchestrator.getTracker().loadCall(dl); + return cf; + }; + Step step = new Step<>(orchestrator, dl, actualCodeToRun); + orchestrator.record(step); + return step; + } + + public CompletableFuture toCompletableFuture() { + return orchestrator.execute(); + } +} diff --git a/src/main/java/org/dataloader/orchestration/Tracker.java b/src/main/java/org/dataloader/orchestration/Tracker.java new file mode 100644 index 0000000..de172de --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/Tracker.java @@ -0,0 +1,57 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This needs HEAPS more work - heaps more - I am not sure if its just counts of call backs or what. + *

+ * This is just POC stuff for now + */ +public class Tracker { + private final AtomicInteger stepCount = new AtomicInteger(); + private final Map, AtomicInteger> counters = new HashMap<>(); + + public int getOutstandingLoadCount(DataLoader dl) { + synchronized (this) { + return getDLCounter(dl).intValue(); + } + } + + public int getOutstandingLoadCount() { + int count = 0; + synchronized (this) { + for (AtomicInteger atomicInteger : counters.values()) { + count += atomicInteger.get(); + } + } + return count; + } + + public int getStepCount() { + return stepCount.get(); + } + + void incrementStepCount() { + this.stepCount.incrementAndGet(); + } + + void loadCall(DataLoader dl) { + synchronized (this) { + getDLCounter(dl).incrementAndGet(); + } + } + + void loadCallComplete(DataLoader dl) { + synchronized (this) { + getDLCounter(dl).decrementAndGet(); + } + } + + private AtomicInteger getDLCounter(DataLoader dl) { + return counters.computeIfAbsent(dl, key -> new AtomicInteger()); + } +} diff --git a/src/main/java/org/dataloader/orchestration/With.java b/src/main/java/org/dataloader/orchestration/With.java new file mode 100644 index 0000000..e380a5b --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/With.java @@ -0,0 +1,36 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; + +import java.util.function.Function; + +import static org.dataloader.orchestration.Orchestrator.castAs; +import static org.dataloader.orchestration.Step.loadImpl; + +/** + * A transitional step that allows a new step to be started with a new data loader in play + * + * @param the key type + * @param the value type + */ +public class With { + private final Orchestrator orchestrator; + private final DataLoader dl; + + public With(Orchestrator orchestrator, DataLoader dl) { + this.orchestrator = orchestrator; + this.dl = dl; + } + + public Step load(K key) { + return load(key, null); + } + + public Step load(K key, Object keyContext) { + return loadImpl(orchestrator, castAs(dl), key,keyContext); + } + + public Step thenLoad(Function codeToRun) { + return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun); + } +} diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 04ec5e5..474a7dc 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -11,8 +11,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedHashSet; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +60,26 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static BatchLoader upperCaseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(String::toUpperCase).collect(toList())); + } + + public static BatchLoader lowerCaseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(String::toLowerCase).collect(toList())); + } + + public static BatchLoader reverseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::reverse).collect(toList())); + } + + public static String reverse(String s) { + StringBuilder sb = new StringBuilder(); + for (int i = s.length() - 1; i >= 0; i--) { + sb.append(s.charAt(i)); + } + return sb.toString(); + } + public static DataLoader idLoader() { return idLoader(null, new ArrayList<>()); } @@ -104,7 +124,7 @@ public static Set asSet(Collection elements) { public static boolean areAllDone(CompletableFuture... cfs) { for (CompletableFuture cf : cfs) { - if (! cf.isDone()) { + if (!cf.isDone()) { return false; } } diff --git a/src/test/java/org/dataloader/orchestration/OrchestratorTest.java b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java new file mode 100644 index 0000000..89e4be1 --- /dev/null +++ b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java @@ -0,0 +1,77 @@ +package org.dataloader.orchestration; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.DataLoaderRegistry; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.fixtures.TestKit.lowerCaseBatchLoader; +import static org.dataloader.fixtures.TestKit.reverseBatchLoader; +import static org.dataloader.fixtures.TestKit.upperCaseBatchLoader; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +class OrchestratorTest { + + DataLoaderOptions cachingAndBatchingOptions = DataLoaderOptions.newOptions().setBatchingEnabled(true).setCachingEnabled(true); + + DataLoader dlUpper = newDataLoader(upperCaseBatchLoader(), cachingAndBatchingOptions); + DataLoader dlLower = newDataLoader(lowerCaseBatchLoader(), cachingAndBatchingOptions); + DataLoader dlReverse = newDataLoader(reverseBatchLoader(), cachingAndBatchingOptions); + + @Test + void canOrchestrate() { + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("upper", dlUpper) + .register("lower", dlLower) + .register("reverse", dlReverse) + .build(); + + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper); + Step step1 = orchestrator.load("aBc", null); + With with1 = step1.with(dlLower); + Step step2 = with1.thenLoad(key -> key); + With with2 = step2.with(dlReverse); + Step step3 = with2.thenLoad(key -> key); + CompletableFuture cf = step3.toCompletableFuture(); + + // because all the dls are dispatched in "perfect order" here they all end up dispatching + // at JUST the right time. A change in order would be different + registry.dispatchAll(); + + await().until(cf::isDone); + + assertThat(cf.join(), equalTo("cba")); + } + + @Test + void canOrchestrateWhenNotInPerfectOrder() { + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("reverse", dlReverse) + .register("lower", dlLower) + .register("upper", dlUpper) + .build(); + + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper); + CompletableFuture cf = orchestrator.load("aBc", null) + .with(dlLower).thenLoad(key1 -> key1) + .with(dlReverse).thenLoad(key -> key) + .toCompletableFuture(); + + registry.dispatchAll(); + + assertThat(cf.isDone(), equalTo(false)); + + assertThat(orchestrator.getTracker().getOutstandingLoadCount(),equalTo(2)); + + await().until(cf::isDone); + + assertThat(cf.join(), equalTo("cba")); + } +} \ No newline at end of file From 4774a205357d00435fb0fb28cd392095357d3ad9 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 21 Mar 2025 15:23:41 +1000 Subject: [PATCH 2/7] add CF class --- .../java/org/dataloader/orchestration/CF.java | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 src/main/java/org/dataloader/orchestration/CF.java diff --git a/src/main/java/org/dataloader/orchestration/CF.java b/src/main/java/org/dataloader/orchestration/CF.java new file mode 100644 index 0000000..00c38f6 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/CF.java @@ -0,0 +1,143 @@ +package org.dataloader.orchestration; + +import java.util.Iterator; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; + +public class CF { + + private AtomicReference result = new AtomicReference<>(); + + private static final Object NULL = new Object(); + + private static class ExceptionWrapper { + private final Throwable throwable; + + private ExceptionWrapper(Throwable throwable) { + this.throwable = throwable; + } + } + + private final LinkedBlockingDeque> dependedActions = new LinkedBlockingDeque<>(); + + private static class CompleteAction implements Runnable { + Executor executor; + CF toComplete; + CF src; + BiFunction mapperFn; + + public CompleteAction(CF toComplete, CF src, BiFunction mapperFn, Executor executor) { + this.toComplete = toComplete; + this.src = src; + this.mapperFn = mapperFn; + this.executor = executor; + } + + public void execute() { + if (executor != null) { + executor.execute(this); + } else { + toComplete.completeViaMapper(mapperFn, src.result.get()); + } + + } + + @Override + public void run() { + toComplete.completeViaMapper(mapperFn, src.result.get()); + src.dependedActions.remove(this); + } + } + + private CF() { + + } + + public CF newIncomplete() { + return new CF(); + } + + public static CF newComplete(T completed) { + CF result = new CF<>(); + result.encodeAndSetResult(completed); + return result; + } + + public static CF newExceptionally(Throwable e) { + CF result = new CF<>(); + result.encodeAndSetResult(e); + return result; + } + + + public CF map(BiFunction fn) { + CF newResult = new CF<>(); + dependedActions.push(new CompleteAction<>(newResult, this, fn, null)); + return newResult; + } + + public CF mapAsync(BiFunction fn, Executor executor) { + CF newResult = new CF<>(); + dependedActions.push(new CompleteAction<>(newResult, this, fn, executor)); + return newResult; + } + + + public CF compose(BiFunction fn, Executor executor) { + CF newResult = new CF<>(); + dependedActions.push(new CompleteAction<>(newResult, this, fn, executor)); + return newResult; + } + + + public boolean complete(T value) { + boolean success = result.compareAndSet(null, value); + return success; + } + + private boolean encodeAndSetResult(Object rawValue) { + if (rawValue == null) { + return result.compareAndSet(null, NULL); + } else if (rawValue instanceof Throwable) { + return result.compareAndSet(null, new ExceptionWrapper((Throwable) rawValue)); + } else { + return result.compareAndSet(null, rawValue); + } + } + + private Object decodeResult(Object rawValue) { + if (rawValue instanceof ExceptionWrapper) { + return ((ExceptionWrapper) rawValue).throwable; + } else if (rawValue == NULL) { + return null; + } else { + return rawValue; + } + } + + private void fireDependentActions() { + Iterator> iterator = dependedActions.iterator(); + while (iterator.hasNext()) { + iterator.next().execute(); + } + } + + private void completeViaMapper(BiFunction fn, Object encodedResult) { + try { + Object decodedResult = decodeResult(encodedResult); + Object mappedResult = fn.apply( + (T) decodedResult, + decodedResult instanceof Throwable ? (Throwable) decodedResult : null + ); + this.result.compareAndSet(null, mappedResult); + } catch (Throwable t) { + encodeAndSetResult(t); + } + + + } + + +} From 9159c3f3c8b19ff20d7982031ca21f819c75c8ea Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 21 Mar 2025 15:35:27 +1000 Subject: [PATCH 3/7] add CF supplyAsync --- .../java/org/dataloader/orchestration/CF.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/org/dataloader/orchestration/CF.java b/src/main/java/org/dataloader/orchestration/CF.java index 00c38f6..e4d01a8 100644 --- a/src/main/java/org/dataloader/orchestration/CF.java +++ b/src/main/java/org/dataloader/orchestration/CF.java @@ -5,6 +5,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Supplier; public class CF { @@ -71,6 +72,20 @@ public static CF newExceptionally(Throwable e) { return result; } + public static CF supplyAsync(Supplier supplier, + Executor executor) { + + CF result = new CF<>(); + executor.execute(() -> { + try { + result.encodeAndSetResult(supplier.get()); + } catch (Throwable ex) { + result.encodeAndSetResult(ex); + } + }); + return result; + } + public CF map(BiFunction fn) { CF newResult = new CF<>(); From 98711dbb07a3174dad68c1a77a545645d2728db1 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Fri, 21 Mar 2025 15:37:17 +1000 Subject: [PATCH 4/7] CF work --- src/main/java/org/dataloader/orchestration/CF.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/dataloader/orchestration/CF.java b/src/main/java/org/dataloader/orchestration/CF.java index e4d01a8..e50d540 100644 --- a/src/main/java/org/dataloader/orchestration/CF.java +++ b/src/main/java/org/dataloader/orchestration/CF.java @@ -109,6 +109,7 @@ public CF compose(BiFunction fn, Execu public boolean complete(T value) { boolean success = result.compareAndSet(null, value); + fireDependentActions(); return success; } @@ -150,8 +151,6 @@ private void completeViaMapper(BiFunction Date: Fri, 21 Mar 2025 16:43:18 +1100 Subject: [PATCH 5/7] POC- Orchestration of DLs - tried and executor that can observe --- .../orchestration/ImmediateExecutor.java | 12 +++++ .../orchestration/ObservingExecutor.java | 23 +++++++++ .../orchestration/Orchestrator.java | 20 ++++++-- .../org/dataloader/orchestration/Step.java | 47 ++++++++++++++++--- .../org/dataloader/orchestration/With.java | 6 ++- .../java/org/dataloader/fixtures/TestKit.java | 17 +++++++ .../orchestration/OrchestratorTest.java | 25 +++++++--- 7 files changed, 131 insertions(+), 19 deletions(-) create mode 100644 src/main/java/org/dataloader/orchestration/ImmediateExecutor.java create mode 100644 src/main/java/org/dataloader/orchestration/ObservingExecutor.java diff --git a/src/main/java/org/dataloader/orchestration/ImmediateExecutor.java b/src/main/java/org/dataloader/orchestration/ImmediateExecutor.java new file mode 100644 index 0000000..35dccfc --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/ImmediateExecutor.java @@ -0,0 +1,12 @@ +package org.dataloader.orchestration; + +import java.util.concurrent.Executor; + +class ImmediateExecutor implements Executor { + static final ImmediateExecutor INSTANCE = new ImmediateExecutor(); + + @Override + public void execute(Runnable command) { + command.run(); + } +} diff --git a/src/main/java/org/dataloader/orchestration/ObservingExecutor.java b/src/main/java/org/dataloader/orchestration/ObservingExecutor.java new file mode 100644 index 0000000..1cf661e --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/ObservingExecutor.java @@ -0,0 +1,23 @@ +package org.dataloader.orchestration; + +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +class ObservingExecutor implements Executor { + + private final Executor delegate; + private final T state; + private final Consumer callback; + + public ObservingExecutor(Executor delegate, T state, Consumer callback) { + this.delegate = delegate; + this.state = state; + this.callback = callback; + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + callback.accept(state); + } +} diff --git a/src/main/java/org/dataloader/orchestration/Orchestrator.java b/src/main/java/org/dataloader/orchestration/Orchestrator.java index 3419f28..990a0be 100644 --- a/src/main/java/org/dataloader/orchestration/Orchestrator.java +++ b/src/main/java/org/dataloader/orchestration/Orchestrator.java @@ -6,10 +6,12 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.function.Function; public class Orchestrator { + private final Executor executor; private final Tracker tracker; private final DataLoader startingDL; private final List> steps = new ArrayList<>(); @@ -24,16 +26,26 @@ public class Orchestrator { * @return a new {@link Orchestrator} */ public static Orchestrator orchestrate(DataLoader dataLoader) { - return new Orchestrator<>(new Tracker(), dataLoader); + return new Orchestrator<>(new Tracker(), dataLoader, ImmediateExecutor.INSTANCE); } - public Tracker getTracker() { - return tracker; + // TODO - make this a builder + public static Orchestrator orchestrate(DataLoader dataLoader, Executor executor) { + return new Orchestrator<>(new Tracker(), dataLoader, executor); } - private Orchestrator(Tracker tracker, DataLoader dataLoader) { + private Orchestrator(Tracker tracker, DataLoader dataLoader, Executor executor) { this.tracker = tracker; this.startingDL = dataLoader; + this.executor = executor; + } + + public Tracker getTracker() { + return tracker; + } + + public Executor getExecutor() { + return executor; } diff --git a/src/main/java/org/dataloader/orchestration/Step.java b/src/main/java/org/dataloader/orchestration/Step.java index b86c4a8..854d438 100644 --- a/src/main/java/org/dataloader/orchestration/Step.java +++ b/src/main/java/org/dataloader/orchestration/Step.java @@ -3,7 +3,10 @@ import org.dataloader.DataLoader; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import static org.dataloader.orchestration.Orchestrator.castAs; @@ -35,7 +38,11 @@ public Step load(K key, Object keyContext) { } public Step thenLoad(Function codeToRun) { - return thenLoadImpl(orchestrator, dl, codeToRun); + return thenLoadImpl(orchestrator, dl, codeToRun, false); + } + + public Step thenLoadAsync(Function codeToRun) { + return thenLoadImpl(orchestrator, dl, codeToRun, true); } static Step loadImpl(Orchestrator orchestrator, DataLoader dl, K key, Object keyContext) { @@ -49,16 +56,42 @@ static Step loadImpl(Orchestrator orchestrator, DataLoader Step thenLoadImpl(Orchestrator orchestrator, DataLoader dl, Function codeToRun) { - Function> actualCodeToRun = v -> { + static Step thenLoadImpl(Orchestrator orchestrator, DataLoader dl, Function codeToRun, boolean async) { + Tracker tracker = orchestrator.getTracker(); + Function> actualCodeToRun; + if (async) { + actualCodeToRun = mkAsyncLoadLambda(orchestrator, dl, codeToRun, tracker); + } else { + actualCodeToRun = mkSyncLoadLambda(dl, codeToRun, tracker); + } + Step step = new Step<>(orchestrator, dl, actualCodeToRun); + orchestrator.record(step); + return step; + } + + private static Function> mkSyncLoadLambda(DataLoader dl, Function codeToRun, Tracker tracker) { + return v -> { K key = codeToRun.apply(v); CompletableFuture cf = castAs(dl.load(key)); - orchestrator.getTracker().loadCall(dl); + tracker.loadCall(dl); return cf; }; - Step step = new Step<>(orchestrator, dl, actualCodeToRun); - orchestrator.record(step); - return step; + } + + private static Function> mkAsyncLoadLambda(Orchestrator orchestrator, DataLoader dl, Function codeToRun, Tracker tracker) { + return v -> { + Executor executor = orchestrator.getExecutor(); + Consumer callback = atSomePointWeNeedMoreStateButUsingStringForNowToMakeItCompile -> { + tracker.loadCall(dl); + }; + ObservingExecutor observingExecutor = new ObservingExecutor<>(executor, "state", callback); + Supplier> dataLoaderCall = () -> { + K key = codeToRun.apply(v); + return castAs(dl.load(key)); + }; + return CompletableFuture.supplyAsync(dataLoaderCall, observingExecutor) + .thenCompose(Function.identity()); + }; } public CompletableFuture toCompletableFuture() { diff --git a/src/main/java/org/dataloader/orchestration/With.java b/src/main/java/org/dataloader/orchestration/With.java index e380a5b..bf647bc 100644 --- a/src/main/java/org/dataloader/orchestration/With.java +++ b/src/main/java/org/dataloader/orchestration/With.java @@ -31,6 +31,10 @@ public Step load(K key, Object keyContext) { } public Step thenLoad(Function codeToRun) { - return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun); + return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun, false); + } + + public Step thenLoadAsync(Function codeToRun) { + return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun, true); } } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 474a7dc..a5d8568 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -72,6 +72,10 @@ public static BatchLoader reverseBatchLoader() { return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::reverse).collect(toList())); } + public static BatchLoader alternateCaseBatchLoader() { + return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::alternateCase).collect(toList())); + } + public static String reverse(String s) { StringBuilder sb = new StringBuilder(); for (int i = s.length() - 1; i >= 0; i--) { @@ -80,6 +84,19 @@ public static String reverse(String s) { return sb.toString(); } + public static String alternateCase(String s) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i <= s.length()-1; i++) { + char c = s.charAt(i); + if (i % 2 == 0) { + sb.append(Character.toLowerCase(c)); + } else { + sb.append(Character.toUpperCase(c)); + } + } + return sb.toString(); + } + public static DataLoader idLoader() { return idLoader(null, new ArrayList<>()); } diff --git a/src/test/java/org/dataloader/orchestration/OrchestratorTest.java b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java index 89e4be1..fecbcc8 100644 --- a/src/test/java/org/dataloader/orchestration/OrchestratorTest.java +++ b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java @@ -3,12 +3,18 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.DataLoaderRegistry; +import org.dataloader.fixtures.TestKit; import org.junit.jupiter.api.Test; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Semaphore; import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.fixtures.TestKit.alternateCaseBatchLoader; import static org.dataloader.fixtures.TestKit.lowerCaseBatchLoader; import static org.dataloader.fixtures.TestKit.reverseBatchLoader; import static org.dataloader.fixtures.TestKit.upperCaseBatchLoader; @@ -22,6 +28,7 @@ class OrchestratorTest { DataLoader dlUpper = newDataLoader(upperCaseBatchLoader(), cachingAndBatchingOptions); DataLoader dlLower = newDataLoader(lowerCaseBatchLoader(), cachingAndBatchingOptions); DataLoader dlReverse = newDataLoader(reverseBatchLoader(), cachingAndBatchingOptions); + DataLoader dlAlternateCase = newDataLoader(alternateCaseBatchLoader(), cachingAndBatchingOptions); @Test void canOrchestrate() { @@ -30,6 +37,7 @@ void canOrchestrate() { .register("upper", dlUpper) .register("lower", dlLower) .register("reverse", dlReverse) + .register("alternateCase", dlAlternateCase) .build(); Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper); @@ -56,22 +64,25 @@ void canOrchestrateWhenNotInPerfectOrder() { .register("reverse", dlReverse) .register("lower", dlLower) .register("upper", dlUpper) + .register("alternateCase", dlAlternateCase) .build(); - Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper); + ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper, forkJoinPool); CompletableFuture cf = orchestrator.load("aBc", null) .with(dlLower).thenLoad(key1 -> key1) .with(dlReverse).thenLoad(key -> key) + .with(dlAlternateCase).thenLoadAsync(key -> key) .toCompletableFuture(); - registry.dispatchAll(); - - assertThat(cf.isDone(), equalTo(false)); - - assertThat(orchestrator.getTracker().getOutstandingLoadCount(),equalTo(2)); + for (int i = 0; i < 10; i++) { + TestKit.snooze(50); // TODO - hack or now + registry.dispatchAll(); + System.out.println("Waiting for " + i + " to complete..."); + } await().until(cf::isDone); - assertThat(cf.join(), equalTo("cba")); + assertThat(cf.join(), equalTo("cBa")); } } \ No newline at end of file From 3843282724ab142761b5a6c47ece9d3bca4c3b0e Mon Sep 17 00:00:00 2001 From: bbaker Date: Sat, 22 Mar 2025 16:55:27 +1100 Subject: [PATCH 6/7] POC- Orchestration of DLs - Added a tracker that can observe via a callback --- .../orchestration/ImmediateExecutor.java | 12 ---- .../orchestration/Orchestrator.java | 58 ++++++++++++++----- .../org/dataloader/orchestration/Step.java | 12 +++- .../executors/ImmediateExecutor.java | 15 +++++ .../{ => executors}/ObservingExecutor.java | 7 ++- .../{ => observation}/Tracker.java | 19 ++++-- .../observation/TrackingObserver.java | 18 ++++++ .../orchestration/OrchestratorTest.java | 57 ++++++++++++++++-- 8 files changed, 156 insertions(+), 42 deletions(-) delete mode 100644 src/main/java/org/dataloader/orchestration/ImmediateExecutor.java create mode 100644 src/main/java/org/dataloader/orchestration/executors/ImmediateExecutor.java rename src/main/java/org/dataloader/orchestration/{ => executors}/ObservingExecutor.java (75%) rename src/main/java/org/dataloader/orchestration/{ => observation}/Tracker.java (67%) create mode 100644 src/main/java/org/dataloader/orchestration/observation/TrackingObserver.java diff --git a/src/main/java/org/dataloader/orchestration/ImmediateExecutor.java b/src/main/java/org/dataloader/orchestration/ImmediateExecutor.java deleted file mode 100644 index 35dccfc..0000000 --- a/src/main/java/org/dataloader/orchestration/ImmediateExecutor.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.dataloader.orchestration; - -import java.util.concurrent.Executor; - -class ImmediateExecutor implements Executor { - static final ImmediateExecutor INSTANCE = new ImmediateExecutor(); - - @Override - public void execute(Runnable command) { - command.run(); - } -} diff --git a/src/main/java/org/dataloader/orchestration/Orchestrator.java b/src/main/java/org/dataloader/orchestration/Orchestrator.java index 990a0be..f0338dd 100644 --- a/src/main/java/org/dataloader/orchestration/Orchestrator.java +++ b/src/main/java/org/dataloader/orchestration/Orchestrator.java @@ -2,6 +2,9 @@ import org.dataloader.DataLoader; import org.dataloader.impl.Assertions; +import org.dataloader.orchestration.executors.ImmediateExecutor; +import org.dataloader.orchestration.observation.Tracker; +import org.dataloader.orchestration.observation.TrackingObserver; import java.util.ArrayList; import java.util.List; @@ -25,19 +28,14 @@ public class Orchestrator { * @param the value type * @return a new {@link Orchestrator} */ - public static Orchestrator orchestrate(DataLoader dataLoader) { - return new Orchestrator<>(new Tracker(), dataLoader, ImmediateExecutor.INSTANCE); + public static Builder orchestrate(DataLoader dataLoader) { + return new Builder<>(dataLoader); } - // TODO - make this a builder - public static Orchestrator orchestrate(DataLoader dataLoader, Executor executor) { - return new Orchestrator<>(new Tracker(), dataLoader, executor); - } - - private Orchestrator(Tracker tracker, DataLoader dataLoader, Executor executor) { - this.tracker = tracker; - this.startingDL = dataLoader; - this.executor = executor; + public Orchestrator(Builder builder) { + this.tracker = new Tracker(builder.trackingObserver); + this.executor = builder.executor; + this.startingDL = builder.dataLoader; } public Tracker getTracker() { @@ -79,6 +77,10 @@ void record(Step step) { */ CompletableFuture execute() { Assertions.assertState(!steps.isEmpty(), () -> "How can the steps to run be empty??"); + + // tell the tracker we are under way + getTracker().startingExecution(); + int index = 0; Step firstStep = steps.get(index); @@ -94,23 +96,47 @@ CompletableFuture execute() { // side effect when this step is complete whenComplete(index, nextStep, nextCF); } + return castAs(currentCF); } - private void whenComplete(int index, Step step, CompletableFuture cf) { + private void whenComplete(int stepIndex, Step step, CompletableFuture cf) { cf.whenComplete((v, throwable) -> { - getTracker().loadCallComplete(step.dataLoader()); - // replace with instrumentation code if (throwable != null) { // TODO - should we be cancelling future steps here - no need for dispatch tracking if they will never run - System.out.println("A throwable has been thrown on step " + index + ": " + throwable.getMessage()); + System.out.println("A throwable has been thrown on step " + stepIndex + ": " + throwable.getMessage()); throwable.printStackTrace(System.out); } else { - System.out.println("step " + index + " returned : " + v); + System.out.println("step " + stepIndex + " returned : " + v); } + getTracker().loadCallComplete(stepIndex, step.dataLoader(), throwable); }); } + public static class Builder { + private Executor executor = ImmediateExecutor.INSTANCE; + private DataLoader dataLoader; + private TrackingObserver trackingObserver; + + Builder(DataLoader dataLoader) { + this.dataLoader = dataLoader; + } + + public Builder executor(Executor executor) { + this.executor = executor; + return this; + } + + public Builder observer(TrackingObserver trackingObserver) { + this.trackingObserver = trackingObserver; + return this; + } + + public Orchestrator build() { + return new Orchestrator<>(this); + } + } + } diff --git a/src/main/java/org/dataloader/orchestration/Step.java b/src/main/java/org/dataloader/orchestration/Step.java index 854d438..34de341 100644 --- a/src/main/java/org/dataloader/orchestration/Step.java +++ b/src/main/java/org/dataloader/orchestration/Step.java @@ -1,6 +1,8 @@ package org.dataloader.orchestration; import org.dataloader.DataLoader; +import org.dataloader.orchestration.executors.ObservingExecutor; +import org.dataloader.orchestration.observation.Tracker; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -46,9 +48,11 @@ public Step thenLoadAsync(Function codeToRun) { } static Step loadImpl(Orchestrator orchestrator, DataLoader dl, K key, Object keyContext) { + Tracker tracker = orchestrator.getTracker(); + int stepIndex = tracker.getStepCount(); Function> codeToRun = k -> { CompletableFuture cf = castAs(dl.load(key, keyContext)); - orchestrator.getTracker().loadCall(dl); + orchestrator.getTracker().loadCall(stepIndex, dl); return cf; }; Step step = new Step<>(orchestrator, dl, codeToRun); @@ -70,19 +74,21 @@ static Step thenLoadImpl(Orchestrator orchestrator, DataLoade } private static Function> mkSyncLoadLambda(DataLoader dl, Function codeToRun, Tracker tracker) { + int stepIndex = tracker.getStepCount(); return v -> { K key = codeToRun.apply(v); CompletableFuture cf = castAs(dl.load(key)); - tracker.loadCall(dl); + tracker.loadCall(stepIndex, dl); return cf; }; } private static Function> mkAsyncLoadLambda(Orchestrator orchestrator, DataLoader dl, Function codeToRun, Tracker tracker) { + int stepIndex = tracker.getStepCount(); return v -> { Executor executor = orchestrator.getExecutor(); Consumer callback = atSomePointWeNeedMoreStateButUsingStringForNowToMakeItCompile -> { - tracker.loadCall(dl); + tracker.loadCall(stepIndex, dl); }; ObservingExecutor observingExecutor = new ObservingExecutor<>(executor, "state", callback); Supplier> dataLoaderCall = () -> { diff --git a/src/main/java/org/dataloader/orchestration/executors/ImmediateExecutor.java b/src/main/java/org/dataloader/orchestration/executors/ImmediateExecutor.java new file mode 100644 index 0000000..e7dc2b5 --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/executors/ImmediateExecutor.java @@ -0,0 +1,15 @@ +package org.dataloader.orchestration.executors; + +import org.dataloader.annotations.Internal; + +import java.util.concurrent.Executor; + +@Internal +public class ImmediateExecutor implements Executor { + public static final ImmediateExecutor INSTANCE = new ImmediateExecutor(); + + @Override + public void execute(Runnable command) { + command.run(); + } +} diff --git a/src/main/java/org/dataloader/orchestration/ObservingExecutor.java b/src/main/java/org/dataloader/orchestration/executors/ObservingExecutor.java similarity index 75% rename from src/main/java/org/dataloader/orchestration/ObservingExecutor.java rename to src/main/java/org/dataloader/orchestration/executors/ObservingExecutor.java index 1cf661e..c3c3773 100644 --- a/src/main/java/org/dataloader/orchestration/ObservingExecutor.java +++ b/src/main/java/org/dataloader/orchestration/executors/ObservingExecutor.java @@ -1,9 +1,12 @@ -package org.dataloader.orchestration; +package org.dataloader.orchestration.executors; + +import org.dataloader.annotations.Internal; import java.util.concurrent.Executor; import java.util.function.Consumer; -class ObservingExecutor implements Executor { +@Internal +public class ObservingExecutor implements Executor { private final Executor delegate; private final T state; diff --git a/src/main/java/org/dataloader/orchestration/Tracker.java b/src/main/java/org/dataloader/orchestration/observation/Tracker.java similarity index 67% rename from src/main/java/org/dataloader/orchestration/Tracker.java rename to src/main/java/org/dataloader/orchestration/observation/Tracker.java index de172de..8a01a70 100644 --- a/src/main/java/org/dataloader/orchestration/Tracker.java +++ b/src/main/java/org/dataloader/orchestration/observation/Tracker.java @@ -1,4 +1,4 @@ -package org.dataloader.orchestration; +package org.dataloader.orchestration.observation; import org.dataloader.DataLoader; @@ -14,6 +14,11 @@ public class Tracker { private final AtomicInteger stepCount = new AtomicInteger(); private final Map, AtomicInteger> counters = new HashMap<>(); + private final TrackingObserver trackingObserver; + + public Tracker(TrackingObserver trackingObserver) { + this.trackingObserver = trackingObserver; + } public int getOutstandingLoadCount(DataLoader dl) { synchronized (this) { @@ -35,19 +40,25 @@ public int getStepCount() { return stepCount.get(); } - void incrementStepCount() { + public void incrementStepCount() { this.stepCount.incrementAndGet(); } - void loadCall(DataLoader dl) { + public void startingExecution() { + trackingObserver.onStart(this); + } + + public void loadCall(int stepIndex, DataLoader dl) { synchronized (this) { getDLCounter(dl).incrementAndGet(); + trackingObserver.onLoad(this, stepIndex, dl); } } - void loadCallComplete(DataLoader dl) { + public void loadCallComplete(int stepIndex, DataLoader dl, Throwable throwable) { synchronized (this) { getDLCounter(dl).decrementAndGet(); + trackingObserver.onLoadComplete(this,stepIndex,dl, throwable); } } diff --git a/src/main/java/org/dataloader/orchestration/observation/TrackingObserver.java b/src/main/java/org/dataloader/orchestration/observation/TrackingObserver.java new file mode 100644 index 0000000..cf1031e --- /dev/null +++ b/src/main/java/org/dataloader/orchestration/observation/TrackingObserver.java @@ -0,0 +1,18 @@ +package org.dataloader.orchestration.observation; + +import org.dataloader.DataLoader; +import org.dataloader.annotations.PublicSpi; + +/** + * This callback is invoked when the {@link org.dataloader.orchestration.Orchestrator} starts execution and then + * as each {@link DataLoader} is invoked and then again when it completes + */ +@PublicSpi +public interface TrackingObserver { + void onStart(Tracker tracker); + + void onLoad(Tracker tracker, int stepIndex, DataLoader dl); + + // TODO - should this have an exception should it fail in CF terms ??? + void onLoadComplete(Tracker tracker, int stepIndex, DataLoader dl, Throwable throwable); +} diff --git a/src/test/java/org/dataloader/orchestration/OrchestratorTest.java b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java index fecbcc8..b9f40ff 100644 --- a/src/test/java/org/dataloader/orchestration/OrchestratorTest.java +++ b/src/test/java/org/dataloader/orchestration/OrchestratorTest.java @@ -4,13 +4,13 @@ import org.dataloader.DataLoaderOptions; import org.dataloader.DataLoaderRegistry; import org.dataloader.fixtures.TestKit; +import org.dataloader.orchestration.observation.Tracker; +import org.dataloader.orchestration.observation.TrackingObserver; import org.junit.jupiter.api.Test; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; @@ -40,7 +40,7 @@ void canOrchestrate() { .register("alternateCase", dlAlternateCase) .build(); - Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper); + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper).build(); Step step1 = orchestrator.load("aBc", null); With with1 = step1.with(dlLower); Step step2 = with1.thenLoad(key -> key); @@ -68,7 +68,7 @@ void canOrchestrateWhenNotInPerfectOrder() { .build(); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); - Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper, forkJoinPool); + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper).executor(forkJoinPool).build(); CompletableFuture cf = orchestrator.load("aBc", null) .with(dlLower).thenLoad(key1 -> key1) .with(dlReverse).thenLoad(key -> key) @@ -85,4 +85,51 @@ void canOrchestrateWhenNotInPerfectOrder() { assertThat(cf.join(), equalTo("cBa")); } + + @Test + void can_observe_orchestration_happening() { + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("upper", dlUpper) + .register("lower", dlLower) + .register("reverse", dlReverse) + .register("alternateCase", dlAlternateCase) + .build(); + + AtomicInteger stepCount = new AtomicInteger(); + TrackingObserver observer = new TrackingObserver() { + @Override + public void onStart(Tracker tracker) { + System.out.println("starting - step count : " + tracker.getStepCount()); + stepCount.set(tracker.getStepCount()); + } + + @Override + public void onLoad(Tracker tracker, int stepIndex, DataLoader dl) { + System.out.println("onLoad : " + stepIndex); + } + + @Override + public void onLoadComplete(Tracker tracker, int stepIndex, DataLoader dl, Throwable throwable) { + System.out.println("onLoadComplete : " + stepIndex); + } + }; + + Orchestrator orchestrator = Orchestrator.orchestrate(dlUpper).observer(observer).build(); + Step step1 = orchestrator.load("aBc", null); + With with1 = step1.with(dlLower); + Step step2 = with1.thenLoad(key -> key); + With with2 = step2.with(dlReverse); + Step step3 = with2.thenLoad(key -> key); + CompletableFuture cf = step3.toCompletableFuture(); + + // because all the dls are dispatched in "perfect order" here they all end up dispatching + // at JUST the right time. A change in order would be different + registry.dispatchAll(); + + await().until(cf::isDone); + + assertThat(cf.join(), equalTo("cba")); + assertThat(stepCount.get(), equalTo(3)); + } } \ No newline at end of file From abb00aef550ffb903d2f6fde15e0d3904e9471c0 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Tue, 25 Mar 2025 10:57:26 +1000 Subject: [PATCH 7/7] CF work --- src/main/java/org/dataloader/orchestration/CF.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/dataloader/orchestration/CF.java b/src/main/java/org/dataloader/orchestration/CF.java index e50d540..4a5e177 100644 --- a/src/main/java/org/dataloader/orchestration/CF.java +++ b/src/main/java/org/dataloader/orchestration/CF.java @@ -88,9 +88,13 @@ public static CF supplyAsync(Supplier supplier, public CF map(BiFunction fn) { - CF newResult = new CF<>(); - dependedActions.push(new CompleteAction<>(newResult, this, fn, null)); - return newResult; + CF newCF = new CF<>(); + if (result.get() != null) { + newCF.completeViaMapper(fn, result.get()); + } else { + dependedActions.push(new CompleteAction<>(newCF, this, fn, null)); + } + return newCF; } public CF mapAsync(BiFunction fn, Executor executor) { @@ -153,5 +157,4 @@ private void completeViaMapper(BiFunction