diff --git a/guides/Usage.md b/guides/Usage.md index cc0bfc82..b5de7a93 100644 --- a/guides/Usage.md +++ b/guides/Usage.md @@ -275,3 +275,78 @@ Hard delete a stream that should exist: ```elixir :ok = MyApp.EventStore.delete_stream("stream2", :stream_exists, :hard) ``` + +## Telemetry + +EventStore emits `:telemetry` events for public operations. Each instrumented +operation publishes `:start`, `:stop`, and `:exception` events under the +`[:eventstore, operation, suffix]` namespace. + +The first pass covers these operations: + +- `:append_to_stream` +- `:link_to_stream` +- `:read_stream_forward` +- `:read_stream_backward` +- `:delete_stream` +- `:paginate_streams` +- `:subscribe_to_stream` +- `:delete_subscription` +- `:read_snapshot` +- `:record_snapshot` +- `:delete_snapshot` +- `:stream_batch_read` + +Alias operations reuse the same event names. For example, +`read_all_streams_forward/3` emits `[:eventstore, :read_stream_forward, ...]` +with `stream_uuid: "$all"` in the metadata, and +`subscribe_to_all_streams/3` emits `[:eventstore, :subscribe_to_stream, ...]` +with the same stream identifier. + +Stop metadata includes a normalized `:result` for all instrumented operations. +Operations that return `:ok` emit `result: :ok`. Operations that return +`{:ok, value}` also emit `result: :ok` so telemetry does not copy returned +payloads such as event lists or subscription structs into metadata. When an +operation returns `{:error, reason}`, stop metadata includes +`result: {:error, reason}`. + +Lazy stream APIs do not emit `:stream_forward` or `:stream_backward` spans. +Instead, `stream_forward/3`, `stream_backward/3`, `stream_all_forward/2`, and +`stream_all_backward/2` emit `[:eventstore, :stream_batch_read, ...]` once per +batch read performed during enumeration. These events include `:direction`, +`:start_version`, and `:requested_batch_size` in start metadata, and add +`:event_count` plus `:result` in stop metadata. Forward streaming may emit a +final batch read with `event_count: 0` to detect completion. + +Measurements: + +- `:start` includes `%{system_time: System.system_time(), monotonic_time: native_time}` +- `:stop` includes `%{duration: native_time, monotonic_time: native_time}` +- `:exception` includes `%{duration: native_time, monotonic_time: native_time}` + +Metadata always includes `:event_store`. Depending on the operation it may also +include fields such as `:name`, `:stream_uuid`, `:expected_version`, +`:event_count`, `:count`, `:start_version`, `:delete_type`, `:result`, +`:subscription_name`, `:source_uuid`, and pagination options. Because +EventStore now uses `:telemetry.span/3`, emitted metadata also includes a +`telemetry_span_context` key so handlers can correlate start/stop/exception +events for the same operation execution. + +Example handler: + +```elixir +events = [ + [:eventstore, :append_to_stream, :start], + [:eventstore, :append_to_stream, :stop], + [:eventstore, :append_to_stream, :exception] +] + +:telemetry.attach_many( + "my-app-eventstore", + events, + fn event_name, measurements, metadata, _config -> + IO.inspect({event_name, measurements, metadata}, label: "eventstore.telemetry") + end, + nil +) +``` diff --git a/lib/event_store.ex b/lib/event_store.ex index 0a091e52..c748d21a 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -254,7 +254,7 @@ defmodule EventStore do quote bind_quoted: [opts: opts] do @behaviour EventStore - alias EventStore.{Config, EventData, PubSub, Subscriptions} + alias EventStore.{Config, EventData, PubSub, Subscriptions, Telemetry} alias EventStore.Snapshots.{SnapshotData, Snapshotter} alias EventStore.Subscriptions.Subscription alias EventStore.Streams.Stream @@ -298,15 +298,25 @@ defmodule EventStore do def append_to_stream(stream_uuid, expected_version, events, opts \\ []) - def append_to_stream(@all_stream, _expected_version, _events, _opts), - do: {:error, :cannot_append_to_all_stream} - def append_to_stream(stream_uuid, expected_version, events, opts) do - overrides = Keyword.take(opts, @accepted_overrides_append_to_stream) - {conn, opts} = parse_opts(opts) - opts = Keyword.merge(opts, overrides) - - Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) + telemetry_span( + :append_to_stream, + opts, + stream_metadata(stream_uuid, expected_version, events), + fn -> + case stream_uuid do + @all_stream -> + {:error, :cannot_append_to_all_stream} + + _ -> + overrides = Keyword.take(opts, @accepted_overrides_append_to_stream) + {conn, opts} = parse_opts(opts) + opts = Keyword.merge(opts, overrides) + + Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) + end + end + ) end def link_to_stream( @@ -316,13 +326,29 @@ defmodule EventStore do opts \\ [] ) - def link_to_stream(@all_stream, _expected_version, _events_or_event_ids, _opts), - do: {:error, :cannot_append_to_all_stream} - def link_to_stream(stream_uuid, expected_version, events_or_event_ids, opts) do - {conn, opts} = parse_opts(opts) - - Stream.link_to_stream(conn, stream_uuid, expected_version, events_or_event_ids, opts) + telemetry_span( + :link_to_stream, + opts, + stream_metadata(stream_uuid, expected_version, events_or_event_ids), + fn -> + case stream_uuid do + @all_stream -> + {:error, :cannot_append_to_all_stream} + + _ -> + {conn, opts} = parse_opts(opts) + + Stream.link_to_stream( + conn, + stream_uuid, + expected_version, + events_or_event_ids, + opts + ) + end + end + ) end def read_stream_forward( @@ -333,9 +359,16 @@ defmodule EventStore do ) def read_stream_forward(stream_uuid, start_version, count, opts) do - {conn, opts} = parse_opts(opts) - - Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts) + telemetry_span( + :read_stream_forward, + opts, + %{count: count, start_version: start_version, stream_uuid: stream_uuid}, + fn -> + {conn, opts} = parse_opts(opts) + + Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts) + end + ) end def read_all_streams_forward( @@ -355,9 +388,16 @@ defmodule EventStore do ) def read_stream_backward(stream_uuid, start_version, count, opts) do - {conn, opts} = parse_opts(opts) - - Stream.read_stream_backward(conn, stream_uuid, start_version, count, opts) + telemetry_span( + :read_stream_backward, + opts, + %{count: count, start_version: start_version, stream_uuid: stream_uuid}, + fn -> + {conn, opts} = parse_opts(opts) + + Stream.read_stream_backward(conn, stream_uuid, start_version, count, opts) + end + ) end def read_all_streams_backward( @@ -377,9 +417,13 @@ defmodule EventStore do end def stream_forward(stream_uuid, start_version, opts) do + event_store_name = Keyword.get(opts, :name) {conn, opts} = parse_opts(opts) - opts = Keyword.put_new(opts, :read_batch_size, @default_batch_size) + opts = + opts + |> annotate_stream_telemetry_opts(event_store_name) + |> Keyword.put_new(:read_batch_size, @default_batch_size) Stream.stream_forward(conn, stream_uuid, start_version, opts) end @@ -397,9 +441,13 @@ defmodule EventStore do end def stream_backward(stream_uuid, start_version, opts) do + event_store_name = Keyword.get(opts, :name) {conn, opts} = parse_opts(opts) - opts = Keyword.put_new(opts, :read_batch_size, @default_batch_size) + opts = + opts + |> annotate_stream_telemetry_opts(event_store_name) + |> Keyword.put_new(:read_batch_size, @default_batch_size) Stream.stream_backward(conn, stream_uuid, start_version, opts) end @@ -411,24 +459,36 @@ defmodule EventStore do def delete_stream(stream_uuid, expected_version, type \\ :soft, opts \\ []) - def delete_stream(@all_stream, _expected_version, _type, _opts), - do: {:error, :cannot_delete_all_stream} - def delete_stream(stream_uuid, expected_version, type, opts) when type in [:soft, :hard] do - {conn, opts} = parse_opts(opts) - - Stream.delete(conn, stream_uuid, expected_version, type, opts) + telemetry_span( + :delete_stream, + opts, + %{delete_type: type, expected_version: expected_version, stream_uuid: stream_uuid}, + fn -> + case stream_uuid do + @all_stream -> + {:error, :cannot_delete_all_stream} + + _ -> + {conn, opts} = parse_opts(opts) + + Stream.delete(conn, stream_uuid, expected_version, type, opts) + end + end + ) end def paginate_streams(opts \\ []) do - pagination_opts = - Keyword.take(opts, [:page_size, :page_number, :search, :sort_by, :sort_dir]) + telemetry_span(:paginate_streams, opts, pagination_metadata(opts), fn -> + pagination_opts = + Keyword.take(opts, [:page_size, :page_number, :search, :sort_by, :sort_dir]) - {conn, opts} = parse_opts(opts) + {conn, opts} = parse_opts(opts) - opts = Keyword.merge(opts, pagination_opts) + opts = Keyword.merge(opts, pagination_opts) - Stream.paginate_streams(conn, opts) + Stream.paginate_streams(conn, opts) + end) end def stream_info(stream_uuid, opts \\ []) @@ -447,47 +507,54 @@ defmodule EventStore do end def subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts \\ []) do - name = name(opts) - config = Config.lookup(name) - conn = Keyword.fetch!(config, :conn) - schema = Keyword.fetch!(config, :schema) - serializer = Keyword.fetch!(config, :serializer) - correlation_id_type = Keyword.get(config, :correlation_id_type, "uuid") - causation_id_type = Keyword.get(config, :causation_id_type, "uuid") - - query_timeout = timeout(opts, config) - - {start_from, opts} = Keyword.pop(opts, :start_from, :origin) - - with {:ok, start_from} <- - Stream.start_from(conn, stream_uuid, start_from, - schema: schema, - timeout: query_timeout - ) do - opts = - opts - |> Keyword.delete(:timeout) - |> Keyword.merge( - conn: conn, - event_store: name, - query_timeout: query_timeout, - schema: schema, - serializer: serializer, - correlation_id_type: correlation_id_type, - causation_id_type: causation_id_type, - stream_uuid: stream_uuid, - subscription_name: subscription_name, - start_from: start_from - ) - |> Keyword.put_new_lazy(:hibernate_after, fn -> - Keyword.fetch!(config, :subscription_hibernate_after) - end) - |> Keyword.put_new_lazy(:retry_interval, fn -> - Keyword.fetch!(config, :subscription_retry_interval) - end) - - Subscriptions.subscribe_to_stream(subscriber, opts) - end + telemetry_span( + :subscribe_to_stream, + opts, + %{stream_uuid: stream_uuid, subscription_name: subscription_name}, + fn -> + name = name(opts) + config = Config.lookup(name) + conn = Keyword.fetch!(config, :conn) + schema = Keyword.fetch!(config, :schema) + serializer = Keyword.fetch!(config, :serializer) + correlation_id_type = Keyword.get(config, :correlation_id_type, "uuid") + causation_id_type = Keyword.get(config, :causation_id_type, "uuid") + + query_timeout = timeout(opts, config) + + {start_from, opts} = Keyword.pop(opts, :start_from, :origin) + + with {:ok, start_from} <- + Stream.start_from(conn, stream_uuid, start_from, + schema: schema, + timeout: query_timeout + ) do + opts = + opts + |> Keyword.delete(:timeout) + |> Keyword.merge( + conn: conn, + event_store: name, + query_timeout: query_timeout, + schema: schema, + serializer: serializer, + correlation_id_type: correlation_id_type, + causation_id_type: causation_id_type, + stream_uuid: stream_uuid, + subscription_name: subscription_name, + start_from: start_from + ) + |> Keyword.put_new_lazy(:hibernate_after, fn -> + Keyword.fetch!(config, :subscription_hibernate_after) + end) + |> Keyword.put_new_lazy(:retry_interval, fn -> + Keyword.fetch!(config, :subscription_retry_interval) + end) + + Subscriptions.subscribe_to_stream(subscriber, opts) + end + end + ) end def subscribe_to_all_streams(subscription_name, subscriber, opts \\ []), @@ -505,34 +572,85 @@ defmodule EventStore do do: unsubscribe_from_stream(@all_stream, subscription_name, opts) def delete_subscription(stream_uuid, subscription_name, opts \\ []) do - name = name(opts) - - with :ok <- Subscriptions.stop_subscription(name, stream_uuid, subscription_name) do - {conn, opts} = parse_opts(opts) - - Subscriptions.delete_subscription(conn, stream_uuid, subscription_name, opts) - end + telemetry_span( + :delete_subscription, + opts, + %{stream_uuid: stream_uuid, subscription_name: subscription_name}, + fn -> + name = name(opts) + + with :ok <- Subscriptions.stop_subscription(name, stream_uuid, subscription_name) do + {conn, opts} = parse_opts(opts) + + Subscriptions.delete_subscription(conn, stream_uuid, subscription_name, opts) + end + end + ) end def delete_all_streams_subscription(subscription_name, opts \\ []), do: delete_subscription(@all_stream, subscription_name, opts) def read_snapshot(source_uuid, opts \\ []) do - {conn, opts} = parse_opts(opts) + telemetry_span(:read_snapshot, opts, %{source_uuid: source_uuid}, fn -> + {conn, opts} = parse_opts(opts) - Snapshotter.read_snapshot(conn, source_uuid, opts) + Snapshotter.read_snapshot(conn, source_uuid, opts) + end) end def record_snapshot(%SnapshotData{} = snapshot, opts \\ []) do - {conn, opts} = parse_opts(opts) + telemetry_span(:record_snapshot, opts, snapshot_metadata(snapshot), fn -> + {conn, opts} = parse_opts(opts) - Snapshotter.record_snapshot(conn, snapshot, opts) + Snapshotter.record_snapshot(conn, snapshot, opts) + end) end def delete_snapshot(source_uuid, opts \\ []) do - {conn, opts} = parse_opts(opts) + telemetry_span(:delete_snapshot, opts, %{source_uuid: source_uuid}, fn -> + {conn, opts} = parse_opts(opts) + + Snapshotter.delete_snapshot(conn, source_uuid, opts) + end) + end + + defp snapshot_metadata(%SnapshotData{source_uuid: source_uuid}), + do: %{source_uuid: source_uuid} + + defp stream_metadata(stream_uuid, expected_version, events_or_event_ids) do + %{expected_version: expected_version, stream_uuid: stream_uuid} + |> maybe_put_event_count(events_or_event_ids) + end + + defp maybe_put_event_count(metadata, events_or_event_ids) + when is_list(events_or_event_ids) do + Map.put(metadata, :event_count, length(events_or_event_ids)) + end + + defp maybe_put_event_count(metadata, _events_or_event_ids), do: metadata + + defp pagination_metadata(opts) when is_list(opts) do + opts + |> Keyword.take([:page_size, :page_number, :search, :sort_by, :sort_dir]) + |> Enum.reject(fn {_key, value} -> is_nil(value) end) + |> Map.new() + end + + defp pagination_metadata(_opts), do: %{} + + defp telemetry_span(operation, opts, metadata, fun) do + Telemetry.span(operation, Telemetry.metadata(__MODULE__, opts, metadata), fun) + end + + defp annotate_stream_telemetry_opts(opts, nil) do + Keyword.put(opts, :event_store, __MODULE__) + end - Snapshotter.delete_snapshot(conn, source_uuid, opts) + defp annotate_stream_telemetry_opts(opts, event_store_name) do + opts + |> Keyword.put(:event_store, __MODULE__) + |> Keyword.put(:name, event_store_name) end defp parse_opts(opts) do diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 39088b62..3a161003 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -1,7 +1,7 @@ defmodule EventStore.Streams.Stream do @moduledoc false - alias EventStore.{EventData, RecordedEvent, Storage, UUID} + alias EventStore.{EventData, RecordedEvent, Storage, Telemetry, UUID} alias EventStore.Streams.StreamInfo def append_to_stream(conn, stream_uuid, expected_version, events, opts) @@ -254,7 +254,7 @@ defmodule EventStore.Streams.Stream do Elixir.Stream.resource( fn -> start_version end, fn next_version -> - case read_storage_forward(conn, stream, next_version, read_batch_size, opts) do + case read_stream_batch(:forward, conn, stream, next_version, read_batch_size, opts) do {:ok, []} -> {:halt, next_version} {:ok, events} -> {events, List.last(events).event_number + 1} end @@ -280,7 +280,7 @@ defmodule EventStore.Streams.Stream do {:halt, 0} next_version -> - case read_storage_backward(conn, stream, next_version, read_batch_size, opts) do + case read_stream_batch(:backward, conn, stream, next_version, read_batch_size, opts) do {:ok, []} -> {:halt, next_version} {:ok, events} -> {events, List.last(events).event_number - 1} end @@ -289,6 +289,50 @@ defmodule EventStore.Streams.Stream do ) end + defp read_stream_batch( + direction, + conn, + %StreamInfo{stream_uuid: stream_uuid} = stream, + start_version, + requested_batch_size, + opts + ) do + start_metadata = + Telemetry.metadata( + Keyword.get(opts, :event_store, EventStore), + opts, + %{ + direction: direction, + requested_batch_size: requested_batch_size, + start_version: start_version, + stream_uuid: stream_uuid + } + ) + + Telemetry.span( + :stream_batch_read, + start_metadata, + fn -> + read_storage(direction, conn, stream, start_version, requested_batch_size, opts) + end, + fn + {:ok, events} -> + Map.merge(start_metadata, %{event_count: length(events), result: :ok}) + + {:error, reason} -> + Map.merge(start_metadata, %{event_count: 0, result: {:error, reason}}) + end + ) + end + + defp read_storage(:forward, conn, stream, start_version, count, opts) do + read_storage_forward(conn, stream, start_version, count, opts) + end + + defp read_storage(:backward, conn, stream, start_version, count, opts) do + read_storage_backward(conn, stream, start_version, count, opts) + end + defp deserialize_recorded_events(recorded_events, serializer), do: Enum.map(recorded_events, &RecordedEvent.deserialize(&1, serializer)) diff --git a/lib/event_store/telemetry.ex b/lib/event_store/telemetry.ex new file mode 100644 index 00000000..b776d3f0 --- /dev/null +++ b/lib/event_store/telemetry.ex @@ -0,0 +1,49 @@ +defmodule EventStore.Telemetry do + @moduledoc false + + @event_name_prefix [:eventstore] + + def metadata(event_store, opts, extra_metadata \\ %{}) do + extra_metadata + |> Map.put(:event_store, event_store) + |> maybe_put_name(event_store, opts) + end + + def span(operation, metadata, fun) + when is_atom(operation) and is_map(metadata) and is_function(fun, 0) do + event_name = @event_name_prefix ++ [operation] + + :telemetry.span(event_name, metadata, fn -> + result = fun.() + + {result, Map.put(metadata, :result, result_metadata(result))} + end) + end + + def span(operation, start_metadata, fun, stop_metadata_fun) + when is_atom(operation) and is_map(start_metadata) and is_function(fun, 0) and + is_function(stop_metadata_fun, 1) do + event_name = @event_name_prefix ++ [operation] + + :telemetry.span(event_name, start_metadata, fn -> + result = fun.() + + {result, stop_metadata_fun.(result)} + end) + end + + defp result_metadata(:ok), do: :ok + defp result_metadata({:ok, _value}), do: :ok + defp result_metadata({:error, reason}), do: {:error, reason} + defp result_metadata(result), do: result + + defp maybe_put_name(metadata, event_store, opts) do + case Keyword.get(opts, :name) do + name when is_atom(name) and name not in [nil, event_store] -> + Map.put(metadata, :name, name) + + _ -> + metadata + end + end +end diff --git a/mix.exs b/mix.exs index dd9860bc..cfd0056c 100644 --- a/mix.exs +++ b/mix.exs @@ -42,6 +42,7 @@ defmodule EventStore.Mixfile do {:fsm, "~> 0.3"}, {:gen_stage, "~> 1.2"}, {:postgrex, "~> 0.17"}, + {:telemetry, "~> 1.0"}, # Optional dependencies {:jason, "~> 1.4", optional: true}, diff --git a/test/storage/append_events_test.exs b/test/storage/append_events_test.exs index dc229ea6..504d1807 100644 --- a/test/storage/append_events_test.exs +++ b/test/storage/append_events_test.exs @@ -225,10 +225,12 @@ defmodule EventStore.Storage.AppendEventsTest do test "append single event with a db connection error", %{conn: conn, schema: schema} do recorded_events = EventFactory.create_recorded_events(100, UUID.uuid4()) - # Using Postgrex query timeout value of zero will cause a `DBConnection.ConnectionError` error - # to be returned. - assert {:error, %DBConnection.ConnectionError{}} = + # A zero timeout can surface either as a DBConnection timeout or as a PostgreSQL + # query cancellation, depending on the driver/runtime timing. + assert {:error, error} = Appender.append(conn, 1, recorded_events, schema: schema, timeout: 0) + + assert match?(%DBConnection.ConnectionError{}, error) or error == :query_canceled end defp create_stream(context) do diff --git a/test/telemetry_test.exs b/test/telemetry_test.exs new file mode 100644 index 00000000..6d086774 --- /dev/null +++ b/test/telemetry_test.exs @@ -0,0 +1,569 @@ +defmodule EventStore.TelemetryTest do + use EventStore.StorageCase + + alias EventStore.{EventFactory, UUID} + alias EventStore.Snapshots.SnapshotData + alias EventStore.Subscriptions.Subscription + alias EventStore.Telemetry + alias TestEventStore, as: EventStore + + defmodule ExampleData do + @derive Jason.Encoder + defstruct([:data]) + end + + test "emits start and stop telemetry for successful append" do + attach_telemetry(:append_to_stream) + + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(2) + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, events) + + assert_start_event(:append_to_stream, + event_count: 2, + event_store: EventStore, + expected_version: 0, + stream_uuid: stream_uuid + ) + + assert_stop_event(:append_to_stream, + event_count: 2, + event_store: EventStore, + expected_version: 0, + result: :ok, + stream_uuid: stream_uuid + ) + + refute_exception_event(:append_to_stream) + end + + test "emits stop telemetry for append errors returned as tuples" do + stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(1)) + + attach_telemetry(:append_to_stream) + + assert {:error, :wrong_expected_version} = + EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(1)) + + assert_start_event(:append_to_stream, + event_count: 1, + event_store: EventStore, + expected_version: 0, + stream_uuid: stream_uuid + ) + + assert_stop_event(:append_to_stream, + event_count: 1, + event_store: EventStore, + expected_version: 0, + result: {:error, :wrong_expected_version}, + stream_uuid: stream_uuid + ) + + refute_exception_event(:append_to_stream) + end + + test "emits start and stop telemetry for successful link_to_stream" do + source_stream_uuid = UUID.uuid4() + target_stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(source_stream_uuid, 0, EventFactory.create_events(2)) + assert {:ok, source_events} = EventStore.read_stream_forward(source_stream_uuid) + + attach_telemetry(:link_to_stream) + + assert :ok = EventStore.link_to_stream(target_stream_uuid, 0, source_events) + + assert_start_event(:link_to_stream, + event_count: 2, + event_store: EventStore, + expected_version: 0, + stream_uuid: target_stream_uuid + ) + + assert_stop_event(:link_to_stream, + event_count: 2, + event_store: EventStore, + expected_version: 0, + result: :ok, + stream_uuid: target_stream_uuid + ) + + refute_exception_event(:link_to_stream) + end + + test "emits stop telemetry for link_to_stream errors returned as tuples" do + source_stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(source_stream_uuid, 0, EventFactory.create_events(1)) + assert {:ok, source_events} = EventStore.read_stream_forward(source_stream_uuid) + + attach_telemetry(:link_to_stream) + + assert {:error, :cannot_append_to_all_stream} = + EventStore.link_to_stream("$all", 0, source_events) + + assert_start_event(:link_to_stream, + event_count: 1, + event_store: EventStore, + expected_version: 0, + stream_uuid: "$all" + ) + + assert_stop_event(:link_to_stream, + event_count: 1, + event_store: EventStore, + expected_version: 0, + result: {:error, :cannot_append_to_all_stream}, + stream_uuid: "$all" + ) + + refute_exception_event(:link_to_stream) + end + + test "emits start and stop telemetry for successful read_stream_forward" do + stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(2)) + + attach_telemetry(:read_stream_forward) + + assert {:ok, [_recorded_event]} = EventStore.read_stream_forward(stream_uuid, 0, 1) + + assert_start_event(:read_stream_forward, + count: 1, + event_store: EventStore, + start_version: 0, + stream_uuid: stream_uuid + ) + + assert_stop_event(:read_stream_forward, + count: 1, + event_store: EventStore, + result: :ok, + start_version: 0, + stream_uuid: stream_uuid + ) + + refute_exception_event(:read_stream_forward) + end + + test "emits exception telemetry when a public operation raises" do + attach_telemetry(:read_stream_forward) + + stream_uuid = UUID.uuid4() + + assert_raise ArgumentError, ~r/expected `:timeout`/, fn -> + EventStore.read_stream_forward(stream_uuid, 0, 1, timeout: :invalid) + end + + assert_start_event(:read_stream_forward, + count: 1, + event_store: EventStore, + start_version: 0, + stream_uuid: stream_uuid + ) + + assert_exception_event(:read_stream_forward, + count: 1, + event_store: EventStore, + start_version: 0, + stream_uuid: stream_uuid + ) + + refute_stop_event(:read_stream_forward) + end + + test "emits start and stop telemetry for successful read_stream_backward" do + stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(2)) + + attach_telemetry(:read_stream_backward) + + assert {:ok, [_recorded_event]} = EventStore.read_stream_backward(stream_uuid, -1, 1) + + assert_start_event(:read_stream_backward, + count: 1, + event_store: EventStore, + start_version: -1, + stream_uuid: stream_uuid + ) + + assert_stop_event(:read_stream_backward, + count: 1, + event_store: EventStore, + result: :ok, + start_version: -1, + stream_uuid: stream_uuid + ) + + refute_exception_event(:read_stream_backward) + end + + test "emits start and stop telemetry for delete_stream" do + attach_telemetry(:delete_stream) + + stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(1)) + assert :ok = EventStore.delete_stream(stream_uuid, :stream_exists, :soft) + + assert_start_event(:delete_stream, + delete_type: :soft, + event_store: EventStore, + expected_version: :stream_exists, + stream_uuid: stream_uuid + ) + + assert_stop_event(:delete_stream, + delete_type: :soft, + event_store: EventStore, + expected_version: :stream_exists, + result: :ok, + stream_uuid: stream_uuid + ) + + refute_exception_event(:delete_stream) + end + + test "emits stop telemetry for delete_stream errors returned as tuples" do + attach_telemetry(:delete_stream) + + assert {:error, :cannot_delete_all_stream} = + EventStore.delete_stream("$all", :any_version, :soft) + + assert_start_event(:delete_stream, + delete_type: :soft, + event_store: EventStore, + expected_version: :any_version, + stream_uuid: "$all" + ) + + assert_stop_event(:delete_stream, + delete_type: :soft, + event_store: EventStore, + expected_version: :any_version, + result: {:error, :cannot_delete_all_stream}, + stream_uuid: "$all" + ) + + refute_exception_event(:delete_stream) + end + + test "emits start and stop telemetry for subscribe_to_stream" do + attach_telemetry(:subscribe_to_stream) + + stream_uuid = UUID.uuid4() + subscription_name = "telemetry-" <> UUID.uuid4() + + assert {:ok, subscription} = + EventStore.subscribe_to_stream(stream_uuid, subscription_name, self()) + + assert_start_event(:subscribe_to_stream, + event_store: EventStore, + stream_uuid: stream_uuid, + subscription_name: subscription_name + ) + + assert_stop_event(:subscribe_to_stream, + event_store: EventStore, + result: :ok, + stream_uuid: stream_uuid, + subscription_name: subscription_name + ) + + refute_exception_event(:subscribe_to_stream) + assert_receive {:subscribed, ^subscription} + assert :ok = Subscription.unsubscribe(subscription) + end + + test "emits start and stop telemetry for delete_subscription" do + stream_uuid = UUID.uuid4() + subscription_name = "telemetry-" <> UUID.uuid4() + + assert {:ok, subscription} = + EventStore.subscribe_to_stream(stream_uuid, subscription_name, self()) + + assert_receive {:subscribed, ^subscription} + + attach_telemetry(:delete_subscription) + + assert :ok = EventStore.delete_subscription(stream_uuid, subscription_name) + + assert_start_event(:delete_subscription, + event_store: EventStore, + stream_uuid: stream_uuid, + subscription_name: subscription_name + ) + + assert_stop_event(:delete_subscription, + event_store: EventStore, + result: :ok, + stream_uuid: stream_uuid, + subscription_name: subscription_name + ) + + refute_exception_event(:delete_subscription) + refute Process.alive?(subscription) + end + + test "emits start and stop telemetry for paginate_streams" do + stream_uuid = "telemetry-" <> UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(1)) + + attach_telemetry(:paginate_streams) + + assert {:ok, page} = + EventStore.paginate_streams( + page_number: 1, + page_size: 5, + search: "telemetry-", + sort_by: :stream_uuid, + sort_dir: :asc + ) + + assert Enum.any?(page.entries, &(&1.stream_uuid == stream_uuid)) + + assert_start_event(:paginate_streams, + event_store: EventStore, + page_number: 1, + page_size: 5, + search: "telemetry-", + sort_by: :stream_uuid, + sort_dir: :asc + ) + + assert_stop_event(:paginate_streams, + event_store: EventStore, + page_number: 1, + page_size: 5, + result: :ok, + search: "telemetry-", + sort_by: :stream_uuid, + sort_dir: :asc + ) + + refute_exception_event(:paginate_streams) + end + + test "emits telemetry for snapshot lifecycle" do + attach_telemetry([:record_snapshot, :read_snapshot, :delete_snapshot]) + + snapshot = snapshot_data() + + assert :ok = EventStore.record_snapshot(snapshot) + assert {:ok, read_snapshot} = EventStore.read_snapshot(snapshot.source_uuid) + assert :ok = EventStore.delete_snapshot(snapshot.source_uuid) + + assert read_snapshot.source_uuid == snapshot.source_uuid + assert read_snapshot.source_version == snapshot.source_version + + assert_start_event(:record_snapshot, + event_store: EventStore, + source_uuid: snapshot.source_uuid + ) + + assert_stop_event(:record_snapshot, + event_store: EventStore, + result: :ok, + source_uuid: snapshot.source_uuid + ) + + refute_exception_event(:record_snapshot) + + assert_start_event(:read_snapshot, event_store: EventStore, source_uuid: snapshot.source_uuid) + + assert_stop_event(:read_snapshot, + event_store: EventStore, + result: :ok, + source_uuid: snapshot.source_uuid + ) + + refute_exception_event(:read_snapshot) + + assert_start_event(:delete_snapshot, + event_store: EventStore, + source_uuid: snapshot.source_uuid + ) + + assert_stop_event(:delete_snapshot, + event_store: EventStore, + result: :ok, + source_uuid: snapshot.source_uuid + ) + + refute_exception_event(:delete_snapshot) + end + + test "does not emit telemetry for lazy stream APIs" do + attach_telemetry([:stream_forward, :stream_backward]) + + stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(2)) + + assert EventStore.stream_forward(stream_uuid) |> Enum.count() == 2 + assert EventStore.stream_all_forward() |> Enum.count() == 2 + assert EventStore.stream_backward(stream_uuid) |> Enum.count() == 2 + assert EventStore.stream_all_backward() |> Enum.count() == 2 + + refute_receive {:telemetry_event, _, _, _} + end + + test "emits per-batch telemetry for stream_forward" do + stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(2)) + + attach_telemetry(:stream_batch_read) + + assert EventStore.stream_forward(stream_uuid, 0, read_batch_size: 1) |> Enum.count() == 2 + + assert_stream_batch_read(:forward, stream_uuid, 1, 1, 1) + assert_stream_batch_read(:forward, stream_uuid, 2, 1, 1) + assert_stream_batch_read(:forward, stream_uuid, 3, 1, 0) + end + + test "emits per-batch telemetry for stream_backward" do + stream_uuid = UUID.uuid4() + + assert :ok = EventStore.append_to_stream(stream_uuid, 0, EventFactory.create_events(2)) + + attach_telemetry(:stream_batch_read) + + assert EventStore.stream_backward(stream_uuid, -1, read_batch_size: 1) |> Enum.count() == 2 + + assert_stream_batch_read(:backward, stream_uuid, 2, 1, 1) + assert_stream_batch_read(:backward, stream_uuid, 1, 1, 1) + refute_receive {:telemetry_event, [:eventstore, :stream_batch_read, _], _, _} + end + + test "includes custom event store name metadata" do + metadata = Telemetry.metadata(EventStore, [name: :eventstore1], %{stream_uuid: "stream-1"}) + + assert metadata.name == :eventstore1 + assert metadata.stream_uuid == "stream-1" + assert metadata.event_store == EventStore + end + + test "does not include nil event store name metadata" do + metadata = Telemetry.metadata(EventStore, [], %{stream_uuid: "stream-1"}) + + refute Map.has_key?(metadata, :name) + assert metadata.stream_uuid == "stream-1" + assert metadata.event_store == EventStore + end + + def handle_event(event_name, measurements, metadata, test_pid) do + send(test_pid, {:telemetry_event, event_name, measurements, metadata}) + end + + defp attach_telemetry(operation) when is_atom(operation), do: attach_telemetry([operation]) + + defp attach_telemetry(operations) when is_list(operations) do + handler_id = "#{inspect(__MODULE__)}-#{System.unique_integer([:positive])}" + + events = + for operation <- operations, + suffix <- [:start, :stop, :exception] do + [:eventstore, operation, suffix] + end + + :ok = :telemetry.attach_many(handler_id, events, &__MODULE__.handle_event/4, self()) + on_exit(fn -> :telemetry.detach(handler_id) end) + end + + defp assert_start_event(operation, expected_metadata) do + assert_receive {:telemetry_event, [:eventstore, ^operation, :start], measurements, metadata} + + assert is_integer(measurements.system_time) + assert_metadata(metadata, expected_metadata) + end + + defp assert_stop_event(operation, expected_metadata) do + assert_receive {:telemetry_event, [:eventstore, ^operation, :stop], measurements, metadata} + + assert is_integer(measurements.duration) + assert measurements.duration >= 0 + assert_metadata(metadata, expected_metadata) + end + + defp assert_exception_event(operation, expected_metadata) do + assert_receive {:telemetry_event, [:eventstore, ^operation, :exception], measurements, + metadata} + + assert is_integer(measurements.duration) + assert measurements.duration >= 0 + assert metadata.kind == :error + assert %ArgumentError{} = metadata.reason + assert is_list(metadata.stacktrace) + assert_metadata(metadata, expected_metadata) + end + + defp refute_exception_event(operation) do + refute_receive {:telemetry_event, [:eventstore, ^operation, :exception], _, _} + end + + defp refute_stop_event(operation) do + refute_receive {:telemetry_event, [:eventstore, ^operation, :stop], _, _} + end + + defp assert_stream_batch_read( + direction, + stream_uuid, + start_version, + requested_batch_size, + event_count + ) do + assert_receive {:telemetry_event, [:eventstore, :stream_batch_read, :start], + start_measurements, start_metadata} + + assert is_integer(start_measurements.system_time) + assert is_integer(start_measurements.monotonic_time) + + assert_metadata(start_metadata, + direction: direction, + event_store: EventStore, + requested_batch_size: requested_batch_size, + start_version: start_version, + stream_uuid: stream_uuid + ) + + assert_receive {:telemetry_event, [:eventstore, :stream_batch_read, :stop], stop_measurements, + stop_metadata} + + assert is_integer(stop_measurements.duration) + assert is_integer(stop_measurements.monotonic_time) + assert stop_measurements.duration >= 0 + + assert_metadata(stop_metadata, + direction: direction, + event_count: event_count, + event_store: EventStore, + requested_batch_size: requested_batch_size, + result: :ok, + start_version: start_version, + stream_uuid: stream_uuid + ) + end + + defp assert_metadata(metadata, expected_metadata) do + Enum.each(expected_metadata, fn {key, value} -> + assert Map.fetch!(metadata, key) == value + end) + end + + defp snapshot_data do + %SnapshotData{ + source_uuid: UUID.uuid4(), + source_version: 1, + source_type: Atom.to_string(ExampleData), + data: %ExampleData{data: "some data"} + } + end +end