diff --git a/cmd-rpc.go b/cmd-rpc.go index 92f3f17e..4e269513 100644 --- a/cmd-rpc.go +++ b/cmd-rpc.go @@ -12,7 +12,6 @@ import ( "time" "github.com/allegro/bigcache/v3" - "github.com/davecgh/go-spew/spew" "github.com/fsnotify/fsnotify" hugecache "github.com/rpcpool/yellowstone-faithful/huge-cache" splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher" @@ -109,7 +108,7 @@ func newCmd_rpc() *cli.Command { } klog.Infof("Found %d config files:", len(configFiles)) for _, configFile := range configFiles { - fmt.Printf(" - %s\n", configFile) + klog.V(3).Infof(" - %s", configFile) } conf := bigcache.DefaultConfig(5 * time.Minute) @@ -205,109 +204,93 @@ func newCmd_rpc() *cli.Command { return cli.Exit(err.Error(), 1) } klog.Infof("Found %d directories; will start watching them for changes ...", len(dirs)) - spew.Dump(dirs) + for _, dir := range dirs { + klog.V(3).Infof(" - %s", dir) + } ctx, cancel := context.WithCancel(c.Context) defer cancel() - // create a map that tracks files that are already being processed because of an event: - // this is to avoid processing the same file multiple times - // (e.g. 
if a file is create and then modified, we don't want to process it twice) - fileProcessingTracker := make(map[string]struct{}) - mu := &sync.Mutex{} - - err = onFileChanged(ctx, dirs, func(event fsnotify.Event) { - if !isJSONFile(event.Name) && !isYAMLFile(event.Name) { - klog.V(1).Infof("File %q is not a JSON or YAML file; do nothing", event.Name) - return - } - klog.V(1).Infof("File event: name=%q, op=%q", event.Name, event.Op) + err = onFileChanged( + ctx, + epochLoadConcurrency, + dirs, + func(event fsnotify.Event) { + if !isJSONFile(event.Name) && !isYAMLFile(event.Name) { + klog.V(3).Infof("File %q is not a JSON or YAML file; do nothing", event.Name) + return + } + klog.V(3).Infof("File event: name=%q, op=%q", event.Name, event.Op) - if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) { - klog.V(1).Infof("Epoch with same hash as file %q is already loaded; do nothing", event.Name) - return - } - // register the file as being processed - mu.Lock() - _, ok := fileProcessingTracker[event.Name] - if ok { - klog.V(1).Infof("File %q is already being processed; do nothing", event.Name) - mu.Unlock() - return - } - fileProcessingTracker[event.Name] = struct{}{} - mu.Unlock() - // remove the file from the tracker when we're done processing it - defer func() { - mu.Lock() - delete(fileProcessingTracker, event.Name) - mu.Unlock() - }() - - switch event.Op { - case fsnotify.Write: - { - startedAt := time.Now() - klog.V(1).Infof("File %q was modified; processing...", event.Name) - // find the config file, load it, and update the epoch (replace) - config, err := LoadConfig(event.Name) - if err != nil { - klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) - return - } - epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo) - if err != nil { - klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) - return - } - err = multi.ReplaceOrAddEpoch(epoch.Epoch(), epoch) - if err != nil { - 
klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error()) - return - } - klog.V(1).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt)) + if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) { + klog.V(3).Infof("Epoch with same hash as file %q is already loaded; do nothing", event.Name) + return } - case fsnotify.Create: - { - startedAt := time.Now() - klog.V(1).Infof("File %q was created; processing...", event.Name) - // find the config file, load it, and add it to the multi-epoch (if not already added) - config, err := LoadConfig(event.Name) - if err != nil { - klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) - return - } - epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo) - if err != nil { - klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) - return + + switch event.Op { + case fsnotify.Write: + { + startedAt := time.Now() + klog.V(3).Infof("File %q was modified; processing...", event.Name) + // find the config file, load it, and update the epoch (replace) + config, err := LoadConfig(event.Name) + if err != nil { + klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) + return + } + epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo) + if err != nil { + klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) + return + } + err = multi.ReplaceOrAddEpoch(epoch.Epoch(), epoch) + if err != nil { + klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error()) + return + } + klog.V(2).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt)) } - err = multi.AddEpoch(epoch.Epoch(), epoch) - if err != nil { - klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error()) - return + case fsnotify.Create: + { + startedAt := time.Now() + klog.V(3).Infof("File %q was created; processing...", event.Name) + // find the config file, load it, and 
add it to the multi-epoch (if not already added) + config, err := LoadConfig(event.Name) + if err != nil { + klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) + return + } + epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo) + if err != nil { + klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) + return + } + err = multi.AddEpoch(epoch.Epoch(), epoch) + if err != nil { + klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error()) + return + } + klog.V(2).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt)) } - klog.V(1).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt)) - } - case fsnotify.Remove: - { - startedAt := time.Now() - klog.V(1).Infof("File %q was removed; processing...", event.Name) - // find the epoch that corresponds to this file, and remove it (if any) - epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name) - if err != nil { - klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error()) + case fsnotify.Remove: + { + startedAt := time.Now() + klog.V(3).Infof("File %q was removed; processing...", event.Name) + // find the epoch that corresponds to this file, and remove it (if any) + epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name) + if err != nil { + klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error()) + } + klog.V(2).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt)) } - klog.V(1).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt)) + case fsnotify.Rename: + klog.V(3).Infof("File %q was renamed; do nothing", event.Name) + case fsnotify.Chmod: + klog.V(3).Infof("File %q had its permissions changed; do nothing", event.Name) + default: + klog.V(3).Infof("File %q had an unknown event %q; do nothing", event.Name, event.Op) } - case fsnotify.Rename: - klog.V(1).Infof("File %q was renamed; do nothing", event.Name) - case fsnotify.Chmod: - 
klog.V(1).Infof("File %q had its permissions changed; do nothing", event.Name) - default: - klog.V(1).Infof("File %q had an unknown event %q; do nothing", event.Name, event.Op) - } - }) + }) if err != nil { return cli.Exit(err.Error(), 1) } @@ -329,13 +312,49 @@ func newCmd_rpc() *cli.Command { } } +// fileProcessingTracker tracks files that are already being processed because of an event: +// this is to avoid processing the same file multiple times +// (e.g. if a file is created and then modified, we don't want to process it twice) +type fileProcessingTracker struct { + mu sync.Mutex + m map[string]struct{} +} + +func newFileProcessingTracker() *fileProcessingTracker { + return &fileProcessingTracker{ + m: make(map[string]struct{}), + } +} + +func (f *fileProcessingTracker) isBeingProcessedOrAdd(filename string) bool { + f.mu.Lock() + defer f.mu.Unlock() + _, ok := f.m[filename] + if !ok { + f.m[filename] = struct{}{} + } + // if ok is true, then the file is already being processed + return ok +} + +func (f *fileProcessingTracker) removeFromList(filename string) { + f.mu.Lock() + defer f.mu.Unlock() + delete(f.m, filename) +} + // - get the list of provided arguments, and distinguish between files and directories // - load all the config files, etc. // - start a goroutine that monitors the config files for changes // - when a config file changes, reload it and update the epoch // - start a goroutine that monitors the directories and subdirectories for changes (new files, deleted files, etc.) // - is only watching directories sufficient? or do we need to watch files too? 
-func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Event)) error { +func onFileChanged( + ctx context.Context, + epochLoadConcurrency int, + dirs []string, + callback func(fsnotify.Event), +) error { // monitor a directory for file changes watcher, err := fsnotify.NewWatcher() if err != nil { @@ -353,6 +372,10 @@ func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Ev // start a goroutine to handle events go func() { defer watcher.Close() + tracker := newFileProcessingTracker() + wg := new(errgroup.Group) + wg.SetLimit(epochLoadConcurrency) + defer wg.Wait() for { select { case <-ctx.Done(): @@ -361,7 +384,15 @@ func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Ev if !ok { return } - callback(event) + wg.Go(func() error { + if tracker.isBeingProcessedOrAdd(event.Name) { + klog.V(3).Infof("File %q is already being processed; do nothing", event.Name) + return nil + } + defer tracker.removeFromList(event.Name) + callback(event) + return nil + }) case err, ok := <-watcher.Errors: if !ok { return diff --git a/epoch.go b/epoch.go index 46189d38..861d20af 100644 --- a/epoch.go +++ b/epoch.go @@ -301,7 +301,7 @@ func NewEpochFromConfig( if !ok { return nil, fmt.Errorf("failed to find miner for piece CID %s", piece.CommP) } - klog.V(2).Infof("piece CID %s is stored on miner %s", piece.CommP, minerID) + klog.V(3).Infof("piece CID %s is stored on miner %s", piece.CommP, minerID) minerInfo, err := minerInfo.GetProviderInfo(c.Context, minerID) if err != nil { return nil, fmt.Errorf("failed to get miner info for miner %s, for piece %s: %w", minerID, piece.CommP, err) @@ -309,7 +309,7 @@ func NewEpochFromConfig( if len(minerInfo.Multiaddrs) == 0 { return nil, fmt.Errorf("miner %s has no multiaddrs", minerID) } - klog.V(2).Infof("miner info: %s", spew.Sdump(minerInfo)) + klog.V(3).Infof("miner info: %s", spew.Sdump(minerInfo)) // extract the IP address from the multiaddr: split := 
multiaddr.Split(minerInfo.Multiaddrs[0]) if len(split) < 2 { @@ -330,7 +330,7 @@ func NewEpochFromConfig( return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0]) } minerIP := fmt.Sprintf("%s:%s", ip, port) - klog.V(2).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP) + klog.V(3).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP) formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String()) return splitcarfetcher.NewRemoteFileSplitCarReader( piece.CommP.String(), @@ -425,12 +425,12 @@ func NewEpochFromConfig( } ep.onClose = append(ep.onClose, sigExists.Close) - { - // warm up the cache - for i := 0; i < 100_000; i++ { - sigExists.Has(newRandomSignature()) - } - } + // { + // // warm up the cache + // for i := 0; i < 10; i++ { + // sigExists.Has(newRandomSignature()) + // } + // } ep.sigExists = sigExists } else { @@ -440,12 +440,12 @@ func NewEpochFromConfig( } ep.onClose = append(ep.onClose, sigExists.Close) - { - // warm up the cache - for i := 0; i < 100_000; i++ { - sigExists.Has(newRandomSignature()) - } - } + // { + // // warm up the cache + // for i := 0; i < 10; i++ { + // sigExists.Has(newRandomSignature()) + // } + // } ep.sigExists = sigExists diff --git a/go.mod b/go.mod index 1932a9ef..9deac589 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gagliardetto/binary v0.7.8 github.com/gagliardetto/solana-go v1.8.4 github.com/golang/protobuf v1.5.3 // indirect - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.6.0 github.com/hannahhoward/go-pubsub v1.0.0 // indirect github.com/ipfs/go-blockservice v0.5.0 // indirect github.com/ipfs/go-cid v0.4.1 diff --git a/go.sum b/go.sum index 9c1e67d4..5e5443c5 100644 --- a/go.sum +++ b/go.sum @@ -307,8 +307,8 @@ github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b/go.mod h1:czg5+yv1E0Z github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= 
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= diff --git a/klog.go b/klog.go index faa0e64e..da0dd458 100644 --- a/klog.go +++ b/klog.go @@ -9,7 +9,7 @@ import ( ) func NewKlogFlagSet() []cli.Flag { - fs := flag.NewFlagSet("", flag.PanicOnError) + fs := flag.NewFlagSet("klog", flag.PanicOnError) klog.InitFlags(fs) return []cli.Flag{ @@ -39,9 +39,10 @@ func NewKlogFlagSet() []cli.Flag { }, // "log_file_max_size", 1800, &cli.Uint64Flag{ - Name: "log_file_max_size", - Usage: "Defines the maximum size a log file can grow to (no effect when -logtostderr=true). Unit is megabytes. If the value is 0, the maximum file size is unlimited.", - EnvVars: []string{"FAITHFUL_LOG_FILE_MAX_SIZE"}, + Name: "log_file_max_size", + Usage: "Defines the maximum size a log file can grow to (no effect when -logtostderr=true). Unit is megabytes. 
If the value is 0, the maximum file size is unlimited.", + EnvVars: []string{"FAITHFUL_LOG_FILE_MAX_SIZE"}, + DefaultText: "1800", Action: func(cctx *cli.Context, v uint64) error { fs.Set("log_file_max_size", fmt.Sprint(v)) return nil @@ -50,9 +51,10 @@ func NewKlogFlagSet() []cli.Flag { // "logtostderr", true, "log to standard error instead of files") &cli.BoolFlag{ - Name: "logtostderr", - Usage: "log to standard error instead of files", - EnvVars: []string{"FAITHFUL_LOGTOSTDERR"}, + Name: "logtostderr", + Usage: "log to standard error instead of files", + EnvVars: []string{"FAITHFUL_LOGTOSTDERR"}, + DefaultText: "true", Action: func(cctx *cli.Context, v bool) error { fs.Set("logtostderr", fmt.Sprint(v)) return nil @@ -60,9 +62,10 @@ func NewKlogFlagSet() []cli.Flag { }, // "alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)") &cli.BoolFlag{ - Name: "alsologtostderr", - Usage: "log to standard error as well as files (no effect when -logtostderr=true)", - EnvVars: []string{"FAITHFUL_ALSOLOGTOSTDERR"}, + Name: "alsologtostderr", + Usage: "log to standard error as well as files (no effect when -logtostderr=true)", + EnvVars: []string{"FAITHFUL_ALSOLOGTOSTDERR"}, + DefaultText: "false", Action: func(cctx *cli.Context, v bool) error { fs.Set("alsologtostderr", fmt.Sprint(v)) return nil @@ -70,9 +73,10 @@ func NewKlogFlagSet() []cli.Flag { }, // "v", "number for the log level verbosity") &cli.IntFlag{ - Name: "v", - Usage: "number for the log level verbosity", - EnvVars: []string{"FAITHFUL_V"}, + Name: "v", + Usage: "number for the log level verbosity", + EnvVars: []string{"FAITHFUL_V"}, + DefaultText: "3", Action: func(cctx *cli.Context, v int) error { fs.Set("v", fmt.Sprint(v)) return nil @@ -125,7 +129,9 @@ func NewKlogFlagSet() []cli.Flag { Usage: "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)", EnvVars: 
[]string{"FAITHFUL_STDERRTHRESHOLD"}, Action: func(cctx *cli.Context, v string) error { - fs.Set("stderrthreshold", v) + if v != "" { + fs.Set("stderrthreshold", v) + } return nil }, }, @@ -135,7 +141,9 @@ func NewKlogFlagSet() []cli.Flag { Usage: "comma-separated list of pattern=N settings for file-filtered logging", EnvVars: []string{"FAITHFUL_VMODULE"}, Action: func(cctx *cli.Context, v string) error { - fs.Set("vmodule", v) + if v != "" { + fs.Set("vmodule", v) + } return nil }, }, @@ -145,7 +153,9 @@ func NewKlogFlagSet() []cli.Flag { Usage: "when logging hits line file:N, emit a stack trace", EnvVars: []string{"FAITHFUL_LOG_BACKTRACE_AT"}, Action: func(cctx *cli.Context, v string) error { - fs.Set("log_backtrace_at", v) + if v != "" { + fs.Set("log_backtrace_at", v) + } return nil }, }, diff --git a/multiepoch-getBlock.go b/multiepoch-getBlock.go index b1913d71..4c91cc70 100644 --- a/multiepoch-getBlock.go +++ b/multiepoch-getBlock.go @@ -26,8 +26,24 @@ import ( var fasterJson = jsoniter.ConfigCompatibleWithStandardLibrary +type MyContextKey string + +const requestIDKey = MyContextKey("requestID") + +func setRequestIDToContext(ctx context.Context, id string) context.Context { + return context.WithValue(ctx, requestIDKey, id) +} + +func getRequestIDFromContext(ctx context.Context) string { + id, ok := ctx.Value(requestIDKey).(string) + if !ok { + return "" + } + return id +} + func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) (*jsonrpc2.Error, error) { - tim := newTimer() + tim := newTimer(getRequestIDFromContext(ctx)) params, err := parseGetBlockRequest(req.Params) if err != nil { return &jsonrpc2.Error{ diff --git a/multiepoch.go b/multiepoch.go index ddca8b7d..f62b408b 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -2,7 +2,6 @@ package main import ( "context" - "crypto/rand" "fmt" "net/http" "sort" @@ -10,9 +9,9 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/goware/urlx" 
"github.com/libp2p/go-reuseport" - "github.com/mr-tron/base58" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/sourcegraph/jsonrpc2" "github.com/valyala/fasthttp" @@ -249,11 +248,8 @@ func (m *MultiEpoch) ListenAndServe(ctx context.Context, listenOn string, lsConf } func randomRequestID() string { - b := make([]byte, 4) - if _, err := rand.Read(b); err != nil { - panic(err) - } - return strings.ToUpper(base58.Encode(b)) + id := uuid.New().String() + return id } func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx *fasthttp.RequestCtx) { @@ -307,6 +303,8 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx // read request body body := reqCtx.Request.Body() + reqCtx.Response.Header.Set("X-Request-ID", reqID) + // parse request var rpcRequest jsonrpc2.Request if err := fasterJson.Unmarshal(body, &rpcRequest); err != nil { @@ -319,8 +317,10 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx }) return } + method := rpcRequest.Method - klog.V(3).Infof("[%s] received request: %q", reqID, strings.TrimSpace(string(body))) + klog.V(2).Infof("[%s] method=%q", reqID, sanitizeMethod(method)) + klog.V(3).Infof("[%s] received request with body: %q", reqID, strings.TrimSpace(string(body))) if proxy != nil && !isValidLocalMethod(rpcRequest.Method) { klog.V(2).Infof("[%s] Unhandled method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) @@ -338,7 +338,6 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } rqCtx := &requestContext{ctx: reqCtx} - method := rpcRequest.Method if method == "getVersion" { versionInfo := make(map[string]any) @@ -362,7 +361,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } // errorResp is the error response to be sent to the client. 
- errorResp, err := handler.handleRequest(reqCtx, rqCtx, &rpcRequest) + errorResp, err := handler.handleRequest(setRequestIDToContext(reqCtx, reqID), rqCtx, &rpcRequest) if err != nil { klog.Errorf("[%s] failed to handle %s: %v", reqID, sanitizeMethod(method), err) } diff --git a/tools.go b/tools.go index 8d6f4f11..89beb021 100644 --- a/tools.go +++ b/tools.go @@ -72,20 +72,22 @@ func loadFromYAML(configFilepath string, dst any) error { } type timer struct { + reqID string start time.Time prev time.Time } -func newTimer() *timer { +func newTimer(reqID string) *timer { now := time.Now() return &timer{ + reqID: reqID, start: now, prev: now, } } func (t *timer) time(name string) { - klog.V(2).Infof("TIMED: %s: %s (overall %s)", name, time.Since(t.prev), time.Since(t.start)) + klog.V(4).Infof("[%s]: %q: %s (overall %s)", t.reqID, name, time.Since(t.prev), time.Since(t.start)) t.prev = time.Now() }