Skip to content

Commit f89dad9

Browse files
committed
[fix](fe) Improve MaxCompute catalog validation
1 parent e9a146d commit f89dad9

9 files changed

Lines changed: 382 additions & 2 deletions

File tree

fe/fe-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,10 @@ under the License.
369369
<artifactId>odps-sdk-table-api</artifactId>
370370
<version>${maxcompute.version}</version>
371371
</dependency>
372+
<dependency>
373+
<groupId>com.aliyun</groupId>
374+
<artifactId>maxcompute20220104</artifactId>
375+
</dependency>
372376
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
373377
<dependency>
374378
<groupId>org.springframework.boot</groupId>

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ public void updateMCCommitData(List<TMCCommitData> commitDataList) {
7171

7272
public void beginInsert(ExternalTable dorisTable, Optional<InsertCommandContext> ctx) throws UserException {
7373
this.table = (MaxComputeExternalTable) dorisTable;
74+
if (table.isUnsupportedOdpsTable()) {
75+
throw new UserException("Writing MaxCompute external table or logical view is not supported: "
76+
+ table.getDbName() + "." + table.getName());
77+
}
7478

7579
try {
7680
TableIdentifier tableId = catalog.getOdpsTableIdentifier(table.getDbName(), table.getName());

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@
2727
import org.apache.doris.datasource.SessionContext;
2828
import org.apache.doris.transaction.TransactionManagerFactory;
2929

30+
import com.aliyun.maxcompute20220104.models.GetProjectRequest;
31+
import com.aliyun.maxcompute20220104.models.GetProjectResponse;
32+
import com.aliyun.maxcompute20220104.models.GetProjectResponseBody.GetProjectResponseBodyData;
3033
import com.aliyun.odps.Odps;
34+
import com.aliyun.odps.OdpsException;
3135
import com.aliyun.odps.Partition;
3236
import com.aliyun.odps.account.AccountFormat;
3337
import com.aliyun.odps.table.TableIdentifier;
@@ -52,6 +56,13 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
5256

5357
// you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints
5458
private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";
59+
private static final int VALIDATION_CONNECT_TIMEOUT_SECONDS = 10;
60+
private static final int VALIDATION_READ_TIMEOUT_SECONDS = 10;
61+
private static final int VALIDATION_RETRY_TIMES = 1;
62+
private static final String OPEN_API_AUTH_TYPE_ACCESS_KEY = "access_key";
63+
private static final String OPEN_API_AUTH_TYPE_RAM_ROLE_ARN = "ram_role_arn";
64+
private static final String OPEN_API_AUTH_TYPE_ECS_RAM_ROLE = "ecs_ram_role";
65+
private static final String OPEN_API_ROLE_SESSION_NAME = "doris-maxcompute-catalog";
5566

5667
private Map<String, String> props;
5768
private Odps odps;
@@ -206,6 +217,7 @@ protected void initLocalObjectsImpl() {
206217
odps = MCUtils.createMcClient(props);
207218
odps.setDefaultProject(defaultProject);
208219
odps.setEndpoint(endpoint);
220+
configureOdpsRestClient();
209221

210222
String accountFormatProp = props.getOrDefault(MCProperties.ACCOUNT_FORMAT, MCProperties.DEFAULT_ACCOUNT_FORMAT);
211223
if (accountFormatProp.equals(MCProperties.ACCOUNT_FORMAT_NAME)) {
@@ -227,12 +239,136 @@ protected void initLocalObjectsImpl() {
227239
boolean enableNamespaceSchema = Boolean.parseBoolean(
228240
props.getOrDefault(MCProperties.ENABLE_NAMESPACE_SCHEMA, MCProperties.DEFAULT_ENABLE_NAMESPACE_SCHEMA));
229241
mcStructureHelper = McStructureHelper.getHelper(enableNamespaceSchema, defaultProject);
242+
validateMaxComputeConnection(enableNamespaceSchema);
230243

231244
initPreExecutionAuthenticator();
232245
metadataOps = new MaxComputeMetadataOps(this, odps);
233246
transactionManager = TransactionManagerFactory.createMCTransactionManager(this);
234247
}
235248

249+
private void configureOdpsRestClient() {
250+
odps.getRestClient().setConnectTimeout(connectTimeout);
251+
odps.getRestClient().setReadTimeout(readTimeout);
252+
odps.getRestClient().setRetryTimes(retryTimes);
253+
}
254+
255+
protected void validateMaxComputeConnection(boolean enableNamespaceSchema) {
256+
int originConnectTimeout = odps.getRestClient().getConnectTimeout();
257+
int originReadTimeout = odps.getRestClient().getReadTimeout();
258+
int originRetryTimes = odps.getRestClient().getRetryTimes();
259+
configureOdpsRestClientForValidation();
260+
try {
261+
if (enableNamespaceSchema) {
262+
validateMaxComputeProjectAndNamespaceSchema();
263+
} else {
264+
validateMaxComputeProject();
265+
}
266+
} finally {
267+
restoreOdpsRestClient(originConnectTimeout, originReadTimeout, originRetryTimes);
268+
}
269+
}
270+
271+
private void configureOdpsRestClientForValidation() {
272+
odps.getRestClient().setConnectTimeout(Math.min(connectTimeout, VALIDATION_CONNECT_TIMEOUT_SECONDS));
273+
odps.getRestClient().setReadTimeout(Math.min(readTimeout, VALIDATION_READ_TIMEOUT_SECONDS));
274+
odps.getRestClient().setRetryTimes(Math.min(retryTimes, VALIDATION_RETRY_TIMES));
275+
}
276+
277+
private void restoreOdpsRestClient(int originConnectTimeout, int originReadTimeout, int originRetryTimes) {
278+
odps.getRestClient().setConnectTimeout(originConnectTimeout);
279+
odps.getRestClient().setReadTimeout(originReadTimeout);
280+
odps.getRestClient().setRetryTimes(originRetryTimes);
281+
}
282+
283+
private void validateMaxComputeProject() {
284+
try {
285+
if (!maxComputeProjectExists(defaultProject)) {
286+
throw new RuntimeException("MaxCompute project '" + defaultProject
287+
+ "' does not exist or is not accessible");
288+
}
289+
} catch (Exception e) {
290+
throw new RuntimeException("Failed to validate MaxCompute project '" + defaultProject
291+
+ "'. Check " + MCProperties.PROJECT + ", " + MCProperties.ENDPOINT
292+
+ " and credentials. Cause: " + e.getMessage());
293+
}
294+
}
295+
296+
private void validateMaxComputeProjectAndNamespaceSchema() {
297+
try {
298+
if (!isMaxComputeProjectThreeTierModel(defaultProject)) {
299+
throw new RuntimeException("MaxCompute project '" + defaultProject
300+
+ "' does not support namespace schema");
301+
}
302+
} catch (Exception e) {
303+
throw new RuntimeException("Failed to validate MaxCompute project '" + defaultProject
304+
+ "' with GetProject. Check " + MCProperties.PROJECT + ", " + MCProperties.ENDPOINT
305+
+ ", credentials and namespace schema support. Cause: " + e.getMessage());
306+
}
307+
}
308+
309+
protected boolean maxComputeProjectExists(String projectName) throws OdpsException {
310+
return odps.projects().exists(projectName);
311+
}
312+
313+
protected boolean isMaxComputeProjectThreeTierModel(String projectName) throws Exception {
314+
GetProjectRequest request = new GetProjectRequest()
315+
.setVerbose(false)
316+
.setWithQuotaProductType(false)
317+
.setWithStorageTierInfo(false);
318+
GetProjectResponse response = createMaxComputeOpenApiClient().getProject(projectName, request);
319+
GetProjectResponseBodyData data = response.getBody() == null ? null : response.getBody().getData();
320+
if (data == null || data.getThreeTierModel() == null) {
321+
throw new RuntimeException("MaxCompute GetProject did not return threeTierModel");
322+
}
323+
return Boolean.TRUE.equals(data.getThreeTierModel());
324+
}
325+
326+
private com.aliyun.maxcompute20220104.Client createMaxComputeOpenApiClient() throws Exception {
327+
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
328+
.setCredential(createOpenApiCredentialClient())
329+
.setRegionId(getMaxComputeRegion())
330+
.setConnectTimeout(Math.min(connectTimeout, VALIDATION_CONNECT_TIMEOUT_SECONDS) * 1000)
331+
.setReadTimeout(Math.min(readTimeout, VALIDATION_READ_TIMEOUT_SECONDS) * 1000);
332+
return new com.aliyun.maxcompute20220104.Client(config);
333+
}
334+
335+
private com.aliyun.credentials.Client createOpenApiCredentialClient() {
336+
String authType = props.getOrDefault(MCProperties.AUTH_TYPE, MCProperties.DEFAULT_AUTH_TYPE);
337+
com.aliyun.credentials.models.Config credentialConfig = new com.aliyun.credentials.models.Config();
338+
if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_AK_SK)) {
339+
credentialConfig.setType(OPEN_API_AUTH_TYPE_ACCESS_KEY)
340+
.setAccessKeyId(props.get(MCProperties.ACCESS_KEY))
341+
.setAccessKeySecret(props.get(MCProperties.SECRET_KEY));
342+
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_RAM_ROLE_ARN)) {
343+
credentialConfig.setType(OPEN_API_AUTH_TYPE_RAM_ROLE_ARN)
344+
.setAccessKeyId(props.get(MCProperties.ACCESS_KEY))
345+
.setAccessKeySecret(props.get(MCProperties.SECRET_KEY))
346+
.setRoleArn(props.get(MCProperties.RAM_ROLE_ARN))
347+
.setRoleSessionName(OPEN_API_ROLE_SESSION_NAME);
348+
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_ECS_RAM_ROLE)) {
349+
credentialConfig.setType(OPEN_API_AUTH_TYPE_ECS_RAM_ROLE)
350+
.setRoleName(props.get(MCProperties.ECS_RAM_ROLE));
351+
} else {
352+
throw new RuntimeException("Unsupported MaxCompute auth type: " + authType);
353+
}
354+
return new com.aliyun.credentials.Client(credentialConfig);
355+
}
356+
357+
protected String getMaxComputeRegion() {
358+
if (props.containsKey(MCProperties.REGION)) {
359+
return normalizeMaxComputeRegion(props.get(MCProperties.REGION));
360+
}
361+
String[] endpointSplit = endpoint.split("\\.");
362+
if (endpointSplit.length >= 2) {
363+
return normalizeMaxComputeRegion(endpointSplit[1]);
364+
}
365+
throw new RuntimeException("Can not infer MaxCompute region from endpoint: " + endpoint);
366+
}
367+
368+
private String normalizeMaxComputeRegion(String region) {
369+
return region.replace("oss-", "").replace("-vpc", "").replace("-intranet", "");
370+
}
371+
236372
public Odps getClient() {
237373
makeSureInitialized();
238374
return odps;
@@ -401,7 +537,7 @@ public void checkProperties() throws DdlException {
401537
MCProperties.DEFAULT_SPLIT_BYTE_SIZE));
402538

403539
if (splitByteSize < 10485760L) {
404-
throw new DdlException(MCProperties.SPLIT_ROW_COUNT + " must be greater than or equal to 10485760");
540+
throw new DdlException(MCProperties.SPLIT_BYTE_SIZE + " must be greater than or equal to 10485760");
405541
}
406542

407543
} else if (splitStrategy.equals(MCProperties.SPLIT_BY_ROW_COUNT_STRATEGY)) {

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,11 @@ public Table getOdpsTable() {
328328
.orElse(null);
329329
}
330330

331+
public boolean isUnsupportedOdpsTable() {
332+
Table odpsTable = getOdpsTable();
333+
return odpsTable.isExternalTable() || odpsTable.isVirtualView();
334+
}
335+
331336
@Override
332337
public boolean isPartitionedTable() {
333338
makeSureInitialized();

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,11 @@ public List<Split> getSplits(int numBackends) throws UserException {
707707
long getOdpsTableTime = System.currentTimeMillis();
708708
LOG.info("MaxComputeScanNode getSplits: getOdpsTable cost {} ms", getOdpsTableTime - startTime);
709709

710+
if (odpsTable.isExternalTable() || odpsTable.isVirtualView()) {
711+
throw new UserException("Reading MaxCompute external table or logical view is not supported: "
712+
+ table.getDbName() + "." + table.getName());
713+
}
714+
710715
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
711716
return result;
712717
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.maxcompute;
19+
20+
import org.apache.doris.common.UserException;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
import org.mockito.Mockito;
25+
26+
import java.util.Optional;
27+
28+
public class MCTransactionTest {
29+
@Test
30+
public void testBeginInsertRejectsOdpsExternalTable() {
31+
assertBeginInsertRejectsUnsupportedOdpsTable("mc_external_table");
32+
}
33+
34+
@Test
35+
public void testBeginInsertRejectsOdpsLogicalView() {
36+
assertBeginInsertRejectsUnsupportedOdpsTable("mc_logical_view");
37+
}
38+
39+
private void assertBeginInsertRejectsUnsupportedOdpsTable(String tableName) {
40+
MaxComputeExternalCatalog catalog = Mockito.mock(MaxComputeExternalCatalog.class);
41+
MaxComputeExternalTable table = Mockito.mock(MaxComputeExternalTable.class);
42+
Mockito.when(table.isUnsupportedOdpsTable()).thenReturn(true);
43+
Mockito.when(table.getDbName()).thenReturn("default");
44+
Mockito.when(table.getName()).thenReturn(tableName);
45+
46+
MCTransaction transaction = new MCTransaction(catalog);
47+
48+
UserException exception = Assert.assertThrows(UserException.class,
49+
() -> transaction.beginInsert(table, Optional.empty()));
50+
Assert.assertTrue(exception.getMessage().contains(
51+
"Writing MaxCompute external table or logical view is not supported: default." + tableName));
52+
Mockito.verify(catalog, Mockito.never()).getOdpsTableIdentifier(Mockito.anyString(), Mockito.anyString());
53+
}
54+
}

0 commit comments

Comments
 (0)