Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Changelog

All notable changes to this project will be documented in this file.

## Unreleased

### Added

- Schema: Added a `Decimal` common type carrying precision and scale via a new `LogicalParams` struct, enabling lossless conversion between Avro, Parquet, and database `NUMBER`/`NUMERIC` decimals. Includes `NewDecimal`, `FormatDecimal`/`ParseDecimal`, and `DecimalParams.Format`/`Parse`/`ValidateValue` helpers, plus a `Common.Validate` entry point. `ParseFromAny` and `InferFromAny` now accept `encoding/json.Number` values, so schemas pipelined through `json.Decoder.UseNumber()` round-trip without precision loss. (@Jeffail)

## 4.70.0 - 2026-04-02

### Added
Expand Down
199 changes: 196 additions & 3 deletions public/schema/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,24 @@
// This optimization is particularly useful in scenarios where schemas are
// transmitted over the network or stored in external systems, as it eliminates
// the need to parse and recalculate fingerprints on cache hits.
//
// # Parameterised logical types
//
// Some types carry parameters beyond what the type identifier conveys. For
// example, a Decimal requires a precision and a scale. These parameters are
// attached to the [Common] schema via the [Common.Logical] field, which holds
// a [LogicalParams] struct. Only the field within [LogicalParams] that
// corresponds to the schema's [Common.Type] should be set.
//
// Use [Common.Validate] to confirm a schema's parameters are internally
// consistent before relying on it.
package schema

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
)
Expand All @@ -75,6 +88,14 @@ const (
Union CommonType = 12
Timestamp CommonType = 13
Any CommonType = 14
Decimal CommonType = 15
)

// Bounds on the precision of a Decimal schema. Both constants are typed as
// int32 so they compare directly against DecimalParams.Precision.
const (
	// DecimalMinPrecision is the smallest permitted decimal precision.
	DecimalMinPrecision = int32(1)

	// DecimalMaxPrecision is the largest permitted decimal precision. It
	// matches the widest precision that can be represented losslessly across
	// Avro, Parquet and Oracle NUMBER.
	DecimalMaxPrecision = int32(38)
)

// String returns a human readable string representation of the type.
Expand Down Expand Up @@ -108,6 +129,8 @@ func (t CommonType) String() string {
return "TIMESTAMP"
case Any:
return "ANY"
case Decimal:
return "DECIMAL"
default:
return "UNKNOWN"
}
Expand Down Expand Up @@ -143,6 +166,8 @@ func typeFromStr(v string) (CommonType, error) {
return Timestamp, nil
case "ANY":
return Any, nil
case "DECIMAL":
return Decimal, nil
default:
return 0, fmt.Errorf("unrecognised type string: %v", v)
}
Expand All @@ -157,6 +182,31 @@ type Common struct {
Type CommonType
Optional bool
Children []Common

// Logical holds parameters for parameterised types (e.g. Decimal). Only
// the field within LogicalParams that corresponds to Type should be
// populated; setting parameters that do not apply to the type is a
// validation error.
Logical *LogicalParams
}

// LogicalParams groups the optional parameter blocks that parameterised
// CommonType values may carry. Each parameterised type has its own field;
// at most one is expected to be non-nil for any given Common schema.
type LogicalParams struct {
	// Decimal carries the precision and scale of a schema whose type is
	// Decimal. It is left nil for schemas of any other type.
	Decimal *DecimalParams
}

// DecimalParams describes a fixed-precision decimal number.
//
// Precision is the total number of significant digits and must be in
// [DecimalMinPrecision, DecimalMaxPrecision]. Scale is the number of digits
// to the right of the decimal point and must be in [0, Precision]. These
// constraints describe the lossless intersection across Avro, Parquet and
// Oracle NUMBER.
type DecimalParams struct {
	// Precision is the total number of significant digits.
	Precision int32
	// Scale is the number of digits to the right of the decimal point.
	Scale int32
}

const (
Expand All @@ -165,6 +215,8 @@ const (
anyFieldOptional = "optional"
anyFieldChildren = "children"
anyFieldFingerprint = "fingerprint"
anyFieldPrecision = "precision"
anyFieldScale = "scale"
)

// ToAny serializes the common schema into a generic Go value, with structured
Expand Down Expand Up @@ -203,11 +255,32 @@ func (c *Common) ToAny() any {
m[anyFieldChildren] = children
}

if c.Type == Decimal && c.Logical != nil && c.Logical.Decimal != nil {
m[anyFieldPrecision] = int64(c.Logical.Decimal.Precision)
m[anyFieldScale] = int64(c.Logical.Decimal.Scale)
}

return m
}

// ParseFromAny deserializes a common schema from a generic Go value and then
// checks the result with [Common.Validate].
//
// A non-nil error is returned when either the structural parse or the
// validation fails. In both cases the schema as parsed so far is returned
// alongside the error.
func ParseFromAny(v any) (Common, error) {
	parsed, err := parseFromAnyNoValidate(v)
	if err == nil {
		// Validation runs once here, on the fully assembled tree.
		err = parsed.Validate()
	}
	return parsed, err
}

// parseFromAnyNoValidate performs the structural deserialisation without
// running [Common.Validate]. It is used internally so that recursive parsing
// of nested children does not validate each subtree O(depth) times; the
// public [ParseFromAny] entry point validates exactly once at the top level.
func parseFromAnyNoValidate(v any) (Common, error) {
var c Common

obj, ok := v.(map[string]any)
Expand Down Expand Up @@ -236,13 +309,13 @@ func ParseFromAny(v any) (Common, error) {
if optionalB, ok := optional.(bool); ok {
c.Optional = optionalB
} else {
return c, fmt.Errorf("expected field `optional` of type string, got %T", obj[anyFieldOptional])
return c, fmt.Errorf("expected field `optional` of type bool, got %T", obj[anyFieldOptional])
}
}

if cArr, ok := obj[anyFieldChildren].([]any); ok {
for i, cEle := range cArr {
cChild, err := ParseFromAny(cEle)
cChild, err := parseFromAnyNoValidate(cEle)
if err != nil {
return c, fmt.Errorf("child element %v: %w", i, err)
}
Expand All @@ -251,9 +324,123 @@ func ParseFromAny(v any) (Common, error) {
}
}

_, hasPrecision := obj[anyFieldPrecision]
_, hasScale := obj[anyFieldScale]
if hasPrecision || hasScale {
if c.Type != Decimal {
return c, fmt.Errorf("fields `precision` and `scale` are only valid for type DECIMAL, got %v", c.Type)
}
if !hasPrecision {
return c, errors.New("type DECIMAL requires field `precision`")
}
if !hasScale {
return c, errors.New("type DECIMAL requires field `scale`")
}

precision, err := anyIntField(obj, anyFieldPrecision)
if err != nil {
return c, err
}
scale, err := anyIntField(obj, anyFieldScale)
if err != nil {
return c, err
}

c.Logical = &LogicalParams{
Decimal: &DecimalParams{
Precision: precision,
Scale: scale,
},
}
} else if c.Type == Decimal {
return c, errors.New("type DECIMAL requires fields `precision` and `scale`")
}

return c, nil
}

// anyIntField extracts an integer-valued field from a map[string]any and
// narrows it to int32.
//
// Accepted dynamic types are int, int32, int64, float32, float64 and
// json.Number. Floating-point values (including json.Number literals written
// in float syntax such as "38.0" or "1e1") must have no fractional part.
//
// An error is returned when the key is absent, the value has an unsupported
// type, the value is not a whole number, or the value does not fit in int32.
func anyIntField(obj map[string]any, key string) (int32, error) {
	v, ok := obj[key]
	if !ok {
		return 0, fmt.Errorf("missing field `%s`", key)
	}

	switch n := v.(type) {
	case int:
		return int32Bounded(int64(n), key)
	case int32:
		return n, nil
	case int64:
		return int32Bounded(n, key)
	case float32:
		// Widening to float64 is exact; n itself is passed along so error
		// messages print the value with float32 formatting.
		return int32FromFloat(float64(n), n, key)
	case float64:
		return int32FromFloat(n, n, key)
	case json.Number:
		if i, err := n.Int64(); err == nil {
			return int32Bounded(i, key)
		}
		// Int64 rejects integer-valued literals in float syntax ("38.0",
		// "1e1") and integers beyond int64. Retry as a float so these are
		// treated exactly like the same value arriving as a float64.
		f, err := n.Float64()
		if err != nil {
			return 0, fmt.Errorf("field `%s` must be an integer, got %v", key, n)
		}
		return int32FromFloat(f, n, key)
	default:
		return 0, fmt.Errorf("expected field `%s` of integer type, got %T", key, v)
	}
}

// int32FromFloat converts a whole-valued float to int32. shown is the value
// as originally supplied and is used only for error messages.
func int32FromFloat(f float64, shown any, key string) (int32, error) {
	// 2^63 is exactly representable as a float64; int64 covers [-2^63, 2^63).
	const int64Limit float64 = 1 << 63

	if f != f { // NaN is the only value that differs from itself.
		return 0, fmt.Errorf("field `%s` must be an integer, got %v", key, shown)
	}
	// Converting an out-of-range float to int64 yields an
	// implementation-dependent result, so reject such values (and ±Inf)
	// before converting.
	if f < -int64Limit || f >= int64Limit {
		return 0, fmt.Errorf("field `%s` value %v does not fit in int32", key, shown)
	}
	i := int64(f)
	if float64(i) != f {
		return 0, fmt.Errorf("field `%s` must be an integer, got %v", key, shown)
	}
	return int32Bounded(i, key)
}

// int32Bounded narrows n to int32, returning an error naming the field when n
// lies outside the int32 range.
func int32Bounded(n int64, key string) (int32, error) {
	const (
		maxInt32 int64 = 1<<31 - 1
		minInt32 int64 = -1 << 31
	)
	if n < minInt32 || n > maxInt32 {
		return 0, fmt.Errorf("field `%s` value %d does not fit in int32", key, n)
	}
	return int32(n), nil
}

// Validate checks that the parameters attached to a schema agree with its
// type, and applies the same check to every entry of [Common.Children].
//
// A [Decimal] schema must carry Logical.Decimal parameters with a precision in
// [DecimalMinPrecision, DecimalMaxPrecision] and a scale in [0, precision].
// Any other type must not carry Logical.Decimal parameters. The first
// violation found is returned; errors from children are wrapped with the
// child's index and name.
//
// Structural invariants — for example that an [Object] has children, or that
// a [Union] has more than one child — are not currently enforced; the
// validation surface may grow as new logical types arrive.
//
// Schemas constructed via [ParseFromAny] are validated automatically. Schemas
// constructed by struct literal should call Validate before being passed to
// converters or caches.
func (c *Common) Validate() error {
	// Resolve the decimal parameter block once; nil means "not supplied".
	var dec *DecimalParams
	if c.Logical != nil {
		dec = c.Logical.Decimal
	}

	switch {
	case c.Type != Decimal:
		if dec != nil {
			return fmt.Errorf("Logical.Decimal parameters are only valid for type DECIMAL, got %v", c.Type)
		}
	case dec == nil:
		return errors.New("type DECIMAL requires Logical.Decimal parameters")
	case dec.Precision < DecimalMinPrecision || dec.Precision > DecimalMaxPrecision:
		return fmt.Errorf("decimal precision %d out of range [%d, %d]", dec.Precision, DecimalMinPrecision, DecimalMaxPrecision)
	case dec.Scale < 0 || dec.Scale > dec.Precision:
		return fmt.Errorf("decimal scale %d out of range [0, precision=%d]", dec.Scale, dec.Precision)
	}

	for i := range c.Children {
		child := &c.Children[i]
		if err := child.Validate(); err != nil {
			return fmt.Errorf("child %d (%q): %w", i, child.Name, err)
		}
	}

	return nil
}

// Fingerprint returns a deterministic hash identifier for the schema structure.
// Two schemas with the same structure will produce the same fingerprint,
// regardless of memory location. This is useful for caching schema conversions
Expand Down Expand Up @@ -281,6 +468,12 @@ func (c *Common) writeFingerprint(w io.Writer) {
fmt.Fprint(w, "O:0|")
}

// Write parameters for parameterised types. Only emitted when present so
// that schemas of unparameterised types retain their existing fingerprint.
if c.Type == Decimal && c.Logical != nil && c.Logical.Decimal != nil {
fmt.Fprintf(w, "D:%d:%d|", c.Logical.Decimal.Precision, c.Logical.Decimal.Scale)
}

// Write children count and recursively fingerprint each child
fmt.Fprintf(w, "C:%d|", len(c.Children))
for i, child := range c.Children {
Expand Down
1 change: 1 addition & 0 deletions public/schema/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestSchemaStringify(t *testing.T) {
{Input: Union, Output: "UNION"},
{Input: Timestamp, Output: "TIMESTAMP"},
{Input: Any, Output: "ANY"},
{Input: Decimal, Output: "DECIMAL"},
{Input: zeroType, Output: "UNKNOWN"},
{Input: CommonType(-1), Output: "UNKNOWN"},
} {
Expand Down
Loading
Loading