diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml
index 083729fc2..368d24e00 100644
--- a/.github/workflows/checkstyle.yaml
+++ b/.github/workflows/checkstyle.yaml
@@ -40,7 +40,7 @@ jobs:
 
       - name: Run java checkstyle
         run:
-          cd flink-doris-connector && mvn clean checkstyle:check -Pflink1 -pl flink-doris-connector-flink1 -am
+          cd flink-doris-connector && mvn clean install checkstyle:check -DskipTests -Pflink1 -pl flink-doris-connector-flink1 -am
 
       - name: Setup java 17
         uses: actions/setup-java@v2
@@ -50,4 +50,4 @@ jobs:
 
       - name: Run java checkstyle
         run:
-          cd flink-doris-connector && mvn clean checkstyle:check -Pflink2 -pl flink-doris-connector-flink2 -am
\ No newline at end of file
+          cd flink-doris-connector && mvn clean install checkstyle:check -DskipTests -Pflink2 -pl flink-doris-connector-flink2 -am
diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 99379ff86..1d33d21a9 100644
--- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -25,6 +25,8 @@
 import java.util.Objects;
 import java.util.Properties;
 
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
 import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;
@@ -531,6 +533,15 @@ public DorisExecutionOptions build() {
                 streamLoadProp.put(READ_JSON_BY_LINE, true);
             }
 
+            // Default to gz compression when the caller has not set compress_type. Any
+            // explicit value, including an empty one, is left untouched.
+            // NOTE(review): the tests in this change pass an empty compress_type for JSON
+            // loads, citing missing JSON compression support in Doris 2.1 - confirm that
+            // JSON should also receive this default.
+            if (streamLoadProp != null && !streamLoadProp.containsKey(COMPRESS_TYPE)) {
+                streamLoadProp.put(COMPRESS_TYPE, COMPRESS_TYPE_GZ);
+            }
+
             Preconditions.checkArgument(
                     bufferFlushIntervalMs >= 1000,
                     "bufferFlushIntervalMs must be greater than or equal to 1 second");
diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
index 97c2868ad..d1aaf4227 100644
--- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
+++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
@@ -36,10 +36,12 @@ public void testBuild() {
         Properties expected = new Properties();
         expected.put("format", "json");
         expected.put("read_json_by_line", true);
-        Assert.assertTrue(actual.size() == 2);
+        expected.put("compress_type", "gz");
+        Assert.assertEquals(3, actual.size());
         Assert.assertTrue(actual.get("format").equals(expected.get("format")));
         Assert.assertTrue(
                 actual.get("read_json_by_line").equals(expected.get("read_json_by_line")));
+        Assert.assertEquals(expected.get("compress_type"), actual.get("compress_type"));
     }
 
     @Test
diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
index f0c027a04..669f8abd2 100644
--- a/flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
+++ b/flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
@@ -142,6 +142,9 @@ protected void setSinkConfDefaultConfig(List argList) {
argList.add(JDBC_URL + "=" + getDorisQueryUrl()); argList.add(SINK_CONF); argList.add(SINK_LABEL_PREFIX + "=" + "label"); + // disable gz compression for json format, doris 2.1 does not support json compression + argList.add(SINK_CONF); + argList.add("sink.properties.compress_type="); } public static void closeE2EContainers() { diff --git a/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 3d1345b88..f3374f314 100644 --- a/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -77,6 +77,8 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_GROUP_COMMIT = "tbl_group_commit"; static final String TABLE_OVERWRITE = "tbl_overwrite"; static final String TABLE_GZ_FORMAT = "tbl_gz_format"; + static final String TABLE_GZ_FORMAT_DEFAULT = "tbl_gz_format_default"; + static final String TABLE_NO_COMPRESS = "tbl_no_compress"; static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column"; @@ -137,6 +139,7 @@ public void testSinkJsonFormat() throws Exception { Properties properties = new Properties(); properties.setProperty("read_json_by_line", "true"); properties.setProperty("format", "json"); + properties.setProperty("compress_type", ""); // mock data Map row1 = new HashMap<>(); @@ -223,6 +226,7 @@ public void testTableSinkJsonFormat() throws Exception { + " 'sink.ignore.update-before' = 'true'," + " 'sink.properties.format' = 'json'," + " 'sink.properties.read_json_by_line' = 'true'," + + " 'sink.properties.compress_type' = ''," + " 'sink.label-prefix' = 'doris_sink" + UUID.randomUUID() + "'" @@ 
-481,6 +485,98 @@ public void testTableGzFormat() throws Exception { ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); } + @Test + public void testTableDefaultGzFormat() throws Exception { + initializeTable(TABLE_GZ_FORMAT_DEFAULT, DataModel.UNIQUE); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_gz_default_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.enable.batch-mode' = '%s'," + + " 'sink.enable-delete' = 'false'," + + " 'sink.label-prefix' = '" + + UUID.randomUUID() + + "'," + + " 'sink.properties.column_separator' = '\\x01'," + + " 'sink.properties.line_delimiter' = '\\x02'" + + ")", + getFenodes(), + DATABASE + "." 
+ TABLE_GZ_FORMAT_DEFAULT, + getDorisUsername(), + getDorisPassword(), + batchMode); + tEnv.executeSql(sinkDDL); + tEnv.executeSql( + "INSERT INTO doris_gz_default_sink SELECT 'doris',1 union all SELECT 'flink',2"); + + Thread.sleep(25000); + List expected = Arrays.asList("doris,1", "flink,2"); + String query = + String.format( + "select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT_DEFAULT); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + } + + @Test + public void testTableNoCompressFormat() throws Exception { + initializeTable(TABLE_NO_COMPRESS, DataModel.UNIQUE); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_no_compress_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.enable.batch-mode' = '%s'," + + " 'sink.enable-delete' = 'false'," + + " 'sink.label-prefix' = '" + + UUID.randomUUID() + + "'," + + " 'sink.properties.column_separator' = '\\x01'," + + " 'sink.properties.line_delimiter' = '\\x02'," + + " 'sink.properties.compress_type' = ''" + + ")", + getFenodes(), + DATABASE + "." 
+ TABLE_NO_COMPRESS, + getDorisUsername(), + getDorisPassword(), + batchMode); + tEnv.executeSql(sinkDDL); + tEnv.executeSql( + "INSERT INTO doris_no_compress_sink SELECT 'doris',1 union all SELECT 'flink',2"); + + Thread.sleep(25000); + List expected = Arrays.asList("doris,1", "flink,2"); + String query = + String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_NO_COMPRESS); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + } + @Test public void testJobManagerFailoverSink() throws Exception { LOG.info("start to test JobManagerFailoverSink."); @@ -780,6 +876,7 @@ public void testSinkUnicodeColumn() throws Exception { Properties properties = new Properties(); properties.setProperty("read_json_by_line", "true"); properties.setProperty("format", "json"); + properties.setProperty("compress_type", ""); // mock data Map row1 = new HashMap<>();