diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 4c02a874e..9d1c5bc8c 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -73,6 +73,9 @@ type backend struct { receivedExternalIDs *redisx.IntervalHash // using external id receivedMsgs *redisx.IntervalHash // using content hash + // tracking of external ids of messages we've sent in case we need one before its status update has been written + sentExternalIDs *redisx.IntervalHash + // both sqlx and redis provide wait stats which are cummulative that we need to convert into increments dbWaitDuration time.Duration dbWaitCount int64 @@ -93,8 +96,9 @@ func newBackend(cfg *courier.Config) courier.Backend { mediaCache: redisx.NewIntervalHash("media-lookups", time.Hour*24, 2), mediaMutexes: *syncx.NewHashMutex(8), - receivedMsgs: redisx.NewIntervalHash("seen-msgs", time.Second*2, 2), - receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), + receivedMsgs: redisx.NewIntervalHash("seen-msgs", time.Second*2, 2), // 2 - 4 seconds + receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours + sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours } } @@ -242,7 +246,7 @@ func (b *backend) Start() error { } // create our batched writers and start them - b.statusWriter = NewStatusWriter(b.db, b.config.SpoolDir, b.writerWG) + b.statusWriter = NewStatusWriter(b, b.config.SpoolDir, b.writerWG) b.statusWriter.Start() b.dbLogWriter = NewDBLogWriter(b.db, b.writerWG) @@ -525,6 +529,8 @@ func (b *backend) NewStatusUpdateByExternalID(channel courier.Channel, externalI // WriteStatusUpdate writes the passed in MsgStatus to our store func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUpdate) error { + su := status.(*StatusUpdate) + if status.ID() == courier.NilMsgID && status.ExternalID() == "" { return errors.New("message status with no id or external id") } @@ -536,11 +542,24 @@ func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUp } } - // if we have an id and are marking an outgoing msg as errored, then clear our sent flag - if status.ID() != courier.NilMsgID && status.Status() == courier.MsgStatusErrored { - err := b.ClearMsgSent(ctx, status.ID()) - if err != nil { - logrus.WithError(err).WithField("msg", status.ID()).Error("error clearing sent flags") + if status.ID() != courier.NilMsgID { + // this is a message we've just sent and were given an external id for + if status.ExternalID() != "" { + rc := b.redisPool.Get() + defer rc.Close() + + err := b.sentExternalIDs.Set(rc, fmt.Sprintf("%d|%s", su.ChannelID_, su.ExternalID_), fmt.Sprintf("%d", status.ID())) + if err != nil { + logrus.WithError(err).WithField("msg", status.ID()).Error("error recording external id") + } + } + + // we sent a message that errored so clear our sent flag to allow it to be retried + if status.Status() == courier.MsgStatusErrored { + err := b.ClearMsgSent(ctx, status.ID()) + if err != nil { + logrus.WithError(err).WithField("msg", status.ID()).Error("error clearing sent flags") + } } } diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 0160d8c31..60f556ff0 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -684,6 +684,43 @@ func (ts *BackendTestSuite) TestMsgStatus() { ts.NoError(tx.Commit()) } +func (ts *BackendTestSuite) TestSentExternalIDCaching() { + r := ts.b.redisPool.Get() + defer r.Close() + + ctx := context.Background() + channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d") + clog := courier.NewChannelLog(courier.ChannelLogTypeMsgSend, channel, nil) + + ts.clearRedis() + + // create a status update from a send which will have id and external id + status1 := ts.b.NewStatusUpdate(channel, 10000, courier.MsgStatusSent, clog) + status1.SetExternalID("ex457") + err := ts.b.WriteStatusUpdate(ctx, status1) + ts.NoError(err) + + keys, err := redis.Strings(r.Do("KEYS", "sent-external-ids:*")) + ts.NoError(err) + ts.Len(keys, 1) + assertredis.HGetAll(ts.T(), ts.b.redisPool, keys[0], map[string]string{"10|ex457": "10000"}) + + // mimic a delay in that status being written by reverting the db changes + ts.b.db.MustExec(`UPDATE msgs_msg SET status = 'W', external_id = NULL WHERE id = 10000`) + + // create a callback status update which only has external id + status2 := ts.b.NewStatusUpdateByExternalID(channel, "ex457", courier.MsgStatusDelivered, clog) + + err = ts.b.WriteStatusUpdate(ctx, status2) + ts.NoError(err) + + // give batcher time to write it + time.Sleep(time.Millisecond * 700) + + // msg status successfully updated in the database + assertdb.Query(ts.T(), ts.b.db, `SELECT status FROM msgs_msg WHERE id = 10000`).Returns("D") +} + func (ts *BackendTestSuite) TestHealth() { // all should be well in test land ts.Equal(ts.b.Health(), "") diff --git a/backends/rapidpro/status.go b/backends/rapidpro/status.go index dc9efc83a..01b129764 100644 --- a/backends/rapidpro/status.go +++ b/backends/rapidpro/status.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/jmoiron/sqlx" "github.com/nyaruka/courier" "github.com/nyaruka/gocommon/dbutil" "github.com/nyaruka/gocommon/syncx" @@ -115,7 +114,7 @@ func (b *backend) flushStatusFile(filename string, contents []byte) error { } // try to flush to our db - _, err = writeStatusUpdatesToDB(ctx, b.db, []*StatusUpdate{status}) + _, err = b.writeStatusUpdatesToDB(ctx, []*StatusUpdate{status}) return err } @@ -187,28 +186,28 @@ type StatusWriter struct { *syncx.Batcher[*StatusUpdate] } -func NewStatusWriter(db *sqlx.DB, spoolDir string, wg *sync.WaitGroup) *StatusWriter { +func NewStatusWriter(b *backend, spoolDir string, wg *sync.WaitGroup) *StatusWriter { return &StatusWriter{ Batcher: syncx.NewBatcher[*StatusUpdate](func(batch []*StatusUpdate) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - writeStatuseUpdates(ctx, db, spoolDir, batch) + b.writeStatuseUpdates(ctx, spoolDir, batch) }, 1000, time.Millisecond*500, 1000, wg), } } // tries to write a batch of message statuses to the database and spools those that fail -func writeStatuseUpdates(ctx context.Context, db *sqlx.DB, spoolDir string, batch []*StatusUpdate) { +func (b *backend) writeStatuseUpdates(ctx context.Context, spoolDir string, batch []*StatusUpdate) { log := logrus.WithField("comp", "status writer") - unresolved, err := writeStatusUpdatesToDB(ctx, db, batch) + unresolved, err := b.writeStatusUpdatesToDB(ctx, batch) // if we received an error, try again one at a time (in case it is one value hanging us up) if err != nil { for _, s := range batch { - _, err = writeStatusUpdatesToDB(ctx, db, []*StatusUpdate{s}) + _, err = b.writeStatusUpdatesToDB(ctx, []*StatusUpdate{s}) if err != nil { log := log.WithField("msg_id", s.ID()) @@ -234,7 +233,7 @@ func writeStatuseUpdates(ctx context.Context, db *sqlx.DB, spoolDir string, batc // writes a batch of msg status updates to the database - messages that can't be resolved are returned and aren't // considered an error -func writeStatusUpdatesToDB(ctx context.Context, db *sqlx.DB, statuses []*StatusUpdate) ([]*StatusUpdate, error) { +func (b *backend) writeStatusUpdatesToDB(ctx context.Context, statuses []*StatusUpdate) ([]*StatusUpdate, error) { // get the statuses which have external ID instead of a message ID missingID := make([]*StatusUpdate, 0, 500) for _, s := range statuses { @@ -245,7 +244,7 @@ func writeStatusUpdatesToDB(ctx context.Context, db *sqlx.DB, statuses []*Status // try to resolve channel ID + external ID to message IDs if len(missingID) > 0 { - if err := resolveStatusUpdateMsgIDs(ctx, db, missingID); err != nil { + if err := b.resolveStatusUpdateMsgIDs(ctx, missingID); err != nil { return nil, err } } @@ -261,7 +260,7 @@ func writeStatusUpdatesToDB(ctx context.Context, db *sqlx.DB, statuses []*Status } } - err := dbutil.BulkQuery(ctx, db, sqlUpdateMsgByID, resolved) + err := dbutil.BulkQuery(ctx, b.db, sqlUpdateMsgByID, resolved) if err != nil { return nil, errors.Wrap(err, "error updating status") } @@ -276,23 +275,47 @@ SELECT id, channel_id, external_id // resolveStatusUpdateMsgIDs tries to resolve msg IDs for the given statuses - if there's no matching channel id + external id pair // found for a status, that status will be left with a nil msg ID. -func resolveStatusUpdateMsgIDs(ctx context.Context, db *sqlx.DB, statuses []*StatusUpdate) error { +func (b *backend) resolveStatusUpdateMsgIDs(ctx context.Context, statuses []*StatusUpdate) error { + rc := b.redisPool.Get() + defer rc.Close() + + chAndExtKeys := make([]string, len(statuses)) + for i, s := range statuses { + chAndExtKeys[i] = fmt.Sprintf("%d|%s", s.ChannelID_, s.ExternalID_) + } + cachedIDs, err := b.sentExternalIDs.MGet(rc, chAndExtKeys...) + if err != nil { + // log error but we continue and try to get ids from the database + logrus.WithError(err).Error("error looking up sent message ids in redis") + } + + // collect the statuses that couldn't be resolved from cache, update the ones that could + notInCache := make([]*StatusUpdate, 0, len(statuses)) + for i := range cachedIDs { + id, err := strconv.Atoi(cachedIDs[i]) + if err != nil { + notInCache = append(notInCache, statuses[i]) + } else { + statuses[i].ID_ = courier.MsgID(id) + } + } + // create a mapping of channel id + external id -> status type ext struct { channelID courier.ChannelID externalID string } - statusesByExt := make(map[ext]*StatusUpdate, len(statuses)) + statusesByExt := make(map[ext]*StatusUpdate, len(notInCache)) for _, s := range statuses { statusesByExt[ext{s.ChannelID_, s.ExternalID_}] = s } - sql, params, err := dbutil.BulkSQL(db, sqlResolveStatusMsgIDs, statuses) + sql, params, err := dbutil.BulkSQL(b.db, sqlResolveStatusMsgIDs, notInCache) if err != nil { return err } - rows, err := db.QueryContext(ctx, sql, params...) + rows, err := b.db.QueryContext(ctx, sql, params...) if err != nil { return err }