Skip to content

Commit ee26a22

Browse files
Legiothtaefimshabarov
authored
feat: Add trees and transactions to signals module (#21013)
* Add trees and transactions Still missing the topmost layer with the actual signal instances. The operation classes are leaking through from that layer even though they don't make much sense without the rest of that layer. * Remove the pin concept for now * Apply suggestions from code review Co-authored-by: Soroosh Taefi <[email protected]> Co-authored-by: Mikhail Shabarov <[email protected]> * Address review comments * Use a dedicated lock * Update terminology to "include" changes in transactions --------- Co-authored-by: Soroosh Taefi <[email protected]> Co-authored-by: Mikhail Shabarov <[email protected]>
1 parent c880995 commit ee26a22

14 files changed

+3212
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package com.vaadin.signals.impl;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
6+
import com.vaadin.signals.Id;
7+
import com.vaadin.signals.SignalCommand;
8+
9+
/**
10+
* A signal tree that submits commands to an event log and asynchronously waits
11+
* for external confirmation before completing handling of the command. This
12+
* means that {@link #submitted} may contain changes that are not yet in
13+
* {@link #confirmed} and might never end up there if a concurrent change causes
14+
* a conflict. This type of tree is intended for signals that are synchronized
15+
* across a cluster.
16+
*/
17+
public abstract class AsynchronousSignalTree extends SignalTree {
18+
private final CommandsAndHandlers unconfirmedCommands = new CommandsAndHandlers();
19+
20+
private Snapshot confirmed = new Snapshot(id(), true);
21+
22+
private Snapshot submitted = new Snapshot(id(), true);
23+
24+
protected AsynchronousSignalTree() {
25+
super(Type.ASYNCHRONOUS);
26+
}
27+
28+
/**
29+
* Submits a sequence of commands to the event log. It is expected that the
30+
* same sequence of commands will eventually be passed back to
31+
* {@link #confirm(List)}.
32+
*
33+
* @param commands
34+
* the list of commands to submit, not <code>null</code>
35+
*/
36+
protected abstract void submit(List<SignalCommand> commands);
37+
38+
/**
39+
* Adds a sequence of commands to the confirmed snapshot. The commands might
40+
* originate from this tree instance through {@link #submit(List)} or from a
41+
* different tree in the same cluster that uses the same underlying event
42+
* log. Any remaining commands that have been submitted but not yet
43+
* confirmed will be re-applied on top of the new confirmed state.
44+
*
45+
* @param commands
46+
* the sequence of confirmed commands, not <code>null</code>
47+
*/
48+
public void confirm(List<SignalCommand> commands) {
49+
runWithLock(() -> {
50+
MutableTreeRevision builder = new MutableTreeRevision(confirmed);
51+
52+
Map<Id, CommandResult> results = builder
53+
.applyAndGetResults(commands);
54+
55+
confirmed = new Snapshot(builder);
56+
57+
unconfirmedCommands.removeHandledCommands(results.keySet());
58+
59+
Snapshot oldSubmitted = submitted;
60+
61+
/*
62+
* TODO: could skip this part if the newly confirmed commands were
63+
* at the head of unconfirmedCommands since submitted doesn't change
64+
* in that case
65+
*/
66+
if (!unconfirmedCommands.isEmpty()) {
67+
builder.apply(unconfirmedCommands.getCommands());
68+
69+
submitted = new Snapshot(builder);
70+
} else {
71+
submitted = confirmed;
72+
}
73+
74+
notifyObservers(oldSubmitted, submitted);
75+
76+
unconfirmedCommands.notifyResultHandlers(results, commands);
77+
});
78+
}
79+
80+
@Override
81+
public PendingCommit prepareCommit(CommandsAndHandlers changes) {
82+
assert hasLock();
83+
84+
Snapshot oldSnapshot = submitted;
85+
86+
MutableTreeRevision builder = new MutableTreeRevision(submitted);
87+
88+
builder.apply(changes.getCommands());
89+
90+
Snapshot newSnapshot = new Snapshot(builder);
91+
92+
return new PendingCommit() {
93+
@Override
94+
public boolean canCommit() {
95+
assert hasLock();
96+
97+
// Can always "commit" since conflicts will be resolved only
98+
// when confirmed
99+
return true;
100+
}
101+
102+
@Override
103+
public void applyChanges() {
104+
assert hasLock();
105+
106+
unconfirmedCommands.add(changes);
107+
submitted = newSnapshot;
108+
}
109+
110+
@Override
111+
public void publishChanges() {
112+
assert hasLock();
113+
114+
notifyObservers(oldSnapshot, newSnapshot);
115+
116+
submit(changes.getCommands());
117+
}
118+
119+
@Override
120+
public void markAsAborted() {
121+
// Async transactions cannot be aborted
122+
throw new UnsupportedOperationException();
123+
}
124+
};
125+
}
126+
127+
@Override
128+
public Snapshot confirmed() {
129+
return getWithLock(() -> confirmed);
130+
}
131+
132+
@Override
133+
public Snapshot submitted() {
134+
return getWithLock(() -> submitted);
135+
}
136+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package com.vaadin.signals.impl;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collection;
5+
import java.util.Collections;
6+
import java.util.HashMap;
7+
import java.util.List;
8+
import java.util.Map;
9+
import java.util.function.Consumer;
10+
11+
import com.vaadin.signals.Id;
12+
import com.vaadin.signals.SignalCommand;
13+
14+
/**
15+
* A list of signal commands together with their result handlers.
16+
*/
17+
public class CommandsAndHandlers {
18+
private final List<SignalCommand> commands = new ArrayList<>();
19+
private final Map<Id, Consumer<CommandResult>> resultHandlers = new HashMap<>();
20+
21+
/**
22+
* Creates a new empty command list.
23+
*/
24+
public CommandsAndHandlers() {
25+
26+
}
27+
28+
/**
29+
* Creates a new command list with the given commands and result handlers.
30+
*
31+
* @param commands
32+
* the commands to use, not <code>null</code>
33+
* @param resultHandlers
34+
* the result handlers to use, not <code>null</code>
35+
*/
36+
public CommandsAndHandlers(List<SignalCommand> commands,
37+
Map<Id, Consumer<CommandResult>> resultHandlers) {
38+
this.commands.addAll(commands);
39+
this.resultHandlers.putAll(resultHandlers);
40+
}
41+
42+
/**
43+
* Creates a new command list with a single command and optional result
44+
* handler.
45+
*
46+
* @param command
47+
* the command to use, not <code>null</code>
48+
* @param resultHandler
49+
* the result handler to use, or <code>null</code> to not use a
50+
* result handler
51+
*/
52+
public CommandsAndHandlers(SignalCommand command,
53+
Consumer<CommandResult> resultHandler) {
54+
assert command != null;
55+
commands.add(command);
56+
if (resultHandler != null) {
57+
resultHandlers.put(command.commandId(), resultHandler);
58+
}
59+
}
60+
61+
/**
62+
* Removes commands based on a collection of handled commands. Note that the
63+
* corresponding result handlers are not removed but there's instead an
64+
* assumption that the caller will invoke {@link #notifyResultHandlers(Map)}
65+
* separately.
66+
*
67+
* @param handledCommandIds
68+
* a collection of handled commands ids, not <code>null</code>
69+
*/
70+
public void removeHandledCommands(Collection<Id> handledCommandIds) {
71+
commands.removeIf(
72+
command -> handledCommandIds.contains(command.commandId()));
73+
}
74+
75+
/**
76+
* Notifies and removes result handlers for the given results.
77+
*
78+
* @param results
79+
* a map of command results, not <code>null</code>
80+
*/
81+
public void notifyResultHandlers(Map<Id, CommandResult> results) {
82+
notifyResultHandlers(results, commands);
83+
}
84+
85+
/**
86+
* Notifies and removes result handlers for the given results in the given
87+
* order. Commands in the order that have no corresponding result are
88+
* ignored.
89+
*
90+
* @param results
91+
* the map of command results, not <code>null</code>
92+
* @param commandOrder
93+
* a list of commands in the order the results should be applied.
94+
*/
95+
public void notifyResultHandlers(Map<Id, CommandResult> results,
96+
List<SignalCommand> commandOrder) {
97+
for (SignalCommand command : commandOrder) {
98+
if (command instanceof SignalCommand.TransactionCommand tx) {
99+
notifyResultHandlers(results, tx.commands());
100+
}
101+
Consumer<CommandResult> handler = resultHandlers
102+
.remove(command.commandId());
103+
if (handler != null) {
104+
handler.accept(results.get(command.commandId()));
105+
}
106+
}
107+
}
108+
109+
/**
110+
* Gets an unmodifiable view of the commands.
111+
*
112+
* @return an unmodifiable list of commands, not <code>null</code>
113+
*/
114+
public List<SignalCommand> getCommands() {
115+
return Collections.unmodifiableList(commands);
116+
}
117+
118+
/**
119+
* Gets an unmodifiable map of the result handlers.
120+
*
121+
* @return an unmodifiable map of result handlers, not <code>null</code>
122+
*/
123+
public Map<Id, Consumer<CommandResult>> getResultHandlers() {
124+
return Collections.unmodifiableMap(resultHandlers);
125+
}
126+
127+
/**
128+
* Adds another collection of commands and handlers to this one.
129+
*
130+
*
131+
* @param other
132+
* the instance to import entries from, not <code>null</code>
133+
*/
134+
public void add(CommandsAndHandlers other) {
135+
this.commands.addAll(other.commands);
136+
this.resultHandlers.putAll(other.resultHandlers);
137+
}
138+
139+
/**
140+
* Checks whether there are any commands in this list.
141+
*
142+
* @return <code>true</code> if there are no commands in this list,
143+
* <code>false</code> if there are commands
144+
*/
145+
public boolean isEmpty() {
146+
return commands.isEmpty();
147+
}
148+
}

0 commit comments

Comments
 (0)