Skip to content

Commit 0013991

Browse files
authored
fix: abort incomplete pipelines upon reconnect (#1084)
Elasticache severs the connection immediately after it returns a READONLY error. This can sometimes leave queued up pipelined commands in an inconsistent state when the connection is reestablished. For example, if a pipeline has 6 commands and the second one generates a READONLY error, Elasticache will only return results for the first two before severing the connection. Upon reconnect, the pipeline still thinks it has 6 commands to send but the commandQueue has only 4. This fix will detect any pipeline command sets that only had a partial response before connection loss, and abort them. This Elasticache behavior also affects transactions. If reconnectOnError returns 2, some transaction fragments may end up in the offlineQueue. This fix will check the offlineQueue for any such transaction fragments and abort them, so that we don't send mismatched multi/exec to redis upon reconnection. - Introduced piplineIndex property on pipelined commands to allow for later cleanup - Added a routine to event_handler that aborts any pipelined commands inside commandQueue and offlineQueue that were interrupted in the middle of the pipeline - Added a routine to event_handler that removes any transaction fragments from the offline queue - Introduced inTransaction property on commands to simplify pipeline logic - Added a flags param to mock_server to allow the Elasticache disconnect behavior to be simulated - Added a reconnect_on_error test case for transactions - Added some test cases testing for correct handling of this unique elasticache behavior - Added unit tests to validate inTransaction and pipelineIndex setting Fixes #965
1 parent 4bbdfd6 commit 0013991

File tree

8 files changed

+344
-16
lines changed

8 files changed

+344
-16
lines changed

examples/basic_operations.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ redis.sadd("set", [1, 3, 5, 7]);
3333
redis.spop("set"); // Promise resolves to "5" or another item in the set
3434

3535
// Most responses are strings, or arrays of strings
36-
redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three")
36+
redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three");
3737
redis.zrange("sortedSet", 0, 2, "WITHSCORES").then(res => console.log(res)); // Promise resolves to ["one", "1", "dos", "2", "three", "3"] as if the command was ` redis> ZRANGE sortedSet 0 2 WITHSCORES `
3838

3939
// Some responses have transformers to JS values

lib/command.ts

+2
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ export default class Command implements ICommand {
154154
private callback: CallbackFunction;
155155
private transformed: boolean = false;
156156
public isCustomCommand: boolean = false;
157+
public inTransaction: boolean = false;
158+
public pipelineIndex?: number;
157159

158160
private slot?: number | null;
159161
private keys?: Array<string | Buffer>;

lib/pipeline.ts

+8-13
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,9 @@ Pipeline.prototype.fillResult = function(value, position) {
6565
if (this.isCluster) {
6666
let retriable = true;
6767
let commonError: { name: string; message: string };
68-
let inTransaction: boolean;
6968
for (let i = 0; i < this._result.length; ++i) {
7069
var error = this._result[i][0];
7170
var command = this._queue[i];
72-
if (command.name === "multi") {
73-
inTransaction = true;
74-
} else if (command.name === "exec") {
75-
inTransaction = false;
76-
}
7771
if (error) {
7872
if (
7973
command.name === "exec" &&
@@ -94,7 +88,7 @@ Pipeline.prototype.fillResult = function(value, position) {
9488
retriable = false;
9589
break;
9690
}
97-
} else if (!inTransaction) {
91+
} else if (!command.inTransaction) {
9892
var isReadOnly =
9993
exists(command.name) && hasFlag(command.name, "readonly");
10094
if (!isReadOnly) {
@@ -107,7 +101,7 @@ Pipeline.prototype.fillResult = function(value, position) {
107101
var _this = this;
108102
var errv = commonError.message.split(" ");
109103
var queue = this._queue;
110-
inTransaction = false;
104+
let inTransaction = false;
111105
this._queue = [];
112106
for (let i = 0; i < queue.length; ++i) {
113107
if (
@@ -122,11 +116,7 @@ Pipeline.prototype.fillResult = function(value, position) {
122116
}
123117
queue[i].initPromise();
124118
this.sendCommand(queue[i]);
125-
if (queue[i].name === "multi") {
126-
inTransaction = true;
127-
} else if (queue[i].name === "exec") {
128-
inTransaction = false;
129-
}
119+
inTransaction = queue[i].inTransaction;
130120
}
131121

132122
let matched = true;
@@ -174,7 +164,12 @@ Pipeline.prototype.fillResult = function(value, position) {
174164
};
175165

176166
Pipeline.prototype.sendCommand = function(command) {
167+
if (this._transactions > 0) {
168+
command.inTransaction = true;
169+
}
170+
177171
const position = this._queue.length;
172+
command.pipelineIndex = position;
178173

179174
command.promise
180175
.then(result => {

lib/redis/event_handler.ts

+62
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
"use strict";
22

3+
import Deque = require("denque");
4+
import { AbortError } from "redis-errors";
35
import Command from "../command";
46
import { MaxRetriesPerRequestError } from "../errors";
7+
import { ICommandItem, ICommand } from "../types";
58
import { Debug, noop, CONNECTION_CLOSED_ERROR_MSG } from "../utils";
69
import DataHandler from "../DataHandler";
710

@@ -77,6 +80,61 @@ export function connectHandler(self) {
7780
};
7881
}
7982

83+
function abortError(command: ICommand) {
84+
const err = new AbortError("Command aborted due to connection close");
85+
(err as any).command = {
86+
name: command.name,
87+
args: command.args
88+
};
89+
return err;
90+
}
91+
92+
// If a contiguous set of pipeline commands starts from index zero then they
93+
// can be safely reattempted. If however we have a chain of pipelined commands
94+
// starting at index 1 or more it means we received a partial response before
95+
// the connection close and those pipelined commands must be aborted. For
96+
// example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after
97+
// aborting and purging we'll have a queue that looks like this: [0, 1, 2]
98+
function abortIncompletePipelines(commandQueue: Deque<ICommandItem>) {
99+
let expectedIndex = 0;
100+
for (let i = 0; i < commandQueue.length; ) {
101+
const command = commandQueue.peekAt(i).command as Command;
102+
const pipelineIndex = command.pipelineIndex;
103+
if (pipelineIndex === undefined || pipelineIndex === 0) {
104+
expectedIndex = 0;
105+
}
106+
if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) {
107+
commandQueue.remove(i, 1);
108+
command.reject(abortError(command));
109+
continue;
110+
}
111+
i++;
112+
}
113+
}
114+
115+
// If only a partial transaction result was received before connection close,
116+
// we have to abort any transaction fragments that may have ended up in the
117+
// offline queue
118+
function abortTransactionFragments(commandQueue: Deque<ICommandItem>) {
119+
for (let i = 0; i < commandQueue.length; ) {
120+
const command = commandQueue.peekAt(i).command as Command;
121+
if (command.name === "multi") {
122+
break;
123+
}
124+
if (command.name === "exec") {
125+
commandQueue.remove(i, 1);
126+
command.reject(abortError(command));
127+
break;
128+
}
129+
if ((command as Command).inTransaction) {
130+
commandQueue.remove(i, 1);
131+
command.reject(abortError(command));
132+
} else {
133+
i++;
134+
}
135+
}
136+
}
137+
80138
export function closeHandler(self) {
81139
return function() {
82140
self.setStatus("close");
@@ -85,8 +143,12 @@ export function closeHandler(self) {
85143
self.prevCondition = self.condition;
86144
}
87145
if (self.commandQueue.length) {
146+
abortIncompletePipelines(self.commandQueue);
88147
self.prevCommandQueue = self.commandQueue;
89148
}
149+
if (self.offlineQueue.length) {
150+
abortTransactionFragments(self.offlineQueue);
151+
}
90152

91153
if (self.manuallyClosing) {
92154
self.manuallyClosing = false;

test/functional/elasticache.ts

+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import Redis from "../../lib/redis";
2+
import { expect } from "chai";
3+
import MockServer from "../helpers/mock_server";
4+
5+
// AWS Elasticache closes the connection immediately when it encounters a READONLY error
6+
function simulateElasticache(options: {
7+
reconnectOnErrorValue: boolean | number;
8+
}) {
9+
let inTransaction = false;
10+
const mockServer = new MockServer(30000, (argv, socket, flags) => {
11+
switch (argv[0]) {
12+
case "multi":
13+
inTransaction = true;
14+
return MockServer.raw("+OK\r\n");
15+
case "del":
16+
flags.disconnect = true;
17+
return new Error(
18+
"READONLY You can't write against a read only replica."
19+
);
20+
case "get":
21+
return inTransaction ? MockServer.raw("+QUEUED\r\n") : argv[1];
22+
case "exec":
23+
inTransaction = false;
24+
return [];
25+
}
26+
});
27+
28+
return new Redis({
29+
port: 30000,
30+
reconnectOnError(err: Error): boolean | number {
31+
// bring the mock server back up
32+
mockServer.connect();
33+
return options.reconnectOnErrorValue;
34+
}
35+
});
36+
}
37+
38+
function expectReplyError(err) {
39+
expect(err).to.exist;
40+
expect(err.name).to.eql("ReplyError");
41+
}
42+
43+
function expectAbortError(err) {
44+
expect(err).to.exist;
45+
expect(err.name).to.eql("AbortError");
46+
expect(err.message).to.eql("Command aborted due to connection close");
47+
}
48+
49+
describe("elasticache", function() {
50+
it("should abort a failed transaction when connection is lost", function(done) {
51+
const redis = simulateElasticache({ reconnectOnErrorValue: true });
52+
53+
redis
54+
.multi()
55+
.del("foo")
56+
.del("bar")
57+
.exec(err => {
58+
expectAbortError(err);
59+
expect(err.command).to.eql({
60+
name: "exec",
61+
args: []
62+
});
63+
expect(err.previousErrors).to.have.lengthOf(2);
64+
expectReplyError(err.previousErrors[0]);
65+
expect(err.previousErrors[0].command).to.eql({
66+
name: "del",
67+
args: ["foo"]
68+
});
69+
expectAbortError(err.previousErrors[1]);
70+
expect(err.previousErrors[1].command).to.eql({
71+
name: "del",
72+
args: ["bar"]
73+
});
74+
75+
// ensure we've recovered into a healthy state
76+
redis.get("foo", (err, res) => {
77+
expect(res).to.eql("foo");
78+
done();
79+
});
80+
});
81+
});
82+
83+
it("should not resend failed transaction commands", function(done) {
84+
const redis = simulateElasticache({ reconnectOnErrorValue: 2 });
85+
redis
86+
.multi()
87+
.del("foo")
88+
.get("bar")
89+
.exec(err => {
90+
expectAbortError(err);
91+
expect(err.command).to.eql({
92+
name: "exec",
93+
args: []
94+
});
95+
expect(err.previousErrors).to.have.lengthOf(2);
96+
expectAbortError(err.previousErrors[0]);
97+
expect(err.previousErrors[0].command).to.eql({
98+
name: "del",
99+
args: ["foo"]
100+
});
101+
expectAbortError(err.previousErrors[1]);
102+
expect(err.previousErrors[1].command).to.eql({
103+
name: "get",
104+
args: ["bar"]
105+
});
106+
107+
// ensure we've recovered into a healthy state
108+
redis.get("foo", (err, res) => {
109+
expect(res).to.eql("foo");
110+
done();
111+
});
112+
});
113+
});
114+
115+
it("should resend intact pipelines", function(done) {
116+
const redis = simulateElasticache({ reconnectOnErrorValue: true });
117+
118+
let p1Result;
119+
redis
120+
.pipeline()
121+
.del("foo")
122+
.get("bar")
123+
.exec((err, result) => (p1Result = result));
124+
125+
redis
126+
.pipeline()
127+
.get("baz")
128+
.get("qux")
129+
.exec((err, p2Result) => {
130+
// First pipeline should have been aborted
131+
expect(p1Result).to.have.lengthOf(2);
132+
expect(p1Result[0]).to.have.lengthOf(1);
133+
expect(p1Result[1]).to.have.lengthOf(1);
134+
expectReplyError(p1Result[0][0]);
135+
expect(p1Result[0][0].command).to.eql({
136+
name: "del",
137+
args: ["foo"]
138+
});
139+
expectAbortError(p1Result[1][0]);
140+
expect(p1Result[1][0].command).to.eql({
141+
name: "get",
142+
args: ["bar"]
143+
});
144+
145+
// Second pipeline was intact and should have been retried successfully
146+
expect(p2Result).to.have.lengthOf(2);
147+
expect(p2Result[0]).to.eql([null, "baz"]);
148+
expect(p2Result[1]).to.eql([null, "qux"]);
149+
150+
done();
151+
});
152+
});
153+
});

test/functional/reconnect_on_error.ts

+35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import Redis from "../../lib/redis";
22
import { expect } from "chai";
3+
import * as sinon from "sinon";
34

45
describe("reconnectOnError", function() {
56
it("should pass the error as the first param", function(done) {
@@ -109,4 +110,38 @@ describe("reconnectOnError", function() {
109110
done();
110111
});
111112
});
113+
114+
it("should work with pipelined multi", function(done) {
115+
var redis = new Redis({
116+
reconnectOnError: function() {
117+
// deleting foo allows sadd below to succeed on the second try
118+
redis.del("foo");
119+
return 2;
120+
}
121+
});
122+
var delSpy = sinon.spy(redis, "del");
123+
124+
redis.set("foo", "bar");
125+
redis.set("i", 1);
126+
redis
127+
.pipeline()
128+
.sadd("foo", "a") // trigger a WRONGTYPE error
129+
.multi()
130+
.get("foo")
131+
.incr("i")
132+
.exec()
133+
.exec(function(err, res) {
134+
expect(delSpy.calledOnce).to.eql(true);
135+
expect(delSpy.firstCall.args[0]).to.eql("foo");
136+
expect(err).to.be.null;
137+
expect(res).to.eql([
138+
[null, 1],
139+
[null, "OK"],
140+
[null, "QUEUED"],
141+
[null, "QUEUED"],
142+
[null, ["bar", 2]]
143+
]);
144+
done();
145+
});
146+
});
112147
});

test/helpers/mock_server.ts

+14-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,14 @@ export function getConnectionName(socket: Socket): string | undefined {
3232
return connectionNameMap.get(socket);
3333
}
3434

35-
export type MockServerHandler = (reply: any, socket: Socket) => any;
35+
interface IFlags {
36+
disconnect?: boolean;
37+
}
38+
export type MockServerHandler = (
39+
reply: any,
40+
socket: Socket,
41+
flags: IFlags
42+
) => any;
3643

3744
export default class MockServer extends EventEmitter {
3845
static REDIS_OK = "+OK";
@@ -84,7 +91,12 @@ export default class MockServer extends EventEmitter {
8491
this.write(c, this.slotTable);
8592
return;
8693
}
87-
this.write(c, this.handler && this.handler(reply, c));
94+
let flags: Flags = {};
95+
let handlerResult = this.handler && this.handler(reply, c, flags);
96+
this.write(c, handlerResult);
97+
if (flags.disconnect) {
98+
this.disconnect();
99+
}
88100
},
89101
returnError: function() {}
90102
});

0 commit comments

Comments
 (0)