diff --git a/db/migrations/000018_add_protocols_to_requests_dnf_table.down.sql b/db/migrations/000018_add_protocols_to_requests_dnf_table.down.sql new file mode 100644 index 0000000..8654232 --- /dev/null +++ b/db/migrations/000018_add_protocols_to_requests_dnf_table.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE requests_denormalized + DROP COLUMN IF EXISTS protocols; diff --git a/db/migrations/000018_add_protocols_to_requests_dnf_table.up.sql b/db/migrations/000018_add_protocols_to_requests_dnf_table.up.sql new file mode 100644 index 0000000..b947dd9 --- /dev/null +++ b/db/migrations/000018_add_protocols_to_requests_dnf_table.up.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE requests_denormalized ADD COLUMN protocols TEXT[]; + +COMMIT; diff --git a/db/models/requests_denormalized.go b/db/models/requests_denormalized.go index c94f4aa..7c0424f 100644 --- a/db/models/requests_denormalized.go +++ b/db/models/requests_denormalized.go @@ -34,6 +34,7 @@ type RequestsDenormalized struct { MultiAddresses types.StringArray `boil:"multi_addresses" json:"multi_addresses,omitempty" toml:"multi_addresses" yaml:"multi_addresses,omitempty"` AgentVersion null.String `boil:"agent_version" json:"agent_version,omitempty" toml:"agent_version" yaml:"agent_version,omitempty"` NormalizedAt null.Time `boil:"normalized_at" json:"normalized_at,omitempty" toml:"normalized_at" yaml:"normalized_at,omitempty"` + Protocols types.StringArray `boil:"protocols" json:"protocols,omitempty" toml:"protocols" yaml:"protocols,omitempty"` R *requestsDenormalizedR `boil:"-" json:"-" toml:"-" yaml:"-"` L requestsDenormalizedL `boil:"-" json:"-" toml:"-" yaml:"-"` @@ -49,6 +50,7 @@ var RequestsDenormalizedColumns = struct { MultiAddresses string AgentVersion string NormalizedAt string + Protocols string }{ ID: "id", RequestStartedAt: "request_started_at", @@ -59,6 +61,7 @@ var RequestsDenormalizedColumns = struct { MultiAddresses: "multi_addresses", AgentVersion: "agent_version", NormalizedAt: "normalized_at", + Protocols: "protocols", } var RequestsDenormalizedTableColumns = struct { @@ -71,6 +74,7 @@ var RequestsDenormalizedTableColumns = struct { MultiAddresses string AgentVersion string NormalizedAt string + Protocols string }{ ID: "requests_denormalized.id", RequestStartedAt: "requests_denormalized.request_started_at", @@ -81,6 +85,7 @@ var RequestsDenormalizedTableColumns = struct { MultiAddresses: "requests_denormalized.multi_addresses", AgentVersion: "requests_denormalized.agent_version", NormalizedAt: "requests_denormalized.normalized_at", + Protocols: "requests_denormalized.protocols", } // Generated where @@ -145,6 +150,7 @@ var RequestsDenormalizedWhere = struct { MultiAddresses whereHelpertypes_StringArray AgentVersion whereHelpernull_String NormalizedAt whereHelpernull_Time + Protocols whereHelpertypes_StringArray }{ ID: whereHelperint64{field: "\"requests_denormalized\".\"id\""}, RequestStartedAt: whereHelpertime_Time{field: "\"requests_denormalized\".\"request_started_at\""}, @@ -155,6 +161,7 @@ var RequestsDenormalizedWhere = struct { MultiAddresses: whereHelpertypes_StringArray{field: "\"requests_denormalized\".\"multi_addresses\""}, AgentVersion: whereHelpernull_String{field: "\"requests_denormalized\".\"agent_version\""}, NormalizedAt: whereHelpernull_Time{field: "\"requests_denormalized\".\"normalized_at\""}, + Protocols: whereHelpertypes_StringArray{field: "\"requests_denormalized\".\"protocols\""}, } // RequestsDenormalizedRels is where relationship names are stored. @@ -174,9 +181,9 @@ func (*requestsDenormalizedR) NewStruct() *requestsDenormalizedR { type requestsDenormalizedL struct{} var ( - requestsDenormalizedAllColumns = []string{"id", "request_started_at", "request_type", "ant_multihash", "peer_multihash", "key_multihash", "multi_addresses", "agent_version", "normalized_at"} + requestsDenormalizedAllColumns = []string{"id", "request_started_at", "request_type", "ant_multihash", "peer_multihash", "key_multihash", "multi_addresses", "agent_version", "normalized_at", "protocols"} requestsDenormalizedColumnsWithoutDefault = []string{"request_started_at", "request_type", "ant_multihash", "peer_multihash", "key_multihash"} - requestsDenormalizedColumnsWithDefault = []string{"id", "multi_addresses", "agent_version", "normalized_at"} + requestsDenormalizedColumnsWithDefault = []string{"id", "multi_addresses", "agent_version", "normalized_at", "protocols"} requestsDenormalizedPrimaryKeyColumns = []string{"id", "request_started_at"} requestsDenormalizedGeneratedColumns = []string{"id"} ) diff --git a/queen.go b/queen.go index b5e91ee..8330ed7 100644 --- a/queen.go +++ b/queen.go @@ -184,15 +184,22 @@ func (q *Queen) freePort(port uint16) { func (q *Queen) Run(ctx context.Context) { go q.consumeAntsLogs(ctx) - go q.normalizeRequests(ctx) - t := time.NewTicker(CRAWL_INTERVAL) + crawlTime := time.NewTicker(CRAWL_INTERVAL) + defer crawlTime.Stop() + + normalizationTime := time.NewTicker(NORMALIZATION_INTERVAL) + defer normalizationTime.Stop() + q.routine(ctx) for { select { - case <-t.C: + case <-crawlTime.C: q.routine(ctx) + case <-normalizationTime.C: + go q.normalizeRequests(ctx) + // time.Sleep(10 * time.Second) case <-ctx.Done(): q.persistLiveAntsKeys() return @@ -265,19 +272,11 @@ func (q *Queen) normalizeRequests(ctx context.Context) { logger.Info("Starting continuous normalization...") - for { - select { - case <-nctx.Done(): - logger.Info("Normalization context canceled, stopping normalization loop.") - return - default: - err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc) - if err != nil { - logger.Errorf("Error during normalization: %w", err) - } else { - logger.Info("Normalization completed for current batch.") - } - } + err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc) + if err != nil { + logger.Errorf("Error during normalization: %w", err) + } else { + logger.Info("Normalization completed for current batch.") } } diff --git a/util.go b/util.go index 2d0357d..cc9a875 100644 --- a/util.go +++ b/util.go @@ -14,9 +14,9 @@ import ( ) const ( - CRAWL_INTERVAL = 30 * time.Minute - - BUCKET_SIZE = 20 + CRAWL_INTERVAL = 30 * time.Minute + NORMALIZATION_INTERVAL = 60 * time.Second + BUCKET_SIZE = 20 ) func PeeridToKadid(pid peer.ID) bit256.Key {