@@ -0,0 +1,3 @@
# Discarded threads

(none)
@@ -0,0 +1,3 @@
# Open questions

(none)
36 changes: 36 additions & 0 deletions .agent-tasks/2026-03-20--4--log-accumulator-crash/plan.md
@@ -0,0 +1,36 @@
# Implementation Plan

## Step 1: Fix task result handler to properly clean up

In `handle_info({ref, result}, state)` (lines 124-134):
- Call `Process.demonitor(ref, [:flush])` so the task's `:DOWN` message never arrives
- Actually remove the ref from `pending_tasks` (the current code carries a comment saying so, but never does it)
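
Sketched, the corrected handler looks like this (mirroring the change made in this PR's diff; surrounding clauses and names as in the existing module):

```elixir
def handle_info({ref, result}, state)
    when is_map_key(state.pending_tasks, ref) do
  # Cancel the monitor and flush any :DOWN already sitting in the mailbox
  Process.demonitor(ref, [:flush])

  if match?({:error, _}, result) do
    Logger.debug(
      "Error sending logs to #{state.api.config.otlp_endpoint}: #{inspect(elem(result, 1))}"
    )
  end

  # Actually drop the completed task's ref this time
  {:noreply, %{state | pending_tasks: Map.delete(state.pending_tasks, ref)}}
end
```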

## Step 2: Fix `block_until_any_task_ready` to demonitor on result receipt

When the `receive` block in `block_until_any_task_ready` catches `{ref, _result}`:
- Call `Process.demonitor(ref, [:flush])` to prevent orphaned `:DOWN` messages
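
A minimal sketch of the adjusted `receive` block (matching the diff in this PR):

```elixir
receive do
  {ref, _result} when is_map_key(pending_tasks, ref) ->
    # Flushing here guarantees the task's :DOWN can never reach handle_info later
    Process.demonitor(ref, [:flush])
    %{state | pending_tasks: Map.delete(pending_tasks, ref)}

  {:DOWN, ref, :process, _, _} when is_map_key(pending_tasks, ref) ->
    # Task died before replying; the monitor already fired, nothing to flush
    %{state | pending_tasks: Map.delete(pending_tasks, ref)}
end
```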

## Step 3: Add catch-all `handle_info` clause

Add a catch-all at the bottom of the `handle_info` clauses that silently ignores any unexpected messages. This is a safety net for any edge cases we haven't anticipated.
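
The clause itself is a one-liner; the only requirement is that it is defined after every specific `handle_info/2` clause, since Elixir tries clauses in source order:

```elixir
# Safety net: silently drop stray messages (e.g. a late :DOWN whose ref was
# already cleaned up by block_until_any_task_ready or a prior handler).
def handle_info(_msg, state), do: {:noreply, state}
```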

## Step 4: Add test for the crash scenario

Write a test that:
- Sets up a LogAccumulator with concurrent request limit of 1
- Sends enough logs to trigger `block_until_any_task_ready`
- Makes the export endpoint unavailable so tasks fail
- Verifies the LogAccumulator doesn't crash

---

## Review discussion

**Reviewer concern:** Adding a catch-all `handle_info` could hide future bugs.

**Response:** Since this module uses `@behaviour GenServer` and is used as a `logger_olp` callback, unexpected messages are a real possibility (e.g., `:EXIT` from `trap_exit`, or stray messages from the OTP logger framework). The catch-all only applies after all specific handlers have been tried. The `:DOWN` and task result message types are well-understood, and the catch-all acts as a safety net.

**Reviewer concern:** Should we log stray messages?

**Response:** No — logging from inside a log handler risks recursion. Silent discard is the correct behavior here.
15 changes: 15 additions & 0 deletions .agent-tasks/2026-03-20--4--log-accumulator-crash/progress.md
@@ -0,0 +1,15 @@
# Progress log

## Timeline

- **2026-03-20 Start**: Claimed task, read issue #18 and LogAccumulator source code
- **2026-03-20**: Identified root cause — race between `block_until_any_task_ready` consuming `{ref, result}` and subsequent `:DOWN` arriving via `handle_info` with ref already removed from `pending_tasks`
- **2026-03-20**: Also found secondary bug — `handle_info({ref, result}, state)` doesn't actually remove ref from `pending_tasks` despite the comment
- **2026-03-20**: Implemented fix with 3 changes: proper demonitor+cleanup in handle_info, demonitor+flush in block_until_any_task_ready, and catch-all handle_info clause
- **2026-03-20**: Added integration test, all 45 tests pass
- **2026-03-20**: Opened PR https://github.com/electric-sql/elixir-otel-metric-exporter/pull/31

## Operational issues

- Had to run `mix deps.get` to download `protobuf` dependency before tests would run
- Test design required careful handling of a feedback loop: failed exports emit `Logger.debug` messages, which are themselves queued as log events and trigger cascading export attempts. Used `retry: false` in the config and `Bypass.stub` (rather than `expect_once`) to handle this gracefully
8 changes: 8 additions & 0 deletions .agent-tasks/2026-03-20--4--log-accumulator-crash/prompt.md
@@ -0,0 +1,8 @@
# Task prompt

Assigned issue: electric-sql/alco-agent-tasks#4
Upstream issue: electric-sql/elixir-otel-metric-exporter#18

**Title:** When export fails repeatedly due to the remote server being unavailable, LogAccumulator crashes hard

**Description:** Investigate and implement a fix for the LogAccumulator crashing with `FunctionClauseError` when it receives `:DOWN` messages from export tasks that it no longer tracks in `pending_tasks`.
27 changes: 27 additions & 0 deletions .agent-tasks/2026-03-20--4--log-accumulator-crash/task.md
@@ -0,0 +1,27 @@
# Task: Fix LogAccumulator crash on unmatched :DOWN messages

## Problem

The `LogAccumulator` GenServer (used as a `logger_olp` callback) crashes with a `FunctionClauseError` when it receives a `:DOWN` message whose ref is not present in `pending_tasks`.

## Root Cause

There are two code paths that handle task completion messages:

1. **`handle_info/2`** — processes messages delivered via the normal GenServer message loop
2. **`block_until_any_task_ready/1`** — uses a raw `receive` to consume messages from the mailbox when the process is blocking

The race condition:
- `block_until_any_task_ready` receives `{ref, result}` from a completed task and removes `ref` from `pending_tasks`
- The subsequent `:DOWN` message for that same task process arrives via `handle_info`
- The guard `is_map_key(state.pending_tasks, ref)` fails on the `:DOWN` handler
- No catch-all clause exists → `FunctionClauseError` crash
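
The race hinges on monitor semantics: a completed `Task` sends its reply `{ref, result}` and, separately, the monitor delivers `{:DOWN, ref, :process, pid, reason}`. `Process.demonitor(ref, [:flush])` both cancels the monitor and removes an already-delivered `:DOWN` for that ref from the mailbox, which is why the fix calls it at every point a reply is consumed. An illustrative sketch of the two-message pattern (not module code):

```elixir
# Task.async monitors the spawned task process, so two messages can arrive:
# the reply {ref, result}, then {:DOWN, ref, :process, pid, reason}.
task = Task.async(fn -> :ok end)

receive do
  {ref, result} when ref == task.ref ->
    # Without this, {:DOWN, ref, :process, _, :normal} stays queued and must
    # be matched somewhere, or a guarded handle_info clause will crash.
    Process.demonitor(ref, [:flush])
    result
end
```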

Secondary bug: `handle_info({ref, result}, state)` at line 124 says "Remove the task from the pending tasks map" but the returned state is unchanged — the ref is never actually removed.

## Fix Strategy

1. In `handle_info({ref, result}, state)`: demonitor with flush and actually remove ref from `pending_tasks`
2. In `block_until_any_task_ready`: when receiving `{ref, result}`, demonitor with flush to prevent orphaned `:DOWN`
3. Add a catch-all `handle_info` clause to silently ignore any stray `:DOWN` or task result messages
4. Add a test that exercises the crash scenario
15 changes: 10 additions & 5 deletions lib/otel_metric_exporter/log_accumulator.ex
@@ -123,22 +123,28 @@ defmodule OtelMetricExporter.LogAccumulator do

def handle_info({ref, result}, state)
when is_map_key(state.pending_tasks, ref) do
Process.demonitor(ref, [:flush])

if match?({:error, _}, result) do
Logger.debug(
"Error sending logs to #{state.api.config.otlp_endpoint}: #{inspect(elem(result, 1))}"
)
end

-    # Remove the task from the pending tasks map
-    {:noreply, state}
+    {:noreply, %{state | pending_tasks: Map.delete(state.pending_tasks, ref)}}
end

def handle_info({:DOWN, ref, :process, _, _}, state)
when is_map_key(state.pending_tasks, ref) do
# Remove the task from the pending tasks map
{:noreply, %{state | pending_tasks: Map.delete(state.pending_tasks, ref)}}
end

# Catch-all for stray messages (e.g. late :DOWN or task results after ref was
# already cleaned up by block_until_any_task_ready or a prior handler).
def handle_info(_msg, state) do
{:noreply, state}
end

def terminate(_reason, state) do
# Send any remaining logs if possible
send_events_via_task(state)
@@ -179,11 +185,10 @@
# from a task that we started
receive do
{ref, _result} when is_map_key(pending_tasks, ref) ->
# Remove the task from the pending tasks map
Process.demonitor(ref, [:flush])
%{state | pending_tasks: Map.delete(pending_tasks, ref)}

{:DOWN, ref, :process, _, _} when is_map_key(pending_tasks, ref) ->
# Remove the task from the pending tasks map
%{state | pending_tasks: Map.delete(pending_tasks, ref)}
end
end
33 changes: 33 additions & 0 deletions test/otel_metric_exporter/log_handler_integration_test.exs
@@ -444,4 +444,37 @@ defmodule OtelMetricExporter.LogHandlerIntegrationTest do
}
] = logs
end

test "LogAccumulator survives repeated export failures", %{
bypass: bypass,
handler_id: handler_id,
config: initial_config
} do
# Configure with small buffer, single concurrent request, and no retries.
# This forces block_until_any_task_ready to be exercised, which is the
# code path that previously caused FunctionClauseError on stray :DOWN messages.
new_config = %{max_buffer_size: 1, debounce_ms: 5, otlp_concurrent_requests: 1, retry: false}
merged_config = Map.merge(initial_config, new_config)
:ok = :logger.set_handler_config(handler_id, %{config: merged_config})

# Return 500 for all requests so exports fail immediately (no retry)
Bypass.stub(bypass, "POST", "/v1/logs", fn conn ->
Plug.Conn.resp(conn, 500, "")
end)

# Send a burst of logs to trigger multiple export attempts.
# With max_buffer_size=1 and concurrent_requests=1, this will force
# the accumulator into block_until_any_task_ready repeatedly.
for i <- 1..5 do
Logger.info("failure-test-#{i}")
Process.sleep(10)
end

# Wait for tasks to complete
Process.sleep(200)

# Verify the handler is still alive by checking it's still registered
handlers = :logger.get_handler_ids()
assert handler_id in handlers, "LogAccumulator should still be alive after export failures"
end
end