Company or project name
No response
Describe what's wrong
The Protobuf schema-registry path ignores the `references` field, and there is no importer to resolve `import` statements.
Exact code path:
- `src/Formats/KafkaSchemaRegistry.cpp:44` and `src/Formats/KafkaSchemaRegistry.cpp:103` only parse `schema` (and `id`) from the registry JSON; `references` is never read.
- `src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp:261` calls `registry.fetchSchema(id)` and then parses the raw schema string into a `FileDescriptorProto` with a `DescriptorPool` (no importer or source tree), so `import "google/protobuf/timestamp.proto"` cannot be resolved.
- A local `format_schema` works because `src/Formats/ProtobufSchemas.cpp:33` uses `ImporterWithSourceTree` and maps `google_protos_path`, so imports resolve there.
So yes, the `references` array returned by the schema registry is effectively ignored today in Proton's Protobuf schema-registry path.
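For reference, a Confluent-compatible registry returns `references` next to `schema` in the `GET /schemas/ids/{id}` response (Step 4 of the script below prints it). Each entry gives the import path (`name`) plus the `subject` and `version` under which that dependency is registered. A minimal Python sketch of reading both fields, i.e. the data the SR path would need to keep; `parse_schema_response` and the sample payload are illustrative, not Proton code.

```python
import json
from typing import List, NamedTuple, Tuple


class SchemaReference(NamedTuple):
    name: str     # import path as written in the .proto file
    subject: str  # registry subject that stores the dependency
    version: int  # version of that subject


def parse_schema_response(body: str) -> Tuple[str, List[SchemaReference]]:
    """Return (schema_text, references) from a GET /schemas/ids/{id} JSON body."""
    doc = json.loads(body)
    refs = [
        SchemaReference(r["name"], r["subject"], int(r["version"]))
        for r in doc.get("references", [])  # the key may be absent when there are no imports
    ]
    return doc["schema"], refs


# Illustrative response body; the values are made up, not captured from this repro.
sample_body = json.dumps({
    "schemaType": "PROTOBUF",
    "schema": 'syntax = "proto3";\n'
              'import "google/protobuf/timestamp.proto";\n'
              "message TestMessage { int64 id = 1; google.protobuf.Timestamp created_at = 2; }\n",
    "references": [{
        "name": "google/protobuf/timestamp.proto",
        "subject": "google/protobuf/timestamp.proto",
        "version": 1,
    }],
})

schema_text, refs = parse_schema_response(sample_body)
for ref in refs:
    print(f"{ref.name} -> subject={ref.subject!r}, version={ref.version}")
```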
Does it reproduce on the most recent release?
Yes
How to reproduce
```yaml
services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:v24.2.4
    container_name: redpanda
    command:
      - redpanda start
      - --smp 1
      - --overprovisioned
      - --node-id 0
      - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
      - --pandaproxy-addr 0.0.0.0:8082
      - --advertise-pandaproxy-addr localhost:8082
      - --schema-registry-addr 0.0.0.0:8081
    ports:
      - "9092:9092"
      - "8081:8081"
      - "8082:8082"
    healthcheck:
      test: ["CMD", "rpk", "cluster", "health"]
      interval: 5s
      timeout: 3s
      retries: 10

  redpanda-console:
    image: docker.redpanda.com/redpandadata/console:v2.7.2
    container_name: redpanda-console
    ports:
      - "8080:8080"
    environment:
      KAFKA_BROKERS: "redpanda:29092"
      KAFKA_SCHEMAREGISTRY_ENABLED: "true"
      KAFKA_SCHEMAREGISTRY_URLS: "http://redpanda:8081"
      KAFKA_PROTOBUF_ENABLED: "true"
      KAFKA_PROTOBUF_SCHEMAREGISTRY_ENABLED: "true"
      KAFKA_PROTOBUF_SCHEMAREGISTRY_REFRESHINTERVAL: "5s"
    depends_on:
      redpanda:
        condition: service_healthy

  proton:
    image: ghcr.io/timeplus-io/proton:3.0.11
    container_name: proton
    ports:
      - "8123:8123"
      - "8463:8463"
    depends_on:
      redpanda:
        condition: service_healthy
```
```bash
#!/bin/bash
#
# Bug: Protobuf import not resolved with kafka_schema_registry_url
#
# Prerequisites:
#   pip install confluent-kafka protobuf grpcio-tools
#
set -e
cd "$(dirname "$0")"
echo "============================================================"
echo "Bug: Protobuf import not resolved with kafka_schema_registry_url"
echo "============================================================"

# Check dependencies
echo ""
echo "Checking Python dependencies..."
python3 -c "import confluent_kafka" 2>/dev/null || { echo "ERROR: Run 'pip install confluent-kafka'"; exit 1; }
python3 -c "import google.protobuf" 2>/dev/null || { echo "ERROR: Run 'pip install protobuf'"; exit 1; }
python3 -c "import grpc_tools.protoc" 2>/dev/null || { echo "ERROR: Run 'pip install grpcio-tools'"; exit 1; }
echo " All dependencies OK"

# Step 1: Generate protobuf Python class
echo ""
echo "Step 1: Generate protobuf Python class..."
cat > test_message.proto << 'EOF'
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message TestMessage {
  int64 id = 1;
  google.protobuf.Timestamp created_at = 2;
}
EOF
# Include root for "google/protobuf/..." imports: the directory that contains the google/ package
PROTO_PATH=$(python3 -c "import os, google.protobuf; print(os.path.dirname(os.path.dirname(os.path.dirname(google.protobuf.__file__))))")
python3 -m grpc_tools.protoc --python_out=. -I. -I"$PROTO_PATH" test_message.proto
echo " Generated test_message_pb2.py"

# Step 2: Create topic
echo ""
echo "Step 2: Create Kafka topic..."
docker exec redpanda rpk topic delete test-import-bug 2>/dev/null || true
docker exec redpanda rpk topic create test-import-bug -p 1 2>/dev/null || true

# Step 3: Produce messages using Confluent serializer
echo ""
echo "Step 3: Produce messages with Confluent Protobuf Serializer..."
python3 << 'PYEOF'
import sys
sys.path.insert(0, '.')
import test_message_pb2
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
serializer = ProtobufSerializer(test_message_pb2.TestMessage, schema_registry, {"use.deprecated.format": False})
producer = Producer({"bootstrap.servers": "localhost:9092"})
for i in range(3):
    msg = test_message_pb2.TestMessage()
    msg.id = 100 + i
    msg.created_at.seconds = 1735100000 + i
    msg.created_at.nanos = 123456789
    serialized = serializer(msg, SerializationContext("test-import-bug", MessageField.VALUE))
    producer.produce("test-import-bug", value=serialized)
    print(f" Produced: id={msg.id}, created_at=({msg.created_at.seconds}, {msg.created_at.nanos})")
producer.flush()
print(" Done!")
PYEOF

# Step 4: Show schema registry state
echo ""
echo "Step 4: Schema Registry state..."
echo " Subjects:"
curl -s http://localhost:8081/subjects | python3 -c "import sys,json; [print(f' - {s}') for s in json.load(sys.stdin)]"
SCHEMA_ID=$(curl -s http://localhost:8081/subjects/test-import-bug-value/versions/latest | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
echo " Schema ID: $SCHEMA_ID"
echo " References:"
curl -s "http://localhost:8081/schemas/ids/${SCHEMA_ID}" | python3 -c "import sys,json; d=json.load(sys.stdin); [print(f' - {r}') for r in d.get('references',[])]"

# Step 5: Test with Proton - Schema Registry (should FAIL)
echo ""
echo "Step 5: Test Proton with kafka_schema_registry_url (EXPECTED TO FAIL)..."
docker exec proton proton-client --query "DROP STREAM IF EXISTS test_sr"
docker exec proton proton-client --query "
CREATE EXTERNAL STREAM test_sr (
  id int64,
  created_at tuple(seconds int64, nanos int32)
)
SETTINGS
  type = 'kafka',
  brokers = 'redpanda:29092',
  topic = 'test-import-bug',
  data_format = 'ProtobufSingle',
  kafka_schema_registry_url = 'http://redpanda:8081'
"
echo ""
echo " Query result:"
timeout 5 docker exec proton proton-client --query \
  "SELECT * FROM test_sr LIMIT 1 SETTINGS seek_to='earliest'" 2>&1 | sed 's/^/ /' || true

# Step 6: Test with Proton - Local format_schema (should WORK)
echo ""
echo "Step 6: Test Proton with local format_schema (EXPECTED TO WORK)..."
docker exec proton proton-client --query "DROP STREAM IF EXISTS test_local"
docker exec proton proton-client --query "DROP FORMAT SCHEMA IF EXISTS test_schema TYPE Protobuf"
docker exec proton proton-client --query "
CREATE FORMAT SCHEMA test_schema AS
\$\$
syntax = \"proto3\";
import \"google/protobuf/timestamp.proto\";
message TestMessage {
  int64 id = 1;
  google.protobuf.Timestamp created_at = 2;
}
\$\$
TYPE Protobuf
"
docker exec proton proton-client --query "
CREATE EXTERNAL STREAM test_local (
  id int64,
  created_at tuple(seconds int64, nanos int32)
)
SETTINGS
  type = 'kafka',
  brokers = 'redpanda:29092',
  topic = 'test-import-bug',
  data_format = 'ProtobufSingle',
  format_schema = 'test_schema:TestMessage'
"
echo ""
echo " Query result:"
timeout 5 docker exec proton proton-client --query \
  "SELECT * FROM test_local LIMIT 3 SETTINGS seek_to='earliest'" 2>&1 | sed 's/^/ /' || true

# Summary
echo ""
echo "============================================================"
echo "SUMMARY"
echo "============================================================"
echo ""
echo "✅ Redpanda Console: Check http://localhost:8080 -> Topics -> test-import-bug"
echo " The messages should decode correctly showing id and created_at fields."
echo ""
echo "❌ Proton (kafka_schema_registry_url): FAILS with 'No message type in schema'"
echo " Proton cannot resolve 'import google/protobuf/timestamp.proto'"
echo ""
echo "✅ Proton (local format_schema): WORKS with same import statement"
echo " The import resolves correctly using built-in google protos"
echo ""
echo "ROOT CAUSE: KafkaSchemaRegistry.cpp ignores 'references' field from schema registry"
echo "============================================================"

# Cleanup generated files
rm -f test_message.proto test_message_pb2.py 2>/dev/null || true
```
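Each value produced in Step 3 uses the Confluent wire format, which is where the `id` passed to `registry.fetchSchema(id)` comes from: a `0x00` magic byte, a 4-byte big-endian schema ID, the message-index array as zigzag varints (a lone `0x00` byte stands for `[0]`), then the Protobuf payload. The sketch below splits such a value for inspection. It assumes the non-deprecated index encoding the producer selects with `use.deprecated.format: False`, and it is a standalone illustration, not Proton's own parser.

```python
import struct
from typing import List, Tuple


def _read_zigzag_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one zigzag-encoded base-128 varint at buf[pos]; return (value, next_pos)."""
    raw, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def split_confluent_protobuf(value: bytes) -> Tuple[int, List[int], bytes]:
    """Split a Confluent-framed Protobuf value into (schema_id, message_indexes, payload)."""
    if len(value) < 6 or value[0] != 0:
        raise ValueError("not a Confluent-framed value (expected magic byte 0x00)")
    (schema_id,) = struct.unpack(">I", value[1:5])  # 4-byte big-endian schema ID
    count, pos = _read_zigzag_varint(value, 5)
    if count == 0:
        return schema_id, [0], value[pos:]  # shorthand for "first message in the file"
    indexes = []
    for _ in range(count):
        index, pos = _read_zigzag_varint(value, pos)
        indexes.append(index)
    return schema_id, indexes, value[pos:]


# Hand-built example: schema ID 7, index shorthand 0x00, payload encoding id = 100.
frame = bytes([0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x08, 0x64])
print(split_confluent_protobuf(frame))  # (7, [0], b'\x08d')
```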
Expected behavior
No response
Error message and/or stacktrace
No response
Additional context
# Bug: Protobuf `import` Statements Not Resolved with `kafka_schema_registry_url`
## Summary
When using `kafka_schema_registry_url` with a Protobuf format, schemas containing `import` statements (e.g., `import "google/protobuf/timestamp.proto"`) fail to parse with the error:

```
Code: 2504. DB::Exception: No message type in schema
```
**Redpanda Console decodes the same messages correctly**, which indicates the bug is in Proton's schema registry integration rather than in the message format or the schema registry itself.
## Quick Reproduction
```bash
# 1. Start services
docker compose up -d
# 2. Wait for healthy
docker compose ps
# 3. Install Python dependencies
pip install confluent-kafka protobuf grpcio-tools
# 4. Run reproduction script
./reproduce.sh
```
## What the Script Demonstrates
| Component | Result |
|---|---|
| Redpanda Console | ✅ Decodes messages correctly |
| Proton (`kafka_schema_registry_url`) | ❌ `No message type in schema` |
| Proton (local `format_schema`) | ✅ Works with same import |
## Root Cause
In `src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp`, the `SchemaRegistryWithCache::fetchSchema()` method:
- Fetches the schema from the registry via `KafkaSchemaRegistry::fetchSchema()`
  - `KafkaSchemaRegistry::fetchSchema()` only reads the `schema` field and ignores `references`
- Uses a bare `google::protobuf::DescriptorPool` without any source tree
- Cannot resolve `import` statements without the referenced schemas
Compare this to the local `format_schema` path, which uses `DiskSourceTree` with `google_protos_path` mapped, allowing imports to resolve correctly.
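The missing step is fetching each referenced schema (recursively, since a dependency can have its own `references`) and loading it before the file that imports it: `DescriptorPool::BuildFile()` only succeeds once a file's dependencies are already in the pool. A Python sketch of that ordering under stated assumptions: `fetch_version` is a hypothetical stand-in for the registry's `GET /subjects/{subject}/versions/{version}` call, and the in-memory registry in the example is made up.

```python
from typing import Callable, Dict, List, Set, Tuple

# Hypothetical: (subject, version) -> parsed JSON of GET /subjects/{subject}/versions/{version}
FetchVersion = Callable[[str, int], dict]


def collect_dependencies(references: List[dict], fetch_version: FetchVersion) -> List[Tuple[str, str]]:
    """Return [(import_name, schema_text)] with each dependency before the files importing it."""
    ordered: List[Tuple[str, str]] = []
    seen: Set[Tuple[str, int]] = set()

    def visit(refs: List[dict]) -> None:
        for ref in refs:
            key = (ref["subject"], int(ref["version"]))
            if key in seen:  # shared dependency already collected
                continue
            seen.add(key)
            body = fetch_version(*key)
            visit(body.get("references", []))  # depth-first: dependencies go first
            ordered.append((ref["name"], body["schema"]))

    visit(references)
    return ordered


# Made-up in-memory registry for illustration.
REGISTRY: Dict[Tuple[str, int], dict] = {
    ("google/protobuf/timestamp.proto", 1): {"schema": "// timestamp.proto"},
    ("common/audit.proto", 3): {
        "schema": "// audit.proto",
        "references": [{"name": "google/protobuf/timestamp.proto",
                        "subject": "google/protobuf/timestamp.proto", "version": 1}],
    },
}

root_refs = [
    {"name": "common/audit.proto", "subject": "common/audit.proto", "version": 3},
    {"name": "google/protobuf/timestamp.proto",
     "subject": "google/protobuf/timestamp.proto", "version": 1},
]
for name, _ in collect_dependencies(root_refs, lambda s, v: REGISTRY[(s, v)]):
    print(name)
# google/protobuf/timestamp.proto
# common/audit.proto
```

The root schema returned by `/schemas/ids/{id}` would then be built last, after every file in the returned list.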
## Workaround
Use a local `CREATE FORMAT SCHEMA` instead of `kafka_schema_registry_url`:
```sql
-- Create local schema (import works!)
CREATE FORMAT SCHEMA my_schema AS
$$
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message MyMessage {
  int64 id = 1;
  google.protobuf.Timestamp created_at = 2;
}
$$
TYPE Protobuf;

-- Use format_schema instead of kafka_schema_registry_url
CREATE EXTERNAL STREAM my_stream (
  id int64,
  created_at tuple(seconds int64, nanos int32)
)
SETTINGS
  type = 'kafka',
  brokers = 'kafka:9092',
  topic = 'my-topic',
  data_format = 'ProtobufSingle',
  format_schema = 'my_schema:MyMessage'; -- Works!
```
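To apply this workaround to an existing subject, the schema text can be copied out of the registry and wrapped in the `CREATE FORMAT SCHEMA ... AS $$ ... $$ TYPE Protobuf` statement shown above. A small sketch of that wrapping step; `make_format_schema_ddl` is a hypothetical helper, and it assumes the schema text never contains `$$`. This only helps when every import is one Proton can resolve locally, such as the built-in `google/protobuf/*.proto` files used above.

```python
def make_format_schema_ddl(name: str, schema_text: str) -> str:
    """Wrap .proto text in a CREATE FORMAT SCHEMA statement (hypothetical helper)."""
    if not name.isidentifier():
        raise ValueError(f"unexpected format schema name: {name!r}")
    if "$$" in schema_text:
        # The statement dollar-quotes the body, so the body itself must not contain $$.
        raise ValueError("schema text contains '$$'")
    return (
        f"CREATE FORMAT SCHEMA {name} AS\n"
        f"$$\n{schema_text.strip()}\n$$\n"
        "TYPE Protobuf"
    )


proto_text = '''
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message MyMessage {
  int64 id = 1;
  google.protobuf.Timestamp created_at = 2;
}
'''
print(make_format_schema_ddl("my_schema", proto_text))
```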
## Environment
- Proton: 3.0.11+
- Schema Registry: Confluent-compatible (Redpanda, Confluent, etc.)
- Tested with official Confluent Python Protobuf Serializer
## Code References
- `src/Formats/KafkaSchemaRegistry.cpp:83`: only reads the `schema` field, ignores `references`
- `src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp:261-283`: uses a bare `DescriptorPool`
- `src/Formats/ProtobufSchemas.cpp:36-41`: shows how the local schema path uses `DiskSourceTree`
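For comparison with the local path, the sketch below mimics how a mapped source tree turns an import name such as `google/protobuf/timestamp.proto` into file contents: a virtual prefix is mapped onto a disk directory (the role `google_protos_path` plays for `DiskSourceTree` in `ProtobufSchemas.cpp`), and an in-memory map of registry-supplied files is consulted first. It is a Python illustration of the lookup idea only, not protobuf's `DiskSourceTree` API; the `LayeredSourceTree` class and its in-memory layer are hypothetical.

```python
import os
import tempfile
from typing import Dict, List, Optional, Tuple


class LayeredSourceTree:
    """Resolve .proto import names from in-memory files, then mapped disk directories (hypothetical)."""

    def __init__(self, in_memory: Dict[str, str]):
        self.in_memory = dict(in_memory)           # e.g. schemas fetched via `references`
        self.mappings: List[Tuple[str, str]] = []  # (virtual_prefix, disk_dir)

    def map_path(self, virtual_prefix: str, disk_dir: str) -> None:
        """Map a virtual prefix ('' maps the root) onto a directory on disk."""
        self.mappings.append((virtual_prefix, disk_dir))

    def read_file(self, import_name: str) -> Optional[str]:
        if import_name in self.in_memory:
            return self.in_memory[import_name]
        for prefix, disk_dir in self.mappings:
            if prefix and not import_name.startswith(prefix.rstrip("/") + "/"):
                continue  # this mapping does not cover the import
            relative = import_name[len(prefix):].lstrip("/")
            candidate = os.path.join(disk_dir, relative)
            if os.path.isfile(candidate):
                with open(candidate, encoding="utf-8") as f:
                    return f.read()
        return None  # unresolved import


# Usage: map "google/protobuf" onto a temporary directory holding a stand-in file.
with tempfile.TemporaryDirectory() as protos_dir:
    with open(os.path.join(protos_dir, "timestamp.proto"), "w", encoding="utf-8") as f:
        f.write('syntax = "proto3";  // stand-in for the real file\n')
    tree = LayeredSourceTree({"common/audit.proto": "// from registry"})
    tree.map_path("google/protobuf", protos_dir)
    found = tree.read_file("google/protobuf/timestamp.proto")
    from_memory = tree.read_file("common/audit.proto")
    missing = tree.read_file("acme/unknown.proto")
print(found is not None, missing)  # True None
```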