Skip to content

Commit cac3980

Browse files
authored
[rest] Introduce consumers api definition in rest (#7339)
1 parent da3ffd0 commit cac3980

8 files changed

Lines changed: 273 additions & 0 deletions

File tree

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.consumer;
20+
21+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
22+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
23+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
24+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
25+
26+
/** Entry representing a consumer with id and nextSnapshot. */
27+
@JsonIgnoreProperties(ignoreUnknown = true)
28+
public class ConsumerInfo {
29+
private static final String FIELD_CONSUMER_ID = "consumerId";
30+
private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";
31+
32+
@JsonProperty(FIELD_CONSUMER_ID)
33+
private final String consumerId;
34+
35+
@JsonProperty(FIELD_NEXT_SNAPSHOT)
36+
private final Long nextSnapshot;
37+
38+
@JsonCreator
39+
public ConsumerInfo(
40+
@JsonProperty(FIELD_CONSUMER_ID) String consumerId,
41+
@JsonProperty(FIELD_NEXT_SNAPSHOT) Long nextSnapshot) {
42+
this.consumerId = consumerId;
43+
this.nextSnapshot = nextSnapshot;
44+
}
45+
46+
@JsonGetter(FIELD_CONSUMER_ID)
47+
public String getConsumerId() {
48+
return consumerId;
49+
}
50+
51+
@JsonGetter(FIELD_NEXT_SNAPSHOT)
52+
public Long getNextSnapshot() {
53+
return nextSnapshot;
54+
}
55+
}

paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.annotation.Public;
2424
import org.apache.paimon.annotation.VisibleForTesting;
2525
import org.apache.paimon.catalog.Identifier;
26+
import org.apache.paimon.consumer.ConsumerInfo;
2627
import org.apache.paimon.function.FunctionChange;
2728
import org.apache.paimon.options.Options;
2829
import org.apache.paimon.partition.Partition;
@@ -64,6 +65,7 @@
6465
import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
6566
import org.apache.paimon.rest.responses.GetViewResponse;
6667
import org.apache.paimon.rest.responses.ListBranchesResponse;
68+
import org.apache.paimon.rest.responses.ListConsumersResponse;
6769
import org.apache.paimon.rest.responses.ListDatabasesResponse;
6870
import org.apache.paimon.rest.responses.ListFunctionDetailsResponse;
6971
import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse;
@@ -594,6 +596,37 @@ public PagedList<Snapshot> listSnapshotsPaged(
594596
return new PagedList<>(snapshots, response.getNextPageToken());
595597
}
596598

599+
/**
600+
* Get paged consumers list of the table.
601+
*
602+
* @param identifier path of the table to list consumers
603+
* @param maxResults Optional parameter indicating the maximum number of results to include in
604+
* the result. If maxResults is not specified or set to 0, will return the default number of
605+
* max results.
606+
* @param pageToken Optional parameter indicating the next page token allows list to be start
607+
* from a specific point.
608+
* @return a list of the consumers with provided page size(@param maxResults) in this table and
609+
* next page token.
610+
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists
611+
* @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
612+
* this table
613+
*/
614+
public PagedList<ConsumerInfo> listConsumersPaged(
615+
Identifier identifier, @Nullable Integer maxResults, @Nullable String pageToken) {
616+
ListConsumersResponse response =
617+
client.get(
618+
resourcePaths.consumers(
619+
identifier.getDatabaseName(), identifier.getObjectName()),
620+
buildPagedQueryParams(maxResults, pageToken),
621+
ListConsumersResponse.class,
622+
restAuthFunction);
623+
List<ConsumerInfo> consumers = response.getConsumers();
624+
if (consumers == null) {
625+
return new PagedList<>(emptyList(), null);
626+
}
627+
return new PagedList<>(consumers, response.getNextPageToken());
628+
}
629+
597630
/**
598631
* Commit snapshot for table.
599632
*

paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class ResourcePaths {
3434
protected static final String BRANCHES = "branches";
3535
protected static final String TAGS = "tags";
3636
protected static final String SNAPSHOTS = "snapshots";
37+
protected static final String CONSUMERS = "consumers";
3738
protected static final String VIEWS = "views";
3839
protected static final String TABLE_DETAILS = "table-details";
3940
protected static final String VIEW_DETAILS = "view-details";
@@ -261,6 +262,17 @@ public String tags(String databaseName, String objectName) {
261262
TAGS);
262263
}
263264

265+
public String consumers(String databaseName, String objectName) {
266+
return SLASH.join(
267+
V1,
268+
prefix,
269+
DATABASES,
270+
encodeString(databaseName),
271+
TABLES,
272+
encodeString(objectName),
273+
CONSUMERS);
274+
}
275+
264276
public String tag(String databaseName, String objectName, String tagName) {
265277
return SLASH.join(
266278
V1,
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.rest.responses;
20+
21+
import org.apache.paimon.consumer.ConsumerInfo;
22+
23+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
24+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
25+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
26+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
27+
28+
import java.util.List;
29+
30+
/** Response for list consumers. */
31+
@JsonIgnoreProperties(ignoreUnknown = true)
32+
public class ListConsumersResponse implements PagedResponse<ConsumerInfo> {
33+
34+
private static final String FIELD_CONSUMERS = "consumers";
35+
private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
36+
37+
@JsonProperty(FIELD_CONSUMERS)
38+
private final List<ConsumerInfo> consumers;
39+
40+
@JsonProperty(FIELD_NEXT_PAGE_TOKEN)
41+
private final String nextPageToken;
42+
43+
public ListConsumersResponse(@JsonProperty(FIELD_CONSUMERS) List<ConsumerInfo> consumers) {
44+
this(consumers, null);
45+
}
46+
47+
@JsonCreator
48+
public ListConsumersResponse(
49+
@JsonProperty(FIELD_CONSUMERS) List<ConsumerInfo> consumers,
50+
@JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) {
51+
this.consumers = consumers;
52+
this.nextPageToken = nextPageToken;
53+
}
54+
55+
@JsonGetter(FIELD_CONSUMERS)
56+
public List<ConsumerInfo> getConsumers() {
57+
return this.consumers;
58+
}
59+
60+
@Override
61+
public List<ConsumerInfo> data() {
62+
return this.consumers;
63+
}
64+
65+
@JsonGetter(FIELD_NEXT_PAGE_TOKEN)
66+
public String getNextPageToken() {
67+
return this.nextPageToken;
68+
}
69+
}

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.Snapshot;
2323
import org.apache.paimon.annotation.Public;
2424
import org.apache.paimon.annotation.VisibleForTesting;
25+
import org.apache.paimon.consumer.ConsumerInfo;
2526
import org.apache.paimon.function.Function;
2627
import org.apache.paimon.function.FunctionChange;
2728
import org.apache.paimon.partition.Partition;
@@ -742,6 +743,28 @@ PagedList<Snapshot> listSnapshotsPaged(
742743
Identifier identifier, @Nullable Integer maxResults, @Nullable String pageToken)
743744
throws TableNotExistException;
744745

746+
/**
747+
* Get paged consumers list of the table.
748+
*
749+
* @param identifier path of the table to list consumers
750+
* @param maxResults Optional parameter indicating the maximum number of results to include in
751+
* the result. If maxResults is not specified or set to 0, will return the default number of
752+
* max results.
753+
* @param pageToken Optional parameter indicating the next page token allows list to be start
754+
* from a specific point.
755+
* @return a list of the consumers with provided page size(@param maxResults) in this table and
756+
* next page token. Each consumer is represented as a Map.Entry where the key is the
757+
* consumer id and the value is the next snapshot id.
758+
* @throws TableNotExistException if the table does not exist
759+
* @throws UnsupportedOperationException if the catalog does not {@link
760+
* #supportsVersionManagement()}
761+
*/
762+
default PagedList<ConsumerInfo> listConsumersPaged(
763+
Identifier identifier, @Nullable Integer maxResults, @Nullable String pageToken)
764+
throws TableNotExistException {
765+
throw new UnsupportedOperationException("This catalog does not support list consumers");
766+
}
767+
745768
/**
746769
* rollback table by the given {@link Identifier} and instant.
747770
*

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.paimon.catalog.PropertyChange;
3232
import org.apache.paimon.catalog.TableMetadata;
3333
import org.apache.paimon.catalog.TableQueryAuthResult;
34+
import org.apache.paimon.consumer.ConsumerInfo;
3435
import org.apache.paimon.fs.FileIO;
3536
import org.apache.paimon.fs.Path;
3637
import org.apache.paimon.fs.ResolvingFileIO;
@@ -358,6 +359,19 @@ public PagedList<Snapshot> listSnapshotsPaged(
358359
}
359360
}
360361

362+
@Override
363+
public PagedList<ConsumerInfo> listConsumersPaged(
364+
Identifier identifier, @Nullable Integer maxResults, @Nullable String pageToken)
365+
throws TableNotExistException {
366+
try {
367+
return api.listConsumersPaged(identifier, maxResults, pageToken);
368+
} catch (NoSuchResourceException e) {
369+
throw new TableNotExistException(identifier);
370+
} catch (ForbiddenException e) {
371+
throw new TableNoPermissionException(identifier, e);
372+
}
373+
}
374+
361375
@Override
362376
public boolean supportsListObjectsPaged() {
363377
return true;

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.paimon.catalog.PropertyChange;
2929
import org.apache.paimon.catalog.RenamingSnapshotCommit;
3030
import org.apache.paimon.catalog.TableMetadata;
31+
import org.apache.paimon.consumer.ConsumerInfo;
32+
import org.apache.paimon.consumer.ConsumerManager;
3133
import org.apache.paimon.fs.FileIO;
3234
import org.apache.paimon.fs.Path;
3335
import org.apache.paimon.fs.local.LocalFileIO;
@@ -75,6 +77,7 @@
7577
import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
7678
import org.apache.paimon.rest.responses.GetViewResponse;
7779
import org.apache.paimon.rest.responses.ListBranchesResponse;
80+
import org.apache.paimon.rest.responses.ListConsumersResponse;
7881
import org.apache.paimon.rest.responses.ListDatabasesResponse;
7982
import org.apache.paimon.rest.responses.ListFunctionDetailsResponse;
8083
import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse;
@@ -388,6 +391,10 @@ && isTableByIdRequest(request.getPath())) {
388391
resources.length == 4
389392
&& ResourcePaths.TABLES.equals(resources[1])
390393
&& ResourcePaths.SNAPSHOTS.equals(resources[3]);
394+
boolean isListConsumers =
395+
resources.length == 4
396+
&& ResourcePaths.TABLES.equals(resources[1])
397+
&& ResourcePaths.CONSUMERS.equals(resources[3]);
391398
boolean isLoadSnapshot =
392399
resources.length == 5
393400
&& ResourcePaths.TABLES.equals(resources[1])
@@ -494,6 +501,8 @@ && isTableByIdRequest(request.getPath())) {
494501
return snapshotHandle(identifier);
495502
} else if (isListSnapshots) {
496503
return listSnapshots(identifier);
504+
} else if (isListConsumers) {
505+
return listConsumers(identifier);
497506
} else if (isLoadSnapshot) {
498507
return loadSnapshot(identifier, resources[4]);
499508
} else if (isTableAuth) {
@@ -785,6 +794,19 @@ private MockResponse listSnapshots(Identifier identifier) throws Exception {
785794
return new MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
786795
}
787796

797+
private MockResponse listConsumers(Identifier identifier) throws Exception {
798+
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
799+
ConsumerManager consumerManager =
800+
new ConsumerManager(table.fileIO(), table.location(), "main");
801+
Map<String, Long> consumers = consumerManager.consumers();
802+
List<ConsumerInfo> consumerEntries =
803+
consumers.entrySet().stream()
804+
.map(e -> new ConsumerInfo(e.getKey(), e.getValue()))
805+
.collect(Collectors.toList());
806+
ListConsumersResponse response = new ListConsumersResponse(consumerEntries, null);
807+
return new MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
808+
}
809+
788810
private MockResponse loadSnapshot(Identifier identifier, String version) throws Exception {
789811

790812
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.paimon.catalog.CatalogTestBase;
2929
import org.apache.paimon.catalog.Identifier;
3030
import org.apache.paimon.catalog.PropertyChange;
31+
import org.apache.paimon.consumer.ConsumerInfo;
32+
import org.apache.paimon.consumer.ConsumerManager;
3133
import org.apache.paimon.data.BinaryString;
3234
import org.apache.paimon.data.GenericRow;
3335
import org.apache.paimon.data.InternalRow;
@@ -2585,6 +2587,49 @@ void testSnapshotMethods() throws Exception {
25852587
table.snapshot(14));
25862588
}
25872589

2590+
@Test
2591+
void testListConsumers() throws Exception {
2592+
Identifier identifier = Identifier.create("test_table_db", "consumers_table");
2593+
catalog.createDatabase(identifier.getDatabaseName(), true);
2594+
catalog.createTable(
2595+
identifier,
2596+
new Schema(
2597+
Lists.newArrayList(new DataField(0, "col", DataTypes.INT())),
2598+
Collections.emptyList(),
2599+
Collections.emptyList(),
2600+
emptyMap(),
2601+
""),
2602+
true);
2603+
FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
2604+
2605+
// Create some snapshots
2606+
batchWrite(fileStoreTable, singletonList(1));
2607+
batchWrite(fileStoreTable, singletonList(1));
2608+
batchWrite(fileStoreTable, singletonList(1));
2609+
2610+
// Create consumers
2611+
ConsumerManager consumerManager =
2612+
new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
2613+
consumerManager.resetConsumer("consumer1", new org.apache.paimon.consumer.Consumer(1));
2614+
consumerManager.resetConsumer("consumer2", new org.apache.paimon.consumer.Consumer(2));
2615+
2616+
// Test listConsumersPaged
2617+
assertThat(catalog.listConsumersPaged(identifier, null, null).getElements().size())
2618+
.isEqualTo(2);
2619+
2620+
// Test with RESTApi directly
2621+
RESTApi api = ((RESTCatalog) catalog).api();
2622+
List<ConsumerInfo> consumers =
2623+
PagedList.listAllFromPagedApi(
2624+
token -> api.listConsumersPaged(identifier, null, token));
2625+
assertThat(consumers)
2626+
.extracting(ConsumerInfo::getConsumerId)
2627+
.containsExactlyInAnyOrder("consumer1", "consumer2");
2628+
assertThat(consumers)
2629+
.extracting(ConsumerInfo::getNextSnapshot)
2630+
.containsExactlyInAnyOrder(1L, 2L);
2631+
}
2632+
25882633
@Test
25892634
public void testObjectTable() throws Exception {
25902635
// create object table

0 commit comments

Comments
 (0)