Skip to content

Commit

Permalink
style consistent with anthropix
Browse files Browse the repository at this point in the history
  • Loading branch information
lebrunel committed Apr 18, 2024
1 parent 726bcb5 commit b56089d
Showing 1 changed file with 17 additions and 20 deletions.
37 changes: 17 additions & 20 deletions lib/ollama.ex
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,8 @@ defmodule Ollama do
{:error, term()}

@typep req_response() ::
{:ok, Req.Response.t()} |
{:error, term()} |
Task.t() |
Enumerable.t()
{:ok, Req.Response.t() | Task.t() | Enumerable.t()} |
{:error, term()}


@default_req_opts [
Expand Down Expand Up @@ -757,13 +755,16 @@ defmodule Ollama do

cond do
stream_opt ->
opts = opts
|> Keyword.update!(:json, & Map.put(&1, :stream, true))
|> Keyword.put(:into, stream_handler(dest))
opts =
opts
|> Keyword.update!(:json, & Map.put(&1, :stream, true))
|> Keyword.put(:into, stream_handler(dest))

task = Task.async(fn -> req |> Req.request(opts) |> res() end)

case stream_opt do
true -> Stream.resource(fn -> task end, &stream_next/1, &stream_end/1)
_ -> task
true -> {:ok, Stream.resource(fn -> task end, &stream_next/1, &stream_end/1)}
_ -> {:ok, task}
end

Keyword.get(opts, :json) |> is_map() ->
Expand All @@ -777,8 +778,8 @@ defmodule Ollama do

# Normalizes the response returned from the request
@spec res(req_response()) :: response()
defp res(%Task{} = task), do: {:ok, task}
defp res(stream) when is_function(stream), do: {:ok, stream}
defp res({:ok, %Task{} = task}), do: {:ok, task}
defp res({:ok, enum}) when is_function(enum), do: {:ok, enum}

defp res({:ok, %{status: status, body: body}}) when status in 200..299 do
{:ok, body}
Expand All @@ -800,16 +801,12 @@ defmodule Ollama do
@spec stream_handler(pid()) :: fun()
defp stream_handler(pid) do
fn {:data, data}, {req, res} ->
case Jason.decode(data) do
{:ok, data} ->
Process.send(pid, {self(), {:data, data}}, [])
{:cont, {req, stream_merge(res, data)}}

{:error, _} ->
Process.send(pid, {self(), {:data, data}}, [])
{:cont, {req, res}}
with {:ok, data} <- Jason.decode(data) do
Process.send(pid, {self(), {:data, data}}, [])
{:cont, {req, stream_merge(res, data)}}
else
_ -> {:cont, {req, res}}
end

end
end

Expand Down

0 comments on commit b56089d

Please sign in to comment.