Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new: StandardDatabase.fetch_transaction (#329) #331

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions arango/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,14 @@ def begin_batch_execution(
"""
return BatchDatabase(self._conn, return_result, max_workers)

def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
"""Fetch an existing transaction.

:param transaction_id: The ID of the existing transaction.
:type transaction_id: str
"""
return TransactionDatabase(connection=self._conn, transaction_id=transaction_id)

def begin_transaction(
self,
read: Union[str, Sequence[str], None] = None,
Expand Down Expand Up @@ -3125,6 +3133,9 @@ class TransactionDatabase(Database):
:type lock_timeout: int | None
:param max_size: Max transaction size in bytes.
:type max_size: int | None
:param transaction_id: Initialize using an existing transaction instead of creating
a new transaction.
:type transaction_id: str | None
"""

def __init__(
Expand All @@ -3137,6 +3148,7 @@ def __init__(
allow_implicit: Optional[bool] = None,
lock_timeout: Optional[int] = None,
max_size: Optional[int] = None,
transaction_id: Optional[str] = None,
) -> None:
self._executor: TransactionApiExecutor
super().__init__(
Expand All @@ -3150,6 +3162,7 @@ def __init__(
allow_implicit=allow_implicit,
lock_timeout=lock_timeout,
max_size=max_size,
transaction_id=transaction_id,
),
)

Expand Down
4 changes: 4 additions & 0 deletions arango/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,10 @@ class TransactionAbortError(ArangoServerError):
"""Failed to abort transaction."""


class TransactionFetchError(ArangoServerError):
"""Failed to fetch existing transaction."""


class TransactionListError(ArangoServerError):
"""Failed to retrieve transactions."""

Expand Down
37 changes: 26 additions & 11 deletions arango/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
OverloadControlExecutorError,
TransactionAbortError,
TransactionCommitError,
TransactionFetchError,
TransactionInitError,
TransactionStatusError,
)
Expand Down Expand Up @@ -241,6 +242,9 @@ class TransactionApiExecutor:
:type max_size: int
:param allow_dirty_read: Allow reads from followers in a cluster.
:type allow_dirty_read: bool | None
:param transaction_id: Initialize using an existing transaction instead of starting
a new transaction.
:type transaction_id: str | None
"""

def __init__(
Expand All @@ -254,6 +258,7 @@ def __init__(
lock_timeout: Optional[int] = None,
max_size: Optional[int] = None,
allow_dirty_read: bool = False,
transaction_id: Optional[str] = None,
) -> None:
self._conn = connection

Expand All @@ -275,19 +280,29 @@ def __init__(
if max_size is not None:
data["maxTransactionSize"] = max_size

request = Request(
method="post",
endpoint="/_api/transaction/begin",
data=data,
headers={"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None,
)
resp = self._conn.send_request(request)
if transaction_id is None:
request = Request(
method="post",
endpoint="/_api/transaction/begin",
data=data,
headers=(
{"x-arango-allow-dirty-read": "true"} if allow_dirty_read else None
),
)
resp = self._conn.send_request(request)

if not resp.is_success:
raise TransactionInitError(resp, request)
if not resp.is_success:
raise TransactionInitError(resp, request)

result = resp.body["result"]
self._id: str = result["id"]
else:
self._id = transaction_id

result: Json = resp.body["result"]
self._id: str = result["id"]
try:
self.status()
except TransactionStatusError as err:
raise TransactionFetchError(err.response, err.request)

@property
def context(self) -> str:
Expand Down
9 changes: 9 additions & 0 deletions docs/transaction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ logical unit of work (ACID compliant).
assert '_rev' in txn_col.insert({'_key': 'Lily'})
assert len(txn_col) == 6

# Fetch an existing transaction. Useful if you have received a Transaction ID
# from some other part of your system or an external system.
original_txn = db.begin_transaction(write='students')
txn_col = original_txn.collection('students')
assert '_rev' in txn_col.insert({'_key': 'Chip'})
txn_db = db.fetch_transaction(original_txn.transaction_id)
txn_col = txn_db.collection('students')
assert '_rev' in txn_col.insert({'_key': 'Alya'})

# Abort the transaction
txn_db.abort_transaction()
assert 'Kate' not in col
Expand Down
33 changes: 33 additions & 0 deletions tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
TransactionAbortError,
TransactionCommitError,
TransactionExecuteError,
TransactionFetchError,
TransactionInitError,
TransactionStatusError,
)
Expand Down Expand Up @@ -117,6 +118,38 @@ def test_transaction_commit(db, col, docs):
assert err.value.error_code in {10, 1655}


def test_transaction_fetch_existing(db, col, docs):
original_txn = db.begin_transaction(
read=col.name,
write=col.name,
exclusive=[],
sync=True,
allow_implicit=False,
lock_timeout=1000,
max_size=10000,
)
txn_col = original_txn.collection(col.name)

assert "_rev" in txn_col.insert(docs[0])
assert "_rev" in txn_col.delete(docs[0])

txn_db = db.fetch_transaction(transaction_id=original_txn.transaction_id)

txn_col = txn_db.collection(col.name)
assert "_rev" in txn_col.insert(docs[1])
assert "_rev" in txn_col.delete(docs[1])

txn_db.commit_transaction()
assert txn_db.transaction_status() == "committed"
assert original_txn.transaction_status() == "committed"
assert txn_db.transaction_id == original_txn.transaction_id

# Test fetch transaction that does not exist
with pytest.raises(TransactionFetchError) as err:
db.fetch_transaction(transaction_id="illegal")
assert err.value.error_code in {10, 1655}


def test_transaction_abort(db, col, docs):
txn_db = db.begin_transaction(write=col.name)
txn_col = txn_db.collection(col.name)
Expand Down
Loading