Skip to content

Commit

Permalink
Return effective trim point LSN on successful trim request
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Jan 6, 2025
1 parent 437b2e2 commit b86ac06
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 78 deletions.
7 changes: 6 additions & 1 deletion crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ service ClusterCtrlSvc {

rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);

rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);
rpc TrimLog(TrimLogRequest) returns (TrimLogResponse);

rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest)
returns (CreatePartitionSnapshotResponse);
Expand Down Expand Up @@ -94,6 +94,11 @@ message TrimLogRequest {
uint64 trim_point = 2;
}

message TrimLogResponse {
uint32 log_id = 1;
optional uint64 trim_point = 2;
}

message CreatePartitionSnapshotRequest { uint32 partition_id = 1; }

message CreatePartitionSnapshotResponse { string snapshot_id = 1; }
Expand Down
28 changes: 18 additions & 10 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use restate_types::protobuf::cluster::ClusterConfiguration;
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError};
use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
use restate_core::{Metadata, MetadataWriter};
use restate_metadata_store::MetadataStoreClient;
use restate_types::identifiers::PartitionId;
Expand All @@ -33,7 +33,7 @@ use crate::cluster_controller::protobuf::{
CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, FindTailRequest,
FindTailResponse, ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse,
SealAndExtendChainRequest, SealAndExtendChainResponse, SealedSegment, TailState,
TrimLogRequest,
TrimLogRequest, TrimLogResponse,
};

use super::protobuf::{
Expand Down Expand Up @@ -167,20 +167,28 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
}

/// Internal operations API to trigger the log truncation
async fn trim_log(&self, request: Request<TrimLogRequest>) -> Result<Response<()>, Status> {
async fn trim_log(
&self,
request: Request<TrimLogRequest>,
) -> Result<Response<TrimLogResponse>, Status> {
let request = request.into_inner();
let log_id = LogId::from(request.log_id);
let trim_point = Lsn::from(request.trim_point);
if let Err(err) = self
match self
.controller_handle
.trim_log(log_id, trim_point)
.await
.map_err(|_| Status::aborted("Node is shutting down"))?
{
info!("Failed trimming the log: {err}");
return Err(Status::internal(err.to_string()));
Err(err) => {
info!("Failed trimming the log: {err}");
Err(Status::internal(err.to_string()))
}
Ok(trim_point) => Ok(Response::new(TrimLogResponse {
log_id: request.log_id,
trim_point: trim_point.map(Lsn::as_u64),
})),
}
Ok(Response::new(()))
}

/// Handles ad-hoc snapshot requests, as sent by `restatectl snapshots create`. This is
Expand Down Expand Up @@ -271,13 +279,13 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
.writeable_loglet(log_id)
.await
.map_err(|err| match err {
BiforstError::UnknownLogId(_) => Status::invalid_argument("Unknown log-id"),
BifrostError::UnknownLogId(_) => Status::invalid_argument("Unknown log-id"),
err => Status::internal(err.to_string()),
})?;

let tail_state = tokio::time::timeout(Duration::from_secs(2), writable_loglet.find_tail())
.await
.map_err(|_elapsed| Status::deadline_exceeded("Timedout finding tail"))?
.map_err(|_elapsed| Status::deadline_exceeded("Timeout finding tail"))?
.map_err(|err| Status::internal(err.to_string()))?;

let response = FindTailResponse {
Expand All @@ -297,7 +305,7 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {

async fn get_cluster_configuration(
&self,
_request: tonic::Request<GetClusterConfigurationRequest>,
_request: Request<GetClusterConfigurationRequest>,
) -> Result<Response<GetClusterConfigurationResponse>, Status> {
let logs = Metadata::with_current(|m| m.logs_ref());
let partition_table = Metadata::with_current(|m| m.partition_table_ref());
Expand Down
6 changes: 3 additions & 3 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ enum ClusterControllerCommand {
TrimLog {
log_id: LogId,
trim_point: Lsn,
response_tx: oneshot::Sender<anyhow::Result<()>>,
response_tx: oneshot::Sender<anyhow::Result<Option<Lsn>>>,
},
CreateSnapshot {
partition_id: PartitionId,
Expand Down Expand Up @@ -213,7 +213,7 @@ impl ClusterControllerHandle {
&self,
log_id: LogId,
trim_point: Lsn,
) -> Result<Result<(), anyhow::Error>, ShutdownError> {
) -> Result<Result<Option<Lsn>, anyhow::Error>, ShutdownError> {
let (response_tx, response_rx) = oneshot::channel();

let _ = self
Expand Down Expand Up @@ -587,7 +587,7 @@ impl<T: TransportConnect> Service<T> {
info!(
?log_id,
trim_point_inclusive = ?trim_point,
"Manual trim log command received");
"Trim log command received");
let result = bifrost_admin.trim(log_id, trim_point).await;
let _ = response_tx.send(result.map_err(Into::into));
}
Expand Down
7 changes: 6 additions & 1 deletion crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,12 @@ where
debug!(
"Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'"
);
bifrost_admin.trim(log_id, min_persisted_lsn).await?
let trim_point = bifrost_admin.trim(log_id, min_persisted_lsn).await?;
if let Some(trim_point) = trim_point {
debug!("Trimmed log '{log_id}' to '{trim_point}'",);
} else {
info!("Attempted to trim log '{log_id}' to '{min_persisted_lsn}' but got none response");
}
}
} else {
warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log.");
Expand Down
33 changes: 26 additions & 7 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl Bifrost {
}

/// Read the next record from the LSN provided. The `from` indicates the LSN where we will
/// start reading from. This means that the record returned will have a LSN that is equal
/// start reading from. This means that the record returned will have an LSN that is equal
/// or greater than `from`. If no records are committed yet at this LSN, this read operation
/// will immediately return `None`.
///
Expand Down Expand Up @@ -405,24 +405,33 @@ impl BifrostInner {
Ok(trim_point.unwrap_or(Lsn::INVALID))
}

pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> {
/// Trim the log to the specified LSN trim point (inclusive). Returns the new trim point LSN if
/// the log was actually trimmed by this call, or `None` otherwise.
pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<Option<Lsn>, Error> {
let log_metadata = Metadata::with_current(|m| m.logs_ref());

let log_chain = log_metadata
.chain(&log_id)
.ok_or(Error::UnknownLogId(log_id))?;

let mut max_trim_point = Lsn::INVALID;
for segment in log_chain.iter() {
let loglet = self.get_loglet(log_id, segment).await?;

if loglet.base_lsn > trim_point {
break;
}

loglet.trim(trim_point).await?;
if let Some(effective_trim_point) = loglet.trim(trim_point).await? {
max_trim_point = Lsn::max(max_trim_point, effective_trim_point);
}
}
// todo: Update logs configuration to remove sealed and empty loglets
Ok(())
Ok(if max_trim_point == Lsn::INVALID {
None
} else {
Some(max_trim_point)
})
}

#[inline]
Expand Down Expand Up @@ -681,7 +690,14 @@ mod tests {
appender.append("").await?;
}

bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?;
assert_eq!(bifrost_admin.trim(LOG_ID, Lsn::INVALID).await?, None); // no-op
assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

assert_eq!(
bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?,
Some(Lsn::from(5))
);
assert_eq!(bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?, None); // no-op

let tail = bifrost.find_tail(LOG_ID).await?;
assert_eq!(tail.offset(), Lsn::from(11));
Expand All @@ -703,7 +719,10 @@ mod tests {
}

// trimming beyond the release point will fall back to the release point
bifrost_admin.trim(LOG_ID, Lsn::MAX).await?;
assert_eq!(
bifrost_admin.trim(LOG_ID, Lsn::MAX).await?,
Some(Lsn::from(10))
);

assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
let new_trim_point = bifrost.get_trim_point(LOG_ID).await?;
Expand Down Expand Up @@ -957,7 +976,7 @@ mod tests {
let lsn = appender.append_batch(payloads).await?;
println!("Appended batch {lsn}");
}
append_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
append_counter.fetch_add(1, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(1)).await;
}
println!("Appender terminated");
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<'a> BifrostAdmin<'a> {
/// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to
/// trim all records of the log.
#[instrument(level = "debug", skip(self), err)]
pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<()> {
pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<Option<Lsn>> {
self.bifrost.inner.fail_if_shutting_down()?;
self.bifrost.inner.trim(log_id, trim_point).await
}
Expand Down
10 changes: 6 additions & 4 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,16 @@ pub trait Loglet: Send + Sync + std::fmt::Debug {
async fn get_trim_point(&self) -> Result<Option<LogletOffset>, OperationError>;

/// Trim the loglet prefix up to and including the `trim_point`.
/// If trim_point equal or higher than the loglet tail, the loglet trims its data until the tail.
/// If trim_point equal or higher than the loglet tail, the loglet trims its data up to the tail.
///
/// It's acceptable to pass `trim_point` beyond the tail of the loglet (Offset::MAX is legal).
/// The behaviour in this case is equivalent to trim(find_tail() - 1).
/// It's acceptable to pass `trim_point` beyond the tail of the loglet (i.e. `Offset::MAX` is legal).
/// The behaviour in this case is equivalent to `trim(find_tail() - 1)`.
///
/// Passing `Offset::INVALID` is a no-op. (success)
/// Passing `Offset::OLDEST` trims the first record in the loglet (if exists).
async fn trim(&self, trim_point: LogletOffset) -> Result<(), OperationError>;
///
/// Returns the new trim point offset if a trim was performed by this call, or `None` otherwise.
async fn trim(&self, trim_point: LogletOffset) -> Result<Option<LogletOffset>, OperationError>;

/// Seal the loglet. This operation is idempotent.
///
Expand Down
13 changes: 9 additions & 4 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,21 @@ impl LogletWrapper {
Ok(offset.map(|o| self.base_lsn.offset_by(o)))
}

// trim_point is inclusive.
pub async fn trim(&self, trim_point: Lsn) -> Result<(), OperationError> {
/// `trim_point`: inclusive LSN up to which to trim the loglet; if the LSN is beyond the
/// loglet's end, it will be trimmed in full.
///
/// Returns the effective trim point of the loglet after a trim was performed, `None` otherwise.
pub async fn trim(&self, trim_point: Lsn) -> Result<Option<Lsn>, OperationError> {
// trimming to INVALID is no-op
if trim_point == Lsn::INVALID {
return Ok(());
return Ok(None);
}
// saturate to the loglet max possible offset.
let trim_point = trim_point.min(Lsn::new(LogletOffset::MAX.into()));
let trim_point = trim_point.into_offset(self.base_lsn);
self.loglet.trim(trim_point).await

let trim_offset = self.loglet.trim(trim_point).await?;
Ok(trim_offset.map(|o| self.base_lsn.offset_by(o)))
}

pub async fn seal(&self) -> Result<(), OperationError> {
Expand Down
11 changes: 6 additions & 5 deletions crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ impl Loglet for LocalLoglet {
}
}

/// Trim the log to the minimum of new_trim_point and last_committed_offset
/// new_trim_point is inclusive (will be trimmed)
async fn trim(&self, new_trim_point: LogletOffset) -> Result<(), OperationError> {
async fn trim(
&self,
new_trim_point: LogletOffset,
) -> Result<Option<LogletOffset>, OperationError> {
let effective_trim_point = new_trim_point.min(LogletOffset::new(
self.last_committed_offset.load(Ordering::Relaxed),
));
Expand All @@ -233,7 +234,7 @@ impl Loglet for LocalLoglet {

if current_trim_point >= effective_trim_point {
// nothing to do since we have already trimmed beyond new_trim_point
return Ok(());
return Ok(None);
}

counter!(BIFROST_LOCAL_TRIM).increment(1);
Expand All @@ -255,7 +256,7 @@ impl Loglet for LocalLoglet {
"Loglet trim operation enqueued"
);

Ok(())
Ok(Some(effective_trim_point))
}

async fn seal(&self) -> Result<(), OperationError> {
Expand Down
17 changes: 10 additions & 7 deletions crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,24 +370,27 @@ impl Loglet for MemoryLoglet {
}
}

async fn trim(&self, new_trim_point: LogletOffset) -> Result<(), OperationError> {
async fn trim(
&self,
requested_trim_point: LogletOffset,
) -> Result<Option<LogletOffset>, OperationError> {
let mut log = self.log.lock().unwrap();
let actual_trim_point = new_trim_point.min(LogletOffset::new(
let requested_trim_point = requested_trim_point.min(LogletOffset::new(
self.last_committed_offset.load(Ordering::Relaxed),
));

let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed));

if current_trim_point >= actual_trim_point {
return Ok(());
if current_trim_point >= requested_trim_point {
return Ok(Some(current_trim_point));
}

let trim_point_index = self.saturating_offset_to_index(actual_trim_point);
let trim_point_index = self.saturating_offset_to_index(requested_trim_point);
self.trim_point_offset
.store(*actual_trim_point, Ordering::Relaxed);
.store(*requested_trim_point, Ordering::Relaxed);
log.drain(0..=trim_point_index);

Ok(())
Ok(Some(requested_trim_point))
}

async fn seal(&self) -> Result<(), OperationError> {
Expand Down
Loading

0 comments on commit b86ac06

Please sign in to comment.