Skip to content

Commit d2f9ee0

Browse files
authored
feat: queue message payloads (#20749)
* feat: queue message payloads Add sent payloads to message queue and resend if no response to message inside MaxMessageSuspendTimeout fixes #20507 * Add test for re-request Fix queued message send timing. * fix test server response format server * Make custom service simpler Fix concurrent issue with custom uidl handler. * use the has method clear queue for push messaging. * Do not up clientId for message already containing clientId * cleanup
1 parent 0440699 commit d2f9ee0

File tree

12 files changed

+461
-13
lines changed

12 files changed

+461
-13
lines changed

flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,10 @@ protected void handleJSON(final ValueMap valueMap) {
295295
}
296296

297297
/**
298-
* Should only prepare resync after the if (locked ||
298+
* Should only prepare resync after the (locked ||
299299
* !isNextExpectedMessage(serverId)) {...} since
300300
* stateTree.repareForResync() will remove the nodes, and if locked is
301-
* true, it will return without handling the message, thus won't adding
301+
* true, it will return without handling the message, thus won't add
302302
* nodes back.
303303
*
304304
* This is related to https://github.com/vaadin/flow/issues/8699 It

flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java

+90-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
*/
1616
package com.vaadin.client.communication;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
1821
import com.google.gwt.core.client.GWT;
22+
import com.google.gwt.user.client.Timer;
1923

2024
import com.vaadin.client.ConnectionIndicator;
2125
import com.vaadin.client.Console;
@@ -67,6 +71,10 @@ public enum ResynchronizationState {
6771

6872
private JsonObject pushPendingMessage;
6973

74+
private List<JsonObject> messageQueue = new ArrayList<>();
75+
76+
private Timer resendMessageTimer;
77+
7078
/**
7179
* Creates a new instance connected to the given registry.
7280
*
@@ -119,7 +127,10 @@ private void doSendInvocationsToServer() {
119127
JsonObject payload = pushPendingMessage;
120128
pushPendingMessage = null;
121129
registry.getRequestResponseTracker().startRequest();
122-
send(payload);
130+
sendPayload(payload);
131+
return;
132+
} else if (hasQueuedMessages() && resendMessageTimer == null) {
133+
sendPayload(messageQueue.get(0));
123134
return;
124135
}
125136

@@ -146,6 +157,8 @@ private void doSendInvocationsToServer() {
146157
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
147158
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
148159
Console.warn("Resynchronizing from server");
160+
messageQueue.clear();
161+
resetTimer();
149162
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
150163
}
151164
if (showLoadingIndicator) {
@@ -166,7 +179,6 @@ protected void send(final JsonArray reqInvocations,
166179
final JsonObject extraJson) {
167180
registry.getRequestResponseTracker().startRequest();
168181
send(preparePayload(reqInvocations, extraJson));
169-
170182
}
171183

172184
private JsonObject preparePayload(final JsonArray reqInvocations,
@@ -177,10 +189,6 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
177189
payload.put(ApplicationConstants.CSRF_TOKEN, csrfToken);
178190
}
179191
payload.put(ApplicationConstants.RPC_INVOCATIONS, reqInvocations);
180-
payload.put(ApplicationConstants.SERVER_SYNC_ID,
181-
registry.getMessageHandler().getLastSeenServerSyncId());
182-
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
183-
clientToServerMessageId++);
184192
if (extraJson != null) {
185193
for (String key : extraJson.keys()) {
186194
JsonValue value = extraJson.get(key);
@@ -192,12 +200,44 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
192200

193201
/**
194202
* Sends an asynchronous or synchronous UIDL request to the server using the
195-
* given URI.
203+
* given URI. Adds message to message queue and postpones sending if queue
204+
* not empty.
196205
*
197206
* @param payload
198207
* The contents of the request to send
199208
*/
200209
public void send(final JsonObject payload) {
210+
if (hasQueuedMessages()) {
211+
messageQueue.add(payload);
212+
return;
213+
}
214+
messageQueue.add(payload);
215+
sendPayload(payload);
216+
}
217+
218+
/**
219+
* Sends an asynchronous or synchronous UIDL request to the server using the
220+
* given URI.
221+
*
222+
* @param payload
223+
* The contents of the request to send
224+
*/
225+
private void sendPayload(final JsonObject payload) {
226+
payload.put(ApplicationConstants.SERVER_SYNC_ID,
227+
registry.getMessageHandler().getLastSeenServerSyncId());
228+
// clientID should only be set and updated if payload doesn't contain
229+
// clientID. If one exists we are probably trying to resend.
230+
if (!payload.hasKey(ApplicationConstants.CLIENT_TO_SERVER_ID)) {
231+
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
232+
clientToServerMessageId++);
233+
}
234+
235+
if (!registry.getRequestResponseTracker().hasActiveRequest()) {
236+
// Direct calls to send from outside probably have not started
237+
// request.
238+
registry.getRequestResponseTracker().startRequest();
239+
}
240+
201241
if (push != null && push.isBidirectional()) {
202242
// When using bidirectional transport, the payload is not resent
203243
// to the server during reconnection attempts.
@@ -211,6 +251,31 @@ public void send(final JsonObject payload) {
211251
} else {
212252
Console.debug("send XHR");
213253
registry.getXhrConnection().send(payload);
254+
255+
resetTimer();
256+
// resend last payload if response hasn't come in.
257+
resendMessageTimer = new Timer() {
258+
@Override
259+
public void run() {
260+
resendMessageTimer
261+
.schedule(registry.getApplicationConfiguration()
262+
.getMaxMessageSuspendTimeout() + 500);
263+
if (!registry.getRequestResponseTracker()
264+
.hasActiveRequest()) {
265+
registry.getRequestResponseTracker().startRequest();
266+
}
267+
registry.getXhrConnection().send(payload);
268+
}
269+
};
270+
resendMessageTimer.schedule(registry.getApplicationConfiguration()
271+
.getMaxMessageSuspendTimeout() + 500);
272+
}
273+
}
274+
275+
private void resetTimer() {
276+
if (resendMessageTimer != null) {
277+
resendMessageTimer.cancel();
278+
resendMessageTimer = null;
214279
}
215280
}
216281

@@ -289,6 +354,8 @@ public String getCommunicationMethodName() {
289354
*/
290355
public void resynchronize() {
291356
if (requestResynchronize()) {
357+
messageQueue.clear();
358+
resetTimer();
292359
sendInvocationsToServer();
293360
}
294361
}
@@ -311,12 +378,24 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
311378
ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) {
312379
pushPendingMessage = null;
313380
}
381+
if (hasQueuedMessages()) {
382+
// If queued message is the expected one. remove from queue
383+
// and send next message if any.
384+
if (messageQueue.get(0)
385+
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID)
386+
+ 1 == nextExpectedId) {
387+
resetTimer();
388+
messageQueue.remove(0);
389+
}
390+
}
314391
return;
315392
}
316393
if (force) {
317394
Console.debug(
318395
"Forced update of clientId to " + clientToServerMessageId);
319396
clientToServerMessageId = nextExpectedId;
397+
messageQueue.clear();
398+
resetTimer();
320399
return;
321400
}
322401

@@ -372,4 +451,8 @@ void clearResynchronizationState() {
372451
ResynchronizationState getResynchronizationState() {
373452
return resynchronizationState;
374453
}
454+
455+
public boolean hasQueuedMessages() {
456+
return !messageQueue.isEmpty();
457+
}
375458
}

flow-client/src/main/java/com/vaadin/client/communication/RequestResponseTracker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ public void endRequest() {
112112
if ((registry.getUILifecycle().isRunning()
113113
&& registry.getServerRpcQueue().isFlushPending())
114114
|| registry.getMessageSender()
115-
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) {
115+
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER
116+
|| registry.getMessageSender().hasQueuedMessages()) {
116117
// Send the pending RPCs immediately.
117118
// This might be an unnecessary optimization as ServerRpcQueue has a
118119
// finally scheduled command which trigger the send if we do not do

flow-client/src/test/frontend/FlowTests.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ function stubServerRemoteFunction(
748748
handlers.leaveNavigation();
749749
}
750750
}
751-
req.respond(200, { 'content-type': 'application/json' }, 'for(;;);[{}]');
751+
req.respond(200, {'content-type': 'application/json'}, 'for(;;);[{"syncId":' + (payload["syncId"] + 1) + ',"clientId":' + (payload["clientId"] + 1) + '}]');
752752
});
753753
}
754754

flow-server/src/main/java/com/vaadin/flow/server/communication/ServerRpcHandler.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) {
113113
this.csrfToken = csrfToken;
114114
}
115115

116-
if (isSyncIdCheckEnabled) {
116+
if (isSyncIdCheckEnabled && !isUnloadBeaconRequest()) {
117117
syncId = (int) json
118118
.getNumber(ApplicationConstants.SERVER_SYNC_ID);
119119
} else {
@@ -131,7 +131,10 @@ public RpcRequest(String jsonString, boolean isSyncIdCheckEnabled) {
131131
clientToServerMessageId = (int) json
132132
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID);
133133
} else {
134-
getLogger().warn("Server message without client id received");
134+
if (!isUnloadBeaconRequest()) {
135+
getLogger()
136+
.warn("Server message without client id received");
137+
}
135138
clientToServerMessageId = -1;
136139
}
137140
invocations = json.getArray(ApplicationConstants.RPC_INVOCATIONS);

flow-tests/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@
335335
<module>test-react-adapter</module>
336336
<module>test-react-adapter/pom-production.xml</module>
337337
<module>test-legacy-frontend</module>
338+
<module>test-client-queue</module>
338339
</modules>
339340
</profile>
340341
<profile>

flow-tests/test-client-queue/pom.xml

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<parent>
5+
<artifactId>flow-tests</artifactId>
6+
<groupId>com.vaadin</groupId>
7+
<version>24.7-SNAPSHOT</version>
8+
</parent>
9+
<artifactId>flow-client-queue-test</artifactId>
10+
<name>Test Flow client queue</name>
11+
12+
<packaging>war</packaging>
13+
<properties>
14+
<maven.deploy.skip>true</maven.deploy.skip>
15+
<!-- Test checks client log so java.util.logging.Level import is needed -->
16+
<enforcer.skip>true</enforcer.skip>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>com.vaadin</groupId>
22+
<artifactId>flow-test-resources</artifactId>
23+
<version>${project.version}</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>com.vaadin</groupId>
27+
<artifactId>vaadin-dev-server</artifactId>
28+
<version>${project.version}</version>
29+
</dependency>
30+
<dependency>
31+
<groupId>com.vaadin</groupId>
32+
<artifactId>flow-html-components-testbench</artifactId>
33+
<version>${project.version}</version>
34+
<scope>test</scope>
35+
</dependency>
36+
</dependencies>
37+
38+
<build>
39+
<plugins>
40+
<!-- Run flow plugin to build frontend -->
41+
<plugin>
42+
<groupId>com.vaadin</groupId>
43+
<artifactId>flow-maven-plugin</artifactId>
44+
<executions>
45+
<execution>
46+
<goals>
47+
<goal>prepare-frontend</goal>
48+
</goals>
49+
</execution>
50+
</executions>
51+
<!-- <configuration>-->
52+
<!-- <frontendHotdeploy>true</frontendHotdeploy>-->
53+
<!-- </configuration>-->
54+
</plugin>
55+
<!-- Run jetty before integration tests, and stop after -->
56+
<plugin>
57+
<groupId>org.eclipse.jetty.ee10</groupId>
58+
<artifactId>jetty-ee10-maven-plugin</artifactId>
59+
</plugin>
60+
</plugins>
61+
</build>
62+
63+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2000-2025 Vaadin Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.vaadin.flow.misc.ui;
17+
18+
import java.util.List;
19+
20+
import com.vaadin.flow.function.DeploymentConfiguration;
21+
import com.vaadin.flow.server.RequestHandler;
22+
import com.vaadin.flow.server.ServiceException;
23+
import com.vaadin.flow.server.VaadinServlet;
24+
import com.vaadin.flow.server.VaadinServletService;
25+
import com.vaadin.flow.server.communication.UidlRequestHandler;
26+
27+
public class CustomService extends VaadinServletService {
28+
29+
public CustomService(VaadinServlet servlet,
30+
DeploymentConfiguration deploymentConfiguration) {
31+
super(servlet, deploymentConfiguration);
32+
}
33+
34+
@Override
35+
protected List<RequestHandler> createRequestHandlers()
36+
throws ServiceException {
37+
List<RequestHandler> requestHandlers = super.createRequestHandlers();
38+
requestHandlers.replaceAll(handler -> {
39+
if (handler instanceof UidlRequestHandler) {
40+
return new CustomUidlRequestHandler();
41+
}
42+
return handler;
43+
});
44+
return requestHandlers;
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2000-2024 Vaadin Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.vaadin.flow.misc.ui;
17+
18+
import jakarta.servlet.annotation.WebServlet;
19+
20+
import com.vaadin.flow.function.DeploymentConfiguration;
21+
import com.vaadin.flow.server.ServiceException;
22+
import com.vaadin.flow.server.VaadinServlet;
23+
import com.vaadin.flow.server.VaadinServletService;
24+
25+
@WebServlet(urlPatterns = "/*", asyncSupported = true)
26+
public class CustomServlet extends VaadinServlet {
27+
28+
@Override
29+
protected VaadinServletService createServletService(
30+
DeploymentConfiguration deploymentConfiguration)
31+
throws ServiceException {
32+
CustomService service = new CustomService(this,
33+
deploymentConfiguration);
34+
service.init();
35+
return service;
36+
}
37+
}

0 commit comments

Comments
 (0)