# vim config/doris-connector-sink.properties
...
# 容忍错误、不中断任务(注意:errors.tolerance 只覆盖 converter/transform 阶段的错误,SinkTask.put() 写入阶段抛出的异常仍会使 task 进入 FAILED)
errors.tolerance=all
# 死信队列的 Kafka 主题名
errors.deadletterqueue.topic.name=test_error_topic
# 写入死信队列时,带上出错上下文信息(header)
errors.deadletterqueue.context.headers.enable=true
# Kafka Connect 自动创建 DLQ topic 时使用的副本数(默认 3,单 broker 环境需设为 1)
errors.deadletterqueue.topic.replication.factor=1
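# 注意:Stream Load 参数需要加 sink.properties. 前缀才会透传给 Doris,
# 例如 sink.properties.max_filter_ratio=1、sink.properties.strict_mode=false;
# 下面不带前缀的写法很可能不生效(待确认),与后文的 too many filtered rows 报错相符
max_filter_ratio=1
strict_mode=false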
# curl -s http://10.62.10.142:8083/connectors/test-doris-sink/status | jq .
{
"name": "test-doris-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.62.10.142:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.62.10.142:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:636)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.doris.kafka.connector.exception.StreamLoadException: failed to stream load data with label: test__KC_0__KC_testdb_test_log1__KC_1917473__KC_38\n\tat org.apache.doris.kafka.connector.writer.load.DorisStreamLoad.load(DorisStreamLoad.java:127)\n\tat org.apache.doris.kafka.connector.writer.StreamLoadWriter.flush(StreamLoadWriter.java:173)\n\tat org.apache.doris.kafka.connector.writer.DorisWriter.insertRecord(DorisWriter.java:133)\n\tat org.apache.doris.kafka.connector.writer.StreamLoadWriter.insert(StreamLoadWriter.java:151)\n\tat org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:154)\n\tat org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:135)\n\tat org.apache.doris.kafka.connector.DorisSinkTask.put(DorisSinkTask.java:97)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606)\n\t... 11 more\nCaused by: org.apache.doris.kafka.connector.exception.StreamLoadException: stream load error: [DATA_QUALITY_ERROR]too many filtered rows, see more in http://10.64.56.145:8040/api/_load_error_log?file=__shard_2/error_log_insert_stmt_7b4aa6b54bbac34c-8f9ba09739bbf587_7b4aa6b54bbac34c_8f9ba09739bbf587\n\tat org.apache.doris.kafka.connector.writer.load.DorisStreamLoad.load(DorisStreamLoad.java:110)\n\t... 18 more\n"
}
],
"type": "sink"
}
# curl -s 'http://10.64.56.145:8040/api/_load_error_log?file=__shard_961/error_log_insert_stmt_7f4d3b96a6d3e0a3-89131e53ebefd79b_7f4d3b96a6d3e0a3_89131e53ebefd79b' | head -n 2
Reason: Parse json data for JsonDoc failed. code: 2, error info: The document root must not be followed by other values.. src line [10.64.56.188 1199 bbs-test.mobileapi.hupu.com 10.64.48.68 - [05/Jun/2025:14:50:03 +0800] "GET /interface/getsThreadAdPv?client=x HTTP/1.1" 502 166 "-" 0.000 "kanqiu server"];
Reason: Parse json data for JsonDoc failed. code: 2, error info: The document root must not be followed by other values.. src line [10.64.56.188 26053 bbs-test.mobileapi.hupu.com 10.64.48.68 - [05/Jun/2025:14:55:03 +0800] "GET /interface/getsThreadAdPv?client=x HTTP/1.1" 502 166 "-" 0.000 "kanqiu server"];