Skip to content

OOB connection_reuse webhooks #1581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 24, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 63 additions & 3 deletions aries_cloudagent/protocols/out_of_band/v1_0/manager.py
Original file line number Diff line number Diff line change
@@ -53,6 +53,8 @@
from .models.invitation import InvitationRecord

LOGGER = logging.getLogger(__name__)
REUSE_WEBHOOK_TOPIC = "acapy::webhook::connection_reuse"
REUSE_ACCEPTED_WEBHOOK_TOPIC = "acapy::webhook:connection_reuse_accepted"


class OutOfBandManagerError(BaseError):
@@ -508,14 +510,33 @@ async def receive_invitation(
# If no reuse_accepted or problem_report message was received within
# the 15s timeout then a new connection to be created
async with self.profile.session() as session:
sent_reuse_msg_id = await conn_rec.metadata_get(
session=session, key="reuse_msg_id"
)
await conn_rec.metadata_delete(
session=session, key="reuse_msg_id"
)
await conn_rec.metadata_delete(
session=session, key="reuse_msg_state"
)
conn_rec.state = ConnRecord.State.ABANDONED.rfc160
await conn_rec.save(session, reason="Sent connection request")
await conn_rec.save(
session, reason="No HandshakeReuseAccept message received"
)
# Emit webhook
await self.profile.notify(
REUSE_ACCEPTED_WEBHOOK_TOPIC,
{
"thread_id": sent_reuse_msg_id,
"connection_id": conn_rec.connection_id,
"state": "rejected",
"comment": (
"No HandshakeReuseAccept message received, "
f"connection {conn_rec.connection_id} ",
f"and invitation {invitation._id}",
),
},
)
conn_rec = None
# Inverse of the following cases
# Handshake_Protocol not included
@@ -1035,6 +1056,18 @@ async def receive_reuse_message(
await conn_rec.save(session, reason="Assigning new invitation_msg_id")
# Delete the ConnRecord created; re-use existing connection
await self.delete_stale_connection_by_invitation(invi_msg_id)
# Emit webhook
await self.profile.notify(
REUSE_WEBHOOK_TOPIC,
{
"thread_id": reuse_msg_id,
"connection_id": conn_rec.connection_id,
"comment": (
f"Connection {conn_rec.connection_id} is being reused ",
f"for invitation {invi_msg_id}",
),
},
)

async def receive_reuse_accepted_message(
self,
@@ -1059,9 +1092,9 @@ async def receive_reuse_accepted_message(
HandshakeReuseAccept message

"""
invi_msg_id = reuse_accepted_msg._thread.pthid
thread_reuse_msg_id = reuse_accepted_msg._thread.thid
try:
invi_msg_id = reuse_accepted_msg._thread.pthid
thread_reuse_msg_id = reuse_accepted_msg._thread.thid
async with self.profile.session() as session:
conn_reuse_msg_id = await conn_record.metadata_get(
session=session, key="reuse_msg_id"
@@ -1074,7 +1107,34 @@ async def receive_reuse_accepted_message(
await conn_record.save(
session, reason="Assigning new invitation_msg_id"
)
# Emit webhook
await self.profile.notify(
REUSE_ACCEPTED_WEBHOOK_TOPIC,
{
"thread_id": thread_reuse_msg_id,
"connection_id": conn_record.connection_id,
"state": "accepted",
"comment": (
f"Connection {conn_record.connection_id} is being reused ",
f"for invitation {invi_msg_id}",
),
},
)
except Exception as e:
# Emit webhook
await self.profile.notify(
REUSE_ACCEPTED_WEBHOOK_TOPIC,
{
"thread_id": thread_reuse_msg_id,
"connection_id": conn_record.connection_id,
"state": "rejected",
"comment": (
"Unable to process HandshakeReuseAccept message, "
f"connection {conn_record.connection_id} ",
f"and invitation {invi_msg_id}",
),
},
)
raise OutOfBandManagerError(
(
(
40 changes: 32 additions & 8 deletions aries_cloudagent/protocols/out_of_band/v1_0/tests/test_manager.py
Original file line number Diff line number Diff line change
@@ -1537,7 +1537,9 @@ async def test_receive_reuse_message_existing_found(self):
InvitationRecord,
"retrieve_by_tag_filter",
autospec=True,
) as retrieve_invi_rec:
) as retrieve_invi_rec, async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:
oob_mgr_find_existing_conn.return_value = self.test_conn_rec
oob_mgr_fetch_conn.return_value = ConnectionTarget(
did=TestConfig.test_did,
@@ -1552,6 +1554,7 @@ async def test_receive_reuse_message_existing_found(self):
await self.manager.receive_reuse_message(
reuse_msg, receipt, self.test_conn_rec
)
mock_notify.assert_called_once()
assert (
len(
await ConnRecord.query(
@@ -1594,7 +1597,9 @@ async def test_receive_reuse_message_existing_not_found(self):
ConnRecord,
"find_existing_connection",
async_mock.CoroutineMock(),
) as oob_mgr_find_existing_conn:
) as oob_mgr_find_existing_conn, async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:
oob_mgr_find_existing_conn.return_value = None
oob_mgr_fetch_conn.return_value = ConnectionTarget(
did=TestConfig.test_did,
@@ -1609,6 +1614,7 @@ async def test_receive_reuse_message_existing_not_found(self):
await self.manager.receive_reuse_message(
reuse_msg, receipt, self.test_conn_rec
)
mock_notify.assert_called_once()
assert len(self.responder.messages) == 0

async def test_receive_reuse_message_problem_report_logic(self):
@@ -1629,7 +1635,9 @@ async def test_receive_reuse_message_problem_report_logic(self):
OutOfBandManager,
"fetch_connection_targets",
autospec=True,
) as oob_mgr_fetch_conn:
) as oob_mgr_fetch_conn, async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:
oob_mgr_fetch_conn.return_value = ConnectionTarget(
did=TestConfig.test_did,
endpoint=TestConfig.test_endpoint,
@@ -1639,6 +1647,7 @@ async def test_receive_reuse_message_problem_report_logic(self):
await self.manager.receive_reuse_message(
reuse_msg, receipt, self.test_conn_rec
)
mock_notify.assert_called_once()

async def test_receive_reuse_accepted(self):
async with self.profile.session() as session:
@@ -1664,11 +1673,14 @@ async def test_receive_reuse_accepted(self):
OutOfBandManager,
"fetch_connection_targets",
autospec=True,
) as oob_mgr_fetch_conn:
) as oob_mgr_fetch_conn, async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:

await self.manager.receive_reuse_accepted_message(
reuse_msg_accepted, receipt, self.test_conn_rec
)
mock_notify.assert_called_once()
assert (
await self.test_conn_rec.metadata_get(session, "reuse_msg_state")
== "accepted"
@@ -1698,11 +1710,14 @@ async def test_receive_reuse_accepted(self):
OutOfBandManager,
"fetch_connection_targets",
autospec=True,
) as oob_mgr_fetch_conn:
) as oob_mgr_fetch_conn, async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:

await self.manager.receive_reuse_accepted_message(
reuse_msg_accepted, receipt, self.test_conn_rec
)
mock_notify.assert_called_once()
assert (
await self.test_conn_rec.metadata_get(session, "reuse_msg_state")
== "accepted"
@@ -1732,11 +1747,14 @@ async def test_receive_reuse_accepted_invalid_conn(self):
OutOfBandManager,
"fetch_connection_targets",
autospec=True,
) as oob_mgr_fetch_conn:
) as oob_mgr_fetch_conn, async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:
with self.assertRaises(OutOfBandManagerError) as context:
await self.manager.receive_reuse_accepted_message(
reuse_msg_accepted, receipt, test_invalid_conn
)
mock_notify.assert_called_once()
assert "Error processing reuse accepted message" in str(context.exception)

async def test_receive_reuse_accepted_message_catch_exception(self):
@@ -1759,11 +1777,14 @@ async def test_receive_reuse_accepted_message_catch_exception(self):
self.test_conn_rec,
"metadata_set",
async_mock.CoroutineMock(side_effect=StorageNotFoundError),
):
), async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:
with self.assertRaises(OutOfBandManagerError) as context:
await self.manager.receive_reuse_accepted_message(
reuse_msg_accepted, receipt, self.test_conn_rec
)
mock_notify.assert_called_once()
assert "Error processing reuse accepted message" in str(
context.exception
)
@@ -2172,7 +2193,9 @@ async def test_existing_conn_record_public_did_timeout(self):
OutOfBandManager,
"check_reuse_msg_state",
autospec=True,
) as oob_mgr_check_reuse_state:
) as oob_mgr_check_reuse_state, async_mock.patch.object(
self.profile, "notify", autospec=True
) as mock_notify:
oob_mgr_find_existing_conn.return_value = test_exist_conn
oob_mgr_check_reuse_state.side_effect = asyncio.TimeoutError
mock_oob_invi = async_mock.MagicMock(
@@ -2187,6 +2210,7 @@ async def test_existing_conn_record_public_did_timeout(self):
result = await self.manager.receive_invitation(
mock_oob_invi, use_existing_connection=True
)
mock_notify.assert_called()
retrieved_conn_records = await ConnRecord.query(
session=session,
tag_filter={"their_public_did": TestConfig.test_target_did},