Skip to content

Commit 14c68db

Browse files
authored
Merge pull request #3027 from activeloopai/azure_retry
added retry
2 parents bd67a34 + 8ef5212 commit 14c68db

File tree

1 file changed

+137
-6
lines changed

1 file changed

+137
-6
lines changed

deeplake/core/storage/azure.py

+137-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from concurrent.futures import ThreadPoolExecutor
1111
from deeplake.util.path import relpath
1212
from concurrent import futures
13+
from deeplake.util.warnings import always_warn
14+
import time
1315

1416

1517
class AzureProvider(StorageProvider):
@@ -87,7 +89,7 @@ def _set_clients(self):
8789
self.container_name
8890
)
8991

90-
def __setitem__(self, path, content):
92+
def _set(self, path, content):
9193
self.check_readonly()
9294
self._check_update_creds()
9395
if isinstance(content, memoryview):
@@ -99,10 +101,49 @@ def __setitem__(self, path, content):
99101
)
100102
blob_client.upload_blob(content, overwrite=True)
101103

104+
def __setitem__(self, path, content):
105+
from azure.core.exceptions import ClientAuthenticationError # type: ignore
106+
107+
try:
108+
self._set(path, content)
109+
except ClientAuthenticationError as ex:
110+
tries = 5
111+
retry_wait = 0
112+
for i in range(1, tries + 1):
113+
always_warn(f"Encountered connection error, retry {i} out of {tries}")
114+
retry_wait = self._retry_wait_and_extend(retry_wait, ex)
115+
try:
116+
self._set(path, content)
117+
always_warn(
118+
f"Connection re-established after {i} {['retries', 'retry'][i==1]}."
119+
)
120+
return
121+
except Exception:
122+
pass
123+
raise ex
124+
102125
def __getitem__(self, path):
103-
return self.get_bytes(path)
126+
from azure.core.exceptions import ClientAuthenticationError # type: ignore
104127

105-
def __delitem__(self, path):
128+
try:
129+
return self.get_bytes(path)
130+
except ClientAuthenticationError as ex:
131+
tries = 5
132+
retry_wait = 0
133+
for i in range(1, tries + 1):
134+
always_warn(f"Encountered connection error, retry {i} out of {tries}")
135+
retry_wait = self._retry_wait_and_extend(retry_wait, ex)
136+
try:
137+
res = self.get_bytes(path)
138+
always_warn(
139+
f"Connection re-established after {i} {['retries', 'retry'][i==1]}."
140+
)
141+
return res
142+
except Exception:
143+
pass
144+
raise ex
145+
146+
def _del(self, path):
106147
self.check_readonly()
107148
self._check_update_creds()
108149
blob_client = self.container_client.get_blob_client(
@@ -112,6 +153,27 @@ def __delitem__(self, path):
112153
raise KeyError(path)
113154
blob_client.delete_blob()
114155

156+
def __delitem__(self, path):
157+
from azure.core.exceptions import ClientAuthenticationError # type: ignore
158+
159+
try:
160+
return self._del(path)
161+
except ClientAuthenticationError as ex:
162+
tries = 5
163+
retry_wait = 0
164+
for i in range(1, tries + 1):
165+
always_warn(f"Encountered connection error, retry {i} out of {tries}")
166+
retry_wait = self._retry_wait_and_extend(retry_wait, ex)
167+
try:
168+
self._del(path)
169+
always_warn(
170+
f"Connection re-established after {i} {['retries', 'retry'][i==1]}."
171+
)
172+
return
173+
except Exception:
174+
pass
175+
raise ex
176+
115177
def get_bytes(
116178
self,
117179
path: str,
@@ -145,7 +207,7 @@ def get_bytes(
145207
byts = blob_client.download_blob(offset=offset, length=length).readall()
146208
return byts
147209

148-
def clear(self, prefix=""):
210+
def _clear(self, prefix=""):
149211
self.check_readonly()
150212
self._check_update_creds()
151213
blobs = [
@@ -156,6 +218,27 @@ def clear(self, prefix=""):
156218
for batch in batches:
157219
self.container_client.delete_blobs(*batch)
158220

221+
def clear(self, prefix=""):
222+
from azure.core.exceptions import ClientAuthenticationError # type: ignore
223+
224+
try:
225+
return self._clear(prefix)
226+
except ClientAuthenticationError as ex:
227+
tries = 5
228+
retry_wait = 0
229+
for i in range(1, tries + 1):
230+
always_warn(f"Encountered connection error, retry {i} out of {tries}")
231+
retry_wait = self._retry_wait_and_extend(retry_wait, ex)
232+
try:
233+
self._clear(prefix)
234+
always_warn(
235+
f"Connection re-established after {i} {['retries', 'retry'][i==1]}."
236+
)
237+
return
238+
except Exception:
239+
pass
240+
raise ex
241+
159242
def get_sas_token(self):
160243
from azure.storage.blob import generate_container_sas, ContainerSasPermissions # type: ignore
161244

@@ -275,7 +358,7 @@ def rename(self, root: str):
275358
source_blob.delete_blob()
276359
self.root_folder = root_folder
277360

278-
def get_object_size(self, path: str) -> int:
361+
def _get_object_size(self, path: str) -> int:
279362
self._check_update_creds()
280363
blob_client = self.container_client.get_blob_client(
281364
f"{self.root_folder}/{path}"
@@ -284,6 +367,27 @@ def get_object_size(self, path: str) -> int:
284367
raise KeyError(path)
285368
return blob_client.get_blob_properties().size
286369

370+
def get_object_size(self, path: str) -> int:
371+
from azure.core.exceptions import ClientAuthenticationError # type: ignore
372+
373+
try:
374+
return self._get_object_size(path)
375+
except ClientAuthenticationError as ex:
376+
tries = 5
377+
retry_wait = 0
378+
for i in range(1, tries + 1):
379+
always_warn(f"Encountered connection error, retry {i} out of {tries}")
380+
retry_wait = self._retry_wait_and_extend(retry_wait, ex)
381+
try:
382+
res = self._get_object_size(path)
383+
always_warn(
384+
f"Connection re-established after {i} {['retries', 'retry'][i==1]}."
385+
)
386+
return res
387+
except Exception:
388+
pass
389+
raise ex
390+
287391
def get_clients_from_full_path(self, url: str):
288392
from azure.storage.blob import BlobServiceClient # type: ignore
289393

@@ -351,13 +455,33 @@ def get_presigned_url(self, path: str, full: bool = False) -> str:
351455

352456
return url
353457

354-
def get_object_from_full_url(self, url: str) -> bytes:
458+
def _get_object_from_full_url(self, url: str) -> bytes:
459+
self._check_update_creds()
355460
blob_client, _ = self.get_clients_from_full_path(url)
356461
# Azure raises an error when trying to download an empty blob
357462
if blob_client.get_blob_properties().size == 0:
358463
return b""
359464
return blob_client.download_blob().readall()
360465

466+
def get_object_from_full_url(self, url: str) -> bytes:
467+
try:
468+
return self._get_object_from_full_url(url)
469+
except ClientAuthenticationError as ex:
470+
tries = 5
471+
retry_wait = 0
472+
for i in range(1, tries + 1):
473+
always_warn(f"Encountered connection error, retry {i} out of {tries}")
474+
retry_wait = self._retry_wait_and_extend(retry_wait, ex)
475+
try:
476+
res = self._get_object_from_full_url(url)
477+
always_warn(
478+
f"Connection re-established after {i} {['retries', 'retry'][i==1]}."
479+
)
480+
return res
481+
except Exception:
482+
pass
483+
raise ex
484+
361485
def _check_update_creds(self, force=False):
362486
"""If the client has an expiration time, check if creds are expired and fetch new ones.
363487
This would only happen for datasets stored on Deep Lake storage for which temporary 12 hour credentials are generated.
@@ -402,3 +526,10 @@ def get_items(self, keys):
402526
yield key, future.result()
403527
else:
404528
yield key, exception
529+
530+
def _retry_wait_and_extend(self, retry_wait: int, err: Exception):
531+
time.sleep(retry_wait)
532+
533+
if retry_wait == 0:
534+
return 1
535+
return retry_wait * 2

0 commit comments

Comments
 (0)