forked from mehdiBezahaf/intent-deploy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstate_track.py
52 lines (35 loc) · 1.44 KB
/
state_track.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from datetime import datetime
from typing import Any, Optional
from uuid import uuid4
from pydantic import UUID4
from pydantic.main import BaseModel
class IntentStatus(BaseModel):
    """Represents the status an intent was in, such as: starting, testing, completed.

    One instance describes a single period spent in one state, delimited by
    ``start_time`` and ``end_time``.
    """

    # Name of the state, e.g. "starting", "testing" or "completed".
    state: str
    # Free-form payload attached to this status.  The default is spelled out
    # because pydantic v1 implied ``None`` for ``Any`` fields while pydantic v2
    # treats an ``Any`` field without a default as required.
    extra: Any = None
    # Moment the intent entered this state.
    start_time: datetime
    # Moment the intent left this state; stays ``None`` until something sets
    # it (ActiveIntent.push_status does so when the next status is pushed).
    end_time: Optional[datetime] = None
# I can't think of a good name for this right now; basically, this is something
# that stores the current state of the intent.
class ActiveIntent(BaseModel):
    """An intent together with the ordered history of the statuses it went through.

    ``statuses`` is treated as chronological: the last element is the status
    the intent is currently in.  When statuses are only added through
    :meth:`push_status`, every earlier element has its ``end_time`` set.
    """

    # Unique identifier of this intent.
    id: UUID4
    # Configuration model associated with the intent.
    configuration: BaseModel
    # Name given to the intent.
    name: str
    # Status history, oldest first.
    statuses: list[IntentStatus]

    def push_status(self, state: str, extra: Any = None):
        """Close the current status, if any, and append a new one.

        Args:
            state: Name of the state the intent is entering.
            extra: Optional payload stored on the new status.
        """
        # ``datetime.utcnow()`` is deprecated since Python 3.12.  Take an aware
        # UTC timestamp and strip the tzinfo so the stored values remain naive
        # UTC datetimes, exactly as they were before.
        from datetime import timezone

        now = datetime.now(timezone.utc).replace(tzinfo=None)
        if self.statuses:
            # The same timestamp ends the previous status and starts the new
            # one, so consecutive statuses leave no gap between them.
            self.statuses[-1].end_time = now
        self.statuses.append(IntentStatus(state=state, extra=extra, start_time=now))

    def current_status(self) -> Optional[IntentStatus]:
        """Return the most recently pushed status, or ``None`` if there is none."""
        return self.statuses[-1] if self.statuses else None
# Module-level registry of the trackers created by new_status_tracker(), keyed
# by intent id.  Nothing in this section of the file removes entries.
# NOTE: the double leading underscore is harmless at module level, but a
# reference to this name written inside a class body would be name-mangled.
__active_intents: dict[UUID4, ActiveIntent] = {}
def new_status_tracker(name: str, configuration: BaseModel) -> ActiveIntent:
    """Create a tracker for a new intent, register it, and return it.

    Args:
        name: Name to give the intent.
        configuration: Configuration model to attach to the intent.

    Returns:
        The newly created ``ActiveIntent``, which starts with an empty status
        history and is stored in the module-level registry under a fresh
        random UUID.
    """
    intent_id = uuid4()
    tracker = ActiveIntent(
        id=intent_id,
        name=name,
        configuration=configuration,
        statuses=[],
    )
    # Register under the generated id so lookup_intent() can find it later.
    __active_intents[intent_id] = tracker
    return tracker
def lookup_intent(id: UUID4) -> Optional[ActiveIntent]:
    """Return the registered intent with the given id, or ``None`` if unknown."""
    try:
        return __active_intents[id]
    except KeyError:
        # No tracker has been registered under this id.
        return None
def list_intents() -> list[ActiveIntent]:
    """Return a new list holding every registered intent.

    The list is a snapshot: adding to or removing from it does not affect the
    registry, although the ``ActiveIntent`` objects themselves are shared.
    """
    return [*__active_intents.values()]