Skip to content

Commit

Permalink
new concept of schedule hook, expire hook and retry hook
Browse files Browse the repository at this point in the history
  • Loading branch information
padaliyajay committed Dec 28, 2024
1 parent fbf67cf commit 853aa83
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 81 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ Asynchook allows you run task in background by creating a hook which call your u

### Installation
```bash
wget https://github.com/padaliyajay/asynchook/releases/download/v1.0.2/asynchook_1.0.2_amd64.deb
dpkg -i asynchook_1.0.2_amd64.deb
wget https://github.com/padaliyajay/asynchook/releases/download/v1.1.0/asynchook_1.1.0_amd64.deb
dpkg -i asynchook_1.1.0_amd64.deb
systemctl enable asynchook
```

Expand Down Expand Up @@ -36,10 +36,10 @@ channels:
### Usage
##### Send event to redis
```bash
HSET asynchook:1001 id 1001 url http://localhost:8080/mail payload '[YOUR JSON TEXT]' secret '[Your Secret]'
HSET asynchook:1001 id 1001 url http://localhost:8080/mail payload '[YOUR JSON TEXT]' secret '[Your Secret]' run_after_time '[UNIX TIMESTAMP]' expire_time '[UNIX TIMESTAMP]'
ZADD asynchooks:default 1 1001
```
Here Id and URL are mandatory fields. But payload and secret are optional.
Here Id and URL are mandatory fields. But payload, secret and others are optional.

## License
MIT
13 changes: 13 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import "time"

type Broker interface {
HookStream(channel string, cb func(*HookEvent))

ClearHook(id string)

ScheduleHook(channel string, id string, runAfter time.Time)

UpdateRetryCount(id string, retryCount int)
}
2 changes: 1 addition & 1 deletion build-deb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -e

# Variables
APP_NAME="asynchook" # Name of your application
VERSION="1.0.2" # Version of your application
VERSION="1.1.0" # Version of your application
ARCH="amd64" # Architecture (amd64, arm64, all, etc.)
BUILD_DIR="./build" # Temporary build directory
PACKAGE_DIR="./package" # Package structure directory
Expand Down
36 changes: 8 additions & 28 deletions hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@ package main

import (
"fmt"
"log"
"net/http"
"net/url"
"time"
)

type HookEvent struct {
Id string
Url string
Payload string
Timestamp string
Secret string
Id string
Url string
Payload string
Secret string
Run_after_time time.Time
Expire_time time.Time
Retry_count int
}

func (h *HookEvent) Process() error {
postData := url.Values{}
postData.Set("payload", h.Payload)
postData.Set("timestamp", h.Timestamp)
postData.Set("secret", h.Secret)

resp, err := http.PostForm(h.Url, postData)
Expand All @@ -33,26 +34,5 @@ func (h *HookEvent) Process() error {
return fmt.Errorf("failed to send hook event to %s: %s", h.Url, resp.Status)
}

fmt.Println("hook event sent to", h.Url)

return nil
}

type HookManager struct {
Channel string
rl *RateLimiter
}

func NewHookManager(channel string, rl *RateLimiter) *HookManager {
return &HookManager{channel, rl}
}

func (s *HookManager) Process(hook *HookEvent) error {
s.rl.Acquire()
err := hook.Process()
if err != nil {
log.Println(err)
}

return err
}
60 changes: 60 additions & 0 deletions hookManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"
"log"
"time"
)

type HookManager struct {
Broker Broker
Channel string
rateLimiter *RateLimiter
}

func NewHookManager(broker Broker, channel string, rateLimit string) *HookManager {
rateLimiter := NewRateLimiter(rateLimit)

return &HookManager{broker, channel, rateLimiter}
}

func (hm *HookManager) Run() {
hm.Broker.HookStream(hm.Channel, hm.process)
}

func (hm *HookManager) process(hook *HookEvent) {
if !hook.Run_after_time.IsZero() && hook.Run_after_time.After(time.Now()) {
fmt.Println("hook event scheduled to run", hook.Url)
hm.Broker.ScheduleHook(hm.Channel, hook.Id, hook.Run_after_time)
return
}

if !hook.Expire_time.IsZero() && hook.Expire_time.Before(time.Now()) {
fmt.Println("hook event expired", hook.Url)
hm.Broker.ClearHook(hook.Id)
return
}

hm.rateLimiter.Acquire()

go func() {
err := hook.Process()
if err != nil {
log.Println(err)
if hook.Retry_count < 3 {
hm.Broker.UpdateRetryCount(hook.Id, hook.Retry_count+1)
hm.Broker.ScheduleHook(hm.Channel, hook.Id, time.Now().Add(time.Minute*time.Duration(hook.Retry_count)))
} else {
hm.Broker.ClearHook(hook.Id)
log.Println("hook event failed", hook.Url)
}
} else {
hm.Broker.ClearHook(hook.Id)
fmt.Println("hook event sent to", hook.Url)
}
}()
}

func (hm *HookManager) Stop() {
hm.rateLimiter.Stop()
}
8 changes: 3 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ func main() {
for _, channel := range config.Channels {
wq.Add(1)

rateLimiter := NewRateLimiter(channel.Ratelimit)
defer rateLimiter.Stop()

manager := NewHookManager(channel.Name, rateLimiter)
hookManager := NewHookManager(broker, channel.Name, channel.Ratelimit)
defer hookManager.Stop()

go (func() {
defer wq.Done()
broker.Run(manager)
hookManager.Run()
})()
}

Expand Down
147 changes: 104 additions & 43 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"strconv"
"time"

"github.com/redis/go-redis/v9"
Expand All @@ -14,8 +15,86 @@ type RedisBroker struct {
ctx context.Context
}

func NewRedisBroker(ctx context.Context, addr string, password string, db int) *RedisBroker {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})

return &RedisBroker{
client: client,
ctx: ctx,
}
}

func (b *RedisBroker) HookStream(channel string, cb func(*HookEvent)) {
// Start a goroutine to listen for new hooks
go func() {
for {
select {
case <-b.ctx.Done():
return
default:
result, _ := b.client.BZPopMin(b.ctx, time.Minute, "asynchooks:"+channel).Result()

if result != nil {
id := result.Member.(string)

hook, err := b.getHook(id)
if err != nil {
b.deleteRawHook(id)
log.Println(err)
continue
}

cb(hook)
}
}
}
}()

// Start a goroutine to check for scheduled hooks
go func() {
for {
select {
case <-b.ctx.Done():
return
default:
results, _ := b.client.ZRangeByScore(b.ctx, "asynchooks-scheduled:"+channel, &redis.ZRangeBy{
Min: "-inf",
Max: strconv.FormatInt(time.Now().Unix(), 10),
Offset: 0,
Count: 100,
}).Result()

for _, id := range results {
b.client.ZRem(b.ctx, "asynchooks-scheduled:"+channel, id)
b.client.ZAdd(b.ctx, "asynchooks:"+channel, redis.Z{Score: 0, Member: id})
}

time.Sleep(time.Second)
}
}
}()
}

func (b *RedisBroker) ScheduleHook(channel string, id string, runAfter time.Time) {
_, err := b.client.ZAdd(b.ctx, "asynchooks-scheduled:"+channel, redis.Z{Score: float64(runAfter.Unix()), Member: id}).Result()
if err != nil {
log.Println(err)
}
}

func (b *RedisBroker) UpdateRetryCount(id string, retryCount int) {
_, err := b.client.HSet(b.ctx, "asynchook:"+id, "retry_count", retryCount).Result()
if err != nil {
log.Println(err)
}
}

func (b *RedisBroker) getHook(id string) (*HookEvent, error) {
result, err := b.client.HMGet(b.ctx, "asynchook:"+id, "url", "payload", "timestamp", "secret").Result()
result, err := b.client.HMGet(b.ctx, "asynchook:"+id, "url", "payload", "secret", "run_after_time", "expire_time", "retry_count").Result()

if err != nil {
return nil, err
Expand All @@ -26,17 +105,33 @@ func (b *RedisBroker) getHook(id string) (*HookEvent, error) {
}

hook := &HookEvent{
Id: id,
Url: result[0].(string),
Id: id,
Url: result[0].(string),
Retry_count: 0,
}
if result[1] != nil { // payload
hook.Payload = result[1].(string)
}
if result[2] != nil { // timestamp
hook.Timestamp = result[2].(string)
if result[2] != nil { // secret
hook.Secret = result[2].(string)
}
if result[3] != nil { // run_after_time
unixTime, err := strconv.ParseInt(result[3].(string), 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing run_after_time: %v", err)
}
hook.Run_after_time = time.Unix(unixTime, 0)
}
if result[4] != nil { // expire_time
unixTime, err := strconv.ParseInt(result[4].(string), 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing expire_time: %v", err)
}
hook.Expire_time = time.Unix(unixTime, 0)
}
if result[3] != nil { // secret
hook.Secret = result[3].(string)
if result[5] != nil { // retry_count
retry_count, _ := strconv.Atoi(result[5].(string))
hook.Retry_count = retry_count
}

return hook, nil
Expand All @@ -48,44 +143,10 @@ func (b *RedisBroker) deleteRawHook(id string) error {
return err
}

func (b *RedisBroker) Run(manager *HookManager) {
for {
select {
case <-b.ctx.Done():
return
default:
result, _ := b.client.BZPopMin(b.ctx, time.Second*10, "asynchooks:"+manager.Channel).Result()

if result != nil {
id := result.Member.(string)

hook, err := b.getHook(id)
if err != nil {
log.Println(err)
continue
}

b.deleteRawHook(id)

manager.Process(hook)
}
}
}
func (b *RedisBroker) ClearHook(id string) {
b.deleteRawHook(id)
}

func (b *RedisBroker) Close() {
b.client.Close()
}

func NewRedisBroker(ctx context.Context, addr string, password string, db int) *RedisBroker {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})

return &RedisBroker{
client: client,
ctx: ctx,
}
}

0 comments on commit 853aa83

Please sign in to comment.