Skip to content

Commit 0e77e31

Browse files
committed
Automatically retry enqueing after connection reset
1 parent cde82d1 commit 0e77e31

File tree

2 files changed

+64
-2
lines changed

2 files changed

+64
-2
lines changed

lib/queue_classic/queue.rb

+20-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,16 @@ def conn_adapter
3838
def enqueue(method, *args)
3939
QC.log_yield(:measure => 'queue.enqueue') do
4040
s = "INSERT INTO #{QC.table_name} (q_name, method, args) VALUES ($1, $2, $3) RETURNING id"
41-
conn_adapter.execute(s, name, method, JSON.dump(args))
41+
begin
42+
retries ||= 0
43+
conn_adapter.execute(s, name, method, JSON.dump(args))
44+
rescue PGError
45+
if (retries += 1) < 2
46+
retry
47+
else
48+
raise
49+
end
50+
end
4251
end
4352
end
4453

@@ -64,7 +73,16 @@ def enqueue_in(seconds, method, *args)
6473
s = "INSERT INTO #{QC.table_name} (q_name, method, args, scheduled_at)
6574
VALUES ($1, $2, $3, now() + interval '#{seconds.to_i} seconds')
6675
RETURNING id"
67-
conn_adapter.execute(s, name, method, JSON.dump(args))
76+
begin
77+
retries ||= 0
78+
conn_adapter.execute(s, name, method, JSON.dump(args))
79+
rescue PGError
80+
if (retries += 1) < 2
81+
retry
82+
else
83+
raise
84+
end
85+
end
6886
end
6987
end
7088

test/queue_test.rb

+44
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,50 @@ def conn.reset(*args); raise(ResetError) end
112112
queue.conn_adapter.disconnect
113113
end
114114

115+
def test_enqueue_retry
116+
queue = QC::Queue.new("queue_classic_jobs")
117+
queue.conn_adapter = QC::ConnAdapter.new
118+
conn = queue.conn_adapter.connection
119+
conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil
120+
queue.enqueue("Klass.method")
121+
assert_equal(1, queue.count)
122+
queue.conn_adapter.disconnect
123+
end
124+
125+
def test_enqueue_stops_retrying_on_permanent_error
126+
queue = QC::Queue.new("queue_classic_jobs")
127+
queue.conn_adapter = QC::ConnAdapter.new
128+
conn = queue.conn_adapter.connection
129+
conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil
130+
# Simulate permanent connection error
131+
def conn.exec(*args); raise(PGError); end
132+
# Ensure that the error is reraised on second time
133+
assert_raises(PG::Error) {queue.enqueue("Klass.other_method")}
134+
queue.conn_adapter.disconnect
135+
end
136+
137+
def test_enqueue_in_retry
138+
queue = QC::Queue.new("queue_classic_jobs")
139+
queue.conn_adapter = QC::ConnAdapter.new
140+
conn = queue.conn_adapter.connection
141+
conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil
142+
queue.enqueue_in(10,"Klass.method")
143+
assert_equal(1, queue.count)
144+
queue.conn_adapter.disconnect
145+
end
146+
147+
def test_enqueue_in_stops_retrying_on_permanent_error
148+
queue = QC::Queue.new("queue_classic_jobs")
149+
queue.conn_adapter = QC::ConnAdapter.new
150+
conn = queue.conn_adapter.connection
151+
conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil
152+
# Simulate permanent connection error
153+
def conn.exec(*args); raise(PGError); end
154+
# Ensure that the error is reraised on second time
155+
assert_raises(PG::Error) {queue.enqueue_in(10,"Klass.method")}
156+
queue.conn_adapter.disconnect
157+
end
158+
115159
def test_custom_default_queue
116160
queue_class = Class.new do
117161
attr_accessor :jobs

0 commit comments

Comments
 (0)