From 853aa835c4868ee794161b1327aeabda1afdcabb Mon Sep 17 00:00:00 2001 From: padaliyajay Date: Sat, 28 Dec 2024 12:44:38 +0530 Subject: [PATCH] new concept of schedule hook, expire hook and retry hook --- README.md | 8 +-- broker.go | 13 +++++ build-deb.sh | 2 +- hook.go | 36 +++--------- hookManager.go | 60 ++++++++++++++++++++ main.go | 8 +-- redis.go | 147 ++++++++++++++++++++++++++++++++++--------------- 7 files changed, 193 insertions(+), 81 deletions(-) create mode 100644 broker.go create mode 100644 hookManager.go diff --git a/README.md b/README.md index 4439686..f2687d8 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ Asynchook allows you run task in background by creating a hook which call your u ### Installation ```bash -wget https://github.com/padaliyajay/asynchook/releases/download/v1.0.2/asynchook_1.0.2_amd64.deb -dpkg -i asynchook_1.0.2_amd64.deb +wget https://github.com/padaliyajay/asynchook/releases/download/v1.1.0/asynchook_1.1.0_amd64.deb +dpkg -i asynchook_1.1.0_amd64.deb systemctl enable asynchook ``` @@ -36,10 +36,10 @@ channels: ### Usage ##### Send event to redis ```bash -HSET asynchook:1001 id 1001 url http://localhost:8080/mail payload '[YOUR JSON TEXT]' secret '[Your Secret]' +HSET asynchook:1001 id 1001 url http://localhost:8080/mail payload '[YOUR JSON TEXT]' secret '[Your Secret]' run_after_time '[UNIX TIMESTAMP]' expire_time '[UNIX TIMESTAMP]' ZADD asynchooks:default 1 1001 ``` -Here Id and URL are mandatory fields. But payload and secret are optional. +Here Id and URL are mandatory fields. But payload, secret and others are optional. ## License MIT \ No newline at end of file diff --git a/broker.go b/broker.go new file mode 100644 index 0000000..50e17f7 --- /dev/null +++ b/broker.go @@ -0,0 +1,13 @@ +package main + +import "time" + +type Broker interface { + HookStream(channel string, cb func(*HookEvent)) + + ClearHook(id string) + + ScheduleHook(channel string, id string, runAfter time.Time) + + UpdateRetryCount(id string, retryCount int) +} diff --git a/build-deb.sh b/build-deb.sh index d54f354..631797c 100755 --- a/build-deb.sh +++ b/build-deb.sh @@ -5,7 +5,7 @@ set -e # Variables APP_NAME="asynchook" # Name of your application -VERSION="1.0.2" # Version of your application +VERSION="1.1.0" # Version of your application ARCH="amd64" # Architecture (amd64, arm64, all, etc.) BUILD_DIR="./build" # Temporary build directory PACKAGE_DIR="./package" # Package structure directory diff --git a/hook.go b/hook.go index 4f97146..be54f91 100644 --- a/hook.go +++ b/hook.go @@ -2,23 +2,24 @@ package main import ( "fmt" - "log" "net/http" "net/url" + "time" ) type HookEvent struct { - Id string - Url string - Payload string - Timestamp string - Secret string + Id string + Url string + Payload string + Secret string + Run_after_time time.Time + Expire_time time.Time + Retry_count int } func (h *HookEvent) Process() error { postData := url.Values{} postData.Set("payload", h.Payload) - postData.Set("timestamp", h.Timestamp) postData.Set("secret", h.Secret) resp, err := http.PostForm(h.Url, postData) @@ -33,26 +34,5 @@ func (h *HookEvent) Process() error { return fmt.Errorf("failed to send hook event to %s: %s", h.Url, resp.Status) } - fmt.Println("hook event sent to", h.Url) - return nil } - -type HookManager struct { - Channel string - rl *RateLimiter -} - -func NewHookManager(channel string, rl *RateLimiter) *HookManager { - return &HookManager{channel, rl} -} - -func (s *HookManager) Process(hook *HookEvent) error { - s.rl.Acquire() - err := hook.Process() - if err != nil { - log.Println(err) - } - - return err -} diff --git a/hookManager.go b/hookManager.go new file mode 100644 index 0000000..6e112f2 --- /dev/null +++ b/hookManager.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "log" + "time" +) + +type HookManager struct { + Broker Broker + Channel string + rateLimiter *RateLimiter +} + +func NewHookManager(broker Broker, channel string, rateLimit string) *HookManager { + rateLimiter := NewRateLimiter(rateLimit) + + return &HookManager{broker, channel, rateLimiter} +} + +func (hm *HookManager) Run() { + hm.Broker.HookStream(hm.Channel, hm.process) +} + +func (hm *HookManager) process(hook *HookEvent) { + if !hook.Run_after_time.IsZero() && hook.Run_after_time.After(time.Now()) { + fmt.Println("hook event scheduled to run", hook.Url) + hm.Broker.ScheduleHook(hm.Channel, hook.Id, hook.Run_after_time) + return + } + + if !hook.Expire_time.IsZero() && hook.Expire_time.Before(time.Now()) { + fmt.Println("hook event expired", hook.Url) + hm.Broker.ClearHook(hook.Id) + return + } + + hm.rateLimiter.Acquire() + + go func() { + err := hook.Process() + if err != nil { + log.Println(err) + if hook.Retry_count < 3 { + hm.Broker.UpdateRetryCount(hook.Id, hook.Retry_count+1) + hm.Broker.ScheduleHook(hm.Channel, hook.Id, time.Now().Add(time.Minute*time.Duration(hook.Retry_count))) + } else { + hm.Broker.ClearHook(hook.Id) + log.Println("hook event failed", hook.Url) + } + } else { + hm.Broker.ClearHook(hook.Id) + fmt.Println("hook event sent to", hook.Url) + } + }() +} + +func (hm *HookManager) Stop() { + hm.rateLimiter.Stop() +} diff --git a/main.go b/main.go index 96ff9ff..36ffa68 100644 --- a/main.go +++ b/main.go @@ -38,14 +38,12 @@ func main() { for _, channel := range config.Channels { wq.Add(1) - rateLimiter := NewRateLimiter(channel.Ratelimit) - defer rateLimiter.Stop() - - manager := NewHookManager(channel.Name, rateLimiter) + hookManager := NewHookManager(broker, channel.Name, channel.Ratelimit) + defer hookManager.Stop() go (func() { defer wq.Done() - broker.Run(manager) + hookManager.Run() })() } diff --git a/redis.go b/redis.go index 44c6a7a..2668e00 100644 --- a/redis.go +++ b/redis.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "strconv" "time" "github.com/redis/go-redis/v9" @@ -14,8 +15,86 @@ type RedisBroker struct { ctx context.Context } +func NewRedisBroker(ctx context.Context, addr string, password string, db int) *RedisBroker { + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, + DB: db, + }) + + return &RedisBroker{ + client: client, + ctx: ctx, + } +} + +func (b *RedisBroker) HookStream(channel string, cb func(*HookEvent)) { + // Start a goroutine to listen for new hooks + go func() { + for { + select { + case <-b.ctx.Done(): + return + default: + result, _ := b.client.BZPopMin(b.ctx, time.Minute, "asynchooks:"+channel).Result() + + if result != nil { + id := result.Member.(string) + + hook, err := b.getHook(id) + if err != nil { + b.deleteRawHook(id) + log.Println(err) + continue + } + + cb(hook) + } + } + } + }() + + // Start a goroutine to check for scheduled hooks + go func() { + for { + select { + case <-b.ctx.Done(): + return + default: + results, _ := b.client.ZRangeByScore(b.ctx, "asynchooks-scheduled:"+channel, &redis.ZRangeBy{ + Min: "-inf", + Max: strconv.FormatInt(time.Now().Unix(), 10), + Offset: 0, + Count: 100, + }).Result() + + for _, id := range results { + b.client.ZRem(b.ctx, "asynchooks-scheduled:"+channel, id) + b.client.ZAdd(b.ctx, "asynchooks:"+channel, redis.Z{Score: 0, Member: id}) + } + + time.Sleep(time.Second) + } + } + }() +} + +func (b *RedisBroker) ScheduleHook(channel string, id string, runAfter time.Time) { + _, err := b.client.ZAdd(b.ctx, "asynchooks-scheduled:"+channel, redis.Z{Score: float64(runAfter.Unix()), Member: id}).Result() + if err != nil { + log.Println(err) + } +} + +func (b *RedisBroker) UpdateRetryCount(id string, retryCount int) { + _, err := b.client.HSet(b.ctx, "asynchook:"+id, "retry_count", retryCount).Result() + if err != nil { + log.Println(err) + } +} + func (b *RedisBroker) getHook(id string) (*HookEvent, error) { - result, err := b.client.HMGet(b.ctx, "asynchook:"+id, "url", "payload", "timestamp", "secret").Result() + result, err := b.client.HMGet(b.ctx, "asynchook:"+id, "url", "payload", "secret", "run_after_time", "expire_time", "retry_count").Result() if err != nil { return nil, err @@ -26,17 +105,33 @@ func (b *RedisBroker) getHook(id string) (*HookEvent, error) { } hook := &HookEvent{ - Id: id, - Url: result[0].(string), + Id: id, + Url: result[0].(string), + Retry_count: 0, } if result[1] != nil { // payload hook.Payload = result[1].(string) } - if result[2] != nil { // timestamp - hook.Timestamp = result[2].(string) + if result[2] != nil { // secret + hook.Secret = result[2].(string) + } + if result[3] != nil { // run_after_time + unixTime, err := strconv.ParseInt(result[3].(string), 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing run_after_time: %v", err) + } + hook.Run_after_time = time.Unix(unixTime, 0) + } + if result[4] != nil { // expire_time + unixTime, err := strconv.ParseInt(result[4].(string), 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing expire_time: %v", err) + } + hook.Expire_time = time.Unix(unixTime, 0) } - if result[3] != nil { // secret - hook.Secret = result[3].(string) + if result[5] != nil { // retry_count + retry_count, _ := strconv.Atoi(result[5].(string)) + hook.Retry_count = retry_count } return hook, nil @@ -48,44 +143,10 @@ func (b *RedisBroker) deleteRawHook(id string) error { return err } -func (b *RedisBroker) Run(manager *HookManager) { - for { - select { - case <-b.ctx.Done(): - return - default: - result, _ := b.client.BZPopMin(b.ctx, time.Second*10, "asynchooks:"+manager.Channel).Result() - - if result != nil { - id := result.Member.(string) - - hook, err := b.getHook(id) - if err != nil { - log.Println(err) - continue - } - - b.deleteRawHook(id) - - manager.Process(hook) - } - } - } +func (b *RedisBroker) ClearHook(id string) { + b.deleteRawHook(id) } func (b *RedisBroker) Close() { b.client.Close() } - -func NewRedisBroker(ctx context.Context, addr string, password string, db int) *RedisBroker { - client := redis.NewClient(&redis.Options{ - Addr: addr, - Password: password, - DB: db, - }) - - return &RedisBroker{ - client: client, - ctx: ctx, - } -}