Skip to content

Commit

Permalink
made os signal ready
Browse files Browse the repository at this point in the history
  • Loading branch information
padaliyajay committed Dec 23, 2024
1 parent fe7aee2 commit 8c0b41f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
14 changes: 13 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package main

import (
"context"
"flag"
"os"
"os/signal"
"sync"
"syscall"
)

func main() {
Expand All @@ -13,7 +17,9 @@ func main() {
if config, err := LoadConfig(config_file); err != nil {
panic(err)
} else {
broker := NewRedisBroker(config.Redis.Addr, config.Redis.Password, config.Redis.DB)
ctx, cancel := context.WithCancel(context.Background())

broker := NewRedisBroker(ctx, config.Redis.Addr, config.Redis.Password, config.Redis.DB)
defer broker.Close()

var wq sync.WaitGroup
Expand All @@ -32,6 +38,12 @@ func main() {
})()
}

// Handle termination signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

<-sigCh
cancel()
wq.Wait()
}
}
39 changes: 20 additions & 19 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ import (

type RedisBroker struct {
client *redis.Client
ctx context.Context
}

func (b *RedisBroker) getHook(id string) (*HookEvent, error) {
ctx := context.Background()

result, err := b.client.HMGet(ctx, "asynchook:"+id, "url", "payload", "timestamp", "secret").Result()
result, err := b.client.HMGet(b.ctx, "asynchook:"+id, "url", "payload", "timestamp", "secret").Result()

if err != nil {
return nil, err
Expand Down Expand Up @@ -43,31 +42,32 @@ func (b *RedisBroker) getHook(id string) (*HookEvent, error) {
}

func (b *RedisBroker) deleteRawHook(id string) error {
ctx := context.Background()

_, err := b.client.Del(ctx, "asynchook:"+id).Result()
_, err := b.client.Del(b.ctx, "asynchook:"+id).Result()

return err
}

func (b *RedisBroker) Run(manager *HookManager) {
ctx := context.Background()

for {
result, _ := b.client.BZPopMin(ctx, time.Minute, "asynchooks:"+manager.Channel).Result()
select {
case <-b.ctx.Done():
return
default:
result, _ := b.client.BZPopMin(b.ctx, time.Minute, "asynchooks:"+manager.Channel).Result()

if result != nil {
id := result.Member.(string)
if result != nil {
id := result.Member.(string)

hook, err := b.getHook(id)
if err != nil {
fmt.Println(err)
continue
}
hook, err := b.getHook(id)
if err != nil {
fmt.Println(err)
continue
}

b.deleteRawHook(id)
b.deleteRawHook(id)

manager.Process(hook)
manager.Process(hook)
}
}
}
}
Expand All @@ -76,7 +76,7 @@ func (b *RedisBroker) Close() {
b.client.Close()
}

func NewRedisBroker(addr string, password string, db int) *RedisBroker {
func NewRedisBroker(ctx context.Context, addr string, password string, db int) *RedisBroker {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
Expand All @@ -85,5 +85,6 @@ func NewRedisBroker(addr string, password string, db int) *RedisBroker {

return &RedisBroker{
client: client,
ctx: ctx,
}
}

0 comments on commit 8c0b41f

Please sign in to comment.