Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class MCProperties {
public static final String DEFAULT_CONNECT_TIMEOUT = "10"; // 10s
public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
public static final String VALIDATE_CONNECTION = "mc.validate_connection";
public static final String DEFAULT_VALIDATE_CONNECTION = "false";

public static final String MAX_FIELD_SIZE = "mc.max_field_size_bytes";
public static final String DEFAULT_MAX_FIELD_SIZE = "8388608"; // 8 * 1024 * 1024 = 8MB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public void updateMCCommitData(List<TMCCommitData> commitDataList) {

public void beginInsert(ExternalTable dorisTable, Optional<InsertCommandContext> ctx) throws UserException {
this.table = (MaxComputeExternalTable) dorisTable;
if (table.isUnsupportedOdpsTable()) {
throw new UserException("Writing MaxCompute external table or logical view is not supported: "
+ table.getDbName() + "." + table.getName());
}

try {
TableIdentifier tableId = catalog.getOdpsTableIdentifier(table.getDbName(), table.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.transaction.TransactionManagerFactory;

import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.Partition;
import com.aliyun.odps.account.AccountFormat;
import com.aliyun.odps.table.TableIdentifier;
Expand All @@ -52,7 +53,6 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {

// you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints
private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";

private Map<String, String> props;
private Odps odps;
private String endpoint;
Expand Down Expand Up @@ -206,6 +206,9 @@ protected void initLocalObjectsImpl() {
odps = MCUtils.createMcClient(props);
odps.setDefaultProject(defaultProject);
odps.setEndpoint(endpoint);
odps.getRestClient().setConnectTimeout(connectTimeout);
odps.getRestClient().setReadTimeout(readTimeout);
odps.getRestClient().setRetryTimes(retryTimes);

String accountFormatProp = props.getOrDefault(MCProperties.ACCOUNT_FORMAT, MCProperties.DEFAULT_ACCOUNT_FORMAT);
if (accountFormatProp.equals(MCProperties.ACCOUNT_FORMAT_NAME)) {
Expand All @@ -227,12 +230,56 @@ protected void initLocalObjectsImpl() {
boolean enableNamespaceSchema = Boolean.parseBoolean(
props.getOrDefault(MCProperties.ENABLE_NAMESPACE_SCHEMA, MCProperties.DEFAULT_ENABLE_NAMESPACE_SCHEMA));
mcStructureHelper = McStructureHelper.getHelper(enableNamespaceSchema, defaultProject);
boolean validateConnection = Boolean.parseBoolean(
props.getOrDefault(MCProperties.VALIDATE_CONNECTION, MCProperties.DEFAULT_VALIDATE_CONNECTION));
if (validateConnection) {
validateMaxComputeConnection(enableNamespaceSchema);
}

initPreExecutionAuthenticator();
metadataOps = new MaxComputeMetadataOps(this, odps);
transactionManager = TransactionManagerFactory.createMCTransactionManager(this);
}

protected void validateMaxComputeConnection(boolean enableNamespaceSchema) {
if (enableNamespaceSchema) {
validateMaxComputeProjectAndNamespaceSchema();
} else {
validateMaxComputeProject();
}
}

private void validateMaxComputeProject() {
try {
if (!maxComputeProjectExists(defaultProject)) {
throw new RuntimeException("MaxCompute project '" + defaultProject
+ "' does not exist or is not accessible");
}
} catch (Exception e) {
throw new RuntimeException("Failed to validate MaxCompute project '" + defaultProject
+ "'. Check " + MCProperties.PROJECT + ", " + MCProperties.ENDPOINT
+ " and credentials. Cause: " + e.getMessage());
}
}

private void validateMaxComputeProjectAndNamespaceSchema() {
try {
validateMaxComputeNamespaceSchemaAccess(defaultProject);
} catch (Exception e) {
throw new RuntimeException("Failed to validate MaxCompute project '" + defaultProject
+ "' with namespace schema. Check " + MCProperties.PROJECT + ", " + MCProperties.ENDPOINT
+ ", credentials and namespace schema support. Cause: " + e.getMessage());
}
}

protected boolean maxComputeProjectExists(String projectName) throws OdpsException {
return odps.projects().exists(projectName);
}

protected void validateMaxComputeNamespaceSchemaAccess(String projectName) throws OdpsException {
odps.schemas().iterator(projectName).hasNext();
}

public Odps getClient() {
makeSureInitialized();
return odps;
Expand Down Expand Up @@ -401,7 +448,7 @@ public void checkProperties() throws DdlException {
MCProperties.DEFAULT_SPLIT_BYTE_SIZE));

if (splitByteSize < 10485760L) {
throw new DdlException(MCProperties.SPLIT_ROW_COUNT + " must be greater than or equal to 10485760");
throw new DdlException(MCProperties.SPLIT_BYTE_SIZE + " must be greater than or equal to 10485760");
}

} else if (splitStrategy.equals(MCProperties.SPLIT_BY_ROW_COUNT_STRATEGY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ public Table getOdpsTable() {
.orElse(null);
}

public boolean isUnsupportedOdpsTable() {
Table odpsTable = getOdpsTable();
return odpsTable.isExternalTable() || odpsTable.isVirtualView();
}

@Override
public boolean isPartitionedTable() {
makeSureInitialized();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,11 @@ public List<Split> getSplits(int numBackends) throws UserException {
long getOdpsTableTime = System.currentTimeMillis();
LOG.info("MaxComputeScanNode getSplits: getOdpsTable cost {} ms", getOdpsTableTime - startTime);

if (odpsTable.isExternalTable() || odpsTable.isVirtualView()) {
throw new UserException("Reading MaxCompute external table or logical view is not supported: "
+ table.getDbName() + "." + table.getName());
}

if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.maxcompute;

import org.apache.doris.common.UserException;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.Optional;

public class MCTransactionTest {
@Test
public void testBeginInsertRejectsOdpsExternalTable() {
assertBeginInsertRejectsUnsupportedOdpsTable("mc_external_table");
}

@Test
public void testBeginInsertRejectsOdpsLogicalView() {
assertBeginInsertRejectsUnsupportedOdpsTable("mc_logical_view");
}

private void assertBeginInsertRejectsUnsupportedOdpsTable(String tableName) {
MaxComputeExternalCatalog catalog = Mockito.mock(MaxComputeExternalCatalog.class);
MaxComputeExternalTable table = Mockito.mock(MaxComputeExternalTable.class);
Mockito.when(table.isUnsupportedOdpsTable()).thenReturn(true);
Mockito.when(table.getDbName()).thenReturn("default");
Mockito.when(table.getName()).thenReturn(tableName);

MCTransaction transaction = new MCTransaction(catalog);

UserException exception = Assert.assertThrows(UserException.class,
() -> transaction.beginInsert(table, Optional.empty()));
Assert.assertTrue(exception.getMessage().contains(
"Writing MaxCompute external table or logical view is not supported: default." + tableName));
Mockito.verify(catalog, Mockito.never()).getOdpsTableIdentifier(Mockito.anyString(), Mockito.anyString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.maxcompute;

import org.apache.doris.common.DdlException;
import org.apache.doris.common.maxcompute.MCProperties;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class MaxComputeExternalCatalogTest {
@Test
public void testSplitByteSizeErrorMessage() {
Map<String, String> props = new HashMap<>();
addRequiredProperties(props);
props.put(MCProperties.SPLIT_STRATEGY, MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY);
props.put(MCProperties.SPLIT_BYTE_SIZE, "1048576");

MaxComputeExternalCatalog catalog = new MaxComputeExternalCatalog(1L, "mc_catalog", null, props, "");

DdlException exception = Assert.assertThrows(DdlException.class, catalog::checkProperties);
Assert.assertTrue(exception.getMessage().contains(
MCProperties.SPLIT_BYTE_SIZE + " must be greater than or equal to 10485760"));
Assert.assertFalse(exception.getMessage().contains(MCProperties.SPLIT_ROW_COUNT));
}

@Test
public void testInitSkipsValidationByDefault() {
Map<String, String> props = createRequiredProperties(true);
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);

catalog.initForTest();

Assert.assertNull(catalog.checkedProjectName);
Assert.assertNull(catalog.checkedNamespaceSchemaProjectName);
}

@Test
public void testInitValidatesProjectWhenValidationEnabled() {
Map<String, String> props = createRequiredProperties(false);
props.put(MCProperties.VALIDATE_CONNECTION, "true");
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);

catalog.initForTest();

Assert.assertEquals("mc_project", catalog.checkedProjectName);
Assert.assertNull(catalog.checkedNamespaceSchemaProjectName);
}

@Test
public void testInitValidatesSchemaWhenNamespaceSchemaEnabled() {
Map<String, String> props = createRequiredProperties(true);
props.put(MCProperties.VALIDATE_CONNECTION, "true");
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);

catalog.initForTest();

Assert.assertNull(catalog.checkedProjectName);
Assert.assertEquals("mc_project", catalog.checkedNamespaceSchemaProjectName);
}

@Test
public void testInitReportsInaccessibleProject() {
Map<String, String> props = createRequiredProperties(false);
props.put(MCProperties.VALIDATE_CONNECTION, "true");
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);
catalog.projectExists = false;

RuntimeException exception = Assert.assertThrows(RuntimeException.class, catalog::initForTest);

Assert.assertTrue(exception.getMessage().contains("Failed to validate MaxCompute project 'mc_project'"));
Assert.assertTrue(exception.getMessage().contains("does not exist or is not accessible"));
Assert.assertNull(catalog.checkedNamespaceSchemaProjectName);
}

@Test
public void testInitReportsUnsupportedNamespaceSchema() {
Map<String, String> props = createRequiredProperties(true);
props.put(MCProperties.VALIDATE_CONNECTION, "true");
TestMaxComputeExternalCatalog catalog = new TestMaxComputeExternalCatalog(props);
catalog.threeTierModel = false;

RuntimeException exception = Assert.assertThrows(RuntimeException.class, catalog::initForTest);

Assert.assertTrue(exception.getMessage().contains("Failed to validate MaxCompute project 'mc_project'"));
Assert.assertTrue(exception.getMessage().contains("does not support namespace schema"));
}

private static Map<String, String> createRequiredProperties(boolean enableNamespaceSchema) {
Map<String, String> props = new HashMap<>();
addRequiredProperties(props);
props.put(MCProperties.ENABLE_NAMESPACE_SCHEMA, Boolean.toString(enableNamespaceSchema));
return props;
}

private static void addRequiredProperties(Map<String, String> props) {
props.put(MCProperties.PROJECT, "mc_project");
props.put(MCProperties.ENDPOINT, "http://service.cn-beijing.maxcompute.aliyun-inc.com/api");
props.put(MCProperties.ACCESS_KEY, "access_key");
props.put(MCProperties.SECRET_KEY, "secret_key");
}

private static class TestMaxComputeExternalCatalog extends MaxComputeExternalCatalog {
private boolean projectExists = true;
private boolean threeTierModel = true;
private String checkedProjectName;
private String checkedNamespaceSchemaProjectName;

private TestMaxComputeExternalCatalog(Map<String, String> props) {
super(1L, "mc_catalog", null, props, "");
}

private void initForTest() {
initLocalObjectsImpl();
}

@Override
protected boolean maxComputeProjectExists(String projectName) {
checkedProjectName = projectName;
return projectExists;
}

@Override
protected void validateMaxComputeNamespaceSchemaAccess(String projectName) {
checkedNamespaceSchemaProjectName = projectName;
if (!threeTierModel) {
throw new RuntimeException("does not support namespace schema");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
Expand Down Expand Up @@ -436,4 +437,27 @@ public void testGetSplits_noLimit_normalPath() throws Exception {

Assert.assertFalse(result.isEmpty());
}

@Test
public void testGetSplitsRejectsOdpsExternalTable() {
assertGetSplitsRejectsUnsupportedOdpsTable(true, false, "mc_external_table");
}

@Test
public void testGetSplitsRejectsOdpsLogicalView() {
assertGetSplitsRejectsUnsupportedOdpsTable(false, true, "mc_logical_view");
}

private void assertGetSplitsRejectsUnsupportedOdpsTable(boolean isExternalTable, boolean isVirtualView,
String tableName) {
Mockito.when(odpsTable.isExternalTable()).thenReturn(isExternalTable);
Mockito.when(odpsTable.isVirtualView()).thenReturn(isVirtualView);
Mockito.when(table.getDbName()).thenReturn("default");
Mockito.when(table.getName()).thenReturn(tableName);

UserException exception = Assert.assertThrows(UserException.class, () -> node.getSplits(1));
Assert.assertTrue(exception.getMessage().contains(
"Reading MaxCompute external table or logical view is not supported: default." + tableName));
Mockito.verify(odpsTable, Mockito.never()).getFileNum();
}
}
2 changes: 1 addition & 1 deletion fe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,7 @@ under the License.
<version>${maxcompute.version}</version>
</dependency>
<dependency>
<!-- This is a dependency chain: odps-sdk-core:0.53.2-public → credentials-java:0.3.12 → com.aliyun:tea:[1.1.14,2.0.0)-->
<!-- Keep tea pinned for Alibaba SDK dependency chains that allow com.aliyun:tea:[1.1.14,2.0.0). -->
<groupId>com.aliyun</groupId>
<artifactId>tea</artifactId>
<version>1.4.1</version>
Expand Down
Loading
Loading