diff --git a/queen.go b/queen.go index 3d99b49..2a5c97c 100644 --- a/queen.go +++ b/queen.go @@ -184,15 +184,22 @@ func (q *Queen) freePort(port uint16) { func (q *Queen) Run(ctx context.Context) { go q.consumeAntsLogs(ctx) - go q.normalizeRequests(ctx) - t := time.NewTicker(CRAWL_INTERVAL) + crawlTime := time.NewTicker(CRAWL_INTERVAL) + defer crawlTime.Stop() + + normalizationTime := time.NewTicker(NORMALIZATION_INTERVAL) + defer normalizationTime.Stop() + q.routine(ctx) for { select { - case <-t.C: + case <-crawlTime.C: q.routine(ctx) + case <-normalizationTime.C: + go q.normalizeRequests(ctx) + // time.Sleep(10 * time.Second) case <-ctx.Done(): q.persistLiveAntsKeys() return @@ -271,19 +278,11 @@ func (q *Queen) normalizeRequests(ctx context.Context) { logger.Info("Starting continuous normalization...") - for { - select { - case <-nctx.Done(): - logger.Info("Normalization context canceled, stopping normalization loop.") - return - default: - err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc) - if err != nil { - logger.Errorf("Error during normalization: %w", err) - } else { - logger.Info("Normalization completed for current batch.") - } - } + err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc) + if err != nil { + logger.Errorf("Error during normalization: %w", err) + } else { + logger.Info("Normalization completed for current batch.") } } diff --git a/util.go b/util.go index 2d0357d..cc9a875 100644 --- a/util.go +++ b/util.go @@ -14,9 +14,9 @@ import ( ) const ( - CRAWL_INTERVAL = 30 * time.Minute - - BUCKET_SIZE = 20 + CRAWL_INTERVAL = 30 * time.Minute + NORMALIZATION_INTERVAL = 60 * time.Second + BUCKET_SIZE = 20 ) func PeeridToKadid(pid peer.ID) bit256.Key {