Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 108 additions & 39 deletions plugins/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""
import base64
import contextlib
import logging
import multiprocessing.dummy
import os
from packaging.version import Version
Expand All @@ -17,6 +18,7 @@
import fiftyone.constants as foc
import fiftyone.core.fields as fof
import fiftyone.core.media as fom
import fiftyone.core.metadata as fomm
import fiftyone.core.storage as fos
import fiftyone.core.utils as fou
import fiftyone.operators as foo
Expand Down Expand Up @@ -319,6 +321,16 @@ def _import_media_only_inputs(ctx, inputs):
view=types.AutocompleteView(multiple=True),
)

inputs.bool(
"metadata",
default=True,
label="Compute metadata",
description=(
"Whether to populate the `metadata` field of each imported sample"
),
view=types.CheckboxView(),
)

ready = _upload_media_inputs(ctx, inputs)
if not ready:
return False
Expand Down Expand Up @@ -559,6 +571,16 @@ def _import_media_and_labels_inputs(ctx, inputs):
view=types.AutocompleteView(multiple=True),
)

inputs.bool(
"metadata",
default=True,
label="Compute metadata",
description=(
"Whether to populate the `metadata` field of each imported sample"
),
view=types.CheckboxView(),
)

return _upload_media_inputs(ctx, inputs)


Expand Down Expand Up @@ -795,6 +817,7 @@ def _upload_media_bytes(ctx):
def _import_media_only(ctx):
style = ctx.params.get("style", None)
tags = ctx.params.get("tags", None)
metadata = ctx.params.get("metadata", False)

if style == "UPLOAD":
filepath = _upload_media_bytes(ctx)
Expand Down Expand Up @@ -823,29 +846,51 @@ def _import_media_only(ctx):
for progress in _upload_media(ctx, tasks):
yield progress

make_sample = lambda f: fo.Sample(filepath=f, tags=tags)
samples = map(make_sample, filepaths)
with contextlib.ExitStack() as context:
if metadata:
num_workers = fou.recommend_thread_pool_workers()
pool = multiprocessing.dummy.Pool(processes=num_workers)
context.enter_context(pool)

def make_sample(filepath):
media_type = fom.get_media_type(filepath)
metadata = fomm._compute_sample_metadata(filepath, media_type)
return fo.Sample(
filepath=filepath,
metadata=metadata,
media_type=media_type,
tags=tags,
)

# @todo can remove version check if we require `fiftyone>=1.5.0`
if ctx.delegated or Version(foc.VERSION) < Version("1.5.0"):
kwargs = {}
samples = pool.map(make_sample, filepaths)
else:

# @todo can remove version check if we require `fiftyone>=1.6.0`
if Version(foc.VERSION) >= Version("1.6.0"):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=10.0)
def make_sample(filepath):
return fo.Sample(filepath=filepath, tags=tags)

ctx.dataset.add_samples(samples, num_samples=num_total, **kwargs)
return
samples = map(make_sample, filepaths)

num_added = 0
for ids in ctx.dataset.add_samples(
samples, generator=True, progress=False
):
num_added += len(ids)
progress = num_added / num_total
label = f"Loaded {num_added} of {num_total}"
yield ctx.trigger("set_progress", dict(progress=progress, label=label))
# @todo can remove version check if we require `fiftyone>=1.5.0`
if ctx.delegated or Version(foc.VERSION) < Version("1.5.0"):
kwargs = {}

# @todo can remove version check if we require `fiftyone>=1.6.0`
if Version(foc.VERSION) >= Version("1.6.0"):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=10.0)

ctx.dataset.add_samples(samples, num_samples=num_total, **kwargs)
else:
num_added = 0
for ids in ctx.dataset.add_samples(
samples, generator=True, progress=False
):
num_added += len(ids)
progress = num_added / num_total
label = f"Loaded {num_added} of {num_total}"
yield ctx.trigger(
"set_progress", dict(progress=progress, label=label)
)


def _import_media_and_labels(ctx):
Expand All @@ -858,6 +903,7 @@ def _import_media_and_labels(ctx):
label_field = ctx.params.get("label_field", None)
label_types = ctx.params.get("label_types", None)
tags = ctx.params.get("tags", None)
metadata = ctx.params.get("metadata", False)
dynamic = ctx.params.get("dynamic", False)
kwargs = ctx.params.get("kwargs", {})

Expand All @@ -869,19 +915,49 @@ def _import_media_and_labels(ctx):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=10.0)

ctx.dataset.add_dir(
dataset_dir=dataset_dir,
dataset_type=dataset_type,
data_path=data_path,
labels_path=labels_path,
label_field=label_field,
tags=tags,
dynamic=dynamic,
**kwargs,
)
# @todo can remove version check if we require `fiftyone>=1.5.0`
if Version(foc.VERSION) < Version("1.5.0"):
ids = ctx.dataset.add_dir(
dataset_dir=dataset_dir,
dataset_type=dataset_type,
data_path=data_path,
labels_path=labels_path,
label_field=label_field,
tags=tags,
dynamic=dynamic,
**kwargs,
)

return
yield
if metadata:
for _ids in fou.iter_batches(ids, 10000):
ctx.dataset.select(_ids).compute_metadata(overwrite=True)
else:
num_added = 0
for ids in ctx.dataset.add_dir(
dataset_dir=dataset_dir,
dataset_type=dataset_type,
data_path=data_path,
labels_path=labels_path,
label_field=label_field,
tags=tags,
dynamic=dynamic,
generator=True,
**kwargs,
):
if metadata:
with fou.SuppressLogging(logging.INFO):
ctx.dataset.select(ids).compute_metadata(
overwrite=True, progress=False
)

if not ctx.delegated:
num_added += len(ids)
label = f"Loaded {num_added} samples"
yield ctx.trigger("set_progress", dict(label=label))

if not ctx.delegated:
label = f"Loaded {num_added} samples"
yield ctx.trigger("set_progress", dict(progress=1, label=label))


def _upload_labels_bytes(ctx, tmp_dir):
Expand Down Expand Up @@ -974,14 +1050,7 @@ def _upload_media(ctx, tasks):

num_uploaded = 0
num_total = len(tasks)

# @todo can switch to this if we require `fiftyone>=0.22.2`
# num_workers = fou.recommend_thread_pool_workers()

if hasattr(fou, "recommend_thread_pool_workers"):
num_workers = fou.recommend_thread_pool_workers()
else:
num_workers = fo.config.max_thread_pool_workers or 8
num_workers = fou.recommend_thread_pool_workers()

with multiprocessing.dummy.Pool(processes=num_workers) as pool:
for _ in pool.imap_unordered(_do_upload_media, tasks):
Expand Down
2 changes: 1 addition & 1 deletion plugins/io/fiftyone.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: "@voxel51/io"
description: A collection of import/export utilities
version: 1.1.0
version: 1.2.0
fiftyone:
version: ">=0.22.2"
url: https://github.com/voxel51/fiftyone-plugins/tree/main/plugins/io
Expand Down
80 changes: 24 additions & 56 deletions plugins/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1869,13 +1869,7 @@ def _compute_metadata_generator(
skip_failures=True,
warn_failures=True,
):
# @todo can switch to this if we require `fiftyone>=0.22.2`
# num_workers = fou.recommend_thread_pool_workers(num_workers)

if hasattr(fou, "recommend_thread_pool_workers"):
num_workers = fou.recommend_thread_pool_workers(num_workers)
elif num_workers is None:
num_workers = fo.config.max_thread_pool_workers or 8
num_workers = fou.recommend_thread_pool_workers(num_workers)

if not overwrite:
sample_collection = sample_collection.exists("metadata", False)
Expand All @@ -1890,38 +1884,35 @@ def _compute_metadata_generator(
return

inputs = zip(ids, filepaths, media_types)
values = {}

try:
num_computed = 0
with contextlib.ExitStack() as exit_context:
pb = fou.ProgressBar(total=num_total)
exit_context.enter_context(pb)

if num_workers > 1:
pool = multiprocessing.dummy.Pool(processes=num_workers)
exit_context.enter_context(pool)
tasks = pool.imap_unordered(_do_compute_metadata, inputs)
else:
tasks = map(_do_compute_metadata, inputs)
with contextlib.ExitStack() as context:
if num_workers > 1:
pool = multiprocessing.dummy.Pool(processes=num_workers)
context.enter_context(pool)
tasks = pool.imap_unordered(_do_compute_metadata, inputs)
else:
tasks = map(_do_compute_metadata, inputs)

for sample_id, metadata in pb(tasks):
values[sample_id] = metadata
num_computed = 0
with fou.get_default_batcher(
tasks, progress=True, total=num_total
) as batcher:
for batch in batcher:
sample_collection.set_values(
"metadata", dict(batch), key_field="id"
)

num_computed += 1
if num_computed % 10 == 0:
progress = num_computed / num_total
label = f"Computed {num_computed} of {num_total}"
yield ctx.trigger(
"set_progress", dict(progress=progress, label=label)
)
finally:
sample_collection.set_values("metadata", values, key_field="id")
num_computed += len(batch)
progress = num_computed / num_total
label = f"Computed {num_computed} of {num_total}"
yield ctx.trigger(
"set_progress", dict(progress=progress, label=label)
)

if skip_failures and not warn_failures:
return

num_missing = len(sample_collection.exists("metadata", False)) + 1
num_missing = len(sample_collection.exists("metadata", False))
if num_missing > 0:
msg = (
"Failed to populate metadata on %d samples. "
Expand All @@ -1937,33 +1928,10 @@ def _compute_metadata_generator(

def _do_compute_metadata(args):
    """Computes the metadata for a single sample.

    This is the worker that is handed to ``map()`` /
    ``Pool.imap_unordered()``, so it takes one packed argument and returns the
    sample ID alongside the result so that results arriving out of order can
    still be matched to their samples.

    Args:
        args: a ``(sample_id, filepath, media_type)`` tuple

    Returns:
        a ``(sample_id, metadata)`` tuple, where ``metadata`` is expected to be
        ``None`` when the metadata could not be computed
    """
    sample_id, filepath, media_type = args

    # The metadata is computed exactly once, via the shared FiftyOne helper.
    # Failures are skipped here on purpose: the caller counts the samples
    # whose metadata is still missing afterwards and reports them, so a single
    # unreadable file must not raise out of the worker pool and abort the run.
    # NOTE(review): assumes `fomm._compute_sample_metadata()` accepts
    # `skip_failures` and returns None on failure -- confirm against the
    # minimum supported `fiftyone` version
    metadata = fomm._compute_sample_metadata(
        filepath, media_type, skip_failures=True
    )
    return sample_id, metadata


def _compute_sample_metadata(filepath, media_type, skip_failures=False):
    """Computes the metadata for the media file at the given path.

    Args:
        filepath: the path to the media file
        media_type: the media type of the file, as understood by
            ``_get_metadata()``
        skip_failures (False): whether to return ``None`` rather than raising
            an error if the metadata cannot be computed

    Returns:
        the value returned by ``_get_metadata()``, or ``None`` if the
        computation failed and ``skip_failures`` is True

    Raises:
        Exception: whatever ``_get_metadata()`` raised, if ``skip_failures``
            is False
    """
    try:
        return _get_metadata(filepath, media_type)
    except Exception:
        # Catch `Exception` rather than using a bare `except` so that
        # KeyboardInterrupt and SystemExit still propagate and the operation
        # remains interruptible
        if not skip_failures:
            raise

        return None


def _get_metadata(filepath, media_type):
    """Builds the metadata for a media file based on its media type.

    Args:
        filepath: the path to the media file
        media_type: the media type of the file. ``fom.IMAGE`` and
            ``fom.VIDEO`` select their dedicated metadata classes; any other
            value falls back to the generic ``fomm.Metadata``

    Returns:
        the result of calling ``build_for(filepath)`` on the selected metadata
        class
    """
    if media_type == fom.IMAGE:
        metadata_cls = fomm.ImageMetadata
    elif media_type == fom.VIDEO:
        metadata_cls = fomm.VideoMetadata
    else:
        # Any other media type only gets generic metadata
        metadata_cls = fomm.Metadata

    return metadata_cls.build_for(filepath)


class GenerateThumbnails(foo.Operator):
@property
def config(self):
Expand Down
4 changes: 2 additions & 2 deletions plugins/utils/fiftyone.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: "@voxel51/utils"
description: A collection of utility operators
version: 1.2.0
version: 1.3.0
fiftyone:
version: ">=0.22"
version: ">=0.22.2"
url: https://github.com/voxel51/fiftyone-plugins/tree/main/plugins/utils
license: Apache 2.0
operators:
Expand Down