|
18 | 18 | from numpy.testing import assert_array_equal |
19 | 19 | from segy.factory import SegyFactory |
20 | 20 | from segy.schema import HeaderField |
| 21 | +from segy.schema import HeaderSpec |
21 | 22 | from segy.schema import SegySpec |
| 23 | +from segy.schema import TraceSpec |
22 | 24 | from segy.standards import get_segy_standard |
23 | 25 |
|
24 | 26 | from mdio import mdio_to_segy |
@@ -560,3 +562,125 @@ def read_segy_trace_header(trace_index: int) -> bytes: |
560 | 562 | assert_array_equal(mdio_header_bytes, segy_header_bytes) |
561 | 563 |
|
562 | 564 | segy_trace_idx += 1 |
| 565 | + |
| 566 | + |
| 567 | +def _spec_with_trace_header_fields(base_spec: SegySpec, fields: list[HeaderField]) -> SegySpec: |
| 568 | + """Return a copy of base_spec whose trace header contains exactly the given fields. |
| 569 | +
|
| 570 | + Unlike ``SegySpec.customize``, which merges into the base field set, this builds a |
| 571 | + fresh ``HeaderSpec`` so we can produce true subsets and arbitrary orderings used in |
| 572 | + the header projection tests. |
| 573 | + """ |
| 574 | + trace = TraceSpec( |
| 575 | + header=HeaderSpec( |
| 576 | + fields=fields, |
| 577 | + item_size=base_spec.trace.header.item_size, |
| 578 | + endianness=base_spec.trace.header.endianness, |
| 579 | + ), |
| 580 | + data=base_spec.trace.data, |
| 581 | + ) |
| 582 | + return SegySpec( |
| 583 | + segy_standard=base_spec.segy_standard, |
| 584 | + text_header=base_spec.text_header, |
| 585 | + binary_header=base_spec.binary_header, |
| 586 | + trace=trace, |
| 587 | + endianness=base_spec.endianness, |
| 588 | + ) |
| 589 | + |
| 590 | + |
| 591 | +@dataclass |
| 592 | +class ProjectionFixture: |
| 593 | + """Pre-ingested MDIO and baseline SegySpec shared by header projection tests.""" |
| 594 | + |
| 595 | + root: Path |
| 596 | + mdio_path: Path |
| 597 | + baseline_spec: SegySpec |
| 598 | + |
| 599 | + |
| 600 | +@pytest.fixture(scope="module") |
| 601 | +def projection_mdio(tmp_path_factory: pytest.TempPathFactory) -> ProjectionFixture: |
| 602 | + """Ingest a small 3D-stack SEG-Y once for the header projection tests.""" |
| 603 | + grid_conf, factory_conf, to_mdio_conf, _ = STACK_3D_CONF |
| 604 | + root = tmp_path_factory.mktemp("header_projection") |
| 605 | + segy_path = root / f"{grid_conf.name}.sgy" |
| 606 | + mdio_path = root / f"{grid_conf.name}.mdio" |
| 607 | + |
| 608 | + mock_nd_segy(segy_path, grid_conf, factory_conf) |
| 609 | + baseline_spec = _segy_spec_mock_nd_segy(grid_conf, factory_conf) |
| 610 | + segy_to_mdio( |
| 611 | + segy_spec=baseline_spec, |
| 612 | + mdio_template=TemplateRegistry().get(to_mdio_conf.template), |
| 613 | + input_path=segy_path, |
| 614 | + output_path=mdio_path, |
| 615 | + overwrite=True, |
| 616 | + ) |
| 617 | + return ProjectionFixture(root=root, mdio_path=mdio_path, baseline_spec=baseline_spec) |
| 618 | + |
| 619 | + |
| 620 | +class TestExportSegySpecHeaderProjection: |
| 621 | + """Verify mdio_to_segy supports subset and reordered SegySpecs (issue #769).""" |
| 622 | + |
| 623 | + def test_reordered_spec_produces_identical_bytes(self, projection_mdio: ProjectionFixture) -> None: |
| 624 | + """Reordering trace header fields must not change the serialized SEG-Y.""" |
| 625 | + baseline_spec = projection_mdio.baseline_spec |
| 626 | + reordered_spec = _spec_with_trace_header_fields( |
| 627 | + baseline_spec, |
| 628 | + list(reversed(baseline_spec.trace.header.fields)), |
| 629 | + ) |
| 630 | + |
| 631 | + baseline_out = projection_mdio.root / "baseline.sgy" |
| 632 | + reordered_out = projection_mdio.root / "reordered.sgy" |
| 633 | + |
| 634 | + mdio_to_segy(segy_spec=baseline_spec, input_path=projection_mdio.mdio_path, output_path=baseline_out) |
| 635 | + mdio_to_segy(segy_spec=reordered_spec, input_path=projection_mdio.mdio_path, output_path=reordered_out) |
| 636 | + |
| 637 | + assert baseline_out.read_bytes() == reordered_out.read_bytes() |
| 638 | + |
| 639 | + def test_subset_spec_preserves_selected_fields(self, projection_mdio: ProjectionFixture) -> None: |
| 640 | + """Subsetting trace header fields writes those fields at their declared byte locations.""" |
| 641 | + baseline_spec = projection_mdio.baseline_spec |
| 642 | + |
| 643 | + subset_fields = [ |
| 644 | + HeaderField(name="crossline", byte=193, format="int32"), |
| 645 | + HeaderField(name="inline", byte=189, format="int32"), |
| 646 | + HeaderField(name="samples_per_trace", byte=115, format="int16"), |
| 647 | + HeaderField(name="sample_interval", byte=117, format="int16"), |
| 648 | + ] |
| 649 | + subset_spec = _spec_with_trace_header_fields(baseline_spec, subset_fields) |
| 650 | + |
| 651 | + baseline_out = projection_mdio.root / "baseline_for_subset.sgy" |
| 652 | + subset_out = projection_mdio.root / "subset.sgy" |
| 653 | + |
| 654 | + mdio_to_segy(segy_spec=baseline_spec, input_path=projection_mdio.mdio_path, output_path=baseline_out) |
| 655 | + mdio_to_segy(segy_spec=subset_spec, input_path=projection_mdio.mdio_path, output_path=subset_out) |
| 656 | + |
| 657 | + baseline_sgy = SegyFileWrapper(baseline_out, spec=baseline_spec) |
| 658 | + subset_sgy = SegyFileWrapper(subset_out, spec=subset_spec) |
| 659 | + |
| 660 | + assert baseline_sgy.num_traces == subset_sgy.num_traces |
| 661 | + |
| 662 | + baseline_traces = baseline_sgy.trace[:] |
| 663 | + subset_traces = subset_sgy.trace[:] |
| 664 | + |
| 665 | + for name in ("inline", "crossline", "samples_per_trace", "sample_interval"): |
| 666 | + assert_array_equal(subset_traces.header[name], baseline_traces.header[name]) |
| 667 | + assert_array_equal(subset_traces.sample, baseline_traces.sample) |
| 668 | + |
| 669 | + def test_missing_field_in_mdio_raises(self, projection_mdio: ProjectionFixture) -> None: |
| 670 | + """SegySpec with a field absent from MDIO headers must raise ValueError.""" |
| 671 | + # 'offset' is not ingested for STACK_3D_CONF; requesting it must fail. |
| 672 | + bad_fields = [ |
| 673 | + HeaderField(name="inline", byte=189, format="int32"), |
| 674 | + HeaderField(name="crossline", byte=193, format="int32"), |
| 675 | + HeaderField(name="offset", byte=37, format="int32"), |
| 676 | + HeaderField(name="samples_per_trace", byte=115, format="int16"), |
| 677 | + HeaderField(name="sample_interval", byte=117, format="int16"), |
| 678 | + ] |
| 679 | + bad_spec = _spec_with_trace_header_fields(projection_mdio.baseline_spec, bad_fields) |
| 680 | + |
| 681 | + with pytest.raises(ValueError, match="offset"): |
| 682 | + mdio_to_segy( |
| 683 | + segy_spec=bad_spec, |
| 684 | + input_path=projection_mdio.mdio_path, |
| 685 | + output_path=projection_mdio.root / "should_not_exist.sgy", |
| 686 | + ) |
0 commit comments