Skip to content

Commit

Permalink
Merge pull request #1 from henomis/v0.0.3
Browse files Browse the repository at this point in the history
V0.0.3
  • Loading branch information
henomis authored May 1, 2024
2 parents b22ce84 + 9f89b37 commit 42c534f
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 254 deletions.
40 changes: 21 additions & 19 deletions examples/cmd/ingestion/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ import (
)

func main() {
l := langfuse.New()
l := langfuse.New(context.Background())

err := l.Trace(&model.Trace{Name: "test-trace"})
trace, err := l.Trace(&model.Trace{Name: "test-trace"})
if err != nil {
panic(err)
}

err = l.Span(&model.Span{Name: "test-span"})
span, err := l.Span(&model.Span{Name: "test-span", TraceID: trace.ID}, nil)
if err != nil {
panic(err)
}

err = l.Generation(
generation, err := l.Generation(
&model.Generation{
Name: "test-generation",
Model: "gpt-3.5-turbo",
TraceID: trace.ID,
Name: "test-generation",
Model: "gpt-3.5-turbo",
ModelParameters: model.M{
"maxTokens": "1000",
"temperature": "0.9",
Expand All @@ -42,14 +43,16 @@ func main() {
"key": "value",
},
},
&span.ID,
)
if err != nil {
panic(err)
}

err = l.Event(
_, err = l.Event(
&model.Event{
Name: "test-event",
Name: "test-event",
TraceID: trace.ID,
Metadata: model.M{
"key": "value",
},
Expand All @@ -60,33 +63,32 @@ func main() {
"key": "value",
},
},
&generation.ID,
)
if err != nil {
panic(err)
}

err = l.GenerationEnd(
&model.Generation{
Output: model.M{
"completion": "The Q3 OKRs contain goals for multiple teams...",
},
},
)
generation.Output = model.M{
"completion": "The Q3 OKRs contain goals for multiple teams...",
}
_, err = l.GenerationEnd(generation)
if err != nil {
panic(err)
}

err = l.Score(
_, err = l.Score(
&model.Score{
Name: "test-score",
Value: 0.9,
TraceID: trace.ID,
Name: "test-score",
Value: 0.9,
},
)
if err != nil {
panic(err)
}

err = l.SpanEnd(&model.Span{})
_, err = l.SpanEnd(span)
if err != nil {
panic(err)
}
Expand Down
11 changes: 6 additions & 5 deletions internal/pkg/observer/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package observer

import (
"context"
"time"
)

Expand Down Expand Up @@ -37,19 +38,19 @@ func (h *handler[T]) withTick(period time.Duration) *handler[T] {
return h
}

func (h *handler[T]) listen() {
func (h *handler[T]) listen(ctx context.Context) {
ticker := time.NewTicker(h.tickerPeriod)

for {
select {
case <-ticker.C:
go h.handle()
go h.handle(ctx)
case cmd, ok := <-h.commandCh:
if !ok {
return
}

h.handle()
h.handle(ctx)
if cmd == commandFlushAndWait {
ticker.Stop()
close(h.commandCh)
Expand All @@ -58,8 +59,8 @@ func (h *handler[T]) listen() {
}
}

func (h *handler[T]) handle() {
h.fn(h.queue.All())
func (h *handler[T]) handle(ctx context.Context) {
h.fn(ctx, h.queue.All())
}

func (h *handler[T]) flush() {
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ import (
"time"
)

type EventHandler[T any] func(events []T)
type EventHandler[T any] func(ctx context.Context, events []T)

type Observer[T any] struct {
queue *queue[T]
handler *handler[T]
}

func NewObserver[T any](fn EventHandler[T]) *Observer[T] {
func NewObserver[T any](ctx context.Context, fn EventHandler[T]) *Observer[T] {
queue := newQueue[T]()

o := &Observer[T]{
queue: queue,
handler: newHandler(queue, fn),
}
go o.handler.listen()
go o.handler.listen(ctx)

return o
}
Expand Down
60 changes: 0 additions & 60 deletions internal/pkg/path/path.go

This file was deleted.

Loading

0 comments on commit 42c534f

Please sign in to comment.