diff --git a/.agent-tasks/2026-03-20--4--log-accumulator-crash/discarded-threads.md b/.agent-tasks/2026-03-20--4--log-accumulator-crash/discarded-threads.md
new file mode 100644
index 0000000..8ca433c
--- /dev/null
+++ b/.agent-tasks/2026-03-20--4--log-accumulator-crash/discarded-threads.md
@@ -0,0 +1,3 @@
+# Discarded threads
+
+(none)
diff --git a/.agent-tasks/2026-03-20--4--log-accumulator-crash/open-questions.md b/.agent-tasks/2026-03-20--4--log-accumulator-crash/open-questions.md
new file mode 100644
index 0000000..5b5439d
--- /dev/null
+++ b/.agent-tasks/2026-03-20--4--log-accumulator-crash/open-questions.md
@@ -0,0 +1,3 @@
+# Open questions
+
+(none)
diff --git a/.agent-tasks/2026-03-20--4--log-accumulator-crash/plan.md b/.agent-tasks/2026-03-20--4--log-accumulator-crash/plan.md
new file mode 100644
index 0000000..a562d2e
--- /dev/null
+++ b/.agent-tasks/2026-03-20--4--log-accumulator-crash/plan.md
@@ -0,0 +1,54 @@
+# Implementation Plan
+
+## Step 1: Fix the task result handler to clean up properly
+
+In `handle_info({ref, result}, state)` (lines 124-134):
+- Call `Process.demonitor(ref, [:flush])` to prevent the `:DOWN` message from arriving
+- Actually remove the ref from `pending_tasks` (the current code has a comment saying so but doesn't do it)
+
+## Step 2: Fix `block_until_any_task_ready` to demonitor on result receipt
+
+When the `receive` block in `block_until_any_task_ready` catches `{ref, _result}`:
+- Call `Process.demonitor(ref, [:flush])` to prevent orphaned `:DOWN` messages (see the sketch in the appendix below)
+
+## Step 3: Add a catch-all `handle_info` clause
+
+Add a catch-all at the bottom of the `handle_info` clauses that silently ignores any unexpected messages. This is a safety net for any edge cases we haven't anticipated.
+
+## Step 4: Add a test for the crash scenario
+
+Write a test that:
+- Sets up a LogAccumulator with a concurrent request limit of 1
+- Sends enough logs to trigger `block_until_any_task_ready`
+- Makes the export endpoint unavailable so tasks fail
+- Verifies the LogAccumulator doesn't crash
+
+---
+
+## Review discussion
+
+**Reviewer concern:** Adding a catch-all `handle_info` could hide future bugs.
+
+**Response:** Since this module uses `@behaviour GenServer` and is used as a `logger_olp` callback, unexpected messages are a real possibility (e.g., `:EXIT` from `trap_exit`, or stray messages from the OTP logger framework). The catch-all only applies after all of the specific clauses have failed to match. The `:DOWN` and task result message types are well understood, and the catch-all acts as a safety net.
+
+**Reviewer concern:** Should we log stray messages?
+
+**Response:** No — logging from inside a log handler risks recursion. Silent discard is the correct behavior here.
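+
+## Appendix: sketch of the demonitor pattern
+
+A minimal, self-contained sketch (hypothetical code, not the module's actual implementation) of the cleanup pattern that Steps 1 and 2 rely on. The `:flush` option discards a `:DOWN` message that is already sitting in the mailbox, in addition to preventing future delivery:
+
+```elixir
+# Task.async/1 monitors the task process, so the caller is sent both
+# {ref, result} and, later, a {:DOWN, ref, :process, pid, reason} message.
+%Task{ref: ref} = Task.async(fn -> {:error, :econnrefused} end)
+
+receive do
+  {^ref, _result} ->
+    # :flush drops any :DOWN for this ref that is already in the mailbox
+    # and stops future delivery. Without it, the stray :DOWN is exactly
+    # the message that crashed LogAccumulator.
+    Process.demonitor(ref, [:flush])
+end
+```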
diff --git a/.agent-tasks/2026-03-20--4--log-accumulator-crash/progress.md b/.agent-tasks/2026-03-20--4--log-accumulator-crash/progress.md
new file mode 100644
index 0000000..440b9b8
--- /dev/null
+++ b/.agent-tasks/2026-03-20--4--log-accumulator-crash/progress.md
@@ -0,0 +1,26 @@
+# Progress log
+
+## Timeline
+
+- **2026-03-20 Start**: Claimed task, read issue #18 and the LogAccumulator source code
+- **2026-03-20**: Identified root cause — a race between `block_until_any_task_ready` consuming `{ref, result}` and the subsequent `:DOWN` arriving via `handle_info` after the ref has already been removed from `pending_tasks`
+- **2026-03-20**: Also found a secondary bug — `handle_info({ref, result}, state)` doesn't actually remove the ref from `pending_tasks`, despite the comment saying it does
+- **2026-03-20**: Implemented the fix as three changes: proper demonitor-plus-cleanup in `handle_info`, demonitor with `:flush` in `block_until_any_task_ready`, and a catch-all `handle_info` clause
+- **2026-03-20**: Added an integration test; all 45 tests pass
+- **2026-03-20**: Opened PR https://github.com/electric-sql/elixir-otel-metric-exporter/pull/31
+
+## Operational issues
+
+- Had to run `mix deps.get` to download the `protobuf` dependency before the tests would run
+- Test design required careful handling of a feedback loop: failed exports generate `Logger.debug` logs, which are themselves queued as log events and trigger further export attempts. Used `retry: false` config and `Bypass.stub` (not `expect_once`) to absorb this gracefully (sketched below)
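+
+A sketch of that mitigation (it mirrors the integration test added in this PR; `bypass` comes from the test setup):
+
+```elixir
+# Bypass.stub/4 tolerates any number of matching requests, so every export
+# attempt in the Logger.debug feedback loop gets a 500 response, whereas
+# Bypass.expect_once/4 would fail the test on the second request.
+Bypass.stub(bypass, "POST", "/v1/logs", fn conn ->
+  Plug.Conn.resp(conn, 500, "")
+end)
+```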
diff --git a/.agent-tasks/2026-03-20--4--log-accumulator-crash/prompt.md b/.agent-tasks/2026-03-20--4--log-accumulator-crash/prompt.md
new file mode 100644
index 0000000..5d5308e
--- /dev/null
+++ b/.agent-tasks/2026-03-20--4--log-accumulator-crash/prompt.md
@@ -0,0 +1,8 @@
+# Task prompt
+
+Assigned issue: electric-sql/alco-agent-tasks#4
+Upstream issue: electric-sql/elixir-otel-metric-exporter#18
+
+**Title:** When export fails repeatedly due to the remote server being unavailable, LogAccumulator crashes hard
+
+**Description:** Investigate and implement a fix for the LogAccumulator crashing with `FunctionClauseError` when it receives `:DOWN` messages from export tasks that it no longer tracks in `pending_tasks`.
diff --git a/.agent-tasks/2026-03-20--4--log-accumulator-crash/task.md b/.agent-tasks/2026-03-20--4--log-accumulator-crash/task.md
new file mode 100644
index 0000000..b2e4c82
--- /dev/null
+++ b/.agent-tasks/2026-03-20--4--log-accumulator-crash/task.md
@@ -0,0 +1,49 @@
+# Task: Fix LogAccumulator crash on unmatched :DOWN messages
+
+## Problem
+
+The `LogAccumulator` GenServer (used as a `logger_olp` callback) crashes with a `FunctionClauseError` when it receives a `:DOWN` message whose ref is not present in `pending_tasks`.
+
+## Root Cause
+
+There are two code paths that handle task completion messages:
+
+1. **`handle_info/2`** — processes messages delivered via the normal GenServer message loop
+2. **`block_until_any_task_ready/1`** — uses a raw `receive` to consume messages from the mailbox while the process is blocking
+
+The race condition:
+- `block_until_any_task_ready` receives `{ref, result}` from a completed task and removes `ref` from `pending_tasks`
+- The subsequent `:DOWN` message for that same task process arrives via `handle_info`
+- The guard `is_map_key(state.pending_tasks, ref)` fails on the `:DOWN` handler
+- No catch-all clause exists → `FunctionClauseError` crash (see the illustration at the end of this document)
+
+Secondary bug: `handle_info({ref, result}, state)` at line 124 says "Remove the task from the pending tasks map", but the returned state is unchanged — the ref is never actually removed.
+
+## Fix Strategy
+
+1. In `handle_info({ref, result}, state)`: demonitor with flush and actually remove the ref from `pending_tasks`
+2. In `block_until_any_task_ready`: when receiving `{ref, result}`, demonitor with flush to prevent an orphaned `:DOWN`
+3. Add a catch-all `handle_info` clause to silently ignore any stray `:DOWN` or task result messages
+4. Add a test that exercises the crash scenario
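+
+## Illustration
+
+A minimal sketch (a hypothetical module, not the real LogAccumulator) of why an unmatched message is fatal. Defining any `handle_info/2` clause overrides the permissive default injected by `use GenServer`, so a message that matches no clause raises `FunctionClauseError` and terminates the server:
+
+```elixir
+defmodule GuardedServer do
+  use GenServer
+
+  def init(arg), do: {:ok, arg}
+
+  # The only clause is guarded, like LogAccumulator's :DOWN handler was.
+  def handle_info({:DOWN, ref, :process, _, _}, state)
+      when is_map_key(state.pending_tasks, ref) do
+    {:noreply, state}
+  end
+end
+
+# {:ok, pid} = GenServer.start(GuardedServer, %{pending_tasks: %{}})
+# send(pid, {:DOWN, make_ref(), :process, self(), :normal})
+# The send crashes the server with FunctionClauseError in handle_info/2.
+```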
diff --git a/lib/otel_metric_exporter/log_accumulator.ex b/lib/otel_metric_exporter/log_accumulator.ex
index db8cc19..b6f4f63 100644
--- a/lib/otel_metric_exporter/log_accumulator.ex
+++ b/lib/otel_metric_exporter/log_accumulator.ex
@@ -123,22 +123,28 @@ defmodule OtelMetricExporter.LogAccumulator do
   def handle_info({ref, result}, state) when is_map_key(state.pending_tasks, ref) do
+    Process.demonitor(ref, [:flush])
+
     if match?({:error, _}, result) do
       Logger.debug(
         "Error sending logs to #{state.api.config.otlp_endpoint}: #{inspect(elem(result, 1))}"
       )
     end
 
-    # Remove the task from the pending tasks map
-    {:noreply, state}
+    {:noreply, %{state | pending_tasks: Map.delete(state.pending_tasks, ref)}}
   end
 
   def handle_info({:DOWN, ref, :process, _, _}, state)
       when is_map_key(state.pending_tasks, ref) do
-    # Remove the task from the pending tasks map
     {:noreply, %{state | pending_tasks: Map.delete(state.pending_tasks, ref)}}
   end
 
+  # Catch-all for stray messages (e.g. a late :DOWN or task result after the ref
+  # was already cleaned up by block_until_any_task_ready or a prior handler).
+  def handle_info(_msg, state) do
+    {:noreply, state}
+  end
+
   def terminate(_reason, state) do
     # Send any remaining logs if possible
     send_events_via_task(state)
@@ -179,11 +185,10 @@
     # from a task that we started
     receive do
       {ref, _result} when is_map_key(pending_tasks, ref) ->
-        # Remove the task from the pending tasks map
+        Process.demonitor(ref, [:flush])
        %{state | pending_tasks: Map.delete(pending_tasks, ref)}
 
       {:DOWN, ref, :process, _, _} when is_map_key(pending_tasks, ref) ->
-        # Remove the task from the pending tasks map
         %{state | pending_tasks: Map.delete(pending_tasks, ref)}
     end
   end
diff --git a/test/otel_metric_exporter/log_handler_integration_test.exs b/test/otel_metric_exporter/log_handler_integration_test.exs
index 6a6f966..4a3021d 100644
--- a/test/otel_metric_exporter/log_handler_integration_test.exs
+++ b/test/otel_metric_exporter/log_handler_integration_test.exs
@@ -444,4 +444,37 @@ defmodule OtelMetricExporter.LogHandlerIntegrationTest do
       }
     ] = logs
   end
+
+  test "LogAccumulator survives repeated export failures", %{
+    bypass: bypass,
+    handler_id: handler_id,
+    config: initial_config
+  } do
+    # Configure a small buffer, a single concurrent request, and no retries.
+    # This forces block_until_any_task_ready to be exercised, which is the
+    # code path that previously caused a FunctionClauseError on stray :DOWN messages.
+    new_config = %{max_buffer_size: 1, debounce_ms: 5, otlp_concurrent_requests: 1, retry: false}
+    merged_config = Map.merge(initial_config, new_config)
+    :ok = :logger.set_handler_config(handler_id, %{config: merged_config})
+
+    # Return 500 for all requests so exports fail immediately (no retry)
+    Bypass.stub(bypass, "POST", "/v1/logs", fn conn ->
+      Plug.Conn.resp(conn, 500, "")
+    end)
+
+    # Send a burst of logs to trigger multiple export attempts.
+    # With max_buffer_size=1 and concurrent_requests=1, this forces the
+    # accumulator into block_until_any_task_ready repeatedly.
+    for i <- 1..5 do
+      Logger.info("failure-test-#{i}")
+      Process.sleep(10)
+    end
+
+    # Wait for the in-flight tasks to complete
+    Process.sleep(200)
+
+    # Verify the handler is still alive by checking that it is still registered
+    handlers = :logger.get_handler_ids()
+    assert handler_id in handlers, "LogAccumulator should still be alive after export failures"
+  end
 end