diff --git a/docs/source/api/methods.rst b/docs/source/api/methods.rst index 08b4185d..a78914b6 100644 --- a/docs/source/api/methods.rst +++ b/docs/source/api/methods.rst @@ -67,3 +67,11 @@ Utilities :members: :no-private-members: :no-special-members: + +Dimensionless Numbers +^^^^^^^^^^^^^^^^^^^^^ + +.. autoapimodule:: dimensionless_number_calculator + :members: + :no-private-members: + :no-special-members: diff --git a/notebooks/user_guide.ipynb b/notebooks/user_guide.ipynb index 242e4eef..fb65ae8c 100644 --- a/notebooks/user_guide.ipynb +++ b/notebooks/user_guide.ipynb @@ -3352,6 +3352,92 @@ ":::" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Dimensionless Numbers: Intrusion and Avoidance\n", + "\n", + "The dimensionless Intrusion number $\\mathcal{I}n$ and Avoidance number $\\mathcal{A}v$\n", + "classify crowd flow regimes based on personal space intrusions and collision anticipation\n", + "([Cordes et al., *PNAS Nexus*, 2024](https://doi.org/10.1093/pnasnexus/pgae120)).\n", + "\n", + "The **Intrusion number** quantifies encroachment on personal space:\n", + "\n", + "$$\n", + "\\mathcal{I}n_i = \\sum_{j \\in \\mathcal{N}_i}\n", + "\\left(\\frac{r_{\\text{soc}} - \\ell_{\\text{min}}}{r_{ij} - \\ell_{\\text{min}}}\\right)^2\n", + "$$\n", + "\n", + "The **Avoidance number** quantifies the imminence of collisions via the time-to-collision (TTC):\n", + "\n", + "$$\n", + "\\mathcal{A}v_i = \\max_{j \\in \\mathcal{N}'_i}\n", + "\\frac{\\tau_0}{\\tau_{ij}}\n", + "$$\n", + "\n", + "where $r_{\\text{soc}} = 0.8\\,$m is the social radius, $\\ell_{\\text{min}} = 0.2\\,$m the pedestrian diameter,\n", + "and $\\tau_0 = 3\\,$s a reference timescale." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pedpy import IntrusionMethod, compute_avoidance, compute_intrusion\n", + "\n", + "intrusion = compute_intrusion(traj_data=traj)\n", + "intrusion.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "avoidance = compute_avoidance(traj_data=traj, frame_step=5, tau_0=3.0, radius=0.2)\n", + "avoidance.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Temporal evolution\n", + "\n", + "We can visualize how these numbers evolve over time by averaging over all pedestrians per frame:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "mean_in = intrusion.groupby(\"frame\")[\"intrusion\"].mean()\n", + "mean_av = avoidance.groupby(\"frame\")[\"avoidance\"].mean()\n", + "\n", + "fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))\n", + "\n", + "ax1.plot(mean_in.index / traj.frame_rate, mean_in.values)\n", + "ax1.set_xlabel(\"Time / s\")\n", + "ax1.set_ylabel(r\"$\\langle \\mathcal{I}n \\rangle$\")\n", + "ax1.set_title(\"Mean Intrusion Number\")\n", + "\n", + "ax2.plot(mean_av.index / traj.frame_rate, mean_av.values)\n", + "ax2.set_xlabel(\"Time / s\")\n", + "ax2.set_ylabel(r\"$\\langle \\mathcal{A}v \\rangle$\")\n", + "ax2.set_title(\"Mean Avoidance Number\")\n", + "\n", + "fig.tight_layout()\n", + "plt.show()" + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/pedpy/__init__.py b/pedpy/__init__.py index 50a37d79..11ddfdd5 100644 --- a/pedpy/__init__.py +++ b/pedpy/__init__.py @@ -12,6 +12,7 @@ from .column_identifier import ( ACC_COL, + AVOIDANCE_COL, A_X_COL, A_Y_COL, COUNT_COL, @@ -25,6 +26,7 @@ FRAME_COL, ID_COL, INTERSECTION_COL, + INTRUSION_COL, LAST_FRAME_COL, MEAN_SPEED_COL, MID_FRAME_COL, @@ -86,6 +88,11 @@ compute_passing_density, compute_voronoi_density, ) +from .methods.dimensionless_number_calculator import ( + IntrusionMethod, + compute_avoidance, + compute_intrusion, +) from .methods.flow_calculator import ( compute_flow, compute_line_flow, @@ -96,6 +103,7 @@ Cutoff, SpeedCalculation, compute_frame_range_in_area, + compute_individual_distances, compute_individual_voronoi_polygons, compute_intersecting_polygons, compute_neighbor_distance, @@ -188,6 +196,7 @@ "compute_frame_range_in_area", "compute_individual_voronoi_polygons", "compute_intersecting_polygons", + "compute_individual_distances", "compute_neighbors", "compute_neighbor_distance", "compute_time_distance_line", @@ -217,6 +226,9 @@ "compute_mean_acceleration_per_frame", "compute_voronoi_acceleration", "correct_invalid_trajectories", + "compute_intrusion", + "IntrusionMethod", + "compute_avoidance", "PEDPY_BLUE", "PEDPY_GREEN", "PEDPY_GREY", @@ -271,6 +283,8 @@ "MID_POSITION_COL", "END_POSITION_COL", "WINDOW_SIZE_COL", + "INTRUSION_COL", + "AVOIDANCE_COL", "__version__", "AccelerationError", "GeometryError", diff --git a/pedpy/column_identifier.py b/pedpy/column_identifier.py index bde0ee2f..2f4add48 100644 --- a/pedpy/column_identifier.py +++ b/pedpy/column_identifier.py @@ -44,3 +44,6 @@ SPEED_SP2_COL: Final = "s_sp-1" FLOW_SP1_COL: Final = "j_sp+1" FLOW_SP2_COL: Final = "j_sp-1" + +INTRUSION_COL: Final = "intrusion" +AVOIDANCE_COL: Final = "avoidance" diff --git a/pedpy/methods/dimensionless_number_calculator.py b/pedpy/methods/dimensionless_number_calculator.py new file mode 100644 index 00000000..c743f726 --- /dev/null +++ b/pedpy/methods/dimensionless_number_calculator.py @@ -0,0 +1,163 @@ +"""Dimensionless numbers for pedestrian crowd classification. + +Implements the Intrusion number and Avoidance number defined in +Cordes et al., PNAS Nexus 2024 (https://doi.org/10.1093/pnasnexus/pgae120). +""" + +from enum import Enum + +import numpy as np +import pandas as pd +import shapely + +from pedpy.column_identifier import ( + AVOIDANCE_COL, + FRAME_COL, + ID_COL, + INTRUSION_COL, +) +from pedpy.data.trajectory_data import TrajectoryData +from pedpy.methods.method_utils import ( + SpeedCalculation, + compute_individual_distances, +) +from pedpy.methods.speed_calculator import compute_individual_speed + + +class IntrusionMethod(Enum): # pylint: disable=too-few-public-methods + """Identifier of method to compute the intrusion.""" + + MAX = "max" + """Use the max value in neighborhood as intrusion.""" + SUM = "sum" + """Use the sum of all values in neighborhood as intrusion.""" + + +def compute_intrusion( + *, + traj_data: TrajectoryData, + r_soc: float = 0.8, + l_min: float = 0.2, + method: IntrusionMethod = IntrusionMethod.SUM, +) -> pd.DataFrame: + r"""Compute the intrusion number for each pedestrian per frame. + + The intrusion variable :math:`\mathcal{I}n_i` quantifies how much + other agents encroach on pedestrian *i*'s personal space + (Cordes et al. 2024, Eq. 1): + + .. math:: + + \mathcal{I}n_i = \sum_{j \in \mathcal{N}_i} + \left(\frac{r_\text{soc} - \ell_\text{min}} + {r_{ij} - \ell_\text{min}}\right)^{k_I}, + + with :math:`k_I = 2`. The neighbor set :math:`\mathcal{N}_i` contains + all agents *j* with :math:`r_{ij} \leq 3\,r_\text{soc}`. + + Args: + traj_data (TrajectoryData): trajectory data to analyze + r_soc (float): social radius in m (default 0.8) + l_min (float): pedestrian diameter in m (default 0.2) + method (IntrusionMethod): aggregation over neighbors, + ``SUM`` (default, as in the paper) or ``MAX`` + + Returns: + DataFrame with columns 'id', 'frame', and 'intrusion'. + Agents with no neighbors within the cutoff (or only at + exactly ``l_min`` distance) are absent from the result. + """ + intrusion = compute_individual_distances(traj_data=traj_data) + intrusion = intrusion.loc[(intrusion.distance <= 3 * r_soc) & (intrusion.distance > l_min)].copy() + intrusion.loc[:, INTRUSION_COL] = ((r_soc - l_min) / (intrusion.distance - l_min)) ** 2 + intrusion = ( + intrusion.groupby(by=[ID_COL, FRAME_COL]) + .agg( + intrusion=(INTRUSION_COL, method.value), + ) + .reset_index() + ) + + return intrusion + + +def compute_avoidance( + *, + traj_data: TrajectoryData, + frame_step: int, + radius: float = 0.4, + tau_0: float, +) -> pd.DataFrame: + r"""Compute the avoidance number for each pedestrian per frame. + + The avoidance variable :math:`\mathcal{A}v_i` quantifies the + imminence of collisions that pedestrian *i* faces, based on + the time-to-collision (TTC) with neighbors + (Cordes et al. 2024, Eq. 2): + + .. math:: + + \mathcal{A}v_i = \sum_{j \in \mathcal{N}'_i} + \left(\frac{\tau_0}{\tau_{ij}}\right)^{k_A}, + + with :math:`k_A = 1`. The neighbor set :math:`\mathcal{N}'_i` is + restricted to the agent with the shortest TTC :math:`\tau_{ij}`, + implemented by taking the ``max`` over :math:`\tau_0 / \tau_{ij}`. + + Args: + traj_data (TrajectoryData): trajectory data to analyze + frame_step (int): number of frames used for velocity computation + radius (float): disc diameter :math:`\ell_\text{soc}` for TTC + computation in m (default 0.4, as in the paper) + tau_0 (float): reference timescale in s (paper uses 3.0) + + Returns: + DataFrame with columns 'id', 'frame', and 'avoidance' + """ + velocity = compute_individual_speed( + traj_data=traj_data, + frame_step=frame_step, + compute_velocity=True, + speed_calculation=SpeedCalculation.BORDER_SINGLE_SIDED, + ) + + data = traj_data.data.merge(velocity, on=[ID_COL, FRAME_COL]) + data["velocity"] = shapely.points(data.v_x, data.v_y) + + matrix = data.merge(data, how="inner", on=FRAME_COL, suffixes=("", "_neighbor")) + matrix = matrix[matrix[ID_COL] != matrix[f"{ID_COL}_neighbor"]] + + pos = shapely.get_coordinates(matrix.point) + pos_neighbor = shapely.get_coordinates(matrix.point_neighbor) + distance = np.linalg.norm(pos - pos_neighbor, axis=1) + + delta_v = shapely.get_coordinates(matrix.velocity) - shapely.get_coordinates(matrix.velocity_neighbor) + delta_v_norm = np.linalg.norm(delta_v, axis=1) + + # only compute for pairs with nonzero distance and nonzero relative velocity + computable = (distance > 0) & (delta_v_norm > 0) + + ttc = np.full(matrix.shape[0], np.inf) + + if np.any(computable): + d_c = distance[computable] + dv_c = delta_v[computable] + dv_norm_c = delta_v_norm[computable] + + e_v = (pos[computable] - pos_neighbor[computable]) / d_c[:, np.newaxis] + v_rel_hat = dv_c / dv_norm_c[:, np.newaxis] + cos_alpha = np.sum(e_v * v_rel_hat, axis=1) + + capital_a = (cos_alpha**2 - 1) * d_c**2 + radius**2 + sqrt_a_safe = np.sqrt(np.maximum(capital_a, 0.0)) + + valid = (capital_a >= 0) & (-cos_alpha * d_c - sqrt_a_safe >= 0) + + idx = np.where(computable)[0][valid] + ttc[idx] = (-cos_alpha[valid] * d_c[valid] - sqrt_a_safe[valid]) / dv_norm_c[valid] + + matrix = matrix.copy() + matrix[AVOIDANCE_COL] = tau_0 / ttc + + avoidance = matrix.groupby(by=[ID_COL, FRAME_COL], as_index=False).agg(avoidance=(AVOIDANCE_COL, "max")) + return avoidance diff --git a/pedpy/methods/method_utils.py b/pedpy/methods/method_utils.py index 4153ce1d..30d83d99 100644 --- a/pedpy/methods/method_utils.py +++ b/pedpy/methods/method_utils.py @@ -1295,3 +1295,35 @@ def is_individual_speed_valid( return DataValidationStatus.ENTRY_MISSING return DataValidationStatus.DATA_CORRECT + + +def compute_individual_distances(*, traj_data: TrajectoryData) -> pd.DataFrame: + """Compute pairwise distances between all pedestrians per frame. + + Args: + traj_data: trajectory data + + Returns: + DataFrame with columns 'id', 'frame', 'neighbor_id', and 'distance' + """ + neighbor_point_col = f"{POINT_COL}_{NEIGHBOR_ID_COL}" + points = traj_data.data[[ID_COL, FRAME_COL, POINT_COL]] + neighbor_points = points.rename( + columns={ + ID_COL: NEIGHBOR_ID_COL, + POINT_COL: neighbor_point_col, + } + ) + matrix = points.merge( + neighbor_points, + how="inner", + on=FRAME_COL, + ) + matrix = matrix[matrix[ID_COL] != matrix[NEIGHBOR_ID_COL]] + matrix[DISTANCE_COL] = np.linalg.norm( + shapely.get_coordinates(matrix[POINT_COL]) - shapely.get_coordinates(matrix[neighbor_point_col]), + axis=1, + ) + + matrix = matrix.drop(columns=[POINT_COL, neighbor_point_col]) + return matrix.reset_index(drop=True) diff --git a/tests/unit_tests/methods/test_dimensionless_number_calculator.py b/tests/unit_tests/methods/test_dimensionless_number_calculator.py new file mode 100644 index 00000000..5fe886cf --- /dev/null +++ b/tests/unit_tests/methods/test_dimensionless_number_calculator.py @@ -0,0 +1,145 @@ +import pandas as pd +import pytest + +from pedpy.column_identifier import AVOIDANCE_COL, FRAME_COL, ID_COL, INTRUSION_COL +from pedpy.data.trajectory_data import TrajectoryData +from pedpy.methods.dimensionless_number_calculator import ( + IntrusionMethod, + compute_avoidance, + compute_intrusion, +) + + +def _make_traj(positions, frame_rate=10): + """Build a single-frame TrajectoryData from a list of (x, y) positions.""" + rows = [] + for pid, (x, y) in enumerate(positions): + rows.append({"id": pid, "frame": 0, "x": x, "y": y}) + return TrajectoryData(data=pd.DataFrame(rows), frame_rate=frame_rate) + + +# --------------------------------------------------------------------------- +# Intrusion +# --------------------------------------------------------------------------- + + +class TestComputeIntrusion: + """Tests for compute_intrusion (Cordes et al. 2024, Eq. 1).""" + + def test_two_agents_known_distance(self): + """Two agents at known distance, check In_i by hand.""" + r_soc = 0.8 + l_min = 0.2 + d = 0.5 # distance between the two agents + # Expected per-pair intrusion: ((0.8-0.2)/(0.5-0.2))^2 = (0.6/0.3)^2 = 4.0 + traj = _make_traj([(0, 0), (d, 0)]) + result = compute_intrusion(traj_data=traj, r_soc=r_soc, l_min=l_min) + + for pid in [0, 1]: + val = result.loc[result[ID_COL] == pid, INTRUSION_COL].iloc[0] + assert val == pytest.approx(4.0) + + def test_neighbor_cutoff_excludes_distant_agents(self): + """Agents beyond 3*r_soc should not contribute to intrusion.""" + r_soc = 0.8 + l_min = 0.2 + far = 3 * r_soc + 0.1 # just beyond cutoff + traj = _make_traj([(0, 0), (far, 0)]) + result = compute_intrusion(traj_data=traj, r_soc=r_soc, l_min=l_min) + + # Both agents should be absent (no neighbors within cutoff) + assert result.empty + + def test_three_agents_sum(self): + """Three agents: agent 0 has two neighbors, check sum.""" + r_soc = 0.8 + l_min = 0.2 + d1 = 0.5 + d2 = 0.6 + traj = _make_traj([(0, 0), (d1, 0), (0, d2)]) + + result = compute_intrusion(traj_data=traj, r_soc=r_soc, l_min=l_min, method=IntrusionMethod.SUM) + + in_01 = ((r_soc - l_min) / (d1 - l_min)) ** 2 + in_02 = ((r_soc - l_min) / (d2 - l_min)) ** 2 + expected_0 = in_01 + in_02 + + val = result.loc[result[ID_COL] == 0, INTRUSION_COL].iloc[0] + assert val == pytest.approx(expected_0) + + def test_method_max(self): + """IntrusionMethod.MAX returns the max over neighbors.""" + r_soc = 0.8 + l_min = 0.2 + d1 = 0.5 + d2 = 0.6 + traj = _make_traj([(0, 0), (d1, 0), (0, d2)]) + + result = compute_intrusion(traj_data=traj, r_soc=r_soc, l_min=l_min, method=IntrusionMethod.MAX) + + in_01 = ((r_soc - l_min) / (d1 - l_min)) ** 2 + in_02 = ((r_soc - l_min) / (d2 - l_min)) ** 2 + expected_0 = max(in_01, in_02) + + val = result.loc[result[ID_COL] == 0, INTRUSION_COL].iloc[0] + assert val == pytest.approx(expected_0) + + +# --------------------------------------------------------------------------- +# Avoidance +# --------------------------------------------------------------------------- + + +def _make_moving_traj(agent_data, frame_rate=10): + """Build multi-frame TrajectoryData for agents with constant velocity. + + agent_data: list of (x0, y0, vx, vy) per agent. + Creates two frames separated by dt = 1/frame_rate so velocities resolve. + """ + rows = [] + dt = 1.0 / frame_rate + for pid, (x0, y0, vx, vy) in enumerate(agent_data): + for frame in range(3): + rows.append( + { + "id": pid, + "frame": frame, + "x": x0 + vx * frame * dt, + "y": y0 + vy * frame * dt, + } + ) + return TrajectoryData(data=pd.DataFrame(rows), frame_rate=frame_rate) + + +class TestComputeAvoidance: + """Tests for compute_avoidance (Cordes et al. 2024, Eq. 2).""" + + def test_head_on_collision(self): + """Two agents approaching head-on: TTC is analytically known.""" + # Agent 0 at x=-2, moving right at v=1 + # Agent 1 at x=+2, moving left at v=-1 + # At frame 1 (dt=0.1): positions are -1.9 and +1.9, distance=3.8 + # Relative speed = 2, TTC = (d - R) / |delta_v| = (3.8 - 0.2) / 2 = 1.8 + radius = 0.2 + tau_0 = 3.0 + d_at_frame1 = 3.8 + expected_ttc = (d_at_frame1 - radius) / 2.0 + expected_av = tau_0 / expected_ttc + + traj = _make_moving_traj([(-2, 0, 1, 0), (2, 0, -1, 0)], frame_rate=10) + result = compute_avoidance(traj_data=traj, frame_step=1, radius=radius, tau_0=tau_0) + + # Check frame 1 (middle frame where velocity is computed) + row = result[(result[ID_COL] == 0) & (result[FRAME_COL] == 1)] + assert len(row) == 1 + assert row[AVOIDANCE_COL].iloc[0] == pytest.approx(expected_av, rel=0.05) + + def test_diverging_agents_zero_avoidance(self): + """Two agents moving apart should have Av = 0 (TTC = inf).""" + tau_0 = 3.0 + traj = _make_moving_traj([(-2, 0, -1, 0), (2, 0, 1, 0)], frame_rate=10) + result = compute_avoidance(traj_data=traj, frame_step=1, radius=0.2, tau_0=tau_0) + + row = result[(result[ID_COL] == 0) & (result[FRAME_COL] == 1)] + assert len(row) == 1 + assert row[AVOIDANCE_COL].iloc[0] == pytest.approx(0.0, abs=1e-10)