From a55a8cf534ad0352fac8c91bd78c142f1ffa2e23 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 12 Jun 2026 01:16:36 +0530 Subject: [PATCH 01/14] Add ASAN --- .semaphore/lsan-suppressions.txt | 34 +++++++ .semaphore/semaphore.yml | 51 +++++++++++ tools/wheels/build-librdkafka-branch.sh | 113 ++++++++++++++++++++++++ 3 files changed, 198 insertions(+) create mode 100644 .semaphore/lsan-suppressions.txt create mode 100755 tools/wheels/build-librdkafka-branch.sh diff --git a/.semaphore/lsan-suppressions.txt b/.semaphore/lsan-suppressions.txt new file mode 100644 index 000000000..dbd9d5488 --- /dev/null +++ b/.semaphore/lsan-suppressions.txt @@ -0,0 +1,34 @@ +# LSAN suppressions for the share consumer ASAN pipeline (run-asan-tests.yml). +# +# CPython deliberately doesn't free a lot of interpreter state at shutdown +# (type objects, interned strings, module dicts, codec caches), so LSAN reports +# those as leaks. Suppress the interpreter's own one-time allocations here so +# the log is left with leaks originating in our cimpl extension and librdkafka. +# +# This is a STARTING POINT, not a finished list — expect to add entries after +# reading the first run's report (that triage is the real work). Keep every +# rule scoped to interpreter internals; never suppress PyInit_cimpl or +# confluent_kafka frames or you'll hide the leaks we're hunting for. 
+ +# Object machinery / type setup kept for the life of the interpreter +leak:_PyObject_GC_New +leak:_PyObject_GC_NewVar +leak:_PyObject_GC_Resize +leak:PyType_Ready +leak:type_new + +# Unicode interning — interned strings live until shutdown +leak:_PyUnicode_New +leak:PyUnicode_New +leak:unicode_resize +leak:PyUnicode_InternInPlace + +# Imports / modules / codecs — one-time startup allocations +leak:_PyImport_FixupExtensionObject +leak:PyModule_New +leak:_PyCodecRegistry_Init + +# Raw interpreter allocator +leak:_PyMem_RawMalloc +leak:_PyMem_RawRealloc +leak:_PyMem_RawCalloc diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index f5c1a7d4f..4ee4b0f44 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -583,6 +583,57 @@ blocks: - cd .. - artifact push project artifacts/confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz --destination confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz - echo Thank you + # TODO KIP-932: runs on every workflow (incl. PRs) while we tune the LSAN + # suppression baseline. Leaks don't gate yet (ASAN exitcode=0); flip that off + # — and ideally move this back to a manual promotion — once it's reliably green. + - name: "Share consumer tests under ASAN+LSAN" + dependencies: [] + task: + agent: + machine: + type: s1-prod-ubuntu24-04-amd64-2 + env_vars: + - name: OS_NAME + value: linux + - name: ARCH + value: x64 + prologue: + commands: + # Ubuntu 24.04's high-entropy ASLR collides with the sanitizer shadow + # mapping; lower it or libasan aborts before main(). 
+ - sudo sysctl -w vm.mmap_rnd_bits=28 + epilogue: + always: + commands: + - cp asan-share-tests.log artifacts/ || true + - artifact push workflow artifacts/ --destination artifacts/asan/ + jobs: + - name: "ShareConsumer binding layer (ASAN + LSAN)" + commands: + - sem-version python 3.11 + - pip install uv + - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate + - uv pip install -r requirements/requirements-tests.txt + # Build an ASAN-instrumented librdkafka from the share consumer + # branch, then build the cimpl extension against it with the same + # instrumentation so binding-layer leaks show real source frames. + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - LIBRDKAFKA_SANITIZE=address tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest + - export CFLAGS="-fsanitize=address -g -fno-omit-frame-pointer -I${PWD}/dest/build/native/include $CFLAGS" + - export LDFLAGS="-fsanitize=address -L${PWD}/${lib_dir} $LDFLAGS" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - uv pip install -e . + # python3 isn't an ASAN build, so libasan has to be preloaded to come + # first; PYTHONMALLOC=malloc lets LSAN see allocations pymalloc would + # otherwise hide. exitcode=0 keeps leaks non-blocking for now — pytest + # failures still gate. Both files are broker-free. 
+ - | + set -o pipefail + LD_PRELOAD="$(gcc -print-file-name=libasan.so)" \ + PYTHONMALLOC=malloc \ + ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=0:print_stacktrace=1:symbolize=1" \ + LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" \ + python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log promotions: - name: "Publish to Test PyPI" pipeline_file: publish-test-pypi.yml diff --git a/tools/wheels/build-librdkafka-branch.sh b/tools/wheels/build-librdkafka-branch.sh new file mode 100755 index 000000000..fdbe4a45f --- /dev/null +++ b/tools/wheels/build-librdkafka-branch.sh @@ -0,0 +1,113 @@ +#!/bin/bash +# +# TODO KIP-932: This script is temporary until share consumer support +# lands in a released librdkafka version. +# +# Build librdkafka from a git branch and install it into a NuGet-compatible +# directory layout that matches what install-librdkafka.sh produces. +# +# Usage: build-librdkafka-branch.sh <branch> <destdir> +# +# branch - git branch name, e.g. dev_kip-932_queues-for-kafka +# destdir - destination directory, e.g. 
dest +# +# Resulting layout (mirrors NuGet redist package): +# <destdir>/build/native/include/librdkafka/rdkafka.h +# <destdir>/runtimes/<os>-<arch>/native/librdkafka.{so.1,dylib} + +set -ex + +BRANCH="$1" +DEST="$2" + +if [[ -z $BRANCH || -z $DEST ]]; then + echo "Usage: $0 <branch> <destdir>" + exit 1 +fi + +if [[ -f $DEST/build/native/include/librdkafka/rdkafka.h ]]; then + echo "$0: librdkafka already built in $DEST" + exit 0 +fi + +echo "$0: Building librdkafka branch '$BRANCH' into '$DEST'" + +ARCH=${ARCH:-x64} +SRC=/tmp/librdkafka-${BRANCH} +INSTALL=$SRC/install + +[[ -d "$DEST" ]] || mkdir -p "$DEST" +rm -rf "$SRC" + +git clone --depth 1 --branch "$BRANCH" \ + https://github.com/confluentinc/librdkafka.git "$SRC" + +if [[ $OSTYPE == linux* ]]; then + sudo apt-get update -qq && sudo apt-get install -y -qq libssl-dev libsasl2-dev liblz4-dev libzstd-dev +elif [[ $OSTYPE == darwin* ]]; then + # openssl@3 is keg-only in Homebrew (the system ships LibreSSL/Apple + # crypto with no -lcrypto headers), so configure's compile-probe for + # libcrypto silently fails unless we add brew's path. Same for zstd / + # lz4 / pkg-config on a fresh runner. + brew install pkg-config openssl@3 zstd lz4 + OPENSSL_PREFIX="$(brew --prefix openssl@3)" + export PKG_CONFIG_PATH="$OPENSSL_PREFIX/lib/pkgconfig${PKG_CONFIG_PATH:+:$PKG_CONFIG_PATH}" + export CPPFLAGS="-I$OPENSSL_PREFIX/include${CPPFLAGS:+ $CPPFLAGS}" + export LDFLAGS="-L$OPENSSL_PREFIX/lib${LDFLAGS:+ $LDFLAGS}" +fi + +pushd "$SRC" + +# --enable-ssl/-lz4/-zstd convert mklove's silent "failed (disable)" into a +# loud configure error if a future regression breaks dep installation — +# otherwise we ship a wheel where sasl.oauthbearer.config etc. fail at +# runtime with _INVALID_ARG -186. +CONFIGURE_OPTS="--prefix=$INSTALL --enable-ssl --enable-lz4-ext --enable-zstd" +if [[ $OSTYPE == linux* ]]; then + CONFIGURE_OPTS="$CONFIGURE_OPTS --disable-gssapi" +fi + +# LIBRDKAFKA_SANITIZE=address builds an instrumented lib for the share-consumer +# ASAN pipeline. 
Keep the debug symbols mklove would otherwise strip so leak +# reports land on real source lines; normal wheel builds still strip them. +if [[ -n $LIBRDKAFKA_SANITIZE ]]; then + export CFLAGS="-fsanitize=${LIBRDKAFKA_SANITIZE} -g -fno-omit-frame-pointer ${CFLAGS}" + export LDFLAGS="-fsanitize=${LIBRDKAFKA_SANITIZE} ${LDFLAGS}" +else + CONFIGURE_OPTS="$CONFIGURE_OPTS --disable-debug-symbols" +fi + +./configure $CONFIGURE_OPTS +make -j"$(nproc 2>/dev/null || sysctl -n hw.ncpu)" +make install +popd + +# --- Mirror NuGet layout --- + +INC_DST="$DEST/build/native/include" +mkdir -p "$INC_DST" +cp -r "$INSTALL/include/librdkafka" "$INC_DST/" + +if [[ $OSTYPE == linux* ]]; then + OS_NAME=linux + LIB_DST="$DEST/runtimes/$OS_NAME-$ARCH/native" + mkdir -p "$LIB_DST" + cp -v "$INSTALL"/lib/librdkafka.so* "$LIB_DST/" 2>/dev/null || true + # Ensure librdkafka.so.1 exists (needed by the loader) + if [[ ! -f "$LIB_DST/librdkafka.so.1" ]]; then + cp -v "$LIB_DST/librdkafka.so" "$LIB_DST/librdkafka.so.1" + fi + ldd "$LIB_DST/librdkafka.so.1" + +elif [[ $OSTYPE == darwin* ]]; then + OS_NAME=osx + LIB_DST="$DEST/runtimes/$OS_NAME-$ARCH/native" + mkdir -p "$LIB_DST" + cp -v "$INSTALL"/lib/librdkafka*.dylib "$LIB_DST/" + # Fix the dylib self-referencing name to its installed path + install_name_tool -id "$LIB_DST/librdkafka.dylib" "$LIB_DST/librdkafka.dylib" + otool -L "$LIB_DST/librdkafka.dylib" +fi + +rm -rf "$SRC" +echo "$0: Done. Headers at $INC_DST, library at $LIB_DST" From 1a87dd295af5ccd9caffa4d75e45a32bda0969c6 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 12 Jun 2026 12:52:21 +0530 Subject: [PATCH 02/14] Include integration tests --- .semaphore/semaphore.yml | 78 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 6 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 4ee4b0f44..da0fb130f 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -583,9 +583,10 @@ blocks: - cd .. 
- artifact push project artifacts/confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz --destination confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz - echo Thank you - # TODO KIP-932: runs on every workflow (incl. PRs) while we tune the LSAN - # suppression baseline. Leaks don't gate yet (ASAN exitcode=0); flip that off - # — and ideally move this back to a manual promotion — once it's reliably green. + # KIP-932: runs on every workflow (incl. PRs). An unsuppressed leak (or any + # ASAN error) now fails the job, so the LSAN baseline in lsan-suppressions.txt + # is load-bearing — keep it tight and scoped to interpreter internals or it'll + # hide real leaks. Consider a manual promotion if the per-PR build cost bites. - name: "Share consumer tests under ASAN+LSAN" dependencies: [] task: @@ -625,15 +626,80 @@ blocks: - uv pip install -e . # python3 isn't an ASAN build, so libasan has to be preloaded to come # first; PYTHONMALLOC=malloc lets LSAN see allocations pymalloc would - # otherwise hide. exitcode=0 keeps leaks non-blocking for now — pytest - # failures still gate. Both files are broker-free. + # otherwise hide. exitcode=1 makes an unsuppressed leak (or any ASAN + # error) fail the job; the build has no -fsanitize-recover so errors + # halt and exit non-zero, and pipefail carries that through the tee. + # Both files are broker-free. 
- | set -o pipefail LD_PRELOAD="$(gcc -print-file-name=libasan.so)" \ PYTHONMALLOC=malloc \ - ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=0:print_stacktrace=1:symbolize=1" \ + ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" \ LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" \ python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log + # KIP-932: share-consumer INTEGRATION tests under ASAN+LSAN, gated per-PR + # (companion to the broker-free job above). The broker is a JVM, so it must + # never inherit LD_PRELOAD=libasan: trivup starts the cluster in a clean env + # and exports BROKERS, the kafka_cluster fixture connects via ByoFixture, and + # LD_PRELOAD is injected only inside --cmd (the pytest process). ASAN/LSAN + # options are exported outside trivup — inert for the JVM (it never loads + # libasan), but they ride into --cmd via trivup's env.update(). The `| tee` + # sits on the outer trivup call, not inside --cmd: trivup runs --cmd via + # /bin/sh (no pipefail), which would mask a pytest/LSAN failure behind tee. + - name: "Share consumer integration tests under ASAN+LSAN" + dependencies: [] + task: + agent: + machine: + type: s1-prod-ubuntu24-04-amd64-2 + env_vars: + - name: OS_NAME + value: linux + - name: ARCH + value: x64 + prologue: + commands: + # Same Ubuntu 24.04 ASLR vs sanitizer-shadow fix as the binding-layer job. 
+ - sudo sysctl -w vm.mmap_rnd_bits=28 + epilogue: + always: + commands: + - cp asan-share-integration-tests.log artifacts/ || true + - artifact push workflow artifacts/ --destination artifacts/asan-integration/ + jobs: + - name: "ShareConsumer integration (ASAN + LSAN)" + commands: + - sem-version python 3.11 + - sem-version java 17 + - pip install uv + - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate + # pytest comes from requirements-tests; trivup is pinned to the vendored + # 0.14.0 (PyPI 0.12.2 breaks on Kafka 4.2.0's 0.0.0.0 quorum voters). + - uv pip install -r requirements/requirements-tests.txt + - uv pip install tests/trivup/trivup-0.14.0.tar.gz + # ASAN-instrumented librdkafka + cimpl, same build as the binding-layer job. + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - LIBRDKAFKA_SANITIZE=address tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest + - export CFLAGS="-fsanitize=address -g -fno-omit-frame-pointer -I${PWD}/dest/build/native/include $CFLAGS" + - export LDFLAGS="-fsanitize=address -L${PWD}/${lib_dir} $LDFLAGS" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - uv pip install -e . + # --conf mirrors tests/common TestUtils.broker_conf(): single-broker + # RF/ISR=1 for the share-coordinator + offsets/txn topics, plus the 1s + # share lock duration the redelivery tests need. BYO mode means the + # fixture doesn't apply broker_conf, so it has to be set here. 
+ - | + set -o pipefail + export PYTHONMALLOC=malloc + export ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" + export LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" + export PYTHONPATH="${PWD}" + python -m trivup.clusters.KafkaCluster \ + --kraft \ + --version 4.2.0 \ + --conf '["transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1","offsets.topic.replication.factor=1","offsets.topic.min.isr=1","share.coordinator.state.topic.replication.factor=1","share.coordinator.state.topic.min.isr=1","group.share.record.lock.duration.ms=1000","group.share.min.record.lock.duration.ms=1000"]' \ + --cmd 'LD_PRELOAD="$(gcc -print-file-name=libasan.so)" python -m pytest -v --timeout=300 tests/integration/share_consumer/' \ + 2>&1 | tee asan-share-integration-tests.log promotions: - name: "Publish to Test PyPI" pipeline_file: publish-test-pypi.yml From f69b7247b7c693e91a246ab13b7f6df9f9b31ce1 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 12 Jun 2026 14:56:25 +0530 Subject: [PATCH 03/14] [KIP-932] Fix share consumer ASAN integration job deps + suppress 3rd-party leaks The integration ASAN job installed requirements-tests.txt (pytest only), so pytest collection died importing the SR client that cluster_fixture loads at module level: ModuleNotFoundError: authlib. Switch to requirements-tests-install.txt, which -r's in requirements-schemaregistry.txt (authlib + cryptography) and the vendored trivup 0.14.0. Also suppress the one-time cryptography/cffi module-init allocations LSAN flags from those transitively-pulled extensions, so they don't gate the job (exitcode=1) once the tests actually run. Scoped to those .so's; the broker-free job never imports them. 
Co-Authored-By: Claude Opus 4.8 --- .semaphore/lsan-suppressions.txt | 17 +++++++++++++++-- .semaphore/semaphore.yml | 10 ++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/.semaphore/lsan-suppressions.txt b/.semaphore/lsan-suppressions.txt index dbd9d5488..d5888d28c 100644 --- a/.semaphore/lsan-suppressions.txt +++ b/.semaphore/lsan-suppressions.txt @@ -7,8 +7,9 @@ # # This is a STARTING POINT, not a finished list — expect to add entries after # reading the first run's report (that triage is the real work). Keep every -# rule scoped to interpreter internals; never suppress PyInit_cimpl or -# confluent_kafka frames or you'll hide the leaks we're hunting for. +# rule scoped to interpreter internals or third-party extension module-init +# (e.g. cryptography/cffi dragged in by trivup); never suppress PyInit_cimpl, +# confluent_kafka, or librdkafka frames or you'll hide the leaks we're hunting. # Object machinery / type setup kept for the life of the interpreter leak:_PyObject_GC_New @@ -32,3 +33,15 @@ leak:_PyCodecRegistry_Init leak:_PyMem_RawMalloc leak:_PyMem_RawRealloc leak:_PyMem_RawCalloc + +# Third-party C/Rust extension module-init leaks, pulled in transitively by +# trivup (jwcrypto -> cryptography -> cffi) and the schema-registry client that +# cluster_fixture imports at load time. One-time PyInit / pyo3 type-object setup +# inside those .so's — not our code, not librdkafka. (Integration job only; the +# broker-free job never imports these.) 
+leak:cryptography +leak:_cffi_backend +leak:create_type_object +leak:PyInit__openssl +leak:_my_Py_InitModule +leak:b_init_cffi_1_0_external_module diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index da0fb130f..44baa681f 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -673,10 +673,12 @@ blocks: - sem-version java 17 - pip install uv - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate - # pytest comes from requirements-tests; trivup is pinned to the vendored - # 0.14.0 (PyPI 0.12.2 breaks on Kafka 4.2.0's 0.0.0.0 quorum voters). - - uv pip install -r requirements/requirements-tests.txt - - uv pip install tests/trivup/trivup-0.14.0.tar.gz + # requirements-tests-install.txt pulls pytest, the schema-registry + # client deps (authlib/cryptography — cluster_fixture imports the SR + # client at module load, even though the share tests don't use SR), + # and the vendored trivup 0.14.0 (PyPI 0.12.2 breaks on Kafka 4.2.0's + # 0.0.0.0 quorum voters). + - uv pip install -r requirements/requirements-tests-install.txt # ASAN-instrumented librdkafka + cimpl, same build as the binding-layer job. - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native - LIBRDKAFKA_SANITIZE=address tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest From e21c264ad74ae103c4e356a90523a10a268125c7 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 12 Jun 2026 15:33:52 +0530 Subject: [PATCH 04/14] TEMP DO NOT MERGE: ASAN+LSAN leak canary + print_suppressions=1 Scratch validation for the share-consumer sanitizer job. Adds two deliberate leaks (raw libc malloc; a lost GC dict) to the broker-free ASAN job and turns on print_suppressions, to prove the detector + exitcode=1 gate actually fire and to check whether the broad _PyObject_GC_* suppressions mask our own objects. Revert after reading the run (delete the canary file, restore print_suppressions=0 and the pytest arg). 
Co-Authored-By: Claude Opus 4.8 --- .semaphore/semaphore.yml | 4 ++-- tests/test_ShareConsumer_leak_canary.py | 30 +++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 tests/test_ShareConsumer_leak_canary.py diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 44baa681f..a974f3156 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -635,8 +635,8 @@ blocks: LD_PRELOAD="$(gcc -print-file-name=libasan.so)" \ PYTHONMALLOC=malloc \ ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" \ - LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" \ - python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log + LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=1" \ + python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py tests/test_ShareConsumer_leak_canary.py 2>&1 | tee asan-share-tests.log # KIP-932: share-consumer INTEGRATION tests under ASAN+LSAN, gated per-PR # (companion to the broker-free job above). The broker is a JVM, so it must # never inherit LD_PRELOAD=libasan: trivup starts the cluster in a clean env diff --git a/tests/test_ShareConsumer_leak_canary.py b/tests/test_ShareConsumer_leak_canary.py new file mode 100644 index 000000000..221168e0d --- /dev/null +++ b/tests/test_ShareConsumer_leak_canary.py @@ -0,0 +1,30 @@ +"""TEMP leak canary — DO NOT MERGE. Delete once the ASAN+LSAN validation run +is read (paired with print_suppressions=1 + the extra pytest arg in +.semaphore/semaphore.yml). + +Both tests pass functionally, so the only thing that can turn the job red is +LSAN. The artifact log then answers the two things a green run can't: + + - test_canary_raw_malloc_leak: a pure libc block, no Python allocator frame + for any rule to match. 
It MUST be reported and MUST flip the job to exit 1. + If it isn't, the detector or the exitcode=1 gate is broken. + + - test_canary_leaked_gc_object: a lost GC object whose alloc stack runs + through _PyObject_GC_* — exactly the broad frames our suppression list + carries. If LSAN stays silent on this one (and print_suppressions=1 shows a + _PyObject_GC_* rule firing), the suppressions are masking our own objects. +""" +import ctypes + + +def test_canary_raw_malloc_leak(): + libc = ctypes.CDLL(None) + libc.malloc.restype = ctypes.c_void_p + libc.malloc.argtypes = [ctypes.c_size_t] + libc.malloc(1234567) # distinctive size; return discarded, never freed + + +def test_canary_leaked_gc_object(): + obj = {"leak_canary": 7654321} # dict -> GC-tracked + ctypes.pythonapi.Py_IncRef(ctypes.py_object(obj)) # pin refcount forever + del obj # drop the only referrer From 120290fd9adfd0bb89c403ed169ae5e43ce527c8 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 12 Jun 2026 16:23:18 +0530 Subject: [PATCH 05/14] TEMP DO NOT MERGE: gate ASAN+LSAN jobs on report text, not exit code The canary run proved exitcode=1 does not propagate when libasan is LD_PRELOADed into a non-instrumented CPython: LSAN detected the planted leaks (raw malloc + a lost dict) and printed the report, but the process still exited 0 and the job passed. So neither ASAN job was actually gating on leaks. Add a post-pytest grep on the tee'd log for the LeakSanitizer/AddressSanitizer signatures and fail the job when present (also catches ASAN memory errors). Canary + print_suppressions=1 kept for one confirming run (expect the binding-layer job to go RED now); revert both after. 
Co-Authored-By: Claude Opus 4.8 --- .semaphore/semaphore.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index a974f3156..130658c1d 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -637,6 +637,15 @@ blocks: ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" \ LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=1" \ python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py tests/test_ShareConsumer_leak_canary.py 2>&1 | tee asan-share-tests.log + rc=$? + # exitcode=1 does NOT propagate when libasan is LD_PRELOADed into a + # non-instrumented python: LSAN prints the leak report but the + # process still exits 0. Gate on the report text, not the code. + if grep -Eq 'ERROR: (Address|Leak)Sanitizer|SUMMARY: AddressSanitizer' asan-share-tests.log; then + echo "ASAN/LSAN reported findings above — failing the job." + rc=1 + fi + exit $rc # KIP-932: share-consumer INTEGRATION tests under ASAN+LSAN, gated per-PR # (companion to the broker-free job above). The broker is a JVM, so it must # never inherit LD_PRELOAD=libasan: trivup starts the cluster in a clean env @@ -702,6 +711,14 @@ blocks: --conf '["transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1","offsets.topic.replication.factor=1","offsets.topic.min.isr=1","share.coordinator.state.topic.replication.factor=1","share.coordinator.state.topic.min.isr=1","group.share.record.lock.duration.ms=1000","group.share.min.record.lock.duration.ms=1000"]' \ --cmd 'LD_PRELOAD="$(gcc -print-file-name=libasan.so)" python -m pytest -v --timeout=300 tests/integration/share_consumer/' \ 2>&1 | tee asan-share-integration-tests.log + rc=$? + # Same LD_PRELOAD exit-code caveat as the binding-layer job: a leak + # with otherwise-passing tests still exits 0, so gate on the text. 
+ if grep -Eq 'ERROR: (Address|Leak)Sanitizer|SUMMARY: AddressSanitizer' asan-share-integration-tests.log; then + echo "ASAN/LSAN reported findings above — failing the job." + rc=1 + fi + exit $rc promotions: - name: "Publish to Test PyPI" pipeline_file: publish-test-pypi.yml From e76435e6d15525d917a0065be39ac738ab9052cd Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sat, 13 Jun 2026 16:19:15 +0530 Subject: [PATCH 06/14] Revert leak canary + print_suppressions; keep the log-grep gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The canary run confirmed the fix: with the grep gate added, the planted leaks flipped the binding-layer job to red (previously green on the identical canary). Removing the scratch bits now — delete the canary test, restore print_suppressions=0, drop it from the pytest args. The real fix (gate on the LeakSanitizer/AddressSanitizer report text in both ASAN jobs) stays. Net of the three TEMP commits: the only lasting change is the report-text gate. Squash these before merge. 
Co-Authored-By: Claude Opus 4.8 --- .semaphore/semaphore.yml | 4 ++-- tests/test_ShareConsumer_leak_canary.py | 30 ------------------------- 2 files changed, 2 insertions(+), 32 deletions(-) delete mode 100644 tests/test_ShareConsumer_leak_canary.py diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 130658c1d..04b253ae9 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -635,8 +635,8 @@ blocks: LD_PRELOAD="$(gcc -print-file-name=libasan.so)" \ PYTHONMALLOC=malloc \ ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" \ - LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=1" \ - python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py tests/test_ShareConsumer_leak_canary.py 2>&1 | tee asan-share-tests.log + LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" \ + python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log rc=$? # exitcode=1 does NOT propagate when libasan is LD_PRELOADed into a # non-instrumented python: LSAN prints the leak report but the diff --git a/tests/test_ShareConsumer_leak_canary.py b/tests/test_ShareConsumer_leak_canary.py deleted file mode 100644 index 221168e0d..000000000 --- a/tests/test_ShareConsumer_leak_canary.py +++ /dev/null @@ -1,30 +0,0 @@ -"""TEMP leak canary — DO NOT MERGE. Delete once the ASAN+LSAN validation run -is read (paired with print_suppressions=1 + the extra pytest arg in -.semaphore/semaphore.yml). - -Both tests pass functionally, so the only thing that can turn the job red is -LSAN. The artifact log then answers the two things a green run can't: - - - test_canary_raw_malloc_leak: a pure libc block, no Python allocator frame - for any rule to match. It MUST be reported and MUST flip the job to exit 1. - If it isn't, the detector or the exitcode=1 gate is broken. 
- - - test_canary_leaked_gc_object: a lost GC object whose alloc stack runs - through _PyObject_GC_* — exactly the broad frames our suppression list - carries. If LSAN stays silent on this one (and print_suppressions=1 shows a - _PyObject_GC_* rule firing), the suppressions are masking our own objects. -""" -import ctypes - - -def test_canary_raw_malloc_leak(): - libc = ctypes.CDLL(None) - libc.malloc.restype = ctypes.c_void_p - libc.malloc.argtypes = [ctypes.c_size_t] - libc.malloc(1234567) # distinctive size; return discarded, never freed - - -def test_canary_leaked_gc_object(): - obj = {"leak_canary": 7654321} # dict -> GC-tracked - ctypes.pythonapi.Py_IncRef(ctypes.py_object(obj)) # pin refcount forever - del obj # drop the only referrer From 1fb6e35ddfeccb95dfa5ae52d24b0367e84f8e7a Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sat, 13 Jun 2026 17:36:27 +0530 Subject: [PATCH 07/14] [KIP-932] Add per-PR Valgrind (memcheck) job for share consumer binding layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Broker-free only: Memcheck's ~20-50x slowdown would wreck the integration suite's 1s share-lock timing, so Valgrind runs just the fast tests/test_ShareConsumer* suite. It adds the one class ASAN misses — reads of uninitialized memory in cimpl's own C. - build-librdkafka-branch.sh: new LIBRDKAFKA_DEBUG mode (--enable-devel --disable-optimization), a non-ASAN debug build (ASAN and Valgrind can't co-run). - Vendor librdkafka's Valgrind suppressions (glibc-TLS/getaddrinfo/OpenSSL false positives) as .semaphore/librdkafka.suppressions. - New strict, per-PR block: PYTHONMALLOC=malloc + memcheck with --track-origins; gate (grep the report) on any error or a definite/possible leak. First runs may surface CPython interpreter noise to suppress — harvest from the --gen-suppressions output into .semaphore/valgrind-confluent.supp until green. 
Co-Authored-By: Claude Opus 4.8 --- .semaphore/librdkafka.suppressions | 483 ++++++++++++++++++++++++ .semaphore/semaphore.yml | 63 ++++ tools/wheels/build-librdkafka-branch.sh | 5 + 3 files changed, 551 insertions(+) create mode 100644 .semaphore/librdkafka.suppressions diff --git a/.semaphore/librdkafka.suppressions b/.semaphore/librdkafka.suppressions new file mode 100644 index 000000000..6259dadb1 --- /dev/null +++ b/.semaphore/librdkafka.suppressions @@ -0,0 +1,483 @@ +# Valgrind suppression file for librdkafka +{ + allocate_tls_despite_detached_1 + Memcheck:Leak + fun:calloc + fun:_dl_allocate_tls + fun:pthread_create@@GLIBC_2.2.5 +} + +{ + helgrind---_dl_allocate_tls + Helgrind:Race + fun:mempcpy + fun:_dl_allocate_tls_init + ... + fun:pthread_create@@GLIBC_2.2* + fun:pthread_create_WRK + fun:pthread_create@* +} +{ + drd_nss1 + drd:ConflictingAccess + fun:pthread_mutex_lock + fun:_nss_files_gethostbyname4_r + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} + +{ + drd_nss2 + drd:ConflictingAccess + fun:strlen + fun:nss_load_library + fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss3 + drd:ConflictingAccess + fun:__GI_stpcpy + fun:nss_load_library + fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss4 + drd:ConflictingAccess + fun:strlen + 
fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss5 + drd:ConflictingAccess + fun:strlen + fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss6 + drd:ConflictingAccess + fun:internal_setent + fun:_nss_files_gethostbyname4_r + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + ssl_read + Memcheck:Cond + fun:ssl3_read_bytes + fun:ssl3_read_internal +} + + + +{ + ssl_noterm_leak1 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:SSL_library_init +} +{ + ssl_noterm_leak2 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:OPENSSL_add_all_algorithms_noconf +} +{ + ssl_noterm_leak3 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:OpenSSL_add_all_digests +} +{ + ssl_noterm_leak3b + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... + fun:OpenSSL_add_all_digests +} +{ + ssl_noterm_leak4 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:EVP_add_digest +} +{ + ssl_noterm_leak5 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:SSL_load_error_strings +} +{ + ssl_noterm_leak6 + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... 
+ fun:OPENSSL_add_all_algorithms_noconf +} +{ + ssl_noterm_leak7 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:ERR_load_SSL_strings +} +{ + ssl_noterm_leak8 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:err_load_strings +} +{ + ssl_noterm_leak8b + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:ERR_load_strings +} +{ + ssl_noterm_leak8c + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... + fun:ERR_load_strings +} +{ + ssl_noterm_leak9 + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... + fun:ERR_load_SSL_strings +} +{ + ssl_noterm_leak10 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:OPENSSL_init_library +} +{ + ssl_noterm_leak10b + Memcheck:Leak + match-leak-kinds: reachable + fun:calloc + ... + fun:OPENSSL_init_library +} +{ + ssl_noterm_leak11 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:EVP_SignFinal +} +{ + ssl_noterm_leak12 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... 
+ fun:FIPS_mode_set +} +{ + thrd_tls_alloc_stack + Memcheck:Leak + match-leak-kinds: possible + fun:calloc + fun:allocate_dtv + fun:_dl_allocate_tls + fun:allocate_stack + fun:pthread_create@@GLIBC_2.2.5 + fun:thrd_create +} +{ + more_tls1 + Memcheck:Leak + match-leak-kinds: possible + fun:calloc + fun:allocate_dtv + fun:_dl_allocate_tls + fun:allocate_stack +} + +{ + ssl_uninit1 + Memcheck:Cond + fun:rd_kafka_metadata_handle + fun:rd_kafka_broker_metadata_reply +} +{ + ssl_uninit2 + Memcheck:Value8 + fun:rd_kafka_metadata_handle + fun:rd_kafka_broker_metadata_reply +} +{ + ssl_uninit3 + Memcheck:Cond + fun:memcpy@@GLIBC_2.14 + fun:rd_kafka_metadata_handle + fun:rd_kafka_broker_metadata_reply +} + +{ + log_races0 + Helgrind:Race + fun:rd_kafka_log0 +} +{ + glibc_tls + Helgrind:Race + fun:mempcpy + fun:_dl_allocate_tls_init + fun:get_cached_stack + fun:allocate_stack + fun:pthread_create@@GLIBC_2.2.5 +} +{ + false_tls + Helgrind:Race + fun:thrd_detach +} + + +# cyrus libsasl2 global/once memory "leaks" +{ + leak_sasl_global_init1 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:sasl_client_init +} +{ + leak_sasl_global_init6 + Memcheck:Leak + match-leak-kinds: reachable + fun:calloc + ... + fun:sasl_client_init +} + +{ + leak_sasl_dlopen + Memcheck:Leak + match-leak-kinds: reachable + fun:?alloc + ... + fun:_dl_catch_error +} +{ + leak_sasl_add_plugin + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:sasl_client_add_plugin +} +{ + leak_sasl_add_plugin2 + Memcheck:Leak + match-leak-kinds: reachable + fun:calloc + ... + fun:sasl_client_add_plugin +} +{ + debian_testing_ld_uninitialized + Memcheck:Cond + fun:index + fun:expand_dynamic_string_token + ... + fun:_dl_start + ... +} +{ + glibc_internals_nss_race1 + Helgrind:Race + ... + fun:getaddrinfo + ... +} +{ + nss_files + Helgrind:Race + ... + fun:_dl_runtime_resolve_avx + ... 
+} +{ + cpp_glibc_globals + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + fun:pool + fun:__static_initialization_and_destruction_0 + fun:_GLOBAL__sub_I_eh_alloc.cc +} +{ + mtx_unlock_plus_destroy + Helgrind:Race + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + fun:rd_kafka_q_destroy_final +} +{ + mtx_unlock_plus_destroy2 + Helgrind:Race + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + fun:rd_refcnt_destroy +} +{ + nss_dl_lookup + Helgrind:Race + ... + fun:do_lookup_x + fun:_dl_lookup_symbol_x + ... +} +{ + dlopen1 + Memcheck:Leak + match-leak-kinds: reachable + ... + fun:_dl_open +} + +{ + atomics32_set + Helgrind:Race + fun:rd_atomic32_set +} + +{ + atomics32_get + Helgrind:Race + fun:rd_atomic32_get +} + +{ + atomics64_set + Helgrind:Race + fun:rd_atomic64_set +} + +{ + atomics64_get + Helgrind:Race + fun:rd_atomic64_get +} + +{ + osx_dyld_img + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + fun:strdup + fun:__si_module_static_ds_block_invoke + fun:_dispatch_client_callout + fun:_dispatch_once_callout + fun:si_module_static_ds + fun:si_module_with_name + fun:si_module_config_modules_for_category + fun:__si_module_static_search_block_invoke + fun:_dispatch_client_callout + fun:_dispatch_once_callout + fun:si_module_static_search + fun:si_module_with_name + fun:si_search + fun:getpwuid_r + fun:_CFRuntimeBridgeClasses + fun:__CFInitialize + fun:_ZN16ImageLoaderMachO11doImageInitERKN11ImageLoader11LinkContextE + fun:_ZN16ImageLoaderMachO16doInitializationERKN11ImageLoader11LinkContextE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + 
fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader19processInitializersERKNS_11LinkContextEjRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader15runInitializersERKNS_11LinkContextERNS_21InitializerTimingListE + fun:_ZN4dyld24initializeMainExecutableEv + fun:_ZN4dyld5_mainEPK12macho_headermiPPKcS5_S5_Pm + fun:_ZN13dyldbootstrap5startEPKN5dyld311MachOLoadedEiPPKcS3_Pm + fun:_dyld_start +} diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 04b253ae9..983830409 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -719,6 +719,69 @@ blocks: rc=1 fi exit $rc + # KIP-932: share-consumer binding layer under Valgrind memcheck, gated per-PR. + # Broker-free only — Memcheck's ~20-50x slowdown would blow the integration + # suite's 1s share-lock timing and run for hours, so we keep it to the fast + # tests. It catches the one class ASAN can't: reads of uninitialized memory in + # cimpl's own C. Valgrind and ASAN can't co-run, hence the separate non-ASAN + # LIBRDKAFKA_DEBUG build. Strict gate: any Memcheck error or a definite/possible + # leak fails the job (grep the report — Valgrind's own --error-exitcode is + # reliable here too, since it's the parent process, not LD_PRELOADed). 
+ - name: "Share consumer binding layer (Valgrind)" + dependencies: [] + task: + agent: + machine: + type: s1-prod-ubuntu24-04-amd64-2 + env_vars: + - name: OS_NAME + value: linux + - name: ARCH + value: x64 + epilogue: + always: + commands: + - cp valgrind-share-tests.log artifacts/ || true + - artifact push workflow artifacts/ --destination artifacts/valgrind/ + jobs: + - name: "ShareConsumer binding layer (Valgrind memcheck)" + commands: + - sem-version python 3.11 + - pip install uv + - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate + - uv pip install -r requirements/requirements-tests.txt + - sudo apt-get update -qq && sudo apt-get install -y -qq valgrind + # Non-ASAN debug build (-g, -O0) so Memcheck stacks are real and the + # optimizer doesn't fake uninitialized-read reports. + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - LIBRDKAFKA_DEBUG=1 tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest + - export CFLAGS="-g -O0 -I${PWD}/dest/build/native/include $CFLAGS" + - export LDFLAGS="-L${PWD}/${lib_dir} $LDFLAGS" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - uv pip install -e . + # PYTHONMALLOC=malloc lets Memcheck see individual allocations instead + # of pymalloc arenas. librdkafka.suppressions (vendored) silences the + # glibc-TLS/getaddrinfo/OpenSSL false positives; grow + # valgrind-confluent.supp from the --gen-suppressions output on the + # first runs (expect some CPython interpreter noise to suppress). 
+ - | + set -o pipefail + SUPP="--suppressions=${PWD}/.semaphore/librdkafka.suppressions" + [[ -f .semaphore/valgrind-confluent.supp ]] && SUPP="$SUPP --suppressions=${PWD}/.semaphore/valgrind-confluent.supp" + PYTHONMALLOC=malloc valgrind \ + --error-exitcode=42 --exit-on-first-error=no \ + --leak-check=full --show-leak-kinds=all \ + --errors-for-leak-kinds=definite,possible \ + --track-origins=yes --num-callers=40 --gen-suppressions=all \ + $SUPP \ + python -m pytest -v --timeout=1200 tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee valgrind-share-tests.log + rc=$? + # Strict: fail on any Memcheck error or a definite/possible leak. + if grep -Eq 'ERROR SUMMARY: [1-9]|(definitely|possibly) lost: [1-9]' valgrind-share-tests.log; then + echo "Valgrind reported errors/leaks above — failing the job." + rc=1 + fi + exit $rc promotions: - name: "Publish to Test PyPI" pipeline_file: publish-test-pypi.yml diff --git a/tools/wheels/build-librdkafka-branch.sh b/tools/wheels/build-librdkafka-branch.sh index fdbe4a45f..71059001b 100755 --- a/tools/wheels/build-librdkafka-branch.sh +++ b/tools/wheels/build-librdkafka-branch.sh @@ -73,6 +73,11 @@ fi if [[ -n $LIBRDKAFKA_SANITIZE ]]; then export CFLAGS="-fsanitize=${LIBRDKAFKA_SANITIZE} -g -fno-omit-frame-pointer ${CFLAGS}" export LDFLAGS="-fsanitize=${LIBRDKAFKA_SANITIZE} ${LDFLAGS}" +elif [[ -n $LIBRDKAFKA_DEBUG ]]; then + # Valgrind build: keep debug symbols and drop optimization so Memcheck + # stacks are real and the optimizer doesn't synthesize false uninitialized + # reads. Same config librdkafka's own Valgrind CI uses. 
+ CONFIGURE_OPTS="$CONFIGURE_OPTS --enable-devel --disable-optimization" else CONFIGURE_OPTS="$CONFIGURE_OPTS --disable-debug-symbols" fi From 5de2539784d120a23ac9d678fd32c8ab4de7d549 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sat, 13 Jun 2026 18:46:34 +0530 Subject: [PATCH 08/14] Build fix --- .semaphore/semaphore.yml | 29 +++--- .semaphore/valgrind-python.supp | 152 ++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 13 deletions(-) create mode 100644 .semaphore/valgrind-python.supp diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 983830409..411b0c90b 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -724,9 +724,9 @@ blocks: # suite's 1s share-lock timing and run for hours, so we keep it to the fast # tests. It catches the one class ASAN can't: reads of uninitialized memory in # cimpl's own C. Valgrind and ASAN can't co-run, hence the separate non-ASAN - # LIBRDKAFKA_DEBUG build. Strict gate: any Memcheck error or a definite/possible - # leak fails the job (grep the report — Valgrind's own --error-exitcode is - # reliable here too, since it's the parent process, not LD_PRELOADed). + # LIBRDKAFKA_DEBUG build. Gate is uninit/invalid-access only (ERROR SUMMARY); + # leaks stay with LSAN in the ASAN block — CPython leaves thousands of + # "possibly lost"/"still reachable" blocks at exit that aren't ours to chase. - name: "Share consumer binding layer (Valgrind)" dependencies: [] task: @@ -760,25 +760,28 @@ blocks: - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" - uv pip install -e . # PYTHONMALLOC=malloc lets Memcheck see individual allocations instead - # of pymalloc arenas. librdkafka.suppressions (vendored) silences the - # glibc-TLS/getaddrinfo/OpenSSL false positives; grow - # valgrind-confluent.supp from the --gen-suppressions output on the - # first runs (expect some CPython interpreter noise to suppress). + # of pymalloc arenas. 
Suppressions: librdkafka.suppressions silences + # glibc-TLS/getaddrinfo/OpenSSL; valgrind-python.supp silences CPython's + # own uninit-value noise (eval loop, GC, PyLong). leak-check=summary + # keeps the per-run leak tally visible without dumping ~15k interpreter + # records (that overflowed Semaphore's 16MB log cap) or failing on them. - | set -o pipefail SUPP="--suppressions=${PWD}/.semaphore/librdkafka.suppressions" + SUPP="$SUPP --suppressions=${PWD}/.semaphore/valgrind-python.supp" [[ -f .semaphore/valgrind-confluent.supp ]] && SUPP="$SUPP --suppressions=${PWD}/.semaphore/valgrind-confluent.supp" PYTHONMALLOC=malloc valgrind \ --error-exitcode=42 --exit-on-first-error=no \ - --leak-check=full --show-leak-kinds=all \ - --errors-for-leak-kinds=definite,possible \ - --track-origins=yes --num-callers=40 --gen-suppressions=all \ + --leak-check=summary \ + --track-origins=yes --num-callers=40 \ $SUPP \ python -m pytest -v --timeout=1200 tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee valgrind-share-tests.log rc=$? - # Strict: fail on any Memcheck error or a definite/possible leak. - if grep -Eq 'ERROR SUMMARY: [1-9]|(definitely|possibly) lost: [1-9]' valgrind-share-tests.log; then - echo "Valgrind reported errors/leaks above — failing the job." + # Gate on Memcheck errors only — uninit reads / invalid access, the + # class ASAN misses. Leaks are LSAN's job. To harvest a new CPython + # suppression locally, re-run with --gen-suppressions=all. + if grep -Eq 'ERROR SUMMARY: [1-9]' valgrind-share-tests.log; then + echo "Valgrind reported Memcheck errors above — failing the job." rc=1 fi exit $rc diff --git a/.semaphore/valgrind-python.supp b/.semaphore/valgrind-python.supp new file mode 100644 index 000000000..3b64504c4 --- /dev/null +++ b/.semaphore/valgrind-python.supp @@ -0,0 +1,152 @@ +# CPython interpreter uninitialized-value false positives under Memcheck. 
+# Harvested from the share-consumer Valgrind run (CPython 3.11, mise build): +# 87 Value8 + 7 Cond, every one originating inside the interpreter itself — +# the eval loop reading its own stack slots, the cyclic GC walking its graph, +# and PyLong digit-array arithmetic. None touch cimpl or librdkafka. +# +# We match on the innermost (top) frame only. A genuine uninitialized read in +# our C would have a cimpl/librdkafka frame on top, so it won't be masked here. +# Function names are stable across CPython patch releases, so this survives a +# mise bump. Leaks are LSAN's job (the ASAN block) — this file is uninit-only. + +# --- bytecode interpreter loop --- +{ + cpython-eval-loop-value8 + Memcheck:Value8 + fun:_PyEval_EvalFrameDefault + ... +} + +# --- cyclic GC traversal --- +{ + cpython-gc-visit-reachable-value8 + Memcheck:Value8 + fun:visit_reachable + ... +} +{ + cpython-gc-visit-decref-value8 + Memcheck:Value8 + fun:visit_decref + ... +} + +# --- frame teardown --- +{ + cpython-frame-clear-value8 + Memcheck:Value8 + fun:_PyFrame_Clear + ... +} + +# --- PyLong / int arithmetic over digit arrays --- +{ + cpython-long-fromstring-value8 + Memcheck:Value8 + fun:PyLong_FromString + ... +} +{ + cpython-long-fromstring-cond + Memcheck:Cond + fun:PyLong_FromString + ... +} +{ + cpython-long-asbytearray-value8 + Memcheck:Value8 + fun:_PyLong_AsByteArray + ... +} +{ + cpython-long-frombytearray-value8 + Memcheck:Value8 + fun:_PyLong_FromByteArray + ... +} +{ + cpython-long-frombytearray-cond + Memcheck:Cond + fun:_PyLong_FromByteArray + ... +} +{ + cpython-long-and-value8 + Memcheck:Value8 + fun:long_and + ... +} +{ + cpython-long-and-cond + Memcheck:Cond + fun:long_and + ... +} +{ + cpython-long-richcompare-value8 + Memcheck:Value8 + fun:long_richcompare + ... +} +{ + cpython-long-richcompare-cond + Memcheck:Cond + fun:long_richcompare + ... +} + +# --- object / type machinery --- +{ + cpython-type-call-value8 + Memcheck:Value8 + fun:type_call + ... 
+} +{ + cpython-object-init-value8 + Memcheck:Value8 + fun:object_init + ... +} +{ + cpython-subtype-dealloc-value8 + Memcheck:Value8 + fun:subtype_dealloc + ... +} +{ + cpython-generic-setattr-value8 + Memcheck:Value8 + fun:PyObject_GenericSetAttr + ... +} +{ + cpython-tuple-dealloc-value8 + Memcheck:Value8 + fun:tupledealloc + ... +} +{ + cpython-list-dealloc-value8 + Memcheck:Value8 + fun:list_dealloc + ... +} +{ + cpython-tuple-fromarray-value8 + Memcheck:Value8 + fun:_PyTuple_FromArray + ... +} +{ + cpython-function-vectorcall-value8 + Memcheck:Value8 + fun:_PyFunction_Vectorcall + ... +} +{ + cpython-method-vectorcall-fk-value8 + Memcheck:Value8 + fun:method_vectorcall_FASTCALL_KEYWORDS + ... +} From fe455355f4bf4889549645fd7b369b756cce60c4 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sun, 14 Jun 2026 20:58:39 +0530 Subject: [PATCH 09/14] Fix pipeline --- .semaphore/semaphore.yml | 64 +++++++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 411b0c90b..ca54620e7 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -637,14 +637,27 @@ blocks: ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" \ LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" \ python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log - rc=$? - # exitcode=1 does NOT propagate when libasan is LD_PRELOADed into a - # non-instrumented python: LSAN prints the leak report but the - # process still exits 0. Gate on the report text, not the code. + # Don't trust $?: libasan LD_PRELOADed into a non-instrumented python + # doesn't propagate its exitcode, and python itself exits 1 at + # interpreter shutdown (cimpl/librdkafka finalization) even on a fully + # green run. Decide from the printed reports, not the exit code. 
+ rc=0 if grep -Eq 'ERROR: (Address|Leak)Sanitizer|SUMMARY: AddressSanitizer' asan-share-tests.log; then echo "ASAN/LSAN reported findings above — failing the job." rc=1 fi + # Real pytest failures/errors. Anchored on '^= ' so it matches pytest's + # summary banner and not the '==PID==' sanitizer report lines. + if grep -Eq '^=+ .*[0-9]+ (failed|error)' asan-share-tests.log; then + echo "pytest reported test failures above — failing the job." + rc=1 + fi + # A clean run ends with a green pytest summary; its absence means a + # crash/timeout before pytest could report. + if ! grep -Eq '^=+ .*[0-9]+ passed' asan-share-tests.log; then + echo "No passing pytest summary found — the run did not complete." + rc=1 + fi exit $rc # KIP-932: share-consumer INTEGRATION tests under ASAN+LSAN, gated per-PR # (companion to the broker-free job above). The broker is a JVM, so it must @@ -711,13 +724,26 @@ blocks: --conf '["transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1","offsets.topic.replication.factor=1","offsets.topic.min.isr=1","share.coordinator.state.topic.replication.factor=1","share.coordinator.state.topic.min.isr=1","group.share.record.lock.duration.ms=1000","group.share.min.record.lock.duration.ms=1000"]' \ --cmd 'LD_PRELOAD="$(gcc -print-file-name=libasan.so)" python -m pytest -v --timeout=300 tests/integration/share_consumer/' \ 2>&1 | tee asan-share-integration-tests.log - rc=$? - # Same LD_PRELOAD exit-code caveat as the binding-layer job: a leak - # with otherwise-passing tests still exits 0, so gate on the text. + # Don't trust $?: trivup runs --cmd via /bin/sh (no pipefail) and + # python exits 1 at interpreter shutdown (cimpl/librdkafka + # finalization) even on a green run. Decide from the printed reports. + rc=0 if grep -Eq 'ERROR: (Address|Leak)Sanitizer|SUMMARY: AddressSanitizer' asan-share-integration-tests.log; then echo "ASAN/LSAN reported findings above — failing the job." rc=1 fi + # Real pytest failures/errors. 
Anchored on '^= ' so it matches pytest's + # summary banner and not the '==PID==' sanitizer report lines. + if grep -Eq '^=+ .*[0-9]+ (failed|error)' asan-share-integration-tests.log; then + echo "pytest reported test failures above — failing the job." + rc=1 + fi + # A clean run ends with a green pytest summary; its absence means the + # cluster never came up or pytest crashed/timed out. + if ! grep -Eq '^=+ .*[0-9]+ passed' asan-share-integration-tests.log; then + echo "No passing pytest summary found — the run did not complete." + rc=1 + fi exit $rc # KIP-932: share-consumer binding layer under Valgrind memcheck, gated per-PR. # Broker-free only — Memcheck's ~20-50x slowdown would blow the integration @@ -776,14 +802,30 @@ blocks: --track-origins=yes --num-callers=40 \ $SUPP \ python -m pytest -v --timeout=1200 tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee valgrind-share-tests.log - rc=$? - # Gate on Memcheck errors only — uninit reads / invalid access, the - # class ASAN misses. Leaks are LSAN's job. To harvest a new CPython - # suppression locally, re-run with --gen-suppressions=all. + # Don't trust $?: python exits 1 at interpreter shutdown + # (cimpl/librdkafka finalization) even on a fully green run, and with + # --error-exitcode=42 + 0 errors valgrind just forwards that. Decide + # from Memcheck's report and pytest's printed summary instead. + rc=0 + # Memcheck findings — uninit reads / invalid access, the class ASAN + # misses. (To harvest a new CPython suppression locally, re-run with + # --gen-suppressions=all.) Leaks are LSAN's job. if grep -Eq 'ERROR SUMMARY: [1-9]' valgrind-share-tests.log; then echo "Valgrind reported Memcheck errors above — failing the job." rc=1 fi + # Real pytest failures/errors. Anchored on '^= ' so it matches pytest's + # summary banner and not the '==PID==' Memcheck report lines. 
+ if grep -Eq '^=+ .*[0-9]+ (failed|error)' valgrind-share-tests.log; then + echo "pytest reported test failures above — failing the job." + rc=1 + fi + # A clean run ends with a green pytest summary; its absence means a + # crash/timeout before pytest could report. + if ! grep -Eq '^=+ .*[0-9]+ passed' valgrind-share-tests.log; then + echo "No passing pytest summary found — the run did not complete." + rc=1 + fi exit $rc promotions: - name: "Publish to Test PyPI" From 8b36b0615fb6c6d90ca98337f61451722fee3115 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sun, 14 Jun 2026 22:27:06 +0530 Subject: [PATCH 10/14] Fix pipeline --- .semaphore/semaphore.yml | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index ca54620e7..41a761ba6 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -636,8 +636,10 @@ blocks: PYTHONMALLOC=malloc \ ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" \ LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" \ - python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log - # Don't trust $?: libasan LD_PRELOADed into a non-instrumented python + python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log || true + # The `|| true` keeps set -e from aborting the block on python's + # shutdown exit-1 (pipefail forwards it) — without it the gate below + # never runs. Don't trust $?: libasan LD_PRELOADed into a non-instrumented python # doesn't propagate its exitcode, and python itself exits 1 at # interpreter shutdown (cimpl/librdkafka finalization) even on a fully # green run. Decide from the printed reports, not the exit code. 
@@ -723,7 +725,9 @@ blocks: --version 4.2.0 \ --conf '["transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1","offsets.topic.replication.factor=1","offsets.topic.min.isr=1","share.coordinator.state.topic.replication.factor=1","share.coordinator.state.topic.min.isr=1","group.share.record.lock.duration.ms=1000","group.share.min.record.lock.duration.ms=1000"]' \ --cmd 'LD_PRELOAD="$(gcc -print-file-name=libasan.so)" python -m pytest -v --timeout=300 tests/integration/share_consumer/' \ - 2>&1 | tee asan-share-integration-tests.log + 2>&1 | tee asan-share-integration-tests.log || true + # The `|| true` keeps set -e from aborting on python's shutdown exit-1 + # so the gate below actually runs. # Don't trust $?: trivup runs --cmd via /bin/sh (no pipefail) and # python exits 1 at interpreter shutdown (cimpl/librdkafka # finalization) even on a green run. Decide from the printed reports. @@ -801,7 +805,9 @@ blocks: --leak-check=summary \ --track-origins=yes --num-callers=40 \ $SUPP \ - python -m pytest -v --timeout=1200 tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee valgrind-share-tests.log + python -m pytest -v --timeout=1200 tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee valgrind-share-tests.log || true + # The `|| true` keeps set -e from aborting the block on python's + # shutdown exit-1 (pipefail forwards it) so the gate below runs. # Don't trust $?: python exits 1 at interpreter shutdown # (cimpl/librdkafka finalization) even on a fully green run, and with # --error-exitcode=42 + 0 errors valgrind just forwards that. 
Decide From f63038022b444ed41d9c1004ba331063d362c219 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sun, 14 Jun 2026 23:17:46 +0530 Subject: [PATCH 11/14] ci: best-effort the sanitizer epilogue artifact push The ASAN/Valgrind share-consumer jobs ran 'artifact push workflow artifacts/' unguarded in their epilogue, so a failed log upload failed the whole job even when the tests and the gate passed clean. Add '|| true', matching the cp on the line above. Co-Authored-By: Claude Opus 4.8 --- .semaphore/semaphore.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 41a761ba6..e9f0bfb82 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -607,7 +607,7 @@ blocks: always: commands: - cp asan-share-tests.log artifacts/ || true - - artifact push workflow artifacts/ --destination artifacts/asan/ + - artifact push workflow artifacts/ --destination artifacts/asan/ || true jobs: - name: "ShareConsumer binding layer (ASAN + LSAN)" commands: @@ -689,7 +689,7 @@ blocks: always: commands: - cp asan-share-integration-tests.log artifacts/ || true - - artifact push workflow artifacts/ --destination artifacts/asan-integration/ + - artifact push workflow artifacts/ --destination artifacts/asan-integration/ || true jobs: - name: "ShareConsumer integration (ASAN + LSAN)" commands: @@ -772,7 +772,7 @@ blocks: always: commands: - cp valgrind-share-tests.log artifacts/ || true - - artifact push workflow artifacts/ --destination artifacts/valgrind/ + - artifact push workflow artifacts/ --destination artifacts/valgrind/ || true jobs: - name: "ShareConsumer binding layer (Valgrind memcheck)" commands: From 043c4ae9131f5f66ce541de6127576a3a25bb62c Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Mon, 15 Jun 2026 11:57:42 +0530 Subject: [PATCH 12/14] Fix pipeline --- .semaphore/semaphore.yml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git 
a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index e9f0bfb82..2c8e10b76 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -660,7 +660,10 @@ blocks: echo "No passing pytest summary found — the run did not complete." rc=1 fi - exit $rc + # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command + # shell before the agent captures status, so green runs get recorded + # as failed and the epilogue's artifact push never runs. + [ "$rc" -eq 0 ] # KIP-932: share-consumer INTEGRATION tests under ASAN+LSAN, gated per-PR # (companion to the broker-free job above). The broker is a JVM, so it must # never inherit LD_PRELOAD=libasan: trivup starts the cluster in a clean env @@ -748,7 +751,10 @@ blocks: echo "No passing pytest summary found — the run did not complete." rc=1 fi - exit $rc + # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command + # shell before the agent captures status, so green runs get recorded + # as failed and the epilogue's artifact push never runs. + [ "$rc" -eq 0 ] # KIP-932: share-consumer binding layer under Valgrind memcheck, gated per-PR. # Broker-free only — Memcheck's ~20-50x slowdown would blow the integration # suite's 1s share-lock timing and run for hours, so we keep it to the fast @@ -832,7 +838,10 @@ blocks: echo "No passing pytest summary found — the run did not complete." rc=1 fi - exit $rc + # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command + # shell before the agent captures status, so green runs get recorded + # as failed and the epilogue's artifact push never runs. 
+ [ "$rc" -eq 0 ] promotions: - name: "Publish to Test PyPI" pipeline_file: publish-test-pypi.yml From ae719aa474a0559500bd9d6806c4ec04d04aaa56 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Mon, 15 Jun 2026 12:09:54 +0530 Subject: [PATCH 13/14] TEMP: Fix librdkafka commit --- .semaphore/semaphore.yml | 4 ++++ tools/wheels/build-librdkafka-branch.sh | 11 ++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 2c8e10b76..fe516d9f8 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -9,6 +9,10 @@ global_job_config: env_vars: - name: LIBRDKAFKA_VERSION value: v2.15.0-RC1 + # TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support + # Pinned to a specific librdkafka commit (quoted so YAML always keeps the SHA a string); build-librdkafka-branch.sh fetches it by SHA. + - name: LIBRDKAFKA_BRANCH + value: '49345336e8258b0615ab3b1bc42099a208770ad4' prologue: commands: - checkout diff --git a/tools/wheels/build-librdkafka-branch.sh b/tools/wheels/build-librdkafka-branch.sh index 71059001b..18577e051 100755 --- a/tools/wheels/build-librdkafka-branch.sh +++ b/tools/wheels/build-librdkafka-branch.sh @@ -8,7 +8,7 @@ # # Usage: build-librdkafka-branch.sh # -# branch - git branch name, e.g. dev_kip-932_queues-for-kafka +# branch - git branch, tag, or commit SHA, e.g. dev_kip-932_queues-for-kafka # destdir - destination directory, e.g. dest # # Resulting layout (mirrors NuGet redist package): @@ -39,8 +39,13 @@ INSTALL=$SRC/install [[ -d "$DEST" ]] || mkdir -p "$DEST" rm -rf "$SRC" -git clone --depth 1 --branch "$BRANCH" \ - https://github.com/confluentinc/librdkafka.git "$SRC" +# $BRANCH may be a branch, tag, or commit SHA. `git clone --branch` rejects a +# bare SHA, so fetch the ref explicitly (GitHub allows fetching a commit by SHA).
+mkdir -p "$SRC" +git -C "$SRC" init +git -C "$SRC" remote add origin https://github.com/confluentinc/librdkafka.git +git -C "$SRC" fetch --depth 1 origin "$BRANCH" +git -C "$SRC" checkout FETCH_HEAD if [[ $OSTYPE == linux* ]]; then sudo apt-get update -qq && sudo apt-get install -y -qq libssl-dev libsasl2-dev liblz4-dev libzstd-dev From 211fcbe85904b25a4b4beeee2a0d597576d91394 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 16 Jun 2026 00:59:46 +0530 Subject: [PATCH 14/14] Add Valgrind integration tests --- .semaphore/semaphore.yml | 82 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index fe516d9f8..af14696ce 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -846,6 +846,88 @@ blocks: # shell before the agent captures status, so green runs get recorded # as failed and the epilogue's artifact push never runs. [ "$rc" -eq 0 ] + # KIP-932: share-consumer integration tests under Valgrind, full-suite per-PR. + # The binding block above keeps Valgrind broker-free on purpose (Memcheck's + # slowdown vs the 1s share lock) — this runs the whole integration suite anyway + # as a per-PR experiment. Expect timing tests (lock expiry/redelivery/throttle) + # to flake and the job to be slow against the 3h cap; curate or promote later.
+  - name: "Share consumer integration tests under Valgrind"
+    dependencies: []
+    task:
+      agent:
+        machine:
+          type: s1-prod-ubuntu24-04-amd64-2
+      env_vars:
+        - name: OS_NAME
+          value: linux
+        - name: ARCH
+          value: x64
+      epilogue:
+        always:
+          commands:
+            # mkdir first: no step in this block creates artifacts/, so a bare cp fails.
+            - mkdir -p artifacts && cp valgrind-share-integration-tests.log artifacts/ || true
+            - artifact push workflow artifacts/ --destination artifacts/valgrind-integration/ || true
+      jobs:
+        - name: "ShareConsumer integration (Valgrind memcheck)"
+          commands:
+            - sem-version python 3.11
+            - sem-version java 17
+            - pip install uv
+            - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate
+            - uv pip install -r requirements/requirements-tests-install.txt
+            - sudo apt-get update -qq && sudo apt-get install -y -qq valgrind
+            # Non-ASAN -g -O0 build (same as the binding-layer Valgrind job): real
+            # Memcheck stacks, no optimizer-faked uninitialized-read reports.
+            - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
+            - LIBRDKAFKA_DEBUG=1 tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest
+            - export CFLAGS="-g -O0 -I${PWD}/dest/build/native/include $CFLAGS"
+            - export LDFLAGS="-L${PWD}/${lib_dir} $LDFLAGS"
+            # ${VAR:+...} avoids an empty entry (= cwd for ld.so) when the var is unset.
+            - export LD_LIBRARY_PATH="${LD_LIBRARY_PATH:+$LD_LIBRARY_PATH:}$PWD/$lib_dir"
+            - uv pip install -e .
+            # Valgrind wraps only the pytest process inside trivup's --cmd; the
+            # broker is a separate JVM trivup starts outside --cmd, so it never runs
+            # under Memcheck. Suppression/test paths are relative to the checkout:
+            # trivup runs --cmd from the repo root (as in the ASAN job). The inner
+            # 2>&1 routes valgrind's stderr report into the tee'd log. --conf matches
+            # the ASAN integration job (1s share lock, single-broker RF/ISR=1);
+            # --timeout is large: Memcheck's slowdown stacks on broker round-trips.
+            - |
+              set -o pipefail
+              export PYTHONPATH="${PWD}"
+              python -m trivup.clusters.KafkaCluster \
+                --kraft \
+                --version 4.2.0 \
+                --conf '["transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1","offsets.topic.replication.factor=1","offsets.topic.min.isr=1","share.coordinator.state.topic.replication.factor=1","share.coordinator.state.topic.min.isr=1","group.share.record.lock.duration.ms=1000","group.share.min.record.lock.duration.ms=1000"]' \
+                --cmd 'PYTHONMALLOC=malloc valgrind --error-exitcode=42 --exit-on-first-error=no --leak-check=summary --track-origins=yes --num-callers=40 --suppressions=.semaphore/librdkafka.suppressions --suppressions=.semaphore/valgrind-python.supp python -m pytest -v --timeout=1800 tests/integration/share_consumer/ 2>&1' \
+                2>&1 | tee valgrind-share-integration-tests.log || true
+              # The `|| true` keeps set -e from aborting on python's shutdown exit-1
+              # so the gate runs. Don't trust $?: trivup runs --cmd via /bin/sh (no
+              # pipefail) and python exits 1 at interpreter shutdown even on a green
+              # run; with --error-exitcode=42 valgrind just forwards that.
+              rc=0
+              # Memcheck findings — uninit reads / invalid access, the class ASAN misses.
+              if grep -Eq 'ERROR SUMMARY: [1-9]' valgrind-share-integration-tests.log; then
+                echo "Valgrind reported Memcheck errors above — failing the job."
+                rc=1
+              fi
+              # Real pytest failures/errors. Anchored on '^=+ ' (a run of '=' then a
+              # space): matches pytest's summary banner, not '==PID==' Memcheck lines.
+              if grep -Eq '^=+ .*[0-9]+ (failed|error)' valgrind-share-integration-tests.log; then
+                echo "pytest reported test failures above — failing the job."
+                rc=1
+              fi
+              # A clean run ends with a green pytest summary; its absence means the
+              # cluster never came up or pytest crashed/timed out.
+              if ! grep -Eq '^=+ .*[0-9]+ passed' valgrind-share-integration-tests.log; then
+                echo "No passing pytest summary found — the run did not complete."
+                rc=1
+              fi
+              # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command
+              # shell before the agent captures status, so green runs get recorded
+              # as failed and the epilogue's artifact push never runs.
+              [ "$rc" -eq 0 ]
 promotions:
   - name: "Publish to Test PyPI"
     pipeline_file: publish-test-pypi.yml