-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsession.py
147 lines (129 loc) · 5.45 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from datetime import datetime
import asyncio
import json
import os
from typing import Dict, Optional
import logging
from data import SessionMD, SessionSummary
from llm_providers.openai_provider import OpenAIProvider
from storage import ContextStorage
from utils.llm_types import AnalysisPrompt, LLMProvider
from utils.prompts import PromptsManager
from utils.utils import parse_json_string_to_model
# Module-level logger, named after this module, used by every Session method.
logger = logging.getLogger(__name__)
class Session:
    """A single session belonging to a context.

    ``start()`` registers the session in ``storage`` and ``end()`` closes
    it: the session's events are read back from storage, summarised by the
    LLM provider and the summary is stored together with the end time.
    """

    def __init__(
        self,
        storage: ContextStorage,
        context_id: int,
        session_id: Optional[int] = None,
        start_time: Optional[datetime] = None,
        llm: Optional[LLMProvider] = None,
        custom_prompts: Optional[Dict[str, AnalysisPrompt]] = None,
    ):
        """Store the collaborators; nothing is written to storage here.

        Args:
            storage: Backend used to create/end sessions and read events.
            context_id: Id of the context this session belongs to.
            session_id: Id of an existing session, or None for a new one.
            start_time: Initial value of ``self.start_time``.
                NOTE(review): ``start()`` overwrites this with
                ``datetime.now()`` whenever it creates a session.
            llm: Provider used to generate text. When falsy, an
                ``OpenAIProvider`` is built from ``OPENAI_API_KEY``.
            custom_prompts: Passed to ``PromptsManager``.
        """
        self.storage = storage
        self.context_id = context_id
        self.session_id = session_id
        self.start_time = start_time
        # Set once by end(); is_active() is simply "not set yet".
        self._end_session_event = asyncio.Event()
        self.llm = llm or OpenAIProvider(api_key=os.getenv("OPENAI_API_KEY"))
        self.custom_prompts = custom_prompts
        self.prompts = PromptsManager(custom_prompts)
        self.end_time = None

    @classmethod
    async def create_and_start(
        cls,
        storage: ContextStorage,
        context_id: int,
        session_id: Optional[int] = None,
        start_time: Optional[datetime] = None,
        llm: Optional[LLMProvider] = None,
        custom_prompts: Optional[Dict[str, AnalysisPrompt]] = None,
    ) -> 'Session':
        """Build a session with the given arguments and start it.

        Returns:
            The session, with ``session_id`` set to the value returned by
            ``start()``.
        """
        session = cls(
            storage=storage,
            context_id=context_id,
            session_id=session_id,
            start_time=start_time,
            llm=llm,
            custom_prompts=custom_prompts,
        )
        session.session_id = await session.start()
        logger.info(f"Created session with id: {session.session_id}")
        return session

    async def start(self) -> int:
        """Start the session and return the session id.

        If the session already has an id and has not been ended, nothing is
        created and the existing id is returned. Otherwise the start time is
        set to ``datetime.now()`` and a session is created in storage.

        Raises:
            Exception: Whatever ``storage.create_session`` raises, re-raised
                after being logged.
        """
        if self.session_id is not None and self.is_active():
            logger.warning("Session already active, skipping start")
            return self.session_id

        try:
            self.start_time = datetime.now()
            session_id = self.storage.create_session(
                context_id=self.context_id,
                start_time=self.start_time,
            )
            logger.info(f"Session created with id: {session_id}")
            self.session_id = session_id
            logger.info(f"Session started with id: {self.session_id}")
            return session_id
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"Failed to start session: {e}")
            raise

    async def end(self):
        """End the current session.

        Marks the session as ended and then summarises and saves it. Calling
        this on a session that was already ended only logs a warning.
        """
        if not self.is_active():
            logger.warning("Session already ended")
            return

        logger.info(f"Ending session with id: {self.session_id} at {datetime.now()}")
        # Flip the flag first so a second end() call returns early.
        self._end_session_event.set()
        await self.summarize_and_save()

    def is_active(self) -> bool:
        """Return True until ``end()`` has been called."""
        return not self._end_session_event.is_set()

    async def summarize_and_save(self) -> Optional[SessionSummary]:
        """Record the end time, generate the summary and store both.

        This is best-effort: any failure is logged and swallowed.

        Returns:
            The generated summary, or None if generating or storing failed.
        """
        self.end_time = datetime.now()
        try:
            session_summary = await self.generate_session_summary(self.session_id)
            logger.info(f"Generated session summary: {session_summary}")
            self.storage.end_session_updating_summary(
                self.session_id, self.end_time, session_summary
            )
        except Exception as e:
            # Deliberately not re-raised; the traceback is kept in the log.
            logger.exception(f"Failed to end session: {e}")
            return None
        return session_summary

    async def generate_session_summary(self, session_id: int) -> SessionSummary:
        """Generate a summary for a session.

        Reads the session's events from storage, fills the
        ``"session_summary"`` prompt with them and parses the LLM output
        into a ``SessionSummary``.

        Raises:
            Exception: Any error from storage, the LLM call or parsing,
                re-raised after being logged.
        """
        try:
            events = self.storage.get_session_events(session_id)
            session_summary_prompt = self.prompts.get_prompt("session_summary").format(
                EVENTS_DATA=events,
                SESSION_ID=session_id,
            )
            session_summary = await self.llm.generate_text(
                prompt=session_summary_prompt.template,
                system_context=session_summary_prompt.system_context,
            )
            return parse_json_string_to_model(session_summary, SessionSummary)
        except Exception as e:
            logger.error(f"[Session {self.session_id}] Failed to generate session summary: {e}")
            raise

    @staticmethod
    def _extract_session_name(markdown: str) -> str:
        """Return the first non-empty line of *markdown* without heading marks.

        Only leading ``#`` characters are removed, so a ``#`` inside the
        title is preserved. Returns ``""`` when there is no non-empty line.
        """
        for line in markdown.splitlines():
            title = line.strip().lstrip("#").strip()
            if title:
                return title
        return ""

    async def instruct_generate_session_markdown(self, session_id: int, instruction: str) -> SessionMD:
        """Generate Markdown based on provided instructions.

        Reads the session's events from storage, fills the ``"session_md"``
        prompt with them and the instruction, and wraps the LLM output in a
        ``SessionMD`` named after the markdown's first non-empty line.
        """
        events = self.storage.get_session_events(session_id)
        session_md_prompt = self.prompts.get_prompt("session_md").format(
            EVENTS_DATA=events,
            session_id=session_id,
            instruction=instruction,
        )
        session_md_raw = await self.llm.generate_text(
            prompt=session_md_prompt.template,
            system_context=session_md_prompt.system_context,
        )
        logger.info(f"[Session {self.session_id}] Generated session markdown raw: {session_md_raw}")

        # The first non-empty line of the generated markdown names the session.
        session_name = self._extract_session_name(session_md_raw)
        session_md = SessionMD(session_id=session_id, name=session_name, markdown=session_md_raw)
        # Presumably guards against a non-int id being passed in; kept as-is.
        session_md.session_id = int(session_id)
        return session_md
# async def __aenter__(self) -> 'Session':
# await self.start()
# return self
# async def __aexit__(self, exc_type, exc_val, exc_tb):
# await self.end()