Skip to content

Commit 33c4c79

Browse files
adinauerclaude
andcommitted
feat(samples): Add opt-in Kafka console e2e coverage
Gate the console Kafka showcase behind SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS so Kafka behavior is enabled only when configured. Keep the showcase isolated in KafkaShowcase and use fail-fast Kafka client timeouts for local runs.\n\nExtend console system tests to assert producer and consumer queue tracing when Kafka is enabled. Update system-test-runner to provision or reuse a local Kafka broker for the console module and clean up runner-managed resources. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent cb4d2ac commit 33c4c79

File tree

4 files changed

+142
-22
lines changed

4 files changed

+142
-22
lines changed

sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/KafkaShowcase.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,22 @@ final class KafkaShowcase {
2424

2525
private KafkaShowcase() {}
2626

27-
static void demonstrate() {
27+
static void demonstrate(final String bootstrapServers) {
2828
final String topic = "sentry-topic-console-sample";
2929
final CountDownLatch consumedLatch = new CountDownLatch(1);
30-
final Thread consumerThread = startKafkaConsumerThread(topic, consumedLatch);
30+
final Thread consumerThread = startKafkaConsumerThread(topic, bootstrapServers, consumedLatch);
3131

3232
final Properties producerProperties = new Properties();
33-
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
33+
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
3434
producerProperties.put(
3535
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3636
producerProperties.put(
3737
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3838
producerProperties.put(
3939
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName());
40+
producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
41+
producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
42+
producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000);
4043

4144
final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo");
4245
try (ISentryLifecycleToken ignored = transaction.makeCurrent()) {
@@ -66,12 +69,12 @@ static void demonstrate() {
6669
}
6770

6871
private static Thread startKafkaConsumerThread(
69-
final String topic, final CountDownLatch consumedLatch) {
72+
final String topic, final String bootstrapServers, final CountDownLatch consumedLatch) {
7073
final Thread consumerThread =
7174
new Thread(
7275
() -> {
7376
final Properties consumerProperties = new Properties();
74-
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
77+
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
7578
consumerProperties.put(
7679
ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID());
7780
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -83,6 +86,8 @@ private static Thread startKafkaConsumerThread(
8386
consumerProperties.put(
8487
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
8588
SentryKafkaConsumerInterceptor.class.getName());
89+
consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000);
90+
consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
8691

8792
try (KafkaConsumer<String, String> consumer =
8893
new KafkaConsumer<>(consumerProperties)) {

sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ public class Main {
1616
private static long numberOfDiscardedSpansDueToOverflow = 0;
1717

1818
public static void main(String[] args) throws InterruptedException {
19+
final String kafkaBootstrapServers = System.getenv("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS");
20+
final boolean kafkaEnabled =
21+
kafkaBootstrapServers != null && !kafkaBootstrapServers.trim().isEmpty();
22+
1923
Sentry.init(
2024
options -> {
2125
// NOTE: Replace the test DSN below with YOUR OWN DSN to see the events from this app in
@@ -95,7 +99,7 @@ public static void main(String[] args) throws InterruptedException {
9599

96100
// Enable cache tracing to create spans for cache operations
97101
options.setEnableCacheTracing(true);
98-
options.setEnableQueueTracing(true);
102+
options.setEnableQueueTracing(kafkaEnabled);
99103

100104
// Determine traces sample rate based on the sampling context
101105
// options.setTracesSampler(
@@ -181,9 +185,10 @@ public static void main(String[] args) throws InterruptedException {
181185

182186
// Kafka queue tracing with kafka-clients interceptors.
183187
//
184-
// This uses the native producer interceptor from sentry-kafka.
185-
// If no local Kafka broker is available, this block exits quietly.
186-
KafkaShowcase.demonstrate();
188+
// Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
189+
if (kafkaEnabled) {
190+
KafkaShowcase.demonstrate(kafkaBootstrapServers);
191+
}
187192

188193
// Performance feature
189194
//

sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,7 @@ class ConsoleApplicationSystemTest {
1919

2020
@Test
2121
fun `console application sends expected events when run as JAR`() {
22-
val jarFile = testHelper.findJar("sentry-samples-console")
23-
val process =
24-
testHelper.launch(
25-
jarFile,
26-
mapOf(
27-
"SENTRY_DSN" to testHelper.dsn,
28-
"SENTRY_TRACES_SAMPLE_RATE" to "1.0",
29-
"SENTRY_ENABLE_PRETTY_SERIALIZATION_OUTPUT" to "false",
30-
"SENTRY_DEBUG" to "true",
31-
"SENTRY_PROFILE_SESSION_SAMPLE_RATE" to "1.0",
32-
"SENTRY_PROFILE_LIFECYCLE" to "TRACE",
33-
),
34-
)
22+
val process = launchConsoleProcess()
3523

3624
process.waitFor(30, TimeUnit.SECONDS)
3725
assertEquals(0, process.exitValue())
@@ -40,6 +28,40 @@ class ConsoleApplicationSystemTest {
4028
verifyExpectedEvents()
4129
}
4230

31+
@Test
32+
fun `console application sends kafka producer and consumer tracing when kafka is enabled`() {
33+
val process =
34+
launchConsoleProcess(mapOf("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS" to "localhost:9092"))
35+
36+
process.waitFor(30, TimeUnit.SECONDS)
37+
assertEquals(0, process.exitValue())
38+
39+
testHelper.ensureTransactionReceived { transaction, _ ->
40+
transaction.transaction == "kafka-demo" &&
41+
testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")
42+
}
43+
44+
testHelper.ensureTransactionReceived { transaction, _ ->
45+
testHelper.doesTransactionHaveOp(transaction, "queue.receive") &&
46+
transaction.contexts.trace?.data?.get("messaging.system") == "kafka"
47+
}
48+
}
49+
50+
private fun launchConsoleProcess(overrides: Map<String, String> = emptyMap()): Process {
51+
val jarFile = testHelper.findJar("sentry-samples-console")
52+
val env =
53+
mutableMapOf(
54+
"SENTRY_DSN" to testHelper.dsn,
55+
"SENTRY_TRACES_SAMPLE_RATE" to "1.0",
56+
"SENTRY_ENABLE_PRETTY_SERIALIZATION_OUTPUT" to "false",
57+
"SENTRY_DEBUG" to "true",
58+
"SENTRY_PROFILE_SESSION_SAMPLE_RATE" to "1.0",
59+
"SENTRY_PROFILE_LIFECYCLE" to "TRACE",
60+
)
61+
env.putAll(overrides)
62+
return testHelper.launch(jarFile, env)
63+
}
64+
4365
private fun verifyExpectedEvents() {
4466
var profilerId: SentryId? = null
4567
// Verify we received a "Fatal message!" event

test/system-test-runner.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import argparse
4343
import requests
4444
import threading
45+
import socket
4546
from pathlib import Path
4647
from typing import Optional, List, Tuple
4748
from dataclasses import dataclass
@@ -65,6 +66,9 @@
6566
"SENTRY_ENABLE_CACHE_TRACING": "true"
6667
}
6768

69+
KAFKA_CONTAINER_NAME = "sentry-java-system-test-kafka"
70+
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
71+
6872
class ServerType(Enum):
6973
TOMCAT = 0
7074
SPRING = 1
@@ -155,6 +159,7 @@ def __init__(self):
155159
self.mock_server = Server(name="Mock", pid_filepath="sentry-mock-server.pid")
156160
self.tomcat_server = Server(name="Tomcat", pid_filepath="tomcat-server.pid")
157161
self.spring_server = Server(name="Spring", pid_filepath="spring-server.pid")
162+
self.kafka_started_by_runner = False
158163

159164
# Load existing PIDs if available
160165
for server in (self.mock_server, self.tomcat_server, self.spring_server):
@@ -196,7 +201,78 @@ def kill_process(self, pid: int, name: str) -> None:
196201
except (OSError, ProcessLookupError):
197202
print(f"Process {pid} was already dead")
198203

204+
def module_requires_kafka(self, sample_module: str) -> bool:
205+
return sample_module == "sentry-samples-console"
206+
207+
def wait_for_port(self, host: str, port: int, max_attempts: int = 20) -> bool:
208+
for _ in range(max_attempts):
209+
try:
210+
with socket.create_connection((host, port), timeout=1):
211+
return True
212+
except OSError:
213+
time.sleep(1)
214+
return False
199215

216+
def start_kafka_broker(self) -> None:
217+
if self.wait_for_port("localhost", 9092, max_attempts=1):
218+
print("Kafka broker already running on localhost:9092, reusing it.")
219+
self.kafka_started_by_runner = False
220+
return
221+
222+
self.stop_kafka_broker()
223+
224+
print("Starting Kafka broker (Redpanda) for system tests...")
225+
run_result = subprocess.run(
226+
[
227+
"docker",
228+
"run",
229+
"-d",
230+
"--name",
231+
KAFKA_CONTAINER_NAME,
232+
"-p",
233+
"9092:9092",
234+
"docker.redpanda.com/redpandadata/redpanda:v24.1.9",
235+
"redpanda",
236+
"start",
237+
"--overprovisioned",
238+
"--smp",
239+
"1",
240+
"--memory",
241+
"1G",
242+
"--reserve-memory",
243+
"0M",
244+
"--node-id",
245+
"0",
246+
"--check=false",
247+
"--kafka-addr",
248+
"PLAINTEXT://0.0.0.0:9092",
249+
"--advertise-kafka-addr",
250+
"PLAINTEXT://localhost:9092",
251+
],
252+
check=False,
253+
capture_output=True,
254+
text=True,
255+
)
256+
257+
if run_result.returncode != 0:
258+
raise RuntimeError(f"Failed to start Kafka container: {run_result.stderr}")
259+
260+
if not self.wait_for_port("localhost", 9092, max_attempts=30):
261+
raise RuntimeError("Kafka broker did not become ready on localhost:9092")
262+
263+
self.kafka_started_by_runner = True
264+
265+
def stop_kafka_broker(self) -> None:
266+
if not self.kafka_started_by_runner:
267+
return
268+
269+
subprocess.run(
270+
["docker", "rm", "-f", KAFKA_CONTAINER_NAME],
271+
check=False,
272+
stdout=subprocess.DEVNULL,
273+
stderr=subprocess.DEVNULL,
274+
)
275+
self.kafka_started_by_runner = False
200276

201277
def start_sentry_mock_server(self) -> None:
202278
"""Start the Sentry mock server."""
@@ -557,6 +633,12 @@ def setup_test_infrastructure(self, sample_module: str, java_agent: str,
557633
java_agent_auto_init: str, build_before_run: str,
558634
server_type: Optional[ServerType]) -> int:
559635
"""Set up test infrastructure. Returns 0 on success, error code on failure."""
636+
if self.module_requires_kafka(sample_module):
637+
self.start_kafka_broker()
638+
os.environ["SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS"] = KAFKA_BOOTSTRAP_SERVERS
639+
else:
640+
os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None)
641+
560642
# Build if requested
561643
if build_before_run == "1":
562644
print("Building before test run")
@@ -624,6 +706,8 @@ def run_single_test(self, sample_module: str, java_agent: str,
624706
elif server_type == ServerType.SPRING:
625707
self.stop_spring_server()
626708
self.stop_sentry_mock_server()
709+
self.stop_kafka_broker()
710+
os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None)
627711

628712
def run_all_tests(self) -> int:
629713
"""Run all system tests."""
@@ -954,6 +1038,8 @@ def cleanup_on_exit(self, signum, frame):
9541038
self.stop_spring_server()
9551039
self.stop_sentry_mock_server()
9561040
self.stop_tomcat_server()
1041+
self.stop_kafka_broker()
1042+
os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None)
9571043
sys.exit(1)
9581044

9591045
def main():
@@ -1152,6 +1238,8 @@ def main():
11521238
runner.stop_spring_server()
11531239
runner.stop_sentry_mock_server()
11541240
runner.stop_tomcat_server()
1241+
runner.stop_kafka_broker()
1242+
os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None)
11551243

11561244
if __name__ == "__main__":
11571245
sys.exit(main())

0 commit comments

Comments
 (0)