Skip to content

Commit

Permalink
Enhance backup logic
Browse files Browse the repository at this point in the history
- Added new module 'postgres' to support postgresql.
- Enhance `backup::list`/`backup::create`/`backup::restore`/`backup::recovery`
  function to support postgresql.
- Added `backup::purge_old_backups` for apply immediately after
  `num_backups_to_keep` is changed.

Closes: petabi#71, petabi#76, petabi#80
  • Loading branch information
Hanbeom kim committed Jun 23, 2023
1 parent 3d1f262 commit bdf5698
Show file tree
Hide file tree
Showing 6 changed files with 615 additions and 78 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Enhanced to also support postgresql for backup `creat`/`list`/`restore`
/`recover`.

### Added
- Introduced a new module `postgres`. This module was added to allow the
existing backup `creat`/`list`/`restore`/`recover` function to also support
postgresql DBs.
- Added `backup::purge_old_backups` for apply immediately after
`num_backups_to_keep` is changed.

## [0.15.0] - 2023-06-14

### Added
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ ipnet = { version = "2", features = ["serde"] }
num-derive = "0.3"
num-traits = "0.2"
postgres-protocol = "0.6"
regex = "1"
roxy = { git = "https://github.com/aicers/roxy.git", rev="953b1a5" }
rand = "0.8"
ring = { version = "0.16", features = ["std"] }
rocksdb = "0.21"
Expand All @@ -39,6 +41,7 @@ serde_json = "1"
structured = "0.13"
strum = "0.24"
strum_macros = "0.24"
tar = "0.4"
thiserror = "1"
tokio = { version = "1", features = ["macros"] }
tokio-postgres-rustls = "0.10"
Expand Down
218 changes: 147 additions & 71 deletions src/backup.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
//! Database backup utilities.
mod postgresql;

use crate::Store;
use anyhow::Result;
#[allow(clippy::module_name_repetitions)]
pub use self::postgresql::BackupConfig;
use self::postgresql::{create_backup_path, purge_old_postgres_backups};
use crate::{
backup::postgresql::{postgres_backup, postgres_backup_list, postgres_restore},
Store,
};
use anyhow::{anyhow, Result};
use chrono::{DateTime, TimeZone, Utc};
use rocksdb::backup::BackupEngineInfo;
use std::{sync::Arc, time::Duration};
use std::{collections::HashSet, path::Path, sync::Arc, time::Duration};
use tokio::sync::{Notify, RwLock};
use tracing::{info, warn};

const BACKUP_TIMESTAMP_CONV_UNIT: i64 = 1_000_000_000;

#[derive(Debug, Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct BackupInfo {
pub id: u32,
Expand All @@ -19,7 +29,7 @@ impl From<BackupEngineInfo> for BackupInfo {
fn from(backup: BackupEngineInfo) -> Self {
Self {
id: backup.backup_id,
timestamp: Utc.timestamp_nanos(backup.timestamp),
timestamp: Utc.timestamp_nanos(backup.timestamp * BACKUP_TIMESTAMP_CONV_UNIT),
size: backup.size,
}
}
Expand All @@ -29,8 +39,8 @@ impl From<BackupEngineInfo> for BackupInfo {
#[allow(clippy::module_name_repetitions)]
pub async fn schedule_periodic(
store: Arc<RwLock<Store>>,
backup_cfg: Arc<RwLock<BackupConfig>>,
schedule: (Duration, Duration),
backups_to_keep: u32,
stop: Arc<Notify>,
) {
use tokio::time::{sleep, Instant};
Expand All @@ -43,11 +53,13 @@ pub async fn schedule_periodic(
tokio::select! {
() = &mut sleep => {
sleep.as_mut().reset(Instant::now() + duration);
let _res = create(&store, false, backups_to_keep);
let backup_cfg= backup_cfg.read().await.clone();
let _res = create(&store, false, &backup_cfg);
}
_ = stop.notified() => {
info!("creating a database backup before shutdown");
let _res = create(&store, false, backups_to_keep);
let backup_cfg= backup_cfg.read().await.clone();
let _res = create(&store, false, &backup_cfg);
stop.notify_one();
return;
}
Expand All @@ -61,102 +73,164 @@ pub async fn schedule_periodic(
/// # Errors
///
/// Returns an error if backup fails.
pub async fn create(store: &Arc<RwLock<Store>>, flush: bool, backups_to_keep: u32) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
info!("backing up database...");
let res = {
let mut store = store.write().await;
store.backup(flush, backups_to_keep)
};
match res {
Ok(_) => {
info!("backing up database completed");
Ok(())
}
Err(e) => {
warn!("database backup failed: {:?}", e);
Err(e)
pub async fn create(
store: &Arc<RwLock<Store>>,
flush: bool,
backup_cfg: &BackupConfig,
) -> Result<()> {
{
let mut backup_store = store.write().await;
if let Err(e) = backup_store.backup(flush, backup_cfg.num_of_backups) {
warn!("failed to create key-value database backup");
return Err(e);
}
}

let backup_id_list = backup_id_list(store).await?;
if let Err(e) = postgres_backup(backup_cfg, backup_id_list) {
warn!("failed to create relational database backup");
return Err(e);
}

info!("database backup created");
Ok(())
}

/// Lists the backup information of the database.
///
/// # Errors
///
/// Returns an error if backup list fails to create
pub async fn list(store: &Arc<RwLock<Store>>) -> Result<Vec<BackupInfo>> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
let res = {
let store = store.read().await;
store.get_backup_info()
};
match res {
Ok(backup_list) => {
info!("generate database backup list");
Ok(backup_list
.into_iter()
.map(std::convert::Into::into)
.collect())
}
pub async fn list(store: &Arc<RwLock<Store>>, backup_path: &Path) -> Result<Vec<BackupInfo>> {
let store = store.read().await;
let backup_list = match store.get_backup_info() {
Ok(backup) => backup,
Err(e) => {
warn!("failed to generate backup list: {:?}", e);
Err(e)
warn!("failed to generate backup list");
return Err(e);
}
}
};
let mut backup_list: Vec<BackupInfo> = backup_list
.into_iter()
.map(std::convert::Into::into)
.collect();
postgres_backup_list(backup_path, &mut backup_list)?;
info!("generate database backup list");
Ok(backup_list)
}

/// Restores the database from a backup with the specified ID.
/// Restores the database from a backup. If a backup file ID is not provided,
/// restore based on the latest backup.
///
/// # Errors
///
/// Returns an error if the restore operation fails.
pub async fn restore(store: &Arc<RwLock<Store>>, backup_id: Option<u32>) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
info!("restoring database from {:?}", backup_id);
let res = {
let mut store = store.write().await;
match &backup_id {
Some(id) => store.restore_from_backup(*id),
None => store.restore_from_latest_backup(),
pub async fn restore(
store: &Arc<RwLock<Store>>,
cfg: &BackupConfig,
backup_id: Option<u32>,
) -> Result<()> {
let backup_id_list = backup_id_list(store).await?;
let backup_id = if let Some(id) = backup_id {
if !backup_id_list.contains(&id) {
return Err(anyhow!("backup {id} is not exist."));
}
info!("start database restore {}", id);
id
} else {
let Some(id) = backup_id_list.last() else {
return Err(anyhow!("backup is not exist."));
};
info!("start database restore from latest backup");
*id
};

match res {
Ok(_) => {
info!("database restored from backup {:?}", backup_id);
Ok(())
}
Err(e) => {
warn!(
"failed to restore database from backup {:?}: {:?}",
backup_id, e
);
Err(e)
let mut store = store.write().await;
if let Err(e) = store.restore_from_backup(backup_id) {
warn!("failed to restore key-value database from {}", backup_id);
return Err(anyhow!("{e}"));
}
if let Err(e) = postgres_restore(cfg, backup_id) {
warn!("failed to restore relational database from {}", backup_id);
return Err(anyhow!("{e}"));
}
info!("database restored from {}", backup_id);
Ok(())
}

/// Returns the number of backups in the backup list.
///
/// # Errors
///
/// Returns an error if getting the number of backup lists fails.
pub async fn count(store: &Arc<RwLock<Store>>) -> Result<usize> {
let store = store.write().await;
Ok(store.get_backup_info()?.len())
}

/// Remove older backups based on the number of backups retained.
///
/// # Errors
///
/// Returns an error if removing old backup fails.
pub async fn purge_old_backups(
store: &Arc<RwLock<Store>>,
backup_cfg: &BackupConfig,
) -> Result<()> {
{
let mut backup_store = store.write().await;
if let Err(e) = backup_store.purge_old_backups(backup_cfg.num_of_backups) {
warn!("failed to purge key-value database");
return Err(e);
}
}

let backup_id_list = backup_id_list(store).await?;
let data_backup_path = create_backup_path(backup_cfg)?;
if let Err(e) = purge_old_postgres_backups(&data_backup_path, backup_id_list) {
warn!("failed to purge relational database");
return Err(e);
}
Ok(())
}

/// Restores the database from a backup with the specified ID.
///
/// # Errors
///
/// Returns an error if the restore operation fails.
pub async fn recover(store: &Arc<RwLock<Store>>) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
pub async fn recover(store: &Arc<RwLock<Store>>, cfg: &BackupConfig) -> Result<()> {
info!("recovering database from latest valid backup");
let res = {
let mut store = store.write().await;
store.recover()
};

match res {
Ok(_) => {
info!("database recovered from backup");
Ok(())
let mut store = store.write().await;
let recovery_id = match store.recover() {
Ok(id) => id,
Err(e) => {
warn!("failed to recover key-value database from backup: {e:?}");
return Err(e);
}
};

if let Err(e) = postgres_restore(cfg, recovery_id) {
warn!("failed to recover relational database from backup {e:?}",);
return Err(e);
}
info!("database's recovery success");
Ok(())
}

/// Lists the backup id.
///
/// # Errors
///
/// Returns an error if backup id list fails to create
#[allow(clippy::module_name_repetitions)]
pub async fn backup_id_list(store: &Arc<RwLock<Store>>) -> Result<Vec<u32>> {
let store = store.read().await;
match store.get_backup_info() {
Ok(backup) => Ok(backup.into_iter().map(|b| b.backup_id).collect()),
Err(e) => {
warn!("failed to recover database from backup: {e:?}");
warn!("failed to generate backup id list");
Err(e)
}
}
Expand Down Expand Up @@ -250,7 +324,9 @@ mod tests {
}

// get backup list
let backup_list = list(&store).await.unwrap();
let backup_list = list(&store, &backup_dir.path().to_path_buf())
.await
.unwrap();
assert_eq!(backup_list.len(), 3);
assert_eq!(backup_list.get(0).unwrap().id, 1);
assert_eq!(backup_list.get(1).unwrap().id, 2);
Expand Down
Loading

0 comments on commit bdf5698

Please sign in to comment.