Skip to content

Commit 0f86b8b

Browse files
committed
Implement some form of state tracking
1 parent c939c92 commit 0f86b8b

13 files changed

+1005
-880
lines changed

app.py

+29-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from fastapi.exceptions import HTTPException, RequestValidationError
99
from fastapi.params import Depends
1010
from fastapi.security import HTTPBasic
11+
from pydantic import UUID4
1112
from pydantic.main import BaseModel
1213
from starlette.background import BackgroundTasks
1314
from starlette.responses import JSONResponse
@@ -32,11 +33,15 @@
3233
from intent_deployer.config import get_settings
3334
from intent_deployer.deploy import deploy_policy
3435
from intent_deployer.dialogflow import DialogflowContext
36+
from intent_deployer.state_track import ActiveIntent, list_intents, lookup_intent, new_status_tracker
3537
from intent_deployer.switches import Switches
3638
from intent_deployer.types import (
3739
APIResponse,
3840
APIResponseStatus,
41+
CreatedIntentResponse,
42+
GetIntentResponse,
3943
IntentDeployException,
44+
ListIntentResponse,
4045
NileIntentRequest,
4146
)
4247
from intents.utils import PushIntentPolicy
@@ -207,20 +212,25 @@ async def dialogflow_webhook(
207212
@app.post(
208213
f"/{intent.__name__}",
209214
name=f"invoke_{intent.__name__}",
210-
response_model=APIResponse,
215+
response_model=CreatedIntentResponse,
211216
responses={400: {"model": APIResponse}, 422: {"model": APIResponse}},
212217
)
213218
async def invoke_intent(body: intent, background_tasks: BackgroundTasks):
219+
ctx = None
214220
try:
215-
ctx = Context(background_tasks)
221+
tracked_intent = new_status_tracker(intent.__name__, body)
222+
ctx = Context(background_tasks, tracked_intent)
216223
await handle_intent(ctx, body)
217224

218-
return APIResponse(status=APIResponseStatus(code=200, details="success"))
225+
return CreatedIntentResponse(status=APIResponseStatus(code=200, details="success"),
226+
intent_id=tracked_intent.id)
219227
except IntentDeployException as e:
220228
log.exception(e)
229+
if ctx: ctx.push_status("failure", str(e))
221230
return format_json_error(400, f"Could not deploy intent. \n{e}")
222231
except Exception as e:
223232
log.exception(e)
233+
if ctx: ctx.push_status("failure", str(e))
224234
return format_json_error(400, f"Could not deploy intent. internal error")
225235

226236

@@ -234,3 +244,19 @@ async def alertmanager_webhook(
234244
await handle_alert(body)
235245

236246
return APIResponse(status=APIResponseStatus(code=200, details="success"))
247+
248+
@app.get("/intents", response_model=ListIntentResponse)
249+
async def list_intents_endpoint():
250+
return ListIntentResponse(status=APIResponseStatus(code=200, details="success"),
251+
intents=list_intents())
252+
253+
@app.get("/intents/{intent_id}", response_model=GetIntentResponse,
254+
responses={404: {"model": APIResponse}})
255+
async def get_intent_endpoint(intent_id: UUID4):
256+
intent_status = lookup_intent(intent_id)
257+
258+
if intent_status:
259+
return GetIntentResponse(status=APIResponseStatus(code=200, details="success"),
260+
intent=intent_status)
261+
262+
return format_json_error(404, f"Could not find an intent with id: {intent_id}")

intent_deployer/compile.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from fastapi import BackgroundTasks
1414

1515
from intent_deployer.parser import NileIntent
16+
from intent_deployer.state_track import ActiveIntent, new_status_tracker
1617
from intent_deployer.types import (
1718
IntentDeployException,
1819
Policy,
@@ -27,6 +28,11 @@
2728
@dataclass
2829
class Context:
2930
background_tasks: BackgroundTasks
31+
status: Optional[ActiveIntent]
32+
33+
def push_status(self, state: str, extra: Any = None):
34+
assert self.status
35+
self.status.push_status(state, extra)
3036

3137
async def confirm_if_needed(
3238
self, msg: str, on_confirm: Callable[[], Coroutine[None, None, None]]
@@ -129,15 +135,17 @@ def wrapper(handler: AlertExecutor) -> AlertExecutor:
129135
return wrapper
130136

131137

132-
async def handle_nile(ctx: Context, intent: NileIntent):
133-
handler = nile_handlers.get(intent.name)
138+
async def handle_nile(ctx: Context, nile_intent: NileIntent):
139+
handler = nile_handlers.get(nile_intent.name)
134140
if handler is None:
135141
raise IntentDeployException(
136-
f'No known nile handler for the intent type "{intent.name}"',
142+
f'No known nile handler for the intent type "{nile_intent.name}"',
137143
f"Possible intents types are: {list(nile_handlers.keys())}",
138144
)
139145

140-
intent = handler(intent)
146+
intent = handler(nile_intent)
147+
tracked_intent_status = new_status_tracker(nile_intent.name, intent)
148+
ctx.status = tracked_intent_status
141149
await handle_intent(ctx, intent)
142150

143151

@@ -150,6 +158,8 @@ async def handle_dialogflow(ctx: Context, type_: str, params: dict[str, Any]):
150158
)
151159

152160
intent = handler(params)
161+
tracked_intent_status = new_status_tracker(type_, intent)
162+
ctx.status = tracked_intent_status
153163
await handle_intent(ctx, intent)
154164

155165

intent_deployer/dialogflow.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def get_or_create(cls, background_tasks, session, *args, **kwargs):
6565
instance.background_tasks = background_tasks
6666
return instance
6767

68-
instance = cls(background_tasks, session, *args, **kwargs)
68+
instance = cls(background_tasks, None, session, *args, **kwargs)
6969
cls.instances[session] = instance
7070
return instance
7171

@@ -124,11 +124,14 @@ async def inner():
124124
await notifier.send("Intent deployed")
125125
except IntentDeployException as e:
126126
log.exception(e)
127+
self.push_status("failure", str(e))
127128
await notifier.send(f"Could not deploy that intent: {e}")
128129
except Exception as e:
129130
log.exception(e)
131+
self.push_status("failure", str(e))
130132
await notifier.send(f"Could not deploy that intent, internal error")
131133
else:
134+
self.push_status("cancelled")
132135
await notifier.send("Cancelled")
133136

134137
self.background_tasks.add_task(inner)

intent_deployer/state_track.py

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from datetime import datetime
2+
from typing import Any, Optional
3+
from uuid import uuid4
4+
5+
from pydantic import UUID4
6+
from pydantic.main import BaseModel
7+
8+
class IntentStatus(BaseModel):
9+
"""Represents the status an intent was in, such as: starting, testing, completed."""
10+
11+
state: str
12+
extra: Any
13+
start_time: datetime
14+
end_time: Optional[datetime] = None
15+
16+
# I can't think of a good name for this right now, basically this is something
17+
# that stores the current state of the intent
18+
19+
class ActiveIntent(BaseModel):
20+
id: UUID4
21+
configuration: BaseModel
22+
name: str
23+
statuses: list[IntentStatus]
24+
25+
def push_status(self, state: str, extra: Any = None):
26+
now = datetime.utcnow()
27+
28+
if self.statuses:
29+
self.statuses[-1].end_time = now
30+
31+
self.statuses.append(IntentStatus(state=state, extra=extra, start_time=now))
32+
33+
def current_status(self) -> Optional[IntentStatus]:
34+
return self.statuses[-1] if self.statuses else None
35+
36+
37+
__active_intents: dict[UUID4, ActiveIntent] = {}
38+
39+
def new_status_tracker(name: str, configuration: BaseModel) -> ActiveIntent:
40+
id_ = uuid4()
41+
42+
active_intent = ActiveIntent(id=id_, name=name, configuration=configuration, statuses=[])
43+
44+
__active_intents[id_] = active_intent
45+
46+
return active_intent
47+
48+
def lookup_intent(id: UUID4) -> Optional[ActiveIntent]:
49+
return __active_intents.get(id)
50+
51+
def list_intents() -> list[ActiveIntent]:
52+
return list(__active_intents.values())

intent_deployer/types.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
from typing import Generic, Literal, TypeVar
44
from pydantic.config import BaseConfig
55
from pydantic.generics import GenericModel
6-
from pydantic import BaseModel
6+
from pydantic import UUID4, BaseModel
7+
8+
from intent_deployer.state_track import ActiveIntent
79

810

911
class APIResponseStatus(BaseModel):
@@ -15,6 +17,15 @@ class APIResponse(BaseModel):
1517
status: APIResponseStatus
1618

1719

20+
class CreatedIntentResponse(APIResponse):
21+
intent_id: UUID4
22+
23+
class ListIntentResponse(APIResponse):
24+
intents: list[ActiveIntent]
25+
26+
class GetIntentResponse(APIResponse):
27+
intent: ActiveIntent
28+
1829
class NileIntentRequest(BaseModel):
1930
type: Literal["nile"]
2031
intent: str

intents/drain.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ def handle_dialogflow_intent(params: dict[str, Any]) -> DrainIntent:
4545

4646
@register_intent_executor(DrainIntent, PushIntentPolicy)
4747
async def handle_drain_intent(ctx: Context, intent: DrainIntent):
48-
await raise_if_spp()
48+
await raise_if_spp(ctx)
49+
ctx.push_status("starting")
4950

5051
hosts = await Hosts.from_api()
5152
switches = await Switches.from_api()
@@ -69,17 +70,20 @@ async def drain_intent_test_followup():
6970
if notifier is not None:
7071
await notifier.send("Testing configuration, please wait")
7172

73+
ctx.push_status("testing")
7274
runner = NEATRunner.generate_mtv_config(new_topology)
7375
success = await runner.run()
7476

7577
if success:
78+
ctx.push_status("applying")
7679
if isinstance(ctx, DialogflowContext):
7780
notifier = ctx.get_notifier()
7881
if notifier is not None:
7982
await notifier.send("Tests passed, applying intent")
8083
await add_intents(drain_intents, hosts, switches)
8184
if not success:
8285
raise IntentDeployException("After testing the new configuration the tests did not pass, abandoning")
86+
ctx.push_status("complete")
8387

8488
await ctx.confirm_if_needed(
8589
f"This will drain intents from {intent.node} by rerouting {len(drain_intents)} flows",

intents/forwarding.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ def handle_dialogflow_intent(params: dict[str, Any]) -> ForwardIntent:
6868

6969
@register_intent_executor(ForwardIntent)
7070
async def handle_forward_intent(ctx: Context, intent: ForwardIntent):
71-
await raise_if_spp()
71+
await raise_if_spp(ctx)
72+
ctx.push_status("starting")
7273

7374
hosts = await Hosts.from_api()
7475
switches = await Switches.from_api()

intents/rebalance.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ def handle_dialogflow_intent(_params: dict[str, Any]) -> RebalanceIntent:
3333

3434
@register_intent_executor(RebalanceIntent, PushIntentPolicy)
3535
async def handle_drain_intent(ctx: Context, intent: RebalanceIntent):
36-
await raise_if_spp()
36+
await raise_if_spp(ctx)
37+
ctx.push_status("starting")
3738

3839
hosts = await Hosts.from_api()
3940
switches = await Switches.from_api()
@@ -52,10 +53,12 @@ async def rebalance_intent_test_followup():
5253
if notifier is not None:
5354
await notifier.send("Testing configuration, please wait")
5455

56+
ctx.push_status("testing")
5557
runner = NEATRunner.generate_mtv_config(new_topology)
5658
success = await runner.run()
5759

5860
if success:
61+
ctx.push_status("applying")
5962
if isinstance(ctx, DialogflowContext):
6063
notifier = ctx.get_notifier()
6164
if notifier is not None:
@@ -65,6 +68,7 @@ async def rebalance_intent_test_followup():
6568
raise IntentDeployException(
6669
"After testing the new configuration the tests did not pass, abandoning"
6770
)
71+
ctx.push_status("complete")
6872

6973
await ctx.confirm_if_needed(
7074
f"This will reroute all intents",

intents/upgrade.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ def handle_dialogflow_downgrade_intent(params: dict[str, Any]) -> DowngradeInten
9090

9191
@register_intent_executor(UpgradeIntent)
9292
async def handle_upgrade_intent(ctx: Context, intent: UpgradeIntent):
93-
await raise_if_spp()
93+
await raise_if_spp(ctx)
94+
ctx.push_status("starting")
9495

9596
topology = await Topology.from_current_state([])
9697

@@ -101,6 +102,7 @@ async def handle_upgrade_intent(ctx: Context, intent: UpgradeIntent):
101102
)
102103

103104
async def upgrade_intent_test_followup():
105+
ctx.push_status("applying")
104106
with Telnet("172.17.0.1", 7123) as tn:
105107
x = tn.read_until(b"_________", timeout=3)
106108
print(x.decode(), end="")
@@ -113,6 +115,7 @@ async def upgrade_intent_test_followup():
113115
notifier = ctx.get_notifier()
114116
if notifier is not None:
115117
await notifier.send("Reconfigured router!")
118+
ctx.push_status("complete")
116119

117120
# if isinstance(ctx, DialogflowContext):
118121
# notifier = ctx.get_notifier()

intents/utils.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import httpx
77

88
from intent_deployer import config
9+
from intent_deployer.compile import Context
910
from intent_deployer.network_state import is_spp
1011
from intent_deployer.types import IntentDeployException, Policy
1112

@@ -44,6 +45,7 @@ async def get_possible_routes(src: str, dst: str) -> list[list[str]]:
4445

4546
return list(data["routes"].values())
4647

47-
async def raise_if_spp():
48+
async def raise_if_spp(ctx: Context):
4849
if await is_spp():
50+
ctx.push_status("failed", "spp enabled")
4951
raise IntentDeployException("SPP Active, no modification allowed")

0 commit comments

Comments
 (0)