Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating langfuse integration #345

Merged
merged 22 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ignore_missing_imports = True
# We don't want to ignore all missing imports as we want to catch those in our own code
# But for certain libraries they don't have a stub file, so we only enforce import checking for our own libraries.
# Another alternative would be to list out every single dependency that does not have a stub.
[mypy-prediction_market_agent.*]
[mypy-prediction_market_agent_tooling.*]
ignore_missing_imports = False
[mypy-scripts.*]
ignore_missing_imports = False
Expand Down
136 changes: 115 additions & 21 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions prediction_market_agent_tooling/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class APIKeys(BaseSettings):
GOOGLE_SEARCH_ENGINE_ID: t.Optional[SecretStr] = None

LANGFUSE_SECRET_KEY: t.Optional[SecretStr] = None
LANGFUSE_PUBLIC_KEY: t.Optional[SecretStr] = None
LANGFUSE_PUBLIC_KEY: t.Optional[str] = None
LANGFUSE_HOST: t.Optional[str] = None
LANGFUSE_DEPLOYMENT_VERSION: t.Optional[str] = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new env variable that is populated by git commit sha in PMA's docker building pipeline; it will show us, in the UI, what version of PMA was used.


ENABLE_CACHE: bool = True
CACHE_DIR: str = "./.cache"
Expand Down Expand Up @@ -123,7 +124,7 @@ def langfuse_secret_key(self) -> SecretStr:
)

@property
def langfuse_public_key(self) -> SecretStr:
def langfuse_public_key(self) -> str:
return check_not_none(
self.LANGFUSE_PUBLIC_KEY, "LANGFUSE_PUBLIC_KEY missing in the environment."
)
Expand All @@ -134,6 +135,14 @@ def langfuse_host(self) -> str:
self.LANGFUSE_HOST, "LANGFUSE_HOST missing in the environment."
)

@property
def default_enable_langfuse(self) -> bool:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used as the default argument in DeployableAgent init method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and self.LANGFUSE_DEPLOYMENT_VERSION is not None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not necessary for langfuse to work, it can be None

return (
self.LANGFUSE_SECRET_KEY is not None
and self.LANGFUSE_PUBLIC_KEY is not None
and self.LANGFUSE_HOST is not None
)

def model_dump_public(self) -> dict[str, t.Any]:
return {
k: v
Expand Down
270 changes: 216 additions & 54 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import getpass
import inspect
import os
import tempfile
import time
import typing as t
from datetime import datetime, timedelta
from enum import Enum
from functools import cached_property

from pydantic import BaseModel, BeforeValidator
from pydantic import BaseModel, BeforeValidator, computed_field
from typing_extensions import Annotated

from prediction_market_agent_tooling.config import APIKeys
Expand Down Expand Up @@ -39,16 +42,17 @@
redeem_from_all_user_positions,
withdraw_wxdai_to_xdai_to_keep_balance,
)
from prediction_market_agent_tooling.monitor.langfuse.langfuse_wrapper import (
LangfuseWrapper,
)
from prediction_market_agent_tooling.monitor.monitor_app import (
MARKET_TYPE_TO_DEPLOYED_AGENT,
)
from prediction_market_agent_tooling.tools.is_predictable import is_predictable_binary
from prediction_market_agent_tooling.tools.is_predictable import (
is_predictable_binary_observed,
)
from prediction_market_agent_tooling.tools.langfuse_ import langfuse_context, observe
from prediction_market_agent_tooling.tools.utils import DatetimeWithTimezone, utcnow

MAX_AVAILABLE_MARKETS = 20
TRADER_TAG = "trader"


def to_boolean_outcome(value: str | bool) -> bool:
Expand Down Expand Up @@ -89,10 +93,76 @@ def p_no(self) -> Probability:
return Probability(1 - self.p_yes)


class ProcessedMarket(BaseModel):
answer: Answer
amount: BetAmount


class AnsweredEnum(str, Enum):
ANSWERED = "answered"
NOT_ANSWERED = "not_answered"


class DeployableAgent:
def __init__(self) -> None:
self.langfuse_wrapper = LangfuseWrapper(agent_name=self.__class__.__name__)
def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
) -> None:
self.load()
self.start_time = utcnow()
self.enable_langfuse = enable_langfuse
self.initialize_langfuse()

def initialize_langfuse(self) -> None:
# Configure Langfuse singleton with our APIKeys.
# If langfuse is disabled, it will just ignore all the calls, so no need to do if-else around the code.
keys = APIKeys()
if self.enable_langfuse:
langfuse_context.configure(
public_key=keys.langfuse_public_key,
secret_key=keys.langfuse_secret_key.get_secret_value(),
host=keys.langfuse_host,
enabled=self.enable_langfuse,
)
else:
langfuse_context.configure(enabled=self.enable_langfuse)

def langfuse_update_current_trace(
self,
name: str | None = None,
input: t.Any | None = None,
output: t.Any | None = None,
user_id: str | None = None,
session_id: str | None = None,
version: str | None = None,
release: str | None = None,
metadata: t.Any | None = None,
tags: list[str] | None = None,
public: bool | None = None,
) -> None:
"""
Provide some useful default arguments when updating the current trace in our agents.
"""
langfuse_context.update_current_trace(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a wrapper around update_current_trace, see the default arguments below. The reason is to always include these in our PMAT-based agents.

name=name,
input=input,
output=output,
user_id=user_id or getpass.getuser(),
session_id=session_id
or self.session_id, # All traces within a single run execution will be grouped under a single session.
version=version
or APIKeys().LANGFUSE_DEPLOYMENT_VERSION, # Optionally, mark the current deployment with version (e.g. add git commit hash during docker building).
release=release,
metadata=metadata,
tags=tags,
public=public,
)

@computed_field # type: ignore[prop-decorator] # Mypy issue: https://github.com/python/mypy/issues/14461
@cached_property
def session_id(self) -> str:
# Each agent should be an unique class.
return f"{self.__class__.__name__} - {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}"

def __init_subclass__(cls, **kwargs: t.Any) -> None:
if "DeployableAgent" not in str(
Expand Down Expand Up @@ -209,12 +279,41 @@ class DeployableTraderAgent(DeployableAgent):
min_required_balance_to_operate: xDai | None = xdai_type(1)
min_balance_to_keep_in_native_currency: xDai | None = xdai_type(0.1)

def __init__(self, place_bet: bool = True) -> None:
super().__init__()
def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
place_bet: bool = True,
) -> None:
super().__init__(enable_langfuse=enable_langfuse)
self.place_bet = place_bet

def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)
def update_langfuse_trace_by_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
self.langfuse_update_current_trace(
# UI allows to do filtering by these.
metadata={
"agent_class": self.__class__.__name__,
"market_id": market.id,
"market_question": market.question,
"market_outcomes": market.outcomes,
},
)

def update_langfuse_trace_by_processed_market(
self, market_type: MarketType, processed_market: ProcessedMarket | None
) -> None:
self.langfuse_update_current_trace(
tags=[
TRADER_TAG,
(
AnsweredEnum.ANSWERED
if processed_market is not None
else AnsweredEnum.NOT_ANSWERED
),
market_type.value,
]
)

def check_min_required_balance_to_operate(self, market_type: MarketType) -> None:
api_keys = APIKeys()
Expand All @@ -229,42 +328,55 @@ def check_min_required_balance_to_operate(self, market_type: MarketType) -> None
f"for agent with address {api_keys.public_key} is not met."
)

def pick_markets(
self, market_type: MarketType, markets: t.Sequence[AgentMarket]
) -> t.Sequence[AgentMarket]:
@observe()
def have_bet_on_market_since_observed(
self, market: AgentMarket, since: timedelta
) -> bool:
return self.have_bet_on_market_since(market, since)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the pattern I was talking about last time, I don't like it so much but I also don't see any clearer way to proceed.

This pattern is used on all observed methods of DeployableAgent.

The reason is, that if some method is @observed() and the agent class needs to implement that method (answer_binary_market is a clear example, but I think it will be usable for others as well), then the overridden method would not be @observed() anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this pattern either, this means every time you add a new method, you need to add an observed one (to avoid people overriding the observed one and only overriding the "pure" one).

One solution I came across is adding a class decorator to DeployableAgent, which "observes" the a set of functions without annotating them. Hence you could avoid having to manually decorate them, instead only decorating the base class (see snippet below).
What this would imply:
-> Annotate DeployableAgent with decorator, define that methods [have_bet_on_market, ...] should be observed. Add logic to observe function (e.g. call decorator(my_function) or similar)
-> If subclass (e.g. CoinflipDeployableAgent) overwrites the method, it doesn't matter, it will keep getting observed (due to class decorator being in the base class).
-> This is nice because you avoid having duplicate methods everywhere.

Let me know what you think @kongzii - Gist here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could work, but based on the gist, it would decorate all the methods, which is not needed (and maybe including methods as __init__ and similar?)

So we would need something like

@prepend_foo_to_methods(allowed_methods=["have_bet_on_market", ...])
class MyClass:

which isn't type-safe and can be easily forgotten.

Another approach I was experimenting with was something like

def initialize_langfuse(self) -> None:
    ...
    self.have_bet_on_market = observe()(self.have_bet_on_market)  # type: ignore[method-assign]

It works similarly to your suggestion, but mypy should warn us if for example have_bet_on_market gets deleted or something (but I didn't test it), although it still needs type: ignore comment, because changing methods like this isn't allowed.

I didn't use that approach because it needs type: ignore, but I don't feel strongly about it.

Do you have preferences?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prepend_foo_to_methods(allowed_methods=["have_bet_on_market", ...])
class MyClass:

What I had in mind was:
-> Add the decorator to the base class (DeployableTraderAgent). Note that subclasses (e.g. DeployableCoinFlipTraderAgent) and similar do not get decorated. Hence we need to decorate only once (or maybe more, but only on base classes), so not very hard to remember.
-> The decorator does not modify all functions, only a subset (either hardcoded or even better, as you suggested it, provided as an argument).
-> Not sure how to satisfy mypy here, maybe using a FrozenSet but not sure

Regarding the other approach (overriding methods with observe()(self.have_bet_on_market) # type: ignore[method-assign], it's a similar approach to the decorator, but I like the decorator a bit better because I find it more elegant (no real reason).

The point I'm trying to make is: I don't love the duplicated-function pattern (observed + pure). If we can find any solution to this (be it decorator or non-decorator), I'm happy with any option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I don't care for duplicated *_observer method or doing some magic, but what I'm afraid of is that

@magic(["predict"])
class Agent:
    def predictt(self):
        print("Predicting")

will never trigger mypy as a mistake, but

from prediction_market_agent_tooling.tools.langfuse_ import observe

class Agent:
    def init_langfuse(self) -> None:
        self.predict = observe()(self.predict)

    def predictt(self) -> None:
        print("Predicting")

will. And it also works with the auto-refactoring features of VSCode.

In any case, we will get rid of _observed methods, I'll change it.


def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)

@observe()
def verify_market_observed(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to refactor the logic of these methods, before it was:

  1. Fetch markets
  2. Filter markets
  3. Predict+Bet on them one by one

Now it is

  1. Fetch markets
  2. One by one check if the market is ok to predict and if so, predict + bet on it.

The reason is that we need the whole process end-to-end saved in Langfuse, this is how it looks like:

Screenshot by Dropbox Capture

self, market_type: MarketType, market: AgentMarket
) -> bool:
return self.verify_market(market_type, market)

def verify_market(self, market_type: MarketType, market: AgentMarket) -> bool:
"""
Subclasses can implement their own logic instead of this one, or on top of this one.
By default, it picks only the first {n_markets_per_run} markets where user didn't bet recently and it's a reasonable question.
By default, it allows only markets where user didn't bet recently and it's a reasonable question.
"""
picked: list[AgentMarket] = []

for market in markets:
if len(picked) >= self.bet_on_n_markets_per_run:
break

if self.have_bet_on_market_since(market, since=timedelta(hours=24)):
continue
if self.have_bet_on_market_since_observed(market, since=timedelta(hours=24)):
return False

# Do as a last check, as it uses paid OpenAI API.
if not is_predictable_binary(market.question):
continue
# Manifold allows to bet only on markets with probability between 1 and 99.
if market_type == MarketType.MANIFOLD and not (1 < market.current_p_yes < 99):
return False

# Manifold allows to bet only on markets with probability between 1 and 99.
if market_type == MarketType.MANIFOLD and not (
1 < market.current_p_yes < 99
):
continue
# Do as a last check, as it uses paid OpenAI API.
if not is_predictable_binary_observed(market.question):
return False
kongzii marked this conversation as resolved.
Show resolved Hide resolved

picked.append(market)
return True

return picked
@observe()
def answer_binary_market_observed(self, market: AgentMarket) -> Answer | None:
return self.answer_binary_market(market)

def answer_binary_market(self, market: AgentMarket) -> Answer | None:
"""
Answer the binary market. This method must be implemented by the subclass.
"""
raise NotImplementedError("This method must be implemented by the subclass")

@observe()
def calculate_bet_amount_observed(
self, answer: Answer, market: AgentMarket
) -> BetAmount:
return self.calculate_bet_amount(answer, market)

def calculate_bet_amount(self, answer: Answer, market: AgentMarket) -> BetAmount:
"""
Calculate the bet amount. By default, it returns the minimum bet amount.
Expand All @@ -285,7 +397,61 @@ def get_markets(
)
return available_markets

def before(self, market_type: MarketType) -> None:
def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
self.update_langfuse_trace_by_market(market_type, market)

@observe()
def process_market_observed(
self, market_type: MarketType, market: AgentMarket, verify_market: bool = True
) -> ProcessedMarket | None:
return self.process_market(market_type, market, verify_market)

def process_market(
self, market_type: MarketType, market: AgentMarket, verify_market: bool
) -> ProcessedMarket | None:
self.before_process_market(market_type, market)

if verify_market and not self.verify_market_observed(market_type, market):
logger.info(f"Market '{market.question}' doesn't meet the criteria.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None

answer = self.answer_binary_market_observed(market)

if answer is None:
logger.info(f"No answer for market '{market.question}'.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None

amount = self.calculate_bet_amount_observed(answer, market)

if self.place_bet:
logger.info(
f"Placing bet on {market} with result {answer} and amount {amount}"
)
market.place_bet(
amount=amount,
outcome=answer.decision,
)

self.after_process_market(market_type, market)

processed_market = ProcessedMarket(
answer=answer,
amount=amount,
)
self.update_langfuse_trace_by_processed_market(market_type, processed_market)

return processed_market

def after_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
pass

def before_process_markets(self, market_type: MarketType) -> None:
"""
Executes actions that occur before bets are placed.
"""
Expand All @@ -302,33 +468,29 @@ def before(self, market_type: MarketType) -> None:
withdraw_multiplier=2,
)

def process_bets(self, market_type: MarketType) -> None:
def process_markets(self, market_type: MarketType) -> None:
"""
Processes bets placed by agents on a given market.
"""
available_markets = self.get_markets(market_type)
markets = self.pick_markets(market_type, available_markets)
for market in markets:
processed = 0

for market in available_markets:
# We need to check it again before each market bet, as the balance might have changed.
self.check_min_required_balance_to_operate(market_type)
result = self.answer_binary_market(market)
if result is None:
logger.info(f"Skipping market {market} as no answer was provided")
continue
if self.place_bet:
amount = self.calculate_bet_amount(result, market)
logger.info(
f"Placing bet on {market} with result {result} and amount {amount}"
)
market.place_bet(
amount=amount,
outcome=result.decision,
)

def after(self, market_type: MarketType) -> None:
processed_market = self.process_market_observed(market_type, market)

if processed_market is not None:
processed += 1

if processed == self.bet_on_n_markets_per_run:
break

def after_process_markets(self, market_type: MarketType) -> None:
pass

def run(self, market_type: MarketType) -> None:
self.before(market_type)
self.process_bets(market_type)
self.after(market_type)
self.before_process_markets(market_type)
self.process_markets(market_type)
self.after_process_markets(market_type)
Loading
Loading