diff --git a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java index fcc76998..d8f242f4 100644 --- a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java @@ -22,6 +22,8 @@ import org.modelingvalue.collections.Collection; import org.modelingvalue.collections.DefaultMap; +import org.modelingvalue.collections.Graph; +import org.modelingvalue.collections.Map; import org.modelingvalue.collections.Entry; import org.modelingvalue.collections.List; import org.modelingvalue.collections.Set; @@ -36,12 +38,12 @@ import org.modelingvalue.dclare.ex.ConsistencyError; import org.modelingvalue.dclare.ex.TooManyChangesException; -import java.util.Iterator; import java.util.Objects; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; @@ -92,7 +94,7 @@ public class UniverseTransaction extends MutableTransaction { private List> timeTravelingActions = List.of(backward, forward); private List> preActions = List.of(); private List> postActions = List.of(); - private List imperativeTransactions = List.of(); + private Graph> imperativeTransactions = Graph.of(); private List history = List.of(); private List future = List.of(); private State preState; @@ -267,7 +269,7 @@ public void run() { handleTooManyChanges(state); runActions(postActions); } - commit(state, timeTraveling, imperativeTransactions.iterator()); + commit(state, timeTraveling); if (!killed && inQueue.isEmpty() && isStopped(state)) { break; } @@ -694,17 +696,25 @@ public void addPostAction(Action action) { public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer scheduler, boolean keepTransaction) { ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction); synchronized (this) { - imperativeTransactions = imperativeTransactions.add(n); + imperativeTransactions = imperativeTransactions.addNode(n); } return n; } + public void orderImperativeTransactions(ImperativeTransaction first, ImperativeTransaction second) { + imperativeTransactions.putEdge(first, second, Void.class); + } + + public void orderImperativeTransactions(String first, String second) { + orderImperativeTransactions(getImperativeTransaction(first), getImperativeTransaction(second)); + } + public List getImperativeTransactions() { - return imperativeTransactions; + return imperativeTransactions.getAllNodes().asList(); } public ImperativeTransaction getImperativeTransaction(String id) { - for (ImperativeTransaction it : imperativeTransactions) { + for (ImperativeTransaction it : imperativeTransactions.getAllNodes()) { if (it.imperative().id().equals(id)) { return it; } @@ -712,15 +722,47 @@ public ImperativeTransaction getImperativeTransaction(String id) { return null; } - private void commit(State state, boolean timeTraveling, Iterator it) { - if (!killed && it.hasNext()) { - ImperativeTransaction itx = it.next(); - itx.schedule(() -> { - if (itx.commit(state, timeTraveling)) { - commit(itx.state(), timeTraveling, it); - } - }); - } + private void commit(State state, boolean timeTraveling) { + if (imperativeTransactions.hasCycles()) + throw new Error("Circular native group ordering"); + + AtomicReference> started = new AtomicReference<>(Set.of()); + AtomicReference> stopped = new AtomicReference<>(Set.of()); + AtomicBoolean inSync = new AtomicBoolean(true); + imperativeTransactions.getAllNodes().forEach(it -> tryCommit( + state, it, imperativeTransactions, timeTraveling, + started, stopped, inSync + )); + } + + private void tryCommit( + State state, + ImperativeTransaction it, + Graph> graph, + boolean timeTraveling, + AtomicReference> started, + AtomicReference> stopped, + AtomicBoolean inSync + ) { + + if (!inSync.get()) return; + + Map incoming = graph.getIncomingEdges(it); + if (!stopped.get().containsAll(incoming.toKeys())) return; // Not ready to start + + Set s = started.getAndUpdate(old -> old.add(it)); + if (s.contains(it)) return; // Already started + + it.schedule(() -> { + if (it.commit(state, timeTraveling)) { + stopped.updateAndGet(old -> old.add(it)); + graph.getOutgoingEdges(it).forEach((next, ignored) -> + tryCommit(state, next, graph, timeTraveling, started, stopped, inSync) + ); + } else { + inSync.set(false); + } + }); } public void deriveLazy() {