Skip to content

Commit 2766336

Browse files
xiangfu0claude
andauthored
Decouple Flink connector from pinot-controller (#18054)
- Replace ControllerRequestClient with PinotAdminClient in Flink connector - Add typed TableConfig/Schema fetch APIs to pinot-java-client - Merge batch config overrides instead of appending (SegmentUploaderDefault requires exactly 1) - Pass tableType query param in getTableConfig for efficient filtering - Preserve controller path prefix in FlinkQuickStart for proxied deployments - Use SLF4J parameterized logging in PinotAdminClientExample - Update README with new PinotAdminClient API usage Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3587076 commit 2766336

10 files changed

Lines changed: 377 additions & 133 deletions

File tree

pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,22 @@
1818
*/
1919
package org.apache.pinot.client.admin;
2020

21+
import java.io.IOException;
2122
import java.util.Map;
2223
import java.util.Properties;
24+
import org.apache.pinot.spi.config.table.TableConfig;
25+
import org.apache.pinot.spi.config.table.TableType;
26+
import org.apache.pinot.spi.data.Schema;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2329

2430

2531
/**
2632
* Example demonstrating how to use PinotAdminClient.
2733
*/
2834
public class PinotAdminClientExample {
35+
private static final Logger LOGGER = LoggerFactory.getLogger(PinotAdminClientExample.class);
36+
2937
private PinotAdminClientExample() {
3038
}
3139

@@ -34,110 +42,115 @@ public static void main(String[] args) {
3442
try (PinotAdminClient adminClient = new PinotAdminClient("localhost:9000")) {
3543
exampleBasicUsage(adminClient);
3644
} catch (Exception e) {
37-
System.err.println("Error in basic usage example: " + e.getMessage());
45+
LOGGER.error("Error in basic usage example", e);
3846
}
3947

4048
// Example 2: Usage with basic authentication
4149
try {
4250
Properties properties = new Properties();
4351
properties.setProperty("pinot.admin.request.timeout.ms", "30000");
4452

45-
PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", properties,
53+
try (PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", properties,
4654
PinotAdminAuthentication.AuthType.BASIC,
47-
Map.of("username", "admin", "password", "password"));
48-
49-
exampleWithAuthentication(adminClient);
50-
adminClient.close();
55+
Map.of("username", "admin", "password", "password"))) {
56+
exampleWithAuthentication(adminClient);
57+
}
5158
} catch (Exception e) {
52-
System.err.println("Error in authentication example: " + e.getMessage());
59+
LOGGER.error("Error in authentication example", e);
5360
}
5461

5562
// Example 3: Usage with bearer token authentication
5663
try {
5764
Properties properties = new Properties();
58-
PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", properties,
65+
try (PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", properties,
5966
PinotAdminAuthentication.AuthType.BEARER,
60-
Map.of("token", "your-bearer-token"));
61-
62-
exampleWithBearerAuth(adminClient);
63-
adminClient.close();
67+
Map.of("token", "your-bearer-token"))) {
68+
exampleWithBearerAuth(adminClient);
69+
}
6470
} catch (Exception e) {
65-
System.err.println("Error in bearer auth example: " + e.getMessage());
71+
LOGGER.error("Error in bearer auth example", e);
6672
}
6773
}
6874

6975
private static void exampleBasicUsage(PinotAdminClient adminClient)
7076
throws PinotAdminException {
71-
System.out.println("=== Basic Usage Example ===");
77+
LOGGER.info("=== Basic Usage Example ===");
7278

7379
try {
7480
// List tables
7581
var tables = adminClient.getTableClient().listTables(null, null, null);
76-
System.out.println("Tables: " + tables);
82+
LOGGER.info("Tables: {}", tables);
7783

7884
// List schemas
7985
var schemas = adminClient.getSchemaClient().listSchemaNames();
80-
System.out.println("Schemas: " + schemas);
86+
LOGGER.info("Schemas: {}", schemas);
8187

8288
// List instances
8389
var instances = adminClient.getInstanceClient().listInstances();
84-
System.out.println("Instances: " + instances);
90+
LOGGER.info("Instances: {}", instances);
8591

8692
// List tenants
8793
var tenants = adminClient.getTenantClient().listTenants();
88-
System.out.println("Tenants: " + tenants);
94+
LOGGER.info("Tenants: {}", tenants);
8995

9096
// List task types
9197
var taskTypes = adminClient.getTaskClient().listTaskTypes();
92-
System.out.println("Task types: " + taskTypes);
98+
LOGGER.info("Task types: {}", taskTypes);
9399
} catch (PinotAdminException e) {
94-
System.out.println("Admin operation failed: " + e.getMessage());
100+
LOGGER.error("Admin operation failed", e);
95101
}
96102
}
97103

98104
private static void exampleWithAuthentication(PinotAdminClient adminClient)
99-
throws PinotAdminException {
100-
System.out.println("=== Authentication Example ===");
105+
throws IOException {
106+
LOGGER.info("=== Authentication Example ===");
101107

102108
try {
103109
// Get a specific table configuration
104110
String tableConfig = adminClient.getTableClient().getTableConfig("myTable");
105-
System.out.println("Table config: " + tableConfig);
111+
LOGGER.info("Table config: {}", tableConfig);
112+
113+
TableConfig offlineTableConfig =
114+
adminClient.getTableClient().getTableConfigObjectForType("myTable", TableType.OFFLINE);
115+
LOGGER.info("Typed table config: {}", offlineTableConfig.getTableName());
116+
117+
Schema schema = adminClient.getSchemaClient().getSchemaObject("myTable");
118+
LOGGER.info("Typed schema: {}", schema.getSchemaName());
106119

107120
// Validate a schema
108121
String schemaConfig =
109122
"{\"schemaName\":\"testSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
110123
String validationResult = adminClient.getSchemaClient().validateSchema(schemaConfig);
111-
System.out.println("Schema validation: " + validationResult);
124+
LOGGER.info("Schema validation: {}", validationResult);
112125
} catch (PinotAdminAuthenticationException e) {
113-
System.out.println("Authentication failed: " + e.getMessage());
126+
LOGGER.error("Authentication failed", e);
114127
} catch (PinotAdminNotFoundException e) {
115-
System.out.println("Resource not found: " + e.getMessage());
128+
LOGGER.error("Resource not found", e);
116129
} catch (PinotAdminException e) {
117-
System.out.println("Admin operation failed: " + e.getMessage());
130+
LOGGER.error("Admin operation failed", e);
118131
}
119132
}
120133

121134
private static void exampleWithBearerAuth(PinotAdminClient adminClient)
122135
throws PinotAdminException {
123-
System.out.println("=== Bearer Authentication Example ===");
136+
LOGGER.info("=== Bearer Authentication Example ===");
124137

125138
try {
126139
// Create a new schema
127140
String schemaConfig =
128141
"{\"schemaName\":\"exampleSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
129142
String createResult = adminClient.getSchemaClient().createSchema(schemaConfig);
130-
System.out.println("Schema creation: " + createResult);
143+
LOGGER.info("Schema creation: {}", createResult);
131144

132145
// Get instance information
133146
var liveInstances = adminClient.getInstanceClient().listLiveInstances();
134-
System.out.println("Live instances: " + liveInstances);
147+
LOGGER.info("Live instances: {}", liveInstances);
135148
} catch (PinotAdminAuthenticationException e) {
136-
System.out.println("Authentication failed: " + e.getMessage());
149+
LOGGER.error("Authentication failed", e);
137150
} catch (PinotAdminValidationException e) {
138-
System.out.println("Validation failed: " + e.getMessage());
151+
LOGGER.error("Validation failed", e);
139152
} catch (PinotAdminException e) {
140-
System.out.println("Admin operation failed: " + e.getMessage());
153+
LOGGER.error("Admin operation failed", e);
141154
}
142155
}
143156
}

pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ The admin client consists of:
5555

5656
```java
5757
import org.apache.pinot.client.admin.*;
58+
import org.apache.pinot.spi.config.table.TableConfig;
59+
import org.apache.pinot.spi.config.table.TableType;
60+
import org.apache.pinot.spi.data.Schema;
5861

5962
// Create client without authentication
6063
try(PinotAdminClient adminClient = new PinotAdminClient("localhost:9000")){
@@ -64,6 +67,9 @@ List<String> tables = adminClient.getTableClient().listTables(null, null, null);
6467
// Get a specific table configuration
6568
String config = adminClient.getTableClient().getTableConfig("myTable");
6669

70+
// Get a typed table configuration
71+
TableConfig offlineConfig = adminClient.getTableClient().getTableConfigObjectForType("myTable", TableType.OFFLINE);
72+
6773
// List schemas
6874
List<String> schemas = adminClient.getSchemaClient().listSchemaNames();
6975
}
@@ -152,7 +158,7 @@ List<String> schemas = schemaClient.listSchemaNames();
152158
// Get schema configuration as JSON
153159
String schema = schemaClient.getSchema("mySchema");
154160

155-
// Get a typed Schema object
161+
// Get a typed schema object
156162
Schema schemaObject = schemaClient.getSchemaObject("mySchema");
157163

158164
// Create a new schema

pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ public String getTableConfig(String tableName, @Nullable String tableType)
126126
return response.toString();
127127
}
128128

129+
/**
130+
* Gets the configuration for a specific raw table and type as a typed object.
131+
*/
132+
public TableConfig getTableConfigObjectForType(String tableName, TableType tableType)
133+
throws PinotAdminException, IOException {
134+
return getTableConfigObject(tableName, tableType.name());
135+
}
136+
129137
/**
130138
* Gets the configuration for a specific table as a typed object.
131139
*/

pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,23 @@ public void testGetTableConfigObjectWithTableType()
139139
eq(HEADERS));
140140
}
141141

142+
@Test
143+
public void testGetTypedTableConfig()
144+
throws Exception {
145+
TableConfig expectedTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("tbl1").build();
146+
String jsonResponse = "{\"OFFLINE\":" + JsonUtils.objectToString(expectedTableConfig) + "}";
147+
JsonNode mockResponse = new ObjectMapper().readTree(jsonResponse);
148+
lenient().when(_mockTransport.executeGet(anyString(), anyString(), any(), any()))
149+
.thenReturn(mockResponse);
150+
151+
TableConfig tableConfig = _adminClient.getTableClient().getTableConfigObjectForType("tbl1", TableType.OFFLINE);
152+
153+
assertEquals(tableConfig.getTableName(), "tbl1_OFFLINE");
154+
assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
155+
verify(_mockTransport).executeGet(eq(CONTROLLER_ADDRESS), eq("/tables/tbl1"), eq(Map.of("type", "OFFLINE")),
156+
eq(HEADERS));
157+
}
158+
142159
@Test
143160
public void testListSchemas()
144161
throws Exception {

pinot-connectors/pinot-flink-connector/README.md

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,40 +30,62 @@ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvi
3030
execEnv.setParallelism(2); // optional
3131
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
3232

33-
// Create a ControllerRequestClient to fetch Pinot schema and table config
34-
HttpClient httpClient = HttpClient.getInstance();
35-
ControllerRequestClient client = new ControllerRequestClient(
36-
ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
33+
// Create a PinotAdminClient to fetch Pinot schema and table config
34+
String controllerUrl = "http://localhost:9000";
35+
URI controllerUri = URI.create(controllerUrl);
36+
String controllerAddress = controllerUri.getAuthority();
37+
String controllerPath = controllerUri.getPath();
38+
if (controllerPath != null && !controllerPath.isEmpty() && !"/".equals(controllerPath)) {
39+
controllerAddress += controllerPath.endsWith("/") ? controllerPath.substring(0, controllerPath.length() - 1)
40+
: controllerPath;
41+
}
42+
Properties properties = new Properties();
43+
properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, controllerUri.getScheme());
3744

38-
// fetch Pinot schema
39-
Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
40-
// fetch Pinot table config
41-
TableConfig tableConfig = client.getTableClient().getTableConfigObject("starbucksStores", "OFFLINE");
42-
// create Flink Pinot Sink
43-
srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
45+
try (PinotAdminClient client = new PinotAdminClient(controllerAddress, properties)) {
46+
// fetch Pinot schema
47+
Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
48+
// fetch Pinot table config
49+
TableConfig tableConfig =
50+
client.getTableClient().getTableConfigObjectForType("starbucksStores", TableType.OFFLINE);
51+
// create Flink Pinot Sink
52+
srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema,
53+
controllerUrl));
54+
}
4455
execEnv.execute();
4556
```
4657

4758
## Quick start for realtime(upsert) table backfill
4859
```java
4960
// Set up flink env and data source
5061
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
51-
execEnv.setParallelism(2); // mandatory for upsert tables wi
62+
execEnv.setParallelism(2); // mandatory for upsert tables
5263
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
5364

54-
// Create a ControllerRequestClient to fetch Pinot schema and table config
55-
HttpClient httpClient = HttpClient.getInstance();
56-
ControllerRequestClient client = new ControllerRequestClient(
57-
ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
65+
// Create a PinotAdminClient to fetch Pinot schema and table config
66+
String controllerUrl = "http://localhost:9000";
67+
URI controllerUri = URI.create(controllerUrl);
68+
String controllerAddress = controllerUri.getAuthority();
69+
String controllerPath = controllerUri.getPath();
70+
if (controllerPath != null && !controllerPath.isEmpty() && !"/".equals(controllerPath)) {
71+
controllerAddress += controllerPath.endsWith("/") ? controllerPath.substring(0, controllerPath.length() - 1)
72+
: controllerPath;
73+
}
74+
Properties properties = new Properties();
75+
properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, controllerUri.getScheme());
5876

59-
// fetch Pinot schema
60-
Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
61-
// fetch Pinot table config
62-
TableConfig tableConfig = client.getTableClient().getTableConfigObject("starbucksStores", "REALTIME");
77+
try (PinotAdminClient client = new PinotAdminClient(controllerAddress, properties)) {
78+
// fetch Pinot schema
79+
Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
80+
// fetch Pinot table config
81+
TableConfig tableConfig =
82+
client.getTableClient().getTableConfigObjectForType("starbucksStores", TableType.REALTIME);
6383

64-
// create Flink Pinot Sink (partition it same as the realtime stream(e.g. kafka) in case of upsert tables)
65-
srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key % partitions, r -> (Integer) r.getField("primaryKey"))
66-
.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
84+
// create Flink Pinot Sink (partition it same as the realtime stream(e.g. kafka) in case of upsert tables)
85+
srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key % partitions, r -> (Integer) r.getField("primaryKey"))
86+
.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema,
87+
controllerUrl));
88+
}
6789
execEnv.execute();
6890

6991
```

pinot-connectors/pinot-flink-connector/pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,16 @@
4040
</dependency>
4141
<dependency>
4242
<groupId>org.apache.pinot</groupId>
43-
<artifactId>pinot-controller</artifactId>
43+
<artifactId>pinot-core</artifactId>
4444
</dependency>
4545
<dependency>
4646
<groupId>org.apache.pinot</groupId>
4747
<artifactId>pinot-segment-writer-file-based</artifactId>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.apache.pinot</groupId>
51+
<artifactId>pinot-segment-uploader-default</artifactId>
52+
</dependency>
4953

5054
<dependency>
5155
<groupId>org.apache.flink</groupId>
@@ -63,6 +67,11 @@
6367
</dependency>
6468

6569
<!-- Test Dependencies -->
70+
<dependency>
71+
<groupId>org.apache.pinot</groupId>
72+
<artifactId>pinot-controller</artifactId>
73+
<scope>test</scope>
74+
</dependency>
6675
<dependency>
6776
<groupId>org.apache.pinot</groupId>
6877
<artifactId>pinot-integration-test-base</artifactId>

0 commit comments

Comments
 (0)