diff --git a/lib/queue_classic/queue.rb b/lib/queue_classic/queue.rb index ae789d25..f02d8293 100644 --- a/lib/queue_classic/queue.rb +++ b/lib/queue_classic/queue.rb @@ -38,7 +38,16 @@ def conn_adapter def enqueue(method, *args) QC.log_yield(:measure => 'queue.enqueue') do s = "INSERT INTO #{QC.table_name} (q_name, method, args) VALUES ($1, $2, $3) RETURNING id" - conn_adapter.execute(s, name, method, JSON.dump(args)) + begin + retries ||= 0 + conn_adapter.execute(s, name, method, JSON.dump(args)) + rescue PGError + if (retries += 1) < 2 + retry + else + raise + end + end end end @@ -64,7 +73,16 @@ def enqueue_in(seconds, method, *args) s = "INSERT INTO #{QC.table_name} (q_name, method, args, scheduled_at) VALUES ($1, $2, $3, now() + interval '#{seconds.to_i} seconds') RETURNING id" - conn_adapter.execute(s, name, method, JSON.dump(args)) + begin + retries ||= 0 + conn_adapter.execute(s, name, method, JSON.dump(args)) + rescue PGError + if (retries += 1) < 2 + retry + else + raise + end + end end end diff --git a/test/queue_test.rb b/test/queue_test.rb index ce788810..c512cbdf 100644 --- a/test/queue_test.rb +++ b/test/queue_test.rb @@ -112,6 +112,50 @@ def conn.reset(*args); raise(ResetError) end queue.conn_adapter.disconnect end + def test_enqueue_retry + queue = QC::Queue.new("queue_classic_jobs") + queue.conn_adapter = QC::ConnAdapter.new + conn = queue.conn_adapter.connection + conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil + queue.enqueue("Klass.method") + assert_equal(1, queue.count) + queue.conn_adapter.disconnect + end + + def test_enqueue_stops_retrying_on_permanent_error + queue = QC::Queue.new("queue_classic_jobs") + queue.conn_adapter = QC::ConnAdapter.new + conn = queue.conn_adapter.connection + conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil + # Simulate permanent connection error + def conn.exec(*args); raise(PGError); end + # Ensure that the error is reraised on second time + assert_raises(PG::Error) {queue.enqueue("Klass.other_method")} + queue.conn_adapter.disconnect + end + + def test_enqueue_in_retry + queue = QC::Queue.new("queue_classic_jobs") + queue.conn_adapter = QC::ConnAdapter.new + conn = queue.conn_adapter.connection + conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil + queue.enqueue_in(10,"Klass.method") + assert_equal(1, queue.count) + queue.conn_adapter.disconnect + end + + def test_enqueue_in_stops_retrying_on_permanent_error + queue = QC::Queue.new("queue_classic_jobs") + queue.conn_adapter = QC::ConnAdapter.new + conn = queue.conn_adapter.connection + conn.exec('select pg_terminate_backend(pg_backend_pid())') rescue nil + # Simulate permanent connection error + def conn.exec(*args); raise(PGError); end + # Ensure that the error is reraised on second time + assert_raises(PG::Error) {queue.enqueue_in(10,"Klass.method")} + queue.conn_adapter.disconnect + end + def test_custom_default_queue queue_class = Class.new do attr_accessor :jobs