Skip to content

Commit

Permalink
πŸ”₯ Feature (v3): Add buffered streaming support (#3131)
Browse files Browse the repository at this point in the history
* πŸ”₯ Feature: Add SendStreamWriter to Ctx

Create a new `*DefaultCtx` method called `SendStreamWriter()`
that maps to fasthttp's `Response.SetBodyStreamWriter()`

* 🚨 Test: Validate regular use of c.SendStreamWriter()

- Adds Test_Ctx_SendStreamWriter to ctx_test.go

* 🚨 Test: (WIP) Validate interrupted use of c.SendStreamWriter()

- Adds Test_Ctx_SendStreamWriter_Interrupted to ctx_test.go
    - (Work-In-Progress) This test verifies that some data is
      still sent before a client disconnects when using the method
      `c.SendStreamWriter()`.

**Note:** Running this test reports a race condition when using
the `-race` flag or running `make test`. The test uses a channel
and mutex to prevent race conditions, but still triggers a warning.

* πŸ“š Doc: Add `SendStreamWriter` to docs/api/ctx.md

* 🩹 Fix: Remove race condition in Test_Ctx_SendStreamWriter_Interrupted

* 🎨 Styles: Update ctx_test.go to respect golangci-lint

* πŸ“š Doc: Update /docs/api/ctx.md to show proper `w.Flush()` error handling

* πŸ“š Doc: Add SendStreamWriter details to docs/whats_new.md

* 🎨 Styles: Update /docs/whats_new.md to respect markdownlint-cli2

* 🩹 Fix: Fix Fprintf syntax error in docs/whats_new.md

---------

Co-authored-by: M. Efe Γ‡etin <[email protected]>
  • Loading branch information
grivera64 and efectn authored Nov 27, 2024
1 parent ff55cfd commit 31a503f
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 0 deletions.
8 changes: 8 additions & 0 deletions ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package fiber

import (
"bufio"
"bytes"
"context"
"crypto/tls"
Expand Down Expand Up @@ -1671,6 +1672,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error {
return nil
}

// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter))

return nil
}

// Set sets the response's HTTP header field to the specified key, value.
func (c *DefaultCtx) Set(key, val string) {
c.fasthttp.Response.Header.Set(key, val)
Expand Down
3 changes: 3 additions & 0 deletions ctx_interface_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4447,6 +4447,71 @@ func Test_Ctx_SendStream(t *testing.T) {
require.Equal(t, "Hello bufio", string(c.Response().Body()))
}

// go test -run Test_Ctx_SendStreamWriter
func Test_Ctx_SendStreamWriter(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

err := c.SendStreamWriter(func(w *bufio.Writer) {
w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
})
require.NoError(t, err)
require.Equal(t, "Don't crash please", string(c.Response().Body()))

err = c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
if err := w.Flush(); err != nil {
t.Errorf("unexpected error: %s", err)
return
}
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body()))

err = c.SendStreamWriter(func(_ *bufio.Writer) {})
require.NoError(t, err)
require.Empty(t, c.Response().Body())
}

// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
app.Get("/", func(c Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck // It is fine to ignore the error

if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}

time.Sleep(400 * time.Millisecond)
}
})
})

req := httptest.NewRequest(MethodGet, "/", nil)
testConfig := TestConfig{
Timeout: 1 * time.Second,
FailOnTimeout: false,
}
resp, err := app.Test(req, testConfig)
require.NoError(t, err)

body, err := io.ReadAll(resp.Body)
t.Logf("%v", err)
require.EqualError(t, err, "unexpected EOF")

require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(body))
}

// go test -run Test_Ctx_Set
func Test_Ctx_Set(t *testing.T) {
t.Parallel()
Expand Down
60 changes: 60 additions & 0 deletions docs/api/ctx.md
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,66 @@ app.Get("/", func(c fiber.Ctx) error {
})
```

## SendStreamWriter

Sets the response body stream writer.

:::note
The argument `streamWriter` represents a function that populates
the response body using a buffered stream writer.
:::

```go title="Signature"
func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error
```

```go title="Example"
app.Get("/", func (c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Hello, World!\n")
})
// => "Hello, World!"
})
```

:::info
To send data before `streamWriter` returns, you can call `w.Flush()`
on the provided writer. Otherwise, the buffered stream flushes after
`streamWriter` returns.
:::

:::note
`w.Flush()` will return an error if the client disconnects before `streamWriter` finishes writing a response.
:::

```go title="Example"
app.Get("/wait", func(c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
// Begin Work
fmt.Fprintf(w, "Please wait for 10 seconds\n")
if err := w.Flush(); err != nil {
log.Print("Client disconnected!")
return
}
// Send progress over time
time.Sleep(time.Second)
for i := 0; i < 9; i++ {
fmt.Fprintf(w, "Still waiting...\n")
if err := w.Flush(); err != nil {
// If client disconnected, cancel work and finish
log.Print("Client disconnected!")
return
}
time.Sleep(time.Second)
}
// Finish
fmt.Fprintf(w, "Done!\n")
})
})
```

## Set

Sets the response’s HTTP header field to the specified `key`, `value`.
Expand Down
38 changes: 38 additions & 0 deletions docs/whats_new.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ DRAFT section
- Reset
- Schema -> ExpressJs like
- SendStream -> ExpressJs like
- SendStreamWriter
- SendString -> ExpressJs like
- String -> ExpressJs like
- ViewBind -> instead of Bind
Expand Down Expand Up @@ -296,6 +297,43 @@ DRAFT section
- UserContext has been renamed to Context which returns a context.Context object.
- SetUserContext has been renamed to SetContext.

### SendStreamWriter

In v3, we added support for buffered streaming by providing the new method `SendStreamWriter()`.

```go
func (c Ctx) SendStreamWriter(streamWriter func(w *bufio.Writer))
```

With this new method, you can implement:

- Server-Side Events (SSE)
- Large file downloads
- Live data streaming

```go
app.Get("/sse", func(c fiber.Ctx) {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")

return c.SendStreamWriter(func(w *bufio.Writer) {
for {
fmt.Fprintf(w, "event: my-event\n")
fmt.Fprintf(w, "data: Hello SSE\n\n")

if err := w.Flush(); err != nil {
log.Print("Client disconnected!")
return
}
}
})
})
```

You can find more details about this feature in [/docs/api/ctx.md](./api/ctx.md).

---

## 🌎 Client package
Expand Down

1 comment on commit 31a503f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite Current: 31a503f Previous: 8c84b0f Ratio
Benchmark_Ctx_Send 6.506 ns/op 0 B/op 0 allocs/op 4.335 ns/op 0 B/op 0 allocs/op 1.50
Benchmark_Ctx_Send - ns/op 6.506 ns/op 4.335 ns/op 1.50
Benchmark_Utils_GetOffer/1_parameter 206.1 ns/op 0 B/op 0 allocs/op 131 ns/op 0 B/op 0 allocs/op 1.57
Benchmark_Utils_GetOffer/1_parameter - ns/op 206.1 ns/op 131 ns/op 1.57
Benchmark_Middleware_BasicAuth - B/op 80 B/op 48 B/op 1.67
Benchmark_Middleware_BasicAuth - allocs/op 5 allocs/op 3 allocs/op 1.67
Benchmark_Middleware_BasicAuth_Upper - B/op 80 B/op 48 B/op 1.67
Benchmark_Middleware_BasicAuth_Upper - allocs/op 5 allocs/op 3 allocs/op 1.67
Benchmark_CORS_NewHandler - B/op 16 B/op 0 B/op +∞
Benchmark_CORS_NewHandler - allocs/op 1 allocs/op 0 allocs/op +∞
Benchmark_CORS_NewHandlerSingleOrigin - B/op 16 B/op 0 B/op +∞
Benchmark_CORS_NewHandlerSingleOrigin - allocs/op 1 allocs/op 0 allocs/op +∞
Benchmark_CORS_NewHandlerPreflight - B/op 104 B/op 0 B/op +∞
Benchmark_CORS_NewHandlerPreflight - allocs/op 5 allocs/op 0 allocs/op +∞
Benchmark_CORS_NewHandlerPreflightSingleOrigin - B/op 104 B/op 0 B/op +∞
Benchmark_CORS_NewHandlerPreflightSingleOrigin - allocs/op 5 allocs/op 0 allocs/op +∞
Benchmark_CORS_NewHandlerPreflightWildcard - B/op 104 B/op 0 B/op +∞
Benchmark_CORS_NewHandlerPreflightWildcard - allocs/op 5 allocs/op 0 allocs/op +∞
Benchmark_Middleware_CSRF_GenerateToken - B/op 518 B/op 327 B/op 1.58
Benchmark_Middleware_CSRF_GenerateToken - allocs/op 10 allocs/op 6 allocs/op 1.67

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.