Skip to content

Commit 368047e

Browse files
TRANSFER-467: Raw 2 Table parser (#105)
1 parent 9a202d7 commit 368047e

2 files changed

Lines changed: 105 additions & 23 deletions

File tree

docs/resources/transfer_endpoint.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ Optional:
434434

435435
- `blank` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kafka_source--parser--blank))
436436
- `json` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kafka_source--parser--json))
437+
- `raw_table` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kafka_source--parser--raw_table))
437438
- `schema_registry` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kafka_source--parser--schema_registry))
438439
- `tskv` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kafka_source--parser--tskv))
439440

@@ -488,6 +489,18 @@ Optional:
488489

489490

490491

492+
<a id="nestedblock--settings--kafka_source--parser--raw_table"></a>
493+
### Nested Schema for `settings.kafka_source.parser.raw_table`
494+
495+
Optional:
496+
497+
- `add_headers` (Boolean) Add headers column to output virtual table
498+
- `add_key` (Boolean) Add key column to output virtual table
499+
- `add_timestamp` (Boolean) Add timestamp column to output virtual table
500+
- `keys_as_bytes` (Boolean) Make keys column as `bytes`, for non-utf8 characters
501+
- `value_as_bytes` (Boolean) Make value column as `bytes`, for non-utf8 characters
502+
503+
491504
<a id="nestedblock--settings--kafka_source--parser--schema_registry"></a>
492505
### Nested Schema for `settings.kafka_source.parser.schema_registry`
493506

@@ -718,6 +731,7 @@ Optional:
718731

719732
- `blank` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kinesis_source--parser--blank))
720733
- `json` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kinesis_source--parser--json))
734+
- `raw_table` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kinesis_source--parser--raw_table))
721735
- `schema_registry` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kinesis_source--parser--schema_registry))
722736
- `tskv` (Block, Optional) (see [below for nested schema](#nestedblock--settings--kinesis_source--parser--tskv))
723737

@@ -772,6 +786,18 @@ Optional:
772786

773787

774788

789+
<a id="nestedblock--settings--kinesis_source--parser--raw_table"></a>
790+
### Nested Schema for `settings.kinesis_source.parser.raw_table`
791+
792+
Optional:
793+
794+
- `add_headers` (Boolean) Add headers column to output virtual table
795+
- `add_key` (Boolean) Add key column to output virtual table
796+
- `add_timestamp` (Boolean) Add timestamp column to output virtual table
797+
- `keys_as_bytes` (Boolean) Make keys column as `bytes`, for non-utf8 characters
798+
- `value_as_bytes` (Boolean) Make value column as `bytes`, for non-utf8 characters
799+
800+
775801
<a id="nestedblock--settings--kinesis_source--parser--schema_registry"></a>
776802
### Nested Schema for `settings.kinesis_source.parser.schema_registry`
777803

internal/provider/transfer_endpoint_kafka.go

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ type endpointParser struct {
139139
TSKV *transferParserGeneric `tfsdk:"tskv"`
140140
Blank *blankParser `tfsdk:"blank"`
141141
SchemaRegistry *schemaRegistryParser `tfsdk:"schema_registry"`
142+
RawTable *rawTableParser `tfsdk:"raw_table"`
142143
}
143144

144145
func endpointKafkaParserSchema() schema.Block {
@@ -148,44 +149,40 @@ func endpointKafkaParserSchema() schema.Block {
148149
"tskv": transferParserGenericSchema(),
149150
"blank": blankParserSchema(),
150151
"schema_registry": schemaRegistrySchema(),
152+
"raw_table": rawTableParserSchema(),
151153
},
152154
}
153155
}
154156

155157
func (m *endpointParser) parse(e *endpoint.Parser) diag.Diagnostics {
156158
var diags diag.Diagnostics
157159

158-
switch p := e.GetParser().(type) {
159-
case *endpoint.Parser_JsonParser:
160+
resetParsers := func() {
161+
m.JSON = nil
160162
m.TSKV = nil
161163
m.Blank = nil
164+
m.RawTable = nil
162165
m.SchemaRegistry = nil
163-
if m.JSON == nil {
164-
m.JSON = new(transferParserGeneric)
165-
}
166+
}
167+
switch p := e.GetParser().(type) {
168+
case *endpoint.Parser_RawTable:
169+
resetParsers()
170+
m.RawTable = new(rawTableParser)
171+
diags.Append(m.RawTable.parse(p.RawTable)...)
172+
case *endpoint.Parser_JsonParser:
173+
resetParsers()
174+
m.JSON = new(transferParserGeneric)
166175
diags.Append(m.JSON.parse(p.JsonParser)...)
167176
case *endpoint.Parser_TskvParser:
168-
m.JSON = nil
169-
m.Blank = nil
170-
m.SchemaRegistry = nil
171-
if m.TSKV == nil {
172-
m.TSKV = new(transferParserGeneric)
173-
}
177+
resetParsers()
178+
m.TSKV = new(transferParserGeneric)
174179
diags.Append(m.TSKV.parse(p.TskvParser)...)
175180
case *endpoint.Parser_BlankParser:
176-
m.JSON = nil
177-
m.TSKV = nil
178-
m.SchemaRegistry = nil
179-
if m.Blank == nil {
180-
m.Blank = new(blankParser)
181-
}
181+
resetParsers()
182+
m.Blank = new(blankParser)
182183
case *endpoint.Parser_ConfluentSchemaRegistryParser:
183-
m.JSON = nil
184-
m.TSKV = nil
185-
m.Blank = nil
186-
if m.SchemaRegistry == nil {
187-
m.SchemaRegistry = new(schemaRegistryParser)
188-
}
184+
resetParsers()
185+
m.SchemaRegistry = new(schemaRegistryParser)
189186
diags.Append(m.SchemaRegistry.parse(p.ConfluentSchemaRegistryParser)...)
190187
default:
191188
diags.Append(diag.NewErrorDiagnostic("unknown parser type", fmt.Sprintf("%v", e.GetParser())))
@@ -198,6 +195,10 @@ func (m *endpointParser) convert(r *endpoint.Parser) diag.Diagnostics {
198195
var diags diag.Diagnostics
199196

200197
switch {
198+
case m.RawTable != nil:
199+
prsr := new(endpoint.RawTable)
200+
diags.Append(m.RawTable.convert(prsr)...)
201+
r.Parser = &endpoint.Parser_RawTable{RawTable: prsr}
201202
case m.SchemaRegistry != nil:
202203
prsr := new(endpoint.ConfluentSchemaRegistryParser)
203204
diags.Append(m.SchemaRegistry.convert(prsr)...)
@@ -327,6 +328,61 @@ type transferParserGeneric struct {
327328
AddRestColumn types.Bool `tfsdk:"add_rest_column"`
328329
}
329330

331+
type rawTableParser struct {
332+
ValueAsBytes types.Bool `tfsdk:"value_as_bytes"`
333+
KeysAsBytes types.Bool `tfsdk:"keys_as_bytes"`
334+
AddTimestamp types.Bool `tfsdk:"add_timestamp"`
335+
AddHeaders types.Bool `tfsdk:"add_headers"`
336+
AddKey types.Bool `tfsdk:"add_key"`
337+
}
338+
339+
func (p *rawTableParser) parse(table *endpoint.RawTable) diag.Diagnostics {
340+
var diags diag.Diagnostics
341+
p.ValueAsBytes = types.BoolValue(table.ValueAsBytes)
342+
p.KeysAsBytes = types.BoolValue(table.KeysAsBytes)
343+
p.AddTimestamp = types.BoolValue(table.AddTimestamp)
344+
p.AddHeaders = types.BoolValue(table.AddHeaders)
345+
p.AddKey = types.BoolValue(table.AddKey)
346+
return diags
347+
}
348+
349+
func (p *rawTableParser) convert(r *endpoint.RawTable) diag.Diagnostics {
350+
var diags diag.Diagnostics
351+
r.KeysAsBytes = p.KeysAsBytes.ValueBool()
352+
r.ValueAsBytes = p.ValueAsBytes.ValueBool()
353+
r.AddTimestamp = p.AddTimestamp.ValueBool()
354+
r.AddHeaders = p.AddHeaders.ValueBool()
355+
r.AddKey = p.AddKey.ValueBool()
356+
return diags
357+
}
358+
359+
func rawTableParserSchema() schema.Block {
360+
return schema.SingleNestedBlock{
361+
Attributes: map[string]schema.Attribute{
362+
"value_as_bytes": schema.BoolAttribute{
363+
Optional: true,
364+
MarkdownDescription: "Make value column as `bytes`, for non-utf8 characters",
365+
},
366+
"keys_as_bytes": schema.BoolAttribute{
367+
Optional: true,
368+
MarkdownDescription: "Make keys column as `bytes`, for non-utf8 characters",
369+
},
370+
"add_timestamp": schema.BoolAttribute{
371+
Optional: true,
372+
MarkdownDescription: "Add timestamp column to output virtual table",
373+
},
374+
"add_headers": schema.BoolAttribute{
375+
Optional: true,
376+
MarkdownDescription: "Add headers column to output virtual table",
377+
},
378+
"add_key": schema.BoolAttribute{
379+
Optional: true,
380+
MarkdownDescription: "Add key column to output virtual table",
381+
},
382+
},
383+
}
384+
}
385+
330386
func blankParserSchema() schema.Block {
331387
return schema.SingleNestedBlock{
332388
Attributes: map[string]schema.Attribute{},

0 commit comments

Comments
 (0)