diff --git a/lib/otel_metric_exporter/metric_store.ex b/lib/otel_metric_exporter/metric_store.ex index 73fa414..70f13e7 100644 --- a/lib/otel_metric_exporter/metric_store.ex +++ b/lib/otel_metric_exporter/metric_store.ex @@ -197,39 +197,142 @@ defmodule OtelMetricExporter.MetricStore do x -> x end - earliest_gen..current_gen//1 - |> Enum.reduce(%{}, fn gen, acc -> - {_, start, finish} = List.first(:ets.lookup(state.generations_table, gen), {nil, nil, nil}) + metrics = + earliest_gen..current_gen//1 + |> Enum.flat_map(fn gen -> + {_, start, finish} = + List.first(:ets.lookup(state.generations_table, gen), {nil, nil, nil}) + + get_metrics(state.metrics_table, gen) + |> Enum.map(fn {key, values} -> + tagged_values = Enum.map(values, fn {tags, value} -> {{start, finish}, tags, value} end) + {key, tagged_values} + end) + end) + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + |> Enum.map(fn {{type, name}, grouped_values} -> + metric = + Enum.find(state.metrics, &(Enum.join(&1.name, ".") == name and metric_type(&1) == type)) - get_metrics(state.metrics_table, gen) - |> Map.new(fn {metric_key, values} -> - {metric_key, Enum.map(values, fn {tags, value} -> {{start, finish}, tags, value} end)} + convert_metric(metric, List.flatten(grouped_values)) end) - |> Map.merge(acc, fn _k, v1, v2 -> v2 ++ v1 end) - end) - |> Enum.map(fn {{type, name}, tagged_values} -> - metric = - Enum.find(state.metrics, &(Enum.join(&1.name, ".") == name and metric_type(&1) == type)) - convert_metric(metric, tagged_values) - end) - |> then(&OtelApi.send_metrics(state.api, &1)) - |> case do - :ok -> - # Clear exported metrics - for x <- earliest_gen..current_gen//1 do - :ets.match_delete(state.metrics_table, {{x, :_, :_, :_, :_}, :_, :_}) - :ets.delete(state.generations_table, x) - end + tasks = + metrics + |> create_batches(state.api.config.max_batch_size) + |> Enum.map(fn batch -> + Task.async(fn -> {batch, OtelApi.send_metrics(state.api, batch)} end) + end) + + yielded_results = Task.yield_many(tasks, timeout: 15_000) + + successful_keys = + Enum.reduce(yielded_results, [], fn + {_task, {:ok, {batch, :ok}}}, acc -> + new_keys = for %Metric{name: name, data: data} <- batch, do: {otlp_data_type(data), name} + new_keys ++ acc + + {_task, {:ok, {__batch, {:error, reason}}}}, acc -> + Logger.warning("Failed to export batch: #{inspect(reason)}") + acc + + {_task, {:ok, other}}, acc -> + Logger.warning("Unexpected batch result: #{inspect(other)}") + acc + {_task, {:exit, reason}}, acc -> + Logger.warning("Batch export task crashed: #{inspect(reason)}") + acc + + {task, nil}, acc -> + Task.shutdown(task, :brutal_kill) + Logger.warning("Batch export task exceeded 15s timeout and was killed") + acc + end) + + # Deduplicate successful_keys since split metrics have the same name/type + unique_successful_keys = MapSet.new(successful_keys) + + if MapSet.size(unique_successful_keys) == length(metrics) and metrics != [] do + clear_generations(state, earliest_gen..current_gen//1) + :ok + else + unless successful_keys == [] do + delete_metrics_by_keys(state, earliest_gen..current_gen//1, unique_successful_keys) + end + + # fallback to prune old generations if we do not have 100% success + max_generations_to_hold = Map.get(state.api.config, :max_generations, 5) - :ok + if current_gen - earliest_gen >= max_generations_to_hold do + force_clear_range = earliest_gen..(current_gen - max_generations_to_hold)//1 + clear_generations(state, force_clear_range) + end - {:error, reason} -> - Logger.error("Failed to export metrics: #{inspect(reason)}") - {:error, reason} + {:error, :partial_failure} end end + defp create_batches(metrics, max_batch_size) do + do_create_batches(metrics, max_batch_size, [], 0, []) + end + + defp do_create_batches([], _max_size, [], _current_count, acc) do + Enum.reverse(acc) + end + + defp do_create_batches([], _max_size, current_batch, _current_count, acc) do + Enum.reverse([Enum.reverse(current_batch) | acc]) + end + + defp do_create_batches([metric | rest], max_size, current_batch, current_count, acc) do + metric_count = count_data_points(metric) + + cond do + # Metric fits in current batch + current_count + metric_count <= max_size -> + do_create_batches( + rest, + max_size, + [metric | current_batch], + current_count + metric_count, + acc + ) + + # Metric needs to be split + metric_count > max_size -> + chunks = split_metric(metric, max_size) + new_acc = if current_batch == [], do: acc, else: [Enum.reverse(current_batch) | acc] + do_create_batches(chunks ++ rest, max_size, [], 0, new_acc) + + # Emit current batch and start new one with this metric + true -> + new_acc = if current_batch == [], do: acc, else: [Enum.reverse(current_batch) | acc] + do_create_batches(rest, max_size, [metric], metric_count, new_acc) + end + end + + defp count_data_points(%Metric{data: {_, %_{data_points: data_points}}}), + do: Enum.count(data_points) + + defp split_metric( + %Metric{name: name, description: description, unit: unit, data: {data_type, data_struct}} = + _metric, + max_batch_size + ) do + data_points = data_struct.data_points + + data_points + |> Enum.chunk_every(max_batch_size) + |> Enum.map(fn chunk -> + %Metric{ + name: name, + description: description, + unit: unit, + data: {data_type, Map.put(data_struct, :data_points, chunk)} + } + end) + end + defp convert_metric( %{name: name, description: description, unit: unit} = metric, values @@ -334,6 +437,29 @@ defmodule OtelMetricExporter.MetricStore do defp convert_unit(:terabyte), do: "TBy" defp convert_unit(x) when is_atom(x), do: Atom.to_string(x) + defp otlp_data_type({:sum, %Sum{is_monotonic: true}}), do: :counter + defp otlp_data_type({:sum, _}), do: :sum + defp otlp_data_type({:gauge, _}), do: :last_value + defp otlp_data_type({:histogram, _}), do: :distribution + + defp clear_generations(state, range) do + for gen <- range do + :ets.match_delete(state.metrics_table, {{gen, :_, :_, :_, :_}, :_, :_}) + :ets.delete(state.generations_table, gen) + end + end + + defp delete_metrics_by_keys(state, range, keys) do + for gen <- range do + for {type, name} <- keys do + :ets.match_delete(state.metrics_table, {{gen, name, type, :_, :_}, :_, :_}) + end + + if :ets.match_object(state.metrics_table, {{gen, :_, :_, :_, :_}, :_, :_}) == [] do + :ets.delete(state.generations_table, gen) + end + end + end defp generation_key(metrics_table) do {__MODULE__, metrics_table, :generation} end diff --git a/lib/otel_metric_exporter/otel_api.ex b/lib/otel_metric_exporter/otel_api.ex index e5b5646..e434980 100644 --- a/lib/otel_metric_exporter/otel_api.ex +++ b/lib/otel_metric_exporter/otel_api.ex @@ -39,18 +39,24 @@ defmodule OtelMetricExporter.OtelApi do end end - def send_log_events(%__MODULE__{config: config} = api, events) do + def send_log_events(%__MODULE__{config: %{export_callback: nil} = config} = api, events) do events |> Protocol.build_log_service_request(config.resource) |> send_proto("/v1/logs", api) end - def send_metrics(%__MODULE__{config: config} = api, metrics) do + def send_log_events(%__MODULE__{config: config}, events), + do: execute_export_callback(events, :logs, config) + + def send_metrics(%__MODULE__{config: %{export_callback: nil} = config} = api, metrics) do metrics |> Protocol.build_metric_service_request(config.resource) |> send_proto("/v1/metrics", api) end + def send_metrics(%__MODULE__{config: config}, metrics), + do: execute_export_callback(metrics, :metrics, config) + @spec send_proto(struct(), String.t(), %__MODULE__{}) :: :ok | {:error, any()} defp send_proto(body, path, %__MODULE__{} = api) do body @@ -136,4 +142,11 @@ defmodule OtelMetricExporter.OtelApi do do: :zlib.gzip(body) defp maybe_compress(body, _), do: body + + defp execute_export_callback(batch, type, config) do + case config.export_callback.({type, batch}, config) do + {:error, _} = error -> error + _ -> :ok + end + end end diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index 57ceecc..e55891d 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -6,10 +6,12 @@ defmodule OtelMetricExporter.OtelApi.Config do :otlp_protocol, :otlp_headers, :otlp_timeout, + :export_callback, :exporter, :resource, :otlp_compression, - :otlp_concurrent_requests + :otlp_concurrent_requests, + :max_batch_size ] @type protocol :: :http_protobuf @@ -67,9 +69,24 @@ defmodule OtelMetricExporter.OtelApi.Config do ], otlp_concurrent_requests: [ type: :non_neg_integer, - default: 10, + default: 30, doc: "Number of concurrent requests to send to the OTLP endpoint." ], + max_batch_size: [ + type: :pos_integer, + default: 250, + doc: "Maximum number of metrics to send per batch request." + ], + export_callback: [ + type: {:or, [{:fun, 2}, nil]}, + default: nil, + doc: """ + A callback function invoked instead of making an HTTP request. Should accept as arguments: + + - `{type, batch}`: kind (:metrics or :logs) and list of signals + - `config`: the options passed to this OtelMetricExporter instance + """ + ], resource: [ type: {:map, {:or, [:atom, :string]}, :any}, default: %{}, diff --git a/test/otel_metric_exporter/metric_store_test.exs b/test/otel_metric_exporter/metric_store_test.exs index 7c6c031..5980d95 100644 --- a/test/otel_metric_exporter/metric_store_test.exs +++ b/test/otel_metric_exporter/metric_store_test.exs @@ -158,7 +158,7 @@ defmodule OtelMetricExporter.MetricStoreTest do metrics = MetricStore.get_metrics(@name) # Export metrics synchronously - assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export metrics" + assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export batch" # Verify metrics were not cleared due to error assert MetricStore.get_metrics(@name, 0) == metrics @@ -176,7 +176,7 @@ defmodule OtelMetricExporter.MetricStoreTest do metrics = MetricStore.get_metrics(@name) # Export metrics synchronously - assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export metrics" + assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export batch" # Verify metrics were not cleared due to error assert MetricStore.get_metrics(@name, 0) == metrics @@ -227,5 +227,89 @@ defmodule OtelMetricExporter.MetricStoreTest do assert MetricStore.get_metrics(@name, 0) == %{} assert MetricStore.get_metrics(@name, 1) == %{} end + + test "splits large metric sets into batches", %{bypass: bypass, store_config: config} do + metrics = Enum.map(1..60, &Metrics.counter("test.counter.#{&1}")) + start_supervised!({MetricStore, Map.merge(config, %{metrics: metrics, max_batch_size: 50})}) + + Enum.each(metrics, &MetricStore.write_metric(@name, &1, 1, %{})) + + {:ok, agent} = Agent.start_link(fn -> [] end) + + Bypass.expect(bypass, "POST", "/v1/metrics", fn conn -> + {:ok, body, _} = Plug.Conn.read_body(conn) + + [%{scope_metrics: [%{metrics: batch}]}] = + ExportMetricsServiceRequest.decode(body).resource_metrics + + Agent.update(agent, &[length(batch) | &1]) + Plug.Conn.resp(conn, 200, "") + end) + + assert :ok = MetricStore.export_sync(@name) + assert MetricStore.get_metrics(@name, 0) == %{} + assert Agent.get(agent, & &1) |> Enum.sort() == [10, 50] + end + + test "splits large metric data points into smaller sets", %{ + bypass: bypass, + store_config: config + } do + metrics = + Enum.map(1..60, fn _ -> Metrics.last_value("test.counter.1", tags: [:my_field]) end) + + start_supervised!({MetricStore, Map.merge(config, %{metrics: metrics, max_batch_size: 50})}) + + Enum.each( + metrics, + &MetricStore.write_metric(@name, &1, 1, %{my_field: "counter_#{:rand.uniform(100_000)}"}) + ) + + {:ok, agent} = Agent.start_link(fn -> [] end) + + Bypass.expect(bypass, "POST", "/v1/metrics", fn conn -> + {:ok, body, _} = Plug.Conn.read_body(conn) + + [%{scope_metrics: [%{metrics: [%{data: {:gauge, %{data_points: data_points}}}]}]}] = + ExportMetricsServiceRequest.decode(body).resource_metrics + + Agent.update(agent, &[length(data_points) | &1]) + Plug.Conn.resp(conn, 200, "") + end) + + assert :ok = MetricStore.export_sync(@name) + assert MetricStore.get_metrics(@name, 0) == %{} + + for v <- Agent.get(agent, & &1) do + assert v <= 50 + end + end + + test "retains only failed batch metrics", %{bypass: bypass, store_config: config} do + metrics = Enum.map(1..60, &Metrics.counter("test.counter.#{&1}")) + start_supervised!({MetricStore, Map.merge(config, %{metrics: metrics, max_batch_size: 50})}) + + Enum.each(metrics, &MetricStore.write_metric(@name, &1, 1, %{})) + + Bypass.expect(bypass, "POST", "/v1/metrics", fn conn -> + {:ok, body, _} = Plug.Conn.read_body(conn) + + [%{scope_metrics: [%{metrics: batch}]}] = + ExportMetricsServiceRequest.decode(body).resource_metrics + + status = if length(batch) == 50, do: 200, else: 500 + Plug.Conn.resp(conn, status, "") + end) + + :timer.sleep(500) + + log = + capture_log(fn -> assert {:error, :partial_failure} = MetricStore.export_sync(@name) end) + + assert log =~ "Failed to export batch" + + # First batch (50 metrics) succeeds and is cleared, second batch (10 metrics) fails and is retained + assert map_size(MetricStore.get_metrics(@name, 0)) == 10 + end end end diff --git a/test/otel_metric_exporter/otel_api/config_test.exs b/test/otel_metric_exporter/otel_api/config_test.exs index ef91e29..b5753ed 100644 --- a/test/otel_metric_exporter/otel_api/config_test.exs +++ b/test/otel_metric_exporter/otel_api/config_test.exs @@ -35,11 +35,13 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do exporter: :otlp }, otlp_compression: :gzip, - otlp_concurrent_requests: 10, + otlp_concurrent_requests: 30, + max_batch_size: 250, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, - otlp_timeout: 10000 + otlp_timeout: 10000, + export_callback: nil }} end @@ -62,12 +64,14 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{"key1" => "value1", "key2" => "value2"} }, otlp_compression: :gzip, - otlp_concurrent_requests: 10, + otlp_concurrent_requests: 30, + max_batch_size: 250, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, otlp_timeout: 10000, - otlp_endpoint: "http://localhost:4317" + otlp_endpoint: "http://localhost:4317", + export_callback: nil }} end @@ -93,12 +97,14 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{"key1" => "value1", "key2" => "value2"} }, otlp_compression: :gzip, - otlp_concurrent_requests: 10, + otlp_concurrent_requests: 30, + max_batch_size: 250, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, otlp_timeout: 10000, - otlp_endpoint: "http://localhost:4317" + otlp_endpoint: "http://localhost:4317", + export_callback: nil }} end end @@ -127,8 +133,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_endpoint: "http://localhost:4318", otlp_headers: %{"key1" => "value1", "key2" => "value2"}, otlp_timeout: 10 - }, - %{}} = + }, %{}} = OtelMetricExporter.OtelApi.Config.validate_for_scope( %{logs: %{exporter: :otlp}}, :logs diff --git a/test/otel_metric_exporter/otel_api_test.exs b/test/otel_metric_exporter/otel_api_test.exs index 31319c7..37710da 100644 --- a/test/otel_metric_exporter/otel_api_test.exs +++ b/test/otel_metric_exporter/otel_api_test.exs @@ -2,6 +2,8 @@ defmodule OtelMetricExporter.OtelApiTest do use ExUnit.Case, async: false alias OtelMetricExporter.OtelApi alias OtelMetricExporter.OtelApi.Config + alias OtelMetricExporter.Opentelemetry.Proto.Logs.V1.LogRecord + alias OtelMetricExporter.Opentelemetry.Proto.Metrics.V1.Metric setup do on_exit(fn -> @@ -90,4 +92,42 @@ defmodule OtelMetricExporter.OtelApiTest do ) end end + + describe "send_metrics/2" do + test "if :config has export_callback, executes it instead of sending HTTP request" do + pid = self() + + callback = + fn {:metrics, [metric]}, %Config{} -> + send(pid, metric.description) + end + + metrics = [%Metric{description: "callback executed"}] + + %OtelApi{config: %Config{export_callback: callback}} + |> OtelApi.send_metrics(metrics) + + assert_received "callback executed" + end + end + + describe "send_log_events/2" do + test "if :config has export_callback, executes it instead of sending HTTP request" do + pid = self() + + callback = + fn {:logs, [log]}, %Config{} -> + {:string_value, message} = log.body.value + + send(pid, message) + end + + logs = [%LogRecord{body: %{value: {:string_value, "callback executed"}}}] + + %OtelApi{config: %Config{export_callback: callback}} + |> OtelApi.send_log_events(logs) + + assert_received "callback executed" + end + end end