From f011963f50d59b26b4e137fe952e70c1a590bf7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?andr=C3=A9=20marques?= Date: Sat, 6 Sep 2025 16:53:38 -0300 Subject: [PATCH 1/8] feat: enable passing callback instead of url to exporter config --- lib/otel_metric_exporter/otel_api.ex | 6 ++++++ lib/otel_metric_exporter/otel_api/config.ex | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/lib/otel_metric_exporter/otel_api.ex b/lib/otel_metric_exporter/otel_api.ex index e5b5646..4ff8109 100644 --- a/lib/otel_metric_exporter/otel_api.ex +++ b/lib/otel_metric_exporter/otel_api.ex @@ -45,6 +45,12 @@ defmodule OtelMetricExporter.OtelApi do |> send_proto("/v1/logs", api) end + def send_metrics(%__MODULE__{config: config}, metrics) when not is_nil(config.exporter_callback) do + config.exporter_callback.(metrics) + + :ok + end + def send_metrics(%__MODULE__{config: config} = api, metrics) do metrics |> Protocol.build_metric_service_request(config.resource) diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index 57ceecc..99a7e9c 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -6,6 +6,7 @@ defmodule OtelMetricExporter.OtelApi.Config do :otlp_protocol, :otlp_headers, :otlp_timeout, + :exporter_callback, :exporter, :resource, :otlp_compression, @@ -70,6 +71,11 @@ defmodule OtelMetricExporter.OtelApi.Config do default: 10, doc: "Number of concurrent requests to send to the OTLP endpoint." ], + exporter_callback: [ + type: {:or, [{:fun, 1}, nil]}, + default: nil, + doc: "A callback function invoked instead of making an HTTP request. Must have arity 1" + ], resource: [ type: {:map, {:or, [:atom, :string]}, :any}, default: %{}, From f17525f3d7d46e1db5f9f3edd376482403db1296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?andr=C3=A9=20marques?= Date: Thu, 11 Sep 2025 04:34:19 -0300 Subject: [PATCH 2/8] feat: fully implement export_callback --- lib/otel_metric_exporter/otel_api.ex | 21 ++++++---- lib/otel_metric_exporter/otel_api/config.ex | 14 +++++-- .../otel_api/config_test.exs | 9 +++-- test/otel_metric_exporter/otel_api_test.exs | 40 +++++++++++++++++++ 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/lib/otel_metric_exporter/otel_api.ex b/lib/otel_metric_exporter/otel_api.ex index 4ff8109..7c8fb58 100644 --- a/lib/otel_metric_exporter/otel_api.ex +++ b/lib/otel_metric_exporter/otel_api.ex @@ -39,24 +39,24 @@ defmodule OtelMetricExporter.OtelApi do end end - def send_log_events(%__MODULE__{config: config} = api, events) do + def send_log_events(%__MODULE__{config: %{export_callback: nil} = config} = api, events) do events |> Protocol.build_log_service_request(config.resource) |> send_proto("/v1/logs", api) end - def send_metrics(%__MODULE__{config: config}, metrics) when not is_nil(config.exporter_callback) do - config.exporter_callback.(metrics) + def send_log_events(%__MODULE__{config: config}, events), + do: execute_export_callback(events, :logs, config) - :ok - end - - def send_metrics(%__MODULE__{config: config} = api, metrics) do + def send_metrics(%__MODULE__{config: %{export_callback: nil} = config} = api, metrics) do metrics |> Protocol.build_metric_service_request(config.resource) |> send_proto("/v1/metrics", api) end + def send_metrics(%__MODULE__{config: config}, metrics), + do: execute_export_callback(metrics, :metrics, config) + @spec send_proto(struct(), String.t(), %__MODULE__{}) :: :ok | {:error, any()} defp send_proto(body, path, %__MODULE__{} = api) do body @@ -142,4 +142,11 @@ defmodule OtelMetricExporter.OtelApi do do: :zlib.gzip(body) defp maybe_compress(body, _), do: body + + defp execute_export_callback(batch, type, config) do + case config.export_callback.(batch, type, config) do + {:error, _} = error -> error + _ -> :ok + end + end end diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index 99a7e9c..d7aabb2 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -6,7 +6,7 @@ defmodule OtelMetricExporter.OtelApi.Config do :otlp_protocol, :otlp_headers, :otlp_timeout, - :exporter_callback, + :export_callback, :exporter, :resource, :otlp_compression, @@ -71,10 +71,16 @@ defmodule OtelMetricExporter.OtelApi.Config do default: 10, doc: "Number of concurrent requests to send to the OTLP endpoint." ], - exporter_callback: [ - type: {:or, [{:fun, 1}, nil]}, + export_callback: [ + type: {:or, [{:fun, 3}, nil]}, default: nil, - doc: "A callback function invoked instead of making an HTTP request. Must have arity 1" + doc: """ + A callback function invoked instead of making an HTTP request. Should accept as arguments: + + - `batch`: list of signals to be exported + - `type`: kind of signal (:metrics or :logs) + - `config`: the options passed to this OtelMetricExporter instance + """ ], resource: [ type: {:map, {:or, [:atom, :string]}, :any}, diff --git a/test/otel_metric_exporter/otel_api/config_test.exs b/test/otel_metric_exporter/otel_api/config_test.exs index ef91e29..5e58015 100644 --- a/test/otel_metric_exporter/otel_api/config_test.exs +++ b/test/otel_metric_exporter/otel_api/config_test.exs @@ -39,7 +39,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, - otlp_timeout: 10000 + otlp_timeout: 10000, + export_callback: nil }} end @@ -67,7 +68,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{}, otlp_protocol: :http_protobuf, otlp_timeout: 10000, - otlp_endpoint: "http://localhost:4317" + otlp_endpoint: "http://localhost:4317", + export_callback: nil }} end @@ -98,7 +100,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{}, otlp_protocol: :http_protobuf, otlp_timeout: 10000, - otlp_endpoint: "http://localhost:4317" + otlp_endpoint: "http://localhost:4317", + export_callback: nil }} end end diff --git a/test/otel_metric_exporter/otel_api_test.exs b/test/otel_metric_exporter/otel_api_test.exs index 31319c7..55f1ff6 100644 --- a/test/otel_metric_exporter/otel_api_test.exs +++ b/test/otel_metric_exporter/otel_api_test.exs @@ -2,6 +2,8 @@ defmodule OtelMetricExporter.OtelApiTest do use ExUnit.Case, async: false alias OtelMetricExporter.OtelApi alias OtelMetricExporter.OtelApi.Config + alias OtelMetricExporter.Opentelemetry.Proto.Logs.V1.LogRecord + alias OtelMetricExporter.Opentelemetry.Proto.Metrics.V1.Metric setup do on_exit(fn -> @@ -90,4 +92,42 @@ defmodule OtelMetricExporter.OtelApiTest do ) end end + + describe "send_metrics/2" do + test "if :config has export_callback, executes it instead of sending HTTP request" do + pid = self() + + callback = + fn [metric], :metrics, %Config{} -> + send(pid, metric.description) + end + + metrics = [%Metric{description: "callback executed"}] + + %OtelApi{config: %Config{export_callback: callback}} + |> OtelApi.send_metrics(metrics) + + assert_received "callback executed" + end + end + + describe "send_log_events/2" do + test "if :config has export_callback, executes it instead of sending HTTP request" do + pid = self() + + callback = + fn [log], :logs, %Config{} -> + {:string_value, message} = log.body.value + + send(pid, message) + end + + logs = [%LogRecord{body: %{value: {:string_value, "callback executed"}}}] + + %OtelApi{config: %Config{export_callback: callback}} + |> OtelApi.send_log_events(logs) + + assert_received "callback executed" + end + end end From 40726bcc866fcf79b7e96153ee2ea61cf53c7b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?andr=C3=A9=20marques?= Date: Fri, 12 Sep 2025 02:11:13 -0300 Subject: [PATCH 3/8] chore: change arguments contract for export callback --- lib/otel_metric_exporter/otel_api.ex | 2 +- lib/otel_metric_exporter/otel_api/config.ex | 5 ++--- test/otel_metric_exporter/otel_api_test.exs | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/otel_metric_exporter/otel_api.ex b/lib/otel_metric_exporter/otel_api.ex index 7c8fb58..e434980 100644 --- a/lib/otel_metric_exporter/otel_api.ex +++ b/lib/otel_metric_exporter/otel_api.ex @@ -144,7 +144,7 @@ defmodule OtelMetricExporter.OtelApi do defp maybe_compress(body, _), do: body defp execute_export_callback(batch, type, config) do - case config.export_callback.(batch, type, config) do + case config.export_callback.({type, batch}, config) do {:error, _} = error -> error _ -> :ok end diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index d7aabb2..c2affc1 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -72,13 +72,12 @@ defmodule OtelMetricExporter.OtelApi.Config do doc: "Number of concurrent requests to send to the OTLP endpoint." ], export_callback: [ - type: {:or, [{:fun, 3}, nil]}, + type: {:or, [{:fun, 2}, nil]}, default: nil, doc: """ A callback function invoked instead of making an HTTP request. Should accept as arguments: - - `batch`: list of signals to be exported - - `type`: kind of signal (:metrics or :logs) + - `{type, batch}`: kind (:metrics or :logs) and list of signals - `config`: the options passed to this OtelMetricExporter instance """ ], diff --git a/test/otel_metric_exporter/otel_api_test.exs b/test/otel_metric_exporter/otel_api_test.exs index 55f1ff6..37710da 100644 --- a/test/otel_metric_exporter/otel_api_test.exs +++ b/test/otel_metric_exporter/otel_api_test.exs @@ -98,7 +98,7 @@ defmodule OtelMetricExporter.OtelApiTest do pid = self() callback = - fn [metric], :metrics, %Config{} -> + fn {:metrics, [metric]}, %Config{} -> send(pid, metric.description) end @@ -116,7 +116,7 @@ defmodule OtelMetricExporter.OtelApiTest do pid = self() callback = - fn [log], :logs, %Config{} -> + fn {:logs, [log]}, %Config{} -> {:string_value, message} = log.body.value send(pid, message) From 1ece0f7bdf15b4b30fcad8a16aa4807f85942ca8 Mon Sep 17 00:00:00 2001 From: Ziinc Date: Wed, 12 Nov 2025 08:18:18 +0800 Subject: [PATCH 4/8] feat: request batching (#3) * feat: request batching * fix: better batching logic * chore: formatting * perf: increase default batch size to 250 --- lib/otel_metric_exporter/metric_store.ex | 174 +++++++++++++++--- lib/otel_metric_exporter/otel_api/config.ex | 14 +- .../metric_store_test.exs | 88 ++++++++- .../otel_api/config_test.exs | 6 + 4 files changed, 254 insertions(+), 28 deletions(-) diff --git a/lib/otel_metric_exporter/metric_store.ex b/lib/otel_metric_exporter/metric_store.ex index 73fa414..96ee059 100644 --- a/lib/otel_metric_exporter/metric_store.ex +++ b/lib/otel_metric_exporter/metric_store.ex @@ -197,39 +197,121 @@ defmodule OtelMetricExporter.MetricStore do x -> x end - earliest_gen..current_gen//1 - |> Enum.reduce(%{}, fn gen, acc -> - {_, start, finish} = List.first(:ets.lookup(state.generations_table, gen), {nil, nil, nil}) + metrics = + earliest_gen..current_gen//1 + |> Enum.flat_map(fn gen -> + {_, start, finish} = + List.first(:ets.lookup(state.generations_table, gen), {nil, nil, nil}) + + get_metrics(state.metrics_table, gen) + |> Enum.map(fn {key, values} -> + tagged_values = Enum.map(values, fn {tags, value} -> {{start, finish}, tags, value} end) + {key, tagged_values} + end) + end) + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + |> Enum.map(fn {{type, name}, grouped_values} -> + metric = + Enum.find(state.metrics, &(Enum.join(&1.name, ".") == name and metric_type(&1) == type)) - get_metrics(state.metrics_table, gen) - |> Map.new(fn {metric_key, values} -> - {metric_key, Enum.map(values, fn {tags, value} -> {{start, finish}, tags, value} end)} + convert_metric(metric, List.flatten(grouped_values)) end) - |> Map.merge(acc, fn _k, v1, v2 -> v2 ++ v1 end) - end) - |> Enum.map(fn {{type, name}, tagged_values} -> - metric = - Enum.find(state.metrics, &(Enum.join(&1.name, ".") == name and metric_type(&1) == type)) - convert_metric(metric, tagged_values) - end) - |> then(&OtelApi.send_metrics(state.api, &1)) - |> case do - :ok -> - # Clear exported metrics - for x <- earliest_gen..current_gen//1 do - :ets.match_delete(state.metrics_table, {{x, :_, :_, :_, :_}, :_, :_}) - :ets.delete(state.generations_table, x) - end + max_concurrency = Map.get(state.api.config, :max_concurrency, 3) + + batch_results = + metrics + |> create_batches(state.api.config.max_batch_size) + |> Enum.with_index() + |> Task.async_stream( + fn {batch, idx} -> {idx, batch, OtelApi.send_metrics(state.api, batch)} end, + max_concurrency: max_concurrency, + timeout: 60_000 + ) + |> Enum.to_list() + + successful_keys = + for {:ok, {_, batch, :ok}} <- batch_results, + %Metric{name: name, data: data} <- batch, + do: {otlp_data_type(data), name} + + # Deduplicate successful_keys since split metrics have the same name/type + unique_successful_keys = MapSet.new(successful_keys) + + if MapSet.size(unique_successful_keys) == length(metrics) and metrics != [] do + clear_generations(state, earliest_gen..current_gen//1) + :ok + else + unless successful_keys == [] do + delete_metrics_by_keys(state, earliest_gen..current_gen//1, unique_successful_keys) + end + + log_failures(batch_results) + {:error, :partial_failure} + end + end + + defp create_batches(metrics, max_batch_size) do + do_create_batches(metrics, max_batch_size, [], 0, []) + end + + defp do_create_batches([], _max_size, [], _current_count, acc) do + Enum.reverse(acc) + end + + defp do_create_batches([], _max_size, current_batch, _current_count, acc) do + Enum.reverse([Enum.reverse(current_batch) | acc]) + end - :ok + defp do_create_batches([metric | rest], max_size, current_batch, current_count, acc) do + metric_count = count_data_points(metric) + + cond do + # Metric fits in current batch + current_count + metric_count <= max_size -> + do_create_batches( + rest, + max_size, + [metric | current_batch], + current_count + metric_count, + acc + ) + + # Metric needs to be split + metric_count > max_size -> + chunks = split_metric(metric, max_size) + new_acc = if current_batch == [], do: acc, else: [Enum.reverse(current_batch) | acc] + do_create_batches(chunks ++ rest, max_size, [], 0, new_acc) - {:error, reason} -> - Logger.error("Failed to export metrics: #{inspect(reason)}") - {:error, reason} + # Emit current batch and start new one with this metric + true -> + new_acc = if current_batch == [], do: acc, else: [Enum.reverse(current_batch) | acc] + do_create_batches(rest, max_size, [metric], metric_count, new_acc) end end + defp count_data_points(%Metric{data: {_, %_{data_points: data_points}}}), + do: Enum.count(data_points) + + defp split_metric( + %Metric{name: name, description: description, unit: unit, data: {data_type, data_struct}} = + _metric, + max_batch_size + ) do + data_points = data_struct.data_points + + data_points + |> Enum.chunk_every(max_batch_size) + |> Enum.map(fn chunk -> + %Metric{ + name: name, + description: description, + unit: unit, + data: {data_type, Map.put(data_struct, :data_points, chunk)} + } + end) + end + defp convert_metric( %{name: name, description: description, unit: unit} = metric, values @@ -334,6 +416,48 @@ defmodule OtelMetricExporter.MetricStore do defp convert_unit(:terabyte), do: "TBy" defp convert_unit(x) when is_atom(x), do: Atom.to_string(x) + defp otlp_data_type({:sum, %Sum{is_monotonic: true}}), do: :counter + defp otlp_data_type({:sum, _}), do: :sum + defp otlp_data_type({:gauge, _}), do: :last_value + defp otlp_data_type({:histogram, _}), do: :distribution + + defp clear_generations(state, range) do + for gen <- range do + :ets.match_delete(state.metrics_table, {{gen, :_, :_, :_, :_}, :_, :_}) + :ets.delete(state.generations_table, gen) + end + end + + defp delete_metrics_by_keys(state, range, keys) do + for gen <- range do + for {type, name} <- keys do + :ets.match_delete(state.metrics_table, {{gen, name, type, :_, :_}, :_, :_}) + end + + if :ets.match_object(state.metrics_table, {{gen, :_, :_, :_, :_}, :_, :_}) == [] do + :ets.delete(state.generations_table, gen) + end + end + end + + defp log_failures(batch_results) do + for result <- batch_results do + case result do + {:ok, {_, _, :ok}} -> + :ok + + {:ok, {idx, _, {:error, reason}}} -> + Logger.error("Failed to export batch #{idx}: #{inspect(reason)}") + + {:exit, reason} -> + Logger.error("Batch export task exited: #{inspect(reason)}") + + other -> + Logger.error("Unexpected batch result: #{inspect(other)}") + end + end + end + defp generation_key(metrics_table) do {__MODULE__, metrics_table, :generation} end diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index c2affc1..407e284 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -10,7 +10,9 @@ defmodule OtelMetricExporter.OtelApi.Config do :exporter, :resource, :otlp_compression, - :otlp_concurrent_requests + :otlp_concurrent_requests, + :max_batch_size, + :max_concurrency ] @type protocol :: :http_protobuf @@ -71,6 +73,11 @@ defmodule OtelMetricExporter.OtelApi.Config do default: 10, doc: "Number of concurrent requests to send to the OTLP endpoint." ], + max_batch_size: [ + type: :pos_integer, + default: 250, + doc: "Maximum number of metrics to send per batch request." + ], export_callback: [ type: {:or, [{:fun, 2}, nil]}, default: nil, @@ -81,6 +88,11 @@ defmodule OtelMetricExporter.OtelApi.Config do - `config`: the options passed to this OtelMetricExporter instance """ ], + max_concurrency: [ + type: :pos_integer, + default: 3, + doc: "Maximum number of concurrent batch exports." + ], resource: [ type: {:map, {:or, [:atom, :string]}, :any}, default: %{}, diff --git a/test/otel_metric_exporter/metric_store_test.exs b/test/otel_metric_exporter/metric_store_test.exs index 7c6c031..5980d95 100644 --- a/test/otel_metric_exporter/metric_store_test.exs +++ b/test/otel_metric_exporter/metric_store_test.exs @@ -158,7 +158,7 @@ defmodule OtelMetricExporter.MetricStoreTest do metrics = MetricStore.get_metrics(@name) # Export metrics synchronously - assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export metrics" + assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export batch" # Verify metrics were not cleared due to error assert MetricStore.get_metrics(@name, 0) == metrics @@ -176,7 +176,7 @@ defmodule OtelMetricExporter.MetricStoreTest do metrics = MetricStore.get_metrics(@name) # Export metrics synchronously - assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export metrics" + assert capture_log(fn -> MetricStore.export_sync(@name) end) =~ "Failed to export batch" # Verify metrics were not cleared due to error assert MetricStore.get_metrics(@name, 0) == metrics @@ -227,5 +227,89 @@ defmodule OtelMetricExporter.MetricStoreTest do assert MetricStore.get_metrics(@name, 0) == %{} assert MetricStore.get_metrics(@name, 1) == %{} end + + test "splits large metric sets into batches", %{bypass: bypass, store_config: config} do + metrics = Enum.map(1..60, &Metrics.counter("test.counter.#{&1}")) + start_supervised!({MetricStore, Map.merge(config, %{metrics: metrics, max_batch_size: 50})}) + + Enum.each(metrics, &MetricStore.write_metric(@name, &1, 1, %{})) + + {:ok, agent} = Agent.start_link(fn -> [] end) + + Bypass.expect(bypass, "POST", "/v1/metrics", fn conn -> + {:ok, body, _} = Plug.Conn.read_body(conn) + + [%{scope_metrics: [%{metrics: batch}]}] = + ExportMetricsServiceRequest.decode(body).resource_metrics + + Agent.update(agent, &[length(batch) | &1]) + Plug.Conn.resp(conn, 200, "") + end) + + assert :ok = MetricStore.export_sync(@name) + assert MetricStore.get_metrics(@name, 0) == %{} + assert Agent.get(agent, & &1) |> Enum.sort() == [10, 50] + end + + test "splits large metric data points into smaller sets", %{ + bypass: bypass, + store_config: config + } do + metrics = + Enum.map(1..60, fn _ -> Metrics.last_value("test.counter.1", tags: [:my_field]) end) + + start_supervised!({MetricStore, Map.merge(config, %{metrics: metrics, max_batch_size: 50})}) + + Enum.each( + metrics, + &MetricStore.write_metric(@name, &1, 1, %{my_field: "counter_#{:rand.uniform(100_000)}"}) + ) + + {:ok, agent} = Agent.start_link(fn -> [] end) + + Bypass.expect(bypass, "POST", "/v1/metrics", fn conn -> + {:ok, body, _} = Plug.Conn.read_body(conn) + + [%{scope_metrics: [%{metrics: [%{data: {:gauge, %{data_points: data_points}}}]}]}] = + ExportMetricsServiceRequest.decode(body).resource_metrics + + Agent.update(agent, &[length(data_points) | &1]) + Plug.Conn.resp(conn, 200, "") + end) + + assert :ok = MetricStore.export_sync(@name) + assert MetricStore.get_metrics(@name, 0) == %{} + + for v <- Agent.get(agent, & &1) do + assert v <= 50 + end + end + + test "retains only failed batch metrics", %{bypass: bypass, store_config: config} do + metrics = Enum.map(1..60, &Metrics.counter("test.counter.#{&1}")) + start_supervised!({MetricStore, Map.merge(config, %{metrics: metrics, max_batch_size: 50})}) + + Enum.each(metrics, &MetricStore.write_metric(@name, &1, 1, %{})) + + Bypass.expect(bypass, "POST", "/v1/metrics", fn conn -> + {:ok, body, _} = Plug.Conn.read_body(conn) + + [%{scope_metrics: [%{metrics: batch}]}] = + ExportMetricsServiceRequest.decode(body).resource_metrics + + status = if length(batch) == 50, do: 200, else: 500 + Plug.Conn.resp(conn, status, "") + end) + + :timer.sleep(500) + + log = + capture_log(fn -> assert {:error, :partial_failure} = MetricStore.export_sync(@name) end) + + assert log =~ "Failed to export batch" + + # First batch (50 metrics) succeeds and is cleared, second batch (10 metrics) fails and is retained + assert map_size(MetricStore.get_metrics(@name, 0)) == 10 + end end end diff --git a/test/otel_metric_exporter/otel_api/config_test.exs b/test/otel_metric_exporter/otel_api/config_test.exs index 5e58015..b3cf24f 100644 --- a/test/otel_metric_exporter/otel_api/config_test.exs +++ b/test/otel_metric_exporter/otel_api/config_test.exs @@ -36,6 +36,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do }, otlp_compression: :gzip, otlp_concurrent_requests: 10, + max_batch_size: 250, + max_concurrency: 3, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, @@ -64,6 +66,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do }, otlp_compression: :gzip, otlp_concurrent_requests: 10, + max_batch_size: 250, + max_concurrency: 3, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, @@ -96,6 +100,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do }, otlp_compression: :gzip, otlp_concurrent_requests: 10, + max_batch_size: 250, + max_concurrency: 3, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, From c0fcaede4cd8e1706649431fbaf83dae1fc1cd35 Mon Sep 17 00:00:00 2001 From: Adam Mokan Date: Tue, 11 Nov 2025 17:21:48 -0700 Subject: [PATCH 5/8] defensive logic to minimize number of generations kept in case of repeated failures --- lib/otel_metric_exporter/metric_store.ex | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/otel_metric_exporter/metric_store.ex b/lib/otel_metric_exporter/metric_store.ex index 96ee059..60fffb3 100644 --- a/lib/otel_metric_exporter/metric_store.ex +++ b/lib/otel_metric_exporter/metric_store.ex @@ -247,6 +247,15 @@ defmodule OtelMetricExporter.MetricStore do end log_failures(batch_results) + + # fallback to prune old generations if we do not have 100% success + max_generations_to_hold = Map.get(state.api.config, :max_generations, 10) + + if current_gen - earliest_gen >= max_generations_to_hold do + force_clear_range = earliest_gen..(current_gen - max_generations_to_hold)//1 + clear_generations(state, force_clear_range) + end + {:error, :partial_failure} end end From 1821a835807a2d7f2917c815c1ddf3fbbce03c56 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 12 Nov 2025 12:19:15 +0800 Subject: [PATCH 6/8] perf: more optimizations --- lib/otel_metric_exporter/metric_store.ex | 63 +++++++++---------- lib/otel_metric_exporter/otel_api/config.ex | 10 +-- .../otel_api/config_test.exs | 9 +-- 3 files changed, 33 insertions(+), 49 deletions(-) diff --git a/lib/otel_metric_exporter/metric_store.ex b/lib/otel_metric_exporter/metric_store.ex index 60fffb3..70f13e7 100644 --- a/lib/otel_metric_exporter/metric_store.ex +++ b/lib/otel_metric_exporter/metric_store.ex @@ -217,23 +217,37 @@ defmodule OtelMetricExporter.MetricStore do convert_metric(metric, List.flatten(grouped_values)) end) - max_concurrency = Map.get(state.api.config, :max_concurrency, 3) - - batch_results = + tasks = metrics |> create_batches(state.api.config.max_batch_size) - |> Enum.with_index() - |> Task.async_stream( - fn {batch, idx} -> {idx, batch, OtelApi.send_metrics(state.api, batch)} end, - max_concurrency: max_concurrency, - timeout: 60_000 - ) - |> Enum.to_list() + |> Enum.map(fn batch -> + Task.async(fn -> {batch, OtelApi.send_metrics(state.api, batch)} end) + end) + + yielded_results = Task.yield_many(tasks, timeout: 15_000) successful_keys = - for {:ok, {_, batch, :ok}} <- batch_results, - %Metric{name: name, data: data} <- batch, - do: {otlp_data_type(data), name} + Enum.reduce(yielded_results, [], fn + {_task, {:ok, {batch, :ok}}}, acc -> + new_keys = for %Metric{name: name, data: data} <- batch, do: {otlp_data_type(data), name} + new_keys ++ acc + + {_task, {:ok, {__batch, {:error, reason}}}}, acc -> + Logger.warning("Failed to export batch: #{inspect(reason)}") + acc + + {_task, {:ok, other}}, acc -> + Logger.warning("Unexpected batch result: #{inspect(other)}") + acc + {_task, {:exit, reason}}, acc -> + Logger.warning("Batch export task crashed: #{inspect(reason)}") + acc + + {task, nil}, acc -> + Task.shutdown(task, :brutal_kill) + Logger.warning("Batch export task exceeded 15s timeout and was killed") + acc + end) # Deduplicate successful_keys since split metrics have the same name/type unique_successful_keys = MapSet.new(successful_keys) @@ -246,10 +260,8 @@ defmodule OtelMetricExporter.MetricStore do delete_metrics_by_keys(state, earliest_gen..current_gen//1, unique_successful_keys) end - log_failures(batch_results) - # fallback to prune old generations if we do not have 100% success - max_generations_to_hold = Map.get(state.api.config, :max_generations, 10) + max_generations_to_hold = Map.get(state.api.config, :max_generations, 5) if current_gen - earliest_gen >= max_generations_to_hold do force_clear_range = earliest_gen..(current_gen - max_generations_to_hold)//1 @@ -448,25 +460,6 @@ defmodule OtelMetricExporter.MetricStore do end end end - - defp log_failures(batch_results) do - for result <- batch_results do - case result do - {:ok, {_, _, :ok}} -> - :ok - - {:ok, {idx, _, {:error, reason}}} -> - Logger.error("Failed to export batch #{idx}: #{inspect(reason)}") - - {:exit, reason} -> - Logger.error("Batch export task exited: #{inspect(reason)}") - - other -> - Logger.error("Unexpected batch result: #{inspect(other)}") - end - end - end - defp generation_key(metrics_table) do {__MODULE__, metrics_table, :generation} end diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index 407e284..c57b602 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -11,8 +11,7 @@ defmodule OtelMetricExporter.OtelApi.Config do :resource, :otlp_compression, :otlp_concurrent_requests, - :max_batch_size, - :max_concurrency + :max_batch_size ] @type protocol :: :http_protobuf @@ -75,7 +74,7 @@ defmodule OtelMetricExporter.OtelApi.Config do ], max_batch_size: [ type: :pos_integer, - default: 250, + default: 500, doc: "Maximum number of metrics to send per batch request." ], export_callback: [ @@ -88,11 +87,6 @@ defmodule OtelMetricExporter.OtelApi.Config do - `config`: the options passed to this OtelMetricExporter instance """ ], - max_concurrency: [ - type: :pos_integer, - default: 3, - doc: "Maximum number of concurrent batch exports." - ], resource: [ type: {:map, {:or, [:atom, :string]}, :any}, default: %{}, diff --git a/test/otel_metric_exporter/otel_api/config_test.exs b/test/otel_metric_exporter/otel_api/config_test.exs index b3cf24f..69812f1 100644 --- a/test/otel_metric_exporter/otel_api/config_test.exs +++ b/test/otel_metric_exporter/otel_api/config_test.exs @@ -36,8 +36,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do }, otlp_compression: :gzip, otlp_concurrent_requests: 10, - max_batch_size: 250, - max_concurrency: 3, + max_batch_size: 500, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, @@ -66,8 +65,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do }, otlp_compression: :gzip, otlp_concurrent_requests: 10, - max_batch_size: 250, - max_concurrency: 3, + max_batch_size: 500, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, @@ -100,8 +98,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do }, otlp_compression: :gzip, otlp_concurrent_requests: 10, - max_batch_size: 250, - max_concurrency: 3, + max_batch_size: 500, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, From 05743a6c86da6970891b895eb22bd3023f8d7d73 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 12 Nov 2025 12:23:44 +0800 Subject: [PATCH 7/8] perf: increase default :otlp_concurrent_requests to 50 --- lib/otel_metric_exporter/otel_api/config.ex | 2 +- test/otel_metric_exporter/otel_api/config_test.exs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index c57b602..84ebb73 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -69,7 +69,7 @@ defmodule OtelMetricExporter.OtelApi.Config do ], otlp_concurrent_requests: [ type: :non_neg_integer, - default: 10, + default: 50, doc: "Number of concurrent requests to send to the OTLP endpoint." ], max_batch_size: [ diff --git a/test/otel_metric_exporter/otel_api/config_test.exs b/test/otel_metric_exporter/otel_api/config_test.exs index 69812f1..38028b2 100644 --- a/test/otel_metric_exporter/otel_api/config_test.exs +++ b/test/otel_metric_exporter/otel_api/config_test.exs @@ -35,7 +35,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do exporter: :otlp }, otlp_compression: :gzip, - otlp_concurrent_requests: 10, + otlp_concurrent_requests: 50, max_batch_size: 500, resource: %{}, otlp_headers: %{}, @@ -64,7 +64,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{"key1" => "value1", "key2" => "value2"} }, otlp_compression: :gzip, - otlp_concurrent_requests: 10, + otlp_concurrent_requests: 50, max_batch_size: 500, resource: %{}, otlp_headers: %{}, @@ -97,7 +97,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{"key1" => "value1", "key2" => "value2"} }, otlp_compression: :gzip, - otlp_concurrent_requests: 10, + otlp_concurrent_requests: 50, max_batch_size: 500, resource: %{}, otlp_headers: %{}, From 61e2d371638e5ff7edad09639a0d9bc7a694d875 Mon Sep 17 00:00:00 2001 From: Adam Mokan Date: Wed, 12 Nov 2025 13:35:21 -0700 Subject: [PATCH 8/8] set `max_batch_size` back to 250 and `otlp_concurrent_requests` to 30 --- lib/otel_metric_exporter/otel_api/config.ex | 4 ++-- .../otel_metric_exporter/otel_api/config_test.exs | 15 +++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/otel_metric_exporter/otel_api/config.ex b/lib/otel_metric_exporter/otel_api/config.ex index 84ebb73..e55891d 100644 --- a/lib/otel_metric_exporter/otel_api/config.ex +++ b/lib/otel_metric_exporter/otel_api/config.ex @@ -69,12 +69,12 @@ defmodule OtelMetricExporter.OtelApi.Config do ], otlp_concurrent_requests: [ type: :non_neg_integer, - default: 50, + default: 30, doc: "Number of concurrent requests to send to the OTLP endpoint." ], max_batch_size: [ type: :pos_integer, - default: 500, + default: 250, doc: "Maximum number of metrics to send per batch request." ], export_callback: [ diff --git a/test/otel_metric_exporter/otel_api/config_test.exs b/test/otel_metric_exporter/otel_api/config_test.exs index 38028b2..b5753ed 100644 --- a/test/otel_metric_exporter/otel_api/config_test.exs +++ b/test/otel_metric_exporter/otel_api/config_test.exs @@ -35,8 +35,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do exporter: :otlp }, otlp_compression: :gzip, - otlp_concurrent_requests: 50, - max_batch_size: 500, + otlp_concurrent_requests: 30, + max_batch_size: 250, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, @@ -64,8 +64,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{"key1" => "value1", "key2" => "value2"} }, otlp_compression: :gzip, - otlp_concurrent_requests: 50, - max_batch_size: 500, + otlp_concurrent_requests: 30, + max_batch_size: 250, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, @@ -97,8 +97,8 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_headers: %{"key1" => "value1", "key2" => "value2"} }, otlp_compression: :gzip, - otlp_concurrent_requests: 50, - max_batch_size: 500, + otlp_concurrent_requests: 30, + max_batch_size: 250, resource: %{}, otlp_headers: %{}, otlp_protocol: :http_protobuf, @@ -133,8 +133,7 @@ defmodule OtelMetricExporter.OtelApi.ConfigTest do otlp_endpoint: "http://localhost:4318", otlp_headers: %{"key1" => "value1", "key2" => "value2"}, otlp_timeout: 10 - }, - %{}} = + }, %{}} = OtelMetricExporter.OtelApi.Config.validate_for_scope( %{logs: %{exporter: :otlp}}, :logs