快照有压缩日志和加速启动的作用,其创建流程如下:
- 当快照定时器超时或用户手动调用接口,会触发节点执行创建快照的任务
- 节点会将任务放进 ApplyTaskQueue,等待其被执行
- 当任务被执行时,会创建一个
temp
目录用来保存临时快照,并返回一个SnapshotWriter
- 以
SnapshotWriter
和Closure
做为参数调用用户状态机的on_snapshot_save
- 用户需通过
SnapshotWriter
将状态数据写入到临时快照中:- 5.1 调用
SnapshotWriter::get_path
获得快照目录绝对路径,并将快照文件写入该目录 - 5.2 调用
SnapshotWriter::add_file
将快照文件相对路径添加至快照元数据中
- 5.1 调用
- 待数据写入完成,用户需回调
Closure
将其转换为正式快照:- 6.1 将快照元数据持久化到文件
- 6.2 通过
rename()
将临时快照转换为正式快照 - 6.3 删除上一个快照
- 6.4 删除上一个快照对应的日志
- 至此,快照创建完成
流程整体分为以下 3 个阶段:创建临时快照(1-3),用户写入数据(4-5),转为正式快照(6-7)
正常情况下用户指定的快照存储目录下只会有一个快照目录。当创建快照时,会创建一个 temp
目录来保存临时快照,待创建完成后,通过 rename
将其转换为正式快照。
正式快照目录以当时创建快照时的 applyIndex
命名,如 snapshot_00000000000000002000
。
快照目录下除了用户写入的快照文件集合外,还有一个框架写入的元数据文件 __raft_snapshot_meta
,元数据主要保存以下 2 部分信息:
- 快照中每一个文件的相对路径
- 快照中包含的最后日志的
index
和term
,以及集群配置
参见以下元数据 proto
:
enum FileSource {
FILE_SOURCE_LOCAL = 0;
FILE_SOURCE_REFERENCE = 1;
}
message LocalFileMeta {
optional bytes user_meta = 1;
optional FileSource source = 2;
optional string checksum = 3;
}
message SnapshotMeta {
required int64 last_included_index = 1;
required int64 last_included_term = 2;
repeated string peers = 3;
repeated string old_peers = 4;
}
message LocalSnapshotPbMeta {
message File {
required string name = 1;
optional LocalFileMeta meta = 2;
};
optional SnapshotMeta meta = 1; // (1) 快照元数据
repeated File files = 2; // (2) 用户快照文件列表
}
用户手动触发快照:
class Node {
public:
// Start a snapshot immediately if possible. done->Run() would be invoked
// when the snapshot finishes, describing the detailed result.
void snapshot(Closure* done);
};
用户需要实现的快照函数:
class StateMachine {
public:
// user defined snapshot generate function, this method will block on_apply.
// user can make snapshot async when fsm can be cow(copy-on-write).
// call done->Run() when snapshot finished.
// success return 0, fail return errno
// Default: Save nothing and returns error.
virtual void on_snapshot_save(::braft::SnapshotWriter* writer,
::braft::Closure* done);
};
SnapshotWriter
相关接口:
class SnapshotWriter : public Snapshot {
public:
// Add a file to the snapshot.
// |file_meta| is an implmentation-defined protobuf message
// All the implementation must handle the case that |file_meta| is NULL and
// no error can be raised.
// Note that whether the file will be created onto the backing storage is
// implementation-defined.
virtual int add_file(const std::string& filename);
// Remove a file from the snapshot
// Note that whether the file will be removed from the backing storage is
// implementation-defined.
virtual int remove_file(const std::string& filename) = 0;
};
class Snapshot : public butil::Status {
public:
Snapshot() {}
virtual ~Snapshot() {}
// Get the path of the Snapshot
virtual std::string get_path() = 0;
// List all the existing files in the Snapshot currently
virtual void list_files(std::vector<std::string> *files) = 0;
// Get the implementation-defined file_meta
virtual int get_file_meta(const std::string& filename,
::google::protobuf::Message* file_meta) {
(void)filename;
if (file_meta != NULL) {
file_meta->Clear();
}
return 0;
}
};
节点有以下 2 种方式触发快照任务,其最终都是调用 NodeImpl::do_snapshot
执行快照任务。
- 快照定时器超时:
void SnapshotTimer::run() {
_node->handle_snapshot_timeout();
}
void NodeImpl::handle_snapshot_timeout() {
...
do_snapshot(NULL);
}
- 用户手动触发:
void Node::snapshot(Closure* done) {
_impl->snapshot(done);
}
void NodeImpl::snapshot(Closure* done) {
do_snapshot(done);
}
NodeImpl::do_snapshot
会调用 SnapshotExecutor::do_snapshot
执行快照任务。在正式创建快照前会做一些判断,决定是否要创建快照;若确定要创建快照,则:
- 创建
temp
目录用于保存临时快照 - 将创建快照任务放入 ApplyTaskQueue 中等待被执行
void NodeImpl::do_snapshot(Closure* done) {
...
if (_snapshot_executor) {
_snapshot_executor->do_snapshot(done);
}
...
}
void SnapshotExecutor::do_snapshot(Closure* done) {
...
// (1) 判断 1:正在从 Leader 下载快照
// check snapshot install/load
if (_downloading_snapshot.load(butil::memory_order_relaxed)) {
...
return;
}
// (2) 判断 2:已经有创建快照任务在运行了
// check snapshot saving?
if (_saving_snapshot) {
...
return;
}
// (3) 判断 3:如果上次创建快照到目前为止被应用的日志极少,则放弃本次任务
// FLAGS_raft_do_snapshot_min_index_gap 默认是 1
int64_t saved_fsm_applied_index = _fsm_caller->last_applied_index();
if (saved_fsm_applied_index - _last_snapshot_index <
FLAGS_raft_do_snapshot_min_index_gap) {
// There might be false positive as the last_applied_index() is being
// updated. But it's fine since we will do next snapshot saving in a
// predictable time.
...
_log_manager->clear_bufferred_logs(); // TODO(Wine93)
...
return;
}
// (4) 调用 LocalSnapshotStorage::create 创建 temp 目录用于保存临时快照
// 并返回 SnapshotWriter
SnapshotWriter* writer = _snapshot_storage->create();
...
_saving_snapshot = true;
// 当用户保存完快照后,将回调这个 Closure,见阶段三
SaveSnapshotDone* snapshot_save_done = new SaveSnapshotDone(this, writer, done);
// (5) 将创建快照任务放入 ApplyTaskQueue 中等待被执行
if (_fsm_caller->on_snapshot_save(snapshot_save_done) != 0) {
...
return;
}
}
LocalSnapshotStorage::create
会创建 temp
目录,若事先已存在 temp
目录,则先将其删除后再创建,并返回 SnapshotWriter
:
SnapshotWriter* LocalSnapshotStorage::create() {
return create(true);
}
SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) {
LocalSnapshotWriter* writer = NULL;
do {
std::string snapshot_path(_path); // _path 为用户配置的快照存储目录
snapshot_path.append("/");
snapshot_path.append(_s_temp_path); // e.g. data/temp
// (1) 删除 temp 目录
// delete temp
// TODO: Notify watcher before deleting
if (_fs->path_exists(snapshot_path) && from_empty) {
if (destroy_snapshot(snapshot_path) != 0) {
break;
}
}
// (2) 调用 LocalSnapshotWriter::init 创建 temp 目录
writer = new LocalSnapshotWriter(snapshot_path, _fs.get());
if (writer->init() != 0) {
...
break;
}
} while (0);
return writer;
}
int LocalSnapshotWriter::init() {
butil::File::Error e;
// (3) 创建 temp 目录
if (!_fs->create_directory(_path, &e, false)) {
return EIO;
}
...
return 0;
}
创建好 temp
目录后,会调用 FSMCaller::on_snapshot_save
将快照任务放入 ApplyTaskQueue,等待其被执行:
int FSMCaller::on_snapshot_save(SaveSnapshotClosure* done) {
ApplyTask task;
task.type = SNAPSHOT_SAVE;
task.done = done;
return bthread::execution_queue_execute(_queue_id, task);
}
队列消费函数会调用 FSMCaller::do_snapshot_save
执行快照任务:
int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
...
for (; iter; ++iter) {
switch (iter->type) {
...
case SNAPSHOT_SAVE:
caller->_cur_task = SNAPSHOT_SAVE;
if (caller->pass_by_status(iter->done)) {
caller->do_snapshot_save((SaveSnapshotClosure*)iter->done);
}
break;
...
}
}
...
}
在 FSMCaller::do_snapshot_save
函数中主要做以下 2 件事:
- 准备好元数据,将其保存在
Closure
中 - 将上述创建的
SnapshotWriter
和Closure
作为参数调用用户状态机的on_snapshot_save
void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
//(1)准备快照元数据
int64_t last_applied_index = _last_applied_index.load(butil::memory_order_relaxed); // applyIndex
SnapshotMeta meta;
// (1.1) 最后一条应用日志的 index
meta.set_last_included_index(last_applied_index);
// (1.2) 最后一条应用日志的 term
meta.set_last_included_term(_last_applied_term);
// (1.3) 当前节点的配置
// 若 old_peers 不为空,则为配置变更的 C{old,new}
// 否则为当前集群的配置,即 C{new}
ConfigurationEntry conf_entry;
_log_manager->get_configuration(last_applied_index, &conf_entry);
for (Configuration::const_iterator
iter = conf_entry.conf.begin();
iter != conf_entry.conf.end(); ++iter) {
*meta.add_peers() = iter->to_string();
}
for (Configuration::const_iterator
iter = conf_entry.old_conf.begin();
iter != conf_entry.old_conf.end(); ++iter) {
*meta.add_old_peers() = iter->to_string();
}
// (1.4) 将元数据保存在 Closure 中
SnapshotWriter* writer = done->start(meta);
...
// (2) 调用用户状态机 on_snapshot_save
_fsm->on_snapshot_save(writer, done);
return;
}
用户需要实现状态机的 on_snapshot_save
函数,在该函数中用户需要做以下 3 件事:
- 调用
writer->get_path
获取临时快照的目录路径,并将快照文件写入到该目录下 - 调用
write->add
将快照中每一个文件的相对路径加入到快照元数据中 - 待以上全部完成后,调用
done->Run
将临时快照转换为正式快照
class StateMachine {
public:
// user defined snapshot generate function, this method will block on_apply.
// user can make snapshot async when fsm can be cow(copy-on-write).
// call done->Run() when snapshot finished.
// success return 0, fail return errno
// Default: Save nothing and returns error.
virtual void on_snapshot_save(::braft::SnapshotWriter* writer,
::braft::Closure* done);
};
get_path
接口会返回临时快照(temp
)目录的绝对路径,用户需要在该目录中写入快照文件:
class LocalSnapshotWriter : public SnapshotWriter {
public:
...
virtual std::string get_path() { return _path; }
...
private:
// Users shouldn't create LocalSnapshotWriter Directly
LocalSnapshotWriter(const std::string& path,
FileSystemAdaptor* fs);
...
std::string _path;
...
};
class SnapshotWriter : public Snapshot {
public:
// Add a file to the snapshot.
// |file_meta| is an implmentation-defined protobuf message
// All the implementation must handle the case that |file_meta| is NULL and
// no error can be raised.
// Note that whether the file will be created onto the backing storage is
// implementation-defined.
virtual int add_file(const std::string& filename) {
return add_file(filename, NULL);
}
virtual int add_file(const std::string& filename,
const ::google::protobuf::Message* file_meta) = 0;
};
add_file
接口只是将文件路径添加到元数据中而已:
int LocalSnapshotWriter::add_file(
const std::string& filename,
const ::google::protobuf::Message* file_meta) {
...
return _meta_table.add_file(filename, meta);
}
int LocalSnapshotMetaTable::add_file(const std::string& filename,
const LocalFileMeta& meta) {
Map::value_type value(filename, meta);
std::pair<Map::iterator, bool> ret = _file_map.insert(value);
...
return ret.second ? 0 : -1;
}
用户完成快照的创建后,需调用 Closure
,即 SaveSnapshotDone::Run()
,而该函数会将临时快照 rename
成正式快照,并删除上一个快照,以及删除上一个快照对应的日志。
用户调用 SaveSnapshotDone::Run
,该函数主要执行以下 2 件事:
void SaveSnapshotDone::Run() {
...
bthread_t tid;
if (bthread_start_urgent(&tid, NULL, continue_run, this) != 0) {
...
}
}
void* SaveSnapshotDone::continue_run(void* arg) {
SaveSnapshotDone* self = (SaveSnapshotDone*)arg;
...
// Must call on_snapshot_save_done to clear _saving_snapshot
// (1) 调用 SnapshotExecutor::on_snapshot_save_done 执行实际的收尾动作
int ret = self->_se->on_snapshot_save_done(
self->status(), self->_meta, self->_writer);
}
// (2) 如果是用户手动触发快照,这时候会回调用户传入的 Closure
if (self->_done) {
run_closure_in_bthread(self->_done, true);
}
return NULL;
}
on_snapshot_save_done
会完成以下几项工作:
int SnapshotExecutor::on_snapshot_save_done(
const butil::Status& st, const SnapshotMeta& meta, SnapshotWriter* writer) {
...
// (1.1) 将快照元数据保存到 LocalSnapshotMetaTable,等待持久化
if (writer->save_meta(meta)) {
...
}
// (1.2) 调用 LocalSnapshotStorage::close 完成写入元数据、将快照 rename 成正式快照等工作
if (_snapshot_storage->close(writer) != 0) {
...
}
// (1.3) 调用 LogManager::set_snapshot 删除上一个快照对应的日志
if (ret == 0) {
_last_snapshot_index = meta.last_included_index();
_last_snapshot_term = meta.last_included_term();
...
_log_manager->set_snapshot(&meta);
}
...
_saving_snapshot = false;
return ret;
}
save_meta
会将元数据保存到 LocalSnapshotMetaTable
中:
int LocalSnapshotWriter::save_meta(const SnapshotMeta& meta) {
_meta_table.set_meta(meta);
return 0;
}
class LocalSnapshotMetaTable {
public:
...
void set_meta(const SnapshotMeta& meta) { _meta = meta; }
...
private:
...
SnapshotMeta _meta;
};
在 close
函数中主要以下做四件事:
- (1) 将元数据持久化到文件
- (2) 通过
rename()
将临时快照变为正式快照 - (3) 删除上一个快照
int LocalSnapshotStorage::close(SnapshotWriter* writer_base,
bool keep_data_on_error) {
LocalSnapshotWriter* writer = dynamic_cast<LocalSnapshotWriter*>(writer_base);
do {
...
// (1) 调用 LocalSnapshotWriter::sync 将快照元数据写入文件
ret = writer->sync();
...
//
int old_index = _last_snapshot_index;
int64_t new_index = writer->snapshot_index();
// (2) 将临时快照 rename 成正式快照
// rename temp to new
std::string temp_path(_path);
temp_path.append("/");
temp_path.append(_s_temp_path);
std::string new_path(_path);
butil::string_appendf(&new_path, "/" BRAFT_SNAPSHOT_PATTERN, new_index);
if (!_fs->delete_file(new_path, true)) {
...
}
...
if (!_fs->rename(temp_path, new_path)) {
...
}
ref(new_index);
{
BAIDU_SCOPED_LOCK(_mutex);
CHECK_EQ(old_index, _last_snapshot_index);
_last_snapshot_index = new_index;
}
// unref old_index, ref new_index
// (3) 删除上一个快照。
// 需要注意的是,这里有一个引用判断
// 因为当前节点可能是 Leader,而该快照可能正用于下载给 Follower
unref(old_index);
} while (0);
...
}
LocalSnapshotWriter::sync
会调用 save_to_file
将元数据填充到 proto
(LocalSnapshotPbMeta
)中,并将其序列化,最终持久化到文件:
int LocalSnapshotWriter::sync() {
// BRAFT_SNAPSHOT_META_FILE: __raft_snapshot_meta
const int rc = _meta_table.save_to_file(_fs, _path + "/" BRAFT_SNAPSHOT_META_FILE);
...
return rc;
}
int LocalSnapshotMetaTable::save_to_file(FileSystemAdaptor* fs, const std::string& path) const {
// (1.1) _meta 中保存的是 lastIncludeIndex,lastIncludedTerm 以及集群配置
LocalSnapshotPbMeta pb_meta;
if (_meta.IsInitialized()) {
*pb_meta.mutable_meta() = _meta;
}
// (1.2) 将所有文件列表加入到 proto
for (Map::const_iterator
iter = _file_map.begin(); iter != _file_map.end(); ++iter) {
LocalSnapshotPbMeta::File *f = pb_meta.add_files();
f->set_name(iter->first);
*f->mutable_meta() = iter->second;
}
// (1.3) 序列化并持久化到文件
ProtoBufFile pb_file(path, fs);
int ret = pb_file.save(&pb_meta, raft_sync_meta());
...
return ret;
}
调用 unref
删除上一个快照。需要注意的是,当前节点可能是 Leader,而该快照可能正用于下载给其他 Follower,所以需要判断其引用计数,若引用计数为 0,则删除其目录:
// (1) index 为上一个快照的 lastIncludeIndex
void LocalSnapshotStorage::unref(const int64_t index) {
std::unique_lock<raft_mutex_t> lck(_mutex);
std::map<int64_t, int>::iterator it = _ref_map.find(index);
// (2) 找到上一个快照
if (it != _ref_map.end()) {
// (3) 将其引用计数减一
it->second--;
// (4) 如果减到 0,则将其删除
if (it->second == 0) {
_ref_map.erase(it);
std::string old_path(_path);
butil::string_appendf(&old_path, "/" BRAFT_SNAPSHOT_PATTERN, index);
destroy_snapshot(old_path);
}
}
}
// (5) 删除上一个快照的目录
int LocalSnapshotStorage::destroy_snapshot(const std::string& path) {
if (!_fs->delete_file(path, true)) {
...
return -1;
}
return 0;
}
在 Closure
中最后会调用 set_snapshot
删除上一个快照的日志,之所以只删除上一个快照的日志,而不立即删除当前快照的日志,主要考虑到有些 Follower 还没有同步完日志,如果删除了当前的日志,哪怕只差几条日志也只能发送快照进行同步,见以下注释:
void LogManager::set_snapshot(const SnapshotMeta* meta) {
...
// (1) last_but_one_snapshot_id 是上一个快照的 Id
const LogId last_but_one_snapshot_id = _last_snapshot_id;
// Truncating log to the index of the last snapshot.
// We don't truncate log before the latest snapshot immediately since
// some log around last_snapshot_index is probably needed by some
// followers
if (last_but_one_snapshot_id.index > 0) {
// We have last snapshot index
_virtual_first_log_id = last_but_one_snapshot_id;
// (2) 删除上一个快照对应的日志
truncate_prefix(last_but_one_snapshot_id.index + 1, lck);
}
return;
...
}