diff --git a/src/main/java/org/modelingvalue/dclare/IImperativeTransaction.java b/src/main/java/org/modelingvalue/dclare/IImperativeTransaction.java new file mode 100644 index 00000000..a4839f7b --- /dev/null +++ b/src/main/java/org/modelingvalue/dclare/IImperativeTransaction.java @@ -0,0 +1,4 @@ +package org.modelingvalue.dclare; + +public sealed interface IImperativeTransaction permits TerminalImperativeTransaction, ImperativeTransaction { +} diff --git a/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java b/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java index 4e0f8fe8..a4589f73 100644 --- a/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/ImperativeTransaction.java @@ -31,7 +31,7 @@ import org.modelingvalue.collections.util.NamedIdentity; import org.modelingvalue.dclare.Priority.Queued; -public class ImperativeTransaction extends LeafTransaction { +public final class ImperativeTransaction extends LeafTransaction implements IImperativeTransaction { @SuppressWarnings("rawtypes") protected static final DefaultMap> SETTED_MAP = DefaultMap.of(k -> Set.of()); diff --git a/src/main/java/org/modelingvalue/dclare/TerminalImperativeTransaction.java b/src/main/java/org/modelingvalue/dclare/TerminalImperativeTransaction.java new file mode 100644 index 00000000..d6356680 --- /dev/null +++ b/src/main/java/org/modelingvalue/dclare/TerminalImperativeTransaction.java @@ -0,0 +1,5 @@ +package org.modelingvalue.dclare; + +public final class TerminalImperativeTransaction implements IImperativeTransaction { + // for imperative transaction ordering in graph +} diff --git a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java index 24a81363..002aa591 100644 --- a/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java +++ b/src/main/java/org/modelingvalue/dclare/UniverseTransaction.java @@ -20,7 +20,6 @@ package org.modelingvalue.dclare; -import java.util.Iterator; import java.util.Objects; import java.util.Timer; import java.util.TimerTask; @@ -30,17 +29,10 @@ import java.util.function.Consumer; import java.util.function.Predicate; -import org.modelingvalue.collections.Collection; -import org.modelingvalue.collections.DefaultMap; -import org.modelingvalue.collections.Entry; -import org.modelingvalue.collections.List; -import org.modelingvalue.collections.Set; -import org.modelingvalue.collections.util.Concurrent; -import org.modelingvalue.collections.util.ContextPool; -import org.modelingvalue.collections.util.StatusProvider; +import org.modelingvalue.collections.*; +import org.modelingvalue.collections.util.*; import org.modelingvalue.collections.util.StatusProvider.AbstractStatus; import org.modelingvalue.collections.util.StatusProvider.StatusIterator; -import org.modelingvalue.collections.util.TraceTimer; import org.modelingvalue.dclare.NonCheckingObserver.NonCheckingTransaction; import org.modelingvalue.dclare.Priority.MutableStates; import org.modelingvalue.dclare.ex.ConsistencyError; @@ -50,6 +42,8 @@ public class UniverseTransaction extends MutableTransaction { private static final Setable STOPPED = Setable.of("stopped", false); + private static final TerminalImperativeTransaction SOURCE = new TerminalImperativeTransaction(); + private static final TerminalImperativeTransaction TARGET = new TerminalImperativeTransaction(); // private final DclareConfig config; protected final Concurrent, ActionTransaction>> actionTransactions = Concurrent.of(() -> new ReusableTransaction<>(this)); @@ -94,7 +88,7 @@ public class UniverseTransaction extends MutableTransaction { private List> timeTravelingActions = List.of(backward, forward); private List> preActions = List.of(); private List> postActions = List.of(); - private List imperativeTransactions = List.of(); + private Graph> imperativeTransactions = Graph.of(); private List history = List.of(); private List future = List.of(); private State preState; @@ -199,6 +193,7 @@ public UniverseTransaction(Universe universe, ContextPool pool, boolean pull, Dc if (startStatusConsumer != null) { startStatusConsumer.accept(startStatus); } + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, TARGET, Void.class); } @Override @@ -282,7 +277,7 @@ public void run() { handleTooManyChanges(state); runActions(postActions); } - commit(state, timeTraveling, imperativeTransactions.iterator()); + commit(state, timeTraveling); if (!killed && inQueue.isEmpty() && isStopped(state)) { break; } @@ -706,35 +701,184 @@ public void addPostAction(Action action) { } } + /** + * Constructs a new imperative transaction and add it to the universe transaction. Synchronized + * on {@code this}. + * + * @param id string identifier + * @param diffHandler handles the incoming difference after fixpoint achieved, always executed on {@code scheduler} + * @param scheduler a single-threaded consumer that always uses the same thread + * @param keepTransaction if true, will persist the association of this transaction on the {@code scheduler}'s thread after scheduled actions have completed + * @return the created imperative transaction + */ public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer scheduler, boolean keepTransaction) { ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction); synchronized (this) { - imperativeTransactions = imperativeTransactions.add(n); + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, n, Void.class); } return n; } + /** + * Constructs a new imperative transaction and add it to the universe transaction, adding any + * orderings between imperative transactions from {@code dependencies} and {@code dependents}. + * Synchronized on {@code this}. + * + * @param id string identifier + * @param diffHandler handles the incoming difference after fixpoint achieved, always executed on {@code scheduler} + * @param scheduler a single-threaded consumer that always uses the same thread + * @param keepTransaction if true, will persist the association of this transaction on the {@code scheduler}'s thread after scheduled actions have completed + * @param dependencies set of imperative transactions that will only run before this imperative transaction + * @param dependents set of imperative transactions that will only run after this imperative transaction + * @return the created imperative transaction + */ + public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer scheduler, boolean keepTransaction, Set dependencies, Set dependents) { + ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction); + synchronized (this) { + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, n, Void.class); + + for (var it : dependencies) { + imperativeTransactions = imperativeTransactions.putEdge(it, n, Void.class); + } + + for (var it : dependents) { + imperativeTransactions = imperativeTransactions.putEdge(n, it, Void.class); + } + + if (imperativeTransactions.getIncomingNodes(n).size() > 1) + imperativeTransactions = imperativeTransactions.removeEdge(SOURCE, n, Void.class); + } + return n; + } + + /** + * Removes the given imperative transaction if it has no dependents. Synchronized on + * {@code this}. + * + * @param it imperative transaction to be removed + */ + public void removeImperative(ImperativeTransaction it) { + synchronized (this) { + Set outgoing = imperativeTransactions.getOutgoingNodes(it); + + if (!outgoing.isEmpty()) { + throw new Error("Cannot remove " + it + " because it is a dependent of " + outgoing); + } + + imperativeTransactions = imperativeTransactions.removeNode(it); + } + } + /** + * Removes the given imperative transaction if it has no dependents. Synchronized on + * {@code this}. + * + * @param id id of imperative transaction to be removed + */ + public void removeImperative(String id) { + removeImperative(getImperativeTransaction(id)); + } + + /** + * Creates an ordering between the two imperative transactions, ensuring that {@code first} + * will always run before {@code second}. Synchronized on {@code this}. + * + * @param first imperative transaction to be run before {@code second} + * @param second imperative transaction to be run after {@code first} + */ + public void orderImperatives(ImperativeTransaction first, ImperativeTransaction second) { + synchronized (this) { + imperativeTransactions = imperativeTransactions.putEdge(first, second, Void.class); + if (imperativeTransactions.hasCycles(n -> true, e -> true, first)) + throw new Error("Circular native group ordering"); + + if (imperativeTransactions.getIncomingNodes(second).size() > 1) + imperativeTransactions = imperativeTransactions.removeEdge(SOURCE, second, Void.class); + } + } + + /** + * Creates an ordering between the two imperative transactions, ensuring that {@code first} + * will always run before {@code second}. Synchronized on {@code this}. + * + * @param first id of imperative transaction to be run before {@code second} + * @param second id of imperative transaction to be run after {@code first} + */ + public void orderImperatives(String first, String second) { + orderImperatives(getImperativeTransaction(first), getImperativeTransaction(second)); + } + + /** + * Removes an ordering between the two imperative transactions if it exists. Synchronized on + * {@code this}. + * + * @param first imperative transaction that was run before {@code second} + * @param second imperative transaction that was run after {@code first} + */ + public void unorderImperatives(ImperativeTransaction first, ImperativeTransaction second) { + synchronized (this) { + if (imperativeTransactions.getIncomingNodes(second).size() == 1 && imperativeTransactions.containsEdge(first, second, Void.class)) + imperativeTransactions = imperativeTransactions.putEdge(SOURCE, second, Void.class); + + imperativeTransactions = imperativeTransactions.removeEdge(first, second, Void.class); + } + } + + /** + * Removes an ordering between the two imperative transactions if it exists. Synchronized on + * {@code this}. + * + * @param first id of imperative transaction that runs before {@code second} + * @param second id of imperative transaction that runs after {@code first} + */ + public void unorderImperatives(String first, String second) { + unorderImperatives(getImperativeTransaction(first), getImperativeTransaction(second)); + } + + @SuppressWarnings("unchecked") public List getImperativeTransactions() { - return imperativeTransactions; + return imperativeTransactions.getNodes().filter(ImperativeTransaction.class::isInstance).map(ImperativeTransaction.class::cast).asList(); } public ImperativeTransaction getImperativeTransaction(String id) { - for (ImperativeTransaction it : imperativeTransactions) { - if (it.imperative().id().equals(id)) { + for (IImperativeTransaction iit : imperativeTransactions.getNodes()) { + if (iit instanceof ImperativeTransaction it && it.imperative().id().equals(id)) { return it; } } return null; } - private void commit(State state, boolean timeTraveling, Iterator it) { - if (!killed && it.hasNext()) { - ImperativeTransaction itx = it.next(); - itx.schedule(() -> { - if (itx.commit(state, timeTraveling)) { - commit(itx.state(), timeTraveling, it); + private void commit(State state, boolean timeTraveling) { + AtomicReference> status = new AtomicReference<>(Map.of()); + status.updateAndGet(map -> map.put(TARGET, 1)); + + tryCommit(state, timeTraveling, SOURCE, status); + } + + private void tryCommit(State state, boolean timeTraveling, IImperativeTransaction it, AtomicReference> status) { + if (status.get().get(TARGET) == 0 || killed || Objects.equals(it, TARGET)) return; + + Set incoming = imperativeTransactions.getIncomingNodes(it); + Map map = status.get(); + if (!incoming.isEmpty() && !incoming.allMatch(e -> map.getOrDefault(e, -1) == 2)) return; + + Map m = status.getAndUpdate(old -> old.put(it, 1)); + if (m.getOrDefault(it, 0) == 1) return; + + if (it instanceof ImperativeTransaction im) { + im.schedule(() -> { + if (im.commit(state, timeTraveling)) { + status.updateAndGet(old -> old.put(it, 2)); + imperativeTransactions.getOutgoingNodes(it).forEach(next -> + tryCommit(state, timeTraveling, next, status)); + } else { + status.getAndUpdate(old -> old.put(TARGET, 0)); } }); + } else { + status.updateAndGet(old -> old.put(it, 2)); + imperativeTransactions.getOutgoingNodes(it).forEach(next -> + tryCommit(state, timeTraveling, next, status)); } } diff --git a/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java new file mode 100644 index 00000000..ab343a09 --- /dev/null +++ b/src/test/java/org/modelingvalue/dclare/test/ImperativeOrderingTests.java @@ -0,0 +1,118 @@ +package org.modelingvalue.dclare.test; + +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; +import org.modelingvalue.collections.List; +import org.modelingvalue.collections.Set; +import org.modelingvalue.collections.util.Pair; +import org.modelingvalue.collections.util.StatusProvider; +import org.modelingvalue.dclare.*; +import org.modelingvalue.dclare.test.support.*; + +import java.time.Duration; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.modelingvalue.dclare.CoreSetableModifier.containment; +import static org.modelingvalue.dclare.test.support.Shared.THE_POOL; + +public class ImperativeOrderingTests { + private static final DclareConfig BASE_CONFIG = new DclareConfig() // + .withDevMode(true) // + .withCheckOrphanState(true) // + .withMaxNrOfChanges(16) // + .withMaxTotalNrOfChanges(1000) // + .withMaxNrOfObserved(40) // + .withMaxNrOfObservers(40) // + .withTraceUniverse(false) // + .withTraceMutable(false) // + .withTraceActions(false) // + .withTraceMatching(false) // + .withTraceRippleOut(false) // + .withTraceDerivation(false); + private static final DclareConfig[] CONFIGS = new DclareConfig[]{ // + BASE_CONFIG, // + BASE_CONFIG // + .withDevMode(true) // + .withRunSequential(true) // + }; + + private boolean imperativeTest(DclareConfig config, int size, Set> edges) { + return imperativeTest(config, size, edges, edges); + } + + private boolean imperativeTest(DclareConfig config, int size, Set> expectedEdges, Set> actualEdges) { + Observed> cs = Observed.of("cs", List.of(), containment); + TestMutableClass U = TestMutableClass.of("Universe", cs); + + OrderedTestUniverse universe = OrderedTestUniverse.of("universe", U, size, expectedEdges, actualEdges); + UniverseTransaction utx = new UniverseTransaction(universe, THE_POOL, config); + + Set> actions = Set.of(IntStream.range(0, size) + .mapToObj(i -> Pair.of(i, (Runnable) () -> {})).toList()); + + run(utx, "init", actions); + + run(utx, "stop", Set.of(Pair.of(0, utx::stop))); + utx.waitForEnd(); + return universe.passed(); + } + + @RepeatedTest(64) + public void basicOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + + assertTrue(imperativeTest(config, 1, Set.of())); + assertTrue(imperativeTest(config, 2, Set.of())); + assertTrue(imperativeTest(config, 2, Set.of(Pair.of(0, 1)))); + } + + @RepeatedTest(64) + public void complexOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + + assertTrue(imperativeTest(config, 6, Set.of( + Pair.of(0, 2), Pair.of(0, 3), Pair.of(1, 3), + Pair.of(2, 4), Pair.of(3, 4), Pair.of(3, 5) + ))); + } + + @RepeatedTest(64) + public void cyclicOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () -> + imperativeTest(config, 2, Set.of(Pair.of(0, 1), Pair.of(1, 0))))); + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () -> + imperativeTest(config, 1, Set.of(Pair.of(0, 0))))); + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () -> + imperativeTest(config, 4, Set.of( + Pair.of(0, 1), Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 1) + )))); + } + + @RepeatedTest(64) + public void wrongOrdering(RepetitionInfo repetitionInfo) { + DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32]; + + assertFalse(imperativeTest(config, 2, Set.of(Pair.of(0, 1)), Set.of(Pair.of(1, 0)))); + assertFalse(imperativeTest(config, 3, + Set.of(Pair.of(0, 1), Pair.of(0, 2), Pair.of(1, 2)), + Set.of(Pair.of(0, 1), Pair.of(0, 2), Pair.of(2, 1)) + )); + } + + private void run(UniverseTransaction utx, String id, Set> actions) { + StatusProvider.StatusIterator it = utx.getStatusIterator(); + UniverseTransaction.Status status = it.waitForStoppedOr(UniverseTransaction.Status::isIdle); + if (!status.isStopped()) { + if (utx.getConfig().isTraceUniverse()) { + System.err.println("-------------------------- " + id + " -------------------------------------------"); + } + OrderedTestUniverse u = (OrderedTestUniverse) utx.universe(); + u.schedule(actions); + it.waitForStoppedOr(s -> !s.active.isEmpty()); + } + } + +} diff --git a/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java new file mode 100644 index 00000000..969db6ad --- /dev/null +++ b/src/test/java/org/modelingvalue/dclare/test/support/OrderedTestUniverse.java @@ -0,0 +1,147 @@ +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// (C) Copyright 2018-2024 Modeling Value Group B.V. (http://modelingvalue.org) ~ +// ~ +// Licensed under the GNU Lesser General Public License v3.0 (the 'License'). You may not use this file except in ~ +// compliance with the License. You may obtain a copy of the License at: https://choosealicense.com/licenses/lgpl-3.0 ~ +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on ~ +// an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the ~ +// specific language governing permissions and limitations under the License. ~ +// ~ +// Maintainers: ~ +// Wim Bast, Tom Brus ~ +// ~ +// Contributors: ~ +// Ronald Krijgsheld ✝, Arjan Kok, Carel Bast ~ +// --------------------------------------------------------------------------------------------------------------------- ~ +// In Memory of Ronald Krijgsheld, 1972 - 2023 ~ +// Ronald was suddenly and unexpectedly taken from us. He was not only our long-term colleague and team member ~ +// but also our friend. "He will live on in many of the lines of code you see below." ~ +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +package org.modelingvalue.dclare.test.support; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.modelingvalue.collections.Entry; +import org.modelingvalue.collections.List; +import org.modelingvalue.collections.Map; +import org.modelingvalue.collections.Set; +import org.modelingvalue.collections.util.Pair; +import org.modelingvalue.dclare.ImperativeTransaction; +import org.modelingvalue.dclare.LeafTransaction; +import org.modelingvalue.dclare.Mutable; +import org.modelingvalue.dclare.Setable; +import org.modelingvalue.dclare.State; +import org.modelingvalue.dclare.Universe; +import org.modelingvalue.dclare.UniverseTransaction; + +@SuppressWarnings("unused") +public class OrderedTestUniverse extends TestMutable implements Universe { + public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { + return new OrderedTestUniverse(id, clazz, size, expectedEdges, actualEdges); + } + + private static final Setable DUMMY = Setable.of("$DUMMY", 0l); + + private final TestScheduler scheduler = TestScheduler.of(); + private final AtomicInteger counter = new AtomicInteger(0); + + private UniverseTransaction universeTransaction; + + private List imperativeTransactions = List.of(); + private final int size; + private Set> expectedEdges; + private Set> actualEdges; + private Map> incoming = Map.of(); + AtomicReference> passed = new AtomicReference<>(Map.of()); + private AtomicBoolean flag = new AtomicBoolean(true); + + private OrderedTestUniverse(Object id, TestMutableClass clazz, int size, Set> expectedEdges, Set> actualEdges) { + super(id, clazz); + this.size = size; + this.expectedEdges = expectedEdges; + this.actualEdges = actualEdges; + } + + @Override + public void init() { + scheduler.start(); + Universe.super.init(); + universeTransaction = LeafTransaction.getCurrent().universeTransaction(); + + for (int i = 0; i < size; i++) { + int finalI = i; + passed.getAndUpdate(map -> map.put(finalI, false)); + } + + for (int i = 0; i < size; i++) { + int finalI = i; + imperativeTransactions = imperativeTransactions.add(universeTransaction.addImperative("TEST" + i, (pre, post, last, setted) -> { + passed.getAndUpdate(list -> { + if (incoming.containsKey(finalI)) { + for (Integer inc : incoming.get((Integer) finalI)) { + if (!list.get(inc)) { + flag.set(false); + } + } + } + + return list.put(finalI, true); + }); + + pre.diff(post, o -> o instanceof TestNewable, s -> s == Mutable.D_PARENT_CONTAINING).forEach(e -> { + if (e.getValue().get(Mutable.D_PARENT_CONTAINING).b() != null) { + TestNewable n = (TestNewable) e.getKey(); + if (n.dInitialConstruction().isDerived()) { + TestNewable.construct(n, "init" + uniqueInt()); + } + } + }); + }, scheduler, false)); + } + + for (var edge : expectedEdges) { + incoming = incoming.put(edge.b(), incoming.getOrDefault(edge.b(), Set.of()).add(edge.a())); + } + for (var edge : actualEdges) { + universeTransaction.orderImperatives("TEST" + edge.a(), "TEST" + edge.b()); + } + } + + @Override + public void exit() { + scheduler.stop(); + Universe.super.exit(); + } + + public boolean passed() { + return flag.get() && passed.get().toValues().allMatch(b -> b); + } + + public int uniqueInt() { + return counter.getAndIncrement(); + } + + public void schedule(Set> actions) { + actions.forEach(action -> imperativeTransactions.get(action.a()).schedule(() -> { + DUMMY.set(this, Long::sum, 1L); + action.b().run(); + })); + } + + public State waitForEnd(UniverseTransaction universeTransaction) throws Throwable { + try { + return universeTransaction.waitForEnd(); + } catch (Error e) { + throw e.getCause(); + } + } + + @Override + public boolean dIsOrphan(State state) { + return Universe.super.dIsOrphan(state); + } + +}