Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imperative transaction ordering #47

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.modelingvalue.dclare;

public sealed interface IImperativeTransaction permits TerminalImperativeTransaction, ImperativeTransaction {
}
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@
import org.modelingvalue.collections.util.NamedIdentity;
import org.modelingvalue.dclare.Priority.Queued;

public class ImperativeTransaction extends LeafTransaction {
public final class ImperativeTransaction extends LeafTransaction implements IImperativeTransaction {

@SuppressWarnings("rawtypes")
protected static final DefaultMap<Object, Set<Setable>> SETTED_MAP = DefaultMap.of(k -> Set.of());
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.modelingvalue.dclare;

public final class TerminalImperativeTransaction implements IImperativeTransaction {
// for imperative transaction ordering in graph
}
188 changes: 166 additions & 22 deletions src/main/java/org/modelingvalue/dclare/UniverseTransaction.java
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@

package org.modelingvalue.dclare;

import java.util.Iterator;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
@@ -30,17 +29,10 @@
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.modelingvalue.collections.Collection;
import org.modelingvalue.collections.DefaultMap;
import org.modelingvalue.collections.Entry;
import org.modelingvalue.collections.List;
import org.modelingvalue.collections.Set;
import org.modelingvalue.collections.util.Concurrent;
import org.modelingvalue.collections.util.ContextPool;
import org.modelingvalue.collections.util.StatusProvider;
import org.modelingvalue.collections.*;
import org.modelingvalue.collections.util.*;
import org.modelingvalue.collections.util.StatusProvider.AbstractStatus;
import org.modelingvalue.collections.util.StatusProvider.StatusIterator;
import org.modelingvalue.collections.util.TraceTimer;
import org.modelingvalue.dclare.NonCheckingObserver.NonCheckingTransaction;
import org.modelingvalue.dclare.Priority.MutableStates;
import org.modelingvalue.dclare.ex.ConsistencyError;
@@ -50,6 +42,8 @@
public class UniverseTransaction extends MutableTransaction {

private static final Setable<Universe, Boolean> STOPPED = Setable.of("stopped", false);
private static final TerminalImperativeTransaction SOURCE = new TerminalImperativeTransaction();
private static final TerminalImperativeTransaction TARGET = new TerminalImperativeTransaction();
//
private final DclareConfig config;
protected final Concurrent<ReusableTransaction<Action<?>, ActionTransaction>> actionTransactions = Concurrent.of(() -> new ReusableTransaction<>(this));
@@ -94,7 +88,7 @@ public class UniverseTransaction extends MutableTransaction {
private List<Action<Universe>> timeTravelingActions = List.of(backward, forward);
private List<Action<Universe>> preActions = List.of();
private List<Action<Universe>> postActions = List.of();
private List<ImperativeTransaction> imperativeTransactions = List.of();
private Graph<IImperativeTransaction, Class<Void>> imperativeTransactions = Graph.of();
private List<State> history = List.of();
private List<State> future = List.of();
private State preState;
@@ -199,6 +193,7 @@ public UniverseTransaction(Universe universe, ContextPool pool, boolean pull, Dc
if (startStatusConsumer != null) {
startStatusConsumer.accept(startStatus);
}
imperativeTransactions = imperativeTransactions.putEdge(SOURCE, TARGET, Void.class);
}

@Override
@@ -282,7 +277,7 @@ public void run() {
handleTooManyChanges(state);
runActions(postActions);
}
commit(state, timeTraveling, imperativeTransactions.iterator());
commit(state, timeTraveling);
if (!killed && inQueue.isEmpty() && isStopped(state)) {
break;
}
@@ -706,35 +701,184 @@ public void addPostAction(Action<Universe> action) {
}
}

/**
* Constructs a new imperative transaction and add it to the universe transaction. Synchronized
* on {@code this}.
*
* @param id string identifier
* @param diffHandler handles the incoming difference after fixpoint achieved, always executed on {@code scheduler}
* @param scheduler a single-threaded consumer that always uses the same thread
* @param keepTransaction if true, will persist the association of this transaction on the {@code scheduler}'s thread after scheduled actions have completed
* @return the created imperative transaction
*/
public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer<Runnable> scheduler, boolean keepTransaction) {
ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction);
synchronized (this) {
imperativeTransactions = imperativeTransactions.add(n);
imperativeTransactions = imperativeTransactions.putEdge(SOURCE, n, Void.class);
}
return n;
}

/**
* Constructs a new imperative transaction and add it to the universe transaction, adding any
* orderings between imperative transactions from {@code dependencies} and {@code dependents}.
* Synchronized on {@code this}.
*
* @param id string identifier
* @param diffHandler handles the incoming difference after fixpoint achieved, always executed on {@code scheduler}
* @param scheduler a single-threaded consumer that always uses the same thread
* @param keepTransaction if true, will persist the association of this transaction on the {@code scheduler}'s thread after scheduled actions have completed
* @param dependencies set of imperative transactions that will only run before this imperative transaction
* @param dependents set of imperative transactions that will only run after this imperative transaction
* @return the created imperative transaction
*/
public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer<Runnable> scheduler, boolean keepTransaction, Set<ImperativeTransaction> dependencies, Set<ImperativeTransaction> dependents) {
ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction);
synchronized (this) {
imperativeTransactions = imperativeTransactions.putEdge(SOURCE, n, Void.class);

for (var it : dependencies) {
imperativeTransactions = imperativeTransactions.putEdge(it, n, Void.class);
}

for (var it : dependents) {
imperativeTransactions = imperativeTransactions.putEdge(n, it, Void.class);
}

if (imperativeTransactions.getIncomingNodes(n).size() > 1)
imperativeTransactions = imperativeTransactions.removeEdge(SOURCE, n, Void.class);
}
return n;
}

/**
* Removes the given imperative transaction if it has no dependents. Synchronized on
* {@code this}.
*
* @param it imperative transaction to be removed
*/
public void removeImperative(ImperativeTransaction it) {
synchronized (this) {
Set<IImperativeTransaction> outgoing = imperativeTransactions.getOutgoingNodes(it);

if (!outgoing.isEmpty()) {
throw new Error("Cannot remove " + it + " because it is a dependent of " + outgoing);
}

imperativeTransactions = imperativeTransactions.removeNode(it);
}
}
/**
* Removes the given imperative transaction if it has no dependents. Synchronized on
* {@code this}.
*
* @param id id of imperative transaction to be removed
*/
public void removeImperative(String id) {
removeImperative(getImperativeTransaction(id));
}

/**
* Creates an ordering between the two imperative transactions, ensuring that {@code first}
* will always run before {@code second}. Synchronized on {@code this}.
*
* @param first imperative transaction to be run before {@code second}
* @param second imperative transaction to be run after {@code first}
*/
public void orderImperatives(ImperativeTransaction first, ImperativeTransaction second) {
synchronized (this) {
imperativeTransactions = imperativeTransactions.putEdge(first, second, Void.class);
if (imperativeTransactions.hasCycles(n -> true, e -> true, first))
throw new Error("Circular native group ordering");

if (imperativeTransactions.getIncomingNodes(second).size() > 1)
imperativeTransactions = imperativeTransactions.removeEdge(SOURCE, second, Void.class);
}
}

/**
* Creates an ordering between the two imperative transactions, ensuring that {@code first}
* will always run before {@code second}. Synchronized on {@code this}.
*
* @param first id of imperative transaction to be run before {@code second}
* @param second id of imperative transaction to be run after {@code first}
*/
public void orderImperatives(String first, String second) {
orderImperatives(getImperativeTransaction(first), getImperativeTransaction(second));
}

/**
* Removes an ordering between the two imperative transactions if it exists. Synchronized on
* {@code this}.
*
* @param first imperative transaction that was run before {@code second}
* @param second imperative transaction that was run after {@code first}
*/
public void unorderImperatives(ImperativeTransaction first, ImperativeTransaction second) {
synchronized (this) {
if (imperativeTransactions.getIncomingNodes(second).size() == 1 && imperativeTransactions.containsEdge(first, second, Void.class))
imperativeTransactions = imperativeTransactions.putEdge(SOURCE, second, Void.class);

imperativeTransactions = imperativeTransactions.removeEdge(first, second, Void.class);
}
}

/**
* Removes an ordering between the two imperative transactions if it exists. Synchronized on
* {@code this}.
*
* @param first id of imperative transaction that runs before {@code second}
* @param second id of imperative transaction that runs after {@code first}
*/
public void unorderImperatives(String first, String second) {
unorderImperatives(getImperativeTransaction(first), getImperativeTransaction(second));
}

@SuppressWarnings("unchecked")
public List<ImperativeTransaction> getImperativeTransactions() {
return imperativeTransactions;
return imperativeTransactions.getNodes().filter(ImperativeTransaction.class::isInstance).map(ImperativeTransaction.class::cast).asList();
}

public ImperativeTransaction getImperativeTransaction(String id) {
for (ImperativeTransaction it : imperativeTransactions) {
if (it.imperative().id().equals(id)) {
for (IImperativeTransaction iit : imperativeTransactions.getNodes()) {
if (iit instanceof ImperativeTransaction it && it.imperative().id().equals(id)) {
return it;
}
}
return null;
}

private void commit(State state, boolean timeTraveling, Iterator<ImperativeTransaction> it) {
if (!killed && it.hasNext()) {
ImperativeTransaction itx = it.next();
itx.schedule(() -> {
if (itx.commit(state, timeTraveling)) {
commit(itx.state(), timeTraveling, it);
private void commit(State state, boolean timeTraveling) {
AtomicReference<Map<IImperativeTransaction, Integer>> status = new AtomicReference<>(Map.of());
status.updateAndGet(map -> map.put(TARGET, 1));

tryCommit(state, timeTraveling, SOURCE, status);
}

private void tryCommit(State state, boolean timeTraveling, IImperativeTransaction it, AtomicReference<Map<IImperativeTransaction, Integer>> status) {
if (status.get().get(TARGET) == 0 || killed || Objects.equals(it, TARGET)) return;

Set<IImperativeTransaction> incoming = imperativeTransactions.getIncomingNodes(it);
Map<IImperativeTransaction, Integer> map = status.get();
if (!incoming.isEmpty() && !incoming.allMatch(e -> map.getOrDefault(e, -1) == 2)) return;

Map<IImperativeTransaction, Integer> m = status.getAndUpdate(old -> old.put(it, 1));
if (m.getOrDefault(it, 0) == 1) return;

if (it instanceof ImperativeTransaction im) {
im.schedule(() -> {
if (im.commit(state, timeTraveling)) {
status.updateAndGet(old -> old.put(it, 2));
imperativeTransactions.getOutgoingNodes(it).forEach(next ->
tryCommit(state, timeTraveling, next, status));
} else {
status.getAndUpdate(old -> old.put(TARGET, 0));
}
});
} else {
status.updateAndGet(old -> old.put(it, 2));
imperativeTransactions.getOutgoingNodes(it).forEach(next ->
tryCommit(state, timeTraveling, next, status));
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package org.modelingvalue.dclare.test;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.RepetitionInfo;
import org.modelingvalue.collections.List;
import org.modelingvalue.collections.Set;
import org.modelingvalue.collections.util.Pair;
import org.modelingvalue.collections.util.StatusProvider;
import org.modelingvalue.dclare.*;
import org.modelingvalue.dclare.test.support.*;

import java.time.Duration;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.*;
import static org.modelingvalue.dclare.CoreSetableModifier.containment;
import static org.modelingvalue.dclare.test.support.Shared.THE_POOL;

public class ImperativeOrderingTests {
private static final DclareConfig BASE_CONFIG = new DclareConfig() //
.withDevMode(true) //
.withCheckOrphanState(true) //
.withMaxNrOfChanges(16) //
.withMaxTotalNrOfChanges(1000) //
.withMaxNrOfObserved(40) //
.withMaxNrOfObservers(40) //
.withTraceUniverse(false) //
.withTraceMutable(false) //
.withTraceActions(false) //
.withTraceMatching(false) //
.withTraceRippleOut(false) //
.withTraceDerivation(false);
private static final DclareConfig[] CONFIGS = new DclareConfig[]{ //
BASE_CONFIG, //
BASE_CONFIG //
.withDevMode(true) //
.withRunSequential(true) //
};

private boolean imperativeTest(DclareConfig config, int size, Set<Pair<Integer, Integer>> edges) {
return imperativeTest(config, size, edges, edges);
}

private boolean imperativeTest(DclareConfig config, int size, Set<Pair<Integer, Integer>> expectedEdges, Set<Pair<Integer, Integer>> actualEdges) {
Observed<TestMutable, List<TestNewable>> cs = Observed.of("cs", List.of(), containment);
TestMutableClass U = TestMutableClass.of("Universe", cs);

OrderedTestUniverse universe = OrderedTestUniverse.of("universe", U, size, expectedEdges, actualEdges);
UniverseTransaction utx = new UniverseTransaction(universe, THE_POOL, config);

Set<Pair<Integer, Runnable>> actions = Set.of(IntStream.range(0, size)
.mapToObj(i -> Pair.of(i, (Runnable) () -> {})).toList());

run(utx, "init", actions);

run(utx, "stop", Set.of(Pair.of(0, utx::stop)));
utx.waitForEnd();
return universe.passed();
}

@RepeatedTest(64)
public void basicOrdering(RepetitionInfo repetitionInfo) {
DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32];

assertTrue(imperativeTest(config, 1, Set.of()));
assertTrue(imperativeTest(config, 2, Set.of()));
assertTrue(imperativeTest(config, 2, Set.of(Pair.of(0, 1))));
}

@RepeatedTest(64)
public void complexOrdering(RepetitionInfo repetitionInfo) {
DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32];

assertTrue(imperativeTest(config, 6, Set.of(
Pair.of(0, 2), Pair.of(0, 3), Pair.of(1, 3),
Pair.of(2, 4), Pair.of(3, 4), Pair.of(3, 5)
)));
}

@RepeatedTest(64)
public void cyclicOrdering(RepetitionInfo repetitionInfo) {
DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32];

assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () ->
imperativeTest(config, 2, Set.of(Pair.of(0, 1), Pair.of(1, 0)))));
assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () ->
imperativeTest(config, 1, Set.of(Pair.of(0, 0)))));
assertTimeoutPreemptively(Duration.ofSeconds(2), () -> assertThrows(Error.class, () ->
imperativeTest(config, 4, Set.of(
Pair.of(0, 1), Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 1)
))));
}

@RepeatedTest(64)
public void wrongOrdering(RepetitionInfo repetitionInfo) {
DclareConfig config = CONFIGS[(repetitionInfo.getCurrentRepetition() - 1) / 32];

assertFalse(imperativeTest(config, 2, Set.of(Pair.of(0, 1)), Set.of(Pair.of(1, 0))));
assertFalse(imperativeTest(config, 3,
Set.of(Pair.of(0, 1), Pair.of(0, 2), Pair.of(1, 2)),
Set.of(Pair.of(0, 1), Pair.of(0, 2), Pair.of(2, 1))
));
}

private void run(UniverseTransaction utx, String id, Set<Pair<Integer, Runnable>> actions) {
StatusProvider.StatusIterator<UniverseTransaction.Status> it = utx.getStatusIterator();
UniverseTransaction.Status status = it.waitForStoppedOr(UniverseTransaction.Status::isIdle);
if (!status.isStopped()) {
if (utx.getConfig().isTraceUniverse()) {
System.err.println("-------------------------- " + id + " -------------------------------------------");
}
OrderedTestUniverse u = (OrderedTestUniverse) utx.universe();
u.schedule(actions);
it.waitForStoppedOr(s -> !s.active.isEmpty());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// (C) Copyright 2018-2024 Modeling Value Group B.V. (http://modelingvalue.org) ~
// ~
// Licensed under the GNU Lesser General Public License v3.0 (the 'License'). You may not use this file except in ~
// compliance with the License. You may obtain a copy of the License at: https://choosealicense.com/licenses/lgpl-3.0 ~
// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on ~
// an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the ~
// specific language governing permissions and limitations under the License. ~
// ~
// Maintainers: ~
// Wim Bast, Tom Brus ~
// ~
// Contributors: ~
// Ronald Krijgsheld ✝, Arjan Kok, Carel Bast ~
// --------------------------------------------------------------------------------------------------------------------- ~
// In Memory of Ronald Krijgsheld, 1972 - 2023 ~
// Ronald was suddenly and unexpectedly taken from us. He was not only our long-term colleague and team member ~
// but also our friend. "He will live on in many of the lines of code you see below." ~
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

package org.modelingvalue.dclare.test.support;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.modelingvalue.collections.Entry;
import org.modelingvalue.collections.List;
import org.modelingvalue.collections.Map;
import org.modelingvalue.collections.Set;
import org.modelingvalue.collections.util.Pair;
import org.modelingvalue.dclare.ImperativeTransaction;
import org.modelingvalue.dclare.LeafTransaction;
import org.modelingvalue.dclare.Mutable;
import org.modelingvalue.dclare.Setable;
import org.modelingvalue.dclare.State;
import org.modelingvalue.dclare.Universe;
import org.modelingvalue.dclare.UniverseTransaction;

@SuppressWarnings("unused")
public class OrderedTestUniverse extends TestMutable implements Universe {
public static OrderedTestUniverse of(Object id, TestMutableClass clazz, int size, Set<Pair<Integer, Integer>> expectedEdges, Set<Pair<Integer, Integer>> actualEdges) {
return new OrderedTestUniverse(id, clazz, size, expectedEdges, actualEdges);
}

private static final Setable<OrderedTestUniverse, Long> DUMMY = Setable.of("$DUMMY", 0l);

private final TestScheduler scheduler = TestScheduler.of();
private final AtomicInteger counter = new AtomicInteger(0);

private UniverseTransaction universeTransaction;

private List<ImperativeTransaction> imperativeTransactions = List.of();
private final int size;
private Set<Pair<Integer, Integer>> expectedEdges;
private Set<Pair<Integer, Integer>> actualEdges;
private Map<Integer, Set<Integer>> incoming = Map.of();
AtomicReference<Map<Integer, Boolean>> passed = new AtomicReference<>(Map.of());
private AtomicBoolean flag = new AtomicBoolean(true);

private OrderedTestUniverse(Object id, TestMutableClass clazz, int size, Set<Pair<Integer, Integer>> expectedEdges, Set<Pair<Integer, Integer>> actualEdges) {
super(id, clazz);
this.size = size;
this.expectedEdges = expectedEdges;
this.actualEdges = actualEdges;
}

@Override
public void init() {
scheduler.start();
Universe.super.init();
universeTransaction = LeafTransaction.getCurrent().universeTransaction();

for (int i = 0; i < size; i++) {
int finalI = i;
passed.getAndUpdate(map -> map.put(finalI, false));
}

for (int i = 0; i < size; i++) {
int finalI = i;
imperativeTransactions = imperativeTransactions.add(universeTransaction.addImperative("TEST" + i, (pre, post, last, setted) -> {
passed.getAndUpdate(list -> {
if (incoming.containsKey(finalI)) {
for (Integer inc : incoming.get((Integer) finalI)) {
if (!list.get(inc)) {
flag.set(false);
}
}
}

return list.put(finalI, true);
});

pre.diff(post, o -> o instanceof TestNewable, s -> s == Mutable.D_PARENT_CONTAINING).forEach(e -> {
if (e.getValue().get(Mutable.D_PARENT_CONTAINING).b() != null) {
TestNewable n = (TestNewable) e.getKey();
if (n.dInitialConstruction().isDerived()) {
TestNewable.construct(n, "init" + uniqueInt());
}
}
});
}, scheduler, false));
}

for (var edge : expectedEdges) {
incoming = incoming.put(edge.b(), incoming.getOrDefault(edge.b(), Set.of()).add(edge.a()));
}
for (var edge : actualEdges) {
universeTransaction.orderImperatives("TEST" + edge.a(), "TEST" + edge.b());
}
}

@Override
public void exit() {
scheduler.stop();
Universe.super.exit();
}

public boolean passed() {
return flag.get() && passed.get().toValues().allMatch(b -> b);
}

public int uniqueInt() {
return counter.getAndIncrement();
}

public void schedule(Set<Pair<Integer, Runnable>> actions) {
actions.forEach(action -> imperativeTransactions.get(action.a()).schedule(() -> {
DUMMY.set(this, Long::sum, 1L);
action.b().run();
}));
}

public State waitForEnd(UniverseTransaction universeTransaction) throws Throwable {
try {
return universeTransaction.waitForEnd();
} catch (Error e) {
throw e.getCause();
}
}

@Override
public boolean dIsOrphan(State state) {
return Universe.super.dIsOrphan(state);
}

}