forked from openwallet-foundation/acapy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroutes.py
323 lines (260 loc) · 10.6 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
"""Connection handling admin routes."""
import json
from aiohttp import web
from aiohttp_apispec import (
docs,
match_info_schema,
querystring_schema,
request_schema,
response_schema,
)
from marshmallow import fields
from ....admin.request_context import AdminRequestContext
from ....connections.models.conn_record import ConnRecord, ConnRecordSchema
from ....messaging.models.base import BaseModelError
from ....messaging.models.openapi import OpenAPISchema
from ....messaging.valid import ENDPOINT, INDY_DID, UUIDFour, UUID4
from ....storage.error import StorageError, StorageNotFoundError
from ....wallet.error import WalletError
from .manager import DIDXManager, DIDXManagerError
from .message_types import SPEC_URI
from .messages.request import DIDXRequest, DIDXRequestSchema
class DIDXAcceptInvitationQueryStringSchema(OpenAPISchema):
    """Parameters and validators for accept invitation request query string.

    Declares the optional query parameters that the accept-invitation handler
    reads: ``my_endpoint``, ``my_label`` and ``mediation_id``.
    """

    my_endpoint = fields.Str(description="My URL endpoint", required=False, **ENDPOINT)
    my_label = fields.Str(
        description="Label for connection request", required=False, example="Broker"
    )
    # The accept-invitation handler reads ``mediation_id`` from the query string
    # and forwards it to the manager, so it is declared here with the same
    # definition the other query-string schemas in this module use. Without it
    # the parameter is absent from the generated API description.
    mediation_id = fields.Str(
        required=False,
        description="Identifier for active mediation record to be used",
        **UUID4,
    )
class DIDXCreateRequestImplicitQueryStringSchema(OpenAPISchema):
    """Query string schema for the create-request-implicit endpoint.

    ``their_public_did`` is mandatory; every other parameter is optional.
    """

    # Target of the implicit invitation.
    their_public_did = fields.Str(
        description="Public DID to which to request connection",
        allow_none=False,
        required=True,
        **INDY_DID,
    )
    my_endpoint = fields.Str(
        required=False,
        description="My URL endpoint",
        **ENDPOINT,
    )
    my_label = fields.Str(
        required=False,
        description="Label for connection request",
        example="Broker",
    )
    mediation_id = fields.Str(
        description="Identifier for active mediation record to be used",
        required=False,
        **UUID4,
    )
    send_request = fields.Bool(
        default=False,
        description="Whether to send request for implicit invitations",
    )
class DIDXReceiveRequestImplicitQueryStringSchema(OpenAPISchema):
    """Query string schema for the receive-request-implicit endpoint.

    All parameters are optional.
    """

    alias = fields.Str(
        required=False,
        description="Alias for connection",
        example="Barry",
    )
    my_endpoint = fields.Str(
        required=False,
        description="My URL endpoint",
        **ENDPOINT,
    )
    auto_accept = fields.Boolean(
        required=False,
        description="Auto-accept connection (defaults to configuration)",
    )
    mediation_id = fields.Str(
        description="Identifier for active mediation record to be used",
        required=False,
        **UUID4,
    )
class DIDXAcceptRequestQueryStringSchema(OpenAPISchema):
    """Query string schema for the accept-request endpoint.

    Both parameters are optional.
    """

    my_endpoint = fields.Str(
        required=False,
        description="My URL endpoint",
        **ENDPOINT,
    )
    mediation_id = fields.Str(
        description="Identifier for active mediation record to be used",
        required=False,
        **UUID4,
    )
class DIDXConnIdMatchInfoSchema(OpenAPISchema):
    """Path parameter schema for routes keyed by a connection identifier."""

    conn_id = fields.Str(
        required=True,
        description="Connection identifier",
        example=UUIDFour.EXAMPLE,
    )
class DIDXConnIdRefIdMatchInfoSchema(OpenAPISchema):
    """Path parameter schema for routes keyed by connection and reference ids."""

    conn_id = fields.Str(
        required=True,
        description="Connection identifier",
        example=UUIDFour.EXAMPLE,
    )
    ref_id = fields.Str(
        required=True,
        description="Inbound connection identifier",
        example=UUIDFour.EXAMPLE,
    )
@docs(
    tags=["did-exchange"],
    summary="Accept a stored connection invitation",
)
@match_info_schema(DIDXConnIdMatchInfoSchema())
@querystring_schema(DIDXAcceptInvitationQueryStringSchema())
@response_schema(ConnRecordSchema(), 200, description="")
async def didx_accept_invitation(request: web.BaseRequest):
    """
    Accept a stored connection invitation.

    Looks up the connection record named by the ``conn_id`` path parameter,
    asks the DID exchange manager to create a request for it, and hands that
    request to the outbound message router.

    Args:
        request: aiohttp request object

    Returns:
        JSON response carrying the serialized connection record

    Raises:
        web.HTTPNotFound: if a StorageNotFoundError is raised
        web.HTTPBadRequest: on any other storage, wallet, manager or model error

    """
    context: AdminRequestContext = request["context"]
    send_outbound = request["outbound_message_router"]
    conn_id = request.match_info["conn_id"]
    session = await context.session()

    # Empty query values are collapsed to None before reaching the manager.
    request_options = {
        name: request.query.get(name) or None
        for name in ("my_label", "my_endpoint", "mediation_id")
    }

    manager = DIDXManager(session)
    try:
        conn_rec = await ConnRecord.retrieve_by_id(session, conn_id)
        didx_request = await manager.create_request(
            conn_rec=conn_rec,
            **request_options,
        )
        payload = conn_rec.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    await send_outbound(didx_request, connection_id=conn_rec.connection_id)
    return web.json_response(payload)
@docs(
    tags=["did-exchange"],
    summary="Create request against public DID's implicit invitation",
)
@querystring_schema(DIDXCreateRequestImplicitQueryStringSchema())
@response_schema(DIDXRequestSchema(), 200, description="")
async def didx_create_request_implicit(request: web.BaseRequest):
    """
    Request handler for creating a request to an implicit invitation.

    Args:
        request: aiohttp request object

    Returns:
        JSON response carrying the serialized DID exchange request

    Raises:
        web.HTTPNotFound: if a StorageNotFoundError is raised
        web.HTTPBadRequest: on any other storage, wallet, manager or model error

    """
    context: AdminRequestContext = request["context"]
    session = await context.session()

    query = request.query
    their_public_did = query.get("their_public_did")
    my_label = query.get("my_label") or None
    my_endpoint = query.get("my_endpoint") or None
    mediation_id = query.get("mediation_id") or None

    # Query values arrive as strings, so forwarding the raw value would make
    # "false" or "0" truthy. Convert to a real bool instead; a missing
    # parameter yields False, matching the schema's declared default.
    send_request = query.get("send_request", "").strip().lower() in {
        "true",
        "t",
        "yes",
        "y",
        "on",
        "1",
    }

    didx_mgr = DIDXManager(session)
    try:
        # Use a separate name so the aiohttp request is not rebound.
        didx_request = await didx_mgr.create_request_implicit(
            their_public_did=their_public_did,
            my_label=my_label,
            my_endpoint=my_endpoint,
            mediation_id=mediation_id,
            send_request=send_request,
        )
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(didx_request.serialize())
@docs(
    tags=["did-exchange"],
    summary="Receive request against public DID's implicit invitation",
)
@querystring_schema(DIDXReceiveRequestImplicitQueryStringSchema())
@request_schema(DIDXRequestSchema())
@response_schema(ConnRecordSchema(), 200, description="")
async def didx_receive_request_implicit(request: web.BaseRequest):
    """
    Request handler for receiving a request against public DID's implicit invitation.

    The recipient DID is taken from the last ``:``-separated segment of the
    request's parent thread id.

    Args:
        request: aiohttp request object

    Returns:
        JSON response carrying the serialized connection record

    Raises:
        web.HTTPNotFound: if a StorageNotFoundError is raised
        web.HTTPBadRequest: if ``auto_accept`` is not valid JSON, if the request
            body carries no parent thread id, or on any other storage, wallet,
            manager or model error

    """
    context: AdminRequestContext = request["context"]
    session = await context.session()
    body = await request.json()

    alias = request.query.get("alias")
    my_endpoint = request.query.get("my_endpoint")
    try:
        # Absent parameter decodes to None.
        auto_accept = json.loads(request.query.get("auto_accept", "null"))
    except json.JSONDecodeError as err:
        # Values such as "True" are not valid JSON; report a client error
        # rather than letting the decode error escape the handler.
        raise web.HTTPBadRequest(
            reason="Invalid auto_accept value: expected true or false"
        ) from err
    mediation_id = request.query.get("mediation_id") or None

    didx_mgr = DIDXManager(session)
    try:
        # Use a separate name so the aiohttp request is not rebound.
        didx_request = DIDXRequest.deserialize(body)

        # The recipient DID is derived from the parent thread id; without it
        # the attribute access below would fail with an unhandled error.
        thread = didx_request._thread
        if thread is None or not thread.pthid:
            raise web.HTTPBadRequest(
                reason="DID exchange request has no parent thread id (pthid)"
            )

        conn_rec = await didx_mgr.receive_request(
            request=didx_request,
            recipient_did=thread.pthid.split(":")[-1],
            alias=alias,
            my_endpoint=my_endpoint,
            auto_accept_implicit=auto_accept,
            mediation_id=mediation_id,
        )
        result = conn_rec.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    return web.json_response(result)
@docs(
    tags=["did-exchange"],
    summary="Accept a stored connection request",
)
@match_info_schema(DIDXConnIdMatchInfoSchema())
@querystring_schema(DIDXAcceptRequestQueryStringSchema())
@response_schema(ConnRecordSchema(), 200, description="")
async def didx_accept_request(request: web.BaseRequest):
    """
    Accept a stored connection request.

    Looks up the connection record named by the ``conn_id`` path parameter,
    asks the DID exchange manager to create a response for it, and hands that
    response to the outbound message router.

    Args:
        request: aiohttp request object

    Returns:
        JSON response carrying the serialized connection record

    Raises:
        web.HTTPNotFound: if a StorageNotFoundError is raised
        web.HTTPBadRequest: on any other storage, wallet, manager or model error

    """
    context: AdminRequestContext = request["context"]
    send_outbound = request["outbound_message_router"]
    conn_id = request.match_info["conn_id"]
    session = await context.session()

    # Empty query values are collapsed to None before reaching the manager.
    response_options = {
        name: request.query.get(name) or None
        for name in ("my_endpoint", "mediation_id")
    }

    manager = DIDXManager(session)
    try:
        conn_rec = await ConnRecord.retrieve_by_id(session, conn_id)
        didx_response = await manager.create_response(
            conn_rec=conn_rec,
            **response_options,
        )
        payload = conn_rec.serialize()
    except StorageNotFoundError as err:
        raise web.HTTPNotFound(reason=err.roll_up) from err
    except (StorageError, WalletError, DIDXManagerError, BaseModelError) as err:
        raise web.HTTPBadRequest(reason=err.roll_up) from err

    await send_outbound(didx_response, connection_id=conn_rec.connection_id)
    return web.json_response(payload)
async def register(app: web.Application):
    """Attach the DID exchange admin routes to the application.

    Args:
        app: aiohttp application that receives the routes

    """
    # All endpoints are POST; listed as (path, handler) pairs in registration order.
    post_routes = (
        ("/didexchange/{conn_id}/accept-invitation", didx_accept_invitation),
        ("/didexchange/create-request", didx_create_request_implicit),
        ("/didexchange/receive-request", didx_receive_request_implicit),
        ("/didexchange/{conn_id}/accept-request", didx_accept_request),
    )
    app.add_routes([web.post(path, handler) for path, handler in post_routes])
def post_process_routes(app: web.Application):
    """Add the did-exchange tag description to the swagger dictionary.

    Args:
        app: aiohttp application whose ``_state["swagger_dict"]`` is amended

    """
    swagger = app._state["swagger_dict"]
    did_exchange_tag = {
        "name": "did-exchange",
        "description": "Connection management via DID exchange",
        "externalDocs": {"description": "Specification", "url": SPEC_URI},
    }
    # Create the top-level tag list on first use, then append this module's tag.
    swagger.setdefault("tags", []).append(did_exchange_tag)