@@ -34,6 +34,25 @@ def _env_first(*names: str, default: str = "") -> str:
3434 return default
3535
3636
def _keep_containers() -> bool:
    """Return True when docker services should be left RUNNING after the run.

    Controlled by the ``LOGREDUCER_KEEP_CONTAINERS`` environment variable. The
    value is stripped and lower-cased; ``1``, ``true``, ``yes`` and ``on``
    enable keeping, anything else (including unset/empty) means the default:
    stop/remove the containers.

    Keeping them lets you re-run fast: grab the printed endpoint and export it
    (KAFKA_BOOTSTRAP_SERVERS / CLICKHOUSE_* / a DB URL) so the next run takes
    the env-first path and skips container startup.
    """
    raw = os.environ.get("LOGREDUCER_KEEP_CONTAINERS", "")
    return raw.strip().lower() in ("1", "true", "yes", "on")
45+
46+
def _stop(container: Any, label: str, endpoint: str) -> None:
    """Stop a throwaway container, unless the caller asked to keep it running.

    Args:
        container: Object exposing a ``stop()`` method (the started container).
        label: Human-readable service name used in the "[keep]" message.
        endpoint: Address/URL printed so it can be exported for the next run.

    When ``_keep_containers()`` is true, the container is left untouched and a
    one-line notice with ``label`` and ``endpoint`` is printed instead. Otherwise
    ``container.stop()`` is called best-effort: any ``Exception`` it raises is
    suppressed so teardown never fails the test session.
    """
    if _keep_containers():
        print(f"\n [keep] {label} left running at {endpoint} (LOGREDUCER_KEEP_CONTAINERS=1)")
        return
    # Best-effort teardown: a failed stop must not mask the test outcome.
    with contextlib.suppress(Exception):
        container.stop()
54+
55+
3756# ---------------------------------------------------------------------------
3857# ClickHouse
3958# ---------------------------------------------------------------------------
@@ -131,36 +150,69 @@ def clickhouse_client() -> Iterator[Any]:
131150 finally :
132151 with contextlib .suppress (Exception ):
133152 client .close ()
134- container .stop () # stop the fallback container as soon as we are done
153+ endpoint = f"{ container .get_container_host_ip ()} :{ container .get_exposed_port (8123 )} "
154+ _stop (container , "ClickHouse" , endpoint )
135155
136156
137157# ---------------------------------------------------------------------------
138158# Kafka
139159# ---------------------------------------------------------------------------
140160
141161
def _kafka_config_from_env() -> str | dict[str, Any] | None:
    """Return a reachable Kafka config from env, or None.

    A bare ``KAFKA_BOOTSTRAP_SERVERS`` yields a plain bootstrap string. Adding
    ``KAFKA_SECURITY_PROTOCOL`` (e.g. SASL_SSL) upgrades it to a full librdkafka
    config dict (SASL mechanism/username/password + TLS CA) - used to reach the
    authenticated PET broker. Probed with an AdminClient before use.

    Returns:
        The bootstrap string or config dict when the broker answers the probe;
        ``None`` when ``KAFKA_BOOTSTRAP_SERVERS`` is unset/empty, when
        ``confluent_kafka`` cannot be imported, or when the probe raises.
    """
    servers = _env_first("KAFKA_BOOTSTRAP_SERVERS")
    if not servers:
        return None

    protocol = _env_first("KAFKA_SECURITY_PROTOCOL")
    config: str | dict[str, Any]
    if protocol:
        config = {"bootstrap.servers": servers, "security.protocol": protocol}
        # Optional settings: copied into the config only when the env var is set.
        for env_key, conf_key in (
            ("KAFKA_SASL_MECHANISM", "sasl.mechanism"),
            ("KAFKA_SASL_USERNAME", "sasl.username"),
            ("KAFKA_SASL_PASSWORD", "sasl.password"),
            ("KAFKA_SSL_CA_LOCATION", "ssl.ca.location"),
        ):
            value = _env_first(env_key)
            if value:
                config[conf_key] = value
        # Internal PET broker: skip TLS cert verification (same stance as
        # CLICKHOUSE_VERIFY=false) - the chain uses an internal, pre-rebrand CA.
        if not _env_bool("KAFKA_SSL_VERIFY", default=True):
            config["enable.ssl.certificate.verification"] = "false"
    else:
        config = servers

    try:
        from confluent_kafka.admin import AdminClient

        # Probe with a copy so the probe-only socket timeout never leaks into
        # the config handed back to the caller.
        probe = dict(config) if isinstance(config, dict) else {"bootstrap.servers": config}
        probe["socket.timeout.ms"] = 6000
        AdminClient(probe).list_topics(timeout=8)
        return config
    except Exception:
        # Deliberately broad: a missing client library or an unreachable broker
        # both mean "no env-configured Kafka", and the caller falls back.
        return None
154201
155202
156- def _kafka_from_docker () -> tuple [str , Any ] | None :
157- """Start a throwaway Kafka container; return (bootstrap, container) or None."""
203+ def _redpanda_from_docker () -> tuple [str , Any ] | None :
204+ """Start a throwaway Redpanda container; return (bootstrap, container) or None.
205+
206+ Redpanda is the docker broker (Kafka-API compatible, single binary, far
207+ smaller/faster to start than Apache Kafka) - confluent-kafka talks to it
208+ unchanged.
209+ """
158210 try :
159- from testcontainers .kafka import KafkaContainer
211+ from testcontainers .kafka import RedpandaContainer
160212 except ImportError :
161213 return None
162214 try :
163- container = KafkaContainer ()
215+ container = RedpandaContainer ()
164216 container .start ()
165217 except Exception :
166218 with contextlib .suppress (Exception ):
@@ -170,21 +222,26 @@ def _kafka_from_docker() -> tuple[str, Any] | None:
170222
171223
@pytest.fixture(scope="session")
def kafka_bootstrap() -> Iterator[str | dict[str, Any]]:
    """A Kafka config - a configured broker (str, or a SASL dict for PET) if
    reachable, else a Redpanda docker broker (str), else skip.

    The value is accepted directly by KafkaSource/KafkaSink and the corpora
    loaders (all take a bootstrap string or a full librdkafka config dict).

    Yields:
        The env-derived config when ``_kafka_config_from_env()`` returns one;
        otherwise the bootstrap string of a freshly started Redpanda container,
        which is handed to ``_stop`` when the session ends.
    """
    config = _kafka_config_from_env()
    if config is not None:
        # Env-configured broker: nothing was started here, so nothing to stop.
        yield config
        return

    docker = _redpanda_from_docker()
    if docker is None:
        pytest.skip("no Kafka broker: set KAFKA_BOOTSTRAP_SERVERS, or start Docker (Redpanda)")
    servers, container = docker
    try:
        yield servers
    finally:
        # Stops the fallback container unless LOGREDUCER_KEEP_CONTAINERS is set.
        _stop(container, "Redpanda", servers)
188245
189246
190247# ---------------------------------------------------------------------------
@@ -207,8 +264,8 @@ def _seed_logs(engine: Any, rows: int) -> None:
207264
208265
209266@pytest .fixture (scope = "session" )
210- def pg_logs_engine () -> Iterator [Any ]:
211- """A PostgreSQL engine with a seeded `logs` table (docker, else skip)."""
267+ def pg_engine () -> Iterator [Any ]:
268+ """A bare PostgreSQL engine on a throwaway container (docker, else skip)."""
212269 try :
213270 from sqlalchemy import create_engine
214271 from testcontainers .postgres import PostgresContainer
@@ -221,17 +278,16 @@ def pg_logs_engine() -> Iterator[Any]:
221278 pytest .skip (f"no Docker for PostgreSQL: { exc } " )
222279 engine = create_engine (container .get_connection_url ())
223280 try :
224- _seed_logs (engine , 5000 )
225281 yield engine
226282 finally :
227283 with contextlib .suppress (Exception ):
228284 engine .dispose ()
229- container . stop ( )
285+ _stop ( container , "PostgreSQL" , container . get_connection_url () )
230286
231287
232288@pytest .fixture (scope = "session" )
233- def mysql_logs_engine () -> Iterator [Any ]:
234- """A MySQL engine with a seeded `logs` table (docker, else skip)."""
289+ def mysql_engine () -> Iterator [Any ]:
290+ """A bare MySQL engine on a throwaway container (docker, else skip)."""
235291 try :
236292 from sqlalchemy import create_engine
237293 from testcontainers .mysql import MySqlContainer
@@ -249,9 +305,22 @@ def mysql_logs_engine() -> Iterator[Any]:
249305 url = url .replace ("mysql://" , "mysql+pymysql://" , 1 )
250306 engine = create_engine (url )
251307 try :
252- _seed_logs (engine , 5000 )
253308 yield engine
254309 finally :
255310 with contextlib .suppress (Exception ):
256311 engine .dispose ()
257- container .stop ()
312+ _stop (container , "MySQL" , url )
313+
314+
@pytest.fixture(scope="session")
def pg_logs_engine(pg_engine: Any) -> Any:
    """PostgreSQL with a seeded synthetic `logs` table (for the sampling tests).

    Builds on the ``pg_engine`` fixture: calls ``_seed_logs`` with 5000 rows
    against that engine and returns the very same engine object, so teardown
    stays with ``pg_engine``.
    """
    _seed_logs(pg_engine, 5000)
    return pg_engine
320+
321+
@pytest.fixture(scope="session")
def mysql_logs_engine(mysql_engine: Any) -> Any:
    """MySQL with a seeded synthetic `logs` table (for the sampling tests).

    Builds on the ``mysql_engine`` fixture: calls ``_seed_logs`` with 5000 rows
    against that engine and returns the very same engine object, so teardown
    stays with ``mysql_engine``.
    """
    _seed_logs(mysql_engine, 5000)
    return mysql_engine
0 commit comments