-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy path03_how_to_persist_shared_state_lg.py
81 lines (61 loc) · 2.3 KB
/
03_how_to_persist_shared_state_lg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Author: Rajib Deb
# A simple example showing how langgraph persists state
import operator
import os
import pickle
from typing import TypedDict, Annotated, Sequence
from dotenv import load_dotenv
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.runnables import RunnableConfig, ConfigurableFieldSpec
from langchain_openai import ChatOpenAI
from langgraph.checkpoint import BaseCheckpointSaver, Checkpoint
from langgraph.graph import MessageGraph
from chkpt_client.sqllite_client import SQLLiteClient
# Load environment variables via python-dotenv before anything reads them.
load_dotenv()

# os.getenv returns None when the variable is not set.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
class DBCheckPointer(BaseCheckpointSaver):
    """Checkpoint saver that reads and writes checkpoints via ``SQLLiteClient``.

    Checkpoints are keyed by the ``session_id`` entry found under the
    ``configurable`` mapping of the run configuration.
    """

    class Config:
        # Presumably required so the base class accepts the SQLLiteClient
        # attribute below -- TODO confirm against BaseCheckpointSaver.
        arbitrary_types_allowed = True

    # Created once, when the class body is executed.
    client = SQLLiteClient()

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """Declare the one configurable field this saver uses: ``session_id``."""
        session_spec = ConfigurableFieldSpec(
            id="session_id",
            annotation=str,
            name="Session ID",
            description=None,
            default="",
            is_shared=True,
        )
        return [session_spec]

    def get(self, config: RunnableConfig) -> Checkpoint:
        """Return what the client has stored for the configured session.

        Raises ``KeyError`` if ``config`` lacks ``configurable`` or
        ``session_id``.
        """
        session_id = config["configurable"]["session_id"]
        # NOTE(review): put() pickles the checkpoint before storing it, but the
        # value is returned here exactly as the client hands it back --
        # presumably select_chkpt() unpickles; verify against SQLLiteClient.
        return self.client.select_chkpt(session_id)

    def put(self, config: RunnableConfig, checkpoint: Checkpoint) -> None:
        """Pickle ``checkpoint`` and hand it to the client with the session id.

        Any exception raised by the client insert is printed and otherwise
        ignored, so a failed write does not interrupt the caller.
        """
        session_id = config["configurable"]["session_id"]
        row = (session_id, pickle.dumps(checkpoint))
        try:
            self.client.insert_chkpt(record=row)
        except Exception as exc:
            # NOTE(review): best-effort write; the checkpoint is lost on error.
            print(exc)
# Module-level chat model: streaming disabled, temperature 0.
model = ChatOpenAI(streaming=False, temperature=0)
def personal_assistant(messages):
    """Send ``messages`` to the chat model and return its response.

    The value returned is exactly what ``model.invoke`` produces (the
    response object itself, not wrapped in a list).
    """
    return model.invoke(messages)
class AgentState(TypedDict):
    """Typed dictionary with a single ``messages`` entry.

    Not referenced anywhere else in the visible part of this file.
    """

    # Sequence of messages, annotated with operator.add as metadata.
    messages: Annotated[Sequence[BaseMessage], operator.add]
# Build a graph with a single node, "assistant", backed by personal_assistant.
workflow = MessageGraph()
workflow.add_node("assistant", personal_assistant)
# The same node is registered as both the entry point and the finish point.
workflow.set_entry_point("assistant")
workflow.set_finish_point("assistant")
# Compile the graph with a DBCheckPointer instance as its checkpointer.
checkpoint = DBCheckPointer()
app = workflow.compile(checkpointer=checkpoint)
# Interactive loop: read a question, run it through the compiled graph and
# print the content of the last message in the result. Typing "exit" or
# closing stdin ends the program with status 0.
while True:
    try:
        content = input("Ask me a question \n")
    except EOFError:
        # stdin was closed (Ctrl-D or exhausted piped input): treat it like
        # "exit" instead of crashing with a traceback.
        raise SystemExit(0) from None
    if content == "exit":
        # Raise SystemExit directly: the exit() helper is injected by the
        # site module for interactive use and is not guaranteed to exist.
        raise SystemExit(0)
    human_message = [HumanMessage(content=content)]
    # Every turn uses the same hard-coded session id, "2".
    response = app.invoke(human_message, {"configurable": {"session_id": "2"}})
    print(response[-1].content)