Skip to content

Commit c6d5f8a

Browse files
authored Feb 26, 2025
fix: retry 404 errors in Client.query(...) (#2135)
* fix: retry 404 errors in `Client.query(...)` * retry on 404 * only retry notfound on jobs.insert * try to improve code coverage * disambiguate job not found from dataset/table not found * remove use of private attributes * fix unit tests * fix cover for retry.py
1 parent de10185 commit c6d5f8a

File tree

3 files changed

+252
-3
lines changed

3 files changed

+252
-3
lines changed
 

‎google/cloud/bigquery/_job_helpers.py

+25-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from google.cloud.bigquery import job
4848
import google.cloud.bigquery.query
4949
from google.cloud.bigquery import table
50+
import google.cloud.bigquery.retry
5051
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE
5152

5253
# Avoid circular imports
@@ -142,12 +143,28 @@ def do_query():
142143
raise create_exc
143144

144145
try:
146+
# Sometimes we get a 404 after a Conflict. In this case, we
147+
# have pretty high confidence that by retrying the 404, we'll
148+
# (hopefully) eventually recover the job.
149+
# https://github.com/googleapis/python-bigquery/issues/2134
150+
#
151+
# Allow users who want to completely disable retries to
152+
# continue to do so by setting retry to None.
153+
get_job_retry = retry
154+
if retry is not None:
155+
# TODO(tswast): Amend the user's retry object with allowing
156+
# 404 to retry when there's a public way to do so.
157+
# https://github.com/googleapis/python-api-core/issues/796
158+
get_job_retry = (
159+
google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
160+
)
161+
145162
query_job = client.get_job(
146163
job_id,
147164
project=project,
148165
location=location,
149-
retry=retry,
150-
timeout=timeout,
166+
retry=get_job_retry,
167+
timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
151168
)
152169
except core_exceptions.GoogleAPIError: # (includes RetryError)
153170
raise
@@ -156,7 +173,13 @@ def do_query():
156173
else:
157174
return query_job
158175

176+
# Allow users who want to completely disable retries to
177+
# continue to do so by setting job_retry to None.
178+
if job_retry is not None:
179+
do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)
180+
159181
future = do_query()
182+
160183
# The future might be in a failed state now, but if it's
161184
# unrecoverable, we'll find out when we ask for its result, at which
162185
# point, we may retry.

‎google/cloud/bigquery/retry.py

+54
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,32 @@ def _should_retry(exc):
8282
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
8383
"""
8484

85+
86+
def _should_retry_get_job_conflict(exc):
    """Decide whether a jobs.get call made after a Conflict should be retried.

    A 404 is sometimes returned right after a Conflict. Because the conflict
    indicates the job exists, ``NotFound`` is treated as retriable here, on
    top of everything ``_should_retry`` already accepts.
    https://github.com/googleapis/python-bigquery/issues/2134

    Note: once https://github.com/googleapis/python-api-core/issues/796
    allows tweaking the predicate of an existing Retry object, this may be
    extended to user-specified predicates.
    """
    # A 404 right after a Conflict is expected to resolve on retry.
    if isinstance(exc, exceptions.NotFound):
        return True

    # Everything else follows the default API retry policy.
    return _should_retry(exc)
99+
100+
101+
# Pick a deadline smaller than our other deadlines since we want to timeout
102+
# before those expire.
103+
_DEFAULT_GET_JOB_CONFLICT_DEADLINE = _DEFAULT_RETRY_DEADLINE / 3.0
104+
_DEFAULT_GET_JOB_CONFLICT_RETRY = retry.Retry(
105+
predicate=_should_retry_get_job_conflict,
106+
deadline=_DEFAULT_GET_JOB_CONFLICT_DEADLINE,
107+
)
108+
"""Private, may be removed in future."""
109+
110+
85111
# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
86112
# briefly had a default timeout, but even setting it at more than twice the
87113
# theoretical server-side default timeout of 2 minutes was not enough for
@@ -142,6 +168,34 @@ def _job_should_retry(exc):
142168
The default job retry object.
143169
"""
144170

171+
172+
def _query_job_insert_should_retry(exc):
    """Predicate deciding whether a failed query job insert should be retried.

    Per https://github.com/googleapis/python-bigquery/issues/2134, a 404 is
    sometimes raised. If we get this far, assume the job doesn't actually
    exist and try again. 404 can't simply be added to the default job_retry
    because it also happens for errors like "this table does not exist",
    which probably won't resolve with a retry.
    """
    # Look through a RetryError to the error that caused it.
    underlying = exc.cause if isinstance(exc, exceptions.RetryError) else exc

    if not isinstance(underlying, exceptions.NotFound):
        return _job_should_retry(underlying)

    # Only "job not found" is retried, never table/dataset not found.
    # The URL contains "jobs", so the leading whitespace disambiguates.
    text = underlying.message
    if text is None:
        return False
    return " job" in text.lower()
188+
189+
190+
_DEFAULT_QUERY_JOB_INSERT_RETRY = retry.Retry(
191+
predicate=_query_job_insert_should_retry,
192+
# jobs.insert doesn't wait for the job to complete, so we don't need the
193+
# long _DEFAULT_JOB_DEADLINE for this part.
194+
deadline=_DEFAULT_RETRY_DEADLINE,
195+
)
196+
"""Private, may be removed in future."""
197+
198+
145199
DEFAULT_GET_JOB_TIMEOUT = 128
146200
"""
147201
Default timeout for Client.get_job().

‎tests/unit/test_client.py

+173-1
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
from unittest import mock
2929
import warnings
3030

31-
import requests
31+
import freezegun
3232
import packaging
3333
import pytest
34+
import requests
35+
36+
import google.api
3437

3538

3639
try:
@@ -55,6 +58,8 @@
5558
import google.cloud._helpers
5659
from google.cloud import bigquery
5760

61+
from google.cloud.bigquery import job as bqjob
62+
import google.cloud.bigquery._job_helpers
5863
from google.cloud.bigquery.dataset import DatasetReference
5964
from google.cloud.bigquery import exceptions
6065
from google.cloud.bigquery import ParquetOptions
@@ -5308,6 +5313,173 @@ def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails(self):
53085313
with pytest.raises(DataLoss, match="we lost your job, sorry"):
53095314
client.query("SELECT 1;", job_id=None)
53105315

5316+
def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails_no_retries(self):
    """With retries disabled, a failed job fetch after a Conflict is raised as-is."""
    from google.api_core.exceptions import Conflict, DataLoss
    from google.cloud.bigquery.job import QueryJob

    client = self._make_one(
        project=self.PROJECT,
        credentials=_make_credentials(),
        _http=object(),
    )

    # Job creation always reports a conflict ...
    begin_patch = mock.patch.object(
        QueryJob, "_begin", side_effect=Conflict("Job already exists.")
    )
    # ... and fetching the supposedly existing job fails too.
    fetch_patch = mock.patch.object(
        client, "get_job", side_effect=DataLoss("we lost your job, sorry")
    )

    # If the get job request fails even though a job with this ID supposedly
    # exists already, the exception explaining why the job could not be
    # recovered must be raised.
    with begin_patch, fetch_patch, pytest.raises(
        DataLoss, match="we lost your job, sorry"
    ):
        client.query(
            "SELECT 1;",
            job_id=None,
            # Explicitly test with no retries to make sure those branches
            # are covered.
            retry=None,
            job_retry=None,
        )
5345+
5346+
def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_retries_404(self):
    """Regression test for https://github.com/googleapis/python-bigquery/issues/2134

    After a Conflict, fetching the job sometimes fails with a 404 even though
    the conflict tells us the job exists. The fetch is retried until the job
    status is returned (or the retry times out).
    """
    job_id = "abc123"
    client = self._make_one(
        project=self.PROJECT,
        credentials=_make_credentials(),
        _http=object(),
    )

    job_resource = {
        "jobReference": {
            "projectId": self.PROJECT,
            "location": "TESTLOC",
            "jobId": job_id,
        }
    }
    # QueryJob._begin is mocked, so the connection only ever sees jobs.get
    # requests and responses.
    conn = client._connection = make_connection(
        google.api_core.exceptions.TooManyRequests("this is retriable by default"),
        google.api_core.exceptions.NotFound("we lost your job"),
        google.api_core.exceptions.NotFound("we lost your job again, sorry"),
        job_resource,
    )

    begin_patch = mock.patch.object(
        bqjob.QueryJob,
        "_begin",
        side_effect=google.api_core.exceptions.Conflict("Job already exists."),
    )
    make_id_patch = mock.patch.object(
        google.cloud.bigquery._job_helpers,
        "make_job_id",
        return_value=job_id,
    )

    with begin_patch, make_id_patch:
        # The Conflict says a job with this ID already exists, so a 404 on
        # the fetch is retried until the job is returned (or the fetch fails
        # for a non-retriable reason, see other tests).
        result = client.query("SELECT 1;", job_id=None)

    expected_get = mock.call(
        method="GET",
        path=f"/projects/{self.PROJECT}/jobs/{job_id}",
        query_params={"projection": "full"},
        timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
    )
    # Double-check that it was jobs.get that was called for each of our
    # mocked responses.
    conn.api_request.assert_has_calls([expected_get] * 4)
    assert result.job_id == job_id
5404+
def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_retries_404_and_query_job_insert(
    self,
):
    """Regression test for https://github.com/googleapis/python-bigquery/issues/2134

    After a Conflict, fetching the job sometimes fails with a 404. When the
    404 persists, the job is assumed not to exist after all.
    """
    job_id_1 = "abc123"
    job_id_2 = "xyz789"
    client = self._make_one(
        project=self.PROJECT,
        credentials=_make_credentials(),
        _http=object(),
    )

    # QueryJob._begin is mocked, so the connection should only receive
    # jobs.get requests: a 404 first, then the job resource for the second ID.
    conn = client._connection = make_connection(
        google.api_core.exceptions.NotFound("we lost your job again, sorry"),
        {
            "jobReference": {
                "projectId": self.PROJECT,
                "location": "TESTLOC",
                "jobId": job_id_2,
            }
        },
    )

    begin_patch = mock.patch.object(
        bqjob.QueryJob,
        "_begin",
        side_effect=google.api_core.exceptions.Conflict("Job already exists."),
    )
    make_id_patch = mock.patch.object(
        google.cloud.bigquery._job_helpers,
        "make_job_id",
        side_effect=[job_id_1, job_id_2],
    )
    # Choose a small deadline so the 404 retries give up.
    short_retry = google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY.with_deadline(
        1
    )
    retry_patch = mock.patch.object(
        google.cloud.bigquery.retry,
        "_DEFAULT_GET_JOB_CONFLICT_RETRY",
        short_retry,
    )
    frozen_clock = freezegun.freeze_time(
        "2025-01-01 00:00:00",
        # 10x the retry deadline to guarantee a timeout.
        auto_tick_seconds=10,
    )

    with frozen_clock, begin_patch, make_id_patch, retry_patch:
        # The fetch of the first job ID hits a 404 and is expected to give up
        # because of the shortened deadline; the query should then go through
        # with the second job ID, whose fetch succeeds.
        result = client.query("SELECT 1;", job_id=None)

    expected_gets = [
        mock.call(
            method="GET",
            path=f"/projects/{self.PROJECT}/jobs/{jid}",
            query_params={"projection": "full"},
            timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
        )
        for jid in (job_id_1, job_id_2)
    ]
    # Double-check that it was jobs.get that was called for each of our
    # mocked responses.
    conn.api_request.assert_has_calls(expected_gets)
    assert result.job_id == job_id_2
53115483
def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self):
53125484
from google.api_core.exceptions import Conflict
53135485
from google.cloud.bigquery.job import QueryJob

0 commit comments

Comments
 (0)
Please sign in to comment.