Skip to content

Commit

Permalink
feat: return stream for startup
Browse files Browse the repository at this point in the history
  • Loading branch information
ts0yu committed Feb 28, 2024
1 parent 1f956f7 commit 240871d
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/behaviors/pool_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Behavior<Message> for PoolAdmin {
self.client = Some(client.clone());
self.messager = Some(messager.clone());

Ok(None)
Ok(Some(messager.clone().stream().unwrap()))
}

async fn process(&mut self, event: Message) -> Result<ControlFlow> {
Expand Down
23 changes: 10 additions & 13 deletions src/behaviors/price_changer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{fmt, sync::Arc};

use anyhow::Result;
use arbiter_core::middleware::ArbiterMiddleware;
use crate::behaviors::deployer::DeploymentData;
use arbiter_engine::messager::{Message, Messager};
use ethers::types::H160;
use futures::stream::StreamExt;
Expand All @@ -12,7 +11,7 @@ use RustQuant::{
};

use super::*;
use crate::bindings::liquid_exchange::LiquidExchange;
use crate::{behaviors::deployer::DeploymentData, bindings::liquid_exchange::LiquidExchange};

#[derive(Serialize, Deserialize)]
pub struct PriceChanger {
Expand Down Expand Up @@ -83,19 +82,17 @@ impl Behavior<Message> for PriceChanger {
) -> Result<Option<EventStream<Message>>> {
self.client = Some(client);

loop {
while let Some(message) = messager.clone().stream().unwrap().next().await {
match serde_json::from_str::<DeploymentData>(&message.data) {
Ok(data) => {
self.liquid_exchange = Some(data.liquid_exchange);
break;
},
Err(_) => continue,
};
}
while let Some(message) = messager.clone().stream().unwrap().next().await {
match serde_json::from_str::<DeploymentData>(&message.data) {
Ok(data) => {
self.liquid_exchange = Some(data.liquid_exchange);
break;
}
Err(_) => continue,
};
}

Ok(None)
Ok(Some(messager.clone().stream().unwrap()))
}

async fn process(&mut self, event: Message) -> Result<ControlFlow> {
Expand Down
2 changes: 1 addition & 1 deletion src/behaviors/token_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Behavior<Message> for TokenAdmin {

let _ = messager.send(To::All, &message_content).await;

Ok(None)
Ok(Some(messager.clone().stream().unwrap()))
}

async fn process(&mut self, event: Message) -> Result<ControlFlow> {
Expand Down

0 comments on commit 240871d

Please sign in to comment.