diff --git a/config.go b/config.go index dbbf8fc..337fef0 100644 --- a/config.go +++ b/config.go @@ -29,4 +29,6 @@ type Config struct { TLSConfig *tls.Config // RaftConfig provides any necessary configuration for the Raft server. RaftConfig *raft.Config + // When UseBoltSnapshot is set, use bbolt as snapshot store instead of FileSnapshot + UseBoltSnapshot bool } diff --git a/dispatcher.go b/dispatcher.go index 5b7a04a..916188e 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -103,8 +103,9 @@ func NewHRaftDispatcherWithLogger(config *Config, logger *zap.Logger) (*HRaftDis MaxPool: 5, Logger: nil, }, - Enforcer: config.Enforcer, - RaftConfig: config.RaftConfig, + Enforcer: config.Enforcer, + RaftConfig: config.RaftConfig, + UseBoltSnapshot: config.UseBoltSnapshot, } s, err := store.NewStore(logger, storeConfig) if err != nil { diff --git a/go.mod b/go.mod index 5a36746..5f15e54 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,7 @@ require ( github.com/stretchr/testify v1.6.1 go.etcd.io/bbolt v1.3.5 go.uber.org/zap v1.16.0 - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 google.golang.org/protobuf v1.25.0 - gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 2f3b06d..19fc015 100644 --- a/go.sum +++ b/go.sum @@ -7,12 +7,7 @@ github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= -github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= -github.com/casbin/casbin/v2 v2.23.4 h1:izvAG3KA49C3/m1zpYfkLcZlkYQO5VeHj7dhwurwZR4= -github.com/casbin/casbin/v2 v2.23.4/go.mod h1:wUgota0cQbTXE6Vd+KWpg41726jFRi7upxio0sR+Xd0= -github.com/casbin/casbin/v2 v2.24.0 h1:peiFTw+PNpAMSz7hDDz768nEwzUsBMMZNV4yFGCayI8= -github.com/casbin/casbin/v2 v2.24.0/go.mod h1:wUgota0cQbTXE6Vd+KWpg41726jFRi7upxio0sR+Xd0= github.com/casbin/casbin/v2 v2.35.0 h1:f0prVg9LgTJTihjAxWEZhfJptXvah1GpZh12sb5KXNA= github.com/casbin/casbin/v2 v2.35.0/go.mod h1:vByNa/Fchek0KZUgG5wEsl7iFsiviAYKRtgrQfcJqHg= github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc= @@ -28,11 +23,11 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/go-chi/chi v1.5.1 h1:kfTK3Cxd/dkMu/rKs5ZceWYp+t5CtiE7vmaTv3LjC6w= github.com/go-chi/chi v1.5.1/go.mod h1:REp24E+25iKvxgeTfHmdUoL5x15kBiDBlnIl5bCwe2k= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= -github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -40,6 +35,7 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -69,7 +65,6 @@ github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCO github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0= github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= -github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -123,7 +118,6 @@ go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -137,10 +131,7 @@ golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -152,11 +143,9 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -178,9 +167,11 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -195,7 +186,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/store/boltstore/bolt_store.go b/store/boltstore/bolt_store.go new file mode 100644 index 0000000..966f85d --- /dev/null +++ b/store/boltstore/bolt_store.go @@ -0,0 +1,490 @@ +package boltstore + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "sort" + "time" + + "github.com/hashicorp/raft" + bolt "go.etcd.io/bbolt" +) + +const ( + // Permissions to use on the db file. This is only used if the + // database file does not exist and needs to be created. + dbFileMode = 0600 +) + +var ( + // Bucket names we perform transactions in + dbLogs = []byte("logs") + dbConf = []byte("conf") + dbSnapMeta = []byte("snap_meta") + dbSnapData = []byte("snap_data") + // An error indicating a given key does not exist + ErrKeyNotFound = errors.New("not found") +) + +// BoltStore provides access to BoltDB for Raft to store and retrieve +// log entries. It also provides key/value storage, and can be used as +// a LogStore and StableStore. +type BoltStore struct { + // conn is the underlying handle to the db. + conn *bolt.DB + + // The path to the Bolt database file + path string + + // retain controls how many snapshots are retained. Must be at least 1. + retain int +} + +// BoltSnapshotSink implements SnapshotSink with bbolt. +type BoltSnapshotSink struct { + boltStore *BoltStore + + // The metadata of the snapshot + meta raft.SnapshotMeta + // The true content of the snapshot + contents *bytes.Buffer +} + +// snapMetaSlice is used to implement Sort interface +type snapMetaSlice []*raft.SnapshotMeta + +// Options contains all the configuraiton used to open the BoltDB +type Options struct { + // Path is the file path to the BoltDB to use + Path string + + // BoltOptions contains any specific BoltDB options you might + // want to specify [e.g. open timeout] + BoltOptions *bolt.Options + + // NoSync causes the database to skip fsync calls after each + // write to the log. This is unsafe, so it should be used + // with caution. + NoSync bool +} + +// readOnly returns true if the contained bolt options say to open +// the DB in readOnly mode [this can be useful to tools that want +// to examine the log] +func (o *Options) readOnly() bool { + return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly +} + +// NewBoltStore takes a file path and returns a connected Raft backend. +func NewBoltStore(path string, retain int) (*BoltStore, error) { + return New(Options{Path: path}, retain) +} + +// New uses the supplied options to open the BoltDB and prepare it for use as a raft backend. +func New(options Options, retain int) (*BoltStore, error) { + // Try to connect + handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions) + if err != nil { + return nil, err + } + handle.NoSync = options.NoSync + + // Create the new store + store := &BoltStore{ + conn: handle, + retain: retain, + path: options.Path, + } + + // If the store was opened read-only, don't try and create buckets + if !options.readOnly() { + // Set up our buckets + if err := store.initialize(); err != nil { + store.Close() + return nil, err + } + } + return store, nil +} + +// initialize is used to set up all of the buckets. +func (b *BoltStore) initialize() error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + // Create all the buckets + if _, err := tx.CreateBucketIfNotExists(dbLogs); err != nil { + return err + } + if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil { + return err + } + if _, err := tx.CreateBucketIfNotExists(dbSnapMeta); err != nil { + return err + } + if _, err := tx.CreateBucketIfNotExists(dbSnapData); err != nil { + return err + } + + return tx.Commit() +} + +// Close is used to gracefully close the DB connection. +func (b *BoltStore) Close() error { + return b.conn.Close() +} + +// FirstIndex returns the first known index from the Raft log. +func (b *BoltStore) FirstIndex() (uint64, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return 0, err + } + defer tx.Rollback() + + curs := tx.Bucket(dbLogs).Cursor() + if first, _ := curs.First(); first == nil { + return 0, nil + } else { + return bytesToUint64(first), nil + } +} + +// LastIndex returns the last known index from the Raft log. +func (b *BoltStore) LastIndex() (uint64, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return 0, err + } + defer tx.Rollback() + + curs := tx.Bucket(dbLogs).Cursor() + if last, _ := curs.Last(); last == nil { + return 0, nil + } else { + return bytesToUint64(last), nil + } +} + +// GetLog is used to retrieve a log from BoltDB at a given index. +func (b *BoltStore) GetLog(idx uint64, log *raft.Log) error { + tx, err := b.conn.Begin(false) + if err != nil { + return err + } + defer tx.Rollback() + + bucket := tx.Bucket(dbLogs) + val := bucket.Get(uint64ToBytes(idx)) + + if val == nil { + return raft.ErrLogNotFound + } + return decodeMsgPack(val, log) +} + +// StoreLog is used to store a single raft log +func (b *BoltStore) StoreLog(log *raft.Log) error { + return b.StoreLogs([]*raft.Log{log}) +} + +// StoreLogs is used to store a set of raft logs +func (b *BoltStore) StoreLogs(logs []*raft.Log) error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + for _, log := range logs { + key := uint64ToBytes(log.Index) + val, err := encodeMsgPack(log) + if err != nil { + return err + } + bucket := tx.Bucket(dbLogs) + if err := bucket.Put(key, val.Bytes()); err != nil { + return err + } + } + + return tx.Commit() +} + +// DeleteRange is used to delete logs within a given range inclusively. +func (b *BoltStore) DeleteRange(min, max uint64) error { + minKey := uint64ToBytes(min) + + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + curs := tx.Bucket(dbLogs).Cursor() + for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() { + // Handle out-of-range log index + if bytesToUint64(k) > max { + break + } + + // Delete in-range log index + if err := curs.Delete(); err != nil { + return err + } + } + + return tx.Commit() +} + +// Set is used to set a key/value set outside of the raft log +func (b *BoltStore) Set(k, v []byte) error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + bucket := tx.Bucket(dbConf) + if err := bucket.Put(k, v); err != nil { + return err + } + + return tx.Commit() +} + +// Get is used to retrieve a value from the k/v store by key +func (b *BoltStore) Get(k []byte) ([]byte, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return nil, err + } + defer tx.Rollback() + + bucket := tx.Bucket(dbConf) + val := bucket.Get(k) + + if val == nil { + return nil, ErrKeyNotFound + } + return append([]byte(nil), val...), nil +} + +// SetUint64 is like Set, but handles uint64 values +func (b *BoltStore) SetUint64(key []byte, val uint64) error { + return b.Set(key, uint64ToBytes(val)) +} + +// GetUint64 is like Get, but handles uint64 values +func (b *BoltStore) GetUint64(key []byte) (uint64, error) { + val, err := b.Get(key) + if err != nil { + return 0, err + } + return bytesToUint64(val), nil +} + +// encodePeers is used to serialize a Configuration into the old peers format. +// This is here for backwards compatibility when operating with a mix of old +// servers and should be removed once we deprecate support for protocol version 1. +func encodePeers(configuration raft.Configuration, trans raft.Transport) []byte { + // Gather up all the voters, other suffrage types are not supported by + // this data format. + var encPeers [][]byte + for _, server := range configuration.Servers { + if server.Suffrage == raft.Voter { + encPeers = append(encPeers, trans.EncodePeer(server.ID, server.Address)) + } + } + + // Encode the entire array. + buf, err := encodeMsgPack(encPeers) + if err != nil { + panic(fmt.Errorf("failed to encode peers: %v", err)) + } + + return buf.Bytes() +} + +// Create is used to start a new bbolt snapshot +func (b *BoltStore) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, + configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) { + if version != 1 { + return nil, fmt.Errorf("unsupported snapshot version %d", version) + } + + // Get the name of the snapshot + now := time.Now() + msec := now.UnixNano() / int64(time.Millisecond) + name := fmt.Sprintf("%d-%d-%d", term, index, msec) + + sink := &BoltSnapshotSink{ + boltStore: b, + meta: raft.SnapshotMeta{ + Version: version, + ID: name, + Index: index, + Term: term, + Peers: encodePeers(configuration, trans), + Configuration: configuration, + ConfigurationIndex: configurationIndex, + }, + contents: &bytes.Buffer{}, + } + + return sink, nil +} + +// getSnapshot returns a list of snapshot metadata in bbolt store +func (b *BoltStore) getSnapshot(tx *bolt.Tx) []*raft.SnapshotMeta { + var snapMeta []*raft.SnapshotMeta + curs := tx.Bucket(dbSnapMeta).Cursor() + for k, v := curs.First(); k != nil; k, v = curs.Next() { + tempMeta := new(raft.SnapshotMeta) + if decodeMsgPack(v, tempMeta) == nil { + snapMeta = append(snapMeta, tempMeta) + } + } + + // sort the snapshot in reverse order (new->old) + sort.Sort(sort.Reverse(snapMetaSlice(snapMeta))) + return snapMeta +} + +// List returns available snapshots in bbolt store. +func (b *BoltStore) List() ([]*raft.SnapshotMeta, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return []*raft.SnapshotMeta{}, err + } + defer tx.Rollback() + + snapMeta := b.getSnapshot(tx) + if len(snapMeta) > b.retain { + snapMeta = snapMeta[:b.retain] + } + return snapMeta, nil +} + +// Open takes a snapshot ID and returns the metadata and a ReadCloser for that snapshot. +func (b *BoltStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return nil, nil, err + } + defer tx.Rollback() + + // Get the metadata of the snapshot + metaBucket := tx.Bucket(dbSnapMeta) + val := metaBucket.Get([]byte(id)) + if val == nil { + return nil, nil, fmt.Errorf("[ERR] snapshot: failed to open snapshot metadata. id: %s", id) + } + + snapMeta := new(raft.SnapshotMeta) + if err := decodeMsgPack(val, snapMeta); err != nil { + return nil, nil, err + } + + // Get the contents of the snapshot + dataBucket := tx.Bucket(dbSnapData) + val = dataBucket.Get([]byte(id)) + if val == nil { + return nil, nil, fmt.Errorf("[ERR] snapshot: failed to open snapshot contents. id: %s", id) + } + + contents := bytes.NewBuffer(val) + return snapMeta, ioutil.NopCloser(contents), nil +} + +// ID returns the ID of the snapshot, can be used with Open() after the snapshot is finalized. +func (s *BoltSnapshotSink) ID() string { + return s.meta.ID +} + +func (s *BoltSnapshotSink) Cancel() error { + return nil +} + +// Write is used to append the content of the snapshot +func (s *BoltSnapshotSink) Write(b []byte) (int, error) { + written, err := s.contents.Write(b) + if err != nil { + return 0, err + } + s.meta.Size += int64(written) + return written, nil +} + +// Close is used to indicate a successful end. +func (s *BoltSnapshotSink) Close() error { + return s.boltStore.PutSnapshot(s.meta, s.contents.Bytes()) +} + +// Implement the sort interface for []*SnapshotMeta. +func (s snapMetaSlice) Len() int { + return len(s) +} + +func (s snapMetaSlice) Less(i, j int) bool { + if s[i].Term != s[j].Term { + return s[i].Term < s[j].Term + } + if s[i].Index != s[j].Index { + return s[i].Index < s[j].Index + } + return s[i].ID < s[j].ID +} + +func (s snapMetaSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (b *BoltStore) PutSnapshot(meta raft.SnapshotMeta, state []byte) error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + metaBucket := tx.Bucket(dbSnapMeta) + buf, err := encodeMsgPack(meta) + if err != nil { + return err + } + if err = metaBucket.Put([]byte(meta.ID), buf.Bytes()); err != nil { + return err + } + + dataBucket := tx.Bucket(dbSnapData) + if err = dataBucket.Put([]byte(meta.ID), state); err != nil { + return err + } + + // reap any snapshots beyond the retain count. + snapMeta := b.getSnapshot(tx) + for i := b.retain; i < len(snapMeta); i++ { + if err := metaBucket.Delete([]byte(snapMeta[i].ID)); err != nil { + return err + } + + if err := dataBucket.Delete([]byte(snapMeta[i].ID)); err != nil { + return err + } + } + + return tx.Commit() +} + +// Sync performs an fsync on the database file handle. This is not necessary +// under normal operation unless NoSync is enabled, in which this forces the +// database file to sync against the disk. +func (b *BoltStore) Sync() error { + return b.conn.Sync() +} diff --git a/store/logstore/bolt_store_test.go b/store/boltstore/bolt_store_test.go similarity index 60% rename from store/logstore/bolt_store_test.go rename to store/boltstore/bolt_store_test.go index d971507..199cfdf 100644 --- a/store/logstore/bolt_store_test.go +++ b/store/boltstore/bolt_store_test.go @@ -1,7 +1,8 @@ -package logstore +package boltstore import ( "bytes" + "io" "io/ioutil" "os" "reflect" @@ -12,6 +13,11 @@ import ( bolt "go.etcd.io/bbolt" ) +const ( + retainSnapshotCount = 2 + SnapshotVersionMax = 1 +) + func testBoltStore(t testing.TB) *BoltStore { fh, err := ioutil.TempFile("", "bolt") if err != nil { @@ -20,7 +26,7 @@ func testBoltStore(t testing.TB) *BoltStore { defer os.RemoveAll(fh.Name()) // Successfully creates and returns a store - store, err := NewBoltStore(fh.Name()) + store, err := NewBoltStore(fh.Name(), retainSnapshotCount) if err != nil { t.Fatalf("err: %s", err) } @@ -37,12 +43,20 @@ func testRaftLog(idx uint64, data string) *raft.Log { func TestBoltStore_Implements(t *testing.T) { var store interface{} = &BoltStore{} + var sink interface{} = &BoltSnapshotSink{} if _, ok := store.(raft.StableStore); !ok { t.Fatalf("BoltStore does not implement raft.StableStore") } if _, ok := store.(raft.LogStore); !ok { t.Fatalf("BoltStore does not implement raft.LogStore") } + + if _, ok := store.(raft.SnapshotStore); !ok { + t.Fatalf("BoltStore does not implement raft.SnapshotStore") + } + if _, ok := sink.(raft.SnapshotSink); !ok { + t.Fatalf("BoltSnapshotSink does not implement raft.SnapshotSink") + } } func TestBoltOptionsTimeout(t *testing.T) { @@ -58,7 +72,7 @@ func TestBoltOptionsTimeout(t *testing.T) { Timeout: time.Second / 10, }, } - store, err := New(options) + store, err := New(options, retainSnapshotCount) if err != nil { t.Fatalf("err: %v", err) } @@ -67,7 +81,7 @@ func TestBoltOptionsTimeout(t *testing.T) { // trying to open it again should timeout doneCh := make(chan error, 1) go func() { - _, err := New(options) + _, err := New(options, retainSnapshotCount) doneCh <- err }() select { @@ -86,7 +100,7 @@ func TestBoltOptionsReadOnly(t *testing.T) { t.Fatalf("err: %s", err) } defer os.RemoveAll(fh.Name()) - store, err := NewBoltStore(fh.Name()) + store, err := NewBoltStore(fh.Name(), retainSnapshotCount) if err != nil { t.Fatalf("err: %s", err) } @@ -108,7 +122,7 @@ func TestBoltOptionsReadOnly(t *testing.T) { ReadOnly: true, }, } - roStore, err := New(options) + roStore, err := New(options, retainSnapshotCount) if err != nil { t.Fatalf("err: %s", err) } @@ -137,7 +151,7 @@ func TestNewBoltStore(t *testing.T) { defer os.RemoveAll(fh.Name()) // Successfully creates and returns a store - store, err := NewBoltStore(fh.Name()) + store, err := NewBoltStore(fh.Name(), retainSnapshotCount) if err != nil { t.Fatalf("err: %s", err) } @@ -170,6 +184,12 @@ func TestNewBoltStore(t *testing.T) { if _, err := tx.CreateBucket([]byte(dbConf)); err != bolt.ErrBucketExists { t.Fatalf("bad: %v", err) } + if _, err := tx.CreateBucket([]byte(dbSnapMeta)); err != bolt.ErrBucketExists { + t.Fatalf("bad: %v", err) + } + if _, err := tx.CreateBucket([]byte(dbSnapData)); err != bolt.ErrBucketExists { + t.Fatalf("bad: %v", err) + } } func TestBoltStore_FirstIndex(t *testing.T) { @@ -414,3 +434,212 @@ func TestBoltStore_SetUint64_GetUint64(t *testing.T) { t.Fatalf("bad: %v", val) } } + +func TestBoltStore_Snapshot_Create(t *testing.T) { + store := testBoltStore(t) + defer store.Close() + defer os.RemoveAll(store.path) + + // Check no snapshots + snaps, err := store.List() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(snaps) != 0 { + t.Fatalf("did not expect any snapshots: %v", snaps) + } + + // Create a new sink + var configuration raft.Configuration + configuration.Servers = append(configuration.Servers, raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID("my id"), + Address: raft.ServerAddress("over here"), + }) + _, trans := raft.NewInmemTransport(raft.NewInmemAddr()) + sink, err := store.Create(SnapshotVersionMax, 10, 3, configuration, 2, trans) + if err != nil { + t.Fatalf("err: %v", err) + } + + // The sink is not done, should not be in a list! + snaps, err = store.List() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(snaps) != 0 { + t.Fatalf("did not expect any snapshots: %v", snaps) + } + + // Write to the sink + _, err = sink.Write([]byte("first\n")) + if err != nil { + t.Fatalf("err: %v", err) + } + _, err = sink.Write([]byte("second\n")) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Done! + err = sink.Close() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should have a snapshot! + snaps, err = store.List() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(snaps) != 1 { + t.Fatalf("expect a snapshots: %v", snaps) + } + + // Check the latest + latest := snaps[0] + if latest.Index != 10 { + t.Fatalf("bad snapshot: %v", *latest) + } + if latest.Term != 3 { + t.Fatalf("bad snapshot: %v", *latest) + } + if !reflect.DeepEqual(latest.Configuration, configuration) { + t.Fatalf("bad snapshot: %v", *latest) + } + if latest.ConfigurationIndex != 2 { + t.Fatalf("bad snapshot: %v", *latest) + } + if latest.Size != 13 { + t.Fatalf("bad snapshot: %v", *latest) + } + + // Read the snapshot + _, r, err := store.Open(latest.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Read out everything + var buf bytes.Buffer + if _, err := io.Copy(&buf, r); err != nil { + t.Fatalf("err: %v", err) + } + if err := r.Close(); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a match + if bytes.Compare(buf.Bytes(), []byte("first\nsecond\n")) != 0 { + t.Fatalf("content mismatch") + } +} + +func TestBoltSore_Snapshot_Cancel(t *testing.T) { + store := testBoltStore(t) + defer store.Close() + defer os.RemoveAll(store.path) + + _, trans := raft.NewInmemTransport(raft.NewInmemAddr()) + sink, err := store.Create(SnapshotVersionMax, 10, 3, raft.Configuration{}, 0, trans) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Cancel the snapshot! Should delete + err = sink.Cancel() + if err != nil { + t.Fatalf("err: %v", err) + } + + // The sink is canceled, should not be in a list! + snaps, err := store.List() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(snaps) != 0 { + t.Fatalf("did not expect any snapshots: %v", snaps) + } +} + +func TestBoltStore_Snapshot_Retention(t *testing.T) { + var err error + + store := testBoltStore(t) + defer store.Close() + defer os.RemoveAll(store.path) + + _, trans := raft.NewInmemTransport(raft.NewInmemAddr()) + for i := 10; i < 15; i++ { + var sink raft.SnapshotSink + sink, err = store.Create(SnapshotVersionMax, uint64(i), 3, raft.Configuration{}, 0, trans) + if err != nil { + t.Fatalf("err: %v", err) + } + err = sink.Close() + if err != nil { + t.Fatalf("err: %v", err) + } + } + + // Should only have 2 listed! + var snaps []*raft.SnapshotMeta + snaps, err = store.List() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(snaps) != 2 { + t.Fatalf("expect 2 snapshots: %v", snaps) + } + + // Check they are the latest + if snaps[0].Index != 14 { + t.Fatalf("bad snap: %#v", *snaps[0]) + } + if snaps[1].Index != 13 { + t.Fatalf("bad snap: %#v", *snaps[1]) + } +} + +func TestBoltStore_Snapshot_Ordering(t *testing.T) { + store := testBoltStore(t) + defer store.Close() + defer os.RemoveAll(store.path) + + // Create a new sink + _, trans := raft.NewInmemTransport(raft.NewInmemAddr()) + sink, err := store.Create(SnapshotVersionMax, 130350, 5, raft.Configuration{}, 0, trans) + if err != nil { + t.Fatalf("err: %v", err) + } + err = sink.Close() + if err != nil { + t.Fatalf("err: %v", err) + } + + sink, err = store.Create(SnapshotVersionMax, 204917, 36, raft.Configuration{}, 0, trans) + if err != nil { + t.Fatalf("err: %v", err) + } + err = sink.Close() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should only have 2 listed! + snaps, err := store.List() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(snaps) != 2 { + t.Fatalf("expect 2 snapshots: %v", snaps) + } + + // Check they are ordered + if snaps[0].Term != 36 { + t.Fatalf("bad snap: %#v", *snaps[0]) + } + if snaps[1].Term != 5 { + t.Fatalf("bad snap: %#v", *snaps[1]) + } +} diff --git a/store/logstore/util.go b/store/boltstore/util.go similarity index 97% rename from store/logstore/util.go rename to store/boltstore/util.go index d35dde8..a2b6afa 100644 --- a/store/logstore/util.go +++ b/store/boltstore/util.go @@ -1,4 +1,4 @@ -package logstore +package boltstore import ( "bytes" diff --git a/store/fsm.go b/store/fsm.go index 3dcd624..91a8ed9 100644 --- a/store/fsm.go +++ b/store/fsm.go @@ -200,7 +200,7 @@ func (f *FSM) Apply(log *raft.Log) interface{} { // concurrently with any other command. The FSM must discard all previous // state. // When Raft starts, if the local snapshot exists, call Restore, otherwise -// read log from logstore and pass it to Apply. +// read log from boltstore and pass it to Apply. func (f *FSM) Restore(rc io.ReadCloser) error { f.logger.Info("restore an FSM from the snapshot") err := f.policyOperator.Restore(rc) diff --git a/store/logstore/bolt_store.go b/store/logstore/bolt_store.go deleted file mode 100644 index 98366db..0000000 --- a/store/logstore/bolt_store.go +++ /dev/null @@ -1,268 +0,0 @@ -package logstore - -import ( - "errors" - - "github.com/hashicorp/raft" - bolt "go.etcd.io/bbolt" -) - -const ( - // Permissions to use on the db file. This is only used if the - // database file does not exist and needs to be created. - dbFileMode = 0600 -) - -var ( - // Bucket names we perform transactions in - dbLogs = []byte("logs") - dbConf = []byte("conf") - - // An error indicating a given key does not exist - ErrKeyNotFound = errors.New("not found") -) - -// BoltStore provides access to BoltDB for Raft to store and retrieve -// log entries. It also provides key/value storage, and can be used as -// a LogStore and StableStore. -type BoltStore struct { - // conn is the underlying handle to the db. - conn *bolt.DB - - // The path to the Bolt database file - path string -} - -// Options contains all the configuraiton used to open the BoltDB -type Options struct { - // Path is the file path to the BoltDB to use - Path string - - // BoltOptions contains any specific BoltDB options you might - // want to specify [e.g. open timeout] - BoltOptions *bolt.Options - - // NoSync causes the database to skip fsync calls after each - // write to the log. This is unsafe, so it should be used - // with caution. - NoSync bool -} - -// readOnly returns true if the contained bolt options say to open -// the DB in readOnly mode [this can be useful to tools that want -// to examine the log] -func (o *Options) readOnly() bool { - return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly -} - -// NewBoltStore takes a file path and returns a connected Raft backend. -func NewBoltStore(path string) (*BoltStore, error) { - return New(Options{Path: path}) -} - -// New uses the supplied options to open the BoltDB and prepare it for use as a raft backend. -func New(options Options) (*BoltStore, error) { - // Try to connect - handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions) - if err != nil { - return nil, err - } - handle.NoSync = options.NoSync - - // Create the new store - store := &BoltStore{ - conn: handle, - path: options.Path, - } - - // If the store was opened read-only, don't try and create buckets - if !options.readOnly() { - // Set up our buckets - if err := store.initialize(); err != nil { - store.Close() - return nil, err - } - } - return store, nil -} - -// initialize is used to set up all of the buckets. -func (b *BoltStore) initialize() error { - tx, err := b.conn.Begin(true) - if err != nil { - return err - } - defer tx.Rollback() - - // Create all the buckets - if _, err := tx.CreateBucketIfNotExists(dbLogs); err != nil { - return err - } - if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil { - return err - } - - return tx.Commit() -} - -// Close is used to gracefully close the DB connection. -func (b *BoltStore) Close() error { - return b.conn.Close() -} - -// FirstIndex returns the first known index from the Raft log. -func (b *BoltStore) FirstIndex() (uint64, error) { - tx, err := b.conn.Begin(false) - if err != nil { - return 0, err - } - defer tx.Rollback() - - curs := tx.Bucket(dbLogs).Cursor() - if first, _ := curs.First(); first == nil { - return 0, nil - } else { - return bytesToUint64(first), nil - } -} - -// LastIndex returns the last known index from the Raft log. -func (b *BoltStore) LastIndex() (uint64, error) { - tx, err := b.conn.Begin(false) - if err != nil { - return 0, err - } - defer tx.Rollback() - - curs := tx.Bucket(dbLogs).Cursor() - if last, _ := curs.Last(); last == nil { - return 0, nil - } else { - return bytesToUint64(last), nil - } -} - -// GetLog is used to retrieve a log from BoltDB at a given index. -func (b *BoltStore) GetLog(idx uint64, log *raft.Log) error { - tx, err := b.conn.Begin(false) - if err != nil { - return err - } - defer tx.Rollback() - - bucket := tx.Bucket(dbLogs) - val := bucket.Get(uint64ToBytes(idx)) - - if val == nil { - return raft.ErrLogNotFound - } - return decodeMsgPack(val, log) -} - -// StoreLog is used to store a single raft log -func (b *BoltStore) StoreLog(log *raft.Log) error { - return b.StoreLogs([]*raft.Log{log}) -} - -// StoreLogs is used to store a set of raft logs -func (b *BoltStore) StoreLogs(logs []*raft.Log) error { - tx, err := b.conn.Begin(true) - if err != nil { - return err - } - defer tx.Rollback() - - for _, log := range logs { - key := uint64ToBytes(log.Index) - val, err := encodeMsgPack(log) - if err != nil { - return err - } - bucket := tx.Bucket(dbLogs) - if err := bucket.Put(key, val.Bytes()); err != nil { - return err - } - } - - return tx.Commit() -} - -// DeleteRange is used to delete logs within a given range inclusively. -func (b *BoltStore) DeleteRange(min, max uint64) error { - minKey := uint64ToBytes(min) - - tx, err := b.conn.Begin(true) - if err != nil { - return err - } - defer tx.Rollback() - - curs := tx.Bucket(dbLogs).Cursor() - for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() { - // Handle out-of-range log index - if bytesToUint64(k) > max { - break - } - - // Delete in-range log index - if err := curs.Delete(); err != nil { - return err - } - } - - return tx.Commit() -} - -// Set is used to set a key/value set outside of the raft log -func (b *BoltStore) Set(k, v []byte) error { - tx, err := b.conn.Begin(true) - if err != nil { - return err - } - defer tx.Rollback() - - bucket := tx.Bucket(dbConf) - if err := bucket.Put(k, v); err != nil { - return err - } - - return tx.Commit() -} - -// Get is used to retrieve a value from the k/v store by key -func (b *BoltStore) Get(k []byte) ([]byte, error) { - tx, err := b.conn.Begin(false) - if err != nil { - return nil, err - } - defer tx.Rollback() - - bucket := tx.Bucket(dbConf) - val := bucket.Get(k) - - if val == nil { - return nil, ErrKeyNotFound - } - return append([]byte(nil), val...), nil -} - -// SetUint64 is like Set, but handles uint64 values -func (b *BoltStore) SetUint64(key []byte, val uint64) error { - return b.Set(key, uint64ToBytes(val)) -} - -// GetUint64 is like Get, but handles uint64 values -func (b *BoltStore) GetUint64(key []byte) (uint64, error) { - val, err := b.Get(key) - if err != nil { - return 0, err - } - return bytesToUint64(val), nil -} - -// Sync performs an fsync on the database file handle. This is not necessary -// under normal operation unless NoSync is enabled, in which this forces the -// database file to sync against the disk. -func (b *BoltStore) Sync() error { - return b.conn.Sync() -} diff --git a/store/store.go b/store/store.go index d50663b..ccac5f5 100644 --- a/store/store.go +++ b/store/store.go @@ -1,14 +1,18 @@ package store import ( + "bufio" "context" + "encoding/json" "fmt" + "io/ioutil" "os" "path" "path/filepath" + "strings" "time" - "github.com/casbin/hraft-dispatcher/store/logstore" + "github.com/casbin/hraft-dispatcher/store/boltstore" "github.com/cenkalti/backoff/v4" "github.com/pkg/errors" @@ -46,7 +50,8 @@ type Store struct { logStore raft.LogStore stableStore raft.StableStore fms raft.FSM - boltStore *logstore.BoltStore + boltStore *boltstore.BoltStore + useBoltSnapshot bool enforcer casbin.IDistributedEnforcer @@ -62,6 +67,7 @@ type Config struct { NetworkTransportConfig *raft.NetworkTransportConfig Enforcer casbin.IDistributedEnforcer RaftConfig *raft.Config + UseBoltSnapshot bool } // NewStore return a instance of Store. @@ -73,6 +79,7 @@ func NewStore(logger *zap.Logger, config *Config) (*Store, error) { networkTransportConfig: config.NetworkTransportConfig, enforcer: config.Enforcer, raftConfig: config.RaftConfig, + useBoltSnapshot: config.UseBoltSnapshot, } return s, nil @@ -100,34 +107,41 @@ func (s *Store) Start(enableBootstrap bool) error { } s.transport = transport - var snapshots raft.SnapshotStore - if s.inMemory { - snapshots = raft.NewInmemSnapshotStore() - } else { - fileSnapshots, err := raft.NewFileSnapshotStore(s.dataDir, retainSnapshotCount, os.Stderr) - if err != nil { - s.logger.Error("failed to new file snapshot store", zap.Error(err), zap.String("raftData", s.dataDir)) - return err - } - snapshots = fileSnapshots - } - s.snapshotStore = snapshots - if s.inMemory { + snapshots := raft.NewInmemSnapshotStore() inMemStore := raft.NewInmemStore() s.logStore = inMemStore + s.snapshotStore = snapshots s.stableStore = inMemStore } else { + if err := os.MkdirAll(s.dataDir, 0755); err != nil && !os.IsExist(err) { + return fmt.Errorf("data directory is not accessible: %v", err) + } dbPath := filepath.Join(s.dataDir, raftDBName) - boltDB, err := logstore.NewBoltStore(dbPath) + boltDB, err := boltstore.NewBoltStore(dbPath, retainSnapshotCount) if err != nil { s.logger.Error("failed to new bolt store", zap.Error(err), zap.String("path", dbPath)) return err } - s.boltStore = boltDB s.logStore = boltDB s.stableStore = boltDB + + if s.useBoltSnapshot { + // Try to migrate previous file snapshots to bbolt if necessary + if err := s.checkAndMigrateFileSnapshot(); err != nil { + s.logger.Error("failed to migrate previous file snapshots to bbolt") + return err + } + s.snapshotStore = boltDB + } else { + fileSnapshots, err := raft.NewFileSnapshotStore(s.dataDir, retainSnapshotCount, os.Stderr) + if err != nil { + s.logger.Error("failed to new file snapshot store", zap.Error(err), zap.String("raftData", s.dataDir)) + return err + } + s.snapshotStore = fileSnapshots + } } fsm, err := NewFSM(s.logger, s.dataDir, s.enforcer) @@ -237,6 +251,74 @@ func (s *Store) DataDir() string { return s.dataDir } +func (s *Store) checkAndMigrateFileSnapshot() error { + const ( + snapPath = "snapshots" + metaFilePath = "meta.json" + stateFilePath = "state.bin" + tmpSuffix = ".tmp" + ) + + // Get snapshot directory + snapDir := filepath.Join(s.dataDir, snapPath) + if stat, err := os.Stat(snapDir); err == nil && stat.IsDir() { + snapshots, err := ioutil.ReadDir(snapDir) + if err != nil { + return err + } + + for _, snap := range snapshots { + // Ignore any files + if !snap.IsDir() { + continue + } + + // Ignore any temporary snapshots + dirName := snap.Name() + if strings.HasSuffix(dirName, tmpSuffix) { + continue + } + + metaPath := filepath.Join(snapDir, dirName, metaFilePath) + fh, err := os.Open(metaPath) + if err != nil { + continue + } + + // Read the metadata of the snapshot + buffered := bufio.NewReader(fh) + meta := &raft.SnapshotMeta{} + dec := json.NewDecoder(buffered) + if err := dec.Decode(meta); err != nil { + fh.Close() + continue + } + fh.Close() + + // Make sure we can understand this version. + if meta.Version < raft.SnapshotVersionMin || meta.Version > raft.SnapshotVersionMax { + continue + } + + // Read the contents of the snapshot + statePath := filepath.Join(snapDir, dirName, stateFilePath) + state, err := ioutil.ReadFile(statePath) + if err != nil { + continue + } + + // Put FileSnapshot into bbolt + if err := s.boltStore.PutSnapshot(*meta, state); err != nil { + continue + } + } + + os.RemoveAll(snapDir) + } + + return nil +} + // applyProtoMessage applies a proto message. func (s *Store) applyProtoMessage(m proto.Message) error { cmd, err := proto.Marshal(m) diff --git a/store/store_test.go b/store/store_test.go index ccbf091..5471b59 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -5,6 +5,8 @@ import ( "crypto/x509" "io/ioutil" "os" + "path/filepath" + "reflect" "testing" "time" @@ -134,7 +136,7 @@ func TestStore_SingleNode(t *testing.T) { raftID := "node-leader" raftAddress := GetLocalIP() + ":6790" - store, err := newStore(enforcer, raftID, raftAddress, true) + store, err := newStore("", enforcer, raftID, raftAddress, true, false) assert.NoError(t, err) defer store.Stop() defer os.RemoveAll(store.DataDir()) @@ -267,14 +269,14 @@ func TestStore_MultipleNode(t *testing.T) { leaderID := "node-leader" followerID := "node-follower" - leaderStore, err := newStore(leaderEnforcer, leaderID, leaderAddress, true) + leaderStore, err := newStore("", leaderEnforcer, leaderID, leaderAddress, true, false) assert.NoError(t, err) defer leaderStore.Stop() err = leaderStore.WaitLeader() assert.NoError(t, err) - followerStore, err := newStore(followerEnforcer, followerID, followerAddress, false) + followerStore, err := newStore("", followerEnforcer, followerID, followerAddress, false, false) assert.NoError(t, err) defer followerStore.Stop() @@ -423,10 +425,84 @@ func TestStore_MultipleNode(t *testing.T) { }) } -func newStore(enforcer casbin.IDistributedEnforcer, id string, address string, enableBootstrap bool) (*Store, error) { +func TestStore_MigrateFileSnapshot(t *testing.T) { dir, err := ioutil.TempDir("", "casbin-hraft-") - if err != nil { - return nil, err + assert.NoError(t, err) + + fileSnapshot, err := raft.NewFileSnapshotStore(dir, retainSnapshotCount, os.Stderr) + assert.NoError(t, err) + _, trans := raft.NewInmemTransport(raft.NewInmemAddr()) + + ctl := gomock.NewController(t) + defer ctl.Finish() + + enforcer := mocks.NewMockIDistributedEnforcer(ctl) + p, err := NewPolicyOperator(zap.NewExample(), dir, enforcer) + assert.NoError(t, err) + + enforcer.EXPECT().AddPoliciesSelf(nil, "p", "p", [][]string{{"role:admin", "/", "*"}, {"role:user", "/", "GET"}}).Return([][]string{{"role:admin", "/", "*"}, {"role:user", "/", "GET"}}, nil) + err = p.AddPolicies("p", "p", [][]string{{"role:admin", "/", "*"}, {"role:user", "/", "GET"}}) + assert.NoError(t, err) + b, err := p.Backup() + assert.NoError(t, err) + size := len(b) + p.db.Close() + + for i := 0; i < 3; i++ { + var sink raft.SnapshotSink + sink, err = fileSnapshot.Create(1, uint64(i), 3, raft.Configuration{}, 0, trans) + assert.NoError(t, err) + _, err := sink.Write(b) + assert.NoError(t, err) + err = sink.Close() + assert.NoError(t, err) + } + + snaps, err := fileSnapshot.List() + assert.NoError(t, err) + assert.Equal(t, retainSnapshotCount, len(snaps)) + + raftID := "node-leader" + raftAddress := GetLocalIP() + ":6790" + + enforcer.EXPECT().ClearPolicySelf(nil) + enforcer.EXPECT().AddPoliciesSelf(nil, "p", "p", [][]string{{"role:admin", "/", "*"}}) + enforcer.EXPECT().AddPoliciesSelf(nil, "p", "p", [][]string{{"role:user", "/", "GET"}}) + + // Create store with bbolt snapshots + store, err := newStore(dir, enforcer, raftID, raftAddress, true, true) + assert.NoError(t, err) + defer store.Stop() + defer os.RemoveAll(store.DataDir()) + + // Make sure file snapshot directory does not exist + const snapPath = "snapshots" + snapDir := filepath.Join(dir, snapPath) + _, err = os.Stat(snapDir) + assert.Error(t, err) + + // Make sure the data is consistent + snaps, err = store.snapshotStore.List() + assert.NoError(t, err) + assert.Equal(t, retainSnapshotCount, len(snaps)) + + for k, v := range snaps { + assert.Equal(t, uint64(2-k), v.Index) + assert.Equal(t, uint64(3), v.Term) + assert.Equal(t, true, reflect.DeepEqual(v.Configuration, raft.Configuration{})) + assert.Equal(t, uint64(0), v.ConfigurationIndex) + assert.Equal(t, int64(size), v.Size) + } +} + +func newStore(dataDir string, enforcer casbin.IDistributedEnforcer, id string, address string, enableBootstrap, useBoltSnapshot bool) (*Store, error) { + dir := dataDir + if dir == "" { + var err error + dir, err = ioutil.TempDir("", "casbin-hraft-") + if err != nil { + return nil, err + } } tlsConfig, err := GetTLSConfig() @@ -453,7 +529,8 @@ func newStore(enforcer casbin.IDistributedEnforcer, id string, address string, e MaxPool: 5, Timeout: 10 * time.Second, }, - Enforcer: enforcer, + Enforcer: enforcer, + UseBoltSnapshot: useBoltSnapshot, }) if err != nil { return nil, err