
Thread in pending state while storing memory in the background #3851

Open · 4 tasks done
Saisiva123 opened this issue Mar 14, 2025 · 7 comments

Comments

@Saisiva123
Checked other resources

  • This is a bug, not a usage question. For questions, please use GitHub Discussions.
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.

Example Code

I'm following this template (https://github.com/langchain-ai/memory-template) to store memories in the background while connected to a remote graph.

The application-logic graph runs fine, but when it calls schedule_memories to store memories in the background, I see the message "Thread is in a pending state" in the studio and the memories are not stored.

async def schedule_memories(state: MessagesState, config: RunnableConfig) -> None:
    memory_client = get_client()

    await memory_client.runs.create(
        thread_id=config["configurable"]["thread_id"],
        multitask_strategy="enqueue",
        after_seconds=3,
        assistant_id='appointment_memory_bot',
        input={"messages": []},
        config={
            "configurable": {
                "user_id": config['configurable']['user_id'],
                "memory_types": config['configurable']['memory_types'],
            },
        },
    )

Error Message and Stack Trace (if applicable)

Description

-------------------------Application logic Agent/Graph------------------------------
def call_model(state: BookingAppointmentState, store: BaseStore, config: RunnableConfig) -> Command[Literal['tool_node', 'schedule_memories']]:
    model = ChatOpenAI(model="gpt-4o", openai_api_key=os.getenv("OPEN_AI_API_KEY")).bind_tools(tools)

    appointment_namespace = ('appointments', )
    apntment_details = store.get(appointment_namespace, config['configurable']['user_id'])
    apntment_details = apntment_details.dict()['value'] if apntment_details else None

    conversations_namespace = ('conversations', config['configurable']['user_id'])
    conversations = store.search(conversations_namespace)

    conversations = [(conversation.key, conversation.value) for conversation in conversations] if conversations else []

    messages = [SystemMessage(content=system_prompt.format(apntment_details=apntment_details, conversations=conversations))] + state['messages']

    result = model.invoke(messages)

    if len(result.tool_calls) > 0:
        return Command(goto='tool_node', update={'messages': [result]})

    return {'messages': [result]}

def tool_node(state: BookingAppointmentState, store: BaseStore, config: RunnableConfig) -> Command[Literal['ask_human', 'call_model']]:
    tool_names = {tool.name: tool for tool in tools}
    tool_calls = state['messages'][-1].tool_calls
    results = []

    for tool_call in tool_calls:
        tool_ = tool_names[tool_call["name"]]

        # inject state
        tool_input_fields = tool_.get_input_schema().model_json_schema()[
            "properties"
        ]
        if "state" in tool_input_fields:
            tool_call = {**tool_call, "args": {**tool_call["args"], "state": state}}

        print(tool_, tool_call)
        tool_response = tool_.invoke(tool_call)
        results.append(tool_response)

    if len(results) > 0:
        return results
    else:
        return Command(goto='call_model', update={'messages': [AIMessage(content=str(results))]})

def ask_human(state: BookingAppointmentState, store: BaseStore, config: RunnableConfig) -> Command[Literal['call_model']]:
    user_response = interrupt(state['question_to_patient'])

    if user_response:
        return Command(goto='call_model', update={
            'messages': [HumanMessage(content=user_response)],
            "question_to_patient": ''
        })

async def schedule_memories(state: MessagesState, config: RunnableConfig) -> None:
    memory_client = get_client()

    # get_client() returns the async SDK client, so runs.create must be awaited
    await memory_client.runs.create(
        thread_id=config["configurable"]["thread_id"],
        multitask_strategy="enqueue",
        after_seconds=3,
        assistant_id='appointment_memory_bot',
        input={"messages": []},
        config={
            "configurable": {
                "user_id": config['configurable']['user_id'],
                "memory_types": config['configurable']['memory_types'],
            },
        },
    )

-------------------------------------- Memory Agent / Graph ---------------------------------------------
def scatter_schemas(state: MessagesState, config: RunnableConfig) -> list[Send]:
    memory_types = config['configurable']['memory_types']
    sends = []

    for type in memory_types:
        match type:
            case "appointments":
                target = "update_appointments"
            case "conversations":
                target = "update_conversations"
            case _:
                raise ValueError(f"Unknown update mode: {type}")

        sends.append(Send(target, state))

    return sends

def update_appointments(state: MessagesState, store: BaseStore, config: RunnableConfig):
    user_id = config['configurable']['user_id']

    namespace = ('appointments', )
    key = user_id

    existing_apntmnt_details = store.get(namespace, key)
    existing_apntmnt_details = existing_apntmnt_details.dict()['value'] if existing_apntmnt_details else init_apntmnt_details

    system_prompt = '''Observe the ongoing conversation and extract relevant appointment details. If no appointment details are found, set them as None. Use the provided tools to retain any necessary information about the appointment.'''
    extractor = create_extractor(model, tools=[AppointmentDetails], tool_choice='AppointmentDetails')

    result = extractor.invoke({
        'messages': [SystemMessage(content=system_prompt)] + state['messages'],
        'existing': {'AppointmentDetails': existing_apntmnt_details}
    })

    updated_apntmnt_details = result['responses'][0].model_dump()

    store.put(namespace, key, updated_apntmnt_details)

def update_conversations(state: MessagesState, store: BaseStore, config: RunnableConfig):
    user_id = config['configurable']['user_id']

    namespace = ('conversations', user_id)
    # conversations = store.search(namespace)

    for index, msg in enumerate(state['messages']):
        if not isinstance(msg, ToolMessage):
            store.put(namespace, str(index + 1),
                      {'role': 'system' if isinstance(msg, AIMessage) else 'human',
                       'content': msg.content if msg.content else msg.tool_calls[0]['args']['reason']})

builder = StateGraph(MessagesState)

builder.add_node('scatter_schemas', scatter_schemas)
builder.add_node('update_appointments', update_appointments)
builder.add_node('update_conversations', update_conversations)

builder.add_conditional_edges(START, scatter_schemas, ["update_appointments", "update_conversations"])
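As an aside, the routing in scatter_schemas is essentially a dispatch table from configured memory types to target nodes, so that part can be checked in isolation without a running graph. A minimal stand-in (hypothetical helper, not part of the template, mirroring the match statement above):

```python
# Hypothetical stand-in for the routing in scatter_schemas: maps each
# configured memory type to the node that handles it, raising on unknowns.
TARGETS = {
    "appointments": "update_appointments",
    "conversations": "update_conversations",
}

def plan_targets(memory_types: list[str]) -> list[str]:
    targets = []
    for mem_type in memory_types:
        if mem_type not in TARGETS:
            raise ValueError(f"Unknown update mode: {mem_type}")
        targets.append(TARGETS[mem_type])
    return targets
```

In the real graph each resolved target would be wrapped in a `Send(target, state)` so both updaters run in parallel.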

System Info

using the latest versions

@Saisiva123
Author

[screenshot attached]

@hinthornw
Contributor

What's happening is that it's scheduling the ingestion for the future, but on the same thread; as long as the scheduled run hasn't completed, the thread is still considered "pending".
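One way to avoid holding the conversation thread pending would be to schedule the memory run on a separate, per-user thread instead of the chat thread. A minimal sketch of deriving a stable per-user memory thread id (hypothetical helper, not the template's code; the SDK usage at the bottom is an untested assumption that requires a running LangGraph server):

```python
import uuid

# Arbitrary namespace for deriving memory-thread ids (assumption, pick your own).
MEMORY_THREAD_NS = uuid.uuid5(uuid.NAMESPACE_DNS, "memory-threads")

def memory_thread_id(user_id: str) -> str:
    # uuid5 is deterministic: the same user always maps to the same
    # memory thread, and different users map to different threads.
    return str(uuid.uuid5(MEMORY_THREAD_NS, user_id))

# Usage sketch with the langgraph_sdk client (not runnable without a server):
#
# await memory_client.threads.create(
#     thread_id=memory_thread_id(user_id), if_exists="do_nothing")
# await memory_client.runs.create(
#     thread_id=memory_thread_id(user_id),  # separate thread, not the chat thread
#     multitask_strategy="enqueue",
#     after_seconds=3,
#     assistant_id='appointment_memory_bot',
#     input={"messages": state["messages"]},
#     ...
# )
```

With this split, the chat thread completes normally while the memory run queues on its own thread; note the memory graph would then need the messages passed in the run input rather than read from the chat thread's state.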

@Saisiva123
Author

Saisiva123 commented Mar 14, 2025

@hinthornw in that case, after 3 seconds I should see the memory created, right? Also, if I click on "Join run", it creates a new fork.

@Saisiva123
Author

Saisiva123 commented Mar 14, 2025

It's been stuck there for a long time, and if I proceed with another question / new turn, a new fork is created for my question.
So there end up being 2 forks: 1. the current ongoing conversation, 2. the one created when the thread moves from pending to complete.

I don't know what's happening. Could you please help me out with this? I provided the complete code.

@hinthornw
Contributor

Is there an error in the LangSmith trace?

@Saisiva123
Author

There is no error in the LangSmith trace either.

@Saisiva123
Author

Could you try running my example, please?
