Merge pull request #4 from lebrunel/streaming
Refactors streaming
lebrunel authored Feb 19, 2024
2 parents de2686d + 816a111 commit 3b92eb6
Showing 9 changed files with 423 additions and 300 deletions.
129 changes: 64 additions & 65 deletions README.md
@@ -18,7 +18,7 @@ The package can be installed by adding `ollama` to your list of dependencies in
```elixir
def deps do
[
{:ollama, "~> 0.4"}
{:ollama, "~> 0.5"}
]
end
```
@@ -30,110 +30,109 @@ For more examples, refer to the [Ollama documentation](https://hexdocs.pm/ollama
### 1. Generate a completion

```elixir
iex> client = Ollama.init()
client = Ollama.init()

iex> Ollama.completion(client, [
...> model: "llama2",
...> prompt: "Why is the sky blue?",
...> ])
{:ok, %{"response" => "The sky is blue because it is the color of the sky.", ...}}
Ollama.completion(client, [
model: "llama2",
prompt: "Why is the sky blue?",
])
# {:ok, %{"response" => "The sky is blue because it is the color of the sky.", ...}}
```
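Since the response arrives as a plain map, you can pattern match on it directly. A minimal sketch along those lines (the `{:error, reason}` failure shape is an assumption here, not something this README documents):

```elixir
# Sketch: pull just the generated text out of a completion result.
# Assumes failures come back as {:error, reason}.
client = Ollama.init()

case Ollama.completion(client, model: "llama2", prompt: "Why is the sky blue?") do
  {:ok, %{"response" => text}} -> IO.puts(text)
  {:error, reason} -> IO.inspect(reason, label: "completion failed")
end
```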

### 2. Generate the next message in a chat

```elixir
iex> client = Ollama.init()
iex> messages = [
...> %{role: "system", content: "You are a helpful assistant."},
...> %{role: "user", content: "Why is the sky blue?"},
...> %{role: "assistant", content: "Due to rayleigh scattering."},
...> %{role: "user", content: "How is that different than mie scattering?"},
...> ]

iex> Ollama.chat(client, [
...> model: "llama2",
...> messages: messages,
...> ])
{:ok, %{"message" => %{
"role" => "assistant",
"content" => "Mie scattering affects all wavelengths similarly, while Rayleigh favors shorter ones."
}, ...}}
client = Ollama.init()
messages = [
%{role: "system", content: "You are a helpful assistant."},
%{role: "user", content: "Why is the sky blue?"},
%{role: "assistant", content: "Due to rayleigh scattering."},
%{role: "user", content: "How is that different than mie scattering?"},
]

Ollama.chat(client, [
model: "llama2",
messages: messages,
])
# {:ok, %{"message" => %{
# "role" => "assistant",
# "content" => "Mie scattering affects all wavelengths similarly, while Rayleigh favors shorter ones."
# }, ...}}
```
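To continue the conversation, append the assistant's reply to the message history and call `Ollama.chat/2` again. A rough sketch (note the reply map uses string keys, so it is converted back to the atom-keyed shape used above; the follow-up prompt is just an illustration):

```elixir
# Sketch: carry the conversation forward by appending the assistant's reply.
{:ok, %{"message" => reply}} = Ollama.chat(client, model: "llama2", messages: messages)

messages =
  messages ++
    [
      %{role: reply["role"], content: reply["content"]},
      %{role: "user", content: "Which effect dominates in clouds?"}
    ]

{:ok, _response} = Ollama.chat(client, model: "llama2", messages: messages)
```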

## Streaming

By default, all endpoints are called with streaming disabled, blocking until the HTTP request completes and the response body is returned. For endpoints where streaming is supported, the `:stream` option can be set to `true` or a `t:pid/0`. When streaming is enabled, the function returns a `t:Task.t/0`, which asynchronously sends messages back to either the calling process, or the process associated with the given `t:pid/0`.
When an Ollama endpoint is called with the `:stream` option set to `true`, a `t:Ollama.Streaming.t/0` struct is returned, providing a unique `t:reference/0` for the streaming request and a lazy enumerable that generates messages as they are received.

```elixir
iex> Ollama.completion(client, [
...> model: "llama2",
...> prompt: "Why is the sky blue?",
...> stream: true,
...> ])
{:ok, %Task{}}

iex> Ollama.chat(client, [
...> model: "llama2",
...> messages: messages,
...> stream: true,
...> ])
{:ok, %Task{}}
Ollama.completion(client, [
model: "llama2",
prompt: "Why is the sky blue?",
stream: true,
])
# {:ok, %Ollama.Streaming{}}

Ollama.chat(client, [
model: "llama2",
messages: messages,
stream: true,
])
# {:ok, %Ollama.Streaming{}}
```

Messages will be sent in the following format, allowing the receiving process to pattern match against the pid of the async task if known:
`Ollama.Streaming` implements the `Enumerable` protocol, so it can be used directly with `Stream` functions. Most of the time, you'll just want to asynchronously call `Ollama.Streaming.send_to/2`, which will run the stream and send each message to a process of your choosing.
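For example, here is a minimal sketch of consuming the stream lazily with `Stream` functions. It assumes each element the enumerable yields is the decoded JSON chunk from Ollama, with the `"response"` field described in the Ollama API docs:

```elixir
# Sketch: print a completion to stdout as it streams in.
# Assumes each streamed element is the decoded chunk map from Ollama.
client = Ollama.init()

{:ok, streamer} = Ollama.completion(client, [
  model: "llama2",
  prompt: "Why is the sky blue?",
  stream: true,
])

streamer
|> Stream.each(fn chunk -> IO.write(chunk["response"] || "") end)
|> Stream.run()
```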

Messages are sent in the following format, allowing the receiving process to pattern match against the `t:reference/0` of the streaming request:

```elixir
{request_ref, {:data, data}}
```

The data is a map from the Ollama JSON message. See [Ollama API docs](https://github.com/ollama/ollama/blob/main/docs/api.md).

You could manually create a `receive` block to handle messages.

```elixir
receive do
{^current_message_pid, {:data, %{"done" => true} = data}} ->
# handle last message
{^current_message_pid, {:data, data}} ->
# handle message
{_pid, _data_} ->
# this message was not expected!
end
```
Each data chunk is a map. For its schema, refer to the [Ollama API docs](https://github.com/ollama/ollama/blob/main/docs/api.md).
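As a rough illustration, a streamed completion chunk looks something like the map below; treat the exact keys as documented in the API docs rather than this sketch.

```elixir
# Roughly what a single streamed completion chunk looks like (abridged).
%{"model" => "llama2", "response" => "The", "done" => false}

# The final chunk has "done" => true and carries summary fields such as timings.
```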

In most cases you will probably use `c:GenServer.handle_info/2`. The following example shows how a LiveView process may be constructed to both create the streaming request and receive the streaming messages.
A typical example is to make a streaming request as part of a LiveView event, and send each of the streaming messages back to the same LiveView process.

```elixir
defmodule Ollama.ChatLive do
defmodule MyApp.ChatLive do
use Phoenix.LiveView
alias Ollama.Streaming

# When the client invokes the "prompt" event, create a streaming request
# and optionally store the request task into the assigns
# When the client invokes the "prompt" event, create a streaming request and
# asynchronously send messages back to self.
def handle_event("prompt", %{"message" => prompt}, socket) do
client = Ollama.init()
{:ok, task} = Ollama.completion(client, [
{:ok, streamer} = Ollama.completion(Ollama.init(), [
model: "llama2",
prompt: prompt,
stream: true,
])

{:noreply, assign(socket, current_request: task)}
pid = self()
{:noreply,
socket
|> assign(current_request: streamer.ref)
      |> start_async(:streaming, fn -> Streaming.send_to(streamer, pid) end)
}
end

# The request task streams messages back to the LiveView process
def handle_info({_request_pid, {:data, _data}} = message, socket) do
pid = socket.assigns.current_request.pid
# The streaming request sends messages back to the LiveView process.
def handle_info({_request_ref, {:data, _data}} = message, socket) do
ref = socket.assigns.current_request
case message do
{^pid, {:data, %{"done" => false} = data}} ->
{^ref, {:data, %{"done" => false} = data}} ->
# handle each streaming chunk

{^pid, {:data, %{"done" => true} = data}} ->
{^ref, {:data, %{"done" => true} = data}} ->
# handle the final streaming chunk

{_pid, _data} ->
{_ref, _data} ->
# this message was not expected!
end
end

# When the streaming request is finished, remove the current reference.
  def handle_async(:streaming, {:ok, _res}, socket) do
{:noreply, assign(socket, current_request: nil)}
end
end
```
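Because `Streaming.send_to/2` runs inside `start_async/3`, the streaming work happens in a separate task, while the chunks still arrive at the LiveView as ordinary `handle_info/2` messages tagged with the request's reference.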
