Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public class DatasourceLoadConfig {
classLoaderFactoryName.put(
"CONSOLE",
"org.apache.seatunnel.datasource.plugin.console.ConsoleDataSourceFactory");
classLoaderFactoryName.put(
"ICEBERG",
"com.apache.seatunnel.datasource.plugin.iceberg.IcebergDataSourceFactory");

classLoaderJarName.put("JDBC-ORACLE", "datasource-jdbc-oracle-");
classLoaderJarName.put("JDBC-CLICKHOUSE", "datasource-jdbc-clickhouse-");
Expand All @@ -130,6 +133,7 @@ public class DatasourceLoadConfig {
classLoaderJarName.put("JDBC-HIVE", "datasource-jdbc-hive-");
classLoaderJarName.put("FAKESOURCE", "datasource-fakesource-");
classLoaderJarName.put("CONSOLE", "datasource-console-");
classLoaderJarName.put("ICEBERG", "datasource-iceberg-");
}

public static final Set<String> pluginSet =
Expand All @@ -151,7 +155,8 @@ public class DatasourceLoadConfig {
"MongoDB",
"JDBC-Db2",
"FakeSource",
"Console");
"Console",
"Iceberg");

public static Map<String, DatasourceClassLoader> datasourceClassLoaders = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>${revision}</version>
</parent>

<artifactId>datasource-iceberg</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>1.7.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.6</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.15</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<skip>${e2e.dependency.skip}</skip>
<appendOutput>true</appendOutput>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apache.seatunnel.datasource.plugin.iceberg;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

import lombok.NonNull;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IcebergDataSourceChannel implements DataSourceChannel {
private transient Catalog catalog;

@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return IcebergOptionRule.optionRule();
}

@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return IcebergOptionRule.metadataRule();
}

@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
try {
initCatalog(requestParams);
List<String> tablesList = new ArrayList<>();
Namespace namespace = Namespace.of(database);

for (TableIdentifier identifier : catalog.listTables(namespace)) {
tablesList.add(identifier.name());
}

return tablesList;
} catch (Exception e) {
throw new DataSourcePluginException("Failed to list tables in Iceberg", e);
}
}

@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try {
initCatalog(requestParams);
List<String> databasesList = new ArrayList<>();

// Check if the catalog is an instance of HadoopCatalog
if (catalog instanceof HadoopCatalog) {
HadoopCatalog hadoopCatalog = (HadoopCatalog) catalog;

for (Namespace namespace : hadoopCatalog.listNamespaces()) {
String dbName = namespace.level(0);
databasesList.add(dbName);
}
} else {
throw new DataSourcePluginException("Unsupported catalog type");
}

return databasesList;
} catch (Exception e) {
throw new DataSourcePluginException("Failed to list databases in Iceberg", e);
}
}

@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {

try {
initCatalog(requestParams);
TableIdentifier tableIdentifier = TableIdentifier.of(database, table);

return catalog.loadTable(tableIdentifier).schema().asStruct().fields().stream()
.map(
field -> {
TableField tableField = new TableField();
tableField.setName(field.name());
tableField.setType(field.type().toString());
return tableField;
})
.collect(Collectors.toList());

} catch (Exception e) {
throw new DataSourcePluginException("Failed to get table fields from Iceberg", e);
}
}

@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try {
initCatalog(requestParams);

return true;
} catch (Exception e) {

throw new DataSourcePluginException(
"Check Iceberg connectivity failed, " + e.getMessage(), e);
}
}

private void initCatalog(Map<String, String> requestParams) {
if (catalog == null) {
Configuration conf = new Configuration();
String warehouseUri = requestParams.get(IcebergOptionRule.WAREHOUSE_URI.key());
if (warehouseUri != null) {
catalog = new HadoopCatalog(conf, warehouseUri);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apache.seatunnel.datasource.plugin.iceberg;

import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;

import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;

import java.util.Set;

@AutoService(DataSourceFactory.class)
public class IcebergDataSourceFactory implements DataSourceFactory {
public static final String ICEBERG_PLUGIN_NAME = "Iceberg";
public static final String ICEBERG_PLUGIN_ICON = "Iceberg";
public static final String ICEBERG_PLUGIN_VERSION = "1.0.0";

@Override
public String factoryIdentifier() {
return ICEBERG_PLUGIN_NAME;
}

@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(
DataSourcePluginInfo.builder()
.name(ICEBERG_PLUGIN_NAME)
.icon(ICEBERG_PLUGIN_ICON)
.version(ICEBERG_PLUGIN_VERSION)
.supportVirtualTables(false)
.type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
.build());
}

@Override
public DataSourceChannel createChannel() {
return new IcebergDataSourceChannel();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apache.seatunnel.datasource.plugin.iceberg;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class IcebergOptionRule {

// HDFS URI of the Iceberg warehouse (also used as connection URI)
public static final Option<String> WAREHOUSE_URI =
Options.key("warehouse-uri")
.stringType()
.noDefaultValue()
.withDescription(
"The HDFS URI of the Iceberg warehouse, e.g., hdfs://master:9000/user/iceberg/warehouse.");

// Name of the Iceberg database to read or write
public static final Option<String> DATABASE =
Options.key("database")
.stringType()
.noDefaultValue()
.withDescription("The name of Iceberg database to read or write.");

// Type of the catalog
public static final Option<String> CATALOG_TYPE =
Options.key("catalog-type")
.stringType()
.noDefaultValue()
.withDescription("The type of the catalog, e.g., 'hadoop'.");

public static OptionRule optionRule() {
return OptionRule.builder().required(WAREHOUSE_URI, CATALOG_TYPE).build();
}

public static OptionRule metadataRule() {
return OptionRule.builder().required(DATABASE).build();
}
}
Loading