From e4e36dbfef1918f0d098807ec013ca8703fe4f65 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 15 Oct 2024 15:22:10 +0200 Subject: [PATCH 1/2] updated queen --- queen.go | 128 +++++++++++++++++++++++-------------------------------- 1 file changed, 53 insertions(+), 75 deletions(-) diff --git a/queen.go b/queen.go index bc037f1..e1880b9 100644 --- a/queen.go +++ b/queen.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "strconv" - "strings" "time" "github.com/dennis-tra/nebula-crawler/config" @@ -43,8 +42,6 @@ type Queen struct { ants []*Ant antsLogs chan antslog.RequestLog - seen map[peer.ID]struct{} - upnp bool // portsOccupancy is a slice of bools that represent the occupancy of the ports // false corresponds to an available port, true to an occupied port @@ -52,14 +49,12 @@ type Queen struct { portsOccupancy []bool firstPort uint16 - dbc *db.DBClient + Client *db.DBClient mmc *maxmind.Client uclient *udger.Client resolveBatchSize int resolveBatchTime int // in sec - - Client *db.DBClient } func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) { @@ -70,9 +65,41 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPort return nil, err } + mmc, err := maxmind.NewClient(os.Getenv("MAXMIND_ASN_DB"), os.Getenv("MAXMIND_COUNTRY_DB")) + if err != nil { + logger.Errorf("Failed to initialized Maxmind client: %v\n", err) + } + + queen := &Queen{ + nebulaDB: nebulaDB, + keysDB: keysDB, + peerstore: peerstore, + datastore: dssync.MutexWrap(ds.NewMapDatastore()), + ants: []*Ant{}, + antsLogs: make(chan antslog.RequestLog, 1024), + upnp: true, + Client: getDbClient(ctx), + mmc: mmc, + uclient: getUdgerClient(), + resolveBatchSize: getBatchSize(), + resolveBatchTime: getBatchSize(), + } + + if nPorts != 0 { + queen.upnp = false + queen.firstPort = firstPort + queen.portsOccupancy = make([]bool, nPorts) + } + + logger.Info("queen created") + + return queen, nil +} + +func getDbClient(ctx context.Context) *db.DBClient { dbPort, err := strconv.Atoi(os.Getenv("DB_PORT")) if err != nil { - fmt.Errorf("Port must be an integer", err) + logger.Errorf("Port must be an integer: %w", err) } mP, _ := tele.NewMeterProvider() @@ -94,21 +121,24 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPort if err != nil { logger.Errorf("Failed to initialize DB client: %v\n", err) } + return dbc +} - mmc, err := maxmind.NewClient(os.Getenv("MAXMIND_ASN_DB"), os.Getenv("MAXMIND_COUNTRY_DB")) - if err != nil { - logger.Errorf("Failed to initialized Maxmind client: %v\n", err) - } - +func getUdgerClient() *udger.Client { filePathUdger := os.Getenv("UDGER_FILEPATH") - var uclient *udger.Client if filePathUdger != "" { - uclient, err = udger.NewClient(filePathUdger) + uclient, err := udger.NewClient(filePathUdger) if err != nil { logger.Errorf("Failed to initialize Udger client with %s: %v\n", filePathUdger, err) + return nil } + return uclient } + logger.Warn("Missing UDGER_FILEPATH: skipping udger") + return nil +} +func getBatchSize() int { batchSizeEnvVal := os.Getenv("BATCH_SIZE") if len(batchSizeEnvVal) == 0 { batchSizeEnvVal = "1000" @@ -117,8 +147,11 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPort if err != nil { logger.Errorln("BATCH_SIZE should be an integer") } + return batchSize +} - batchTimeEnvVal := os.Getenv("BATCH_SIZE") +func getBatchTime() int { + batchTimeEnvVal := os.Getenv("BATCH_TIME") if len(batchTimeEnvVal) == 0 { batchTimeEnvVal = "30" } @@ -126,34 +159,7 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPort if err != nil { logger.Errorln("BATCH_TIME should be an integer") } - - queen := &Queen{ - nebulaDB: nebulaDB, - keysDB: keysDB, - peerstore: peerstore, - datastore: dssync.MutexWrap(ds.NewMapDatastore()), - ants: []*Ant{}, - antsLogs: make(chan antslog.RequestLog, 1024), - seen: make(map[peer.ID]struct{}), - dbc: dbc, - mmc: mmc, - uclient: uclient, - resolveBatchSize: batchSize, - resolveBatchTime: batchTime, - Client: dbc, - } - - if nPorts == 0 { - queen.upnp = true - } else { - queen.upnp = false - queen.firstPort = firstPort - queen.portsOccupancy = make([]bool, nPorts) - } - - logger.Debug("queen created") - - return queen, nil + return batchTime } func (q *Queen) takeAvailablePort() (uint16, error) { @@ -191,13 +197,6 @@ func (q *Queen) Run(ctx context.Context) { } func (q *Queen) consumeAntsLogs(ctx context.Context) { - lnCount := 0 - f, err := os.OpenFile("log.txt", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - logger.Panicln(err) - } - defer f.Close() - requests := make([]models.RequestsDenormalized, 0, q.resolveBatchSize) // bulk insert for every batch size or N seconds, whichever comes first ticker := time.NewTicker(time.Duration(q.resolveBatchTime) * time.Second) @@ -215,20 +214,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { } else { agent = peerstoreAgent.(string) } - fmt.Printf( - "%s \tself: %s \ttype: %s \trequester: %s \ttarget: %s \tagent: %s \tmaddrs: %s\n", - log.Timestamp.Format(time.RFC3339), - log.Self, - reqType, - log.Requester, - log.Target.B58String(), - agent, - maddrs, - ) - - // Keep this protocols slice empty for now, - // because we don't need it yet and I don't know how to get it - // protocols := make([]string, 0) + // protocols, _ := q.peerstore.GetProtocols(log.Requester) request := models.RequestsDenormalized{ RequestStartedAt: log.Timestamp, @@ -241,24 +227,16 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { } requests = append(requests, request) if len(requests) >= q.resolveBatchSize { - err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests) + err = db.BulkInsertRequests(ctx, q.Client.Handler, requests) if err != nil { logger.Fatalf("Error inserting requests: %v", err) } requests = requests[:0] } - if _, ok := q.seen[log.Requester]; !ok { - q.seen[log.Requester] = struct{}{} - if strings.Contains(agent, "light") { - lnCount++ - } - fmt.Fprintf(f, "\r%s %s\n", log.Requester, agent) - logger.Debugf("total: %d \tlight: %d", len(q.seen), lnCount) - } case <-ticker.C: if len(requests) > 0 { - err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests) + err := db.BulkInsertRequests(ctx, q.Client.Handler, requests) if err != nil { logger.Fatalf("Error inserting requests: %v", err) } @@ -267,7 +245,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { case <-ctx.Done(): if len(requests) > 0 { - err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests) + err := db.BulkInsertRequests(ctx, q.Client.Handler, requests) if err != nil { logger.Fatalf("Error inserting remaining requests: %v", err) } From 40be62a103ac347e1e00fd6a6881c58b215e1fe2 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 15 Oct 2024 16:41:46 +0200 Subject: [PATCH 2/2] renamed queentest --- cmd/{queentest => honeypot}/main.go | 33 ++---------------- cmd/queentest/analysis.py | 52 ----------------------------- queen.go | 36 ++++++++++++++++---- 3 files changed, 33 insertions(+), 88 deletions(-) rename cmd/{queentest => honeypot}/main.go (51%) delete mode 100644 cmd/queentest/analysis.py diff --git a/cmd/queentest/main.go b/cmd/honeypot/main.go similarity index 51% rename from cmd/queentest/main.go rename to cmd/honeypot/main.go index 9d55146..644ef7e 100644 --- a/cmd/queentest/main.go +++ b/cmd/honeypot/main.go @@ -6,11 +6,9 @@ import ( "os" "os/signal" "syscall" - "time" logging "github.com/ipfs/go-log/v2" "github.com/probe-lab/ants-watch" - "github.com/probe-lab/ants-watch/db" ) var logger = logging.Logger("ants-queen") @@ -20,7 +18,7 @@ func main() { logging.SetLogLevel("dht", "error") logging.SetLogLevel("basichost", "info") - postgresStr := flag.String("postgres", "", "Postgres connection string, postgres://user:password@host:port/dbname") + nebulaPostgresStr := flag.String("postgres", "", "Postgres connection string, postgres://user:password@host:port/dbname") nPorts := flag.Int("nPorts", 128, "Number of ports ants can listen on") firstPort := flag.Int("firstPort", 6000, "First port ants can listen on") upnp := flag.Bool("upnp", false, "Enable UPnP") @@ -34,9 +32,9 @@ func main() { var queen *ants.Queen var err error if *upnp { - queen, err = ants.NewQueen(ctx, *postgresStr, "keys.db", 0, 0) + queen, err = ants.NewQueen(ctx, *nebulaPostgresStr, "keys.db", 0, 0) } else { - queen, err = ants.NewQueen(ctx, *postgresStr, "keys.db", uint16(*nPorts), uint16(*firstPort)) + queen, err = ants.NewQueen(ctx, *nebulaPostgresStr, "keys.db", uint16(*nPorts), uint16(*firstPort)) } if err != nil { panic(err) @@ -44,31 +42,6 @@ func main() { go queen.Run(ctx) - go func() { - nctx, ncancel := context.WithCancel(ctx) - defer ncancel() - - logger.Info("Starting continuous normalization...") - - for { - select { - case <-nctx.Done(): - logger.Info("Normalization context canceled, stopping normalization loop.") - return - default: - err := db.NormalizeRequests(nctx, queen.Client.Handler, queen.Client) - if err != nil { - logger.Errorf("Error during normalization: %w", err) - } else { - logger.Info("Normalization completed for current batch.") - } - - // TODO: remove the hardcoded time - time.Sleep(10 * time.Second) - } - } - }() - go func() { sig := <-sigChan logger.Infof("Received signal: %s, shutting down...", sig) diff --git a/cmd/queentest/analysis.py b/cmd/queentest/analysis.py deleted file mode 100644 index ec81a25..0000000 --- a/cmd/queentest/analysis.py +++ /dev/null @@ -1,52 +0,0 @@ -if __name__ == "__main__": - f = open("log.txt", "r") - - unknown_count = 0 - light_count = 0 - bridge_count = 0 - full_count = 0 - celestia_celestia_count = 0 - nebula_count = 0 - lumina_count = 0 - libp2p_count = 0 - for line in f.readlines(): - agent = line.split(" ")[-1][:-1] - - if agent == "unknown": - unknown_count += 1 - elif "celestia-node/celestia/light" in agent: - light_count += 1 - elif "celestia-node/celestia/bridge" in agent: - bridge_count += 1 - elif "celestia-node/celestia/full" in agent: - full_count += 1 - elif "celestia-celestia" in agent: - celestia_celestia_count += 1 - elif "nebula" in agent: - nebula_count += 1 - elif "lumina/celestia" in agent: - lumina_count += 1 - elif "rust-libp2p" in agent: - libp2p_count += 1 - elif agent != "": - print(agent) - - print("unknown:", unknown_count) - print("light:", light_count) - print("bridge:", bridge_count) - print("full:", full_count) - print("celestia-celestia:", celestia_celestia_count) - # print("nebula:", nebula_count) - print("lumina:", lumina_count) - print("rust-libp2p:", libp2p_count) - print( - "total:", - unknown_count - + light_count - + bridge_count - + full_count - + celestia_celestia_count - # + nebula_count - + lumina_count - + libp2p_count, - ) diff --git a/queen.go b/queen.go index e1880b9..3c04d01 100644 --- a/queen.go +++ b/queen.go @@ -49,7 +49,7 @@ type Queen struct { portsOccupancy []bool firstPort uint16 - Client *db.DBClient + dbc *db.DBClient mmc *maxmind.Client uclient *udger.Client @@ -78,7 +78,7 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPort ants: []*Ant{}, antsLogs: make(chan antslog.RequestLog, 1024), upnp: true, - Client: getDbClient(ctx), + dbc: getDbClient(ctx), mmc: mmc, uclient: getUdgerClient(), resolveBatchSize: getBatchSize(), @@ -183,6 +183,8 @@ func (q *Queen) freePort(port uint16) { func (q *Queen) Run(ctx context.Context) { go q.consumeAntsLogs(ctx) + go q.normalizeRequests(ctx) + t := time.NewTicker(CRAWL_INTERVAL) q.routine(ctx) @@ -227,7 +229,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { } requests = append(requests, request) if len(requests) >= q.resolveBatchSize { - err = db.BulkInsertRequests(ctx, q.Client.Handler, requests) + err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests) if err != nil { logger.Fatalf("Error inserting requests: %v", err) } @@ -236,7 +238,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { case <-ticker.C: if len(requests) > 0 { - err := db.BulkInsertRequests(ctx, q.Client.Handler, requests) + err := db.BulkInsertRequests(ctx, q.dbc.Handler, requests) if err != nil { logger.Fatalf("Error inserting requests: %v", err) } @@ -245,7 +247,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { case <-ctx.Done(): if len(requests) > 0 { - err := db.BulkInsertRequests(ctx, q.Client.Handler, requests) + err := db.BulkInsertRequests(ctx, q.dbc.Handler, requests) if err != nil { logger.Fatalf("Error inserting remaining requests: %v", err) } @@ -255,6 +257,28 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { } } +func (q *Queen) normalizeRequests(ctx context.Context) { + nctx, ncancel := context.WithCancel(ctx) + defer ncancel() + + logger.Info("Starting continuous normalization...") + + for { + select { + case <-nctx.Done(): + logger.Info("Normalization context canceled, stopping normalization loop.") + return + default: + err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc) + if err != nil { + logger.Errorf("Error during normalization: %w", err) + } else { + logger.Info("Normalization completed for current batch.") + } + } + } +} + func (q *Queen) routine(ctx context.Context) { networkPeers, err := q.nebulaDB.GetLatestPeerIds(ctx) if err != nil { @@ -324,7 +348,7 @@ func (q *Queen) routine(ctx context.Context) { for _, ant := range q.ants { logger.Debugf("Upserting ant: %v\n", ant.Host.ID().String()) - antID, err := q.Client.UpsertPeer(ctx, ant.Host.ID().String(), null.StringFrom(ant.UserAgent), nil, time.Now()) + antID, err := q.dbc.UpsertPeer(ctx, ant.Host.ID().String(), null.StringFrom(ant.UserAgent), nil, time.Now()) if err != nil { logger.Errorf("antID: %d could not be inserted because of %v", antID, err) }