From 38ea1330cf6571c0b2d2ad8db382ecc5e8f2ad8e Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Tue, 1 Oct 2024 22:58:36 -0400 Subject: [PATCH 1/8] improvement: implement a subscription notification batcher --- config/config.exs | 4 + lib/subscription/batcher.ex | 188 ++++++++++++++++++++++++++++++++++++ lib/subscription/runner.ex | 38 +------- test/subscription_test.exs | 90 +++++++++++++++++ 4 files changed, 283 insertions(+), 37 deletions(-) create mode 100644 lib/subscription/batcher.ex diff --git a/config/config.exs b/config/config.exs index 7c092045..c966d1d6 100644 --- a/config/config.exs +++ b/config/config.exs @@ -11,6 +11,10 @@ config :logger, level: :info config :ash_graphql, :subscriptions, true +if Mix.env() == :test do + config :ash_graphql, :simulate_subscription_slowness?, true +end + if Mix.env() == :dev do config :git_ops, mix_project: AshGraphql.MixProject, diff --git a/lib/subscription/batcher.ex b/lib/subscription/batcher.ex new file mode 100644 index 00000000..232ce895 --- /dev/null +++ b/lib/subscription/batcher.ex @@ -0,0 +1,188 @@ +defmodule AshGraphql.Subscription.Batcher do + use GenServer + + alias Absinthe.Pipeline.BatchResolver + + require Logger + @compile {:inline, simulate_slowness: 0} + + defstruct [batches: %{}, total_count: 0, async_limit: 100, async_threshold: 50] + + defmodule Batch do + defstruct [notifications: [], count: 0, pubsub: nil, key_strategy: nil, doc: nil, timer: nil] + + def add(batch, item) do + %{batch | notifications: [item | batch.notifications], count: batch.count + 1} + end + end + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: opts[:name] || __MODULE__) + end + + def drain do + GenServer.call(__MODULE__, :drain, :infinity) + end + + def publish(topic, notification, pubsub, key_strategy, doc) do + case GenServer.call(__MODULE__, {:publish, topic, notification, pubsub, key_strategy, doc}, :infinity) do + :handled -> + :ok + :backpressure_sync -> + do_send(topic, [notification], pubsub, key_strategy, doc) + end + end + + def init(config) do + {:ok, %__MODULE__{async_limit: config[:async_limit] || 100, async_threshold: config[:async_threshold] || 50}} + end + + def handle_call(:drain, _from, state) do + {:reply, :done, send_all_batches(state)} + end + + def handle_call(:dump_state, _from, state) do + {:reply, state, state} + end + + def handle_call({:publish, topic, notification, pubsub, key_strategy, doc}, from, state) do + simulate_slowness() + + if state.total_count >= state.async_limit do + {:reply, :backpressure_sync, state} + else + GenServer.reply(from, :handled) + simulate_slowness() + state = put_notification(state, topic, pubsub, key_strategy, doc, notification) + + # if we have less than async threshold, we can process it eagerly + if state.total_count < state.async_threshold do + # so we eagerly process current_calls + state = eagerly_build_batches(state, state.async_threshold - state.total_count) + + # and if we still have less than the async threshold + if state.total_count < state.async_threshold do + # then we send all of our batches + {:noreply, send_all_batches(state)} + else + # otherwise we wait on the regularly scheduled push + {:noreply, ensure_timer(state, topic)} + end + else + # otherwise we wait on the regularly scheduled push + {:noreply, ensure_timer(state, topic)} + end + end + end + + def handle_info({_task, {:sent, topic, count, _res}}, state) do + {:noreply, %{state | total_count: state.total_count - count, batches: Map.delete(state.batches, topic)}} + end + + def 
handle_info({:send_batch, topic}, state) do + batch = state.batches[topic] + Task.async(fn -> + {:sent, topic, batch.count, do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} + end) + + {:noreply, state} + end + + defp eagerly_build_batches(state, 0), do: state + defp eagerly_build_batches(state, count) do + receive do + {:"$gen_call", {:publish, topic, notification, pubsub, key_strategy, doc}, from} -> + GenServer.reply(from, :handled) + + state + |> put_notification(topic, pubsub, key_strategy, doc, notification) + |> eagerly_build_batches(count - 1) + after + 0 -> + state + end + end + + if Application.compile_env(:ash_graphql, :simulate_subscription_slowness?, false) do + defp simulate_slowness do + :timer.sleep(Application.get_env(:ash_graphql, :simulate_subscription_processing_time, 0)) + end + else + defp simulate_slowness do + :ok + end + end + + defp send_all_batches(state) do + Enum.each(state.batches, fn {topic, batch} -> + if batch.timer do + Process.cancel_timer(batch.timer) + end + do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc) + end) + + %{state | batches: %{}, total_count: 0} + end + + defp do_send(topic, notifications, pubsub, key_strategy, doc) do + # Refactor to do batch resolution + notifications + |> Enum.reverse() + |> Enum.each(fn notification -> + try do + pipeline = + Absinthe.Subscription.Local.pipeline(doc, notification) + + {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) + + Logger.debug(""" + Absinthe Subscription Publication + Field Topic: #{inspect(key_strategy)} + Subscription id: #{inspect(topic)} + Data: #{inspect(data)} + """) + + case should_send?(data) do + false -> + :ok + + true -> + :ok = pubsub.publish_subscription(topic, data) + end + rescue + e -> + BatchResolver.pipeline_error(e, __STACKTRACE__) + end + end) + end + + defp put_notification(state, topic, pubsub, key_strategy, doc, notification) do + state.batches + |> Map.put_new_lazy(topic, fn -> %Batch{key_strategy: key_strategy, doc: doc, pubsub: pubsub} end) + |> Map.update!(topic, &Batch.add(&1, notification)) + |> then(&%{state | batches: &1, total_count: state.total_count + 1}) + end + + defp ensure_timer(%{batches: batches} = state, topic) do + if batches[topic].timer do + state + else + # TODO: this interval should be configurable + timer = Process.send_after(self(), {:send_batch, topic}, 1000) + + put_in(state.batches[topic].timer, timer) + end + end + + defp should_send?(%{errors: errors}) do + # if the user is not allowed to see the data or the query didn't + # return any data we do not send the error to the client + # because it would just expose unnecessary information + # and the user can not really do anything usefull with it + not (errors + |> List.wrap() + |> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found", nil] end)) + end + + defp should_send?(_), do: true +end diff --git a/lib/subscription/runner.ex b/lib/subscription/runner.ex index fbbf3e18..0d8cce61 100644 --- a/lib/subscription/runner.ex +++ b/lib/subscription/runner.ex @@ -5,48 +5,12 @@ defmodule AshGraphql.Subscription.Runner do Mostly a copy of https://github.com/absinthe-graphql/absinthe/blob/3d0823bd71c2ebb94357a5588c723e053de8c66a/lib/absinthe/subscription/local.ex#L40 but this lets us decide if we want to send the data to the client or not in certain error cases """ - alias Absinthe.Pipeline.BatchResolver require Logger def run_docset(pubsub, docs_and_topics, notification) do for {topic, key_strategy, 
doc} <- docs_and_topics do - try do - pipeline = - Absinthe.Subscription.Local.pipeline(doc, notification) - - {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) - - Logger.debug(""" - Absinthe Subscription Publication - Field Topic: #{inspect(key_strategy)} - Subscription id: #{inspect(topic)} - Data: #{inspect(data)} - """) - - case should_send?(data) do - false -> - :ok - - true -> - :ok = pubsub.publish_subscription(topic, data) - end - rescue - e -> - BatchResolver.pipeline_error(e, __STACKTRACE__) - end + AshGraphql.Subscription.Batcher.publish(topic, notification, pubsub, key_strategy, doc) end end - - defp should_send?(%{errors: errors}) do - # if the user is not allowed to see the data or the query didn't - # return any data we do not send the error to the client - # because it would just expose unnecessary information - # and the user can not really do anything usefull with it - not (errors - |> List.wrap() - |> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found", nil] end)) - end - - defp should_send?(_), do: true end diff --git a/test/subscription_test.exs b/test/subscription_test.exs index f219e091..d7324d0c 100644 --- a/test/subscription_test.exs +++ b/test/subscription_test.exs @@ -15,9 +15,11 @@ defmodule AshGraphql.SubscriptionTest do Application.put_env(PubSub, :notifier_test_pid, self()) {:ok, pubsub} = PubSub.start_link() {:ok, absinthe_sub} = Absinthe.Subscription.start_link(PubSub) + start_supervised(AshGraphql.Subscription.Batcher, []) :ok on_exit(fn -> + Application.delete_env(:ash_graphql, :simulate_subscription_processing_time) Process.exit(pubsub, :normal) Process.exit(absinthe_sub, :normal) # block until the processes have exited @@ -105,6 +107,8 @@ defmodule AshGraphql.SubscriptionTest do assert Enum.empty?(mutation_result["updateSubscribable"]["errors"]) + AshGraphql.Subscription.Batcher.drain() + assert_receive({^topic, %{data: subscription_data}}) assert subscription_data["subscribableEvents"]["updated"]["text"] == "bar" @@ -128,6 +132,8 @@ defmodule AshGraphql.SubscriptionTest do context: %{actor: @admin} ) + AshGraphql.Subscription.Batcher.drain() + assert Enum.empty?(mutation_result["destroySubscribable"]["errors"]) assert_receive({^topic, %{data: subscription_data}}) @@ -195,6 +201,8 @@ defmodule AshGraphql.SubscriptionTest do |> Ash.Changeset.for_create(:create, %{text: "foo", actor_id: 1}, actor: @admin) |> Ash.create!() + AshGraphql.Subscription.Batcher.drain() + # actor1 will get data because it can see the resource assert_receive {^topic1, %{data: subscription_data}} # actor 2 will not get data because it cannot see the resource @@ -252,6 +260,8 @@ defmodule AshGraphql.SubscriptionTest do |> Ash.Changeset.for_create(:create, %{text: "foo", actor_id: 1}, actor: @admin) |> Ash.create!() + AshGraphql.Subscription.Batcher.drain() + assert_receive {^topic1, %{data: subscription_data}} assert subscribable.id == @@ -290,6 +300,8 @@ defmodule AshGraphql.SubscriptionTest do ) |> Ash.create!() + AshGraphql.Subscription.Batcher.drain() + assert_receive {^topic, %{data: subscription_data}} assert subscribable.id == @@ -327,6 +339,8 @@ defmodule AshGraphql.SubscriptionTest do ) |> Ash.create!() + AshGraphql.Subscription.Batcher.drain() + assert_receive {^topic, %{data: subscription_data}} assert subscribable.id == @@ -364,10 +378,86 @@ defmodule AshGraphql.SubscriptionTest do ) |> Ash.create!() + AshGraphql.Subscription.Batcher.drain() + assert_receive {^topic, %{data: subscription_data, errors: errors}} assert 
is_nil(subscription_data["subscribedOnDomain"]["created"]) refute Enum.empty?(errors) assert [%{code: "forbidden_field"}] = errors end + + test "it aggregates multiple messages" do + stop_supervised(AshGraphql.Subscription.Batcher) + start_supervised({AshGraphql.Subscription.Batcher, [async_threshold: 0]}) + + Application.put_env(:ash_graphql, :simulate_subscription_processing_time, 1000) + + assert {:ok, %{"subscribed" => topic}} = + Absinthe.run( + """ + subscription { + subscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """, + Schema, + context: %{actor: @admin, pubsub: PubSub} + ) + + create_mutation = """ + mutation CreateSubscribable($input: CreateSubscribableInput) { + createSubscribable(input: $input) { + result{ + id + text + } + errors{ + message + } + } + } + """ + + assert {:ok, %{data: mutation_result}} = + Absinthe.run(create_mutation, Schema, + variables: %{"input" => %{"text" => "foo"}}, + context: %{actor: @admin} + ) + + subscribable_id = mutation_result["createSubscribable"]["result"]["id"] + + assert {:ok, %{data: mutation_result}} = + Absinthe.run(create_mutation, Schema, + variables: %{"input" => %{"text" => "foo"}}, + context: %{actor: @admin} + ) + + assert GenServer.call(AshGraphql.Subscription.Batcher, :dump_state, :infinity).total_count == 2 + + assert Enum.empty?(mutation_result["createSubscribable"]["errors"]) + + subscribable_id2 = mutation_result["createSubscribable"]["result"]["id"] + refute is_nil(subscribable_id) + + + assert_receive({^topic, %{data: subscription_data}}) + assert_receive({^topic, %{data: subscription_data2}}) + refute_received({^topic, _}) + + assert subscribable_id == + subscription_data["subscribableEvents"]["created"]["id"] + + assert subscribable_id2 == + subscription_data2["subscribableEvents"]["created"]["id"] + end end From f5575e82e6afe9fff33997067f4f02da1d158c7c Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Tue, 1 Oct 2024 23:02:07 -0400 Subject: [PATCH 2/8] chore: format --- lib/subscription/batcher.ex | 32 ++++++++++++++++++++++++-------- test/subscription_test.exs | 4 ++-- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/lib/subscription/batcher.ex b/lib/subscription/batcher.ex index 232ce895..6462d520 100644 --- a/lib/subscription/batcher.ex +++ b/lib/subscription/batcher.ex @@ -6,10 +6,10 @@ defmodule AshGraphql.Subscription.Batcher do require Logger @compile {:inline, simulate_slowness: 0} - defstruct [batches: %{}, total_count: 0, async_limit: 100, async_threshold: 50] + defstruct batches: %{}, total_count: 0, async_limit: 100, async_threshold: 50 defmodule Batch do - defstruct [notifications: [], count: 0, pubsub: nil, key_strategy: nil, doc: nil, timer: nil] + defstruct notifications: [], count: 0, pubsub: nil, key_strategy: nil, doc: nil, timer: nil def add(batch, item) do %{batch | notifications: [item | batch.notifications], count: batch.count + 1} @@ -25,16 +25,25 @@ defmodule AshGraphql.Subscription.Batcher do end def publish(topic, notification, pubsub, key_strategy, doc) do - case GenServer.call(__MODULE__, {:publish, topic, notification, pubsub, key_strategy, doc}, :infinity) do + case GenServer.call( + __MODULE__, + {:publish, topic, notification, pubsub, key_strategy, doc}, + :infinity + ) do :handled -> :ok + :backpressure_sync -> do_send(topic, [notification], pubsub, key_strategy, doc) end end def init(config) do - {:ok, %__MODULE__{async_limit: config[:async_limit] || 100, async_threshold: config[:async_threshold] || 50}} + {:ok, + 
%__MODULE__{ + async_limit: config[:async_limit] || 100, + async_threshold: config[:async_threshold] || 50 + }} end def handle_call(:drain, _from, state) do @@ -76,19 +85,23 @@ defmodule AshGraphql.Subscription.Batcher do end def handle_info({_task, {:sent, topic, count, _res}}, state) do - {:noreply, %{state | total_count: state.total_count - count, batches: Map.delete(state.batches, topic)}} + {:noreply, + %{state | total_count: state.total_count - count, batches: Map.delete(state.batches, topic)}} end def handle_info({:send_batch, topic}, state) do batch = state.batches[topic] + Task.async(fn -> - {:sent, topic, batch.count, do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} + {:sent, topic, batch.count, + do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} end) {:noreply, state} end defp eagerly_build_batches(state, 0), do: state + defp eagerly_build_batches(state, count) do receive do {:"$gen_call", {:publish, topic, notification, pubsub, key_strategy, doc}, from} -> @@ -99,7 +112,7 @@ defmodule AshGraphql.Subscription.Batcher do |> eagerly_build_batches(count - 1) after 0 -> - state + state end end @@ -118,6 +131,7 @@ defmodule AshGraphql.Subscription.Batcher do if batch.timer do Process.cancel_timer(batch.timer) end + do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc) end) @@ -158,7 +172,9 @@ defmodule AshGraphql.Subscription.Batcher do defp put_notification(state, topic, pubsub, key_strategy, doc, notification) do state.batches - |> Map.put_new_lazy(topic, fn -> %Batch{key_strategy: key_strategy, doc: doc, pubsub: pubsub} end) + |> Map.put_new_lazy(topic, fn -> + %Batch{key_strategy: key_strategy, doc: doc, pubsub: pubsub} + end) |> Map.update!(topic, &Batch.add(&1, notification)) |> then(&%{state | batches: &1, total_count: state.total_count + 1}) end diff --git a/test/subscription_test.exs b/test/subscription_test.exs index d7324d0c..e5ffe1dd 100644 --- a/test/subscription_test.exs +++ b/test/subscription_test.exs @@ -442,14 +442,14 @@ defmodule AshGraphql.SubscriptionTest do context: %{actor: @admin} ) - assert GenServer.call(AshGraphql.Subscription.Batcher, :dump_state, :infinity).total_count == 2 + assert GenServer.call(AshGraphql.Subscription.Batcher, :dump_state, :infinity).total_count == + 2 assert Enum.empty?(mutation_result["createSubscribable"]["errors"]) subscribable_id2 = mutation_result["createSubscribable"]["result"]["id"] refute is_nil(subscribable_id) - assert_receive({^topic, %{data: subscription_data}}) assert_receive({^topic, %{data: subscription_data2}}) refute_received({^topic, _}) From 03dee514df52c4f669bd19162bcdcf4b5942cab1 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Wed, 2 Oct 2024 19:03:05 -0400 Subject: [PATCH 3/8] improvement: proper batched resolution, with benchmarks --- .formatter.exs | 2 +- benchmarks/subscriptions.exs | 125 +++++++++++++++++++++ lib/graphql/resolver.ex | 168 ++++++++++++++++++++++++++++ lib/subscription/batcher.ex | 211 ++++++++++++++++++++++++----------- mix.exs | 3 +- mix.lock | 3 + test/subscription_test.exs | 8 +- 7 files changed, 449 insertions(+), 71 deletions(-) create mode 100644 benchmarks/subscriptions.exs diff --git a/.formatter.exs b/.formatter.exs index a0a49475..c4e89f49 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -77,7 +77,7 @@ spark_locals_without_parens = [ ] [ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], + inputs: ["{mix,.formatter}.exs", "{config,lib,test,benchmarks}/**/*.{ex,exs}"], 
locals_without_parens: spark_locals_without_parens, export: [ locals_without_parens: spark_locals_without_parens diff --git a/benchmarks/subscriptions.exs b/benchmarks/subscriptions.exs new file mode 100644 index 00000000..a2267bb2 --- /dev/null +++ b/benchmarks/subscriptions.exs @@ -0,0 +1,125 @@ +alias AshGraphql.Test.PubSub +alias AshGraphql.Test.Schema + +{:ok, pubsub} = PubSub.start_link() +{:ok, absinthe_sub} = Absinthe.Subscription.start_link(PubSub) + +# Application.put_env(:ash_graphql, :simulate_subscription_processing_time, 1000) +:ok + +admin = %{ + id: 0, + role: :admin +} + +create_mutation = """ +mutation CreateSubscribable($input: CreateSubscribableInput) { + createSubscribable(input: $input) { + result{ + id + text + } + errors{ + message + } + } + } +""" + +AshGraphql.Subscription.Batcher.start_link() + +Benchee.run( + %{ + "1 mutation" => fn _input -> + Absinthe.run(create_mutation, Schema, + variables: %{"input" => %{"text" => "foo"}}, + context: %{actor: admin} + ) + end + }, + inputs: %{ + "25 same subscribers" => {25, :same}, + "500 same subscribers" => {500, :same}, + "50 mixed subscribers" => {25, [:same, :different]}, + "1000 mixed subscribers" => {500, [:same, :different]} + }, + after_scenario: fn _ -> + count = fn counter -> + receive do + msg -> + 1 + counter.(counter) + after + 0 -> 0 + end + end + + AshGraphql.Subscription.Batcher.drain() + + IO.puts("Received #{count.(count)} messages") + end, + before_scenario: fn {input, types} -> + Application.put_env(PubSub, :notifier_test_pid, self()) + + if :different in List.wrap(types) do + Enum.each(1..input, fn i -> + actor = %{ + id: i, + role: :admin + } + + {:ok, %{"subscribed" => topic}} = + Absinthe.run( + """ + subscription { + subscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """, + Schema, + context: %{actor: actor, pubsub: PubSub} + ) + end) + end + + if :same in List.wrap(types) do + Enum.each(1..input, fn _i -> + actor = %{ + id: -1, + role: :admin + } + + {:ok, %{"subscribed" => topic}} = + Absinthe.run( + """ + subscription { + subscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """, + Schema, + context: %{actor: actor, pubsub: PubSub} + ) + end) + end + end +) + +AshGraphql.Subscription.Batcher.drain() diff --git a/lib/graphql/resolver.ex b/lib/graphql/resolver.ex index 0a7b7d82..c3ef4b44 100644 --- a/lib/graphql/resolver.ex +++ b/lib/graphql/resolver.ex @@ -506,6 +506,166 @@ defmodule AshGraphql.Graphql.Resolver do end end + def resolve( + %{root_value: {:pre_resolved, item}} = resolution, + {_, _, %AshGraphql.Resource.Subscription{}, _} + ) do + Absinthe.Resolution.put_result( + resolution, + {:ok, item} + ) + end + + def resolve( + %{arguments: args, context: context, root_value: notifications} = resolution, + {domain, resource, + %AshGraphql.Resource.Subscription{read_action: read_action, name: name}, _input?} + ) + when is_list(notifications) do + case handle_arguments(resource, read_action, args) do + {:ok, args} -> + metadata = %{ + domain: domain, + resource: resource, + resource_short_name: Ash.Resource.Info.short_name(resource), + actor: Map.get(context, :actor), + tenant: Map.get(context, :tenant), + action: read_action, + source: :graphql, + subscription: name, + authorize?: AshGraphql.Domain.Info.authorize?(domain) + } + + trace domain, + resource, + :gql_subscription, + name, + metadata do + opts = [ + actor: Map.get(context, :actor), + action: read_action, + authorize?: 
AshGraphql.Domain.Info.authorize?(domain), + tenant: Map.get(context, :tenant) + ] + + subscription_events = + notifications + |> Enum.group_by(& &1.action.type) + |> Enum.map(fn {type, notifications} -> + subscription_field = subcription_field_from_action_type(type) + key = String.to_existing_atom(subscription_field) + + if type in [:create, :update] do + data = Enum.map(notifications, & &1.data) + {filter, args} = Map.pop(args, :filter) + + read_action = + read_action || Ash.Resource.Info.primary_action!(resource, :read).name + + # read the records that were just created/updated + query = + resource + |> Ash.Query.do_filter(massage_filter(resource, filter)) + |> Ash.Query.for_read(read_action, args, opts) + |> AshGraphql.Subscription.query_for_subscription( + domain, + resolution, + subscription_result_type(name), + [subscription_field] + ) + + query_with_authorization_rules = + Ash.can( + query, + opts[:actor], + tenant: opts[:tenant], + run_queries?: false, + alter_source?: true + ) + + current_filter = query.filter + + {known_results, need_refetch} = + case query_with_authorization_rules do + {:ok, true, %{authorize_results: [], filter: nil} = query} -> + {data, []} + + {:ok, true, + %{authorize_results: [], filter: %Ash.Filter{expression: nil}} = query} -> + {data, []} + + {:ok, true, %{authorize_results: []} = query} -> + Enum.reduce(data, {[], []}, fn record, {known, refetch} -> + case Ash.Expr.eval(query.filter, + record: data, + unknown_on_unknown_refs?: true + ) do + {:ok, true} -> + {[record | known], refetch} + + {:ok, false} -> + {known, refetch} + + _ -> + {known, [record | refetch]} + end + end) + + {:error, false, _} -> + {[], []} + + _ -> + {[], data} + end + + primary_key = Ash.Resource.Info.primary_key(resource) + + primary_key_matches = + Enum.map(need_refetch, fn record -> + Map.take(record, primary_key) + end) + + with {:ok, known_results} <- Ash.load(known_results, query), + {:ok, need_refetch} <- do_refetch(query, primary_key_matches) do + known_results + |> Stream.concat(need_refetch) + |> Enum.map(fn record -> + %{key => record} + end) + else + {:error, error} -> + # caught by the batch resolver + raise Ash.Error.to_error_class(error) + end + else + Enum.map(notifications, fn notification -> + %{type => AshGraphql.Resource.encode_id(notification.data, false)} + end) + end + end) + + case List.flatten(subscription_events) do + [] -> + Absinthe.Resolution.put_result( + resolution, + {:error, to_errors([Ash.Error.Query.NotFound.exception()], context, domain)} + ) + + [first | rest] -> + Process.put(:batch_resolved, rest) + + Absinthe.Resolution.put_result( + resolution, + {:ok, first} + ) + end + end + + {:error, error} -> + {:error, error} + end + end + def resolve( %{arguments: args, context: context, root_value: notification} = resolution, {domain, resource, @@ -631,6 +791,14 @@ defmodule AshGraphql.Graphql.Resolver do end end + defp do_refetch(_query, []) do + {:ok, []} + end + + defp do_refetch(query, primary_key_matches) do + Ash.read(Ash.Query.do_filter(query, or: primary_key_matches)) + end + defp subcription_field_from_action_type(:create), do: "created" defp subcription_field_from_action_type(:update), do: "updated" defp subcription_field_from_action_type(:destroy), do: "destroyed" diff --git a/lib/subscription/batcher.ex b/lib/subscription/batcher.ex index 6462d520..3c66ea50 100644 --- a/lib/subscription/batcher.ex +++ b/lib/subscription/batcher.ex @@ -6,17 +6,23 @@ defmodule AshGraphql.Subscription.Batcher do require Logger @compile {:inline, 
simulate_slowness: 0} - defstruct batches: %{}, total_count: 0, async_limit: 100, async_threshold: 50 + defstruct batches: %{}, total_count: 0, async_limit: 100, send_immediately_threshold: 50 defmodule Batch do - defstruct notifications: [], count: 0, pubsub: nil, key_strategy: nil, doc: nil, timer: nil + defstruct notifications: [], + count: 0, + pubsub: nil, + key_strategy: nil, + doc: nil, + timer: nil, + task: nil def add(batch, item) do %{batch | notifications: [item | batch.notifications], count: batch.count + 1} end end - def start_link(opts) do + def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: opts[:name] || __MODULE__) end @@ -42,62 +48,84 @@ defmodule AshGraphql.Subscription.Batcher do {:ok, %__MODULE__{ async_limit: config[:async_limit] || 100, - async_threshold: config[:async_threshold] || 50 + send_immediately_threshold: config[:send_immediately_threshold] || 50 }} end def handle_call(:drain, _from, state) do - {:reply, :done, send_all_batches(state)} + {:reply, :done, send_all_batches(state, false)} end def handle_call(:dump_state, _from, state) do {:reply, state, state} end - def handle_call({:publish, topic, notification, pubsub, key_strategy, doc}, from, state) do - simulate_slowness() - + def handle_call({:publish, topic, notification, pubsub, key_strategy, doc}, _from, state) do if state.total_count >= state.async_limit do {:reply, :backpressure_sync, state} else - GenServer.reply(from, :handled) - simulate_slowness() - state = put_notification(state, topic, pubsub, key_strategy, doc, notification) - - # if we have less than async threshold, we can process it eagerly - if state.total_count < state.async_threshold do - # so we eagerly process current_calls - state = eagerly_build_batches(state, state.async_threshold - state.total_count) - - # and if we still have less than the async threshold - if state.total_count < state.async_threshold do - # then we send all of our batches - {:noreply, send_all_batches(state)} - else - # otherwise we wait on the regularly scheduled push - {:noreply, ensure_timer(state, topic)} - end + {:reply, :handled, state, + {:continue, {:publish, topic, notification, pubsub, key_strategy, doc}}} + end + end + + def handle_continue({:publish, topic, notification, pubsub, key_strategy, doc}, state) do + state = put_notification(state, topic, pubsub, key_strategy, doc, notification) + + # if we have less than async threshold, we can process it eagerly + if state.total_count < state.send_immediately_threshold do + # so we eagerly process current_calls + state = eagerly_build_batches(state, state.send_immediately_threshold - state.total_count) + + # and if we still have less than the async threshold + if state.total_count < state.send_immediately_threshold do + # then we send all of our batches + {:noreply, send_all_batches(state, true)} else # otherwise we wait on the regularly scheduled push {:noreply, ensure_timer(state, topic)} end + else + # otherwise we wait on the regularly scheduled push + {:noreply, ensure_timer(state, topic)} end end - def handle_info({_task, {:sent, topic, count, _res}}, state) do + def handle_info({_task, {:sent, topic, _res}}, state) do + case state.batches[topic] do + %{timer: timer} when not is_nil(timer) -> + Process.cancel_timer(timer) + + _ -> + :ok + end + {:noreply, - %{state | total_count: state.total_count - count, batches: Map.delete(state.batches, topic)}} + %{ + state + | total_count: state.total_count - Map.get(state.batches[topic] || %{}, :count, 0), + batches: 
Map.delete(state.batches, topic) + }} + end + + def handle_info({:DOWN, _, _, _, :normal}, state) do + {:noreply, state} end def handle_info({:send_batch, topic}, state) do batch = state.batches[topic] - Task.async(fn -> - {:sent, topic, batch.count, - do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} - end) + if batch do + task = + Task.async(fn -> + {:sent, topic, + do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} + end) - {:noreply, state} + {:noreply, put_in(state.batches[topic].task, task)} + else + {:noreply, state} + end end defp eagerly_build_batches(state, 0), do: state @@ -126,48 +154,97 @@ defmodule AshGraphql.Subscription.Batcher do end end - defp send_all_batches(state) do - Enum.each(state.batches, fn {topic, batch} -> + defp send_all_batches(state, async?) do + state.batches + |> Enum.reject(fn {_, batch} -> + batch.task + end) + |> Enum.reduce(state, fn {topic, batch}, state -> if batch.timer do Process.cancel_timer(batch.timer) end - do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc) - end) + if async? do + task = + Task.async(fn -> + {:sent, topic, + do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} + end) + + put_in(state.batches[topic].task, task) + else + do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc) - %{state | batches: %{}, total_count: 0} + %{ + state + | batches: Map.delete(state.batches, topic), + total_count: state.total_count - batch.count + } + end + end) end defp do_send(topic, notifications, pubsub, key_strategy, doc) do - # Refactor to do batch resolution - notifications - |> Enum.reverse() - |> Enum.each(fn notification -> - try do - pipeline = - Absinthe.Subscription.Local.pipeline(doc, notification) - - {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) - - Logger.debug(""" - Absinthe Subscription Publication - Field Topic: #{inspect(key_strategy)} - Subscription id: #{inspect(topic)} - Data: #{inspect(data)} - """) - - case should_send?(data) do - false -> - :ok - - true -> - :ok = pubsub.publish_subscription(topic, data) - end - rescue - e -> - BatchResolver.pipeline_error(e, __STACKTRACE__) + # This is a temporary and very hacky way of doing this + # we pass in the notifications as a list as the root data + # The resolver then returns the *first* one, and puts a list + # of notifications in the process dictionary. Those will be + # passed in *again* to the resolution step, to be returned + # as-is. 
Its gross, and I hate it, but it is better than forcing + # individual resolution :) + + simulate_slowness() + + pipeline = + Absinthe.Subscription.Local.pipeline(doc, notifications) + + first_results = + case Absinthe.Pipeline.run(doc.source, pipeline) do + {:ok, %{result: data}, _} -> + if should_send?(data) do + [List.wrap(data)] + else + [] + end + + {:error, error} -> + raise Ash.Error.to_error_class(error) end - end) + + result = + case List.wrap(Process.get(:batch_resolved)) do + [] -> + first_results + + batch -> + batch = + Enum.map(batch, fn item -> + pipeline = + Absinthe.Subscription.Local.pipeline(doc, {:pre_resolved, item}) + + {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) + + data + end) + + [batch] ++ first_results + end + + Logger.debug(""" + Absinthe Subscription Publication + Field Topic: #{inspect(key_strategy)} + Subscription id: #{inspect(topic)} + Notification Count: #{Enum.count(notifications)} + """) + + for batch <- result, record <- batch, not is_nil(record) do + :ok = pubsub.publish_subscription(topic, record) + end + rescue + e -> + BatchResolver.pipeline_error(e, __STACKTRACE__) + after + Process.delete(:batch_resolved) end defp put_notification(state, topic, pubsub, key_strategy, doc, notification) do @@ -197,7 +274,9 @@ defmodule AshGraphql.Subscription.Batcher do # and the user can not really do anything usefull with it not (errors |> List.wrap() - |> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found", nil] end)) + |> Enum.any?(fn error -> + Map.get(error, :code) in ["forbidden", "not_found", nil] + end)) end defp should_send?(_), do: true diff --git a/mix.exs b/mix.exs index 6b8f8762..ce1d43db 100644 --- a/mix.exs +++ b/mix.exs @@ -156,7 +156,8 @@ defmodule AshGraphql.MixProject do {:git_ops, "~> 2.5", only: [:dev, :test]}, {:mix_test_watch, "~> 1.0", only: :dev, runtime: false}, {:simple_sat, ">= 0.0.0", only: :test}, - {:mix_audit, ">= 0.0.0", only: [:dev, :test], runtime: false} + {:mix_audit, ">= 0.0.0", only: [:dev, :test], runtime: false}, + {:benchee, "~> 1.1", only: [:dev, :test]} ] end diff --git a/mix.lock b/mix.lock index 05f5096e..29cabb49 100644 --- a/mix.lock +++ b/mix.lock @@ -3,11 +3,13 @@ "absinthe_phoenix": {:hex, :absinthe_phoenix, "2.0.3", "74e0862f280424b7bc290f6f69e133268bce0b4e7db0218c7e129c5c2b1d3fd4", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:absinthe_plug, "~> 1.5", [hex: :absinthe_plug, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.5", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.13 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}], "hexpm", "caffaea03c17ea7419fe07e4bc04c2399c47f0d8736900623dbf4749a826fd2c"}, "absinthe_plug": {:hex, :absinthe_plug, "1.5.8", "38d230641ba9dca8f72f1fed2dfc8abd53b3907d1996363da32434ab6ee5d6ab", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "bbb04176647b735828861e7b2705465e53e2cf54ccf5a73ddd1ebd855f996e5a"}, "ash": {:hex, :ash, "3.4.22", "e292e40cae558c486bb23da656b564c3bb5fb551dbd2aeae54c879b408c91844", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: 
"hexpm", optional: false]}, {:igniter, ">= 0.3.36 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:owl, "~> 0.11", [hex: :owl, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.9", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.2.29 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6ca9be81d06ab07cb2f4d14132b085dc421f3f400b7aa0148b9fd7d575499efc"}, + "benchee": {:hex, :benchee, "1.3.1", "c786e6a76321121a44229dde3988fc772bca73ea75170a73fd5f4ddf1af95ccf", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "76224c58ea1d0391c8309a8ecbfe27d71062878f59bd41a390266bf4ac1cc56d"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "castore": {:hex, :castore, "1.0.9", "5cc77474afadf02c7c017823f460a17daa7908e991b0cc917febc90e466a375c", [:mix], [], "hexpm", "5ea956504f1ba6f2b4eb707061d8e17870de2bee95fb59d512872c2ef06925e7"}, "credo": {:hex, :credo, "1.7.8", "9722ba1681e973025908d542ec3d95db5f9c549251ba5b028e251ad8c24ab8c5", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cb9e87cc64f152f3ed1c6e325e7b894dea8f5ef2e41123bd864e3cd5ceb44968"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"}, + "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "ecto": {:hex, :ecto, "3.12.3", "1a9111560731f6c3606924c81c870a68a34c819f6d4f03822f370ea31a582208", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9efd91506ae722f95e48dc49e70d0cb632ede3b7a23896252a60a14ac6d59165"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", 
[:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, @@ -43,6 +45,7 @@ "spark": {:hex, :spark, "2.2.31", "ce58988f5b34b96bb01cfc5399a5ddc24a7a5bcf0ae7003503678f3466d7779a", [:mix], [{:igniter, ">= 0.3.36 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "1b4fdf2e0bba79a2e6417428f79dc8b7f7b01f02cdb632526071052d6cbfdfec"}, "spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"}, "splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"}, + "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, "stream_data": {:hex, :stream_data, "1.1.1", "fd515ca95619cca83ba08b20f5e814aaf1e5ebff114659dc9731f966c9226246", [:mix], [], "hexpm", "45d0cd46bd06738463fd53f22b70042dbb58c384bb99ef4e7576e7bb7d3b8c8c"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, diff --git a/test/subscription_test.exs b/test/subscription_test.exs index e5ffe1dd..30ef0e75 100644 --- a/test/subscription_test.exs +++ b/test/subscription_test.exs @@ -389,7 +389,7 @@ defmodule AshGraphql.SubscriptionTest do test "it aggregates multiple messages" do stop_supervised(AshGraphql.Subscription.Batcher) - start_supervised({AshGraphql.Subscription.Batcher, [async_threshold: 0]}) + start_supervised({AshGraphql.Subscription.Batcher, [send_immediately_threshold: 0]}) Application.put_env(:ash_graphql, :simulate_subscription_processing_time, 1000) @@ -442,8 +442,10 @@ defmodule AshGraphql.SubscriptionTest do context: %{actor: @admin} ) - assert GenServer.call(AshGraphql.Subscription.Batcher, :dump_state, :infinity).total_count == - 2 + state = GenServer.call(AshGraphql.Subscription.Batcher, :dump_state, :infinity) + assert state.total_count == 2 + + assert Enum.count(state.batches) == 1 assert Enum.empty?(mutation_result["createSubscribable"]["errors"]) From 3a7a0a1d28972b8d0184676705718853d6a9f16f Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Sat, 5 Oct 2024 17:05:41 +0200 Subject: [PATCH 4/8] Subscription batcher finish (#220) --- config/config.exs | 2 - .../topics/use-subscriptions-with-graphql.md | 27 ++++- lib/graphql/resolver.ex | 2 +- lib/subscription/batcher.ex | 100 +++++++++++++----- lib/subscription/runner.ex | 1 - test/subscription_test.exs | 60 ++++++++++- 6 files changed, 161 insertions(+), 31 deletions(-) diff --git a/config/config.exs b/config/config.exs index c966d1d6..cf144249 100644 --- a/config/config.exs +++ b/config/config.exs @@ -4,8 +4,6 @@ config :ash, :disable_async?, true config :ash, :validate_domain_resource_inclusion?, false config :ash, :validate_domain_config_inclusion?, false -config :logger, level: :warning - config :ash, :pub_sub, debug?: true config 
:logger, level: :info diff --git a/documentation/topics/use-subscriptions-with-graphql.md b/documentation/topics/use-subscriptions-with-graphql.md index b172f791..0f51734a 100644 --- a/documentation/topics/use-subscriptions-with-graphql.md +++ b/documentation/topics/use-subscriptions-with-graphql.md @@ -32,6 +32,11 @@ end The subscription DSL is currently in beta and before using it you have to enable them in your config. +> ### Subscription response order {: .warning} +> +> The order in which the subscription responses are sent to the client is not guaranteed to be the +> same as the order in which the mutations were executed. + ```elixir config :ash_graphql, :policies, show_policy_breakdowns?: true ``` @@ -39,7 +44,27 @@ config :ash_graphql, :policies, show_policy_breakdowns?: true First you'll need to do some setup, follow the the [setup guide](https://hexdocs.pm/absinthe/subscriptions.html#absinthe-phoenix-setup) in the absinthe docs, but instead of using `Absinthe.Pheonix.Endpoint` use `AshGraphql.Subscription.Endpoint`. -Afterwards add an empty subscription block to your schema module. +By default subscriptions are resolved synchronously as part of the mutation. This means that a resolver is run for every subscriber that +is not deduplicated. If you have a lot of subscribers you can add the `AshGraphql.Subscription.Batcher` to your supervision tree, which +batches up notifications and runs subscription resolution out-of-band. + +```elixir + @impl true + def start(_type, _args) do + children = [ + ..., + {Absinthe.Subscription, MyAppWeb.Endpoint}, + AshGraphql.Subscription.Batcher + ] + + # See https://hexdocs.pm/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: MyAppWeb.Supervisor] + Supervisor.start_link(children, opts) + end +``` + +Afterwards, add an empty subscription block to your schema module. ```elixir defmodule MyAppWeb.Schema do diff --git a/lib/graphql/resolver.ex b/lib/graphql/resolver.ex index c3ef4b44..90589c9f 100644 --- a/lib/graphql/resolver.ex +++ b/lib/graphql/resolver.ex @@ -639,7 +639,7 @@ defmodule AshGraphql.Graphql.Resolver do end else Enum.map(notifications, fn notification -> - %{type => AshGraphql.Resource.encode_id(notification.data, false)} + %{key => AshGraphql.Resource.encode_id(notification.data, false)} end) end end) diff --git a/lib/subscription/batcher.ex b/lib/subscription/batcher.ex index 3c66ea50..8fe8c75b 100644 --- a/lib/subscription/batcher.ex +++ b/lib/subscription/batcher.ex @@ -1,4 +1,8 @@ defmodule AshGraphql.Subscription.Batcher do + @moduledoc """ + If started as a GenServer, this module will batch notifications and send them in bulk. + Otherwise, it will send them immediately. 
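+
+  A minimal sketch of starting the batcher under an application's supervision tree
+  (the endpoint module name `MyAppWeb.Endpoint` is illustrative), mirroring the
+  setup shown in the subscriptions guide:
+
+      children = [
+        {Absinthe.Subscription, MyAppWeb.Endpoint},
+        AshGraphql.Subscription.Batcher
+      ]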
+ """ use GenServer alias Absinthe.Pipeline.BatchResolver @@ -6,20 +10,35 @@ defmodule AshGraphql.Subscription.Batcher do require Logger @compile {:inline, simulate_slowness: 0} - defstruct batches: %{}, total_count: 0, async_limit: 100, send_immediately_threshold: 50 + defstruct batches: %{}, + in_progress_batches: %{}, + total_count: 0, + async_limit: 100, + send_immediately_threshold: 50, + subscription_batch_interval: 1000 defmodule Batch do + @moduledoc false defstruct notifications: [], count: 0, pubsub: nil, key_strategy: nil, doc: nil, - timer: nil, - task: nil + timer: nil def add(batch, item) do %{batch | notifications: [item | batch.notifications], count: batch.count + 1} end + + def remove(batch, items) when is_list(items) do + %{ + batch + | notifications: Enum.reject(batch.notifications, &(&1 in items)), + count: batch.count - length(items) + } + end + + def remove(batch, item), do: remove(batch, [item]) end def start_link(opts \\ []) do @@ -42,13 +61,29 @@ defmodule AshGraphql.Subscription.Batcher do :backpressure_sync -> do_send(topic, [notification], pubsub, key_strategy, doc) end + catch + :exit, {:noproc, _} -> + do_send(topic, [notification], pubsub, key_strategy, doc) end + @doc """ + Config options + + `async_limit` (default 100): + if there are more than `async_limit` notifications, we will start to backpressure + + `send_immediately_threshold` (default 50): + if there are less then `send_immediately_threshold` notifications, we will send them immediately + + `subscription_batch_interval` (default 1000): + the interval in milliseconds the batcher waits for new notifications before sending them + """ def init(config) do {:ok, %__MODULE__{ async_limit: config[:async_limit] || 100, - send_immediately_threshold: config[:send_immediately_threshold] || 50 + send_immediately_threshold: config[:send_immediately_threshold] || 50, + subscription_batch_interval: config[:send_immediately_threshold] || 1000 }} end @@ -91,21 +126,26 @@ defmodule AshGraphql.Subscription.Batcher do end end - def handle_info({_task, {:sent, topic, _res}}, state) do - case state.batches[topic] do + def handle_info({task, {:sent, _topic, _res}}, state) do + batch = state.in_progress_batches[task] + + state = + %{ + state + | total_count: state.total_count - batch.count, + in_progress_batches: Map.delete(state.in_progress_batches, task) + } + + case batch do %{timer: timer} when not is_nil(timer) -> Process.cancel_timer(timer) + :ok _ -> :ok end - {:noreply, - %{ - state - | total_count: state.total_count - Map.get(state.batches[topic] || %{}, :count, 0), - batches: Map.delete(state.batches, topic) - }} + {:noreply, state} end def handle_info({:DOWN, _, _, _, :normal}, state) do @@ -115,6 +155,7 @@ defmodule AshGraphql.Subscription.Batcher do def handle_info({:send_batch, topic}, state) do batch = state.batches[topic] + # only run one task per topic at a time if batch do task = Task.async(fn -> @@ -122,7 +163,12 @@ defmodule AshGraphql.Subscription.Batcher do do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} end) - {:noreply, put_in(state.batches[topic].task, task)} + {:noreply, + %{ + state + | batches: Map.delete(state.batches, topic), + in_progress_batches: Map.put(state.in_progress_batches, task.ref, batch) + }} else {:noreply, state} end @@ -156,9 +202,6 @@ defmodule AshGraphql.Subscription.Batcher do defp send_all_batches(state, async?) 
do state.batches - |> Enum.reject(fn {_, batch} -> - batch.task - end) |> Enum.reduce(state, fn {topic, batch}, state -> if batch.timer do Process.cancel_timer(batch.timer) @@ -171,7 +214,11 @@ defmodule AshGraphql.Subscription.Batcher do do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc)} end) - put_in(state.batches[topic].task, task) + %{ + state + | batches: Map.delete(state.batches, topic), + in_progress_batches: Map.put(state.in_progress_batches, task.ref, batch) + } else do_send(topic, batch.notifications, batch.pubsub, batch.key_strategy, batch.doc) @@ -256,12 +303,19 @@ defmodule AshGraphql.Subscription.Batcher do |> then(&%{state | batches: &1, total_count: state.total_count + 1}) end - defp ensure_timer(%{batches: batches} = state, topic) do - if batches[topic].timer do + defp ensure_timer( + %{batches: batches, subscription_batch_interval: subscription_batch_interval} = state, + topic + ) do + if not is_nil(batches[topic].timer) and Process.read_timer(batches[topic].timer) do state else - # TODO: this interval should be configurable - timer = Process.send_after(self(), {:send_batch, topic}, 1000) + timer = + Process.send_after( + self(), + {:send_batch, topic}, + subscription_batch_interval + ) put_in(state.batches[topic].timer, timer) end @@ -274,9 +328,7 @@ defmodule AshGraphql.Subscription.Batcher do # and the user can not really do anything usefull with it not (errors |> List.wrap() - |> Enum.any?(fn error -> - Map.get(error, :code) in ["forbidden", "not_found", nil] - end)) + |> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found", nil] end)) end defp should_send?(_), do: true diff --git a/lib/subscription/runner.ex b/lib/subscription/runner.ex index 0d8cce61..031612bf 100644 --- a/lib/subscription/runner.ex +++ b/lib/subscription/runner.ex @@ -5,7 +5,6 @@ defmodule AshGraphql.Subscription.Runner do Mostly a copy of https://github.com/absinthe-graphql/absinthe/blob/3d0823bd71c2ebb94357a5588c723e053de8c66a/lib/absinthe/subscription/local.ex#L40 but this lets us decide if we want to send the data to the client or not in certain error cases """ - require Logger def run_docset(pubsub, docs_and_topics, notification) do diff --git a/test/subscription_test.exs b/test/subscription_test.exs index 30ef0e75..45c46c8f 100644 --- a/test/subscription_test.exs +++ b/test/subscription_test.exs @@ -452,8 +452,9 @@ defmodule AshGraphql.SubscriptionTest do subscribable_id2 = mutation_result["createSubscribable"]["result"]["id"] refute is_nil(subscribable_id) - assert_receive({^topic, %{data: subscription_data}}) - assert_receive({^topic, %{data: subscription_data2}}) + # wait for 2 seconds (process timer + simulated processing time) + assert_receive({^topic, %{data: subscription_data}}, 2000) + assert_receive({^topic, %{data: subscription_data2}}, 2000) refute_received({^topic, _}) assert subscribable_id == @@ -462,4 +463,59 @@ defmodule AshGraphql.SubscriptionTest do assert subscribable_id2 == subscription_data2["subscribableEvents"]["created"]["id"] end + + test "subscription is resolved synchronously" do + stop_supervised(AshGraphql.Subscription.Batcher) + + assert is_nil(Process.whereis(AshGraphql.Subscription.Batcher)) + + assert {:ok, %{"subscribed" => topic}} = + Absinthe.run( + """ + subscription { + subscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """, + Schema, + context: %{actor: @admin, pubsub: PubSub} + ) + + create_mutation = """ + mutation CreateSubscribable($input: 
CreateSubscribableInput) { + createSubscribable(input: $input) { + result{ + id + text + } + errors{ + message + } + } + } + """ + + assert {:ok, %{data: mutation_result}} = + Absinthe.run(create_mutation, Schema, + variables: %{"input" => %{"text" => "foo"}}, + context: %{actor: @admin} + ) + + subscribable_id = mutation_result["createSubscribable"]["result"]["id"] + + assert_receive({^topic, %{data: subscription_data}}) + refute_received({^topic, _}) + + assert subscribable_id == + subscription_data["subscribableEvents"]["created"]["id"] + end end From e2e2a06c3a287c7291605256c6e2f97bf46a4a49 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Mon, 7 Oct 2024 14:18:57 +0200 Subject: [PATCH 5/8] cleanup --- benchmarks/subscriptions.exs | 10 +++++----- lib/subscription/batcher.ex | 2 +- test/subscription_test.exs | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/benchmarks/subscriptions.exs b/benchmarks/subscriptions.exs index a2267bb2..0bb8684e 100644 --- a/benchmarks/subscriptions.exs +++ b/benchmarks/subscriptions.exs @@ -1,8 +1,8 @@ alias AshGraphql.Test.PubSub alias AshGraphql.Test.Schema -{:ok, pubsub} = PubSub.start_link() -{:ok, absinthe_sub} = Absinthe.Subscription.start_link(PubSub) +{:ok, _pubsub} = PubSub.start_link() +{:ok, _absinthe_sub} = Absinthe.Subscription.start_link(PubSub) # Application.put_env(:ash_graphql, :simulate_subscription_processing_time, 1000) :ok @@ -46,7 +46,7 @@ Benchee.run( after_scenario: fn _ -> count = fn counter -> receive do - msg -> + _msg -> 1 + counter.(counter) after 0 -> 0 @@ -67,7 +67,7 @@ Benchee.run( role: :admin } - {:ok, %{"subscribed" => topic}} = + {:ok, %{"subscribed" => _topic}} = Absinthe.run( """ subscription { @@ -97,7 +97,7 @@ Benchee.run( role: :admin } - {:ok, %{"subscribed" => topic}} = + {:ok, %{"subscribed" => _topic}} = Absinthe.run( """ subscription { diff --git a/lib/subscription/batcher.ex b/lib/subscription/batcher.ex index 8fe8c75b..b8fa9d0e 100644 --- a/lib/subscription/batcher.ex +++ b/lib/subscription/batcher.ex @@ -83,7 +83,7 @@ defmodule AshGraphql.Subscription.Batcher do %__MODULE__{ async_limit: config[:async_limit] || 100, send_immediately_threshold: config[:send_immediately_threshold] || 50, - subscription_batch_interval: config[:send_immediately_threshold] || 1000 + subscription_batch_interval: config[:subscription_batch_interval] || 1000 }} end diff --git a/test/subscription_test.exs b/test/subscription_test.exs index 45c46c8f..34d8023d 100644 --- a/test/subscription_test.exs +++ b/test/subscription_test.exs @@ -452,9 +452,9 @@ defmodule AshGraphql.SubscriptionTest do subscribable_id2 = mutation_result["createSubscribable"]["result"]["id"] refute is_nil(subscribable_id) - # wait for 2 seconds (process timer + simulated processing time) - assert_receive({^topic, %{data: subscription_data}}, 2000) - assert_receive({^topic, %{data: subscription_data2}}, 2000) + # wait for up to 3 seconds (process timer + simulated processing time + wiggle room) + assert_receive({^topic, %{data: subscription_data}}, 3000) + assert_receive({^topic, %{data: subscription_data2}}, 3000) refute_received({^topic, _}) assert subscribable_id == From a79d1cee1092caf0d8e9352da6b22acdcd5d015c Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Mon, 7 Oct 2024 14:23:11 +0200 Subject: [PATCH 6/8] update ash --- mix.lock | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mix.lock b/mix.lock index 29cabb49..3565670d 100644 --- a/mix.lock +++ b/mix.lock @@ -2,16 +2,16 @@ "absinthe": 
{:hex, :absinthe, "1.7.8", "43443d12ad2b4fcce60e257ac71caf3081f3d5c4ddd5eac63a02628bcaf5b556", [:mix], [{:dataloader, "~> 1.0.0 or ~> 2.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:opentelemetry_process_propagator, "~> 0.2.1 or ~> 0.3", [hex: :opentelemetry_process_propagator, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c4085df201892a498384f997649aedb37a4ce8a726c170d5b5617ed3bf45d40b"}, "absinthe_phoenix": {:hex, :absinthe_phoenix, "2.0.3", "74e0862f280424b7bc290f6f69e133268bce0b4e7db0218c7e129c5c2b1d3fd4", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:absinthe_plug, "~> 1.5", [hex: :absinthe_plug, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.5", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.13 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}], "hexpm", "caffaea03c17ea7419fe07e4bc04c2399c47f0d8736900623dbf4749a826fd2c"}, "absinthe_plug": {:hex, :absinthe_plug, "1.5.8", "38d230641ba9dca8f72f1fed2dfc8abd53b3907d1996363da32434ab6ee5d6ab", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "bbb04176647b735828861e7b2705465e53e2cf54ccf5a73ddd1ebd855f996e5a"}, - "ash": {:hex, :ash, "3.4.22", "e292e40cae558c486bb23da656b564c3bb5fb551dbd2aeae54c879b408c91844", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.3.36 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:owl, "~> 0.11", [hex: :owl, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.9", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.2.29 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6ca9be81d06ab07cb2f4d14132b085dc421f3f400b7aa0148b9fd7d575499efc"}, + "ash": {:hex, :ash, "3.4.23", "92fca0a8e7949435e6092a0960c815328f3e9d29b686b3d0bce4ee316d140bd4", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.3.36 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:owl, "~> 0.11", [hex: :owl, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, 
{:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.9", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.2.29 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "41b7b34e7d744a91a01b0250ccda3856a4f44fa5f883e27f7e07aeef689b5f2b"}, "benchee": {:hex, :benchee, "1.3.1", "c786e6a76321121a44229dde3988fc772bca73ea75170a73fd5f4ddf1af95ccf", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "76224c58ea1d0391c8309a8ecbfe27d71062878f59bd41a390266bf4ac1cc56d"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "castore": {:hex, :castore, "1.0.9", "5cc77474afadf02c7c017823f460a17daa7908e991b0cc917febc90e466a375c", [:mix], [], "hexpm", "5ea956504f1ba6f2b4eb707061d8e17870de2bee95fb59d512872c2ef06925e7"}, "credo": {:hex, :credo, "1.7.8", "9722ba1681e973025908d542ec3d95db5f9c549251ba5b028e251ad8c24ab8c5", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cb9e87cc64f152f3ed1c6e325e7b894dea8f5ef2e41123bd864e3cd5ceb44968"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, - "dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, - "ecto": {:hex, :ecto, "3.12.3", "1a9111560731f6c3606924c81c870a68a34c819f6d4f03822f370ea31a582208", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9efd91506ae722f95e48dc49e70d0cb632ede3b7a23896252a60a14ac6d59165"}, + "ecto": {:hex, :ecto, "3.12.4", "267c94d9f2969e6acc4dd5e3e3af5b05cdae89a4d549925f3008b2b7eb0b93c3", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", 
optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ef04e4101688a67d061e1b10d7bc1fbf00d1d13c17eef08b71d070ff9188f747"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ets": {:hex, :ets, "0.9.0", "79c6a6c205436780486f72d84230c6cba2f8a9920456750ddd1e47389107d5fd", [:mix], [], "hexpm", "2861fdfb04bcaeff370f1a5904eec864f0a56dcfebe5921ea9aadf2a481c822b"}, "ex_check": {:hex, :ex_check, "0.16.0", "07615bef493c5b8d12d5119de3914274277299c6483989e52b0f6b8358a26b5f", [:mix], [], "hexpm", "4d809b72a18d405514dda4809257d8e665ae7cf37a7aee3be6b74a34dec310f5"}, @@ -20,7 +20,7 @@ "git_cli": {:hex, :git_cli, "0.3.0", "a5422f9b95c99483385b976f5d43f7e8233283a47cda13533d7c16131cb14df5", [:mix], [], "hexpm", "78cb952f4c86a41f4d3511f1d3ecb28edb268e3a7df278de2faa1bd4672eaf9b"}, "git_ops": {:hex, :git_ops, "2.6.1", "cc7799a68c26cf814d6d1a5121415b4f5bf813de200908f930b27a2f1fe9dad5", [:mix], [{:git_cli, "~> 0.2", [hex: :git_cli, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "ce62d07e41fe993ec22c35d5edb11cf333a21ddaead6f5d9868fcb607d42039e"}, "glob_ex": {:hex, :glob_ex, "0.1.9", "b97a25392f5339e49f587e5b24c468c6a4f38299febd5ec85c5f8bb2e42b5c1e", [:mix], [], "hexpm", "be72e584ad1d8776a4d134d4b6da1bac8b80b515cdadf0120e0920b9978d7f01"}, - "igniter": {:hex, :igniter, "0.3.45", "f487138ed0c5cbf8f1bdd53360cdc2ac40c18ff379c8a575a7a34e526b4ba846", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:rewrite, "~> 0.9", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "fbe663e3f4566fb358c64bdddf0ceb05cf9175cea34cccf45a9aa5532ea967f8"}, + "igniter": {:hex, :igniter, "0.3.49", "da3ce1ff42a8ba61cda462cbd46aafad9ac09be9b6fe92cbcd9a25ebf914f32f", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:rewrite, "~> 0.9", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "4b2693347fdf51da6e921a5bc067fefb22698bd92c1597d06a27dc06ec686b26"}, "iterex": {:hex, :iterex, "0.1.2", "58f9b9b9a22a55cbfc7b5234a9c9c63eaac26d276b3db80936c0e1c60355a5a6", [:mix], [], "hexpm", "2e103b8bcc81757a9af121f6dc0df312c9a17220f302b1193ef720460d03029d"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "libgraph": {:hex, :libgraph, "0.16.0", "3936f3eca6ef826e08880230f806bfea13193e49bf153f93edcf0239d4fd1d07", [:mix], [], "hexpm", "41ca92240e8a4138c30a7e06466acc709b0cbb795c643e9e17174a178982d6bf"}, @@ -31,7 +31,7 @@ "mix_audit": {:hex, :mix_audit, "2.1.4", "0a23d5b07350cdd69001c13882a4f5fb9f90fbd4cbf2ebc190a2ee0d187ea3e9", [:make, :mix], [{:jason, "~> 1.4", [hex: 
:jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "fd807653cc8c1cada2911129c7eb9e985e3cc76ebf26f4dd628bb25bbcaa7099"}, "mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, - "owl": {:hex, :owl, "0.11.0", "2cd46185d330aa2400f1c8c3cddf8d2ff6320baeff23321d1810e58127082cae", [:mix], [{:ucwidth, "~> 0.2", [hex: :ucwidth, repo: "hexpm", optional: true]}], "hexpm", "73f5783f0e963cc04a061be717a0dbb3e49ae0c4bfd55fb4b78ece8d33a65efe"}, + "owl": {:hex, :owl, "0.12.0", "0c4b48f90797a7f5f09ebd67ba7ebdc20761c3ec9c7928dfcafcb6d3c2d25c99", [:mix], [{:ucwidth, "~> 0.2", [hex: :ucwidth, repo: "hexpm", optional: true]}], "hexpm", "241d85ae62824dd72f9b2e4a5ba4e69ebb9960089a3c68ce6c1ddf2073db3c15"}, "phoenix": {:hex, :phoenix, "1.7.14", "a7d0b3f1bc95987044ddada111e77bd7f75646a08518942c72a8440278ae7825", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "c7859bc56cc5dfef19ecfc240775dae358cbaa530231118a9e014df392ace61a"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"}, "phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"}, @@ -46,7 +46,7 @@ "spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"}, "splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, - "stream_data": {:hex, :stream_data, "1.1.1", "fd515ca95619cca83ba08b20f5e814aaf1e5ebff114659dc9731f966c9226246", [:mix], [], "hexpm", "45d0cd46bd06738463fd53f22b70042dbb58c384bb99ef4e7576e7bb7d3b8c8c"}, + 
"stream_data": {:hex, :stream_data, "1.1.2", "05499eaec0443349ff877aaabc6e194e82bda6799b9ce6aaa1aadac15a9fdb4d", [:mix], [], "hexpm", "129558d2c77cbc1eb2f4747acbbea79e181a5da51108457000020a906813a1a9"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock_adapter": {:hex, :websock_adapter, "0.5.7", "65fa74042530064ef0570b75b43f5c49bb8b235d6515671b3d250022cb8a1f9e", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d0f478ee64deddfec64b800673fd6e0c8888b079d9f3444dd96d2a98383bdbd1"}, From 88c236653ecef696635efcf01f046c07ae5dcd2c Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Mon, 7 Oct 2024 15:01:39 +0200 Subject: [PATCH 7/8] remove unused function --- lib/subscription/batcher.ex | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/lib/subscription/batcher.ex b/lib/subscription/batcher.ex index b8fa9d0e..e1f9f128 100644 --- a/lib/subscription/batcher.ex +++ b/lib/subscription/batcher.ex @@ -29,16 +29,6 @@ defmodule AshGraphql.Subscription.Batcher do def add(batch, item) do %{batch | notifications: [item | batch.notifications], count: batch.count + 1} end - - def remove(batch, items) when is_list(items) do - %{ - batch - | notifications: Enum.reject(batch.notifications, &(&1 in items)), - count: batch.count - length(items) - } - end - - def remove(batch, item), do: remove(batch, [item]) end def start_link(opts \\ []) do From 65ff695140a104130426708f96bc226bb1175469 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Mon, 7 Oct 2024 15:15:09 +0200 Subject: [PATCH 8/8] remove unnecessary drain calls --- test/subscription_test.exs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/test/subscription_test.exs b/test/subscription_test.exs index 34d8023d..fdae16eb 100644 --- a/test/subscription_test.exs +++ b/test/subscription_test.exs @@ -107,8 +107,6 @@ defmodule AshGraphql.SubscriptionTest do assert Enum.empty?(mutation_result["updateSubscribable"]["errors"]) - AshGraphql.Subscription.Batcher.drain() - assert_receive({^topic, %{data: subscription_data}}) assert subscription_data["subscribableEvents"]["updated"]["text"] == "bar" @@ -132,8 +130,6 @@ defmodule AshGraphql.SubscriptionTest do context: %{actor: @admin} ) - AshGraphql.Subscription.Batcher.drain() - assert Enum.empty?(mutation_result["destroySubscribable"]["errors"]) assert_receive({^topic, %{data: subscription_data}}) @@ -201,8 +197,6 @@ defmodule AshGraphql.SubscriptionTest do |> Ash.Changeset.for_create(:create, %{text: "foo", actor_id: 1}, actor: @admin) |> Ash.create!() - AshGraphql.Subscription.Batcher.drain() - # actor1 will get data because it can see the resource assert_receive {^topic1, %{data: subscription_data}} # actor 2 will not get data because it cannot see the resource @@ -260,8 +254,6 @@ defmodule AshGraphql.SubscriptionTest do |> Ash.Changeset.for_create(:create, %{text: "foo", actor_id: 1}, actor: @admin) |> Ash.create!() - AshGraphql.Subscription.Batcher.drain() - assert_receive 
{^topic1, %{data: subscription_data}} assert subscribable.id == @@ -300,8 +292,6 @@ defmodule AshGraphql.SubscriptionTest do ) |> Ash.create!() - AshGraphql.Subscription.Batcher.drain() - assert_receive {^topic, %{data: subscription_data}} assert subscribable.id == @@ -339,8 +329,6 @@ defmodule AshGraphql.SubscriptionTest do ) |> Ash.create!() - AshGraphql.Subscription.Batcher.drain() - assert_receive {^topic, %{data: subscription_data}} assert subscribable.id == @@ -378,8 +366,6 @@ defmodule AshGraphql.SubscriptionTest do ) |> Ash.create!() - AshGraphql.Subscription.Batcher.drain() - assert_receive {^topic, %{data: subscription_data, errors: errors}} assert is_nil(subscription_data["subscribedOnDomain"]["created"])
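
Editor's note on tuning: the three option keys touched in the PATCH 5/8 hunk above (`async_limit`, `send_immediately_threshold`, and `subscription_batch_interval`, the last of which now reads its own config key instead of `send_immediately_threshold`) are read in `AshGraphql.Subscription.Batcher.init/1` from the options the batcher process is started with. A minimal sketch of starting the batcher under an application supervisor with explicit values follows; the `MyApp.Application` module, the numeric values, and the assumption that the batcher exposes the default GenServer child_spec are illustrative only and not part of these patches.

    # Sketch only – module name and values are assumptions for illustration.
    defmodule MyApp.Application do
      use Application

      @impl true
      def start(_type, _args) do
        children = [
          # Option keys mirror those read in AshGraphql.Subscription.Batcher.init/1
          # (see the PATCH 5/8 hunk above); the values here are arbitrary examples.
          {AshGraphql.Subscription.Batcher,
           [async_limit: 100, send_immediately_threshold: 50, subscription_batch_interval: 1_000]}
        ]

        Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
      end
    end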