Skip to content

Commit 60afaae

Browse files
committed
sql: Automatically convert json.Number values to appropriate ClickHouse types
Fixes: #4192
1 parent 4d921b5 commit 60afaae

2 files changed

Lines changed: 16 additions & 4 deletions

File tree

internal/impl/sql/processor_sql_raw.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,12 @@ func NewSQLRawProcessorFromConfig(conf *service.ParsedConfig, mgr *service.Resou
212212
}
213213

214214
var argsConverter argsConverter
215-
if driverStr == "postgres" {
215+
switch driverStr {
216+
case "postgres", "pgx":
216217
argsConverter = bloblValuesToPgSQLValues
217-
} else {
218+
case "clickhouse":
219+
argsConverter = bloblValuesToClickHouseValues
220+
default:
218221
argsConverter = func(v []any) []any { return v }
219222
}
220223

internal/impl/sql/processor_sql_select.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,9 @@ type sqlSelectProcessor struct {
108108
builder squirrel.SelectBuilder
109109
dbMut sync.RWMutex
110110

111-
where string
112-
argsMapping *bloblang.Executor
111+
where string
112+
argsMapping *bloblang.Executor
113+
argsConverter argsConverter
113114

114115
logger *service.Logger
115116
shutSig *shutdown.Signaller
@@ -162,6 +163,13 @@ func NewSQLSelectProcessorFromConfig(conf *service.ParsedConfig, mgr *service.Re
162163
s.builder = s.builder.PlaceholderFormat(squirrel.Colon)
163164
}
164165

166+
switch driverStr {
167+
case "clickhouse":
168+
s.argsConverter = bloblValuesToClickHouseValues
169+
default:
170+
s.argsConverter = func(v []any) []any { return v }
171+
}
172+
165173
if conf.Contains("prefix") {
166174
prefixStr, err := conf.FieldString("prefix")
167175
if err != nil {
@@ -233,6 +241,7 @@ func (s *sqlSelectProcessor) ProcessBatch(ctx context.Context, batch service.Mes
233241
msg.SetError(fmt.Errorf("mapping returned non-array result: %T", iargs))
234242
continue
235243
}
244+
args = s.argsConverter(args)
236245
}
237246

238247
queryBuilder := s.builder

0 commit comments

Comments
 (0)