Skip to content

Commit d1e407c

Browse files
committed
[fix](fe) Improve MaxCompute catalog validation
1 parent e9a146d commit d1e407c

12 files changed

Lines changed: 402 additions & 7 deletions

File tree

fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public class MCProperties {
5656
public static final String DEFAULT_CONNECT_TIMEOUT = "10"; // 10s
5757
public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
5858
public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
59+
public static final String VALIDATE_CONNECTION = "mc.validate_connection";
60+
public static final String DEFAULT_VALIDATE_CONNECTION = "false";
5961

6062
public static final String MAX_FIELD_SIZE = "mc.max_field_size_bytes";
6163
public static final String DEFAULT_MAX_FIELD_SIZE = "8388608"; // 8 * 1024 * 1024 = 8MB

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ public void updateMCCommitData(List<TMCCommitData> commitDataList) {
7171

7272
public void beginInsert(ExternalTable dorisTable, Optional<InsertCommandContext> ctx) throws UserException {
7373
this.table = (MaxComputeExternalTable) dorisTable;
74+
if (table.isUnsupportedOdpsTable()) {
75+
throw new UserException("Writing MaxCompute external table or logical view is not supported: "
76+
+ table.getDbName() + "." + table.getName());
77+
}
7478

7579
try {
7680
TableIdentifier tableId = catalog.getOdpsTableIdentifier(table.getDbName(), table.getName());

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.doris.transaction.TransactionManagerFactory;
2929

3030
import com.aliyun.odps.Odps;
31+
import com.aliyun.odps.OdpsException;
3132
import com.aliyun.odps.Partition;
3233
import com.aliyun.odps.account.AccountFormat;
3334
import com.aliyun.odps.table.TableIdentifier;
@@ -52,7 +53,6 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
5253

5354
// you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints
5455
private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";
55-
5656
private Map<String, String> props;
5757
private Odps odps;
5858
private String endpoint;
@@ -206,6 +206,9 @@ protected void initLocalObjectsImpl() {
206206
odps = MCUtils.createMcClient(props);
207207
odps.setDefaultProject(defaultProject);
208208
odps.setEndpoint(endpoint);
209+
odps.getRestClient().setConnectTimeout(connectTimeout);
210+
odps.getRestClient().setReadTimeout(readTimeout);
211+
odps.getRestClient().setRetryTimes(retryTimes);
209212

210213
String accountFormatProp = props.getOrDefault(MCProperties.ACCOUNT_FORMAT, MCProperties.DEFAULT_ACCOUNT_FORMAT);
211214
if (accountFormatProp.equals(MCProperties.ACCOUNT_FORMAT_NAME)) {
@@ -227,12 +230,56 @@ protected void initLocalObjectsImpl() {
227230
boolean enableNamespaceSchema = Boolean.parseBoolean(
228231
props.getOrDefault(MCProperties.ENABLE_NAMESPACE_SCHEMA, MCProperties.DEFAULT_ENABLE_NAMESPACE_SCHEMA));
229232
mcStructureHelper = McStructureHelper.getHelper(enableNamespaceSchema, defaultProject);
233+
boolean validateConnection = Boolean.parseBoolean(
234+
props.getOrDefault(MCProperties.VALIDATE_CONNECTION, MCProperties.DEFAULT_VALIDATE_CONNECTION));
235+
if (validateConnection) {
236+
validateMaxComputeConnection(enableNamespaceSchema);
237+
}
230238

231239
initPreExecutionAuthenticator();
232240
metadataOps = new MaxComputeMetadataOps(this, odps);
233241
transactionManager = TransactionManagerFactory.createMCTransactionManager(this);
234242
}
235243

244+
protected void validateMaxComputeConnection(boolean enableNamespaceSchema) {
245+
if (enableNamespaceSchema) {
246+
validateMaxComputeProjectAndNamespaceSchema();
247+
} else {
248+
validateMaxComputeProject();
249+
}
250+
}
251+
252+
private void validateMaxComputeProject() {
253+
try {
254+
if (!maxComputeProjectExists(defaultProject)) {
255+
throw new RuntimeException("MaxCompute project '" + defaultProject
256+
+ "' does not exist or is not accessible");
257+
}
258+
} catch (Exception e) {
259+
throw new RuntimeException("Failed to validate MaxCompute project '" + defaultProject
260+
+ "'. Check " + MCProperties.PROJECT + ", " + MCProperties.ENDPOINT
261+
+ " and credentials. Cause: " + e.getMessage());
262+
}
263+
}
264+
265+
private void validateMaxComputeProjectAndNamespaceSchema() {
266+
try {
267+
validateMaxComputeNamespaceSchemaAccess(defaultProject);
268+
} catch (Exception e) {
269+
throw new RuntimeException("Failed to validate MaxCompute project '" + defaultProject
270+
+ "' with namespace schema. Check " + MCProperties.PROJECT + ", " + MCProperties.ENDPOINT
271+
+ ", credentials and namespace schema support. Cause: " + e.getMessage());
272+
}
273+
}
274+
275+
protected boolean maxComputeProjectExists(String projectName) throws OdpsException {
276+
return odps.projects().exists(projectName);
277+
}
278+
279+
protected void validateMaxComputeNamespaceSchemaAccess(String projectName) throws OdpsException {
280+
odps.schemas().iterator(projectName).hasNext();
281+
}
282+
236283
public Odps getClient() {
237284
makeSureInitialized();
238285
return odps;
@@ -401,7 +448,7 @@ public void checkProperties() throws DdlException {
401448
MCProperties.DEFAULT_SPLIT_BYTE_SIZE));
402449

403450
if (splitByteSize < 10485760L) {
404-
throw new DdlException(MCProperties.SPLIT_ROW_COUNT + " must be greater than or equal to 10485760");
451+
throw new DdlException(MCProperties.SPLIT_BYTE_SIZE + " must be greater than or equal to 10485760");
405452
}
406453

407454
} else if (splitStrategy.equals(MCProperties.SPLIT_BY_ROW_COUNT_STRATEGY)) {

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,11 @@ public Table getOdpsTable() {
328328
.orElse(null);
329329
}
330330

331+
public boolean isUnsupportedOdpsTable() {
332+
Table odpsTable = getOdpsTable();
333+
return odpsTable.isExternalTable() || odpsTable.isVirtualView();
334+
}
335+
331336
@Override
332337
public boolean isPartitionedTable() {
333338
makeSureInitialized();

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,11 @@ public List<Split> getSplits(int numBackends) throws UserException {
707707
long getOdpsTableTime = System.currentTimeMillis();
708708
LOG.info("MaxComputeScanNode getSplits: getOdpsTable cost {} ms", getOdpsTableTime - startTime);
709709

710+
if (odpsTable.isExternalTable() || odpsTable.isVirtualView()) {
711+
throw new UserException("Reading MaxCompute external table or logical view is not supported: "
712+
+ table.getDbName() + "." + table.getName());
713+
}
714+
710715
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
711716
return result;
712717
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.maxcompute;
19+
20+
import org.apache.doris.common.UserException;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
import org.mockito.Mockito;
25+
26+
import java.util.Optional;
27+
28+
public class MCTransactionTest {
29+
@Test
30+
public void testBeginInsertRejectsOdpsExternalTable() {
31+
assertBeginInsertRejectsUnsupportedOdpsTable("mc_external_table");
32+
}
33+
34+
@Test
35+
public void testBeginInsertRejectsOdpsLogicalView() {
36+
assertBeginInsertRejectsUnsupportedOdpsTable("mc_logical_view");
37+
}
38+
39+
private void assertBeginInsertRejectsUnsupportedOdpsTable(String tableName) {
40+
MaxComputeExternalCatalog catalog = Mockito.mock(MaxComputeExternalCatalog.class);
41+
MaxComputeExternalTable table = Mockito.mock(MaxComputeExternalTable.class);
42+
Mockito.when(table.isUnsupportedOdpsTable()).thenReturn(true);
43+
Mockito.when(table.getDbName()).thenReturn("default");
44+
Mockito.when(table.getName()).thenReturn(tableName);
45+
46+
MCTransaction transaction = new MCTransaction(catalog);
47+
48+
UserException exception = Assert.assertThrows(UserException.class,
49+
() -> transaction.beginInsert(table, Optional.empty()));
50+
Assert.assertTrue(exception.getMessage().contains(
51+
"Writing MaxCompute external table or logical view is not supported: default." + tableName));
52+
Mockito.verify(catalog, Mockito.never()).getOdpsTableIdentifier(Mockito.anyString(), Mockito.anyString());
53+
}
54+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.maxcompute;
19+
20+
import org.apache.doris.common.DdlException;
21+
import org.apache.doris.common.maxcompute.MCProperties;
22+
23+
import org.junit.Assert;
24+
import org.junit.Test;
25+
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
public class MaxComputeExternalCatalogTest {
30+
@Test
31+
public void testSplitByteSizeErrorMessage() {
32+
Map<String, String> props = new HashMap<>();
33+
addRequiredProperties(props);
34+
props.put(MCProperties.SPLIT_STRATEGY, MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY);
35+
props.put(MCProperties.SPLIT_BYTE_SIZE, "1048576");
36+
37+
MaxComputeExternalCatalog catalog = new MaxComputeExternalCatalog(1L, "mc_catalog", null, props, "");
38+
39+
DdlException exception = Assert.assertThrows(DdlException.class, catalog::checkProperties);
40+
Assert.assertTrue(exception.getMessage().contains(
41+
MCProperties.SPLIT_BYTE_SIZE + " must be greater than or equal to 10485760"));
42+
Assert.assertFalse(exception.getMessage().contains(MCProperties.SPLIT_ROW_COUNT));
43+
}
44+
45+
@Test
46+
public void testInitSkipsValidationByDefault() {
47+
Map<String, String> props = createRequiredProperties(true);
48+
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);
49+
50+
catalog.initForTest();
51+
52+
Assert.assertNull(catalog.checkedProjectName);
53+
Assert.assertNull(catalog.checkedNamespaceSchemaProjectName);
54+
}
55+
56+
@Test
57+
public void testInitValidatesProjectWhenValidationEnabled() {
58+
Map<String, String> props = createRequiredProperties(false);
59+
props.put(MCProperties.VALIDATE_CONNECTION, "true");
60+
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);
61+
62+
catalog.initForTest();
63+
64+
Assert.assertEquals("mc_project", catalog.checkedProjectName);
65+
Assert.assertNull(catalog.checkedNamespaceSchemaProjectName);
66+
}
67+
68+
@Test
69+
public void testInitValidatesSchemaWhenNamespaceSchemaEnabled() {
70+
Map<String, String> props = createRequiredProperties(true);
71+
props.put(MCProperties.VALIDATE_CONNECTION, "true");
72+
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);
73+
74+
catalog.initForTest();
75+
76+
Assert.assertNull(catalog.checkedProjectName);
77+
Assert.assertEquals("mc_project", catalog.checkedNamespaceSchemaProjectName);
78+
}
79+
80+
@Test
81+
public void testInitReportsInaccessibleProject() {
82+
Map<String, String> props = createRequiredProperties(false);
83+
props.put(MCProperties.VALIDATE_CONNECTION, "true");
84+
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);
85+
catalog.projectExists = false;
86+
87+
RuntimeException exception = Assert.assertThrows(RuntimeException.class, catalog::initForTest);
88+
89+
Assert.assertTrue(exception.getMessage().contains("Failed to validate MaxCompute project 'mc_project'"));
90+
Assert.assertTrue(exception.getMessage().contains("does not exist or is not accessible"));
91+
Assert.assertNull(catalog.checkedNamespaceSchemaProjectName);
92+
}
93+
94+
@Test
95+
public void testInitReportsUnsupportedNamespaceSchema() {
96+
Map<String, String> props = createRequiredProperties(true);
97+
props.put(MCProperties.VALIDATE_CONNECTION, "true");
98+
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);
99+
catalog.threeTierModel = false;
100+
101+
RuntimeException exception = Assert.assertThrows(RuntimeException.class, catalog::initForTest);
102+
103+
Assert.assertTrue(exception.getMessage().contains("Failed to validate MaxCompute project 'mc_project'"));
104+
Assert.assertTrue(exception.getMessage().contains("does not support namespace schema"));
105+
}
106+
107+
private static Map<String, String> createRequiredProperties(boolean enableNamespaceSchema) {
108+
Map<String, String> props = new HashMap<>();
109+
addRequiredProperties(props);
110+
props.put(MCProperties.ENABLE_NAMESPACE_SCHEMA, Boolean.toString(enableNamespaceSchema));
111+
return props;
112+
}
113+
114+
private static void addRequiredProperties(Map<String, String> props) {
115+
props.put(MCProperties.PROJECT, "mc_project");
116+
props.put(MCProperties.ENDPOINT, "http://service.cn-beijing.maxcompute.aliyun-inc.com/api");
117+
props.put(MCProperties.ACCESS_KEY, "access_key");
118+
props.put(MCProperties.SECRET_KEY, "secret_key");
119+
}
120+
121+
private static class TestMaxComputeExternalCatalog extends MaxComputeExternalCatalog {
122+
private boolean projectExists = true;
123+
private boolean threeTierModel = true;
124+
private String checkedProjectName;
125+
private String checkedNamespaceSchemaProjectName;
126+
127+
private TestMaxComputeExternalCatalog(Map<String, String> props) {
128+
super(1L, "mc_catalog", null, props, "");
129+
}
130+
131+
private void initForTest() {
132+
initLocalObjectsImpl();
133+
}
134+
135+
@Override
136+
protected boolean maxComputeProjectExists(String projectName) {
137+
checkedProjectName = projectName;
138+
return projectExists;
139+
}
140+
141+
@Override
142+
protected void validateMaxComputeNamespaceSchemaAccess(String projectName) {
143+
checkedNamespaceSchemaProjectName = projectName;
144+
if (!threeTierModel) {
145+
throw new RuntimeException("does not support namespace schema");
146+
}
147+
}
148+
}
149+
}

fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.doris.analysis.TupleId;
2828
import org.apache.doris.catalog.Column;
2929
import org.apache.doris.catalog.PrimitiveType;
30+
import org.apache.doris.common.UserException;
3031
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
3132
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
3233
import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
@@ -436,4 +437,27 @@ public void testGetSplits_noLimit_normalPath() throws Exception {
436437

437438
Assert.assertFalse(result.isEmpty());
438439
}
440+
441+
@Test
442+
public void testGetSplitsRejectsOdpsExternalTable() {
443+
assertGetSplitsRejectsUnsupportedOdpsTable(true, false, "mc_external_table");
444+
}
445+
446+
@Test
447+
public void testGetSplitsRejectsOdpsLogicalView() {
448+
assertGetSplitsRejectsUnsupportedOdpsTable(false, true, "mc_logical_view");
449+
}
450+
451+
private void assertGetSplitsRejectsUnsupportedOdpsTable(boolean isExternalTable, boolean isVirtualView,
452+
String tableName) {
453+
Mockito.when(odpsTable.isExternalTable()).thenReturn(isExternalTable);
454+
Mockito.when(odpsTable.isVirtualView()).thenReturn(isVirtualView);
455+
Mockito.when(table.getDbName()).thenReturn("default");
456+
Mockito.when(table.getName()).thenReturn(tableName);
457+
458+
UserException exception = Assert.assertThrows(UserException.class, () -> node.getSplits(1));
459+
Assert.assertTrue(exception.getMessage().contains(
460+
"Reading MaxCompute external table or logical view is not supported: default." + tableName));
461+
Mockito.verify(odpsTable, Mockito.never()).getFileNum();
462+
}
439463
}

fe/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1435,7 +1435,7 @@ under the License.
14351435
<version>${maxcompute.version}</version>
14361436
</dependency>
14371437
<dependency>
1438-
<!-- This is a dependency chain: odps-sdk-core:0.53.2-public → credentials-java:0.3.12 → com.aliyun:tea:[1.1.14,2.0.0)-->
1438+
<!-- Keep tea pinned for Alibaba SDK dependency chains that allow com.aliyun:tea:[1.1.14,2.0.0). -->
14391439
<groupId>com.aliyun</groupId>
14401440
<artifactId>tea</artifactId>
14411441
<version>1.4.1</version>

0 commit comments

Comments
 (0)