File tree 1 file changed +5
-1
lines changed
1 file changed +5
-1
lines changed Original file line number Diff line number Diff line change @@ -186,17 +186,21 @@ async def _consume(self):
186
186
else :
187
187
await self ._pool_job ()
188
188
189
- async def process_dead_jobs (self , raise_exception : bool = False ):
189
+ async def process_dead_jobs (self , raise_exception : bool = False ) -> list [Job ]:
190
+ unprocessed_jobs = []
190
191
for job in (Job .from_redis (j ) for j in await self .redis .zrange (self .dead_key , 0 , - 1 )):
191
192
try :
192
193
await self .awaitable_function (* job .args , ** job .kwargs )
193
194
except Exception as e :
194
195
logger .exception (e )
196
+ unprocessed_jobs .append (job )
195
197
if raise_exception :
196
198
raise e
197
199
else :
198
200
await self .redis .zrem (self .dead_key , job .to_redis ())
199
201
202
+ return unprocessed_jobs
203
+
200
204
async def _move_expired_jobs (self ):
201
205
while True :
202
206
if self ._stop_event .is_set ():
You can’t perform that action at this time.
0 commit comments