@@ -232,21 +232,48 @@ async def _decode_async(
232232 self ,
233233 chunk_bytes_and_specs : list [tuple [Buffer | None , ArraySpec ]],
234234 ) -> Iterable [NDBuffer | None ]:
235- """Async fallback: walk codecs one at a time (like BatchedCodecPipeline)."""
235+ """Async fallback: walk codecs one at a time (like BatchedCodecPipeline).
236+
237+ Metadata must be resolved forward through the codec chain so each codec
238+ gets the correct spec during reverse (decode) traversal. This matches
239+ BatchedCodecPipeline._codecs_with_resolved_metadata_batched.
240+ """
236241 chunk_bytes_batch , chunk_specs = _unzip2 (chunk_bytes_and_specs )
237242
238- for bb_codec in self .bytes_bytes_codecs [::- 1 ]:
243+ # Resolve metadata forward: aa → ab → bb, recording the spec at each step.
244+ aa_specs : list [list [ArraySpec ]] = []
245+ specs = list (chunk_specs )
246+ for aa_codec in self .array_array_codecs :
247+ aa_specs .append (specs )
248+ specs = [aa_codec .resolve_metadata (s ) for s in specs ]
249+
250+ ab_specs = specs
251+ specs = [self .array_bytes_codec .resolve_metadata (s ) for s in specs ]
252+
253+ bb_specs : list [list [ArraySpec ]] = []
254+ for bb_codec in self .bytes_bytes_codecs :
255+ bb_specs .append (specs )
256+ specs = [bb_codec .resolve_metadata (s ) for s in specs ]
257+
258+ # Decode in reverse, using the forward-resolved specs.
259+ for bb_codec , bb_spec in zip (
260+ self .bytes_bytes_codecs [::- 1 ], bb_specs [::- 1 ], strict = False
261+ ):
239262 chunk_bytes_batch = list (
240- await bb_codec .decode (zip (chunk_bytes_batch , chunk_specs , strict = False ))
263+ await bb_codec .decode (zip (chunk_bytes_batch , bb_spec , strict = False ))
241264 )
242265
243266 chunk_array_batch : list [NDBuffer | None ] = list (
244- await self .array_bytes_codec .decode (zip (chunk_bytes_batch , chunk_specs , strict = False ))
267+ await self .array_bytes_codec .decode (
268+ zip (chunk_bytes_batch , ab_specs , strict = False )
269+ )
245270 )
246271
247- for aa_codec in self .array_array_codecs [::- 1 ]:
272+ for aa_codec , aa_spec in zip (
273+ self .array_array_codecs [::- 1 ], aa_specs [::- 1 ], strict = False
274+ ):
248275 chunk_array_batch = list (
249- await aa_codec .decode (zip (chunk_array_batch , chunk_specs , strict = False ))
276+ await aa_codec .decode (zip (chunk_array_batch , aa_spec , strict = False ))
250277 )
251278
252279 return chunk_array_batch
0 commit comments