Skip to content

Commit 06ef4b1

Browse files
committed
new: StandardDatabase.fetch_transaction (arangodb#329)
1 parent ed35c9d commit 06ef4b1

File tree

4 files changed

+115
-11
lines changed

4 files changed

+115
-11
lines changed

arango/database.py

+12
Original file line numberDiff line numberDiff line change
@@ -2948,6 +2948,14 @@ def begin_batch_execution(
29482948
"""
29492949
return BatchDatabase(self._conn, return_result, max_workers)
29502950

2951+
def fetch_transaction(self, transaction_id: str):
2952+
"""Fetch an existing transaction.
2953+
2954+
:param transaction_id: The ID of the existing transaction.
2955+
:type transaction_id: str
2956+
"""
2957+
return TransactionDatabase(connection=self._conn, transaction_id=transaction_id)
2958+
29512959
def begin_transaction(
29522960
self,
29532961
read: Union[str, Sequence[str], None] = None,
@@ -3125,6 +3133,8 @@ class TransactionDatabase(Database):
31253133
:type lock_timeout: int | None
31263134
:param max_size: Max transaction size in bytes.
31273135
:type max_size: int | None
3136+
:param transaction_id: Initialize using existing transaction
3137+
:type transaction_id: str | None
31283138
"""
31293139

31303140
def __init__(
@@ -3137,6 +3147,7 @@ def __init__(
31373147
allow_implicit: Optional[bool] = None,
31383148
lock_timeout: Optional[int] = None,
31393149
max_size: Optional[int] = None,
3150+
transaction_id: str | None = None,
31403151
) -> None:
31413152
self._executor: TransactionApiExecutor
31423153
super().__init__(
@@ -3150,6 +3161,7 @@ def __init__(
31503161
allow_implicit=allow_implicit,
31513162
lock_timeout=lock_timeout,
31523163
max_size=max_size,
3164+
transaction_id=transaction_id,
31533165
),
31543166
)
31553167

arango/executor.py

+40-11
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ class TransactionApiExecutor:
241241
:type max_size: int
242242
:param allow_dirty_read: Allow reads from followers in a cluster.
243243
:type allow_dirty_read: bool | None
244+
:param transaction_id: Initialize using existing transaction
245+
:type transaction_id: str | None
244246
"""
245247

246248
def __init__(
@@ -254,6 +256,7 @@ def __init__(
254256
lock_timeout: Optional[int] = None,
255257
max_size: Optional[int] = None,
256258
allow_dirty_read: bool = False,
259+
transaction_id: Optional[str] = None,
257260
) -> None:
258261
self._conn = connection
259262

@@ -275,19 +278,45 @@ def __init__(
275278
if max_size is not None:
276279
data["maxTransactionSize"] = max_size
277280

278-
request = Request(
279-
method="post",
280-
endpoint="/_api/transaction/begin",
281-
data=data,
282-
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
283-
)
284-
resp = self._conn.send_request(request)
281+
if transaction_id is None:
282+
request = Request(
283+
method="post",
284+
endpoint="/_api/transaction/begin",
285+
data=data,
286+
headers={"x-arango-allow-dirty-read": "true"}
287+
if allow_dirty_read
288+
else None,
289+
)
290+
resp = self._conn.send_request(request)
285291

286-
if not resp.is_success:
287-
raise TransactionInitError(resp, request)
292+
if not resp.is_success:
293+
raise TransactionInitError(resp, request)
294+
295+
result: Json = resp.body["result"]
296+
self._id: str = result["id"]
297+
else:
298+
request = Request(
299+
method="get",
300+
endpoint=f"/_api/transaction/{transaction_id}",
301+
)
302+
resp = self._conn.send_request(request)
303+
304+
if not resp.is_success:
305+
raise TransactionInitError(resp, request)
306+
307+
result: Json = resp.body["result"]
308+
309+
if result["status"] == "committed":
310+
raise TransactionInitError(
311+
resp, request, "Transaction is already committed"
312+
)
313+
314+
if result["status"] == "aborted":
315+
raise TransactionInitError(
316+
resp, request, "Transaction is already aborted"
317+
)
288318

289-
result: Json = resp.body["result"]
290-
self._id: str = result["id"]
319+
self._id: str = transaction_id
291320

292321
@property
293322
def context(self) -> str:

docs/transaction.rst

+11
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,17 @@ logical unit of work (ACID compliant).
6868
assert '_rev' in txn_col.insert({'_key': 'Lily'})
6969
assert len(txn_col) == 6
7070

71+
# Fetch an existing transaction. Useful if you have received a Transaction ID
72+
# from some other part of your system or an external system.
73+
original_txn = db.begin_transaction(write='students')
74+
txn_col = original_txn.collection('students')
75+
assert '_rev' in txn_col.insert({'_key': 'Abby'})
76+
77+
# txn_db and original_txn now interact with the same transaction
78+
txn_db = db.fetch_transaction(original_txn.transaction_id)
79+
txn_col = txn_db.collection('students')
80+
assert '_rev' in txn_col.insert({'_key': 'Mike'})
81+
7182
# Abort the transaction
7283
txn_db.abort_transaction()
7384
assert 'Kate' not in col

tests/test_transaction.py

+52
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,58 @@ def test_transaction_commit(db, col, docs):
117117
assert err.value.error_code in {10, 1655}
118118

119119

120+
def test_transaction_fetch_existing(db, col, docs):
121+
original_txn = db.begin_transaction(
122+
read=col.name,
123+
write=col.name,
124+
exclusive=[],
125+
sync=True,
126+
allow_implicit=False,
127+
lock_timeout=1000,
128+
max_size=10000,
129+
)
130+
txn_col = original_txn.collection(col.name)
131+
132+
assert "_rev" in txn_col.insert(docs[0])
133+
assert "_rev" in txn_col.delete(docs[0])
134+
135+
txn_db = db.fetch_transaction(transaction_id=original_txn.transaction_id)
136+
137+
txn_col = txn_db.collection(col.name)
138+
assert "_rev" in txn_col.insert(docs[1])
139+
assert "_rev" in txn_col.delete(docs[1])
140+
141+
txn_db.commit_transaction()
142+
assert txn_db.transaction_status() == "committed"
143+
assert original_txn.transaction_status() == "committed"
144+
145+
# Test fetch transaction that does not exist raises TransactionInitError
146+
with pytest.raises(TransactionInitError) as err:
147+
db.fetch_transaction(transaction_id="illegal")
148+
assert err.value.error_code in {10, 1655}
149+
150+
# Test fetch transaction that is already committed raises TransactionInitError
151+
with pytest.raises(TransactionInitError) as err:
152+
db.fetch_transaction(transaction_id=original_txn.transaction_id)
153+
assert err.value.error_code in {10, 1655}
154+
155+
# Test fetch transaction that has been aborted raises TransactionInitError
156+
txn_db = db.begin_transaction(
157+
read=col.name,
158+
write=col.name,
159+
exclusive=[],
160+
sync=True,
161+
allow_implicit=False,
162+
lock_timeout=1000,
163+
max_size=10000,
164+
)
165+
txn_db.abort_transaction()
166+
167+
with pytest.raises(TransactionInitError) as err:
168+
db.fetch_transaction(transaction_id=txn_db.transaction_id)
169+
assert err.value.error_code in {10, 1655}
170+
171+
120172
def test_transaction_abort(db, col, docs):
121173
txn_db = db.begin_transaction(write=col.name)
122174
txn_col = txn_db.collection(col.name)

0 commit comments

Comments
 (0)