Skip to content

Commit 0029a20

Browse files
committed
[soa] add support for variable-length strings in SOA, native only
1 parent 6c59d0d commit 0029a20

2 files changed

Lines changed: 631 additions & 120 deletions

File tree

bjdata/decoder.py

Lines changed: 253 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,16 @@ def prodlist(mylist):
379379

380380

381381
def __decode_soa_schema(fp_read, intern_object_keys, le):
382-
"""Decode SOA schema: {field1:type1, field2:type2, ...}"""
382+
"""Decode SOA schema: {field1:type1, field2:type2, ...}
383+
384+
Supports:
385+
- Fixed-length numeric types
386+
- Boolean (T marker)
387+
- Null (Z marker)
388+
- Fixed-length string: S<int><length>
389+
- Dictionary-based string: [$S#<n><str1><str2>...
390+
- Offset-table-based string: [$<int-type>]
391+
"""
383392
schema = []
384393
marker = fp_read(1)
385394

@@ -393,20 +402,153 @@ def __decode_soa_schema(fp_read, intern_object_keys, le):
393402

394403
# Decode field type marker
395404
type_marker = fp_read(1)
396-
if type_marker not in __TYPES_FIXLEN and type_marker not in (
397-
TYPE_BOOL_TRUE,
398-
TYPE_BOOL_FALSE,
399-
):
405+
406+
if type_marker in __TYPES_FIXLEN:
407+
# Fixed-length numeric type
408+
schema.append(
409+
{
410+
"name": field_name,
411+
"type": "numeric",
412+
"marker": type_marker,
413+
"bytes": __DTYPELEN_MAP[type_marker],
414+
}
415+
)
416+
elif type_marker in (TYPE_BOOL_TRUE, TYPE_BOOL_FALSE):
417+
# Boolean type (1 byte: T or F in payload)
418+
schema.append(
419+
{"name": field_name, "type": "bool", "marker": type_marker, "bytes": 1}
420+
)
421+
elif type_marker == TYPE_NULL:
422+
# Null/placeholder (0 bytes in payload)
423+
schema.append({"name": field_name, "type": "null", "bytes": 0})
424+
elif type_marker in (TYPE_STRING, TYPE_HIGH_PREC):
425+
# Fixed-length string: S<int><length>
426+
length_marker = fp_read(1)
427+
if length_marker not in __TYPES_INT:
428+
raise DecoderException("SOA schema only supports fixed-length types")
429+
length = __decode_int_non_negative(fp_read, length_marker, le)
430+
schema.append(
431+
{
432+
"name": field_name,
433+
"type": "string",
434+
"encoding": "fixed",
435+
"bytes": length,
436+
}
437+
)
438+
elif type_marker == ARRAY_START:
439+
# Dictionary or offset-based string encoding
440+
next_marker = fp_read(1)
441+
if next_marker == CONTAINER_TYPE:
442+
inner = fp_read(1)
443+
if inner in (TYPE_STRING, TYPE_HIGH_PREC):
444+
# Dictionary-based string: [$S#<n><str1><str2>...
445+
if fp_read(1) != CONTAINER_COUNT:
446+
raise DecoderException("Expected # in dict-string schema")
447+
dict_count = __decode_int_non_negative(fp_read, fp_read(1), le)
448+
dictionary = []
449+
for _ in range(dict_count):
450+
str_len = __decode_int_non_negative(fp_read, fp_read(1), le)
451+
dictionary.append(fp_read(str_len).decode("utf-8"))
452+
# Index size based on dictionary size
453+
idx_bytes = (
454+
1 if dict_count <= 255 else (2 if dict_count <= 65535 else 4)
455+
)
456+
idx_marker = (
457+
TYPE_UINT8
458+
if idx_bytes == 1
459+
else (TYPE_UINT16 if idx_bytes == 2 else TYPE_UINT32)
460+
)
461+
schema.append(
462+
{
463+
"name": field_name,
464+
"type": "string",
465+
"encoding": "dict",
466+
"bytes": idx_bytes,
467+
"dict": dictionary,
468+
"index_marker": idx_marker,
469+
}
470+
)
471+
elif inner in __TYPES_INT:
472+
# Offset-table-based string: [$<int-type>]
473+
if fp_read(1) != ARRAY_END:
474+
raise DecoderException("Expected ] in offset-string schema")
475+
schema.append(
476+
{
477+
"name": field_name,
478+
"type": "string",
479+
"encoding": "offset",
480+
"bytes": __DTYPELEN_MAP[inner],
481+
"index_marker": inner,
482+
}
483+
)
484+
else:
485+
raise DecoderException(
486+
"SOA schema only supports fixed-length types"
487+
)
488+
else:
489+
raise DecoderException("SOA schema only supports fixed-length types")
490+
else:
400491
raise DecoderException("SOA schema only supports fixed-length types")
401492

402-
schema.append((field_name, type_marker))
403493
marker = fp_read(1)
404494

405495
return schema
406496

407497

498+
def __decode_soa_field_value(field, raw, record_index, le):
499+
"""Decode a single field value from raw payload bytes.
500+
501+
Args:
502+
field: Field schema dictionary
503+
raw: Raw bytes for this field
504+
record_index: Index of current record (for offset-based strings)
505+
le: Little-endian flag
506+
507+
Returns:
508+
Decoded field value
509+
"""
510+
ftype = field["type"]
511+
512+
if ftype == "numeric":
513+
marker = field["marker"]
514+
if marker in (TYPE_BOOL_TRUE, TYPE_BOOL_FALSE):
515+
return raw[0:1] == TYPE_BOOL_TRUE
516+
elif marker in __NUMPY_DTYPE_MAP:
517+
return buffer2numpy(raw, dtype=npdtype(__NUMPY_DTYPE_MAP[marker]))[0]
518+
elif ftype == "bool":
519+
return raw[0:1] == TYPE_BOOL_TRUE
520+
elif ftype == "null":
521+
return None
522+
elif ftype == "string":
523+
encoding = field["encoding"]
524+
if encoding == "fixed":
525+
# Strip null padding and decode
526+
return raw.rstrip(b"\x00").decode("utf-8")
527+
elif encoding == "dict":
528+
# Look up in dictionary
529+
idx_marker = field["index_marker"]
530+
idx = buffer2numpy(raw, dtype=npdtype(__NUMPY_DTYPE_MAP[idx_marker]))[
531+
0
532+
].item()
533+
return field["dict"][idx]
534+
elif encoding == "offset":
535+
# Use offset table
536+
idx_marker = field["index_marker"]
537+
idx = buffer2numpy(raw, dtype=npdtype(__NUMPY_DTYPE_MAP[idx_marker]))[
538+
0
539+
].item()
540+
offsets = field["offsets"]
541+
return field["string_buffer"][offsets[idx] : offsets[idx + 1]]
542+
543+
return None
544+
545+
408546
def __decode_soa(fp_read, schema, is_row_major, intern_object_keys, le):
409-
"""Decode SOA payload into numpy structured array"""
547+
"""Decode SOA payload into numpy structured array.
548+
549+
Supports both row-major (interleaved) and column-major (columnar) layouts,
550+
with all string encoding types (fixed, dict, offset).
551+
"""
410552
# Read count (can be scalar or ND dimensions)
411553
marker = fp_read(1)
412554
if marker != CONTAINER_COUNT:
@@ -427,55 +569,112 @@ def __decode_soa(fp_read, schema, is_row_major, intern_object_keys, le):
427569
count = __decode_int_non_negative(fp_read, marker, le)
428570
dims = [count]
429571

430-
# Build numpy dtype for structured array
431-
dtype_list = []
432-
for field_name, type_marker in schema:
433-
if type_marker in (TYPE_BOOL_TRUE, TYPE_BOOL_FALSE):
434-
dtype_list.append((field_name, "?"))
435-
else:
436-
dtype_list.append((field_name, __NUMPY_DTYPE_MAP[type_marker]))
437-
438-
struct_dtype = npdtype(dtype_list)
439-
result = npempty(count, dtype=struct_dtype)
572+
# Calculate record size
573+
record_bytes = sum(f["bytes"] for f in schema)
574+
575+
# Read payload
576+
payload = fp_read(record_bytes * count)
577+
578+
# Read deferred offset tables and string buffers for offset-based fields
579+
offset_fields = [f for f in schema if f.get("encoding") == "offset"]
580+
for field in offset_fields:
581+
idx_bytes = field["bytes"]
582+
idx_marker = field["index_marker"]
583+
# Read offset table (count + 1 entries)
584+
offsets = []
585+
for _ in range(count + 1):
586+
offsets.append(__METHOD_MAP[idx_marker](fp_read, idx_marker, le))
587+
field["offsets"] = offsets
588+
# Read string buffer
589+
buffer_len = offsets[-1] if offsets else 0
590+
field["string_buffer"] = (
591+
fp_read(buffer_len).decode("utf-8") if buffer_len > 0 else ""
592+
)
440593

441-
if is_row_major:
442-
# Row-major: interleaved - read one record at a time
443-
for i in range(count):
444-
for field_name, type_marker in schema:
445-
if type_marker in (TYPE_BOOL_TRUE, TYPE_BOOL_FALSE):
446-
# Boolean: read T or F byte
447-
bool_byte = fp_read(1)
448-
result[field_name][i] = bool_byte == TYPE_BOOL_TRUE
594+
# Check if we have any string fields (need list of dicts) or can use numpy
595+
has_variable_strings = any(f.get("encoding") in ("dict", "offset") for f in schema)
596+
597+
# Build numpy dtype for structured array if possible
598+
if not has_variable_strings:
599+
dtype_list = []
600+
for field in schema:
601+
if field["type"] == "numeric":
602+
marker = field["marker"]
603+
if marker in (TYPE_BOOL_TRUE, TYPE_BOOL_FALSE):
604+
dtype_list.append((field["name"], "?"))
449605
else:
450-
# Numeric: read raw bytes
451-
nbytes = __DTYPELEN_MAP[type_marker]
452-
raw = fp_read(nbytes)
453-
value = buffer2numpy(
454-
raw, dtype=npdtype(__NUMPY_DTYPE_MAP[type_marker])
455-
)[0]
456-
result[field_name][i] = value
457-
else:
458-
# Column-major: all values of field1, then field2, etc.
459-
for field_name, type_marker in schema:
460-
if type_marker in (TYPE_BOOL_TRUE, TYPE_BOOL_FALSE):
461-
# Boolean: read T/F bytes
462-
bool_bytes = fp_read(count)
463-
for i in range(count):
464-
result[field_name][i] = bool_bytes[i : i + 1] == TYPE_BOOL_TRUE
606+
dtype_list.append((field["name"], __NUMPY_DTYPE_MAP[marker]))
607+
elif field["type"] == "bool":
608+
dtype_list.append((field["name"], "?"))
609+
elif field["type"] == "null":
610+
# No good numpy equivalent, fall back to object
611+
dtype_list.append((field["name"], "O"))
612+
elif field["type"] == "string" and field["encoding"] == "fixed":
613+
# Fixed-length string
614+
dtype_list.append((field["name"], f'U{field["bytes"]}'))
615+
616+
# Create numpy structured array
617+
struct_dtype = npdtype(dtype_list)
618+
result = npempty(count, dtype=struct_dtype)
619+
620+
# Fill in values
621+
for i in range(count):
622+
if is_row_major:
623+
base_offset = i * record_bytes
624+
field_offset = 0
465625
else:
466-
# Numeric: read all values at once
467-
nbytes = __DTYPELEN_MAP[type_marker]
468-
raw = fp_read(count * nbytes)
469-
values = buffer2numpy(
470-
raw, dtype=npdtype(__NUMPY_DTYPE_MAP[type_marker])
471-
)
472-
result[field_name] = values
473-
474-
# Reshape if ND
475-
if len(dims) > 1:
476-
result = result.reshape(dims)
626+
col_offset = 0
627+
628+
for field in schema:
629+
if is_row_major:
630+
raw = payload[
631+
base_offset
632+
+ field_offset : base_offset
633+
+ field_offset
634+
+ field["bytes"]
635+
]
636+
field_offset += field["bytes"]
637+
else:
638+
field_offset = col_offset + i * field["bytes"]
639+
raw = payload[field_offset : field_offset + field["bytes"]]
640+
col_offset += field["bytes"] * count
641+
642+
result[field["name"]][i] = __decode_soa_field_value(field, raw, i, le)
643+
644+
# Reshape if ND
645+
if len(dims) > 1:
646+
result = result.reshape(dims)
647+
648+
return result
649+
650+
# Fall back to list of dicts for variable-length strings
651+
records = []
652+
for i in range(count):
653+
record = {}
654+
if is_row_major:
655+
# Row-major: all fields of record i are contiguous
656+
base_offset = i * record_bytes
657+
field_offset = 0
658+
for field in schema:
659+
raw = payload[
660+
base_offset
661+
+ field_offset : base_offset
662+
+ field_offset
663+
+ field["bytes"]
664+
]
665+
record[field["name"]] = __decode_soa_field_value(field, raw, i, le)
666+
field_offset += field["bytes"]
667+
else:
668+
# Column-major: all values of field j are contiguous
669+
col_offset = 0
670+
for field in schema:
671+
field_offset = col_offset + i * field["bytes"]
672+
raw = payload[field_offset : field_offset + field["bytes"]]
673+
record[field["name"]] = __decode_soa_field_value(field, raw, i, le)
674+
col_offset += field["bytes"] * count
675+
records.append(record)
477676

478-
return result
677+
return records
479678

480679

481680
def __get_container_params(
@@ -840,7 +1039,8 @@ def load(
8401039
+----------------------------------+---------------+
8411040
8421041
SOA (Structure of Arrays) format is automatically detected and decoded
843-
to numpy structured arrays (record arrays).
1042+
to numpy structured arrays when all fields are fixed-width, or to a list of
1043+
dicts when variable-length (dict- or offset-encoded) string fields are present.
8441044
"""
8451045
if object_pairs_hook is None and object_hook is None:
8461046
object_hook = __object_hook_noop

0 commit comments

Comments
 (0)