# Add periodic refresh back to per-namespace worker manager (#7105)
## What changed?
- Add periodic refresh back to `perNamespaceWorkerManager`
- Pass full `refreshArgs` to `getWorkerAllocation` and use `ns` from
`args`
- Rename fields in `workerAllocation`
- Small optimization to building the component set + options string

## Why?
- The dynamic config subscriptions only reacted to the two settings
directly used by `perNamespaceWorkerManager`, not the enabled/disabled
settings in components.
- `getWorkerAllocation`/`getLocallyDesiredWorkers` was reading `w.ns`, which is
technically a data race since `w` is not locked there (a sketch of the hazard follows this list).
- The fields are not used outside the package.
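
For illustration, here is a minimal sketch of the hazard the change avoids (hypothetical types, not the actual server code): reading a mutex-guarded field without holding the lock is a data race that `go run -race` can flag, and passing the namespace in through `args` sidesteps it entirely.

```go
package main

import "sync"

type worker struct {
	mu sync.Mutex
	ns string // guarded by mu
}

// racyRead returns w.ns without holding w.mu; if another goroutine
// updates ns under the lock, the race detector reports this read.
func (w *worker) racyRead() string { return w.ns }

// The fix mirrors the change in this commit: snapshot the value under
// the lock once, then pass it by value so later reads don't touch w.
func (w *worker) snapshot() string {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.ns
}

func main() {
	w := &worker{ns: "default"}
	go func() { w.mu.Lock(); w.ns = "updated"; w.mu.Unlock() }()
	_ = w.racyRead() // racy
	_ = w.snapshot() // safe
}
```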

## How did you test it?
Existing tests.
dnr authored Jan 17, 2025
1 parent e08772a commit 5c21b7c
Showing 1 changed file with 44 additions and 27 deletions: `service/worker/pernamespaceworker.go`

```diff
@@ -30,6 +30,7 @@ import (
 	"errors"
 	"fmt"
 	"maps"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -59,6 +60,11 @@ import (
 
 const (
 	perNamespaceWorkerManagerListenerKey = "perNamespaceWorkerManager"
+
+	// Always refresh workers after this time even if there were no membership or namespace
+	// state changes. This is to pick up dynamic config changes in component enabled status,
+	// and heal anything that may have gotten into a bad state.
+	refreshInterval = 10 * time.Minute
 )
 
 type (
@@ -120,8 +126,7 @@ type (
 	}
 
 	workerAllocation struct {
-		Total int
-		Local int
+		total, local int
 	}
 
 	errRetryAfter time.Duration
@@ -178,6 +183,7 @@ func (wm *perNamespaceWorkerManager) Start(
 		wm.logger.Fatal("Unable to register membership listener", tag.Error(err))
 	}
 	go wm.membershipChangedListener()
+	go wm.periodicRefresh()
 
 	wm.logger.Info("", tag.LifeCycleStarted)
 }
@@ -231,6 +237,15 @@ func (wm *perNamespaceWorkerManager) membershipChangedListener() {
 	}
 }
 
+func (wm *perNamespaceWorkerManager) periodicRefresh() {
+	for range time.NewTicker(refreshInterval).C {
+		if atomic.LoadInt32(&wm.status) != common.DaemonStatusStarted {
+			return
+		}
+		wm.refreshAll()
+	}
+}
+
 func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespace) *perNamespaceWorker {
 	wm.lock.Lock()
 	defer wm.lock.Unlock()
```
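
As a usage note, the new `periodicRefresh` goroutine is a plain ticker loop that exits once the manager leaves the started state. A minimal standalone sketch of the same pattern (names and durations hypothetical, not the server's actual constants):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const started int32 = 1 // stands in for common.DaemonStatusStarted

func main() {
	var status int32 = started

	go func() {
		// Same shape as periodicRefresh: wake on every tick, bail out
		// once the daemon has been stopped, otherwise refresh.
		for range time.NewTicker(50 * time.Millisecond).C {
			if atomic.LoadInt32(&status) != started {
				return
			}
			fmt.Println("refreshAll() would run here")
		}
	}()

	time.Sleep(120 * time.Millisecond) // a couple of ticks fire
	atomic.StoreInt32(&status, 0)      // simulate Stop()
	time.Sleep(100 * time.Millisecond) // loop observes the change and exits
}
```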

```diff
@@ -265,28 +280,28 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
 	}
 }
 
-func (w *perNamespaceWorker) getWorkerAllocation(count int) (workerAllocation, error) {
-	if count < 0 {
+func (w *perNamespaceWorker) getWorkerAllocation(args refreshArgs) (workerAllocation, error) {
+	if args.count < 0 {
 		return workerAllocation{}, errInvalidConfiguration
-	} else if count == 0 {
+	} else if args.count == 0 {
 		return workerAllocation{0, 0}, nil
 	}
-	localCount, err := w.getLocallyDesiredWorkers(count)
+	localCount, err := w.getLocallyDesiredWorkers(args)
 	if err != nil {
 		return workerAllocation{}, err
 	}
-	return workerAllocation{count, localCount}, nil
+	return workerAllocation{total: args.count, local: localCount}, nil
 }
 
-func (w *perNamespaceWorker) getLocallyDesiredWorkers(count int) (int, error) {
-	key := w.ns.ID().String()
-	availableHosts := w.wm.serviceResolver.LookupN(key, count)
+func (w *perNamespaceWorker) getLocallyDesiredWorkers(args refreshArgs) (int, error) {
+	key := args.ns.ID().String()
+	availableHosts := w.wm.serviceResolver.LookupN(key, args.count)
 	hostsCount := len(availableHosts)
 	if hostsCount == 0 {
 		return 0, membership.ErrInsufficientHosts
 	}
-	maxWorkersPerHost := count/hostsCount + 1
-	desiredDistribution := util.RepeatSlice(availableHosts, maxWorkersPerHost)[:count]
+	maxWorkersPerHost := args.count/hostsCount + 1
+	desiredDistribution := util.RepeatSlice(availableHosts, maxWorkersPerHost)[:args.count]
 
 	isLocal := func(info membership.HostInfo) bool { return info.Identity() == w.wm.self.Identity() }
 	result := len(util.FilterSlice(desiredDistribution, isLocal))
```
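
To make the distribution arithmetic concrete, here is a self-contained sketch. `repeatSlice`/`filterSlice` are stand-ins for Temporal's internal `util.RepeatSlice`/`util.FilterSlice`, whose semantics are assumed here (repeat the whole slice n times; keep elements matching a predicate):

```go
package main

import "fmt"

// repeatSlice is a stand-in for util.RepeatSlice, assumed to append the
// whole slice to itself n times.
func repeatSlice[T any](xs []T, n int) []T {
	out := make([]T, 0, len(xs)*n)
	for i := 0; i < n; i++ {
		out = append(out, xs...)
	}
	return out
}

// filterSlice is a stand-in for util.FilterSlice, assumed to keep the
// elements for which the predicate returns true.
func filterSlice[T any](xs []T, keep func(T) bool) []T {
	var out []T
	for _, x := range xs {
		if keep(x) {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	hosts := []string{"host-a", "host-b", "host-c"} // as returned by LookupN
	count := 5                                      // total workers wanted for the namespace

	// Same arithmetic as getLocallyDesiredWorkers: over-repeat the host
	// list, then truncate to exactly count slots.
	maxWorkersPerHost := count/len(hosts) + 1 // 5/3 + 1 = 2
	desired := repeatSlice(hosts, maxWorkersPerHost)[:count]
	fmt.Println(desired) // [host-a host-b host-c host-a host-b]

	// Pretend host-a is "self": it should run 2 of the 5 workers.
	local := len(filterSlice(desired, func(h string) bool { return h == "host-a" }))
	fmt.Println(local) // 2
}
```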

```diff
@@ -382,6 +397,8 @@ func (w *perNamespaceWorker) refresh(args refreshArgs) (retErr error) {
 		w.handleError(retErr)
 	}()
 
+	// note w.lock is not locked until we're about to start/stop a worker
+
 	if !w.wm.Running() ||
 		args.ns.State() == enumspb.NAMESPACE_STATE_DELETED ||
 		!args.ns.ActiveInCluster(w.wm.thisClusterName) {
@@ -390,12 +407,12 @@
 
 	// figure out which components are enabled at all for this namespace
 	var enabledComponents []workercommon.PerNSWorkerComponent
-	var componentSet string
+	var componentSet strings.Builder
 	for _, cmp := range w.wm.components {
 		options := cmp.DedicatedWorkerOptions(args.ns)
 		if options.Enabled {
 			enabledComponents = append(enabledComponents, cmp)
-			componentSet += fmt.Sprintf("%p,", cmp)
+			fmt.Fprintf(&componentSet, "%p,", cmp)
 		}
 	}
```

```diff
@@ -405,21 +422,21 @@
 	}
 
 	// check if we are responsible for this namespace at all
-	workerAllocation, err := w.getWorkerAllocation(args.count)
+	workerAllocation, err := w.getWorkerAllocation(args)
 	if err != nil {
 		w.logger.Error("Failed to look up hosts", tag.Error(err))
 		// TODO: add metric also
 		return err
 	}
-	if workerAllocation.Local == 0 {
+	if workerAllocation.local == 0 {
 		// not ours, don't need a worker
 		return errNoWorkerNeeded
 	}
 	// ensure this changes if multiplicity changes
-	componentSet += fmt.Sprintf(",%d", workerAllocation.Local)
+	fmt.Fprintf(&componentSet, "%d,", workerAllocation.local)
 
 	// get sdk worker options
-	componentSet += fmt.Sprintf(",%+v", w.opts)
+	fmt.Fprintf(&componentSet, "%+v,", w.opts)
 
 	// we do need a worker, but maybe we have one already
 	w.lock.Lock()
@@ -430,7 +447,7 @@
 		return nil
 	}
 
-	if componentSet == w.componentSet {
+	if componentSet.String() == w.componentSet {
 		// no change in set of components enabled, leave existing running
 		return nil
 	}
```
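
The `componentSet` string acts as a fingerprint of the desired worker configuration: if the final string matches the stored one, the running worker is left alone. Switching from `string` concatenation to a `strings.Builder` written via `fmt.Fprintf` appends into one growable buffer instead of allocating a new string per append. A minimal sketch of the same trick (the `component` type is hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

type component struct{ name string }

func main() {
	enabled := []*component{{"scheduler"}, {"batcher"}}

	// strings.Builder implements io.Writer, so Fprintf can append
	// directly into it.
	var componentSet strings.Builder
	for _, cmp := range enabled {
		fmt.Fprintf(&componentSet, "%p,", cmp) // %p: identity of the component
	}
	fmt.Fprintf(&componentSet, "%d,", 2) // multiplicity, as in refresh()

	// Comparing the final string against the previous one detects any
	// change in the enabled components or the multiplicity.
	fmt.Println(componentSet.String())
}
```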

```diff
@@ -458,7 +475,7 @@ func (w *perNamespaceWorker) refresh(args refreshArgs) (retErr error) {
 
 	w.client = client
 	w.worker = worker
-	w.componentSet = componentSet
+	w.componentSet = componentSet.String()
 	return nil
 }
@@ -489,18 +506,18 @@ func (w *perNamespaceWorker) startWorker(
 	sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(nsName))
 	sdkoptions.Identity = fmt.Sprintf("temporal-system@%s@%s", w.wm.hostName, nsName)
 	// increase these if we're supposed to run with more allocation
-	sdkoptions.MaxConcurrentWorkflowTaskPollers *= allocation.Local
-	sdkoptions.MaxConcurrentActivityTaskPollers *= allocation.Local
-	sdkoptions.MaxConcurrentLocalActivityExecutionSize *= allocation.Local
-	sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= allocation.Local
-	sdkoptions.MaxConcurrentActivityExecutionSize *= allocation.Local
+	sdkoptions.MaxConcurrentWorkflowTaskPollers *= allocation.local
+	sdkoptions.MaxConcurrentActivityTaskPollers *= allocation.local
+	sdkoptions.MaxConcurrentLocalActivityExecutionSize *= allocation.local
+	sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= allocation.local
+	sdkoptions.MaxConcurrentActivityExecutionSize *= allocation.local
 	sdkoptions.OnFatalError = w.onFatalError
 
 	// this should not block because the client already has server capabilities
 	worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
 	details := workercommon.RegistrationDetails{
-		TotalWorkers: allocation.Total,
-		Multiplicity: allocation.Local,
+		TotalWorkers: allocation.total,
+		Multiplicity: allocation.local,
 	}
 	for _, cmp := range components {
 		cleanup := cmp.Register(worker, w.ns, details)
```
