kafka_schema_registry_url cannot resolve protobuf import statements #1076

@yokofly

Description

Company or project name

No response

Describe what's wrong

The Protobuf schema-registry path ignores the references field returned by the
registry, and there is no importer to resolve import statements.

Exact code path:

  • src/Formats/KafkaSchemaRegistry.cpp:44 and
    src/Formats/KafkaSchemaRegistry.cpp:103 only parse schema (and id) from the
    registry JSON; references is never read.
  • src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp:261 uses
    registry.fetchSchema(id) and then parses the raw schema string into a
    FileDescriptorProto with DescriptorPool (no importer/source tree), so import
    "google/protobuf/timestamp.proto" cannot be resolved.
  • Local format_schema works because src/Formats/ProtobufSchemas.cpp:33 uses
    ImporterWithSourceTree and maps google_protos_path, so imports resolve
    there.

In short, Proton's Protobuf schema-registry path currently ignores the
references array returned by the schema registry.
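To make the gap concrete, here is a minimal Python sketch (not Proton code) of the check a reader could run against a registry response: list the import statements in the schema text and see which of them the references array covers. The response body, the `common/ids.proto` import, and the helper name `unresolved_imports` are hypothetical; only the `schema` and `references` fields come from the description above.

```python
import json
import re

# Matches `import "x.proto";` as well as `import public ...` and `import weak ...`.
IMPORT_RE = re.compile(r'^\s*import\s+(?:public\s+|weak\s+)?"([^"]+)"\s*;', re.MULTILINE)


def unresolved_imports(registry_response: str) -> list:
    """Return imports in the schema text that no entry in `references` covers."""
    payload = json.loads(registry_response)
    imports = IMPORT_RE.findall(payload["schema"])
    referenced = {ref["name"] for ref in payload.get("references", [])}
    return [name for name in imports if name not in referenced]


# Hypothetical response body, shaped like the output of GET /schemas/ids/{id}.
sample = json.dumps({
    "schemaType": "PROTOBUF",
    "schema": (
        'syntax = "proto3";\n'
        'import "google/protobuf/timestamp.proto";\n'
        'import "common/ids.proto";\n'
        "message TestMessage { int64 id = 1; }\n"
    ),
    "references": [
        {"name": "common/ids.proto", "subject": "common-ids", "version": 1},
    ],
})

print(unresolved_imports(sample))  # ['google/protobuf/timestamp.proto']
```

Imports covered by references would have to be fetched from the registry, while the rest (typically the google/protobuf well-known types) would have to come from bundled protos, as the local format_schema path does via google_protos_path.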

Does it reproduce on the most recent release?

Yes

How to reproduce

services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:v24.2.4
    container_name: redpanda
    command:
      - redpanda start
      - --smp 1
      - --overprovisioned
      - --node-id 0
      - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
      - --pandaproxy-addr 0.0.0.0:8082
      - --advertise-pandaproxy-addr localhost:8082
      - --schema-registry-addr 0.0.0.0:8081
    ports:
      - "9092:9092"
      - "8081:8081"
      - "8082:8082"
    healthcheck:
      test: ["CMD", "rpk", "cluster", "health"]
      interval: 5s
      timeout: 3s
      retries: 10

  redpanda-console:
    image: docker.redpanda.com/redpandadata/console:v2.7.2
    container_name: redpanda-console
    ports:
      - "8080:8080"
    environment:
      KAFKA_BROKERS: "redpanda:29092"
      KAFKA_SCHEMAREGISTRY_ENABLED: "true"
      KAFKA_SCHEMAREGISTRY_URLS: "http://redpanda:8081"
      KAFKA_PROTOBUF_ENABLED: "true"
      KAFKA_PROTOBUF_SCHEMAREGISTRY_ENABLED: "true"
      KAFKA_PROTOBUF_SCHEMAREGISTRY_REFRESHINTERVAL: "5s"
    depends_on:
      redpanda:
        condition: service_healthy

  proton:
    image: ghcr.io/timeplus-io/proton:3.0.11
    container_name: proton
    ports:
      - "8123:8123"
      - "8463:8463"
    depends_on:
      redpanda:
        condition: service_healthy

#!/bin/bash
#
# Bug: Protobuf import not resolved with kafka_schema_registry_url
#
# Prerequisites:
#   pip install confluent-kafka protobuf grpcio-tools
#

set -e
cd "$(dirname "$0")"

echo "============================================================"
echo "Bug: Protobuf import not resolved with kafka_schema_registry_url"
echo "============================================================"

# Check dependencies
echo ""
echo "Checking Python dependencies..."
python3 -c "import confluent_kafka" 2>/dev/null || { echo "ERROR: Run 'pip install confluent-kafka'"; exit 1; }
python3 -c "import google.protobuf" 2>/dev/null || { echo "ERROR: Run 'pip install protobuf'"; exit 1; }
python3 -c "import grpc_tools.protoc" 2>/dev/null || { echo "ERROR: Run 'pip install grpcio-tools'"; exit 1; }
echo "  All dependencies OK"

# Step 1: Generate protobuf Python class
echo ""
echo "Step 1: Generate protobuf Python class..."
cat > test_message.proto << 'EOF'
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message TestMessage {
  int64 id = 1;
  google.protobuf.Timestamp created_at = 2;
}
EOF

PROTO_PATH=$(python3 -c "import google.protobuf; import os; print(os.path.dirname(os.path.dirname(google.protobuf.__file__)))")
python3 -m grpc_tools.protoc --python_out=. -I. -I"$PROTO_PATH" test_message.proto
echo "  Generated test_message_pb2.py"

# Step 2: Create topic
echo ""
echo "Step 2: Create Kafka topic..."
docker exec redpanda rpk topic delete test-import-bug 2>/dev/null || true
docker exec redpanda rpk topic create test-import-bug -p 1 2>/dev/null || true

# Step 3: Produce messages using Confluent serializer
echo ""
echo "Step 3: Produce messages with Confluent Protobuf Serializer..."
python3 << 'PYEOF'
import sys
sys.path.insert(0, '.')
import test_message_pb2
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
serializer = ProtobufSerializer(test_message_pb2.TestMessage, schema_registry, {"use.deprecated.format": False})
producer = Producer({"bootstrap.servers": "localhost:9092"})

for i in range(3):
    msg = test_message_pb2.TestMessage()
    msg.id = 100 + i
    msg.created_at.seconds = 1735100000 + i
    msg.created_at.nanos = 123456789
    
    serialized = serializer(msg, SerializationContext("test-import-bug", MessageField.VALUE))
    producer.produce("test-import-bug", value=serialized)
    print(f"  Produced: id={msg.id}, created_at=({msg.created_at.seconds}, {msg.created_at.nanos})")

producer.flush()
print("  Done!")
PYEOF

# Step 4: Show schema registry state
echo ""
echo "Step 4: Schema Registry state..."
echo "  Subjects:"
curl -s http://localhost:8081/subjects | python3 -c "import sys,json; [print(f'    - {s}') for s in json.load(sys.stdin)]"
SCHEMA_ID=$(curl -s http://localhost:8081/subjects/test-import-bug-value/versions/latest | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
echo "  Schema ID: $SCHEMA_ID"
echo "  References:"
curl -s http://localhost:8081/schemas/ids/$SCHEMA_ID | python3 -c "import sys,json; d=json.load(sys.stdin); [print(f'    - {r}') for r in d.get('references',[])]"

# Step 5: Test with Proton - Schema Registry (should FAIL)
echo ""
echo "Step 5: Test Proton with kafka_schema_registry_url (EXPECTED TO FAIL)..."
docker exec proton proton-client --query "DROP STREAM IF EXISTS test_sr"
docker exec proton proton-client --query "
CREATE EXTERNAL STREAM test_sr (
    id int64,
    created_at tuple(seconds int64, nanos int32)
)
SETTINGS
    type = 'kafka',
    brokers = 'redpanda:29092',
    topic = 'test-import-bug',
    data_format = 'ProtobufSingle',
    kafka_schema_registry_url = 'http://redpanda:8081'
"
echo ""
echo "  Query result:"
timeout 5 docker exec proton proton-client --query \
    "SELECT * FROM test_sr LIMIT 1 SETTINGS seek_to='earliest'" 2>&1 | sed 's/^/    /' || true

# Step 6: Test with Proton - Local format_schema (should WORK)
echo ""
echo "Step 6: Test Proton with local format_schema (EXPECTED TO WORK)..."
docker exec proton proton-client --query "DROP STREAM IF EXISTS test_local"
docker exec proton proton-client --query "DROP FORMAT SCHEMA IF EXISTS test_schema TYPE Protobuf"
docker exec proton proton-client --query "
CREATE FORMAT SCHEMA test_schema AS
\$\$
syntax = \"proto3\";
import \"google/protobuf/timestamp.proto\";
message TestMessage {
  int64 id = 1;
  google.protobuf.Timestamp created_at = 2;
}
\$\$
TYPE Protobuf
"
docker exec proton proton-client --query "
CREATE EXTERNAL STREAM test_local (
    id int64,
    created_at tuple(seconds int64, nanos int32)
)
SETTINGS
    type = 'kafka',
    brokers = 'redpanda:29092',
    topic = 'test-import-bug',
    data_format = 'ProtobufSingle',
    format_schema = 'test_schema:TestMessage'
"
echo ""
echo "  Query result:"
timeout 5 docker exec proton proton-client --query \
    "SELECT * FROM test_local LIMIT 3 SETTINGS seek_to='earliest'" 2>&1 | sed 's/^/    /' || true

# Summary
echo ""
echo "============================================================"
echo "SUMMARY"
echo "============================================================"
echo ""
echo "✅ Redpanda Console: Check http://localhost:8080 -> Topics -> test-import-bug"
echo "   The messages should decode correctly showing id and created_at fields."
echo ""
echo "❌ Proton (kafka_schema_registry_url): FAILS with 'No message type in schema'"
echo "   Proton cannot resolve 'import google/protobuf/timestamp.proto'"
echo ""
echo "✅ Proton (local format_schema): WORKS with same import statement"
echo "   The import resolves correctly using built-in google protos"
echo ""
echo "ROOT CAUSE: KafkaSchemaRegistry.cpp ignores 'references' field from schema registry"
echo "============================================================"

# Cleanup generated files
rm -f test_message.proto test_message_pb2.py 2>/dev/null || true

Expected behavior

No response

Error message and/or stacktrace

No response

Additional context

# Bug: Protobuf `import` Statements Not Resolved with `kafka_schema_registry_url`

## Summary

When using `kafka_schema_registry_url` with the Protobuf format, schemas that contain `import` statements (e.g., `import "google/protobuf/timestamp.proto"`) fail to parse with the following error:

Code: 2504. DB::Exception: No message type in schema


**Redpanda Console can decode the same messages correctly**, proving the bug is in Proton's schema registry integration, not in the message format or schema registry.

## Quick Reproduction

```bash
# 1. Start services
docker compose up -d

# 2. Wait for healthy
docker compose ps

# 3. Install Python dependencies
pip install confluent-kafka protobuf grpcio-tools

# 4. Run reproduction script
./reproduce.sh
```

## What the Script Demonstrates

| Component | Result |
|-----------|--------|
| Redpanda Console | ✅ Decodes messages correctly |
| Proton (`kafka_schema_registry_url`) | ❌ `No message type in schema` |
| Proton (local `format_schema`) | ✅ Works with same import |

## Root Cause

In `src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp`, the `SchemaRegistryWithCache::fetchSchema()` method:

1. Fetches the schema from the registry via `KafkaSchemaRegistry::fetchSchema()`.
2. Receives only the `schema` field, because `KafkaSchemaRegistry::fetchSchema()` ignores `references`.
3. Parses it with a bare `google::protobuf::DescriptorPool`, without any source tree.
4. Cannot resolve `import` statements as a result, because the referenced schemas are never loaded.

By contrast, the local `format_schema` path uses a `DiskSourceTree` with `google_protos_path` mapped, so imports resolve correctly.
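One possible direction for a fix, sketched in Python rather than in Proton's C++: walk the `references` array recursively and collect every referenced schema under its import name, so that an importer or source tree has something to resolve against. This is an illustration under assumptions, not the actual implementation. `collect_sources`, `fake_fetch`, and the in-memory registry contents are hypothetical; in a Confluent-compatible registry, the fetch step would correspond to `GET /subjects/{subject}/versions/{version}`.

```python
from typing import Callable, Dict

Payload = dict
Fetch = Callable[[str, int], Payload]


def collect_sources(root_name: str, root: Payload, fetch: Fetch) -> Dict[str, str]:
    """Map each import name to its schema text, following references depth-first."""
    sources: Dict[str, str] = {}

    def visit(name: str, payload: Payload) -> None:
        if name in sources:  # already collected; also guards against cycles
            return
        sources[name] = payload["schema"]
        for ref in payload.get("references", []):
            # Each reference names the import path plus the subject/version to fetch.
            visit(ref["name"], fetch(ref["subject"], ref["version"]))

    visit(root_name, root)
    return sources


# Hypothetical in-memory registry standing in for HTTP calls.
FAKE_REGISTRY = {
    ("common-ids", 1): {
        "schema": 'syntax = "proto3";\nmessage Id { int64 value = 1; }\n',
    },
    ("orders-value", 3): {
        "schema": 'syntax = "proto3";\nimport "common/ids.proto";\nmessage Order { Id id = 1; }\n',
        "references": [
            {"name": "common/ids.proto", "subject": "common-ids", "version": 1},
        ],
    },
}


def fake_fetch(subject: str, version: int) -> Payload:
    return FAKE_REGISTRY[(subject, version)]


sources = collect_sources("orders.proto", FAKE_REGISTRY[("orders-value", 3)], fake_fetch)
print(sorted(sources))  # ['common/ids.proto', 'orders.proto']
```

The resulting name-to-text map is the kind of input an in-memory source tree would need. Imports that no reference covers, such as `google/protobuf/timestamp.proto`, would still have to be served from the bundled google protos.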

## Workaround

Use a local `CREATE FORMAT SCHEMA` instead of `kafka_schema_registry_url`:

-- Create local schema (import works!)
CREATE FORMAT SCHEMA my_schema AS
$$
syntax = "proto3";
import "google/protobuf/timestamp.proto";

message MyMessage {
  int64 id = 1;
  google.protobuf.Timestamp created_at = 2;
}
$$
TYPE Protobuf;

-- Use format_schema instead of kafka_schema_registry_url
CREATE EXTERNAL STREAM my_stream (
    id int64,
    created_at tuple(seconds int64, nanos int32)
)
SETTINGS
    type = 'kafka',
    brokers = 'kafka:9092',
    topic = 'my-topic',
    data_format = 'ProtobufSingle',
    format_schema = 'my_schema:MyMessage';  -- Works!

## Environment

- Proton: 3.0.11+
- Schema Registry: Confluent-compatible (Redpanda, Confluent, etc.)
- Tested with the official Confluent Python Protobuf Serializer

## Code References

- `src/Formats/KafkaSchemaRegistry.cpp:83` - Only reads the `schema` field, ignores `references`
- `src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp:261-283` - Uses a bare `DescriptorPool`
- `src/Formats/ProtobufSchemas.cpp:36-41` - Shows how the local schema path uses `DiskSourceTree`
