Search before asking
What happened
【BUG1】
- 场景:kafka->postgreSQL 或其他
- 描述:kafka connector 在reader的时候 无法解析json格式的Column,会将整个json作为一个字段传给writer,不会根据Column的字段来拆分
【BUG2 Maybe】
- ElasticSearch7 connector 缺少SSL的keyStore证书传递,只有trustStore证书传递,会导致自签的SSL证书认证失败
What you expected to happen
kafka connector 在reader的时候 无法解析json格式的Column,会将整个json作为一个字段传给writer,不会根据Column的字段来拆分
How to reproduce
解决方式KafkaSourceFactory.createSource() 方法调用 new KafkaSyncConverter(),当codec为json时,缺少以下代码:
if (!CollectionUtils.isEmpty(kafkaConfig.getColumn())) {
List typeList =
kafkaConfig.getColumn().stream()
.map(FieldConfig::getType)
.collect(Collectors.toList());
this.toInternalConverters = new ArrayList<>();
for (TypeConfig s : typeList) {
toInternalConverters.add(
wrapIntoNullableInternalConverter(createInternalConverter(s.getType())));
}
}
Anything else
No response
Version
master
Are you willing to submit PR?
Code of Conduct
Search before asking
What happened
【BUG1】
【BUG2 Maybe】
What you expected to happen
kafka connector 在reader的时候 无法解析json格式的Column,会将整个json作为一个字段传给writer,不会根据Column的字段来拆分
How to reproduce
解决方式KafkaSourceFactory.createSource() 方法调用 new KafkaSyncConverter(),当codec为json时,缺少以下代码:
if (!CollectionUtils.isEmpty(kafkaConfig.getColumn())) {
List typeList =
kafkaConfig.getColumn().stream()
.map(FieldConfig::getType)
.collect(Collectors.toList());
this.toInternalConverters = new ArrayList<>();
for (TypeConfig s : typeList) {
toInternalConverters.add(
wrapIntoNullableInternalConverter(createInternalConverter(s.getType())));
}
}
Anything else
No response
Version
master
Are you willing to submit PR?
Code of Conduct