Skip to content

Commit b798107

Browse files
authored
fix(cluster): lazyConnect with pipeline (#1408)
* fix(cluster): lazyConnect with pipeline * add test for cluster lazyConnect * catch connect error
1 parent ccd381a commit b798107

File tree

5 files changed

+54
-4
lines changed

5 files changed

+54
-4
lines changed

lib/autoPipelining.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as PromiseContainer from "./promiseContainer";
2-
import { flatten, isArguments } from "./utils/lodash";
2+
import { flatten, isArguments, noop } from "./utils/lodash";
33
import * as calculateSlot from "cluster-key-slot";
44
import asCallback from "standard-as-callback";
55

@@ -120,6 +120,7 @@ export function executeWithAutoPipelining(
120120

121121
// On cluster mode let's wait for slots to be available
122122
if (client.isCluster && !client.slots.length) {
123+
if (client.status === "wait") client.connect().catch(noop);
123124
return new CustomPromise(function (resolve, reject) {
124125
client.delayUntilReady((err) => {
125126
if (err) {

lib/pipeline.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import * as pMap from "p-map";
77
import * as PromiseContainer from "./promiseContainer";
88
import { CallbackFunction } from "./types";
99
import Commander from "./commander";
10+
import { noop } from "./utils";
1011

1112
/*
1213
This function derives from the cluster-key-slot implementation.
@@ -16,7 +17,7 @@ import Commander from "./commander";
1617
function generateMultiWithNodes(redis, keys) {
1718
const slot = calculateSlot(keys[0]);
1819
const target = redis._groupsBySlot[slot];
19-
20+
2021
for (let i = 1; i < keys.length; i++) {
2122
if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) {
2223
return -1;
@@ -156,7 +157,8 @@ Pipeline.prototype.fillResult = function (value, position) {
156157
moved: function (slot, key) {
157158
_this.preferKey = key;
158159
_this.redis.slots[errv[1]] = [key];
159-
_this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
160+
_this.redis._groupsBySlot[errv[1]] =
161+
_this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
160162
_this.redis.refreshSlotsCache();
161163
_this.exec();
162164
},
@@ -241,6 +243,7 @@ Pipeline.prototype.execBuffer = deprecate(function () {
241243
Pipeline.prototype.exec = function (callback: CallbackFunction) {
242244
// Wait for the cluster to be connected, since we need nodes information before continuing
243245
if (this.isCluster && !this.redis.slots.length) {
246+
if (this.redis.status === "wait") this.redis.connect().catch(noop);
244247
this.redis.delayUntilReady((err) => {
245248
if (err) {
246249
callback(err);

lib/transaction.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { wrapMultiResult } from "./utils";
1+
import { wrapMultiResult, noop } from "./utils";
22
import asCallback from "standard-as-callback";
33
import Pipeline from "./pipeline";
44
import { CallbackFunction } from "./types";
@@ -30,6 +30,7 @@ export function addTransactionSupport(redis) {
3030
pipeline.exec = function (callback: CallbackFunction) {
3131
// Wait for the cluster to be connected, since we need nodes information before continuing
3232
if (this.isCluster && !this.redis.slots.length) {
33+
if (this.redis.status === "wait") this.redis.connect().catch(noop);
3334
return asCallback(
3435
new Promise((resolve, reject) => {
3536
this.redis.delayUntilReady((err) => {

test/functional/cluster/autopipelining.ts

+22
Original file line numberDiff line numberDiff line change
@@ -594,4 +594,26 @@ describe("autoPipelining for cluster", function () {
594594
changeSlot(cluster, key1Slot, key2Slot);
595595
});
596596
});
597+
598+
it("should support lazyConnect", async () => {
599+
const cluster = new Cluster(hosts, {
600+
enableAutoPipelining: true,
601+
lazyConnect: true,
602+
});
603+
604+
await cluster.set("foo1", "bar1");
605+
await cluster.set("foo5", "bar5");
606+
607+
expect(
608+
await Promise.all([
609+
cluster.get("foo1"),
610+
cluster.get("foo5"),
611+
cluster.get("foo1"),
612+
cluster.get("foo5"),
613+
cluster.get("foo1"),
614+
])
615+
).to.eql(["bar1", "bar5", "bar1", "bar5", "bar1"]);
616+
617+
cluster.disconnect();
618+
});
597619
});

test/functional/lazy_connect.ts

+23
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import Redis from "../../lib/redis";
22
import { expect } from "chai";
33
import * as sinon from "sinon";
44
import { Cluster } from "../../lib";
5+
import Pipeline from "../../lib/pipeline";
56

67
describe("lazy connect", function () {
78
it("should not call `connect` when init", function () {
@@ -51,6 +52,28 @@ describe("lazy connect", function () {
5152
stub.restore();
5253
});
5354

55+
it("should call connect when pipeline exec", (done) => {
56+
const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => {
57+
stub.restore();
58+
done();
59+
});
60+
const cluster = new Cluster([], { lazyConnect: true });
61+
const pipline = new Pipeline(cluster);
62+
pipline.get("fool1").exec(() => {});
63+
});
64+
65+
it("should call connect when transction exec", (done) => {
66+
const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => {
67+
stub.restore();
68+
done();
69+
});
70+
const cluster = new Cluster([], { lazyConnect: true });
71+
cluster
72+
.multi()
73+
.get("fool1")
74+
.exec(() => {});
75+
});
76+
5477
it('should quit before "close" being emited', function (done) {
5578
const stub = sinon
5679
.stub(Cluster.prototype, "connect")

0 commit comments

Comments
 (0)