Skip to content

Commit a535a27

Browse files
committed
sr: Introduce transport interface
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent c68b5fe commit a535a27

3 files changed

Lines changed: 89 additions & 0 deletions

File tree

src/v/pandaproxy/schema_registry/BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,19 @@ redpanda_cc_library(
6666
],
6767
)
6868

69+
redpanda_cc_library(
70+
name = "transport",
71+
hdrs = [
72+
"transport.h",
73+
],
74+
visibility = ["//visibility:public"],
75+
deps = [
76+
"//src/v/cluster",
77+
"//src/v/model",
78+
"@seastar",
79+
],
80+
)
81+
6982
redpanda_cc_library(
7083
name = "core",
7184
srcs = [

src/v/pandaproxy/schema_registry/fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ class seq_writer;
2222
class service;
2323
class sharded_store;
2424
class store;
25+
class transport;
2526

2627
} // namespace pandaproxy::schema_registry
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
#pragma once
13+
14+
#include "cluster/errc.h"
15+
#include "cluster/types.h"
16+
#include "model/fundamental.h"
17+
#include "model/record.h"
18+
19+
#include <seastar/core/coroutine.hh>
20+
#include <seastar/core/future.hh>
21+
#include <seastar/util/noncopyable_function.hh>
22+
23+
#include <optional>
24+
25+
namespace pandaproxy::schema_registry {
26+
27+
/// Result of a produce operation — includes offset for collision detection.
28+
struct produce_result {
29+
model::offset base_offset;
30+
};
31+
32+
/// Abstract transport for schema registry's internal topic I/O.
33+
///
34+
/// Implementations exist for kafka::client (legacy) and kafka::data::rpc
35+
/// (RPC-based, no auth overhead).
36+
class transport {
37+
public:
38+
transport() = default;
39+
virtual ~transport() = default;
40+
transport(const transport&) = delete;
41+
transport& operator=(const transport&) = delete;
42+
transport(transport&&) = delete;
43+
transport& operator=(transport&&) = delete;
44+
45+
virtual ss::future<> stop() = 0;
46+
47+
/// Produce a batch to the _schemas topic. Returns the base_offset.
48+
virtual ss::future<produce_result> produce(model::record_batch batch) = 0;
49+
50+
/// Get the high watermark (next offset) for the _schemas topic.
51+
virtual ss::future<model::offset> get_high_watermark() = 0;
52+
53+
/// Consume batches from [start, end) on the _schemas topic.
54+
/// Calls consumer(batch) for each batch. Handles pagination internally.
55+
virtual ss::future<> consume_range(
56+
model::offset start,
57+
model::offset end,
58+
ss::noncopyable_function<ss::future<>(model::record_batch)> consumer) = 0;
59+
60+
/// One-time startup configuration (credentials, ACLs).
61+
/// Default: no-op (RPC transport needs no configuration).
62+
virtual ss::future<> configure() { co_return; }
63+
64+
/// Create the internal schema registry topic.
65+
virtual ss::future<cluster::errc> create_topic(
66+
model::topic_namespace_view,
67+
int32_t partition_count,
68+
cluster::topic_properties,
69+
std::optional<int16_t> replication_factor = std::nullopt) = 0;
70+
71+
/// Whether the transport uses ephemeral credentials.
72+
virtual bool has_ephemeral_credentials() const = 0;
73+
};
74+
75+
} // namespace pandaproxy::schema_registry

0 commit comments

Comments
 (0)