diff --git a/.semaphore/librdkafka.suppressions b/.semaphore/librdkafka.suppressions new file mode 100644 index 000000000..6259dadb1 --- /dev/null +++ b/.semaphore/librdkafka.suppressions @@ -0,0 +1,483 @@ +# Valgrind suppression file for librdkafka +{ + allocate_tls_despite_detached_1 + Memcheck:Leak + fun:calloc + fun:_dl_allocate_tls + fun:pthread_create@@GLIBC_2.2.5 +} + +{ + helgrind---_dl_allocate_tls + Helgrind:Race + fun:mempcpy + fun:_dl_allocate_tls_init + ... + fun:pthread_create@@GLIBC_2.2* + fun:pthread_create_WRK + fun:pthread_create@* +} +{ + drd_nss1 + drd:ConflictingAccess + fun:pthread_mutex_lock + fun:_nss_files_gethostbyname4_r + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} + +{ + drd_nss2 + drd:ConflictingAccess + fun:strlen + fun:nss_load_library + fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss3 + drd:ConflictingAccess + fun:__GI_stpcpy + fun:nss_load_library + fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss4 + drd:ConflictingAccess + fun:strlen + fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss5 + drd:ConflictingAccess + fun:strlen + fun:__nss_lookup_function + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + drd_nss6 + drd:ConflictingAccess + fun:internal_setent + fun:_nss_files_gethostbyname4_r + fun:gaih_inet + fun:getaddrinfo + fun:rd_getaddrinfo + fun:rd_kafka_broker_resolve + fun:rd_kafka_broker_connect + fun:rd_kafka_broker_thread_main + fun:_thrd_wrapper_function + obj:/usr/lib/valgrind/vgpreload_drd-amd64-linux.so + fun:start_thread + fun:clone +} +{ + ssl_read + Memcheck:Cond + fun:ssl3_read_bytes + fun:ssl3_read_internal +} + + + +{ + ssl_noterm_leak1 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:SSL_library_init +} +{ + ssl_noterm_leak2 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:OPENSSL_add_all_algorithms_noconf +} +{ + ssl_noterm_leak3 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:OpenSSL_add_all_digests +} +{ + ssl_noterm_leak3b + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... + fun:OpenSSL_add_all_digests +} +{ + ssl_noterm_leak4 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:EVP_add_digest +} +{ + ssl_noterm_leak5 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:SSL_load_error_strings +} +{ + ssl_noterm_leak6 + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... + fun:OPENSSL_add_all_algorithms_noconf +} +{ + ssl_noterm_leak7 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:ERR_load_SSL_strings +} +{ + ssl_noterm_leak8 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:err_load_strings +} +{ + ssl_noterm_leak8b + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:ERR_load_strings +} +{ + ssl_noterm_leak8c + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... + fun:ERR_load_strings +} +{ + ssl_noterm_leak9 + Memcheck:Leak + match-leak-kinds: reachable + fun:realloc + ... + fun:ERR_load_SSL_strings +} +{ + ssl_noterm_leak10 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:OPENSSL_init_library +} +{ + ssl_noterm_leak10b + Memcheck:Leak + match-leak-kinds: reachable + fun:calloc + ... + fun:OPENSSL_init_library +} +{ + ssl_noterm_leak11 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:EVP_SignFinal +} +{ + ssl_noterm_leak12 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:FIPS_mode_set +} +{ + thrd_tls_alloc_stack + Memcheck:Leak + match-leak-kinds: possible + fun:calloc + fun:allocate_dtv + fun:_dl_allocate_tls + fun:allocate_stack + fun:pthread_create@@GLIBC_2.2.5 + fun:thrd_create +} +{ + more_tls1 + Memcheck:Leak + match-leak-kinds: possible + fun:calloc + fun:allocate_dtv + fun:_dl_allocate_tls + fun:allocate_stack +} + +{ + ssl_uninit1 + Memcheck:Cond + fun:rd_kafka_metadata_handle + fun:rd_kafka_broker_metadata_reply +} +{ + ssl_uninit2 + Memcheck:Value8 + fun:rd_kafka_metadata_handle + fun:rd_kafka_broker_metadata_reply +} +{ + ssl_uninit3 + Memcheck:Cond + fun:memcpy@@GLIBC_2.14 + fun:rd_kafka_metadata_handle + fun:rd_kafka_broker_metadata_reply +} + +{ + log_races0 + Helgrind:Race + fun:rd_kafka_log0 +} +{ + glibc_tls + Helgrind:Race + fun:mempcpy + fun:_dl_allocate_tls_init + fun:get_cached_stack + fun:allocate_stack + fun:pthread_create@@GLIBC_2.2.5 +} +{ + false_tls + Helgrind:Race + fun:thrd_detach +} + + +# cyrus libsasl2 global/once memory "leaks" +{ + leak_sasl_global_init1 + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:sasl_client_init +} +{ + leak_sasl_global_init6 + Memcheck:Leak + match-leak-kinds: reachable + fun:calloc + ... + fun:sasl_client_init +} + +{ + leak_sasl_dlopen + Memcheck:Leak + match-leak-kinds: reachable + fun:?alloc + ... + fun:_dl_catch_error +} +{ + leak_sasl_add_plugin + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + ... + fun:sasl_client_add_plugin +} +{ + leak_sasl_add_plugin2 + Memcheck:Leak + match-leak-kinds: reachable + fun:calloc + ... + fun:sasl_client_add_plugin +} +{ + debian_testing_ld_uninitialized + Memcheck:Cond + fun:index + fun:expand_dynamic_string_token + ... + fun:_dl_start + ... +} +{ + glibc_internals_nss_race1 + Helgrind:Race + ... + fun:getaddrinfo + ... +} +{ + nss_files + Helgrind:Race + ... + fun:_dl_runtime_resolve_avx + ... +} +{ + cpp_glibc_globals + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + fun:pool + fun:__static_initialization_and_destruction_0 + fun:_GLOBAL__sub_I_eh_alloc.cc +} +{ + mtx_unlock_plus_destroy + Helgrind:Race + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + fun:rd_kafka_q_destroy_final +} +{ + mtx_unlock_plus_destroy2 + Helgrind:Race + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + fun:rd_refcnt_destroy +} +{ + nss_dl_lookup + Helgrind:Race + ... + fun:do_lookup_x + fun:_dl_lookup_symbol_x + ... +} +{ + dlopen1 + Memcheck:Leak + match-leak-kinds: reachable + ... + fun:_dl_open +} + +{ + atomics32_set + Helgrind:Race + fun:rd_atomic32_set +} + +{ + atomics32_get + Helgrind:Race + fun:rd_atomic32_get +} + +{ + atomics64_set + Helgrind:Race + fun:rd_atomic64_set +} + +{ + atomics64_get + Helgrind:Race + fun:rd_atomic64_get +} + +{ + osx_dyld_img + Memcheck:Leak + match-leak-kinds: reachable + fun:malloc + fun:strdup + fun:__si_module_static_ds_block_invoke + fun:_dispatch_client_callout + fun:_dispatch_once_callout + fun:si_module_static_ds + fun:si_module_with_name + fun:si_module_config_modules_for_category + fun:__si_module_static_search_block_invoke + fun:_dispatch_client_callout + fun:_dispatch_once_callout + fun:si_module_static_search + fun:si_module_with_name + fun:si_search + fun:getpwuid_r + fun:_CFRuntimeBridgeClasses + fun:__CFInitialize + fun:_ZN16ImageLoaderMachO11doImageInitERKN11ImageLoader11LinkContextE + fun:_ZN16ImageLoaderMachO16doInitializationERKN11ImageLoader11LinkContextE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader23recursiveInitializationERKNS_11LinkContextEjPKcRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader19processInitializersERKNS_11LinkContextEjRNS_21InitializerTimingListERNS_15UninitedUpwardsE + fun:_ZN11ImageLoader15runInitializersERKNS_11LinkContextERNS_21InitializerTimingListE + fun:_ZN4dyld24initializeMainExecutableEv + fun:_ZN4dyld5_mainEPK12macho_headermiPPKcS5_S5_Pm + fun:_ZN13dyldbootstrap5startEPKN5dyld311MachOLoadedEiPPKcS3_Pm + fun:_dyld_start +} diff --git a/.semaphore/lsan-suppressions.txt b/.semaphore/lsan-suppressions.txt new file mode 100644 index 000000000..d5888d28c --- /dev/null +++ b/.semaphore/lsan-suppressions.txt @@ -0,0 +1,47 @@ +# LSAN suppressions for the share consumer ASAN pipeline (run-asan-tests.yml). +# +# CPython deliberately doesn't free a lot of interpreter state at shutdown +# (type objects, interned strings, module dicts, codec caches), so LSAN reports +# those as leaks. Suppress the interpreter's own one-time allocations here so +# the log is left with leaks originating in our cimpl extension and librdkafka. +# +# This is a STARTING POINT, not a finished list — expect to add entries after +# reading the first run's report (that triage is the real work). Keep every +# rule scoped to interpreter internals or third-party extension module-init +# (e.g. cryptography/cffi dragged in by trivup); never suppress PyInit_cimpl, +# confluent_kafka, or librdkafka frames or you'll hide the leaks we're hunting. + +# Object machinery / type setup kept for the life of the interpreter +leak:_PyObject_GC_New +leak:_PyObject_GC_NewVar +leak:_PyObject_GC_Resize +leak:PyType_Ready +leak:type_new + +# Unicode interning — interned strings live until shutdown +leak:_PyUnicode_New +leak:PyUnicode_New +leak:unicode_resize +leak:PyUnicode_InternInPlace + +# Imports / modules / codecs — one-time startup allocations +leak:_PyImport_FixupExtensionObject +leak:PyModule_New +leak:_PyCodecRegistry_Init + +# Raw interpreter allocator +leak:_PyMem_RawMalloc +leak:_PyMem_RawRealloc +leak:_PyMem_RawCalloc + +# Third-party C/Rust extension module-init leaks, pulled in transitively by +# trivup (jwcrypto -> cryptography -> cffi) and the schema-registry client that +# cluster_fixture imports at load time. One-time PyInit / pyo3 type-object setup +# inside those .so's — not our code, not librdkafka. (Integration job only; the +# broker-free job never imports these.) +leak:cryptography +leak:_cffi_backend +leak:create_type_object +leak:PyInit__openssl +leak:_my_Py_InitModule +leak:b_init_cffi_1_0_external_module diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index f5c1a7d4f..af14696ce 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -9,6 +9,10 @@ global_job_config: env_vars: - name: LIBRDKAFKA_VERSION value: v2.15.0-RC1 + # TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support + # Pinned to a specific librdkafka commit; build-librdkafka-branch.sh fetches it by SHA. + - name: LIBRDKAFKA_BRANCH + value: 49345336e8258b0615ab3b1bc42099a208770ad4 prologue: commands: - checkout @@ -583,6 +587,347 @@ blocks: - cd .. - artifact push project artifacts/confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz --destination confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz - echo Thank you + # KIP-932: runs on every workflow (incl. PRs). An unsuppressed leak (or any + # ASAN error) now fails the job, so the LSAN baseline in lsan-suppressions.txt + # is load-bearing — keep it tight and scoped to interpreter internals or it'll + # hide real leaks. Consider a manual promotion if the per-PR build cost bites. + - name: "Share consumer tests under ASAN+LSAN" + dependencies: [] + task: + agent: + machine: + type: s1-prod-ubuntu24-04-amd64-2 + env_vars: + - name: OS_NAME + value: linux + - name: ARCH + value: x64 + prologue: + commands: + # Ubuntu 24.04's high-entropy ASLR collides with the sanitizer shadow + # mapping; lower it or libasan aborts before main(). + - sudo sysctl -w vm.mmap_rnd_bits=28 + epilogue: + always: + commands: + - cp asan-share-tests.log artifacts/ || true + - artifact push workflow artifacts/ --destination artifacts/asan/ || true + jobs: + - name: "ShareConsumer binding layer (ASAN + LSAN)" + commands: + - sem-version python 3.11 + - pip install uv + - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate + - uv pip install -r requirements/requirements-tests.txt + # Build an ASAN-instrumented librdkafka from the share consumer + # branch, then build the cimpl extension against it with the same + # instrumentation so binding-layer leaks show real source frames. + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - LIBRDKAFKA_SANITIZE=address tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest + - export CFLAGS="-fsanitize=address -g -fno-omit-frame-pointer -I${PWD}/dest/build/native/include $CFLAGS" + - export LDFLAGS="-fsanitize=address -L${PWD}/${lib_dir} $LDFLAGS" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - uv pip install -e . + # python3 isn't an ASAN build, so libasan has to be preloaded to come + # first; PYTHONMALLOC=malloc lets LSAN see allocations pymalloc would + # otherwise hide. exitcode=1 makes an unsuppressed leak (or any ASAN + # error) fail the job; the build has no -fsanitize-recover so errors + # halt and exit non-zero, and pipefail carries that through the tee. + # Both files are broker-free. + - | + set -o pipefail + LD_PRELOAD="$(gcc -print-file-name=libasan.so)" \ + PYTHONMALLOC=malloc \ + ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" \ + LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" \ + python -m pytest -v tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee asan-share-tests.log || true + # The `|| true` keeps set -e from aborting the block on python's + # shutdown exit-1 (pipefail forwards it) — without it the gate below + # never runs. Don't trust $?: libasan LD_PRELOADed into a non-instrumented python + # doesn't propagate its exitcode, and python itself exits 1 at + # interpreter shutdown (cimpl/librdkafka finalization) even on a fully + # green run. Decide from the printed reports, not the exit code. + rc=0 + if grep -Eq 'ERROR: (Address|Leak)Sanitizer|SUMMARY: AddressSanitizer' asan-share-tests.log; then + echo "ASAN/LSAN reported findings above — failing the job." + rc=1 + fi + # Real pytest failures/errors. Anchored on '^= ' so it matches pytest's + # summary banner and not the '==PID==' sanitizer report lines. + if grep -Eq '^=+ .*[0-9]+ (failed|error)' asan-share-tests.log; then + echo "pytest reported test failures above — failing the job." + rc=1 + fi + # A clean run ends with a green pytest summary; its absence means a + # crash/timeout before pytest could report. + if ! grep -Eq '^=+ .*[0-9]+ passed' asan-share-tests.log; then + echo "No passing pytest summary found — the run did not complete." + rc=1 + fi + # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command + # shell before the agent captures status, so green runs get recorded + # as failed and the epilogue's artifact push never runs. + [ "$rc" -eq 0 ] + # KIP-932: share-consumer INTEGRATION tests under ASAN+LSAN, gated per-PR + # (companion to the broker-free job above). The broker is a JVM, so it must + # never inherit LD_PRELOAD=libasan: trivup starts the cluster in a clean env + # and exports BROKERS, the kafka_cluster fixture connects via ByoFixture, and + # LD_PRELOAD is injected only inside --cmd (the pytest process). ASAN/LSAN + # options are exported outside trivup — inert for the JVM (it never loads + # libasan), but they ride into --cmd via trivup's env.update(). The `| tee` + # sits on the outer trivup call, not inside --cmd: trivup runs --cmd via + # /bin/sh (no pipefail), which would mask a pytest/LSAN failure behind tee. + - name: "Share consumer integration tests under ASAN+LSAN" + dependencies: [] + task: + agent: + machine: + type: s1-prod-ubuntu24-04-amd64-2 + env_vars: + - name: OS_NAME + value: linux + - name: ARCH + value: x64 + prologue: + commands: + # Same Ubuntu 24.04 ASLR vs sanitizer-shadow fix as the binding-layer job. + - sudo sysctl -w vm.mmap_rnd_bits=28 + epilogue: + always: + commands: + - cp asan-share-integration-tests.log artifacts/ || true + - artifact push workflow artifacts/ --destination artifacts/asan-integration/ || true + jobs: + - name: "ShareConsumer integration (ASAN + LSAN)" + commands: + - sem-version python 3.11 + - sem-version java 17 + - pip install uv + - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate + # requirements-tests-install.txt pulls pytest, the schema-registry + # client deps (authlib/cryptography — cluster_fixture imports the SR + # client at module load, even though the share tests don't use SR), + # and the vendored trivup 0.14.0 (PyPI 0.12.2 breaks on Kafka 4.2.0's + # 0.0.0.0 quorum voters). + - uv pip install -r requirements/requirements-tests-install.txt + # ASAN-instrumented librdkafka + cimpl, same build as the binding-layer job. + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - LIBRDKAFKA_SANITIZE=address tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest + - export CFLAGS="-fsanitize=address -g -fno-omit-frame-pointer -I${PWD}/dest/build/native/include $CFLAGS" + - export LDFLAGS="-fsanitize=address -L${PWD}/${lib_dir} $LDFLAGS" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - uv pip install -e . + # --conf mirrors tests/common TestUtils.broker_conf(): single-broker + # RF/ISR=1 for the share-coordinator + offsets/txn topics, plus the 1s + # share lock duration the redelivery tests need. BYO mode means the + # fixture doesn't apply broker_conf, so it has to be set here. + - | + set -o pipefail + export PYTHONMALLOC=malloc + export ASAN_OPTIONS="detect_leaks=1:halt_on_error=0:abort_on_error=0:exitcode=1:print_stacktrace=1:symbolize=1" + export LSAN_OPTIONS="suppressions=${PWD}/.semaphore/lsan-suppressions.txt:print_suppressions=0" + export PYTHONPATH="${PWD}" + python -m trivup.clusters.KafkaCluster \ + --kraft \ + --version 4.2.0 \ + --conf '["transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1","offsets.topic.replication.factor=1","offsets.topic.min.isr=1","share.coordinator.state.topic.replication.factor=1","share.coordinator.state.topic.min.isr=1","group.share.record.lock.duration.ms=1000","group.share.min.record.lock.duration.ms=1000"]' \ + --cmd 'LD_PRELOAD="$(gcc -print-file-name=libasan.so)" python -m pytest -v --timeout=300 tests/integration/share_consumer/' \ + 2>&1 | tee asan-share-integration-tests.log || true + # The `|| true` keeps set -e from aborting on python's shutdown exit-1 + # so the gate below actually runs. + # Don't trust $?: trivup runs --cmd via /bin/sh (no pipefail) and + # python exits 1 at interpreter shutdown (cimpl/librdkafka + # finalization) even on a green run. Decide from the printed reports. + rc=0 + if grep -Eq 'ERROR: (Address|Leak)Sanitizer|SUMMARY: AddressSanitizer' asan-share-integration-tests.log; then + echo "ASAN/LSAN reported findings above — failing the job." + rc=1 + fi + # Real pytest failures/errors. Anchored on '^= ' so it matches pytest's + # summary banner and not the '==PID==' sanitizer report lines. + if grep -Eq '^=+ .*[0-9]+ (failed|error)' asan-share-integration-tests.log; then + echo "pytest reported test failures above — failing the job." + rc=1 + fi + # A clean run ends with a green pytest summary; its absence means the + # cluster never came up or pytest crashed/timed out. + if ! grep -Eq '^=+ .*[0-9]+ passed' asan-share-integration-tests.log; then + echo "No passing pytest summary found — the run did not complete." + rc=1 + fi + # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command + # shell before the agent captures status, so green runs get recorded + # as failed and the epilogue's artifact push never runs. + [ "$rc" -eq 0 ] + # KIP-932: share-consumer binding layer under Valgrind memcheck, gated per-PR. + # Broker-free only — Memcheck's ~20-50x slowdown would blow the integration + # suite's 1s share-lock timing and run for hours, so we keep it to the fast + # tests. It catches the one class ASAN can't: reads of uninitialized memory in + # cimpl's own C. Valgrind and ASAN can't co-run, hence the separate non-ASAN + # LIBRDKAFKA_DEBUG build. Gate is uninit/invalid-access only (ERROR SUMMARY); + # leaks stay with LSAN in the ASAN block — CPython leaves thousands of + # "possibly lost"/"still reachable" blocks at exit that aren't ours to chase. + - name: "Share consumer binding layer (Valgrind)" + dependencies: [] + task: + agent: + machine: + type: s1-prod-ubuntu24-04-amd64-2 + env_vars: + - name: OS_NAME + value: linux + - name: ARCH + value: x64 + epilogue: + always: + commands: + - cp valgrind-share-tests.log artifacts/ || true + - artifact push workflow artifacts/ --destination artifacts/valgrind/ || true + jobs: + - name: "ShareConsumer binding layer (Valgrind memcheck)" + commands: + - sem-version python 3.11 + - pip install uv + - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate + - uv pip install -r requirements/requirements-tests.txt + - sudo apt-get update -qq && sudo apt-get install -y -qq valgrind + # Non-ASAN debug build (-g, -O0) so Memcheck stacks are real and the + # optimizer doesn't fake uninitialized-read reports. + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - LIBRDKAFKA_DEBUG=1 tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest + - export CFLAGS="-g -O0 -I${PWD}/dest/build/native/include $CFLAGS" + - export LDFLAGS="-L${PWD}/${lib_dir} $LDFLAGS" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - uv pip install -e . + # PYTHONMALLOC=malloc lets Memcheck see individual allocations instead + # of pymalloc arenas. Suppressions: librdkafka.suppressions silences + # glibc-TLS/getaddrinfo/OpenSSL; valgrind-python.supp silences CPython's + # own uninit-value noise (eval loop, GC, PyLong). leak-check=summary + # keeps the per-run leak tally visible without dumping ~15k interpreter + # records (that overflowed Semaphore's 16MB log cap) or failing on them. + - | + set -o pipefail + SUPP="--suppressions=${PWD}/.semaphore/librdkafka.suppressions" + SUPP="$SUPP --suppressions=${PWD}/.semaphore/valgrind-python.supp" + [[ -f .semaphore/valgrind-confluent.supp ]] && SUPP="$SUPP --suppressions=${PWD}/.semaphore/valgrind-confluent.supp" + PYTHONMALLOC=malloc valgrind \ + --error-exitcode=42 --exit-on-first-error=no \ + --leak-check=summary \ + --track-origins=yes --num-callers=40 \ + $SUPP \ + python -m pytest -v --timeout=1200 tests/test_ShareConsumer.py tests/test_ShareConsumer_callbacks.py 2>&1 | tee valgrind-share-tests.log || true + # The `|| true` keeps set -e from aborting the block on python's + # shutdown exit-1 (pipefail forwards it) so the gate below runs. + # Don't trust $?: python exits 1 at interpreter shutdown + # (cimpl/librdkafka finalization) even on a fully green run, and with + # --error-exitcode=42 + 0 errors valgrind just forwards that. Decide + # from Memcheck's report and pytest's printed summary instead. + rc=0 + # Memcheck findings — uninit reads / invalid access, the class ASAN + # misses. (To harvest a new CPython suppression locally, re-run with + # --gen-suppressions=all.) Leaks are LSAN's job. + if grep -Eq 'ERROR SUMMARY: [1-9]' valgrind-share-tests.log; then + echo "Valgrind reported Memcheck errors above — failing the job." + rc=1 + fi + # Real pytest failures/errors. Anchored on '^= ' so it matches pytest's + # summary banner and not the '==PID==' Memcheck report lines. + if grep -Eq '^=+ .*[0-9]+ (failed|error)' valgrind-share-tests.log; then + echo "pytest reported test failures above — failing the job." + rc=1 + fi + # A clean run ends with a green pytest summary; its absence means a + # crash/timeout before pytest could report. + if ! grep -Eq '^=+ .*[0-9]+ passed' valgrind-share-tests.log; then + echo "No passing pytest summary found — the run did not complete." + rc=1 + fi + # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command + # shell before the agent captures status, so green runs get recorded + # as failed and the epilogue's artifact push never runs. + [ "$rc" -eq 0 ] + # KIP-932: share-consumer integration tests under Valgrind, full-suite per-PR. + # The binding block above keeps Valgrind broker-free on purpose (Memcheck's + # slowdown vs the 1s share lock) — this runs the whole integration suite anyway + # as a per-PR experiment. Expect timing tests (lock expiry/redelivery/throttle) + # to flake and the job to be slow against the 3h cap; curate or promote later. + - name: "Share consumer integration tests under Valgrind" + dependencies: [] + task: + agent: + machine: + type: s1-prod-ubuntu24-04-amd64-2 + env_vars: + - name: OS_NAME + value: linux + - name: ARCH + value: x64 + epilogue: + always: + commands: + - cp valgrind-share-integration-tests.log artifacts/ || true + - artifact push workflow artifacts/ --destination artifacts/valgrind-integration/ || true + jobs: + - name: "ShareConsumer integration (Valgrind memcheck)" + commands: + - sem-version python 3.11 + - sem-version java 17 + - pip install uv + - uv venv _venv --python "$(command -v python)" && source _venv/bin/activate + - uv pip install -r requirements/requirements-tests-install.txt + - sudo apt-get update -qq && sudo apt-get install -y -qq valgrind + # Non-ASAN debug build (-g, -O0) so Memcheck stacks are real and the + # optimizer doesn't fake uninitialized-read reports. Same build as the + # binding-layer Valgrind job. + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - LIBRDKAFKA_DEBUG=1 tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest + - export CFLAGS="-g -O0 -I${PWD}/dest/build/native/include $CFLAGS" + - export LDFLAGS="-L${PWD}/${lib_dir} $LDFLAGS" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - uv pip install -e . + # Valgrind wraps the pytest process inside trivup's --cmd; the broker is + # a separate JVM trivup starts outside --cmd, so it never runs under + # Memcheck. Suppression and test paths are relative to the checkout — + # trivup runs --cmd from the repo root (same as the ASAN job's relative + # pytest path) — and the inner 2>&1 routes valgrind's stderr report into + # the tee'd log, not just pytest's stdout. --conf matches the ASAN + # integration job (1s share lock, single-broker RF/ISR=1); --timeout is + # large because Memcheck's slowdown stacks on the broker round-trips. + - | + set -o pipefail + export PYTHONPATH="${PWD}" + python -m trivup.clusters.KafkaCluster \ + --kraft \ + --version 4.2.0 \ + --conf '["transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1","offsets.topic.replication.factor=1","offsets.topic.min.isr=1","share.coordinator.state.topic.replication.factor=1","share.coordinator.state.topic.min.isr=1","group.share.record.lock.duration.ms=1000","group.share.min.record.lock.duration.ms=1000"]' \ + --cmd 'PYTHONMALLOC=malloc valgrind --error-exitcode=42 --exit-on-first-error=no --leak-check=summary --track-origins=yes --num-callers=40 --suppressions=.semaphore/librdkafka.suppressions --suppressions=.semaphore/valgrind-python.supp python -m pytest -v --timeout=1800 tests/integration/share_consumer/ 2>&1' \ + 2>&1 | tee valgrind-share-integration-tests.log || true + # The `|| true` keeps set -e from aborting on python's shutdown exit-1 + # so the gate runs. Don't trust $?: trivup runs --cmd via /bin/sh (no + # pipefail) and python exits 1 at interpreter shutdown even on a green + # run; with --error-exitcode=42 valgrind just forwards that. + rc=0 + # Memcheck findings — uninit reads / invalid access, the class ASAN misses. + if grep -Eq 'ERROR SUMMARY: [1-9]' valgrind-share-integration-tests.log; then + echo "Valgrind reported Memcheck errors above — failing the job." + rc=1 + fi + # Real pytest failures/errors. Anchored on '^= ' so it matches pytest's + # summary banner and not the '==PID==' Memcheck report lines. + if grep -Eq '^=+ .*[0-9]+ (failed|error)' valgrind-share-integration-tests.log; then + echo "pytest reported test failures above — failing the job." + rc=1 + fi + # A clean run ends with a green pytest summary; its absence means the + # cluster never came up or pytest crashed/timed out. + if ! grep -Eq '^=+ .*[0-9]+ passed' valgrind-share-integration-tests.log; then + echo "No passing pytest summary found — the run did not complete." + rc=1 + fi + # Not `exit $rc`: an explicit exit (even 0) kills Semaphore's command + # shell before the agent captures status, so green runs get recorded + # as failed and the epilogue's artifact push never runs. + [ "$rc" -eq 0 ] promotions: - name: "Publish to Test PyPI" pipeline_file: publish-test-pypi.yml diff --git a/.semaphore/valgrind-python.supp b/.semaphore/valgrind-python.supp new file mode 100644 index 000000000..3b64504c4 --- /dev/null +++ b/.semaphore/valgrind-python.supp @@ -0,0 +1,152 @@ +# CPython interpreter uninitialized-value false positives under Memcheck. +# Harvested from the share-consumer Valgrind run (CPython 3.11, mise build): +# 87 Value8 + 7 Cond, every one originating inside the interpreter itself — +# the eval loop reading its own stack slots, the cyclic GC walking its graph, +# and PyLong digit-array arithmetic. None touch cimpl or librdkafka. +# +# We match on the innermost (top) frame only. A genuine uninitialized read in +# our C would have a cimpl/librdkafka frame on top, so it won't be masked here. +# Function names are stable across CPython patch releases, so this survives a +# mise bump. Leaks are LSAN's job (the ASAN block) — this file is uninit-only. + +# --- bytecode interpreter loop --- +{ + cpython-eval-loop-value8 + Memcheck:Value8 + fun:_PyEval_EvalFrameDefault + ... +} + +# --- cyclic GC traversal --- +{ + cpython-gc-visit-reachable-value8 + Memcheck:Value8 + fun:visit_reachable + ... +} +{ + cpython-gc-visit-decref-value8 + Memcheck:Value8 + fun:visit_decref + ... +} + +# --- frame teardown --- +{ + cpython-frame-clear-value8 + Memcheck:Value8 + fun:_PyFrame_Clear + ... +} + +# --- PyLong / int arithmetic over digit arrays --- +{ + cpython-long-fromstring-value8 + Memcheck:Value8 + fun:PyLong_FromString + ... +} +{ + cpython-long-fromstring-cond + Memcheck:Cond + fun:PyLong_FromString + ... +} +{ + cpython-long-asbytearray-value8 + Memcheck:Value8 + fun:_PyLong_AsByteArray + ... +} +{ + cpython-long-frombytearray-value8 + Memcheck:Value8 + fun:_PyLong_FromByteArray + ... +} +{ + cpython-long-frombytearray-cond + Memcheck:Cond + fun:_PyLong_FromByteArray + ... +} +{ + cpython-long-and-value8 + Memcheck:Value8 + fun:long_and + ... +} +{ + cpython-long-and-cond + Memcheck:Cond + fun:long_and + ... +} +{ + cpython-long-richcompare-value8 + Memcheck:Value8 + fun:long_richcompare + ... +} +{ + cpython-long-richcompare-cond + Memcheck:Cond + fun:long_richcompare + ... +} + +# --- object / type machinery --- +{ + cpython-type-call-value8 + Memcheck:Value8 + fun:type_call + ... +} +{ + cpython-object-init-value8 + Memcheck:Value8 + fun:object_init + ... +} +{ + cpython-subtype-dealloc-value8 + Memcheck:Value8 + fun:subtype_dealloc + ... +} +{ + cpython-generic-setattr-value8 + Memcheck:Value8 + fun:PyObject_GenericSetAttr + ... +} +{ + cpython-tuple-dealloc-value8 + Memcheck:Value8 + fun:tupledealloc + ... +} +{ + cpython-list-dealloc-value8 + Memcheck:Value8 + fun:list_dealloc + ... +} +{ + cpython-tuple-fromarray-value8 + Memcheck:Value8 + fun:_PyTuple_FromArray + ... +} +{ + cpython-function-vectorcall-value8 + Memcheck:Value8 + fun:_PyFunction_Vectorcall + ... +} +{ + cpython-method-vectorcall-fk-value8 + Memcheck:Value8 + fun:method_vectorcall_FASTCALL_KEYWORDS + ... +} diff --git a/tools/wheels/build-librdkafka-branch.sh b/tools/wheels/build-librdkafka-branch.sh new file mode 100755 index 000000000..18577e051 --- /dev/null +++ b/tools/wheels/build-librdkafka-branch.sh @@ -0,0 +1,123 @@ +#!/bin/bash +# +# TODO KIP-932: This script is temporary until share consumer support +# lands in a released librdkafka version. +# +# Build librdkafka from a git branch and install it into a NuGet-compatible +# directory layout that matches what install-librdkafka.sh produces. +# +# Usage: build-librdkafka-branch.sh +# +# branch - git branch, tag, or commit SHA, e.g. dev_kip-932_queues-for-kafka +# destdir - destination directory, e.g. dest +# +# Resulting layout (mirrors NuGet redist package): +# /build/native/include/librdkafka/rdkafka.h +# /runtimes/-/native/librdkafka.{so.1,dylib} + +set -ex + +BRANCH="$1" +DEST="$2" + +if [[ -z $BRANCH || -z $DEST ]]; then + echo "Usage: $0 " + exit 1 +fi + +if [[ -f $DEST/build/native/include/librdkafka/rdkafka.h ]]; then + echo "$0: librdkafka already built in $DEST" + exit 0 +fi + +echo "$0: Building librdkafka branch '$BRANCH' into '$DEST'" + +ARCH=${ARCH:-x64} +SRC=/tmp/librdkafka-${BRANCH} +INSTALL=$SRC/install + +[[ -d "$DEST" ]] || mkdir -p "$DEST" +rm -rf "$SRC" + +# $BRANCH may be a branch, tag, or commit SHA. `git clone --branch` rejects a +# bare SHA, so fetch the ref explicitly (GitHub allows fetching a commit by SHA). +mkdir -p "$SRC" +git -C "$SRC" init +git -C "$SRC" remote add origin https://github.com/confluentinc/librdkafka.git +git -C "$SRC" fetch --depth 1 origin "$BRANCH" +git -C "$SRC" checkout FETCH_HEAD + +if [[ $OSTYPE == linux* ]]; then + sudo apt-get update -qq && sudo apt-get install -y -qq libssl-dev libsasl2-dev liblz4-dev libzstd-dev +elif [[ $OSTYPE == darwin* ]]; then + # openssl@3 is keg-only in Homebrew (the system ships LibreSSL/Apple + # crypto with no -lcrypto headers), so configure's compile-probe for + # libcrypto silently fails unless we add brew's path. Same for zstd / + # lz4 / pkg-config on a fresh runner. + brew install pkg-config openssl@3 zstd lz4 + OPENSSL_PREFIX="$(brew --prefix openssl@3)" + export PKG_CONFIG_PATH="$OPENSSL_PREFIX/lib/pkgconfig${PKG_CONFIG_PATH:+:$PKG_CONFIG_PATH}" + export CPPFLAGS="-I$OPENSSL_PREFIX/include${CPPFLAGS:+ $CPPFLAGS}" + export LDFLAGS="-L$OPENSSL_PREFIX/lib${LDFLAGS:+ $LDFLAGS}" +fi + +pushd "$SRC" + +# --enable-ssl/-lz4/-zstd convert mklove's silent "failed (disable)" into a +# loud configure error if a future regression breaks dep installation — +# otherwise we ship a wheel where sasl.oauthbearer.config etc. fail at +# runtime with _INVALID_ARG -186. +CONFIGURE_OPTS="--prefix=$INSTALL --enable-ssl --enable-lz4-ext --enable-zstd" +if [[ $OSTYPE == linux* ]]; then + CONFIGURE_OPTS="$CONFIGURE_OPTS --disable-gssapi" +fi + +# LIBRDKAFKA_SANITIZE=address builds an instrumented lib for the share-consumer +# ASAN pipeline. Keep the debug symbols mklove would otherwise strip so leak +# reports land on real source lines; normal wheel builds still strip them. +if [[ -n $LIBRDKAFKA_SANITIZE ]]; then + export CFLAGS="-fsanitize=${LIBRDKAFKA_SANITIZE} -g -fno-omit-frame-pointer ${CFLAGS}" + export LDFLAGS="-fsanitize=${LIBRDKAFKA_SANITIZE} ${LDFLAGS}" +elif [[ -n $LIBRDKAFKA_DEBUG ]]; then + # Valgrind build: keep debug symbols and drop optimization so Memcheck + # stacks are real and the optimizer doesn't synthesize false uninitialized + # reads. Same config librdkafka's own Valgrind CI uses. + CONFIGURE_OPTS="$CONFIGURE_OPTS --enable-devel --disable-optimization" +else + CONFIGURE_OPTS="$CONFIGURE_OPTS --disable-debug-symbols" +fi + +./configure $CONFIGURE_OPTS +make -j"$(nproc 2>/dev/null || sysctl -n hw.ncpu)" +make install +popd + +# --- Mirror NuGet layout --- + +INC_DST="$DEST/build/native/include" +mkdir -p "$INC_DST" +cp -r "$INSTALL/include/librdkafka" "$INC_DST/" + +if [[ $OSTYPE == linux* ]]; then + OS_NAME=linux + LIB_DST="$DEST/runtimes/$OS_NAME-$ARCH/native" + mkdir -p "$LIB_DST" + cp -v "$INSTALL"/lib/librdkafka.so* "$LIB_DST/" 2>/dev/null || true + # Ensure librdkafka.so.1 exists (needed by the loader) + if [[ ! -f "$LIB_DST/librdkafka.so.1" ]]; then + cp -v "$LIB_DST/librdkafka.so" "$LIB_DST/librdkafka.so.1" + fi + ldd "$LIB_DST/librdkafka.so.1" + +elif [[ $OSTYPE == darwin* ]]; then + OS_NAME=osx + LIB_DST="$DEST/runtimes/$OS_NAME-$ARCH/native" + mkdir -p "$LIB_DST" + cp -v "$INSTALL"/lib/librdkafka*.dylib "$LIB_DST/" + # Fix the dylib self-referencing name to its installed path + install_name_tool -id "$LIB_DST/librdkafka.dylib" "$LIB_DST/librdkafka.dylib" + otool -L "$LIB_DST/librdkafka.dylib" +fi + +rm -rf "$SRC" +echo "$0: Done. Headers at $INC_DST, library at $LIB_DST"