Skip to content

Commit 5614928

Browse files
rvirani1clauderafel-roboflow
authored
Make roboflow_vision_events@v1 non-SIMD so scalar fields accept dynamic step outputs (#2231)
* Make roboflow_vision_events@v1 non-SIMD so scalar fields accept dynamic step outputs The SIMD declaration caused the compile-time checker to reject any batch-oriented selector on scalar metadata fields like item_count, location, item_type, etc. Since the block never performed any shared per-batch computation (just a serial for-loop), SIMD was unnecessary. This aligns with the other sink blocks (email_notification, webhook, event_writer) which are all non-SIMD. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Add regression tests for non-SIMD compile-time batch compatibility (ENT-1126) Adds two tests that guard against re-introducing SIMD on vision_events: - test_manifest_is_not_simd: asserts accepts_batch_input() is False - test_batch_selector_on_scalar_field_passes_compile_check: exercises the exact graph constructor check (verify_declared_batch_compatibility_against_actual_inputs) that previously rejected batch-oriented selectors on scalar metadata fields Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Rafel Bennasar <253519461+rafel-roboflow@users.noreply.github.com>
1 parent 9157446 commit 5614928

2 files changed

Lines changed: 130 additions & 86 deletions

File tree

  • inference/core/workflows/core_steps/sinks/roboflow/vision_events
  • tests/workflows/unit_tests/core_steps/sinks/roboflow/vision_events

inference/core/workflows/core_steps/sinks/roboflow/vision_events/v1.py

Lines changed: 48 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
PREDICTION_TYPE_KEY,
2222
)
2323
from inference.core.workflows.execution_engine.entities.base import (
24-
Batch,
2524
OutputDefinition,
2625
WorkflowImageData,
2726
)
@@ -355,10 +354,6 @@ class BlockManifest(WorkflowBlockManifest):
355354
examples=[False, "$inputs.disable_vision_events"],
356355
)
357356

358-
@classmethod
359-
def get_parameters_accepting_batches(cls) -> List[str]:
360-
return ["input_image", "output_image", "predictions"]
361-
362357
@classmethod
363358
def describe_outputs(cls) -> List[OutputDefinition]:
364359
return [
@@ -393,9 +388,9 @@ def get_manifest(cls) -> Type[WorkflowBlockManifest]:
393388

394389
def run(
395390
self,
396-
input_image: Optional[Batch[WorkflowImageData]],
397-
output_image: Optional[Batch[WorkflowImageData]],
398-
predictions: Optional[Batch[Union[sv.Detections, dict]]],
391+
input_image: Optional[WorkflowImageData],
392+
output_image: Optional[WorkflowImageData],
393+
predictions: Optional[Union[sv.Detections, dict]],
399394
event_type: str,
400395
solution: str,
401396
custom_metadata: Dict[str, Any],
@@ -420,79 +415,56 @@ def run(
420415
"https://docs.roboflow.com/api-reference/authentication"
421416
"#retrieve-an-api-key to learn how to retrieve one."
422417
)
423-
# Determine batch size from whichever image input is provided
424-
batch_size = 1
425-
if input_image is not None:
426-
batch_size = len(input_image)
427-
elif output_image is not None:
428-
batch_size = len(output_image)
429-
elif predictions is not None:
430-
batch_size = len(predictions)
431418

432419
if disable_sink:
433-
return [
434-
{
435-
"error_status": False,
436-
"message": "Sink was disabled by parameter `disable_sink`",
437-
}
438-
for _ in range(batch_size)
439-
]
420+
return {
421+
"error_status": False,
422+
"message": "Sink was disabled by parameter `disable_sink`",
423+
}
440424

441-
input_images = [None] * batch_size if input_image is None else input_image
442-
output_images = [None] * batch_size if output_image is None else output_image
443-
predictions_list = [None] * batch_size if predictions is None else predictions
444-
445-
result = []
446-
for img_in, img_out, pred in zip(input_images, output_images, predictions_list):
447-
event_data = _build_event_data(
448-
event_type=event_type,
449-
external_id=external_id,
450-
qc_result=qc_result,
451-
location=location,
452-
item_count=item_count,
453-
item_type=item_type,
454-
alert_type=alert_type,
455-
severity=severity,
456-
alert_description=alert_description,
457-
custom_value=custom_value,
458-
related_event_id=related_event_id,
459-
feedback=feedback,
460-
)
461-
462-
task = partial(
463-
_execute_vision_event,
464-
api_base_url=API_BASE_URL,
465-
api_key=self._api_key,
466-
input_image=img_in,
467-
output_image=img_out,
468-
prediction=pred,
469-
event_type=event_type,
470-
solution=solution,
471-
event_data=event_data,
472-
custom_metadata=custom_metadata,
473-
)
425+
event_data = _build_event_data(
426+
event_type=event_type,
427+
external_id=external_id,
428+
qc_result=qc_result,
429+
location=location,
430+
item_count=item_count,
431+
item_type=item_type,
432+
alert_type=alert_type,
433+
severity=severity,
434+
alert_description=alert_description,
435+
custom_value=custom_value,
436+
related_event_id=related_event_id,
437+
feedback=feedback,
438+
)
474439

475-
if fire_and_forget and self._background_tasks:
476-
self._background_tasks.add_task(task)
477-
result.append(
478-
{
479-
"error_status": False,
480-
"message": "Vision event sent in background task",
481-
}
482-
)
483-
elif fire_and_forget and self._thread_pool_executor:
484-
self._thread_pool_executor.submit(task)
485-
result.append(
486-
{
487-
"error_status": False,
488-
"message": "Vision event sent in background task",
489-
}
490-
)
491-
else:
492-
error_status, message = task()
493-
result.append({"error_status": error_status, "message": message})
440+
task = partial(
441+
_execute_vision_event,
442+
api_base_url=API_BASE_URL,
443+
api_key=self._api_key,
444+
input_image=input_image,
445+
output_image=output_image,
446+
prediction=predictions,
447+
event_type=event_type,
448+
solution=solution,
449+
event_data=event_data,
450+
custom_metadata=custom_metadata,
451+
)
494452

495-
return result
453+
if fire_and_forget and self._background_tasks:
454+
self._background_tasks.add_task(task)
455+
return {
456+
"error_status": False,
457+
"message": "Vision event sent in background task",
458+
}
459+
elif fire_and_forget and self._thread_pool_executor:
460+
self._thread_pool_executor.submit(task)
461+
return {
462+
"error_status": False,
463+
"message": "Vision event sent in background task",
464+
}
465+
else:
466+
error_status, message = task()
467+
return {"error_status": error_status, "message": message}
496468

497469

498470
def _build_event_data(

tests/workflows/unit_tests/core_steps/sinks/roboflow/vision_events/test_v1.py

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -453,10 +453,9 @@ def test_run_disabled() -> None:
453453
fire_and_forget=False,
454454
disable_sink=True,
455455
)
456-
assert isinstance(result, list)
457-
assert len(result) == 1
458-
assert result[0]["error_status"] is False
459-
assert "disabled" in result[0]["message"].lower()
456+
assert isinstance(result, dict)
457+
assert result["error_status"] is False
458+
assert "disabled" in result["message"].lower()
460459

461460

462461
@patch(
@@ -481,8 +480,8 @@ def test_run_fire_and_forget_background_tasks(mock_execute: MagicMock) -> None:
481480
)
482481

483482
background_tasks.add_task.assert_called_once()
484-
assert result[0]["error_status"] is False
485-
assert "background" in result[0]["message"].lower()
483+
assert result["error_status"] is False
484+
assert "background" in result["message"].lower()
486485

487486

488487
@patch(
@@ -507,8 +506,8 @@ def test_run_fire_and_forget_thread_pool(mock_execute: MagicMock) -> None:
507506
)
508507

509508
thread_pool.submit.assert_called_once()
510-
assert result[0]["error_status"] is False
511-
assert "background" in result[0]["message"].lower()
509+
assert result["error_status"] is False
510+
assert "background" in result["message"].lower()
512511

513512

514513
@patch(
@@ -533,5 +532,78 @@ def test_run_synchronous(mock_execute: MagicMock) -> None:
533532
)
534533

535534
mock_execute.assert_called_once()
536-
assert result[0]["error_status"] is False
537-
assert result[0]["message"] == "Vision event sent successfully"
535+
assert result["error_status"] is False
536+
assert result["message"] == "Vision event sent successfully"
537+
538+
539+
# === Non-SIMD / Compilation Regression Tests (ENT-1126) ===
540+
541+
542+
def test_manifest_is_not_simd() -> None:
543+
"""Block must be non-SIMD so the engine broadcasts scalar params per image."""
544+
assert BlockManifest.accepts_batch_input() is False
545+
546+
547+
def test_batch_selector_on_scalar_field_passes_compile_check() -> None:
548+
"""Regression test for ENT-1126.
549+
550+
Exercises ``verify_declared_batch_compatibility_against_actual_inputs``
551+
(graph_constructor.py:1659), the exact compile-time check that rejected
552+
batch-oriented selectors on ``item_count`` when the block was SIMD.
553+
554+
A StepNode backed by our manifest with a batch-oriented input on
555+
``item_count`` must NOT raise ``ExecutionGraphStructureError``.
556+
"""
557+
from inference.core.workflows.errors import ExecutionGraphStructureError
558+
from inference.core.workflows.execution_engine.v1.compiler.entities import (
559+
DynamicStepInputDefinition,
560+
NodeInputCategory,
561+
ParameterSpecification,
562+
StepNode,
563+
)
564+
from inference.core.workflows.execution_engine.v1.compiler.graph_constructor import (
565+
verify_declared_batch_compatibility_against_actual_inputs,
566+
)
567+
568+
manifest = BlockManifest(
569+
type="roboflow_core/roboflow_vision_events@v1",
570+
name="vision_events",
571+
event_type="inventory_count",
572+
solution="test-solution",
573+
item_count="$steps.counter.count",
574+
)
575+
576+
step_node = StepNode(
577+
node_category="step_node",
578+
name="vision_events",
579+
selector="$steps.vision_events",
580+
data_lineage=[],
581+
step_manifest=manifest,
582+
input_data={
583+
"item_count": DynamicStepInputDefinition(
584+
parameter_specification=ParameterSpecification(
585+
parameter_name="item_count",
586+
nested_element_key=None,
587+
nested_element_index=None,
588+
),
589+
category=NodeInputCategory.BATCH_STEP_OUTPUT,
590+
data_lineage=["<workflow_input>"],
591+
selector="$steps.counter.count",
592+
),
593+
},
594+
batch_oriented_parameters=set(),
595+
)
596+
597+
# batch_compatibility_of_properties says item_count is NOT batch-compatible
598+
batch_compat = {"item_count": {False}}
599+
600+
# Must not raise. Before the fix (when accepts_batch_input() was True),
601+
# this exact call raised ExecutionGraphStructureError because a
602+
# batch-oriented selector was plugged into a non-batch parameter.
603+
result = verify_declared_batch_compatibility_against_actual_inputs(
604+
node="$steps.vision_events",
605+
step_node_data=step_node,
606+
input_data=step_node.input_data,
607+
batch_compatibility_of_properties=batch_compat,
608+
)
609+
assert isinstance(result, set)

0 commit comments

Comments
 (0)