diff --git a/README.md b/README.md index cd8390e..e0407cb 100644 --- a/README.md +++ b/README.md @@ -57,8 +57,6 @@ $ ./attache-control -help Usage of ./attache-control: -attempt-interval duration Duration to wait between attempts to join or create a cluster (e.g. '1s') (default 3s) - -attempt-limit int - Number of times to join or create a cluster before exiting (default 20) -await-service-name string Consul Service for newly created Redis Cluster Nodes, (required) -consul-acl-token string diff --git a/cmd/attache-control/config.go b/cmd/attache-control/config.go index 507202f..af7141c 100644 --- a/cmd/attache-control/config.go +++ b/cmd/attache-control/config.go @@ -20,10 +20,6 @@ type cliOpts struct { // cluster. attemptInterval time.Duration - // attemptLimit is the number of times to attempt joining or creating a cluster before Attache - // should exit as failed. - attemptLimit int - // awaitServiceName is the name of the Consul Service that newly created // Redis Cluster nodes will join when they're first started but have yet to // form or join a cluster. This field is required. @@ -102,7 +98,6 @@ func ParseFlags() cliOpts { // CLI flag.StringVar(&conf.lockPath, "lock-kv-path", "service/attache/leader", "Consul KV path to use as a leader lock for Redis Cluster operations") flag.DurationVar(&conf.attemptInterval, "attempt-interval", 3*time.Second, "Duration to wait between attempts to join or create a cluster (e.g. 
'1s')") - flag.IntVar(&conf.attemptLimit, "attempt-limit", 20, "Number of times to attempt join or create a cluster before exiting") flag.StringVar(&conf.awaitServiceName, "await-service-name", "", "Consul Service for newly created Redis Cluster Nodes, (required)") flag.StringVar(&conf.destServiceName, "dest-service-name", "", "Consul Service for healthy Redis Cluster Nodes, (required)") flag.StringVar(&conf.logLevel, "log-level", "info", "Set the log level") diff --git a/cmd/attache-control/main.go b/cmd/attache-control/main.go index 36b6a37..b472e92 100644 --- a/cmd/attache-control/main.go +++ b/cmd/attache-control/main.go @@ -1,18 +1,23 @@ package main import ( + "errors" + "fmt" "os" "os/signal" + "strings" "time" - consulClient "github.com/letsencrypt/attache/src/consul/client" + consul "github.com/letsencrypt/attache/src/consul/client" lockClient "github.com/letsencrypt/attache/src/consul/lock" redisCLI "github.com/letsencrypt/attache/src/redis/cli" - redisClient "github.com/letsencrypt/attache/src/redis/client" + redis "github.com/letsencrypt/attache/src/redis/client" "github.com/letsencrypt/attache/src/redis/config" logger "github.com/sirupsen/logrus" ) +var errContinue = errors.New("continuing") + func setLogLevel(level string) { parsedLevel, err := logger.ParseLevel(level) if err != nil { @@ -21,231 +26,237 @@ func setLogLevel(level string) { logger.SetLevel(parsedLevel) } -func main() { - start := time.Now() - c := ParseFlags() - err := c.Validate() - if err != nil { - logger.Fatal(err) - } - - setLogLevel(c.logLevel) - logger.Infof("starting %s", os.Args[0]) +type leader struct { + cliOpts + lock *lockClient.Lock + scalingOpts *consul.ScalingOpts + destClient *consul.Client + nodesInDest []string + nodesInAwait []string +} - logger.Info("redis: initializing a new redis client") - newNodeClient, err := redisClient.New(c.RedisOpts) +func (l *leader) createNewRedisCluster() error { + // Check the Consul service catalog for other nodes that are waiting to 
form + // a cluster. We're limiting the scope of our search to nodes in the + // awaitClient Consul service that Consul considers healthy. + awaitClient, err := consul.New(l.ConsulOpts, l.awaitServiceName) if err != nil { - logger.Fatalf("redis: %s", err) + return err } - logger.Info("consul: initializing a new consul client") - destService, err := consulClient.New(c.ConsulOpts, c.destServiceName) + l.nodesInAwait, err = awaitClient.GetNodeAddresses(true) if err != nil { - logger.Fatalf("consul: %s", err) + return err } + numNodesInAwait := len(l.nodesInAwait) + logger.Infof("found %d nodes in consul service %s", numNodesInAwait, l.awaitServiceName) + + // We should only attempt to initialize a new cluster if all of the nodes + // that we expect in said cluster have finished starting up and reside in + // the awaitService Consul service. + if l.scalingOpts.NodesMissing(numNodesInAwait) >= 1 { + return fmt.Errorf("still waiting for nodes to startup, releasing lock: %w", errContinue) + + } else { + var nodesToCluster []string + if l.scalingOpts.ReplicasPerPrimary() == 0 { + // This handles a special case for clusters that are started with + // less than enough replicas to give at least one to each primary. + // Once the first primary only cluster is started and the lock is + // released our remaining replica nodes will be able to add + // themselves to the newly created cluster. 
+ nodesToCluster = l.nodesInAwait[:l.scalingOpts.PrimaryCount] + } else { + nodesToCluster = l.nodesInAwait + } - logger.Infof("consul: fetching scaling options from KV path service/%s/scaling", c.destServiceName) - scalingOpts, err := destService.GetScalingOpts() - if err != nil { - logger.Fatalf("consul: %s", err) + logger.Infof("attempting to create a new cluster with nodes %s", strings.Join(nodesToCluster, " ")) + err := redisCLI.CreateCluster(l.RedisOpts, nodesToCluster, l.scalingOpts.ReplicasPerPrimary()) + if err != nil { + return err + } + return nil } +} - var nodesInDest []string - var nodesInAwait []string +func (l *leader) joinOrCreateRedisCluster() error { + logger.Info("attempting to join or create a cluster") - var attemptCount int - var ticks = time.Tick(c.attemptInterval) - for range ticks { - attemptCount++ + // Check the Consul service catalog for an existing Redis Cluster that we + // can join. We're limiting the scope of our search to nodes in the + // destService Consul service that Consul considers healthy. + var err error + l.nodesInDest, err = l.destClient.GetNodeAddresses(true) + if err != nil { + return err + } + numNodesInDest := len(l.nodesInDest) - nodeIsNew, err := newNodeClient.StateNewCheck() + // If no existing nodes can be found with this criteria, we know that we + // need to initialize a new cluster. 
+ if numNodesInDest <= 0 { + err = l.createNewRedisCluster() if err != nil { - logger.Fatalf("redis: %s", err) + return err } + logger.Info("new cluster created successfully") + return nil + } + existingClusterNode := l.nodesInDest[0] + logger.Infof("found %d cluster nodes in consul service %s", numNodesInDest, l.destServiceName) + + logger.Infof("gathering info from the cluster that %s belongs to", existingClusterNode) + clusterClient, err := redis.New(config.RedisOpts{ + NodeAddr: existingClusterNode, + Username: l.RedisOpts.Username, + PasswordConfig: l.RedisOpts.PasswordConfig, + TLSConfig: l.RedisOpts.TLSConfig, + }) + if err != nil { + return err + } - if !nodeIsNew { - logger.Info("redis: this node has already joined a cluster") - break - } + primaryNodesInCluster, err := clusterClient.GetPrimaryNodes() + if err != nil { + return err + } + + replicaNodesInCluster, err := clusterClient.GetReplicaNodes() + if err != nil { + return err + } - lock, err := lockClient.New(c.ConsulOpts, c.lockPath, "10s") + if len(primaryNodesInCluster) < l.scalingOpts.PrimaryCount { + // The current cluster has less than the expected shard primary nodes. + // This node should be added as a new primary and the existing cluster + // shardslots should be rebalanced. + logger.Infof("%s should be added as a shard primary", l.RedisOpts.NodeAddr) + logger.Infof("attempting to add %s to the cluster that %s belongs to", l.RedisOpts.NodeAddr, existingClusterNode) + err := redisCLI.AddNewShardPrimary(l.RedisOpts, existingClusterNode) if err != nil { - logger.Fatalf("consul: %s", err) + return err } - - nodeHasLock, err := lock.Acquire() + logger.Infof("%s was successfully added as a shard primary", l.RedisOpts.NodeAddr) + return nil + + } else if len(replicaNodesInCluster) < l.scalingOpts.ReplicaCount { + // All expected shard primary nodes exist in the current cluster. This + // node should be added as a replica to the primary node with the least + // number of replicas. 
+ logger.Infof("%s should be added as a new shard replica", l.RedisOpts.NodeAddr) + logger.Infof("attempting to add %s to the cluster that %s belongs to", l.RedisOpts.NodeAddr, existingClusterNode) + err := redisCLI.AddNewShardReplica(l.RedisOpts, existingClusterNode) if err != nil { - logger.Fatalf("consul: %s", err) + return err } + logger.Infof("%s was successfully added as a shard replica", l.RedisOpts.NodeAddr) + return nil + } - // If forced to exit early, cleanup our session. - catchSignals := make(chan os.Signal, 1) - signal.Notify(catchSignals, os.Interrupt) - go func() { - <-catchSignals - logger.Error("consul: operation interrupted, cleaning up session and exiting") - - lock.Cleanup() - os.Exit(1) - }() - - if nodeHasLock { - logger.Info("consul: acquired leader lock") - - // Spin-off a goroutine to periodically renew our leader lock until - // our work is complete. - doneChan := make(chan struct{}) - go lock.Renew(doneChan) - - cleanup := func() { - // Stop renewing the lock session. - close(doneChan) - lock.Cleanup() - } + // This should never happen as long as the job and scaling opts match. + return fmt.Errorf("%s couldn't be added to an existing cluster", l.RedisOpts.NodeAddr) +} - // Check the Consul service catalog for an existing Redis Cluster - // that we can join. We're limiting the scope of our search to nodes - // in the destService Consul service that Consul considers healthy. - nodesInDest, err = destService.GetNodeAddresses(true) - if err != nil { - cleanup() - logger.Fatal(err) - } - logger.Infof("consul: found nodes %q in service %q", nodesInDest, c.destServiceName) - - // If 0 existing nodes can be found with this criteria, we know that - // we need to initialize a new cluster. - if len(nodesInDest) == 0 { - // Check the Consul service catalog for other nodes that are - // waiting to form a cluster. We're limiting the scope of our - // search to nodes in the awaitService Consul service that - // Consul considers healthy. 
- awaitService, err := consulClient.New(c.ConsulOpts, c.awaitServiceName) - if err != nil { - cleanup() - logger.Fatal(err) - } +func attemptLeaderLock(c cliOpts, scaling *consul.ScalingOpts, dest *consul.Client) error { + lock, err := lockClient.New(c.ConsulOpts, c.lockPath, "10s") + if err != nil { + return err + } - nodesInAwait, err = awaitService.GetNodeAddresses(true) - if err != nil { - cleanup() - logger.Fatalf("consul: %s", err) - } - logger.Infof("consul: found nodes %q in service %q", nodesInAwait, c.awaitServiceName) - - // We should only attempt to initialize a new cluster if all of - // the nodes that we expect in said cluster have finished - // starting up and reside in the awaitService Consul service. - nodesMissing := scalingOpts.TotalCount() - len(nodesInAwait) - if nodesMissing <= 0 { - replicasPerPrimary := scalingOpts.ReplicaCount / scalingOpts.PrimaryCount - - var nodesToCluster []string - if replicasPerPrimary == 0 { - // This handles a special case for clusters that are - // started with less than enough replicas to give at - // least one to each primary. Once the first primary - // only cluster is started and the lock is released our - // remaining replica nodes will be able to add - // themselves to the newly created cluster. 
- nodesToCluster = nodesInAwait[:scalingOpts.PrimaryCount] - } else { - nodesToCluster = nodesInAwait - } + err = lock.Acquire() + if err != nil { + return err + } + defer lock.Cleanup() - logger.Infof("attempting to create a new cluster with nodes %q", nodesToCluster) - err := redisCLI.CreateCluster(c.RedisOpts, nodesToCluster, replicasPerPrimary) - if err != nil { - cleanup() - logger.Fatalf("redis: %s", err) - } - logger.Info("redis: succeeded") - cleanup() - break - } else { - logger.Infof("still waiting for %d nodes to startup, releasing lock", nodesMissing) - cleanup() - continue - } - } + if !lock.Acquired { + return fmt.Errorf("another node currently has the lock: %w", errContinue) + } - logger.Infof("redis: gathering info from the cluster that %q belongs to", nodesInDest[0]) - clusterClient, err := redisClient.New( - config.RedisOpts{ - NodeAddr: nodesInDest[0], - Username: c.RedisOpts.Username, - PasswordConfig: c.RedisOpts.PasswordConfig, - TLSConfig: c.RedisOpts.TLSConfig, - }, - ) - if err != nil { - cleanup() - logger.Fatalf("redis: %s", err) - } + logger.Info("acquired the lock") + leader := &leader{ + cliOpts: c, + lock: lock, + scalingOpts: scaling, + destClient: dest, + } + return leader.joinOrCreateRedisCluster() +} - primaryNodesInCluster, err := clusterClient.GetPrimaryNodes() - if err != nil { - cleanup() - logger.Fatalf("redis: %s", err) - } +func main() { + c := ParseFlags() + err := c.Validate() + if err != nil { + logger.Fatal(err) + } - replicaNodesInCluster, err := clusterClient.GetReplicaNodes() - if err != nil { - cleanup() - logger.Fatalf("redis: %s", err) - } + setLogLevel(c.logLevel) + logger.Infof("starting %s", os.Args[0]) - if len(primaryNodesInCluster) < scalingOpts.PrimaryCount { - // The current cluster has less than the expected shard primary - // nodes. This node should be added as a new primary and the - // existing cluster shardslots should be rebalanced. 
- logger.Infof("redis: node %q should be added as a new shard primary", c.RedisOpts.NodeAddr) - logger.Infof("redis: attempting to join %q to the cluster that %q belongs to", c.RedisOpts.NodeAddr, nodesInDest[0]) + logger.Info("initializing a new redis client") + thisNode, err := redis.New(c.RedisOpts) + if err != nil { + logger.Fatal(err) + } - err := redisCLI.AddNewShardPrimary(c.RedisOpts, nodesInDest[0]) + logger.Info("initializing a new consul client") + dest, err := consul.New(c.ConsulOpts, c.destServiceName) + if err != nil { + logger.Fatal(err) + } + + logger.Infof("fetching scaling options from consul path 'service/%s/scaling'", c.destServiceName) + scaling, err := dest.GetScalingOpts() + if err != nil { + logger.Fatal(err) + } + + catchSignals := make(chan os.Signal, 1) + signal.Notify(catchSignals, os.Interrupt) + + ticker := time.NewTicker(c.attemptInterval) + done := make(chan bool, 1) + + go func() { + for { + select { + case <-done: + // Exit. + return + + case <-catchSignals: + // Gracefully shutdown. + ticker.Stop() + done <- true + + case <-ticker.C: + // Attempt to create or modify a cluster. + thisNodeIsNew, err := thisNode.IsNew() if err != nil { - cleanup() - logger.Fatalf("redis: %s", err) + logger.Errorf("while attempting to check that status of %s: %s", c.RedisOpts.NodeAddr, err) + continue } - logger.Info("redis: succeeded") - cleanup() - break - - } else if len(replicaNodesInCluster) < scalingOpts.ReplicaCount { - // All expected shard primary nodes exist in the current - // cluster. This node should be added as a replica to the - // primary node with the least number of replicas. 
- logger.Infof("redis: node %q should be added as a new shard replica", c.RedisOpts.NodeAddr) - logger.Infof("redis: attempting to join %q to the cluster that %q belongs to", c.RedisOpts.NodeAddr, nodesInDest[0]) - - err := redisCLI.AddNewShardReplica(c.RedisOpts, nodesInDest[0]) + if !thisNodeIsNew { + logger.Info("this node is already part of an existing cluster") + + // Stop the ticker and run until killed due to: + // https://github.com/hashicorp/nomad/issues/10058 + ticker.Stop() + logger.Info("running until killed...") + continue + } + err = attemptLeaderLock(c, scaling, dest) if err != nil { - cleanup() - logger.Fatalf("redis: %s", err) + if errors.Is(err, errContinue) { + logger.Info(err) + continue + } + logger.Errorf("while attempting to join or create a cluster: %s", err) + continue } - logger.Info("redis: succeeded") - cleanup() - break } - } else { - if attemptCount >= c.attemptLimit { - logger.Fatal("failed to join or initialize a cluster during the time permitted") - } - logger.Info("another node currently has the lock") - logger.Infof("continuing to wait, %d attempts remain", (c.attemptLimit - attemptCount)) } - } - - // TODO: Remove once https://github.com/hashicorp/nomad/issues/10058 has - // been solved. Nomad Post-Start tasks need to stay healthy for at least 10s - // after the Main Tasks are marked healthy. Attache is a Post-Start Task, so - // just sleeping for a really long time will ensure that we don't - // accidentally trigger this behavior during a deployment. 
- duration := time.Since(start) - if duration < time.Minute*10 { - timeToWait := time.Minute*10 - duration - logger.Infof("waiting %s to exit", timeToWait.String()) - time.Sleep(timeToWait) - } + }() + <-done logger.Info("exiting...") } diff --git a/example/redis-cluster.hcl b/example/redis-cluster.hcl index 88f948b..2bca120 100644 --- a/example/redis-cluster.hcl +++ b/example/redis-cluster.hcl @@ -195,7 +195,7 @@ job "redis-cluster" { task "attache-control" { lifecycle { hook = "poststart" - sidecar = false + sidecar = true } service { name = var.await-service-name diff --git a/src/consul/client/client.go b/src/consul/client/client.go index 0a5d91a..34f8861 100644 --- a/src/consul/client/client.go +++ b/src/consul/client/client.go @@ -57,12 +57,22 @@ type ScalingOpts struct { ReplicaCount int `yaml:"replica-count"` } -// TotalCount returns the total count of expected replica and primary Redis -// Cluster nodes. -func (s *ScalingOpts) TotalCount() int { +// totalCount returns the total count of expected replica and primary nodes. +func (s *ScalingOpts) totalCount() int { return s.PrimaryCount + s.ReplicaCount } +// NodesMissing returns the total count of expected replica and primary nodes +// minus nodesInAwait. +func (s *ScalingOpts) NodesMissing(nodesInAwait int) int { + return s.totalCount() - nodesInAwait +} + +// ReplicasPerPrimary returns the number of replica nodes per primary shard. +func (s *ScalingOpts) ReplicasPerPrimary() int { + return s.ReplicaCount / s.PrimaryCount +} + // GetScalingOpts fetches the count of Redis primary and replica nodes from KV // path: "service/destServiceName/scaling", and return them as a `*ScalingOpts` // to the caller. diff --git a/src/consul/lock/lock.go b/src/consul/lock/lock.go index f844ae2..6fc920e 100644 --- a/src/consul/lock/lock.go +++ b/src/consul/lock/lock.go @@ -11,10 +11,12 @@ import ( // sessions. 
This is used by attache-control to ensure that only one Redis // Cluster node operation (create, add, remove) happens at once. type Lock struct { + Acquired bool client *consul.Client key string sessionID string sessionTimeout string + renewChan chan struct{} } // New creates a new Consul client, aquires an ephemeral session with that @@ -59,41 +61,59 @@ func (l *Lock) createSession() error { return nil } -// Acquire attempts to obtain a lock for the Consul KV path of `l.key`. Returns -// true on success or false on failure. -func (l *Lock) Acquire() (bool, error) { +// Acquire attempts to obtain a lock for the Consul KV path of `l.key`. Sets `l.Acquired` +// true on success and false on failure. +func (l *Lock) Acquire() error { kvPair := &consul.KVPair{ Key: l.key, Value: []byte(l.sessionID), Session: l.sessionID, } - acquired, _, err := l.client.KV().Acquire(kvPair, nil) - return acquired, err + var err error + l.Acquired, _, err = l.client.KV().Acquire(kvPair, nil) + if l.Acquired { + // Create the stop channel first so Cleanup never closes a nil channel, then spin off a long-running go-routine to continuously renew our session. + l.renewChan = make(chan struct{}) + go l.periodicallyRenew() + } + return err } -// Renew is used to periodically invoke Session.Renew() on a session until a -// `doneChan` is closed, it should only be called from a long running goroutine. -func (l *Lock) Renew(doneChan <-chan struct{}) { - err := l.client.Session().RenewPeriodic(l.sessionTimeout, l.sessionID, nil, doneChan) +// periodicallyRenew calls Session().RenewPeriodic() to keep the session alive +// until l.renewChan is closed. l.renewChan must already exist when this is +// started, and it should only be called from a long running goroutine. +func (l *Lock) periodicallyRenew() { + err := l.client.Session().RenewPeriodic(l.sessionTimeout, l.sessionID, nil, l.renewChan) if err != nil { logger.Error(err) } } -// Cleanup releases our leader lock by deleting the KV pair and destroying the -// session that was used to create it in the first place. These calls only need -// to be best-effort. 
In the event that either of them fail the lock will -// release once the session expires and any KV pair created during that session -// will be deleted as well. +// Cleanup stops periodic session renewals used to hold the lock, releases the +// lock by deleting the key, and destroys the session. Deleting the key and +// destroying the session only need to be best effort. In the event that either +// of these calls fail the lock will be released and the session will be +// destroyed l.sessionTimeout after l.renewChan is closed. func (l *Lock) Cleanup() { - _, err := l.client.KV().Delete(l.key, nil) - if err != nil { - logger.Errorf("cannot delete lock key %q: %s", l.key, err) - } + if l.Acquired { + // Halt periodic session renewals. + close(l.renewChan) - _, err = l.client.Session().Destroy(l.sessionID, nil) - if err != nil { - logger.Errorf("cannot cleanup session %q: %s", l.sessionID, err) + // Delete the key holding the lock. + _, err := l.client.KV().Delete(l.key, nil) + if err != nil { + logger.Errorf("cannot delete lock key %q: %s", l.key, err) + } + l.Acquired = false + + } + if l.sessionID != "" { + // Destroy the session. 
+ _, err := l.client.Session().Destroy(l.sessionID, nil) + if err != nil { + logger.Errorf("cannot cleanup session %q: %s", l.sessionID, err) + } + l.sessionID = "" } } diff --git a/src/redis/cli/cli.go b/src/redis/cli/cli.go index 1647301..f92d995 100644 --- a/src/redis/cli/cli.go +++ b/src/redis/cli/cli.go @@ -8,6 +8,7 @@ import ( "github.com/letsencrypt/attache/src/redis/client" "github.com/letsencrypt/attache/src/redis/config" + logger "github.com/sirupsen/logrus" ) func makeAuthArgs(conf config.RedisOpts) ([]string, error) { @@ -95,21 +96,26 @@ func AddNewShardPrimary(conf config.RedisOpts, destNodeAddr string) error { if err != nil { return err } + logger.Info("cluster MEET succeeded") - // Occasionally a cluster won't be ready for a shard slot rebalance - // immediately after meeting a new primary node because gossip about this - // new master hasn't propogated yet. This should be reattempted a few times. + // Retry shard slot rebalance for a full minute before failing. Occasionally a + // cluster won't be ready for a shard slot rebalance immediately after + // meeting a new primary node because gossip about this new master hasn't + // propagated yet. 
+ logger.Info("attempting cluster shard slot rebalance") var attempts int - var ticks = time.Tick(5 * time.Second) - for range ticks { + ticker := time.NewTicker(6 * time.Second) + defer ticker.Stop() + for range ticker.C { attempts++ err = execute(conf, []string{"--cluster", "rebalance", conf.NodeAddr, "--cluster-use-empty-masters"}) if err != nil { - if attempts >= 5 { + if attempts >= 10 { return err } continue } + logger.Info("cluster shard slot rebalance succeeded") break } return nil diff --git a/src/redis/client/client.go b/src/redis/client/client.go index cdc23af..d482503 100644 --- a/src/redis/client/client.go +++ b/src/redis/client/client.go @@ -23,7 +23,7 @@ type Client struct { Client *redis.Client } -func (h *Client) StateNewCheck() (bool, error) { +func (h *Client) IsNew() (bool, error) { var infoMatchingNewNodes = clusterInfo{"fail", 0, 0, 0, 0, 1, 0, 0, 0, 0, 0} clusterInfo, err := h.GetClusterInfo() if err != nil {