Skip to content

Commit 0975d8f

Browse files
committed
Merge branch 'master' into refactor/remove-mobx-from-quotas-page
2 parents 833334f + b558bcd commit 0975d8f

28 files changed

Lines changed: 2242 additions & 275 deletions

File tree

.github/workflows/repository-dispatch.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ on:
88
- '.github/workflows/repository-dispatch.yml'
99
- '.github/workflows/backend-lint-test.yml'
1010
- '.github/workflows/frontend-verify.yml'
11+
- 'test-images.json'
1112
tags:
1213
- '*'
1314
branches:

backend/pkg/api/connect/service/console/service.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func (api *Service) ListMessages(
6161
MaxResults: int(req.Msg.GetMaxResults()),
6262
FilterInterpreterCode: req.Msg.GetFilterInterpreterCode(),
6363
Enterprise: req.Msg.GetEnterprise(),
64+
PageToken: req.Msg.GetPageToken(),
6465
}
6566

6667
interpreterCode, err := lmq.DecodeInterpreterCode()
@@ -96,6 +97,8 @@ func (api *Service) ListMessages(
9697
IgnoreMaxSizeLimit: req.Msg.GetIgnoreMaxSizeLimit(),
9798
KeyDeserializer: fromProtoEncoding(req.Msg.GetKeyDeserializer()),
9899
ValueDeserializer: fromProtoEncoding(req.Msg.GetValueDeserializer()),
100+
PageToken: lmq.PageToken,
101+
PageSize: int(req.Msg.GetPageSize()),
99102
}
100103

101104
timeout := 35 * time.Second

backend/pkg/api/connect/service/console/stream_progress_reporter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (p *streamProgressReporter) OnMessage(message *console.TopicMessage) {
213213
}
214214
}
215215

216-
func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
216+
func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool, nextPageToken string) {
217217
p.writeMutex.Lock()
218218
defer p.writeMutex.Unlock()
219219

@@ -222,6 +222,7 @@ func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
222222
IsCancelled: isCancelled,
223223
MessagesConsumed: p.messagesConsumed.Load(),
224224
BytesConsumed: p.bytesConsumed.Load(),
225+
NextPageToken: nextPageToken,
225226
}
226227

227228
if err := p.stream.Send(

backend/pkg/api/handle_topic_messages_integration_test.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"connectrpc.com/connect"
2929
"github.com/stretchr/testify/assert"
3030
"github.com/stretchr/testify/require"
31+
"github.com/twmb/franz-go/pkg/kadm"
3132
"github.com/twmb/franz-go/pkg/kgo"
3233
"github.com/twmb/franz-go/pkg/sr"
3334
"google.golang.org/genproto/googleapis/rpc/errdetails"
@@ -1050,3 +1051,222 @@ func randomString(n int) string {
10501051
}
10511052
return string(b)
10521053
}
1054+
1055+
func (s *APIIntegrationTestSuite) TestListMessages_Pagination() {
1056+
t := s.T()
1057+
1058+
require := require.New(t)
1059+
assert := assert.New(t)
1060+
1061+
ctx := t.Context()
1062+
1063+
client := v1ac.NewConsoleServiceClient(
1064+
http.DefaultClient,
1065+
s.httpAddress(),
1066+
connect.WithGRPCWeb(),
1067+
)
1068+
1069+
// Create test topic with multiple messages
1070+
testTopicName := testutil.TopicNameForTest("pagination_test")
1071+
// Create topic
1072+
_, err := s.kafkaAdminClient.CreateTopic(ctx, 3, 1, nil, testTopicName)
1073+
require.NoError(err)
1074+
1075+
defer func() {
1076+
s.kafkaAdminClient.DeleteTopics(ctx, testTopicName)
1077+
}()
1078+
1079+
// Wait for topic to be ready
1080+
require.Eventually(func() bool {
1081+
metadata, err := s.kafkaAdminClient.Metadata(ctx, testTopicName)
1082+
if err != nil {
1083+
return false
1084+
}
1085+
topic, exists := metadata.Topics[testTopicName]
1086+
return exists && topic.Err == nil && len(topic.Partitions) == 3
1087+
}, 30*time.Second, 100*time.Millisecond, "Topic should be created and ready")
1088+
1089+
// Produce messages
1090+
const messageCount = 150
1091+
records := make([]*kgo.Record, messageCount)
1092+
for i := range messageCount {
1093+
records[i] = &kgo.Record{
1094+
Key: []byte(fmt.Sprintf("key-%d", i)),
1095+
Value: []byte(fmt.Sprintf(`{"id": %d, "message": "test message %d"}`, i, i)),
1096+
Topic: testTopicName,
1097+
}
1098+
}
1099+
1100+
produceResults := s.kafkaClient.ProduceSync(ctx, records...)
1101+
require.NoError(produceResults.FirstErr())
1102+
1103+
// Wait for all produced messages to be committed
1104+
require.Eventually(func() bool {
1105+
offsets, err := s.kafkaAdminClient.ListEndOffsets(ctx, testTopicName)
1106+
if err != nil {
1107+
return false
1108+
}
1109+
totalMessages := int64(0)
1110+
offsets.Each(func(offset kadm.ListedOffset) {
1111+
totalMessages += offset.Offset
1112+
})
1113+
t.Logf("Total messages across all partitions: %d (expected %d)", totalMessages, messageCount)
1114+
return totalMessages >= int64(messageCount)
1115+
}, 30*time.Second, 100*time.Millisecond, "All produced messages should be committed")
1116+
1117+
t.Run("first page with cursor pagination", func(t *testing.T) {
1118+
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
1119+
Topic: testTopicName,
1120+
PartitionId: -1, // All partitions
1121+
StartOffset: -1, // Recent
1122+
PageSize: 50, // Triggers cursor pagination
1123+
PageToken: "", // First page
1124+
}))
1125+
require.NoError(err)
1126+
require.NotNil(stream)
1127+
1128+
var messages []*v1pb.ListMessagesResponse_DataMessage
1129+
var done *v1pb.ListMessagesResponse_StreamCompletedMessage
1130+
1131+
for stream.Receive() {
1132+
msg := stream.Msg()
1133+
switch v := msg.ControlMessage.(type) {
1134+
case *v1pb.ListMessagesResponse_Data:
1135+
messages = append(messages, v.Data)
1136+
case *v1pb.ListMessagesResponse_Done:
1137+
done = v.Done
1138+
}
1139+
}
1140+
1141+
require.NoError(stream.Err())
1142+
require.NotNil(done, "Should have completion message")
1143+
1144+
// Verify first page results
1145+
assert.Equal(int64(50), done.MessagesConsumed, "Should consume 50 messages (default page size)")
1146+
assert.NotEmpty(done.NextPageToken, "Should have next page token")
1147+
assert.Len(messages, 50, "Should return 50 messages")
1148+
1149+
// Verify descending order within partitions
1150+
// Group messages by partition
1151+
messagesByPartition := make(map[int32][]*v1pb.ListMessagesResponse_DataMessage)
1152+
for _, msg := range messages {
1153+
messagesByPartition[msg.PartitionId] = append(messagesByPartition[msg.PartitionId], msg)
1154+
}
1155+
1156+
// Check each partition is in descending order
1157+
for partitionID, partitionMessages := range messagesByPartition {
1158+
if len(partitionMessages) > 1 {
1159+
for i := 0; i < len(partitionMessages)-1; i++ {
1160+
assert.Greater(partitionMessages[i].Offset, partitionMessages[i+1].Offset,
1161+
"Messages in partition %d should be in descending offset order", partitionID)
1162+
}
1163+
}
1164+
}
1165+
})
1166+
1167+
t.Run("paginate through all messages", func(t *testing.T) {
1168+
var allMessages []*v1pb.ListMessagesResponse_DataMessage
1169+
pageToken := ""
1170+
pageCount := 0
1171+
maxPages := 5 // Safety limit
1172+
1173+
for pageCount < maxPages {
1174+
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
1175+
Topic: testTopicName,
1176+
PartitionId: -1,
1177+
StartOffset: -1,
1178+
PageSize: 50,
1179+
PageToken: pageToken,
1180+
}))
1181+
require.NoError(err)
1182+
require.NotNil(stream)
1183+
1184+
var pageMessages []*v1pb.ListMessagesResponse_DataMessage
1185+
var done *v1pb.ListMessagesResponse_StreamCompletedMessage
1186+
1187+
for stream.Receive() {
1188+
msg := stream.Msg()
1189+
switch v := msg.ControlMessage.(type) {
1190+
case *v1pb.ListMessagesResponse_Data:
1191+
pageMessages = append(pageMessages, v.Data)
1192+
case *v1pb.ListMessagesResponse_Done:
1193+
done = v.Done
1194+
}
1195+
}
1196+
1197+
require.NoError(stream.Err())
1198+
require.NotNil(done)
1199+
1200+
allMessages = append(allMessages, pageMessages...)
1201+
pageCount++
1202+
1203+
t.Logf("Page %d: fetched %d messages, nextPageToken=%s", pageCount, len(pageMessages), done.NextPageToken)
1204+
1205+
if done.NextPageToken == "" {
1206+
break
1207+
}
1208+
1209+
pageToken = done.NextPageToken
1210+
}
1211+
1212+
// Verify we got all messages
1213+
assert.Equal(messageCount, len(allMessages), "Should fetch all %d messages across pages", messageCount)
1214+
assert.LessOrEqual(pageCount, 4, "Should complete in 3-4 pages (150 messages / 50 per page)")
1215+
})
1216+
1217+
t.Run("error when filter with pagination", func(t *testing.T) {
1218+
filterCode := base64.StdEncoding.EncodeToString([]byte("return true"))
1219+
1220+
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
1221+
Topic: testTopicName,
1222+
PartitionId: -1,
1223+
StartOffset: -1,
1224+
PageSize: 50,
1225+
PageToken: "",
1226+
FilterInterpreterCode: filterCode,
1227+
}))
1228+
require.NoError(err)
1229+
require.NotNil(stream)
1230+
1231+
// Stream should error
1232+
for stream.Receive() {
1233+
// Should not receive any messages
1234+
}
1235+
1236+
err = stream.Err()
1237+
require.Error(err, "Should error when using filter with pagination")
1238+
assert.Contains(err.Error(), "cannot use filters with pagination")
1239+
})
1240+
1241+
t.Run("standard pagination still works", func(t *testing.T) {
1242+
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
1243+
Topic: testTopicName,
1244+
PartitionId: -1,
1245+
StartOffset: -2, // Oldest
1246+
MaxResults: 50, // Standard pagination
1247+
PageToken: "",
1248+
}))
1249+
require.NoError(err)
1250+
require.NotNil(stream)
1251+
1252+
var messages []*v1pb.ListMessagesResponse_DataMessage
1253+
var done *v1pb.ListMessagesResponse_StreamCompletedMessage
1254+
1255+
for stream.Receive() {
1256+
msg := stream.Msg()
1257+
switch v := msg.ControlMessage.(type) {
1258+
case *v1pb.ListMessagesResponse_Data:
1259+
messages = append(messages, v.Data)
1260+
case *v1pb.ListMessagesResponse_Done:
1261+
done = v.Done
1262+
}
1263+
}
1264+
1265+
require.NoError(stream.Err())
1266+
require.NotNil(done)
1267+
1268+
// In standard pagination, should not have cursor pagination tokens
1269+
assert.Empty(done.NextPageToken, "Standard pagination should not return page token")
1270+
assert.LessOrEqual(len(messages), 50, "Should respect maxResults limit")
1271+
})
1272+
}

backend/pkg/api/httptypes/types.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@ import (
2121
// used in Console Enterprise to implement the hooks.
2222
type ListMessagesRequest struct {
2323
TopicName string `json:"topicName"`
24-
StartOffset int64 `json:"startOffset"` // -1 for recent (newest - results), -2 for oldest offset, -3 for newest, -4 for timestamp
25-
StartTimestamp int64 `json:"startTimestamp"` // Start offset by unix timestamp in ms (only considered if start offset is set to -4)
26-
PartitionID int32 `json:"partitionId"` // -1 for all partition ids
27-
MaxResults int `json:"maxResults"`
24+
StartOffset int64 `json:"startOffset"` // -1 for recent (newest - results), -2 for oldest offset, -3 for newest, -4 for timestamp
25+
StartTimestamp int64 `json:"startTimestamp"` // Start offset by unix timestamp in ms (only considered if start offset is set to -4)
26+
PartitionID int32 `json:"partitionId"` // -1 for all partition ids
27+
MaxResults int `json:"maxResults"` // Maximum number of messages to fetch (1-10000).
2828
FilterInterpreterCode string `json:"filterInterpreterCode"` // Base64 encoded code
2929

30+
// Pagination fields (used when PageSize > 0)
31+
PageToken string `json:"pageToken,omitempty"`
32+
PageSize int `json:"pageSize,omitempty"` // Page size for pagination (1-10000)
33+
3034
// Enterprise may only be set in the Enterprise mode. The JSON deserialization is deferred
3135
// to the enterprise backend.
3236
Enterprise json.RawMessage `json:"enterprise,omitempty"`
@@ -50,6 +54,15 @@ func (l *ListMessagesRequest) OK() error {
5054
return errors.New("max results must be between 1 and 500")
5155
}
5256

57+
// Pagination mode: when PageSize > 0, filters are not supported
58+
if l.PageSize > 0 {
59+
if l.FilterInterpreterCode != "" {
60+
return errors.New("cannot use filters with pagination")
61+
}
62+
// PageToken validation is done in the console package decodePageToken
63+
return nil
64+
}
65+
5366
if _, err := l.DecodeInterpreterCode(); err != nil {
5467
return fmt.Errorf("failed to decode interpreter code %w", err)
5568
}

backend/pkg/connector/guide/iceberg_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func NewIcebergSinkGuide(opts ...Option) Guide {
2222
DefaultGuide: DefaultGuide{
2323
options: o,
2424
},
25-
className: "io.tabular.iceberg.connect.IcebergSinkConnector",
25+
className: "org.apache.iceberg.connect.IcebergSinkConnector",
2626
wizardSteps: []model.ValidationResponseStep{
2727
{
2828
Name: "Topics to export",

backend/pkg/connector/patch/iceberg_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func NewConfigPatchIcebergSink() *ConfigPatchIcebergSink {
3434
Exclude: nil,
3535
},
3636
ConnectorClassSelector: IncludeExcludeSelector{
37-
Include: regexp.MustCompile(`io.tabular.iceberg.connect.IcebergSinkConnector`),
37+
Include: regexp.MustCompile(`org.apache.iceberg.connect.IcebergSinkConnector`),
3838
Exclude: nil,
3939
},
4040
}

0 commit comments

Comments
 (0)