Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split DeployableTraderAgent into DeployablePredictionAgent #525

Merged
merged 12 commits into from
Oct 24, 2024
250 changes: 162 additions & 88 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
AgentMarket,
FilterBy,
ProcessedMarket,
ProcessedTradedMarket,
SortBy,
)
from prediction_market_agent_tooling.markets.data_models import (
Expand Down Expand Up @@ -165,9 +166,11 @@ def session_id(self) -> str:
return f"{self.__class__.__name__} - {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}"

def __init_subclass__(cls, **kwargs: t.Any) -> None:
if "DeployableAgent" not in str(
cls.__init__
) and "DeployableTraderAgent" not in str(cls.__init__):
if (
"DeployableAgent" not in str(cls.__init__)
and "DeployableTraderAgent" not in str(cls.__init__)
and "DeployablePredictionAgent" not in str(cls.__init__)
):
Comment on lines +169 to +173
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a more maintainable approach for class name checks.

The string-based class name checks could become fragile if class names change. Consider using cls.__bases__ to check inheritance directly.

-        if (
-            "DeployableAgent" not in str(cls.__init__)
-            and "DeployableTraderAgent" not in str(cls.__init__)
-            and "DeployablePredictionAgent" not in str(cls.__init__)
-        ):
+        base_classes = [base.__name__ for base in cls.__mro__]
+        allowed_classes = {"DeployableAgent", "DeployableTraderAgent", "DeployablePredictionAgent"}
+        if not any(base in allowed_classes for base in base_classes):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (
"DeployableAgent" not in str(cls.__init__)
and "DeployableTraderAgent" not in str(cls.__init__)
and "DeployablePredictionAgent" not in str(cls.__init__)
):
base_classes = [base.__name__ for base in cls.__mro__]
allowed_classes = {"DeployableAgent", "DeployableTraderAgent", "DeployablePredictionAgent"}
if not any(base in allowed_classes for base in base_classes):

raise TypeError(
"Cannot override __init__ method of deployable agent class, please override the `load` method to set up the agent."
)
Expand Down Expand Up @@ -274,28 +277,21 @@ def get_gcloud_fname(self, market_type: MarketType) -> str:
return f"{self.__class__.__name__.lower()}-{market_type}-{utcnow().strftime('%Y-%m-%d--%H-%M-%S')}"


class DeployableTraderAgent(DeployableAgent):
class DeployablePredictionAgent(DeployableAgent):
bet_on_n_markets_per_run: int = 1
min_balance_to_keep_in_native_currency: xDai | None = xdai_type(0.1)
allow_invalid_questions: bool = False
same_market_bet_interval: timedelta = timedelta(hours=24)
# Only Metaculus allows to post predictions without trading (buying/selling of outcome tokens).
supported_markets: t.Sequence[MarketType] = [MarketType.METACULUS]

def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
place_bet: bool = True,
store_prediction: bool = True,
) -> None:
super().__init__(enable_langfuse=enable_langfuse)
self.place_bet = place_bet

def get_betting_strategy(self, market: AgentMarket) -> BettingStrategy:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything around trading was moved to DeployableTraderAgent. DeployablePredictionAgent only does the prediction.

user_id = market.get_user_id(api_keys=APIKeys())

total_amount = market.get_tiny_bet_amount().amount
if existing_position := market.get_position(user_id=user_id):
total_amount += existing_position.total_amount.amount

return MaxAccuracyBettingStrategy(bet_amount=total_amount)
self.store_prediction = store_prediction

def initialize_langfuse(self) -> None:
super().initialize_langfuse()
Expand All @@ -304,7 +300,6 @@ def initialize_langfuse(self) -> None:
self.verify_market = observe()(self.verify_market) # type: ignore[method-assign]
self.answer_binary_market = observe()(self.answer_binary_market) # type: ignore[method-assign]
self.process_market = observe()(self.process_market) # type: ignore[method-assign]
self.build_trades = observe()(self.build_trades) # type: ignore[method-assign]

def update_langfuse_trace_by_market(
self, market_type: MarketType, market: AgentMarket
Expand Down Expand Up @@ -342,19 +337,6 @@ def check_min_required_balance_to_operate(self, market_type: MarketType) -> None
f"{api_keys=} doesn't have enough operational balance."
)

def check_min_required_balance_to_trade(self, market: AgentMarket) -> None:
api_keys = APIKeys()

# Get the strategy to know how much it will bet.
strategy = self.get_betting_strategy(market)
# Have a little bandwidth after the bet.
min_required_balance_to_trade = strategy.maximum_possible_bet_amount * 1.01

if market.get_trade_balance(api_keys) < min_required_balance_to_trade:
raise OutOfFundsError(
f"Minimum required balance {min_required_balance_to_trade} for agent is not met."
)

def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)

Expand Down Expand Up @@ -399,26 +381,11 @@ def get_markets(
)
return available_markets

def build_trades(
self,
market: AgentMarket,
answer: ProbabilisticAnswer,
existing_position: Position | None,
) -> list[Trade]:
strategy = self.get_betting_strategy(market=market)
trades = strategy.calculate_trades(existing_position, answer, market)
BettingStrategy.assert_trades_currency_match_markets(market, trades)
return trades

def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
self.update_langfuse_trace_by_market(market_type, market)

api_keys = APIKeys()

self.check_min_required_balance_to_trade(market)

if market_type.is_blockchain_market:
# Exchange wxdai back to xdai if the balance is getting low, so we can keep paying for fees.
if self.min_balance_to_keep_in_native_currency is not None:
Expand All @@ -434,67 +401,39 @@ def process_market(
market: AgentMarket,
verify_market: bool = True,
) -> ProcessedMarket | None:
self.update_langfuse_trace_by_market(market_type, market)
logger.info(f"Processing market {market.question=} from {market.url=}.")

self.before_process_market(market_type, market)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to process_markets function, so that it will be called correctly in the DeployableTraderAgent subclass.


answer: ProbabilisticAnswer | None
if verify_market and not self.verify_market(market_type, market):
logger.info(f"Market '{market.question}' doesn't meet the criteria.")
self.update_langfuse_trace_by_processed_market(market_type, None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to after_process_market hook.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking back, it needs to be in process_market method otherwise it won't be assigned to the correct trace in Langfuse

return None

answer = self.answer_binary_market(market)

if answer is None:
logger.info(f"No answer for market '{market.question}'.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None
answer = None
else:
answer = self.answer_binary_market(market)

existing_position = market.get_position(user_id=APIKeys().bet_from_address)
trades = self.build_trades(
market=market,
answer=answer,
existing_position=existing_position,
processed_market = (
ProcessedMarket(answer=answer) if answer is not None else None
Comment on lines +414 to +415
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider early return for None answer.

Creating a ProcessedMarket instance only to return None could be simplified.

-        processed_market = (
-            ProcessedMarket(answer=answer) if answer is not None else None
-        )
+        if answer is None:
+            return None
+        return ProcessedMarket(answer=answer)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
processed_market = (
ProcessedMarket(answer=answer) if answer is not None else None
if answer is None:
return None
return ProcessedMarket(answer=answer)

)

placed_trades = []
for trade in trades:
logger.info(f"Executing trade {trade} on market {market.id} ({market.url})")

if self.place_bet:
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
case TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
placed_trades.append(PlacedTrade.from_trade(trade, id))
else:
logger.info(f"Trade execution skipped because {self.place_bet=}.")

processed_market = ProcessedMarket(answer=answer, trades=placed_trades)
self.update_langfuse_trace_by_processed_market(market_type, processed_market)

self.after_process_market(
market_type, market, processed_market=processed_market
logger.info(
f"Processed market {market.question=} from {market.url=} with {answer=}."
Comment on lines +404 to +420
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for market processing.

The market processing logic should handle potential exceptions from answer_binary_market to ensure graceful failure.

         self.update_langfuse_trace_by_market(market_type, market)
         logger.info(f"Processing market {market.question=} from {market.url=}.")
 
         answer: ProbabilisticAnswer | None
         if verify_market and not self.verify_market(market_type, market):
             logger.info(f"Market '{market.question}' doesn't meet the criteria.")
             answer = None
         else:
+            try:
                 answer = self.answer_binary_market(market)
+            except Exception as e:
+                logger.error(f"Failed to answer market: {e}")
+                answer = None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self.update_langfuse_trace_by_market(market_type, market)
logger.info(f"Processing market {market.question=} from {market.url=}.")
self.before_process_market(market_type, market)
answer: ProbabilisticAnswer | None
if verify_market and not self.verify_market(market_type, market):
logger.info(f"Market '{market.question}' doesn't meet the criteria.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None
answer = self.answer_binary_market(market)
if answer is None:
logger.info(f"No answer for market '{market.question}'.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None
answer = None
else:
answer = self.answer_binary_market(market)
existing_position = market.get_position(user_id=APIKeys().bet_from_address)
trades = self.build_trades(
market=market,
answer=answer,
existing_position=existing_position,
processed_market = (
ProcessedMarket(answer=answer) if answer is not None else None
)
placed_trades = []
for trade in trades:
logger.info(f"Executing trade {trade} on market {market.id} ({market.url})")
if self.place_bet:
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
case TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
placed_trades.append(PlacedTrade.from_trade(trade, id))
else:
logger.info(f"Trade execution skipped because {self.place_bet=}.")
processed_market = ProcessedMarket(answer=answer, trades=placed_trades)
self.update_langfuse_trace_by_processed_market(market_type, processed_market)
self.after_process_market(
market_type, market, processed_market=processed_market
logger.info(
f"Processed market {market.question=} from {market.url=} with {answer=}."
self.update_langfuse_trace_by_market(market_type, market)
logger.info(f"Processing market {market.question=} from {market.url=}.")
answer: ProbabilisticAnswer | None
if verify_market and not self.verify_market(market_type, market):
logger.info(f"Market '{market.question}' doesn't meet the criteria.")
answer = None
else:
try:
answer = self.answer_binary_market(market)
except Exception as e:
logger.error(f"Failed to answer market: {e}")
answer = None
processed_market = (
ProcessedMarket(answer=answer) if answer is not None else None
)
self.update_langfuse_trace_by_processed_market(market_type, processed_market)
logger.info(
f"Processed market {market.question=} from {market.url=} with {answer=}."

)

logger.info(f"Processed market {market.question=} from {market.url=}.")
return processed_market

def after_process_market(
self,
market_type: MarketType,
market: AgentMarket,
processed_market: ProcessedMarket,
processed_market: ProcessedMarket | None,
) -> None:
keys = APIKeys()
market.store_prediction(processed_market=processed_market, keys=keys)
if self.store_prediction:
market.store_prediction(processed_market=processed_market, keys=keys)
else:
logger.info(
f"Prediction {processed_market} not stored because {self.store_prediction=}."
)
Comment on lines +428 to +436
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for prediction storage.

The prediction storage operation could fail due to various reasons (network issues, API failures, etc.).

         if self.store_prediction:
+            try:
                 market.store_prediction(processed_market=processed_market, keys=keys)
+            except Exception as e:
+                logger.error(f"Failed to store prediction: {e}")
+                raise
         else:
             logger.info(
                 f"Prediction {processed_market} not stored because {self.store_prediction=}."
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
processed_market: ProcessedMarket | None,
) -> None:
keys = APIKeys()
market.store_prediction(processed_market=processed_market, keys=keys)
if self.store_prediction:
market.store_prediction(processed_market=processed_market, keys=keys)
else:
logger.info(
f"Prediction {processed_market} not stored because {self.store_prediction=}."
)
processed_market: ProcessedMarket | None,
) -> None:
keys = APIKeys()
if self.store_prediction:
try:
market.store_prediction(processed_market=processed_market, keys=keys)
except Exception as e:
logger.error(f"Failed to store prediction: {e}")
raise
else:
logger.info(
f"Prediction {processed_market} not stored because {self.store_prediction=}."
)


def before_process_markets(self, market_type: MarketType) -> None:
"""
Expand All @@ -516,7 +455,9 @@ def process_markets(self, market_type: MarketType) -> None:
processed = 0

for market in available_markets:
self.before_process_market(market_type, market)
processed_market = self.process_market(market_type, market)
self.after_process_market(market_type, market, processed_market)

if processed_market is not None:
processed += 1
Expand All @@ -530,6 +471,139 @@ def after_process_markets(self, market_type: MarketType) -> None:
"Executes actions that occur after bets are placed."

def run(self, market_type: MarketType) -> None:
if market_type not in self.supported_markets:
raise ValueError(
f"Only {self.supported_markets} are supported by this agent."
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already had this in mind here #519

But now it's required here as well, because only METACULUS can be used with DeployablePredictionAgent, and rest of the markets is for DeployableTraderAgent.

)
self.before_process_markets(market_type)
self.process_markets(market_type)
self.after_process_markets(market_type)


class DeployableTraderAgent(DeployablePredictionAgent):
# These markets require place of bet, not just predictions.
supported_markets: t.Sequence[MarketType] = [
MarketType.OMEN,
MarketType.MANIFOLD,
MarketType.POLYMARKET,
]

def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
store_prediction: bool = True,
store_trades: bool = True,
place_trades: bool = True,
) -> None:
super().__init__(
enable_langfuse=enable_langfuse, store_prediction=store_prediction
)
self.store_trades = store_trades
self.place_trades = place_trades

def initialize_langfuse(self) -> None:
super().initialize_langfuse()
# Auto-observe all the methods where it makes sense, so that subclassses don't need to do it manually.
self.get_betting_strategy = observe()(self.get_betting_strategy) # type: ignore[method-assign]
self.build_trades = observe()(self.build_trades) # type: ignore[method-assign]

Comment on lines +504 to +509
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider alternative approaches to method observation for better maintainability.

Reassigning methods in initialize_langfuse by wrapping them with observe() may lead to maintainability issues, especially with inheritance. Consider using decorators at the method definitions or a metaclass to automatically apply observe() to the desired methods.

def check_min_required_balance_to_trade(self, market: AgentMarket) -> None:
api_keys = APIKeys()

# Get the strategy to know how much it will bet.
strategy = self.get_betting_strategy(market)
# Have a little bandwidth after the bet.
min_required_balance_to_trade = strategy.maximum_possible_bet_amount * 1.01

if market.get_trade_balance(api_keys) < min_required_balance_to_trade:
raise OutOfFundsError(
f"Minimum required balance {min_required_balance_to_trade} for agent is not met."
)

Comment on lines +510 to +522
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding retry mechanism for balance checks.

The balance check is crucial for trading operations. Consider implementing a retry mechanism with exponential backoff for balance checks to handle temporary network issues or blockchain delays.

+    @retry(
+        stop_max_attempt_number=3,
+        wait_exponential_multiplier=1000,
+        wait_exponential_max=10000
+    )
     def check_min_required_balance_to_trade(self, market: AgentMarket) -> None:
         api_keys = APIKeys()

Committable suggestion was skipped due to low confidence.

def get_betting_strategy(self, market: AgentMarket) -> BettingStrategy:
user_id = market.get_user_id(api_keys=APIKeys())

total_amount = market.get_tiny_bet_amount().amount
if existing_position := market.get_position(user_id=user_id):
total_amount += existing_position.total_amount.amount

return MaxAccuracyBettingStrategy(bet_amount=total_amount)

def build_trades(
self,
market: AgentMarket,
answer: ProbabilisticAnswer,
existing_position: Position | None,
) -> list[Trade]:
strategy = self.get_betting_strategy(market=market)
trades = strategy.calculate_trades(existing_position, answer, market)
BettingStrategy.assert_trades_currency_match_markets(market, trades)
return trades

def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
super().before_process_market(market_type, market)
self.check_min_required_balance_to_trade(market)

def process_market(
self,
market_type: MarketType,
market: AgentMarket,
verify_market: bool = True,
) -> ProcessedTradedMarket | None:
processed_market = super().process_market(market_type, market, verify_market)
if processed_market is None:
return None

api_keys = APIKeys()
existing_position = market.get_position(
user_id=market.get_user_id(api_keys=api_keys)
)
trades = self.build_trades(
market=market,
answer=processed_market.answer,
existing_position=existing_position,
)

placed_trades = []
for trade in trades:
logger.info(f"Executing trade {trade} on market {market.id} ({market.url})")

if self.place_trades:
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
case TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
Comment on lines +574 to +584
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure compatibility with Python 3.10 for match statements.

The use of match statements requires Python 3.10 or later. Verify that the deployment environment supports Python 3.10+. If backward compatibility is needed, consider replacing match statements with equivalent if-elif-else constructs.

Apply this diff to replace the match statement for compatibility:

                    # Replace `match` statement with `if-elif-else`
-                    match trade.trade_type:
-                        case TradeType.BUY:
+                    if trade.trade_type == TradeType.BUY:
                         id = market.buy_tokens(
                             outcome=trade.outcome, amount=trade.amount
                         )
-                        case TradeType.SELL:
+                    elif trade.trade_type == TradeType.SELL:
                         id = market.sell_tokens(
                             outcome=trade.outcome, amount=trade.amount
                         )
-                        case _:
+                    else:
                         raise ValueError(f"Unexpected trade type {trade.trade_type}.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
case TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
if trade.trade_type == TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
elif trade.trade_type == TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
else:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")

placed_trades.append(PlacedTrade.from_trade(trade, id))
else:
logger.info(f"Trade execution skipped because {self.place_trades=}.")
Comment on lines +573 to +587
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Extract trade execution logic to a separate method.

The trade execution logic is complex and could be extracted to improve readability and maintainability.

+    def _execute_trade(self, trade: Trade, market: AgentMarket) -> PlacedTrade | None:
+        if not self.place_trades:
+            logger.info(f"Trade execution skipped because {self.place_trades=}.")
+            return None
+
+        match trade.trade_type:
+            case TradeType.BUY:
+                id = market.buy_tokens(outcome=trade.outcome, amount=trade.amount)
+            case TradeType.SELL:
+                id = market.sell_tokens(outcome=trade.outcome, amount=trade.amount)
+            case _:
+                raise ValueError(f"Unexpected trade type {trade.trade_type}.")
+        
+        return PlacedTrade.from_trade(trade, id)

     def process_market(self, market_type: MarketType, market: AgentMarket, verify_market: bool = True) -> ProcessedTradedMarket | None:
         # ...
         placed_trades = []
         for trade in trades:
             logger.info(f"Executing trade {trade} on market {market.id} ({market.url})")
-            if self.place_trades:
-                match trade.trade_type:
-                    case TradeType.BUY:
-                        id = market.buy_tokens(outcome=trade.outcome, amount=trade.amount)
-                    case TradeType.SELL:
-                        id = market.sell_tokens(outcome=trade.outcome, amount=trade.amount)
-                    case _:
-                        raise ValueError(f"Unexpected trade type {trade.trade_type}.")
-                placed_trades.append(PlacedTrade.from_trade(trade, id))
-            else:
-                logger.info(f"Trade execution skipped because {self.place_trades=}.")
+            if placed_trade := self._execute_trade(trade, market):
+                placed_trades.append(placed_trade)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self.place_trades:
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
case TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
placed_trades.append(PlacedTrade.from_trade(trade, id))
else:
logger.info(f"Trade execution skipped because {self.place_trades=}.")
if self.place_trades:
if placed_trade := self._execute_trade(trade, market):
placed_trades.append(placed_trade)
else:
logger.info(f"Trade execution skipped because {self.place_trades=}.")
def _execute_trade(self, trade: Trade, market: AgentMarket) -> PlacedTrade | None:
if not self.place_trades:
logger.info(f"Trade execution skipped because {self.place_trades=}.")
return None
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(outcome=trade.outcome, amount=trade.amount)
case TradeType.SELL:
id = market.sell_tokens(outcome=trade.outcome, amount=trade.amount)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
return PlacedTrade.from_trade(trade, id)


traded_market = ProcessedTradedMarket(
answer=processed_market.answer, trades=placed_trades
)
logger.info(f"Traded market {market.question=} from {market.url=}.")
return traded_market

def after_process_market(
self,
market_type: MarketType,
market: AgentMarket,
processed_market: ProcessedMarket | None,
) -> None:
api_keys = APIKeys()
super().after_process_market(market_type, market, processed_market)
if isinstance(processed_market, ProcessedTradedMarket):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not nice, but idk how to do it better. after_process_market hook in this subclass will always receive ProcessedTradedMarket, but we need to accept processed_market: ProcessedMarket | ProcessedTradedMarket | None in the arguments because otherwise, mypy will it complain about subclass not being compatible with parent classs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you do:
processed_market: ProcessedMarket | None,
and then do:
assert isinstance(processed_market, ProcessedTradedMarket)
?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that should work. | ProcessedTradedMarket in the argument typing is just to be explicit. Removed.

if self.store_trades:
market.store_trades(processed_market, api_keys)
else:
logger.info(
f"Trades {processed_market.trades} not stored because {self.store_trades=}."
)
Comment on lines +604 to +609
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for trade storage operations.

The trade storage operation could fail due to various reasons (network issues, API failures, etc.). Consider adding error handling.

         if isinstance(processed_market, ProcessedTradedMarket):
             if self.store_trades:
-                market.store_trades(processed_market, api_keys)
+                try:
+                    market.store_trades(processed_market, api_keys)
+                except Exception as e:
+                    logger.error(f"Failed to store trades: {e}")
+                    # Consider if we should re-raise or handle differently
+                    raise
             else:
                 logger.info(
                     f"Trades {processed_market.trades} not stored because {self.store_trades=}."
                 )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self.store_trades:
market.store_trades(processed_market, api_keys)
else:
logger.info(
f"Trades {processed_market.trades} not stored because {self.store_trades=}."
)
if self.store_trades:
try:
market.store_trades(processed_market, api_keys)
except Exception as e:
logger.error(f"Failed to store trades: {e}")
# Consider if we should re-raise or handle differently
raise
else:
logger.info(
f"Trades {processed_market.trades} not stored because {self.store_trades=}."
)

13 changes: 12 additions & 1 deletion prediction_market_agent_tooling/markets/agent_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

class ProcessedMarket(BaseModel):
answer: ProbabilisticAnswer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could remove ProcessedMarket class now, as it is just a wrapper around ProbabilisticAnswer without any additional functionality

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but then we have ProcessedTradedMarket which adds trades field.

For all to come nicely together, we need this ProcessedMarket, because:

DeployablePredictionAgent's process_market returns ProcessedMarket
DeployableTraderAgent's process_market returns ProcessedTradedMarket

If ProcessedTradedMarket is subclass of ProcessedMarket, then process_market methods are compatible.

If DeployablePredictionAgent's process_market starts to return only ProbabilisticAnswer, then we'd need to add trades into ProbabilisticAnswer.

But not sure if I explain this properly.. 😄 If you have some other idea how to structure it, show me please. 🙏

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I had a look, so you could alias:

ProcessedMarket = ProbabilisticAnswer

but mypy

or subclass

class ProcessedMarket(ProbabilisticAnswer):
    pass

but then constructing a ProcessedTradedMarket instance from a ProcessedMarket becomes more messy if you don't want to do **processed_market.dict().

soooo okay, i retract my objection!


class ProcessedTradedMarket(ProcessedMarket):
trades: list[PlacedTrade]


Expand Down Expand Up @@ -228,13 +231,21 @@ def verify_operational_balance(api_keys: APIKeys) -> bool:
raise NotImplementedError("Subclasses must implement this method")

def store_prediction(
self, processed_market: ProcessedMarket, keys: APIKeys
self, processed_market: ProcessedMarket | None, keys: APIKeys
) -> None:
"""
If market allows to upload predictions somewhere, implement it in this method.
"""
raise NotImplementedError("Subclasses must implement this method")

def store_trades(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now store_prediction is implemented on MetaculusAgentMarket and store_trades on the others.

self, traded_market: ProcessedTradedMarket | None, keys: APIKeys
) -> None:
"""
If market allows to upload trades somewhere, implement it in this method.
"""
raise NotImplementedError("Subclasses must implement this method")

@staticmethod
def get_bets_made_since(
better_address: ChecksumAddress, start_time: DatetimeUTC
Expand Down
Loading
Loading