Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowed ordering of imperative transactions #45

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 57 additions & 15 deletions src/main/java/org/modelingvalue/dclare/UniverseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import org.modelingvalue.collections.Collection;
import org.modelingvalue.collections.DefaultMap;
import org.modelingvalue.collections.Graph;
import org.modelingvalue.collections.Map;
import org.modelingvalue.collections.Entry;
import org.modelingvalue.collections.List;
import org.modelingvalue.collections.Set;
Expand All @@ -36,12 +38,12 @@
import org.modelingvalue.dclare.ex.ConsistencyError;
import org.modelingvalue.dclare.ex.TooManyChangesException;

import java.util.Iterator;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -92,7 +94,7 @@ public class UniverseTransaction extends MutableTransaction {
private List<Action<Universe>> timeTravelingActions = List.of(backward, forward);
private List<Action<Universe>> preActions = List.of();
private List<Action<Universe>> postActions = List.of();
private List<ImperativeTransaction> imperativeTransactions = List.of();
private Graph<ImperativeTransaction, Class<Void>> imperativeTransactions = Graph.of();
private List<State> history = List.of();
private List<State> future = List.of();
private State preState;
Expand Down Expand Up @@ -267,7 +269,7 @@ public void run() {
handleTooManyChanges(state);
runActions(postActions);
}
commit(state, timeTraveling, imperativeTransactions.iterator());
commit(state, timeTraveling);
if (!killed && inQueue.isEmpty() && isStopped(state)) {
break;
}
Expand Down Expand Up @@ -694,33 +696,73 @@ public void addPostAction(Action<Universe> action) {
public ImperativeTransaction addImperative(String id, StateDeltaHandler diffHandler, Consumer<Runnable> scheduler, boolean keepTransaction) {
ImperativeTransaction n = ImperativeTransaction.of(Imperative.of(id), preState, this, scheduler, diffHandler, keepTransaction);
synchronized (this) {
imperativeTransactions = imperativeTransactions.add(n);
imperativeTransactions = imperativeTransactions.addNode(n);
}
return n;
}

public void orderImperativeTransactions(ImperativeTransaction first, ImperativeTransaction second) {
imperativeTransactions.putEdge(first, second, Void.class);
}

public void orderImperativeTransactions(String first, String second) {
orderImperativeTransactions(getImperativeTransaction(first), getImperativeTransaction(second));
}

public List<ImperativeTransaction> getImperativeTransactions() {
return imperativeTransactions;
return imperativeTransactions.getAllNodes().asList();
}

public ImperativeTransaction getImperativeTransaction(String id) {
for (ImperativeTransaction it : imperativeTransactions) {
for (ImperativeTransaction it : imperativeTransactions.getAllNodes()) {
if (it.imperative().id().equals(id)) {
return it;
}
}
return null;
}

private void commit(State state, boolean timeTraveling, Iterator<ImperativeTransaction> it) {
if (!killed && it.hasNext()) {
ImperativeTransaction itx = it.next();
itx.schedule(() -> {
if (itx.commit(state, timeTraveling)) {
commit(itx.state(), timeTraveling, it);
}
});
}
private void commit(State state, boolean timeTraveling) {
if (imperativeTransactions.hasCycles())
throw new Error("Circular native group ordering");

AtomicReference<Set<ImperativeTransaction>> started = new AtomicReference<>(Set.of());
AtomicReference<Set<ImperativeTransaction>> stopped = new AtomicReference<>(Set.of());
AtomicBoolean inSync = new AtomicBoolean(true);
imperativeTransactions.getAllNodes().forEach(it -> tryCommit(
state, it, imperativeTransactions, timeTraveling,
started, stopped, inSync
));
}

private void tryCommit(
State state,
ImperativeTransaction it,
Graph<ImperativeTransaction, Class<Void>> graph,
boolean timeTraveling,
AtomicReference<Set<ImperativeTransaction>> started,
AtomicReference<Set<ImperativeTransaction>> stopped,
AtomicBoolean inSync
) {

if (!inSync.get()) return;

Map<ImperativeTransaction, ?> incoming = graph.getIncomingEdges(it);
if (!stopped.get().containsAll(incoming.toKeys())) return; // Not ready to start

Set<ImperativeTransaction> s = started.getAndUpdate(old -> old.add(it));
if (s.contains(it)) return; // Already started

it.schedule(() -> {
if (it.commit(state, timeTraveling)) {
stopped.updateAndGet(old -> old.add(it));
graph.getOutgoingEdges(it).forEach((next, ignored) ->
tryCommit(state, next, graph, timeTraveling, started, stopped, inSync)
);
} else {
inSync.set(false);
}
});
}

public void deriveLazy() {
Expand Down