Skip to content

Commit eff04bf

Browse files
committed
feat(logs): introduce TelemetryProcessor
1 parent a1c636a commit eff04bf

26 files changed

Lines changed: 2411 additions & 469 deletions

lib/sentry.ex

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ defmodule Sentry do
4040
> was the `:included_environments` option (a list of environments to report events for).
4141
> This was used together with the `:environment_name` option to determine whether to
4242
> send events. `:included_environments` is deprecated in v10.0.0 in favor of setting
43-
> or not setting `:dsn`. It will be removed in v11.0.0.
43+
> or not setting `:dsn`. It will be removed in v12.0.0.
4444
4545
You can even rely on more specific logic to determine the environment name. It's
4646
not uncommon for most applications to have a "staging" environment. In order
@@ -183,7 +183,17 @@ defmodule Sentry do
183183
> with `:source_code_exclude_patterns`.
184184
"""
185185

186-
alias Sentry.{CheckIn, Client, ClientError, ClientReport, Config, Event, LoggerUtils, Options}
186+
alias Sentry.{
187+
CheckIn,
188+
Client,
189+
ClientError,
190+
ClientReport,
191+
Config,
192+
Event,
193+
LoggerUtils,
194+
Options,
195+
TelemetryProcessor
196+
}
187197

188198
require Logger
189199

@@ -350,7 +360,7 @@ defmodule Sentry do
350360
"""
351361
@spec send_event(Event.t(), keyword()) :: send_result
352362
def send_event(event, options \\ []) do
353-
# TODO: remove on v11.0.0, :included_environments was deprecated in 10.0.0.
363+
# TODO: remove on v12.0.0, :included_environments was deprecated in 10.0.0.
354364
included_envs = Config.included_environments()
355365

356366
cond do
@@ -378,7 +388,7 @@ defmodule Sentry do
378388
end
379389

380390
def send_transaction(transaction, options \\ []) do
381-
# TODO: remove on v11.0.0, :included_environments was deprecated in 10.0.0.
391+
# TODO: remove on v12.0.0, :included_environments was deprecated in 10.0.0.
382392
included_envs = Config.included_environments()
383393

384394
cond do
@@ -502,4 +512,42 @@ defmodule Sentry do
502512
nil -> nil
503513
end
504514
end
515+
516+
@doc """
517+
Flushes all pending events to Sentry.
518+
519+
This is a blocking call that drains all the buffers and waits for the scheduler
520+
to process all pending items. Useful before application shutdown to ensure
521+
all telemetry events are sent.
522+
523+
## Options
524+
525+
* `:timeout` - Maximum time to wait for flush to complete (default: 5000ms)
526+
527+
## Examples
528+
529+
# Flush with default timeout
530+
Sentry.flush()
531+
532+
# Flush with custom timeout
533+
Sentry.flush(timeout: 10_000)
534+
535+
"""
536+
@doc since: "12.0.0"
537+
@spec flush(keyword()) :: :ok
538+
def flush(opts \\ []) do
539+
timeout = Keyword.get(opts, :timeout, 5000)
540+
541+
try do
542+
TelemetryProcessor.flush(TelemetryProcessor.default_name(), timeout)
543+
catch
544+
:exit, {:noproc, _} ->
545+
Logger.warning("Sentry.flush/1 failed: TelemetryProcessor not running")
546+
:ok
547+
548+
:exit, reason ->
549+
Logger.warning("Sentry.flush/1 failed unexpectedly: #{inspect(reason)}")
550+
:ok
551+
end
552+
end
505553
end

lib/sentry/application.ex

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,15 @@ defmodule Sentry.Application do
3434
[]
3535
end
3636

37-
maybe_log_event_buffer =
37+
maybe_telemetry_processor =
3838
if Config.enable_logs?() do
3939
[
40-
{Task.Supervisor, name: Sentry.LogEventBuffer.TaskSupervisor},
41-
Sentry.LogEventBuffer
40+
{Sentry.TelemetryProcessor,
41+
[
42+
buffer_capacities: Config.telemetry_buffer_capacities(),
43+
scheduler_weights: Config.telemetry_scheduler_weights(),
44+
transport_capacity: Config.transport_capacity()
45+
]}
4246
]
4347
else
4448
[]
@@ -58,7 +62,7 @@ defmodule Sentry.Application do
5862
] ++
5963
maybe_http_client_spec ++
6064
maybe_span_storage ++
61-
maybe_log_event_buffer ++
65+
maybe_telemetry_processor ++
6266
maybe_rate_limiter() ++
6367
[Sentry.Transport.SenderPool]
6468

lib/sentry/client.ex

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ defmodule Sentry.Client do
1414
Envelope,
1515
Event,
1616
Interfaces,
17-
LogEvent,
1817
LoggerUtils,
1918
Options,
2019
Transaction,
@@ -135,69 +134,6 @@ defmodule Sentry.Client do
135134
end
136135
end
137136

138-
@doc """
139-
Sends a batch of log events to Sentry.
140-
141-
Log events are sent asynchronously and do not support sampling.
142-
They are buffered and sent in batches according to the Sentry Logs Protocol.
143-
144-
If a `:before_send_log` callback is configured, it will be called for each log event.
145-
If the callback returns `nil` or `false`, the log event is not sent. If it returns an
146-
updated `Sentry.LogEvent`, that will be used instead.
147-
148-
Returns `{:ok, envelope_id}` on success or `{:error, reason}` on failure.
149-
"""
150-
@doc since: "12.0.0"
151-
@spec send_log_batch([LogEvent.t()]) ::
152-
{:ok, envelope_id :: String.t()} | {:error, ClientError.t()}
153-
def send_log_batch([]), do: {:ok, ""}
154-
155-
def send_log_batch(log_events) when is_list(log_events) do
156-
case Sentry.Test.maybe_collect_logs(log_events) do
157-
:collected ->
158-
{:ok, ""}
159-
160-
:not_collecting ->
161-
log_events
162-
|> run_before_send_log_callbacks()
163-
|> send_log_events()
164-
end
165-
end
166-
167-
defp run_before_send_log_callbacks(log_events) do
168-
callback = Config.before_send_log()
169-
170-
if callback do
171-
for log_event <- log_events,
172-
%LogEvent{} = modified_event <- [call_before_send_log(log_event, callback)] do
173-
modified_event
174-
end
175-
else
176-
log_events
177-
end
178-
end
179-
180-
defp call_before_send_log(log_event, function) when is_function(function, 1) do
181-
function.(log_event)
182-
end
183-
184-
defp call_before_send_log(log_event, {mod, fun}) do
185-
apply(mod, fun, [log_event])
186-
end
187-
188-
defp send_log_events([]), do: {:ok, ""}
189-
190-
defp send_log_events(log_events) do
191-
client = Config.client()
192-
193-
request_retries =
194-
Application.get_env(:sentry, :request_retries, Transport.default_retries())
195-
196-
log_events
197-
|> Envelope.from_log_events()
198-
|> Transport.encode_and_post_envelope(client, request_retries)
199-
end
200-
201137
defp sample_event(sample_rate) do
202138
cond do
203139
sample_rate == 1 -> :ok

lib/sentry/client_report/sender.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ defmodule Sentry.ClientReport.Sender do
3131
| Sentry.CheckIn.t()
3232
| ClientReport.t()
3333
| Sentry.Event.t()
34+
| Sentry.Transaction.t()
3435
def record_discarded_events(reason, event_items, genserver)
3536
when is_list(event_items) do
3637
# We silently ignore events whose reasons aren't valid because we have to add it to the allowlist in Snuba

lib/sentry/config.ex

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -394,17 +394,6 @@ defmodule Sentry.Config do
394394
Use `Sentry.LogsHandler` to capture log events from Erlang's `:logger`.
395395
*Available since 12.0.0*.
396396
"""
397-
],
398-
max_log_events: [
399-
type: :non_neg_integer,
400-
default: 100,
401-
doc: """
402-
The maximum number of log events to buffer before flushing to Sentry.
403-
Log events are buffered and sent in batches to reduce network overhead.
404-
When the buffer reaches this size, it will be flushed immediately.
405-
Otherwise, logs are flushed every 5 seconds. Only applies when `:enable_logs`
406-
is `true`. *Available since 12.0.0*.
407-
"""
408397
]
409398
]
410399

@@ -486,6 +475,41 @@ defmodule Sentry.Config do
486475
connections to keep in the pool. Only applied if `:client` is set to
487476
`Sentry.HackneyClient`.
488477
"""
478+
],
479+
telemetry_buffer_capacities: [
480+
type: {:map, {:in, [:log]}, :pos_integer},
481+
default: %{},
482+
type_doc: "`%{category => pos_integer()}`",
483+
doc: """
484+
Overrides for the maximum number of items each telemetry buffer can hold.
485+
When a buffer reaches capacity, oldest items are dropped to make room.
486+
Currently only the `:log` category is managed by the TelemetryProcessor.
487+
Default: log=1000.
488+
*Available since v12.0.0*.
489+
"""
490+
],
491+
telemetry_scheduler_weights: [
492+
type: {:map, {:in, [:low]}, :pos_integer},
493+
default: %{},
494+
type_doc: "`%{priority => pos_integer()}`",
495+
doc: """
496+
Overrides for the weighted round-robin scheduler priority weights.
497+
Higher weights mean more sending slots for that priority level.
498+
Currently only the `:low` priority (logs) is managed by the TelemetryProcessor.
499+
Default: low=2.
500+
*Available since v12.0.0*.
501+
"""
502+
],
503+
transport_capacity: [
504+
type: :pos_integer,
505+
default: 1000,
506+
doc: """
507+
Maximum number of items the transport queue can hold. For log envelopes,
508+
each log event counts as one item toward capacity. When the queue is full,
509+
the scheduler stops dequeuing from buffers until space becomes available.
510+
The transport queue processes one envelope at a time.
511+
*Available since v12.0.0*.
512+
"""
489513
]
490514
]
491515

@@ -827,8 +851,14 @@ defmodule Sentry.Config do
827851
@spec enable_logs?() :: boolean()
828852
def enable_logs?, do: fetch!(:enable_logs)
829853

830-
@spec max_log_events() :: non_neg_integer()
831-
def max_log_events, do: fetch!(:max_log_events)
854+
@spec telemetry_buffer_capacities() :: %{Sentry.Telemetry.Category.t() => pos_integer()}
855+
def telemetry_buffer_capacities, do: fetch!(:telemetry_buffer_capacities)
856+
857+
@spec telemetry_scheduler_weights() :: %{Sentry.Telemetry.Category.priority() => pos_integer()}
858+
def telemetry_scheduler_weights, do: fetch!(:telemetry_scheduler_weights)
859+
860+
@spec transport_capacity() :: pos_integer()
861+
def transport_capacity, do: fetch!(:transport_capacity)
832862

833863
@spec before_send_log() ::
834864
(Sentry.LogEvent.t() -> Sentry.LogEvent.t() | nil | false) | {module(), atom()} | nil

lib/sentry/envelope.ex

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,20 @@ defmodule Sentry.Envelope do
114114
def get_data_category(%Event{}), do: "error"
115115
def get_data_category(%LogBatch{}), do: "log_item"
116116

117+
@doc """
118+
Returns the total number of payload items in the envelope.
119+
120+
For log envelopes, this counts individual log events within the LogBatch.
121+
For other envelope types, each item counts as 1.
122+
"""
123+
@spec item_count(t()) :: non_neg_integer()
124+
def item_count(%__MODULE__{items: items}) do
125+
Enum.reduce(items, 0, fn
126+
%LogBatch{log_events: log_events}, acc -> acc + length(log_events)
127+
_other, acc -> acc + 1
128+
end)
129+
end
130+
117131
@doc """
118132
Encodes the envelope into its binary representation.
119133

0 commit comments

Comments
 (0)