Skip to content
3 changes: 3 additions & 0 deletions docs/apis/evaluation/dataset.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<!-- prettier-ignore-start -->
::: t4_devkit.evaluation.dataset
<!-- prettier-ignore-end -->
7 changes: 7 additions & 0 deletions docs/apis/evaluation/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# `evaluation`

- [Evaluation Tasks](./task.md)
- [Load Dataset](./dataset.md)
- [Matching Boxes](./matching.md)
- [Matching Results](./result.md)
- [Metrics](./metric.md)
9 changes: 9 additions & 0 deletions docs/apis/evaluation/matching.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<!-- prettier-ignore-start -->
::: t4_devkit.evaluation.matching.parameter

::: t4_devkit.evaluation.matching.scorer

::: t4_devkit.evaluation.matching.policy

::: t4_devkit.evaluation.matching.algorithm
<!-- prettier-ignore-end -->
5 changes: 5 additions & 0 deletions docs/apis/evaluation/metric.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<!-- prettier-ignore-start -->
::: t4_devkit.evaluation.metric.ap

::: t4_devkit.evaluation.metric.clear
<!-- prettier-ignore-end -->
5 changes: 5 additions & 0 deletions docs/apis/evaluation/result.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<!-- prettier-ignore-start -->
::: t4_devkit.evaluation.result.box

::: t4_devkit.evaluation.result.status
<!-- prettier-ignore-end -->
3 changes: 3 additions & 0 deletions docs/apis/evaluation/task.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<!-- prettier-ignore-start -->
::: t4_devkit.evaluation.task
<!-- prettier-ignore-end -->
7 changes: 7 additions & 0 deletions mkdocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ nav:
- Serialize Schema: apis/schema/serialize.md
- t4_devkit.dataclass: apis/dataclass.md
- t4_devkit.filtering: apis/filtering.md
- t4_devkit.evaluation:
- Home: apis/evaluation/index.md
- Evaluation Tasks: apis/evaluation/task.md
- Load Dataset: apis/evaluation/dataset.md
- Matching Boxes: apis/evaluation/matching.md
- Matching Results: apis/evaluation/result.md
- Metrics: apis/evaluation/metric.md
- t4_devkit.viewer: apis/viewer.md
- t4_devkit.common: apis/common.md

Expand Down
27 changes: 27 additions & 0 deletions t4_devkit/dataclass/box.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,33 @@ def corners(self, box_scale: float = 1.0) -> NDArrayF64:
# Rotate and translate
return np.dot(self.rotation.rotation_matrix, corners).T + self.position

def diff_yaw(self, other: Box3D) -> float:
"""Return the yaw difference between the two boxes.

Args:
other (Box3D): Another box.

Raises:
ValueError: Both boxes must have the same `frame_id`.

Returns:
Yaw difference in the range of [-pi, pi].
"""
if self.frame_id != other.frame_id:
raise ValueError(f"Invalid frame comparison: {self.frame_id=} and {other.frame_id=}")

yaw1, *_ = self.rotation.yaw_pitch_roll
yaw2, *_ = other.rotation.yaw_pitch_roll

def _clip(diff: float) -> float:
if diff < -np.pi:
diff += 2 * np.pi
elif diff > np.pi:
diff -= 2 * np.pi
return diff

return _clip(yaw2 - yaw1)


@define(eq=False)
class Box2D(BaseBox):
Expand Down
7 changes: 7 additions & 0 deletions t4_devkit/evaluation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .dataset import * # noqa
from .matching import * # noqa
from .result import * # noqa
from .matching import * # noqa
from .result import * # noqa
from .metric import * # noqa
from .task import * # noqa
124 changes: 124 additions & 0 deletions t4_devkit/evaluation/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from attrs import define

from t4_devkit import Tier4
from t4_devkit.dataclass import HomogeneousMatrix, TransformBuffer

from .task import EvaluationTask

if TYPE_CHECKING:
from t4_devkit.dataclass import BoxLike
from t4_devkit.schema import EgoPose, Sensor


__all__ = ["load_dataset", "FrameGroundTruth", "SceneGroundTruth"]


def load_dataset(data_root: str, task: EvaluationTask) -> SceneGroundTruth:
"""Load dataset.

Args:
data_root (str): Root directory path to the dataset.
task (EvaluationTask): Evaluation task.

Returns:
SceneGroundTruth: Loaded container of ground truths.
"""
t4 = Tier4("annotation", data_root=data_root, verbose=False)

frames: list[FrameGroundTruth] = []
for i, sample in enumerate(t4.sample):
# annotation boxes
boxes = (
list(map(t4.get_box3d, sample.ann_3ds))
if task.is_3d()
else list(map(t4.get_box2d, sample.ann_2ds))
)

# transformation matrix from ego to map
ego_pose = _closest_ego_pose(t4, sample.timestamp)
ego2map = HomogeneousMatrix(
position=ego_pose.translation,
rotation=ego_pose.rotation,
src="base_link",
dst="map",
)

frames.append(
FrameGroundTruth(
unix_time=sample.timestamp,
frame_index=i,
boxes=boxes,
ego2map=ego2map,
)
)

# transformation matrices from ego to each sensor
ego2sensors = TransformBuffer()
for cs_record in t4.calibrated_sensor:
sensor: Sensor = t4.get("sensor", cs_record.sensor_token)
matrix = HomogeneousMatrix(
position=cs_record.translation,
rotation=cs_record.rotation,
src="base_link",
dst=sensor.channel,
)

ego2sensors.set_transform(matrix)

return SceneGroundTruth(data_root=data_root, frames=frames, ego2sensors=ego2sensors)


def _closest_ego_pose(t4: Tier4, timestamp: int) -> EgoPose:
"""Lookup the ego pose record at the closest timestamp."""
return min(t4.ego_pose, key=lambda e: abs(e.timestamp - timestamp))


@define
class FrameGroundTruth:
"""A container of boxes at a single frame.

Attributes:
unix_time (int): Unix timestamp.
frame_index (int): Index number of the frame.
boxes (list[BoxLike]): List of ground truth instances.
ego2map (HomogeneousMatrix): Transformation matrix from ego to map coordinate.
"""

unix_time: int
frame_index: int
boxes: list[BoxLike]
ego2map: HomogeneousMatrix


@define
class SceneGroundTruth:
"""A container of frame ground truths.

Attributes:
data_root (str): Root directory path to the dataset.
frames (list[FrameGroundTruth]): List of frame ground truths.
ego2sensors (TransformBuffer): Buffer of transformation matrices from ego to each sensor coordinates.
"""

data_root: str
frames: list[FrameGroundTruth]
ego2sensors: TransformBuffer

def lookup_frame(self, unix_time: int, tolerance: int) -> FrameGroundTruth | None:
"""Lookup the closest set of ground truth frame.

Return None if the minimum time difference exceeds `tolerance`.

Args:
unix_time (int): Unix timestamp.
tolerance (int): Time difference tolerance in micro seconds.

Returns:
Return frame ground truth if succeeded, otherwise None.
"""
closest = min(self.frames, key=lambda f: abs(unix_time - f.unix_time))
return closest if abs(unix_time - closest.unix_time) <= tolerance else None
4 changes: 4 additions & 0 deletions t4_devkit/evaluation/matching/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .algorithm import * # noqa
from .parameter import * # noqa
from .policy import * # noqa
from .scorer import * # noqa
150 changes: 150 additions & 0 deletions t4_devkit/evaluation/matching/algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from copy import deepcopy
from typing import TYPE_CHECKING, Sequence, TypeVar

import numpy as np

from ..result import BoxMatch

if TYPE_CHECKING:
from t4_devkit.dataclass import BoxLike
from t4_devkit.typing import NDArrayF64

from .policy import MatchingPolicyLike
from .scorer import MatchingScorerLike

__all__ = ["GreedyMatcher", "MatchingAlgorithmLike"]


# ===== Base Class for Matching Algorithm =====


class MatchingAlgorithmImpl(ABC):
"""Abstract base class for matching algorithm class."""

def __init__(
self,
scorer: MatchingScorerLike,
policy: MatchingPolicyLike,
matchable_threshold: float,
) -> None:
super().__init__()
self._scorer = scorer
self._policy = policy
self._matchable_threshold = matchable_threshold

def __call__(
self,
estimations: Sequence[BoxLike],
ground_truths: Sequence[BoxLike],
) -> list[BoxMatch]:
"""Execute matching.

Args:
estimations (Sequence[BoxLike]): Sequence of estimations.
ground_truths (Sequence[BoxLike]): Sequence of ground truths.

Returns:
list[BoxMatch]: List of matches.
"""
score_table = self._score_table(estimations, ground_truths)
return self._do_matching(estimations, ground_truths, score_table)

def _score_table(
self,
estimations: Sequence[BoxLike],
ground_truths: Sequence[BoxLike],
) -> NDArrayF64:
"""Create a score table.

Args:
estimations (Sequence[BoxLike]): Sequence of estimations.
ground_truths (Sequence[BoxLike]): Sequence of ground truths.

Returns:
NDArrayF64: Score table in the shape of (NumEst, NumGT).
"""
num_rows, num_cols = len(estimations), len(ground_truths)

table: NDArrayF64 = np.full((num_rows, num_cols), fill_value=np.nan)
for i, box1 in enumerate(estimations):
for j, box2 in enumerate(ground_truths):
if box1.frame_id != box2.frame_id:
continue

score = self._scorer(box1, box2)

# check if boxes distance and label is matchable
if self._scorer.is_better_than(
score, self._matchable_threshold
) and self._policy.is_matchable(box1, box2):
table[i, j] = score

return table

def _get_indices(self, score_table: NDArrayF64) -> tuple[int, int]:
"""Return indices of estimation and ground truth in the score table at the best score.

Args:
score_table (NDArrayF64): Score table in the shape of (NumEst, NumGt).

Returns:
Estimation index and ground truth index.
"""
estimation_idx, ground_truth_idx = (
np.unravel_index(np.nanargmin(score_table), score_table.shape)
if self._scorer.is_smaller_score_better()
else np.unravel_index(np.nanargmax(score_table), score_table.shape)
)
return estimation_idx, ground_truth_idx

@abstractmethod
def _do_matching(
self,
estimations: Sequence[BoxLike],
ground_truths: Sequence[BoxLike],
score_table: NDArrayF64,
) -> list[BoxMatch]:
pass


MatchingAlgorithmLike = TypeVar("MatchingAlgorithmLike", bound=MatchingAlgorithmImpl)


# ===== Specific Matching Algorithms =====


class GreedyMatcher(MatchingAlgorithmImpl):
def _do_matching(
self,
estimations: Sequence[BoxLike],
ground_truths: Sequence[BoxLike],
score_table: NDArrayF64,
) -> list[BoxMatch]:
tmp_estimations = list(deepcopy(estimations))
tmp_ground_truths = list(deepcopy(ground_truths))

output: list[BoxMatch] = []
# 1. match the nearest matchable estimations and GTs
num_estimations, *_ = score_table.shape
for _ in range(num_estimations):
if np.isnan(score_table).all():
break

estimation_idx, ground_truth_idx = self._get_indices(score_table)

estimation_picked = tmp_estimations.pop(estimation_idx)
ground_truth_picked = tmp_ground_truths.pop(ground_truth_idx)
output.append(BoxMatch(estimation_picked, ground_truth_picked))

# remove picked estimations and GTs
score_table = np.delete(score_table, estimation_idx, axis=0)
score_table = np.delete(score_table, ground_truth_idx, axis=1)

# 2. assign remaining estimations(=FPs) and GTs(=FNs)
output += [BoxMatch(estimation=estimation) for estimation in tmp_estimations]
output += [BoxMatch(ground_truth=ground_truth) for ground_truth in tmp_ground_truths]

return output
Loading
Loading