
Migrate auctions to new database table #3067

Closed
wants to merge 2 commits into from
7 changes: 7 additions & 0 deletions crates/autopilot/src/arguments.rs
@@ -233,6 +233,11 @@ pub struct Arguments {
/// Archive node URL used to index CoW AMM
#[clap(long, env)]
pub archive_node_url: Option<Url>,

/// Whether the migration from the solver_competition table to the new
/// competition tables should be run.
#[clap(long, env, action = clap::ArgAction::Set, default_value = "false")]
pub migrate_auctions: bool,
}

impl std::fmt::Display for Arguments {
@@ -278,6 +283,7 @@ impl std::fmt::Display for Arguments {
run_loop_native_price_timeout,
max_winners_per_auction,
archive_node_url,
migrate_auctions,
} = self;

write!(f, "{}", shared)?;
@@ -357,6 +363,7 @@ impl std::fmt::Display for Arguments {
)?;
writeln!(f, "max_winners_per_auction: {:?}", max_winners_per_auction)?;
writeln!(f, "archive_node_url: {:?}", archive_node_url)?;
writeln!(f, "migrate_auctions: {:?}", migrate_auctions)?;
Ok(())
}
}
86 changes: 86 additions & 0 deletions crates/autopilot/src/infra/persistence/mod.rs
@@ -802,6 +802,92 @@ impl Persistence {
ex.commit().await?;
Ok(())
}

pub async fn populate_historic_auctions(&self) -> Result<(), DatabaseError> {
Contributor: is it possible to test this beforehand in a unit test or e2e test?

Contributor: We could even set up a snapshot of the actual DB and run a local DB migration with that to see how it will behave with real data.

const BATCH_SIZE: i64 = 50;

let mut ex = self.postgres.pool.begin().await?;

// find entry in `competition_auctions` with the lowest auction_id, as a
// starting point
let current_auction_id: Option<i64> =
sqlx::query_scalar::<_, Option<i64>>("SELECT MIN(id) FROM competition_auctions;")
.fetch_one(ex.deref_mut())
.await
.context("fetch lowest auction id")?;

let Some(mut current_auction_id) = current_auction_id else {
tracing::info!("competition_auctions is empty, nothing to process");
return Ok(());
};

loop {
tracing::debug!(
auction_id = current_auction_id,
"populating historic auctions from auction"
);

// fetch the next batch of auctions
let competitions: Vec<database::solver_competition::RichSolverCompetition> =
database::solver_competition::fetch_batch(&mut ex, current_auction_id, BATCH_SIZE)
.await?;

if competitions.is_empty() {
tracing::info!("no more auctions to process");
break;
}

tracing::debug!(competitions = ?competitions.len(), "competitions fetched");

for solver_competition in &competitions {
let competition: model::solver_competition::SolverCompetitionDB =
serde_json::from_value(solver_competition.json.clone())
.context("deserialize SolverCompetitionDB")?;

// populate historic auctions
let auction = database::auction::Auction {
id: solver_competition.id,
block: i64::try_from(competition.auction_start_block)
.context("block overflow")?,
deadline: solver_competition.deadline,
order_uids: competition
.auction
.orders
.iter()
.map(|order| ByteArray(order.0))
.collect(),
price_tokens: competition
.auction
.prices
.keys()
.map(|token| ByteArray(token.0))
.collect(),
price_values: competition
.auction
.prices
.values()
.map(u256_to_big_decimal)
.collect(),
surplus_capturing_jit_order_owners: solver_competition
.surplus_capturing_jit_order_owners
.clone(),
};

if let Err(err) = database::auction::save(&mut ex, auction).await {
Contributor: Is there a reason we only populate one of the new tables? AFAICS we can populate the proposed_solutions and proposed_trade_executions (at least partially). I think it might be good to migrate as much data as possible if we are considering deleting the old table entirely some time in the future.

Contributor (author): We have a separate issue for the proposed solutions migration: #3056. The migration of solver competition is a bit more complicated and I planned doing it in a separate step -> smaller PRs.
tracing::warn!(?err, auction_id = ?solver_competition.id, "failed to save auction");
}
}

// commit each batch separately
ex.commit().await?;
ex = self.postgres.pool.begin().await?;

// update the current auction id
current_auction_id = competitions.last().unwrap().id;
Contributor: what happens if this process gets interrupted in the middle of it?

Contributor: Another point for a separate script, which could store the counter on disk.
}

Ok(())
}
}
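The migration splits the auction's price map into two parallel columns, `price_tokens` and `price_values`, by collecting `keys()` and `values()` separately. That only round-trips if the two iterators visit entries in the same order, which Rust's std map types do for an unmodified map. A minimal, self-contained sketch of that invariant (`split_prices`/`join_prices` are hypothetical names, using `BTreeMap<String, u64>` as a stand-in for the token/price types):

```rust
use std::collections::BTreeMap;

// Split a price map into parallel token/value columns, as the migration
// does before writing them to separate array columns.
fn split_prices(prices: &BTreeMap<String, u64>) -> (Vec<String>, Vec<u64>) {
    // keys() and values() visit entries in the same order, so the
    // resulting vectors stay aligned index-by-index.
    let tokens = prices.keys().cloned().collect();
    let values = prices.values().copied().collect();
    (tokens, values)
}

// Rebuild the map from the parallel columns, relying on the alignment above.
fn join_prices(tokens: Vec<String>, values: Vec<u64>) -> BTreeMap<String, u64> {
    tokens.into_iter().zip(values).collect()
}
```

Because the alignment is purely positional, any code reading the two columns back must zip them in index order rather than re-sorting either one.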

#[derive(prometheus_metric_storage::MetricStorage)]
5 changes: 5 additions & 0 deletions crates/autopilot/src/run.rs
@@ -449,6 +449,11 @@ pub async fn run(args: Arguments) {
.instrument(tracing::info_span!("order_events_cleaner")),
);

if args.migrate_auctions {
let persistence_clone = persistence.clone();
tokio::spawn(async move { persistence_clone.populate_historic_auctions().await });
Contributor: when we spawn this I assume we are still populating both tables, therefore the migration will never end, right? 🤔
}
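One reviewer suggests running the migration as a separate script that stores its counter on disk, so an interrupted run can resume instead of restarting from scratch. A sketch of such a checkpoint, using only the standard library (file name, format, and function names are assumptions, not part of this PR):

```rust
use std::{fs, io, path::Path};

// Persist the last processed auction id so an interrupted migration
// can resume from where it left off.
fn save_checkpoint(path: &Path, auction_id: i64) -> io::Result<()> {
    // Write to a temp file first, then rename over the target, so a crash
    // mid-write never leaves a truncated checkpoint behind.
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, auction_id.to_string())?;
    fs::rename(&tmp, path)
}

// Returns None if no checkpoint exists yet (first run) or it is unreadable,
// in which case the caller falls back to its normal starting point.
fn load_checkpoint(path: &Path) -> Option<i64> {
    fs::read_to_string(path).ok()?.trim().parse().ok()
}
```

In the PR as written, the cursor is instead re-derived on startup from `SELECT MIN(id) FROM competition_auctions`, which makes a restart safe as long as saving an already-migrated auction is idempotent.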

let market_makable_token_list_configuration = TokenListConfiguration {
url: args.trusted_tokens_url,
update_interval: args.trusted_tokens_update_interval,
37 changes: 37 additions & 0 deletions crates/database/src/solver_competition.rs
@@ -298,6 +298,43 @@ pub async fn fetch(
Ok(solutions)
}

#[derive(Clone, Debug, sqlx::FromRow)]
pub struct RichSolverCompetition {
pub id: AuctionId,
pub json: JsonValue,
pub deadline: i64,
pub surplus_capturing_jit_order_owners: Vec<crate::Address>,
}

/// Fetches one batch of entries from the solver_competitions table, as part
/// of the one-time migration into the new auction tables.
///
/// Entries are fetched going from higher auction_id to lower auction_id.
pub async fn fetch_batch(
ex: &mut PgConnection,
auction_id: AuctionId,
batch_size: i64,
) -> Result<Vec<RichSolverCompetition>, sqlx::Error> {
const QUERY: &str = r#"
SELECT
sc.id as id,
sc.json as json,
COALESCE(ss.block_deadline, 0) AS deadline,
COALESCE(jit.owners, ARRAY[]::bytea[]) AS surplus_capturing_jit_order_owners
FROM solver_competitions sc
LEFT JOIN settlement_scores ss ON sc.id = ss.auction_id
LEFT JOIN surplus_capturing_jit_order_owners jit ON sc.id = jit.auction_id
WHERE sc.id < $1
ORDER BY sc.id DESC
LIMIT $2;"#;

sqlx::query_as(QUERY)
.bind(auction_id)
.bind(batch_size)
.fetch_all(ex)
.await
}
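The query above implements keyset pagination: each call returns up to `batch_size` rows with `id` strictly below the cursor, in descending order, and the caller advances the cursor to the last id returned. A minimal sketch of that loop over an in-memory slice in place of the database (names and the in-memory stand-in are illustrative only):

```rust
// Stand-in for fetch_batch: ids strictly below `cursor`,
// ORDER BY id DESC, LIMIT `batch_size`.
fn fetch_batch_mem(ids: &[i64], cursor: i64, batch_size: usize) -> Vec<i64> {
    let mut batch: Vec<i64> = ids.iter().copied().filter(|&id| id < cursor).collect();
    batch.sort_unstable_by(|a, b| b.cmp(a));
    batch.truncate(batch_size);
    batch
}

// Drive the pagination the same way populate_historic_auctions does:
// fetch, process, advance the cursor to the last id seen, stop when empty.
fn migrate_all(ids: &[i64], start: i64, batch_size: usize) -> Vec<i64> {
    let mut processed = Vec::new();
    let mut cursor = start;
    loop {
        let batch = fetch_batch_mem(ids, cursor, batch_size);
        if batch.is_empty() {
            break;
        }
        // Safe: batch is non-empty, and its last element is the smallest id,
        // so the next fetch continues strictly below everything processed.
        cursor = *batch.last().unwrap();
        processed.extend(batch);
    }
    processed
}
```

Using `WHERE sc.id < $1` (strict inequality) is what makes the loop terminate: the cursor decreases on every non-empty batch, so no row is fetched twice.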

#[cfg(test)]
mod tests {
use {