Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions arrow/array/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,7 @@ func (b *RecordBuilder) NewRecord() arrow.Record {
}

// UnmarshalOne reads one row (a JSON object) from the supplied decoder and
// appends a value to each field in the RecordBuilder. Missing fields are
// appended as nulls and unrecognized keys are silently ignored.
// appends a value to each field in the RecordBuilder.
//
// Unlike UnmarshalJSON, this method receives an already-configured
// json.Decoder, so options such as UseNumber set by the caller are honored
Expand All @@ -434,43 +433,69 @@ func (b *RecordBuilder) UnmarshalOne(dec *json.Decoder) error {
return fmt.Errorf("record should start with '{', not %s", t)
}

keylist := make(map[string]bool)
// consume one row checking for duplicates and nulls
keylist := make(map[string]json.RawMessage)
for dec.More() {
keyTok, err := dec.Token()
if err != nil {
return err
}

key := keyTok.(string)
if keylist[key] {
if _, ok := keylist[key]; ok {
return fmt.Errorf("key %s shows up twice in row to be decoded", key)
}
keylist[key] = true

var val json.RawMessage
if err := dec.Decode(&val); err != nil {
return err
}

indices := b.schema.FieldIndices(key)
if len(indices) == 0 {
var extra interface{}
if err := dec.Decode(&extra); err != nil {
return err
}
continue
}

if err := b.fields[indices[0]].UnmarshalOne(dec); err != nil {
return err
idx := indices[0]

if bytes.Equal(val, []byte("null")) && !b.schema.Field(idx).Nullable {
return fmt.Errorf("field '%s' is non-nullable but got null", key)
}

keylist[key] = val
}

// consume the closing '}'
if _, err := dec.Token(); err != nil {
return err
}

// check that all non-nullable fields were specified
for i := 0; i < b.schema.NumFields(); i++ {
f := b.schema.Field(i)
if _, ok := keylist[f.Name]; !ok && !f.Nullable {
return fmt.Errorf("field '%s' is required but no value was given", f.Name)
}
}

// at this point we know there are no integrity errors, append values to field builders
for key, val := range keylist {
valDec := json.NewDecoder(bytes.NewReader(val))
valDec.UseNumber()

indices := b.schema.FieldIndices(key)
if err := b.fields[indices[0]].UnmarshalOne(valDec); err != nil {
return err
}
}
Comment on lines +481 to +490

// append nulls to nullable fields if values were not present
for i := 0; i < b.schema.NumFields(); i++ {
if !keylist[b.schema.Field(i).Name] {
if _, ok := keylist[b.schema.Field(i).Name]; !ok {
b.fields[i].AppendNull()
}
}

return nil
}

Expand Down
50 changes: 41 additions & 9 deletions arrow/array/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package array_test

import (
"bytes"
"fmt"
"reflect"
"strings"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/json"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -485,9 +487,9 @@ func TestRecordBuilder(t *testing.T) {
mapDt.SetItemNullable(false)
schema := arrow.NewSchema(
[]arrow.Field{
{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
{Name: "map", Type: mapDt},
{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
{Name: "f2-f64-notnull", Type: arrow.PrimitiveTypes.Float64, Nullable: false},
{Name: "map", Type: mapDt, Nullable: true},
},
nil,
)
Expand All @@ -498,11 +500,14 @@ func TestRecordBuilder(t *testing.T) {
b.Retain()
b.Release()

b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
b.Field(0).(*array.Int32Builder).AppendNull()
b.Field(0).(*array.Int32Builder).AppendValues([]int32{2, 3}, nil)
b.Field(0).(*array.Int32Builder).AppendValues([]int32{4, 5}, nil)
b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5}, nil)

b.Field(1).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2, 3.3, 4.4, 5.5}, nil)

mb := b.Field(2).(*array.MapBuilder)
for i := 0; i < 5; i++ {
for i := range 5 {
mb.Append(true)

if i%3 == 0 {
Expand All @@ -511,14 +516,23 @@ func TestRecordBuilder(t *testing.T) {
}
}

err := b.UnmarshalJSON([]byte(`{"f1-i32": null, "f2-f64-notnull": null, "map": null}`))
assert.Contains(t, err.Error(), "field 'f2-f64-notnull' is non-nullable but got null")

err = b.UnmarshalJSON([]byte(`{"f1-i32": null, "map": null}`))
assert.Contains(t, err.Error(), "field 'f2-f64-notnull' is required but no value was given")

err = b.UnmarshalJSON([]byte(`{"f1-i32": 6, "f2-f64-notnull": 6.6, "map": [{"key": "4": "value": "d"}]}`))
assert.NoError(t, err)
Comment on lines +525 to +526

rec := b.NewRecordBatch()
defer rec.Release()

if got, want := rec.Schema(), schema; !got.Equal(want) {
t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
}

if got, want := rec.NumRows(), int64(5); got != want {
if got, want := rec.NumRows(), int64(6); got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
}
if got, want := rec.NumCols(), int64(3); got != want {
Expand All @@ -527,9 +541,27 @@ func TestRecordBuilder(t *testing.T) {
if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
t.Fatalf("invalid column name: got=%q, want=%q", got, want)
}
if got, want := rec.Column(2).String(), `[{["0" "2" "3"] ["a" "b" "c"]} {[] []} {[] []} {["3" "2" "3"] ["a" "b" "c"]} {[] []}]`; got != want {
t.Fatalf("invalid column name: got=%q, want=%q", got, want)

if got, want := rec.Column(0).String(), `[(null) 2 3 4 5 6]`; got != want {
t.Fatalf("invalid column values: got=%q, want=%q", got, want)
}
if got, want := rec.Column(1).String(), `[1.1 2.2 3.3 4.4 5.5 6.6]`; got != want {
t.Fatalf("invalid column values: got=%q, want=%q", got, want)
}
if got, want := rec.Column(2).String(), `[{["0" "2" "3"] ["a" "b" "c"]} {[] []} {[] []} {["3" "2" "3"] ["a" "b" "c"]} {[] []} {["4"] ["d"]}]`; got != want {
t.Fatalf("invalid column values: got=%q, want=%q", got, want)
}

// roundtripping from JSON with array.FromJSON should work
arr := array.RecordToStructArray(rec)
defer arr.Release()
jsonStr, err := json.Marshal(arr)
assert.NoError(t, err)

roundtripped, _, err := array.FromJSON(mem, arr.DataType(), bytes.NewReader(jsonStr))
defer roundtripped.Release()
assert.NoError(t, err)
assert.Truef(t, array.Equal(arr, roundtripped), "JSON round trip returns different array: got=%q, want=%d", arr, roundtripped)
Comment on lines +561 to +564
}

func TestRecordBuilderResize(t *testing.T) {
Expand Down
31 changes: 22 additions & 9 deletions arrow/array/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,14 @@ func (a *Struct) GetOneForMarshal(i int) interface{} {
}

tmp := make(map[string]interface{})
fieldList := a.data.dtype.(*arrow.StructType).Fields()
dtype := a.data.dtype.(*arrow.StructType)
fieldList := dtype.Fields()
for j, d := range a.fields {
tmp[fieldList[j].Name] = d.GetOneForMarshal(i)
if dtype.Field(j).Nullable && a.IsNull(i) {
tmp[fieldList[j].Name] = nil
} else {
tmp[fieldList[j].Name] = d.GetOneForMarshal(i)
}
}
Comment on lines 209 to 218
return tmp
}
Expand Down Expand Up @@ -467,19 +472,27 @@ func (b *StructBuilder) UnmarshalOne(dec *json.Decoder) error {
if keylist[key] {
return fmt.Errorf("key %s is specified twice", key)
}

keylist[key] = true

idx, ok := b.dtype.(*arrow.StructType).FieldIdx(key)
var next json.RawMessage
if err := dec.Decode(&next); err != nil {
return err
}

dtype := b.dtype.(*arrow.StructType)

idx, ok := dtype.FieldIdx(key)
if !ok {
var extra interface{}
if err := dec.Decode(&extra); err != nil {
return err
}
continue
}

if err := b.fields[idx].UnmarshalOne(dec); err != nil {
if bytes.Equal(next, []byte("null")) && !dtype.Field(idx).Nullable {
return fmt.Errorf("field '%s' is non-nullable but got null", dtype.Field(idx).Name)
}

valDec := json.NewDecoder(bytes.NewReader(next))
valDec.UseNumber()
if err := b.fields[idx].UnmarshalOne(valDec); err != nil {
return err
Comment on lines +489 to 496
}
}
Expand Down
46 changes: 22 additions & 24 deletions arrow/array/struct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package array_test

import (
"errors"
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -472,52 +474,48 @@ func TestStructArrayUnmarshalJSONMissingFields(t *testing.T) {
name string
jsonInput string
want string
panic bool
panicErr error
}{
{
name: "missing required field",
jsonInput: `[{"f2": 3, "f3": {"f3_1": "test"}}]`,
panic: true,
panicErr: errors.New("arrow/array: index out of range"),
want: "",
},
{
name: "missing optional fields",
jsonInput: `[{"f2": 3, "f3": {"f3_3": "test"}}]`,
panic: false,
panicErr: nil,
want: `{[(null)] [3] {[(null)] [(null)] ["test"]}}`,
},
{
name: "explicit null in required field",
jsonInput: `[{"f2": 3, "f3": {"f3_3": null}}]`,
panicErr: errors.New("field 'f3_3' is non-nullable but got null"),
want: "",
},
}

for _, tc := range tests {
t.Run(
tc.name, func(t *testing.T) {

var val bool

sb := array.NewStructBuilder(pool, dtype)
defer sb.Release()

if tc.panic {
defer func() {
e := recover()
if e == nil {
t.Fatalf("this should have panicked, but did not; slice value %v", val)
}
if got, want := e.(string), "arrow/array: index out of range"; got != want {
t.Fatalf("invalid error. got=%q, want=%q", got, want)
}
}()
} else {
defer func() {
if e := recover(); e != nil {
t.Fatalf("unexpected panic: %v", e)
}
}()
}
defer func() {
e := recover()
if e == nil && tc.panicErr != nil {
t.Fatalf("did not panic, expected panic: %v", tc.panicErr)
} else if e != nil && tc.panicErr == nil {
t.Fatalf("unexpected panic: %v", e)
} else if e != nil && tc.panicErr != nil && fmt.Errorf("%s", e).Error() != tc.panicErr.Error() {
t.Fatalf("invalid error. got=%v, want=%v", e, tc.panicErr.Error())
}
}()

err := sb.UnmarshalJSON([]byte(tc.jsonInput))
if err != nil {
t.Fatal(err)
panic(err)
}
Comment on lines +505 to 519

arr := sb.NewArray().(*array.Struct)
Expand Down
6 changes: 3 additions & 3 deletions arrow/array/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func TestTableFromRecords(t *testing.T) {

schema := arrow.NewSchema(
[]arrow.Field{
{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
},
nil,
Expand Down Expand Up @@ -793,7 +793,7 @@ func TestTableToString(t *testing.T) {

schema := arrow.NewSchema(
[]arrow.Field{
{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
},
nil,
Expand Down Expand Up @@ -822,7 +822,7 @@ func TestTableToString(t *testing.T) {
expected_str :=
`schema:
fields: 2
- f1-i32: type=int32
- f1-i32: type=int32, nullable
- f2-f64: type=float64
f1-i32: [[1 2 3 4 5 6 7 8 (null) 10], [111 112 113 114 115 116 117 118 119 120]]
f2-f64: [[11 12 13 14 15 16 17 18 19 20], [211 212 213 214 215 216 217 218 219 220]]
Expand Down
6 changes: 5 additions & 1 deletion arrow/array/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ func RecordToJSON(rec arrow.RecordBatch, w io.Writer) error {
cols := make(map[string]interface{})
for i := 0; int64(i) < rec.NumRows(); i++ {
for j, c := range rec.Columns() {
cols[fields[j].Name] = c.GetOneForMarshal(i)
if rec.Schema().Field(j).Nullable && c.IsNull(i) {
cols[fields[j].Name] = nil
} else {
cols[fields[j].Name] = c.GetOneForMarshal(i)
}
}
if err := enc.Encode(cols); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion arrow/array/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestTopLevelValidate(t *testing.T) {

t.Run("ValidateRecord validates all columns", func(t *testing.T) {
validArr := makeStringArrayRaw(t, []int32{0, 3, 6}, "abcdef", 2, 0)
corruptArr := makeStringArrayRaw(t, []int32{0, 5, 3, 5}, "hello", 3, 0)
corruptArr := makeStringArrayRaw(t, []int32{0, 4, 3}, "hello", 2, 0)

schema := arrow.NewSchema([]arrow.Field{
{Name: "ok", Type: arrow.BinaryTypes.String},
Expand Down
Loading