Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka/changelog.d/21206.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Set ruff formatting rules in pyproject.toml to inherit from the global ones of the repo.
10 changes: 5 additions & 5 deletions kafka/hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

[[envs.default.matrix]]
python = ["3.12"]
version = ["2.8", "3.3"]
kafka_version = ["3.3"]

[envs.default.overrides]
matrix.version.env-vars = [
{ key = "KAFKA_VERSION", value = "2.8.1", if = ["2.8"] },
{ key = "KAFKA_VERSION", value = "3.3.1", if = ["3.3"] },
# See mappings between kafka version and confluent versions here: https://docs.confluent.io/platform/current/installation/versions-interoperability.html
matrix.kafka_version.env-vars = [
{ key = "CONFLUENT_VERSION", value = "7.3.0", if = ["3.3"] },
]

[envs.latest.env-vars]
KAFKA_VERSION = "latest"
CONFLUENT_VERSION = "latest"
3 changes: 3 additions & 0 deletions kafka/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ include = [
dev-mode-dirs = [
".",
]

[tool.ruff]
extend = "../pyproject.toml"
29 changes: 19 additions & 10 deletions kafka/tests/compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
services:
zookeeper:
image: bitnami/zookeeper:3.8.0
image: confluentinc/cp-zookeeper:7.2.0
ports:
- 2181:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
healthcheck:
test: ["CMD", "bash", "-c", "echo 'srvr' | nc localhost 2181 | grep -q 'Mode:'"]
interval: 10s
timeout: 5s
retries: 5
kafka:
image: bitnami/kafka:${KAFKA_VERSION}
image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
ports:
- 9092:9092
- 9999:9999
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CREATE_TOPICS: "marvel:2:1,dc:2:1"
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_CFG_ADVERTISED_HOST_NAME: localhost
KAFKA_CFG_ADVERTISED_PORT: "9092"
KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
JMX_PORT: "9999"
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: localhost
depends_on:
- zookeeper
zookeeper:
condition: service_healthy

11 changes: 10 additions & 1 deletion kafka/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ def dd_environment():
compose_file = os.path.join(HERE, 'compose', 'docker-compose.yml')

with docker_run(
compose_file, conditions=[CheckDockerLogs(compose_file, [r'\[KafkaServer id=\d+\] started'], matches="all")]
compose_file,
conditions=[
CheckDockerLogs(
compose_file,
[r'\[KafkaServer id=\d+\] started'],
matches="all",
service="kafka",
),
],
waith_for_health=True,
):
yield load_jmx_config(), {'use_jmx': True}
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/21206.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Set ruff formatting rules in pyproject.toml to inherit from the global ones of the repo.
21 changes: 6 additions & 15 deletions kafka_consumer/hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,17 @@ post-install-commands = [
"python -m pip install --no-binary confluent-kafka confluent-kafka==2.8.0",
]

[envs.default.env-vars]
ZK_VERSION = "3.6.4"
AUTHENTICATION = "noauth"

[[envs.default.matrix]]
python = ["3.12"]
version = ["2.6", "3.3"]

[[envs.default.matrix]]
python = ["3.12"]
version = ["3.3"]
auth = ["ssl", "kerberos"]
kafka_version = ["3.3"]
auth = ["ssl", "kerberos", "noauth"]

[envs.default.overrides]
matrix.version.env-vars = [
{ key = "KAFKA_VERSION", value = "2.6.0", if = ["2.6"] },
{ key = "KAFKA_VERSION", value = "3.3.2-debian-11-r175", if = ["3.3"] },
# See mappings between kafka version and confluent versions here: https://docs.confluent.io/platform/current/installation/versions-interoperability.html
matrix.kafka_version.env-vars = [
{ key = "CONFLUENT_VERSION", value = "7.3.0", if = ["3.3"] },
]
matrix.auth.env-vars = "AUTHENTICATION"

[envs.latest.env-vars]
KAFKA_VERSION = "latest"
ZK_VERSION = "3.6.4"
CONFLUENT_VERSION = "latest"
3 changes: 3 additions & 0 deletions kafka_consumer/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@ include = [
dev-mode-dirs = [
".",
]

[tool.ruff]
extend = "../pyproject.toml"
1 change: 1 addition & 0 deletions kafka_consumer/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def get_authentication_configuration(instance):
"ssl.certificate.location": instance.get("tls_cert"),
"ssl.key.location": instance.get("tls_private_key"),
"ssl.key.password": instance.get("tls_private_key_password"),
"ssl.endpoint.identification.algorithm": "https",
}
)

Expand Down
18 changes: 16 additions & 2 deletions kafka_consumer/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def dd_environment():
conditions.append(WaitFor(wait_for_cp_kafka_topics, attempts=10, wait=10))
common.E2E_METADATA["docker_volumes"].append(f"{secret_dir}:/var/lib/secret")

if common.AUTHENTICATION == "ssl":
conditions.append(WaitFor(wait_for_ssl_ready, attempts=30, wait=5))

conditions.extend(
[
WaitFor(create_topics, attempts=60, wait=3),
Expand Down Expand Up @@ -111,12 +114,23 @@ def initialize_topics():
time.sleep(5)


def wait_for_ssl_ready():
try:
client = _create_admin_client()
metadata = client.list_topics(timeout=5)
return metadata.cluster_id is not None
except Exception as e:
print(f"SSL not ready yet: {e}")
return False


def _create_admin_client():
config = {
"bootstrap.servers": common.INSTANCE['kafka_connect_str'],
"socket.timeout.ms": 1000,
"socket.timeout.ms": 5000, # Increased for SSL handshake
"topic.metadata.refresh.interval.ms": 2000,
}
config.update(common.get_authentication_configuration(common.INSTANCE))
auth_config = common.get_authentication_configuration(common.INSTANCE)
config.update(auth_config)

return AdminClient(config)
4 changes: 2 additions & 2 deletions kafka_consumer/tests/docker/kerberos/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ services:
-Dzookeeper.requireClientAuthScheme=sasl

broker:
image: confluentinc/cp-server:7.3.0
image: confluentinc/cp-server:${CONFLUENT_VERSION}
hostname: broker.kerberos-demo.local
container_name: broker
ports:
Expand Down Expand Up @@ -86,7 +86,7 @@ services:
retries: 10

broker2:
image: confluentinc/cp-server:7.3.0
image: confluentinc/cp-server:${CONFLUENT_VERSION}
hostname: broker2.kerberos-demo.local
container_name: broker2
ports:
Expand Down
43 changes: 20 additions & 23 deletions kafka_consumer/tests/docker/noauth/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,52 +1,49 @@
services:
zookeeper:
image: docker.io/bitnami/zookeeper:${ZK_VERSION}
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
hostname: zookeeper
ports:
- 2181:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
healthcheck:
test: nc -vz localhost 2181 || exit -1
start_period: 5s
interval: 5s
test: ["CMD", "bash", "-c", "echo 'srvr' | nc localhost 2181 | grep -q 'Mode:'"]
interval: 10s
timeout: 5s
retries: 10
retries: 5

kafka1:
image: docker.io/bitnami/kafka:${KAFKA_VERSION}
image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
container_name: kafka1
hostname: kafka1
ports:
- 9092:9092
environment:
KAFKA_CFG_BROKER_ID: 1
KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CFG_LISTENERS: INTERNAL://:19092,EXTERNAL://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://:19092,EXTERNAL://127.0.0.1:9092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
ALLOW_PLAINTEXT_LISTENER: "true"
KAFKA_ENABLE_KRAFT: false
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
zookeeper:
condition: service_healthy

kafka2:
image: docker.io/bitnami/kafka:${KAFKA_VERSION}
image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
ports:
- 9093:9093
container_name: kafka2
hostname: kafka2
environment:
KAFKA_CFG_BROKER_ID: 2
KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CFG_LISTENERS: INTERNAL://:19093,EXTERNAL://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://:19093,EXTERNAL://127.0.0.1:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
ALLOW_PLAINTEXT_LISTENER: "true"
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
zookeeper:
condition: service_healthy
Loading
Loading