Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Multiallelic sites, and split alleles (mutiple records for a site) are both acce

A VCZ store is just a VCZ file - typically in cloud object store - so it's possible to create one using bio2zarr (e.g. vcf2zarr).

However, when the VCFs being appended contain different variants the store must be created with the full set of variants. This is achieved by calling `vczstore create` with the VCZ files that collectively define the set of variants (e.g. one for each genotype array). (Currently `vczstore create` can only be called with two arguments - but you can call it repeatedly to built up the store from multiple files.)
However, when the VCFs being appended contain different variants the store must be created with the full set of variants. This is achieved by calling `vczstore create` with the VCZ files that collectively define the set of variants (e.g. one for each genotype array). (Currently `vczstore create` can only be called with one or two input VCZ arguments - but you can call it repeatedly to built up the store from multiple files.)

After creation the store contains no samples.

Expand Down
10 changes: 10 additions & 0 deletions tests/test_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ def test_merge_alts(alts1, alts2, expected):
assert _merge_alts(alts1, alts2) == expected


def test_create__single():
vcz1 = make_vcz([0], [100], [["A", "T"]])
vcz_out = zarr.storage.MemoryStore()
create(vcz_out, vcz1)
root = zarr.open(vcz_out)
assert_array_equal(root["variant_position"][:], [100])
assert root["variant_allele"][0, 0] == "A"
assert root["variant_allele"][0, 1] == "T"


def test_create__no_match():
# Disjoint alts at same position → 2 output variants
vcz1 = make_vcz([0], [100], [["A", "T"]])
Expand Down
267 changes: 146 additions & 121 deletions vczstore/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,14 +525,16 @@ def create(vcz_out, *vczs, show_progress=False, backend_storage=None) -> None:
"""Create a new, empty store vcz_out using merged variants from vczs
using -m none semantics with stable variant ordering.

Currently vczs must contain exactly two stores. This requirement may be lifted
in the future.
Currently vczs must contain exactly one or two stores. This requirement may be
lifted in the future.

Both stores must have identical contig_id arrays. Output contains all variants
from both stores; variants at the same position whose alt sets overlap are merged
into a single record with combined alleles. Only fixed variant fields are
written; other variant fields are not included, and sample and call
fields are included, but with no samples.
In the case of one store, the new store is the same except it contains no samples.

In the case of two stores, both must have identical contig_id arrays. Output
contains all variants from both stores; variants at the same position whose alt
sets overlap are merged into a single record with combined alleles. Only fixed
variant fields are written; other variant fields are not included, and sample and
call fields are included, but with no samples.

When two records are merged: variant_id are joined with ";", variant_quality takes
the max (ignoring missing), variant_filter is the union. variant_filter requires
Expand All @@ -541,150 +543,177 @@ def create(vcz_out, *vczs, show_progress=False, backend_storage=None) -> None:
them.
"""

if len(vczs) != 2:
raise NotImplementedError("A store can only be created from two VCZ files.")

vcz1, vcz2 = vczs
if len(vczs) not in (1, 2):
raise NotImplementedError(
"A store can only be created from one or two VCZ files."
)

with transaction(
vcz_out, backend_storage=backend_storage, message="create", create_repo=True
) as vcz_out:
vcz1 = vczs[0]
root1 = zarr.open(vcz1, mode="r")
root2 = zarr.open(vcz2, mode="r")

if not np.all(root1["contig_id"][:] == root2["contig_id"][:]):
raise ValueError("contig_id fields must be identical")

(
out_contig,
out_position,
out_length,
out_id,
out_quality,
out_filter,
out_allele,
) = _compute_merged_variants(root1, root2, show_progress=show_progress)

n_variants = out_contig.shape[0]

out_root = open_zarr(
vcz_out,
mode="w",
backend_storage=backend_storage,
zarr_format=root1.metadata.zarr_format,
)
out_root.attrs.update(root1.attrs)

# copy direct from vcz1
vcz1_copy_vars = [
var
for var in root1.keys()
if var.startswith("contig_") or var.startswith("filter_")
]
logger.debug(f"Copying arrays for {', '.join(vcz1_copy_vars)}")
copy_store(vcz1, vcz_out, array_keys=vcz1_copy_vars)

logger.debug("Creating variant arrays")
arr = root1["variant_contig"]
create_group_array(
out_root,
"variant_contig",
data=out_contig,
shape=out_contig.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)
arr = root1["variant_position"]
create_group_array(
out_root,
"variant_position",
data=out_position,
shape=out_position.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)
if out_length is not None:
arr = root1["variant_length"]
create_group_array(
out_root,
"variant_length",
data=out_length,
shape=out_length.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
if len(vczs) == 1:
out_root = open_zarr(
vcz_out,
mode="w",
backend_storage=backend_storage,
zarr_format=root1.metadata.zarr_format,
)

if out_id is not None:
arr = root1["variant_id"]
create_group_array(
out_root,
"variant_id",
data=out_id,
shape=out_id.shape,
dtype=STRING_DTYPE_NAME,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
out_root.attrs.update(root1.attrs)

# copy direct from vcz1
vcz1_copy_vars = [
var
for var in root1.keys()
if not var.startswith("call_") and not var == "sample_id"
]
logger.debug(f"Copying arrays for {', '.join(vcz1_copy_vars)}")
copy_store(vcz1, vcz_out, array_keys=vcz1_copy_vars)

else: # len(vczs) == 2
vcz2 = vczs[1]
root2 = zarr.open(vcz2, mode="r")

if not np.all(root1["contig_id"][:] == root2["contig_id"][:]):
raise ValueError("contig_id fields must be identical")

(
out_contig,
out_position,
out_length,
out_id,
out_quality,
out_filter,
out_allele,
) = _compute_merged_variants(root1, root2, show_progress=show_progress)

n_variants = out_contig.shape[0]

out_root = open_zarr(
vcz_out,
mode="w",
backend_storage=backend_storage,
zarr_format=root1.metadata.zarr_format,
)
arr = root1["variant_id_mask"]
out_root.attrs.update(root1.attrs)

# copy direct from vcz1
vcz1_copy_vars = [
var
for var in root1.keys()
if var.startswith("contig_") or var.startswith("filter_")
]
logger.debug(f"Copying arrays for {', '.join(vcz1_copy_vars)}")
copy_store(vcz1, vcz_out, array_keys=vcz1_copy_vars)

logger.debug("Creating variant arrays")
arr = root1["variant_contig"]
create_group_array(
out_root,
"variant_id_mask",
data=out_id == ".",
shape=out_id.shape,
"variant_contig",
data=out_contig,
shape=out_contig.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)
if out_quality is not None:
arr = root1["variant_quality"]
arr = root1["variant_position"]
create_group_array(
out_root,
"variant_quality",
data=out_quality,
shape=out_quality.shape,
"variant_position",
data=out_position,
shape=out_position.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)
if out_filter is not None:
arr = root1["variant_filter"]
if out_length is not None:
arr = root1["variant_length"]
create_group_array(
out_root,
"variant_length",
data=out_length,
shape=out_length.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)

if out_id is not None:
arr = root1["variant_id"]
create_group_array(
out_root,
"variant_id",
data=out_id,
shape=out_id.shape,
dtype=STRING_DTYPE_NAME,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)
arr = root1["variant_id_mask"]
create_group_array(
out_root,
"variant_id_mask",
data=out_id == ".",
shape=out_id.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)
if out_quality is not None:
arr = root1["variant_quality"]
create_group_array(
out_root,
"variant_quality",
data=out_quality,
shape=out_quality.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants"],
)
if out_filter is not None:
arr = root1["variant_filter"]
create_group_array(
out_root,
"variant_filter",
data=out_filter,
shape=out_filter.shape,
dtype=arr.dtype,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants", "filters"],
)

arr = root1["variant_allele"]
create_group_array(
out_root,
"variant_filter",
data=out_filter,
shape=out_filter.shape,
dtype=arr.dtype,
"variant_allele",
data=out_allele,
shape=out_allele.shape,
dtype=STRING_DTYPE_NAME,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants", "filters"],
dimension_names=["variants", "alleles"],
)

arr = root1["variant_allele"]
create_group_array(
out_root,
"variant_allele",
data=out_allele,
shape=out_allele.shape,
dtype=STRING_DTYPE_NAME,
chunks=arr.chunks,
compressor=get_compressor_config(arr),
dimension_names=["variants", "alleles"],
)
if out_length is not None:
indexer = VcfZarrIndexer(vcz_out)
indexer.create_index()

logger.debug("Creating empty sample and call arrays")
for var in root1.keys():
if var.startswith("call_"):
arr = root1[var]
shape = (n_variants, 0) + arr.shape[2:]
# TODO: should allow sample chunk size to be overridden/enforced here
create_empty_group_array(
out_root,
var,
Expand All @@ -706,7 +735,3 @@ def create(vcz_out, *vczs, show_progress=False, backend_storage=None) -> None:
compressor=get_compressor_config(arr),
dimension_names=array_dims(arr),
)

if out_length is not None:
indexer = VcfZarrIndexer(vcz_out)
indexer.create_index()
Loading