Search before asking
Version
25.2
What's Wrong?
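Writing only a subset of a Doris table's columns (selected with `doris.write.fields`) fails Spark's schema validation. This happens even though every omitted column has a DEFAULT value or is AUTO_INCREMENT.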
What You Expected?
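No error. The write should succeed and the omitted columns should be filled with their default values, as happens with Spark connector 1.3.2.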
How to Reproduce?
-- Reproduction table. Every column except `id` is NOT NULL with a DEFAULT,
-- and `id` is AUTO_INCREMENT. Presumably, then, a load that supplies only
-- `k_id` should be acceptable to Doris itself.
CREATE TABLE `ail` (
`k_id` bigint NOT NULL DEFAULT "0" COMMENT "xxx",
`o_no` varchar(64) NOT NULL DEFAULT "" COMMENT "xx",
`i_in` int NOT NULL DEFAULT "0" COMMENT "xx",
`s_id` bigint NOT NULL DEFAULT "0" COMMENT "xx",
`s_tt` int NOT NULL DEFAULT "0" COMMENT "xxx",
`id` bigint NOT NULL AUTO_INCREMENT(1) COMMENT "xxx"
) ENGINE=OLAP
UNIQUE KEY(`k_id`, `o_no`, `i_in`, `s_id`, `s_tt`)
-- NOTE(review): the statement appears truncated here (no DISTRIBUTED BY /
-- PROPERTIES clause, no terminating semicolon). Confirm the full DDL.
pyspark code:
import logging
import sys
from pyspark.sql import SparkSession
def log(message):
    """Print *message* to stdout and flush the stream right away.

    Flushing immediately keeps progress lines visible even when stdout is
    buffered (for example, when it is redirected to a file or pipe).
    """
    out = sys.stdout
    print(message, file=out)
    out.flush()
def create_spark_session(app_name="Doris_Data_Ingestion"):
    """Return a SparkSession named *app_name*, reusing one if it already exists.

    Args:
        app_name: Spark application name passed to ``builder.appName``.

    Returns:
        The SparkSession produced by ``getOrCreate()``.
    """
    builder = SparkSession.builder.appName(app_name)
    session = builder.getOrCreate()
    return session
def validate_source_data(spark, source_table):
    """Check the mock source view, then append its ``k_id`` column to Doris.

    This is the reproduction step. Only ``k_id`` is written
    (``doris.write.fields`` = ``k_id``), although ``test.ail`` has more
    columns.

    Args:
        spark: Active SparkSession.
        source_table: Currently unused; the function always reads the
            ``mock_source`` temporary view. NOTE(review): the caller passes
            the Doris target table name here. Confirm the intent.

    Returns:
        False if ``mock_source`` cannot be queried. True once the write to
        Doris has completed. (The original returned None on success, which
        the caller's ``if not ...`` check misread as failure.)

    Raises:
        Any exception from the Doris write (e.g. Spark's
        ``AnalysisException``) propagates unchanged, so the full error
        stays visible.
    """
    try:
        # Fail fast if the source view does not exist.
        spark.sql("select * from mock_source").show()
    except Exception as e:
        print(f"验证源表数据失败: {str(e)}")
        return False

    source_df = spark.sql("select k_id from mock_source")
    source_df.printSchema()

    # source_df deliberately carries only `k_id`. The remaining Doris columns
    # are expected to be filled by their DEFAULT / AUTO_INCREMENT definitions.
    writer = (
        source_df.write
        .format("doris")
        .option("doris.table.identifier", "test.ail")
        .option("doris.fenodes", "127.0.0.1:8030")
        .option("user", "admin")
        .option("password", "xxxxxxxx")
        .option("doris.sink.batch.size", "500000")
        .option("doris.sink.batch.interval.ms", "15000")
        .option("doris.sink.max-retries", "3")
        .option("doris.write.fields", "k_id")
        .option("doris.read.field", "k_id")
        .mode("append")
    )
    # Options that were commented out in the original chain, kept for reference:
    #   .option("mergeSchema", "true")
    #   .option("columnMapping", "k_id")
    writer.save()
    return True
def create_doris_temp_view(spark):
    """Create the ``mock_source`` temporary view used as the write source.

    Despite the function name, this is a plain Spark temporary view holding
    a single row with ``k_id = 1``. It does not read from Doris.

    Args:
        spark: Active SparkSession.

    Returns:
        True if the view was created, False otherwise (the error is printed).
    """
    try:
        # OR REPLACE keeps this re-runnable when a session that already has
        # a mock_source view is reused (create_spark_session uses getOrCreate).
        spark.sql("""
            CREATE OR REPLACE TEMPORARY VIEW mock_source
            AS SELECT 1 AS k_id
        """)
        return True
    except Exception as e:
        print(f"创建Doris临时视图失败: {str(e)}")
        return False
def main():
    """Run the reproduction: create the session and the mock view, then write to Doris.

    Each step aborts the run if it reports failure. The SparkSession is
    stopped in all cases, including when the Doris write raises.
    """
    log("=" * 60)
    print("\n步骤1: 创建 spark session...")
    spark = create_spark_session()
    # For verbose Spark logs, uncomment:
    # spark.sparkContext.setLogLevel("DEBUG")
    try:
        print("\n步骤2: 创建临时视图")
        if not create_doris_temp_view(spark):
            # Nothing to write without the source view.
            return
        print("\n步骤3: 验证源表数据...")
        if not validate_source_data(spark, 'test.ail'):
            return
    finally:
        # Runs on normal return, early return, and exceptions alike;
        # an exception still propagates afterwards with its traceback.
        spark.stop()


if __name__ == "__main__":
    main()
Run it with Spark Doris Connector 25.2.0 (Spark 3.5 build):
spark-submit --master local --jars spark-doris-connector-spark-3.5-25.2.0.jar t1.py
Error message:
2026-04-15 15:15:40,216 [INFO] [dispatcher-BlockManagerMaster] BlockManagerInfo: Removed broadcast_0_piece0 on 9.0.16.20:38273 in memory (size: 3.7 KiB, free: 434.4 MiB)
Traceback (most recent call last):
File "/data/t1.py", line 75, in <module>
main()
File "/data/t1.py", line 69, in main
if not validate_source_data(spark, 'test.ail'):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/data/t1.py", line 44, in validate_source_data
write_df.save()
File "/data/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 966, in save
File "/data/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/data/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.AnalysisException: Cannot write incompatible data to table 'test.ail':
- Cannot find data for output column 'o_no'
- Cannot find data for output column 'i_in'
- Cannot find data for output column 's_id'
- Cannot find data for output column 's_tt'
- Cannot find data for output column 'id'
The same script works with Spark connector 1.3.2.
Anything Else?
No response
Are you willing to submit PR?
Code of Conduct
Search before asking
Version
25.2
What's Wrong?
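Writing only a subset of a Doris table's columns (selected with `doris.write.fields`) fails Spark's schema validation. This happens even though every omitted column has a DEFAULT value or is AUTO_INCREMENT.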
What You Expected?
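No error. The write should succeed and the omitted columns should be filled with their default values, as happens with Spark connector 1.3.2.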
How to Reproduce?
pyspark code:
Run it with Spark Doris Connector 25.2.0 (Spark 3.5 build):
spark-submit --master local --jars spark-doris-connector-spark-3.5-25.2.0.jar t1.py
Error message:
The same script works with Spark connector 1.3.2.
Anything Else?
No response
Are you willing to submit PR?
Code of Conduct