Skip to content

Commit

Permalink
Updating langfuse integration (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
kongzii authored Aug 21, 2024
1 parent d9b13c7 commit 3df8e89
Show file tree
Hide file tree
Showing 12 changed files with 629 additions and 312 deletions.
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ignore_missing_imports = True
# We don't want to ignore all missing imports as we want to catch those in our own code
# But for certain libraries they don't have a stub file, so we only enforce import checking for our own libraries.
# Another alternative would be to list out every single dependency that does not have a stub.
[mypy-prediction_market_agent.*]
[mypy-prediction_market_agent_tooling.*]
ignore_missing_imports = False
[mypy-scripts.*]
ignore_missing_imports = False
Expand Down
548 changes: 337 additions & 211 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions prediction_market_agent_tooling/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class APIKeys(BaseSettings):
GOOGLE_SEARCH_ENGINE_ID: t.Optional[SecretStr] = None

LANGFUSE_SECRET_KEY: t.Optional[SecretStr] = None
LANGFUSE_PUBLIC_KEY: t.Optional[SecretStr] = None
LANGFUSE_PUBLIC_KEY: t.Optional[str] = None
LANGFUSE_HOST: t.Optional[str] = None
LANGFUSE_DEPLOYMENT_VERSION: t.Optional[str] = None

TAVILY_API_KEY: t.Optional[SecretStr] = None

Expand Down Expand Up @@ -127,7 +128,7 @@ def langfuse_secret_key(self) -> SecretStr:
)

@property
def langfuse_public_key(self) -> SecretStr:
def langfuse_public_key(self) -> str:
return check_not_none(
self.LANGFUSE_PUBLIC_KEY, "LANGFUSE_PUBLIC_KEY missing in the environment."
)
Expand All @@ -138,6 +139,14 @@ def langfuse_host(self) -> str:
self.LANGFUSE_HOST, "LANGFUSE_HOST missing in the environment."
)

@property
def default_enable_langfuse(self) -> bool:
return (
self.LANGFUSE_SECRET_KEY is not None
and self.LANGFUSE_PUBLIC_KEY is not None
and self.LANGFUSE_HOST is not None
)

@property
def tavily_api_key(self) -> SecretStr:
return check_not_none(
Expand Down
253 changes: 199 additions & 54 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import getpass
import inspect
import os
import tempfile
import time
import typing as t
from datetime import datetime, timedelta
from enum import Enum
from functools import cached_property

from pydantic import BaseModel, BeforeValidator
from pydantic import BaseModel, BeforeValidator, computed_field
from typing_extensions import Annotated

from prediction_market_agent_tooling.config import APIKeys
Expand Down Expand Up @@ -39,16 +42,15 @@
redeem_from_all_user_positions,
withdraw_wxdai_to_xdai_to_keep_balance,
)
from prediction_market_agent_tooling.monitor.langfuse.langfuse_wrapper import (
LangfuseWrapper,
)
from prediction_market_agent_tooling.monitor.monitor_app import (
MARKET_TYPE_TO_DEPLOYED_AGENT,
)
from prediction_market_agent_tooling.tools.is_predictable import is_predictable_binary
from prediction_market_agent_tooling.tools.langfuse_ import langfuse_context, observe
from prediction_market_agent_tooling.tools.utils import DatetimeWithTimezone, utcnow

MAX_AVAILABLE_MARKETS = 20
TRADER_TAG = "trader"


def to_boolean_outcome(value: str | bool) -> bool:
Expand All @@ -71,6 +73,21 @@ def to_boolean_outcome(value: str | bool) -> bool:
raise ValueError(f"Expected a boolean or a string, but got {value}")


def initialize_langfuse(enable_langfuse: bool) -> None:
# Configure Langfuse singleton with our APIKeys.
# If langfuse is disabled, it will just ignore all the calls, so no need to do if-else around the code.
keys = APIKeys()
if enable_langfuse:
langfuse_context.configure(
public_key=keys.langfuse_public_key,
secret_key=keys.langfuse_secret_key.get_secret_value(),
host=keys.langfuse_host,
enabled=enable_langfuse,
)
else:
langfuse_context.configure(enabled=enable_langfuse)


Decision = Annotated[bool, BeforeValidator(to_boolean_outcome)]


Expand All @@ -89,11 +106,66 @@ def p_no(self) -> Probability:
return Probability(1 - self.p_yes)


class ProcessedMarket(BaseModel):
answer: Answer
amount: BetAmount


class AnsweredEnum(str, Enum):
ANSWERED = "answered"
NOT_ANSWERED = "not_answered"


class DeployableAgent:
def __init__(self) -> None:
self.langfuse_wrapper = LangfuseWrapper(agent_name=self.__class__.__name__)
def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
) -> None:
self.start_time = utcnow()
self.enable_langfuse = enable_langfuse
self.initialize_langfuse()
self.load()

def initialize_langfuse(self) -> None:
initialize_langfuse(self.enable_langfuse)

def langfuse_update_current_trace(
self,
name: str | None = None,
input: t.Any | None = None,
output: t.Any | None = None,
user_id: str | None = None,
session_id: str | None = None,
version: str | None = None,
release: str | None = None,
metadata: t.Any | None = None,
tags: list[str] | None = None,
public: bool | None = None,
) -> None:
"""
Provide some useful default arguments when updating the current trace in our agents.
"""
langfuse_context.update_current_trace(
name=name,
input=input,
output=output,
user_id=user_id or getpass.getuser(),
session_id=session_id
or self.session_id, # All traces within a single run execution will be grouped under a single session.
version=version
or APIKeys().LANGFUSE_DEPLOYMENT_VERSION, # Optionally, mark the current deployment with version (e.g. add git commit hash during docker building).
release=release,
metadata=metadata,
tags=tags,
public=public,
)

@computed_field # type: ignore[prop-decorator] # Mypy issue: https://github.com/python/mypy/issues/14461
@cached_property
def session_id(self) -> str:
# Each agent should be an unique class.
return f"{self.__class__.__name__} - {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}"

def __init_subclass__(cls, **kwargs: t.Any) -> None:
if "DeployableAgent" not in str(
cls.__init__
Expand Down Expand Up @@ -209,12 +281,50 @@ class DeployableTraderAgent(DeployableAgent):
min_required_balance_to_operate: xDai | None = xdai_type(1)
min_balance_to_keep_in_native_currency: xDai | None = xdai_type(0.1)

def __init__(self, place_bet: bool = True) -> None:
super().__init__()
def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
place_bet: bool = True,
) -> None:
super().__init__(enable_langfuse=enable_langfuse)
self.place_bet = place_bet

def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)
def initialize_langfuse(self) -> None:
super().initialize_langfuse()
# Auto-observe all the methods where it makes sense, so that subclassses don't need to do it manually.
self.have_bet_on_market_since = observe()(self.have_bet_on_market_since) # type: ignore[method-assign]
self.verify_market = observe()(self.verify_market) # type: ignore[method-assign]
self.answer_binary_market = observe()(self.answer_binary_market) # type: ignore[method-assign]
self.calculate_bet_amount = observe()(self.calculate_bet_amount) # type: ignore[method-assign]
self.process_market = observe()(self.process_market) # type: ignore[method-assign]

def update_langfuse_trace_by_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
self.langfuse_update_current_trace(
# UI allows to do filtering by these.
metadata={
"agent_class": self.__class__.__name__,
"market_id": market.id,
"market_question": market.question,
"market_outcomes": market.outcomes,
},
)

def update_langfuse_trace_by_processed_market(
self, market_type: MarketType, processed_market: ProcessedMarket | None
) -> None:
self.langfuse_update_current_trace(
tags=[
TRADER_TAG,
(
AnsweredEnum.ANSWERED
if processed_market is not None
else AnsweredEnum.NOT_ANSWERED
),
market_type.value,
]
)

def check_min_required_balance_to_operate(self, market_type: MarketType) -> None:
api_keys = APIKeys()
Expand All @@ -229,35 +339,26 @@ def check_min_required_balance_to_operate(self, market_type: MarketType) -> None
f"for agent with address {api_keys.public_key} is not met."
)

def pick_markets(
self, market_type: MarketType, markets: t.Sequence[AgentMarket]
) -> t.Sequence[AgentMarket]:
def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)

def verify_market(self, market_type: MarketType, market: AgentMarket) -> bool:
"""
Subclasses can implement their own logic instead of this one, or on top of this one.
By default, it picks only the first {n_markets_per_run} markets where user didn't bet recently and it's a reasonable question.
By default, it allows only markets where user didn't bet recently and it's a reasonable question.
"""
picked: list[AgentMarket] = []

for market in markets:
if len(picked) >= self.bet_on_n_markets_per_run:
break

if self.have_bet_on_market_since(market, since=timedelta(hours=24)):
continue

# Do as a last check, as it uses paid OpenAI API.
if not is_predictable_binary(market.question):
continue
if self.have_bet_on_market_since(market, since=timedelta(hours=24)):
return False

# Manifold allows to bet only on markets with probability between 1 and 99.
if market_type == MarketType.MANIFOLD and not (
1 < market.current_p_yes < 99
):
continue
# Manifold allows to bet only on markets with probability between 1 and 99.
if market_type == MarketType.MANIFOLD and not (1 < market.current_p_yes < 99):
return False

picked.append(market)
# Do as a last check, as it uses paid OpenAI API.
if not is_predictable_binary(market.question):
return False

return picked
return True

def answer_binary_market(self, market: AgentMarket) -> Answer | None:
"""
Expand Down Expand Up @@ -285,7 +386,55 @@ def get_markets(
)
return available_markets

def before(self, market_type: MarketType) -> None:
def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
self.update_langfuse_trace_by_market(market_type, market)

def process_market(
self, market_type: MarketType, market: AgentMarket, verify_market: bool = True
) -> ProcessedMarket | None:
self.before_process_market(market_type, market)

if verify_market and not self.verify_market(market_type, market):
logger.info(f"Market '{market.question}' doesn't meet the criteria.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None

answer = self.answer_binary_market(market)

if answer is None:
logger.info(f"No answer for market '{market.question}'.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None

amount = self.calculate_bet_amount(answer, market)

if self.place_bet:
logger.info(
f"Placing bet on {market} with result {answer} and amount {amount}"
)
market.place_bet(
amount=amount,
outcome=answer.decision,
)

self.after_process_market(market_type, market)

processed_market = ProcessedMarket(
answer=answer,
amount=amount,
)
self.update_langfuse_trace_by_processed_market(market_type, processed_market)

return processed_market

def after_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
pass

def before_process_markets(self, market_type: MarketType) -> None:
"""
Executes actions that occur before bets are placed.
"""
Expand All @@ -302,33 +451,29 @@ def before(self, market_type: MarketType) -> None:
withdraw_multiplier=2,
)

def process_bets(self, market_type: MarketType) -> None:
def process_markets(self, market_type: MarketType) -> None:
"""
Processes bets placed by agents on a given market.
"""
available_markets = self.get_markets(market_type)
markets = self.pick_markets(market_type, available_markets)
for market in markets:
processed = 0

for market in available_markets:
# We need to check it again before each market bet, as the balance might have changed.
self.check_min_required_balance_to_operate(market_type)
result = self.answer_binary_market(market)
if result is None:
logger.info(f"Skipping market {market} as no answer was provided")
continue
if self.place_bet:
amount = self.calculate_bet_amount(result, market)
logger.info(
f"Placing bet on {market} with result {result} and amount {amount}"
)
market.place_bet(
amount=amount,
outcome=result.decision,
)

def after(self, market_type: MarketType) -> None:
processed_market = self.process_market(market_type, market)

if processed_market is not None:
processed += 1

if processed == self.bet_on_n_markets_per_run:
break

def after_process_markets(self, market_type: MarketType) -> None:
pass

def run(self, market_type: MarketType) -> None:
self.before(market_type)
self.process_bets(market_type)
self.after(market_type)
self.before_process_markets(market_type)
self.process_markets(market_type)
self.after_process_markets(market_type)
Loading

0 comments on commit 3df8e89

Please sign in to comment.