-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathmethod_utils.py
More file actions
1329 lines (1081 loc) · 48.2 KB
/
method_utils.py
File metadata and controls
1329 lines (1081 loc) · 48.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Helper functions for the analysis methods."""
# pylint: disable=C0302
import itertools
import logging
import warnings
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, Final, Optional, Tuple, TypeAlias
import numpy as np
import pandas as pd
import shapely
from pedpy.column_identifier import (
CROSSING_FRAME_COL,
DENSITY_COL,
DISTANCE_COL,
END_POSITION_COL,
FIRST_FRAME_COL,
FRAME_COL,
ID_COL,
INTERSECTION_COL,
LAST_FRAME_COL,
MID_POSITION_COL,
NEIGHBORS_COL,
NEIGHBOR_ID_COL,
POINT_COL,
POLYGON_COL,
SPECIES_COL,
START_POSITION_COL,
TIME_COL,
V_X_COL,
V_Y_COL,
WINDOW_SIZE_COL,
X_COL,
Y_COL,
)
from pedpy.data.geometry import MeasurementArea, MeasurementLine, WalkableArea
from pedpy.data.trajectory_data import TrajectoryData
from pedpy.errors import PedPyValueError
# Module-level logger named after this module.
_log = logging.getLogger(__name__)
# Type alias for callables that take a DataFrame and a MeasurementLine and
# return a DataFrame.
LambdaGroupFunction: TypeAlias = Callable[[pd.DataFrame, MeasurementLine], pd.DataFrame]
class SpeedCalculation(Enum): # pylint: disable=too-few-public-methods
    """Selects how the movement window is handled at trajectory borders.

    Near the first and last frame of a trajectory the full window
    ``[frame - frame_step, frame + frame_step]`` is not available. The member
    decides which helper :func:`_compute_individual_movement` dispatches to.
    """

    # Frames whose window boundaries are not part of the trajectory are
    # dropped from the result.
    BORDER_EXCLUDE = auto()
    # The window is shrunk to the number of frames available on the short
    # side; frames with an empty window are dropped.
    BORDER_ADAPTIVE = auto()
    # A missing boundary is replaced by the current frame, so the window
    # becomes one-sided.
    BORDER_SINGLE_SIDED = auto()
class AccelerationCalculation(Enum): # pylint: disable=too-few-public-methods
    """Selects how the acceleration window is handled at trajectory borders.

    The member decides which helper
    :func:`_compute_individual_movement_acceleration` dispatches to.
    """

    # Frames whose window boundaries are not part of the trajectory are not
    # considered.
    BORDER_EXCLUDE = auto()
class DataValidationStatus(Enum): # pylint: disable=too-few-public-methods
    """Identifies the outcome of a data validation check.

    NOTE(review): no use of this enum is visible in this part of the module;
    the member notes below are derived from the member names only and should
    be confirmed against the validating caller.
    """

    # presumably: the checked data passed validation
    DATA_CORRECT = auto()
    # presumably: a required column is absent
    COLUMN_MISSING = auto()
    # presumably: a required entry (row/value) is absent
    ENTRY_MISSING = auto()
@dataclass(
kw_only=True,
)
class Cutoff:
"""Maximal extend of a Voronoi polygon.
The maximal extend is an approximated circle with the given radius and
number of line segments used to approximate a quarter circle.
Attributes:
radius (float): radius of the approximated circle
quad_segments (int): number of line elements used to approximate a
quarter circle
"""
radius: float
quad_segments: int = 3
def is_trajectory_valid(*, traj_data: TrajectoryData, walkable_area: WalkableArea) -> bool:
    """Tell whether every trajectory point is located inside the walkable area.

    Args:
        traj_data (TrajectoryData): trajectory data to check
        walkable_area (WalkableArea): area the pedestrians are expected to
            stay in

    Returns:
        True if :func:`get_invalid_trajectory` finds no point outside the
        walkable area, False otherwise.
    """
    points_outside = get_invalid_trajectory(traj_data=traj_data, walkable_area=walkable_area)
    return points_outside.empty
def get_invalid_trajectory(*, traj_data: TrajectoryData, walkable_area: WalkableArea) -> pd.DataFrame:
    """Select the trajectory rows whose point is not within the walkable area.

    Args:
        traj_data (TrajectoryData): trajectory data to check
        walkable_area (WalkableArea): area the pedestrians are expected to
            stay in

    Returns:
        DataFrame holding all rows of the trajectory data whose point is not
        within the polygon of the walkable area.
    """
    is_inside = shapely.within(traj_data.data.point, walkable_area.polygon)
    return traj_data.data.loc[~is_inside]
def compute_frame_range_in_area(
    *,
    traj_data: TrajectoryData,
    measurement_line: MeasurementLine,
    width: float,
) -> Tuple[pd.DataFrame, MeasurementArea]:
    """Determine the frame ranges in which pedestrians pass a strip at a line.

    A second measurement line is generated by offsetting the given line by
    ``width``. The area enclosed by both lines is used as measurement area.

    .. image:: /images/passing_area_from_lines.svg
        :width: 80 %
        :align: center

    A stay inside this area is only reported when the pedestrian is inside
    the area continuously and enters it over one of the two lines while
    leaving it over the other one. Stays which start and end at the same line
    are ignored. In the image below the red parts of the trajectories are the
    ones that are detected and whose frame intervals are returned.

    .. image:: /images/frames_in_area.svg
        :width: 80 %
        :align: center

    .. note::

        A passing begins with the frame in which the pedestrian enters the
        area and requires moving through the complete area without leaving
        it. A closed analysis of a movement area with several measuring
        ranges therefore underestimates the actual movement time.

    Args:
        traj_data (TrajectoryData): trajectory data
        measurement_line (MeasurementLine): measurement line
        width (float): distance to the second measurement line

    Returns:
        Tuple[pandas.DataFrame, MeasurementArea]: DataFrame containing the
        columns 'id', 'entering_frame' describing the frame the pedestrian
        crossed the first or second line, 'leaving_frame' describing the frame
        the pedestrian crossed the second or first line, and the created
        measurement area
    """
    # The parallel line at distance ``width`` closes the area on the far side.
    offset_line = MeasurementLine(shapely.offset_curve(measurement_line.line, distance=width))

    # The coordinates of the second line are traversed backwards so that the
    # four corners form a ring around the area between the lines.
    area_corners = [*measurement_line.coords, *offset_line.coords[::-1]]
    measurement_area = MeasurementArea(area_corners)

    inside_range = _get_continuous_parts_in_area(traj_data=traj_data, measurement_area=measurement_area)

    crossings_first = _compute_crossing_frames(
        traj_data=traj_data,
        measurement_line=measurement_line,
        count_on_line=True,
    )
    crossings_second = _compute_crossing_frames(
        traj_data=traj_data,
        measurement_line=offset_line,
        count_on_line=True,
    )

    # One boolean column per (range boundary, line) combination, telling
    # whether that boundary of the stay coincides with a crossing of the line.
    crossed = {}
    for flag_name, crossings, boundary_column in (
        ("start_crossed_1", crossings_first, FIRST_FRAME_COL),
        ("end_crossed_1", crossings_first, LAST_FRAME_COL),
        ("start_crossed_2", crossings_second, FIRST_FRAME_COL),
        ("end_crossed_2", crossings_second, LAST_FRAME_COL),
    ):
        crossed[flag_name] = _check_crossing_in_frame_range(
            inside_range=inside_range,
            crossing_frames=crossings,
            check_column=boundary_column,
            column_name=flag_name,
        )

    merge_keys = [ID_COL, FIRST_FRAME_COL, LAST_FRAME_COL]
    passings = crossed["start_crossed_1"]
    for flag_name in ("start_crossed_2", "end_crossed_1", "end_crossed_2"):
        passings = passings.merge(crossed[flag_name], how="outer", on=merge_keys)

    # Keep only stays that enter over one line and leave over the other one.
    first_to_second = passings["start_crossed_1"] & passings["end_crossed_2"]
    second_to_first = passings["start_crossed_2"] & passings["end_crossed_1"]
    passings = passings[first_to_second | second_to_first]

    frame_ranges = passings.loc[:, (ID_COL, FIRST_FRAME_COL, LAST_FRAME_COL)]
    return frame_ranges, measurement_area
def compute_neighbors(
    individual_voronoi_data: pd.DataFrame,
    as_list: bool = True,
) -> pd.DataFrame:
    """Determine the neighbors of every pedestrian from the Voronoi cells.

    The neighborhood is evaluated per frame. Two pedestrians are neighbors
    if their Voronoi cells touch at some point, where cells count as touching
    when they are within a distance of 1e-9 of each other.

    Important:
        For legacy reasons the function :func:`~method_utils.compute_neighbors`
        also works without specifying :code:`as_list` (defaults to
        :code:`True`). We highly discourage using this, as its result is
        harder to be used in further computations. Use 'as_list=False'
        instead. The default value may change in future versions of *PedPy*.

    Args:
        individual_voronoi_data (pandas.DataFrame): individual voronoi data,
            needs to contain a column 'polygon', which holds a
            :class:`shapely.Polygon` (result from
            :func:`~method_utils.compute_individual_voronoi_polygons`)
        as_list (bool): if :code:`True`, the neighbors are returned as one
            list per pedestrian and frame (emits a
            :class:`DeprecationWarning`); otherwise every neighbor is
            returned in a row of its own.

    Returns:
        DataFrame containing the columns 'id', 'frame' and 'neighbors', where
        neighbors are a list of the neighbor's IDs if as_list is :code:`True`.
        Otherwise the DataFrame contains the columns 'id', 'frame',
        'neighbor_id'.
    """
    if not as_list:
        return _compute_neighbors_single(individual_voronoi_data=individual_voronoi_data)

    warnings.warn(
        "The parameter 'as_list=True' is deprecated and may change in a "
        "future version. It is kept for backwards compatibility. We "
        "highly discourage using this, as its result is harder to be "
        "used in further computations. Use 'as_list=False' instead.",
        category=DeprecationWarning,
        stacklevel=2,  # report the warning at the location of the caller
    )
    return _compute_neighbors_list(individual_voronoi_data=individual_voronoi_data)
def _compute_neighbors_list(
    individual_voronoi_data: pd.DataFrame,
) -> pd.DataFrame:
    """Compute the neighbors of each pedestrian as one list per row.

    For every frame, two pedestrians are neighbors when their Voronoi
    polygons are within a distance of 1e-9 of each other. A pedestrian is
    never listed as its own neighbor.

    Args:
        individual_voronoi_data (pandas.DataFrame): individual Voronoi data
            holding the id, frame and polygon columns

    Returns:
        DataFrame with the id, frame and neighbors columns. Each row holds
        the list of neighbor ids (as Python ints) of one pedestrian in one
        frame. The per-frame results are concatenated without resetting the
        index.
    """
    neighbor_df = []
    for frame, frame_data in individual_voronoi_data.groupby(FRAME_COL):
        polygons = frame_data[POLYGON_COL].to_numpy()
        ids = frame_data[ID_COL].to_numpy()

        # Pairwise test of all cells in this frame. The tolerance of 1e-9
        # (in coordinate units) only absorbs floating point noise.
        touching = shapely.dwithin(
            polygons[:, np.newaxis],
            polygons[np.newaxis, :],
            1e-9,
        )
        # the peds are not neighbors of themselves
        np.fill_diagonal(touching, False)

        # Select the ids directly with each boolean row. This avoids pushing
        # the ids through a float matrix padded with NaN, which would corrupt
        # ids that are not exactly representable as float64.
        neighbors_list = [ids[row].astype(int).tolist() for row in touching]

        frame_df = pd.DataFrame(
            zip(
                ids,
                itertools.repeat(frame),
                neighbors_list,
            ),
            columns=[ID_COL, FRAME_COL, NEIGHBORS_COL],
        )
        neighbor_df.append(frame_df)

    if not neighbor_df:
        return pd.DataFrame(columns=[ID_COL, FRAME_COL, NEIGHBORS_COL])
    return pd.concat(neighbor_df)
def _compute_neighbors_single(
    individual_voronoi_data: pd.DataFrame,
) -> pd.DataFrame:
    """Compute the neighbors of each pedestrian with one row per neighbor.

    For every frame, two pedestrians are neighbors when their Voronoi
    polygons are within a distance of 1e-9 of each other. A pedestrian is
    never listed as its own neighbor.

    Args:
        individual_voronoi_data (pandas.DataFrame): individual Voronoi data
            holding the id, frame and polygon columns

    Returns:
        DataFrame with the id, frame and neighbor id columns, sorted by frame
        and id and carrying a fresh index.
    """
    result_columns = [ID_COL, FRAME_COL, NEIGHBOR_ID_COL]
    pairs_per_frame = []

    for frame, frame_data in individual_voronoi_data.groupby(FRAME_COL):
        cells = frame_data[POLYGON_COL].to_numpy()
        ped_ids = frame_data[ID_COL].to_numpy()

        adjacency = shapely.dwithin(cells[:, np.newaxis], cells[np.newaxis, :], 1e-9)
        # a pedestrian is not a neighbor of itself
        np.fill_diagonal(adjacency, False)

        # Every True entry (i, j) means: pedestrian j is a neighbor of i.
        ped_index, neighbor_index = np.nonzero(adjacency)
        pairs_per_frame.append(
            pd.DataFrame(
                {
                    ID_COL: ped_ids[ped_index],
                    FRAME_COL: frame,
                    NEIGHBOR_ID_COL: ped_ids[neighbor_index],
                }
            )
        )

    if not pairs_per_frame:
        return pd.DataFrame(columns=result_columns)

    all_pairs = pd.concat(pairs_per_frame, ignore_index=True)
    all_pairs = all_pairs.sort_values(by=[FRAME_COL, ID_COL])
    return all_pairs.reset_index(drop=True)
def compute_neighbor_distance(
    *,
    traj_data: TrajectoryData,
    neighborhood: pd.DataFrame,
) -> pd.DataFrame:
    """Compute the distance between each pedestrian and its neighbors.

    The distance is measured between the positions of both pedestrians in
    the respective frame. The neighborhood is expected in the format returned
    by :func:`~compute_neighbors` with parameter :code:`as_list=False`.

    Note:
        The resulting :class:`~pandas.DataFrame` is symmetric. If pedestrian A
        is a neighbor of pedestrian B, then pedestrian B is also a neighbor of
        pedestrian A. Consequently, the distance between both appears twice in
        the :class:`~pandas.DataFrame`.

    Args:
        traj_data (TrajectoryData): trajectory data
        neighborhood (pd.DataFrame): DataFrame containing the columns 'id',
            'frame' and 'neighbor_id'. The result of :func:`~compute_neighbors`
            with parameter :code:`as_list=False` can be used here as input.

    Raises:
        PedPyValueError: When passing a result of :func:`~compute_neighbors`
            with parameter :code:`as_list=True`.

    Returns:
        DataFrame containing the columns 'id', 'frame', 'neighbor_id' and
        'distance'.
    """
    # The list format carries a 'neighbors' column and cannot be merged with
    # the positions of the individual neighbors.
    if NEIGHBORS_COL in neighborhood.columns:
        raise PedPyValueError(
            "Cannot compute distance between neighbors with list-format data. "
            "Please use the result of compute_neighbors with parameter "
            "as_list=False."
        )

    positions = traj_data.data[[ID_COL, FRAME_COL, POINT_COL]]

    # Attach the position of the pedestrian itself ...
    with_own_position = neighborhood.merge(
        positions,
        on=[ID_COL, FRAME_COL],
        how="left",
    )
    # ... and the position of the neighbor, which ends up in a column with
    # the suffix "_neighbor".
    with_both_positions = with_own_position.merge(
        positions,
        left_on=[NEIGHBOR_ID_COL, FRAME_COL],
        right_on=[ID_COL, FRAME_COL],
        suffixes=("", "_neighbor"),
    )

    with_both_positions[DISTANCE_COL] = shapely.distance(
        with_both_positions[POINT_COL],
        with_both_positions["point_neighbor"],
    )
    return with_both_positions[[ID_COL, FRAME_COL, NEIGHBOR_ID_COL, DISTANCE_COL]]
def compute_time_distance_line(*, traj_data: TrajectoryData, measurement_line: MeasurementLine) -> pd.DataFrame:
    """Compute the time and distance to the measurement line.

    Compute the time (in seconds) and distance to the first crossing of the
    measurement line for each pedestrian. For further information how the
    crossing frames are computed see :func:`~compute_crossing_frames`.

    Pedestrians who never cross the line are not part of the result. All
    frames after a pedestrian has crossed the line for the first time will be
    omitted in the results.

    Args:
        traj_data (TrajectoryData): trajectory data
        measurement_line (MeasurementLine): line which is crossed

    Returns:
        DataFrame containing 'id', 'frame', 'distance' (meters to measurement
        line), and 'time' (seconds until crossing)
    """
    df_distance_time = traj_data.data[[ID_COL, FRAME_COL, POINT_COL]].copy(deep=True)

    # Compute distance to measurement line
    df_distance_time[DISTANCE_COL] = shapely.distance(df_distance_time[POINT_COL], measurement_line.line)

    # A pedestrian may cross the line several times (e.g. when oscillating
    # around it). Only the earliest crossing is used. Merging against all
    # crossings would duplicate every (id, frame) row once per crossing and
    # keep frames that lie after the first crossing.
    first_crossing = (
        compute_crossing_frames(traj_data=traj_data, measurement_line=measurement_line)
        .groupby(ID_COL, as_index=False)[FRAME_COL]
        .min()
        .rename(columns={FRAME_COL: CROSSING_FRAME_COL})
    )

    # Inner merge: pedestrians without any crossing are dropped here.
    df_distance_time = df_distance_time.merge(first_crossing, on=ID_COL)

    # Compute time to entrance: frame difference converted to seconds
    df_distance_time[TIME_COL] = (
        df_distance_time[CROSSING_FRAME_COL] - df_distance_time[FRAME_COL]
    ) / traj_data.frame_rate

    # Delete all rows where the line has already been passed
    df_distance_time = df_distance_time[df_distance_time[TIME_COL] >= 0]

    return df_distance_time.loc[:, [ID_COL, FRAME_COL, DISTANCE_COL, TIME_COL]]
def compute_individual_voronoi_polygons(
    *,
    traj_data: TrajectoryData,
    walkable_area: WalkableArea,
    cut_off: Optional[Cutoff] = None,
    use_blind_points: Optional[bool] = None,
) -> pd.DataFrame:
    """Compute the individual Voronoi polygon for each person and frame.

    For each frame a Voronoi tessellation of the pedestrians' positions is
    created. Every resulting cell is then intersected with the walkable area.

    .. warning::

        In case of non-convex walkable areas it might happen that Voronoi cell
        will be cut at unexpected places.

    Without further restriction the Voronoi cells reach all the way to the
    boundaries of the walkable area, as seen below:

    .. image:: /images/voronoi_wo_cutoff.svg
        :width: 80 %
        :align: center

    With only a few pedestrians that are far apart, or with large walkable
    areas, this may not be the desired behavior because the size of the
    Voronoi polygon is directly related to the individual density. The size
    of the polygons can then be limited with a :class:`Cutoff`, which defines
    a radius and the number of line segments used to approximate a quarter
    circle. The influence of the number of line segments on the circle is
    shown in the plot below:

    .. image:: /images/voronoi_cutoff_differences.svg
        :width: 80 %
        :align: center

    With this cut off information the resulting Voronoi polygons look like
    this:

    .. image:: /images/voronoi_w_cutoff.svg
        :width: 80 %
        :align: center

    Args:
        traj_data (TrajectoryData): trajectory data
        walkable_area (WalkableArea): bounding area, where pedestrian are
            supposed to walk
        cut_off (Cutoff): cutoff information, which provide the largest
            possible extent of a single Voronoi polygon
        use_blind_points (bool): **Deprecated.** This parameter has no effect
            and will be removed in a future version. The underlying
            Voronoi computation now handles any number of pedestrians
            (including fewer than 4 and collinear configurations)
            natively via :func:`shapely.voronoi_polygons`. Passing any value
            other than :code:`None` emits a :class:`DeprecationWarning`.

    Returns:
        DataFrame containing the columns 'id', 'frame','polygon' (
        :class:`shapely.Polygon`), and 'density' in :math:`1/m^2`.
    """
    if use_blind_points is not None:
        warnings.warn(
            "The parameter 'use_blind_points' is deprecated and has no "
            "effect. It will be removed in a future version. The underlying "
            "Voronoi computation handles any number of pedestrians natively.",
            category=DeprecationWarning,
            stacklevel=2,
        )

    area_polygon = walkable_area.polygon
    ids_per_frame = []
    frames_per_frame = []
    cells_per_frame = []

    for _, frame_data in traj_data.data.groupby(traj_data.data.frame):
        positions = frame_data[POINT_COL].to_numpy()
        sites = shapely.MultiPoint(frame_data[[X_COL, Y_COL]].to_numpy())

        # ordered=True keeps the cells in the order of the input positions,
        # which the element-wise operations below rely on.
        diagram = shapely.voronoi_polygons(sites, extend_to=area_polygon, ordered=True)

        # Restrict every cell to the walkable area
        cells = shapely.intersection(shapely.get_parts(diagram), area_polygon)

        # Optionally restrict every cell to a circle around its pedestrian
        if cut_off is not None:
            max_extent = shapely.buffer(
                positions,
                cut_off.radius,
                quad_segs=cut_off.quad_segments,
            )
            cells = shapely.intersection(cells, max_extent)

        # The clipping may yield geometries which are not a single Polygon
        # (e.g. a MultiPolygon for non-convex walkable areas); keep the part
        # in which the pedestrian is located.
        cells = _resolve_multipolygons(cells, positions)

        ids_per_frame.append(frame_data[ID_COL].values)
        frames_per_frame.append(frame_data[FRAME_COL].values)
        cells_per_frame.append(cells)

    if not ids_per_frame:
        # No frame at all: return an empty, correctly typed result
        return pd.DataFrame(
            {
                ID_COL: pd.Series(dtype=int),
                FRAME_COL: pd.Series(dtype=int),
                POLYGON_COL: pd.Series(dtype=object),
                DENSITY_COL: pd.Series(dtype=float),
            }
        )

    voronoi_data = pd.DataFrame(
        {
            ID_COL: np.concatenate(ids_per_frame),
            FRAME_COL: np.concatenate(frames_per_frame),
            POLYGON_COL: np.concatenate(cells_per_frame),
        }
    )
    # The individual density is the inverse of the cell's area
    voronoi_data[DENSITY_COL] = 1.0 / shapely.area(voronoi_data[POLYGON_COL].values)
    return voronoi_data
def _resolve_multipolygons(polygons: np.ndarray, points: np.ndarray) -> np.ndarray:
    """Reduce every non-Polygon geometry to the polygon part covering the point.

    Intersecting a Voronoi cell with a non-convex walkable area may produce a
    MultiPolygon or a GeometryCollection. For each such geometry the polygon
    part which covers the position of the pedestrian is selected.

    Args:
        polygons: array of shapely geometries
        points: array of shapely Points (one per polygon)

    Returns:
        Array of shapely Polygons. If all geometries are already Polygons the
        input array itself is returned, otherwise a modified copy.

    Raises:
        PedPyValueError: if a geometry has no polygonal part, or if the
            position of the pedestrian is not covered by any polygonal part
            of its Voronoi cell. This indicates the trajectory data is
            inconsistent with the walkable area (e.g. the position is outside
            the walkable area or inside an obstacle).
    """
    polygon_type_id = 3  # shapely type id of a simple Polygon

    needs_resolving = shapely.get_type_id(polygons) != polygon_type_id
    if not needs_resolving.any():
        return polygons

    resolved = polygons.copy()
    for idx in np.where(needs_resolving)[0]:
        parts = shapely.get_parts(polygons[idx])

        # Only polygonal parts are candidates. A GeometryCollection may also
        # hold lines or points, which would break the return-type contract
        # and lead to a division by zero when the density (1 / area) is
        # computed.
        polygon_parts = parts[shapely.get_type_id(parts) == polygon_type_id]
        if len(polygon_parts) == 0:
            raise PedPyValueError(
                f"Pedestrian at position {points[idx]} has a Voronoi cell with "
                f"no polygonal parts after intersection with the walkable area. "
                f"Ensure all trajectory positions lie within the walkable area "
                f"and outside any obstacles."
            )

        covers_point = shapely.covers(polygon_parts, points[idx])
        if not covers_point.any():
            raise PedPyValueError(
                f"Pedestrian at position {points[idx]} does not lie within any "
                f"part of their Voronoi cell. This indicates the trajectory "
                f"position is outside the walkable area or inside an obstacle. "
                f"Verify the trajectory data is consistent with the walkable area."
            )

        # Take the first part that covers the position
        covering_parts = polygon_parts[covers_point]
        resolved[idx] = covering_parts[0]

    return resolved
def compute_intersecting_polygons(
    *,
    individual_voronoi_data: pd.DataFrame,
    measurement_area: MeasurementArea,
) -> pd.DataFrame:
    """Intersect each individual Voronoi cell with the measurement area.

    .. image:: /images/voronoi_density.svg
        :width: 60 %
        :align: center

    Args:
        individual_voronoi_data (pandas.DataFrame): individual voronoi data,
            needs to contain a column 'polygon' (:class:`shapely.Polygon`),
            result from
            :func:`~method_utils.compute_individual_voronoi_polygons`
        measurement_area (MeasurementArea): measurement area for which the
            intersection will be computed.

    Returns:
        DataFrame containing the columns 'id', 'frame' and
        'intersection' which is the intersection of the individual Voronoi
        polygon and the given measurement area as :class:`shapely.Polygon`.
    """
    intersections = shapely.intersection(individual_voronoi_data.polygon, measurement_area.polygon)

    # Work on a copy so that the passed DataFrame is left untouched
    result = individual_voronoi_data[[ID_COL, FRAME_COL]].copy()
    result[INTERSECTION_COL] = intersections
    return result
def compute_crossing_frames(
    *,
    traj_data: TrajectoryData,
    measurement_line: MeasurementLine,
) -> pd.DataFrame:
    """Compute the frames at which pedestrians pass the measurement line.

    A crossing is a movement between two consecutive positions that moves
    across the measurement line. A movement that ends on the line does not
    count as crossing, a movement that starts on the line does. A visual
    representation is shown below, where the movement goes from left to right
    and each dot indicates the position at one frame. Red highlights where
    the person has crossed the measurement line.

    .. image:: /images/crossing_frames.svg
        :width: 80 %
        :align: center

    Note:
        Due to oscillations, it may happen that a pedestrian crosses the
        measurement line multiple times in a small-time interval.

    Args:
        traj_data (TrajectoryData): trajectory data
        measurement_line (MeasurementLine): measurement line which is crossed

    Returns:
        DataFrame containing the columns 'id', 'frame', where 'frame' is
        the frame where the measurement line is crossed.
    """
    # Movements that merely end on the line are not counted here
    crossing_frames = _compute_crossing_frames(
        traj_data=traj_data,
        measurement_line=measurement_line,
        count_on_line=False,
    )
    return crossing_frames
def _compute_crossing_frames(
    *,
    traj_data: TrajectoryData,
    measurement_line: MeasurementLine,
    count_on_line: bool,
) -> pd.DataFrame:
    """Compute the frames at which pedestrians pass the measurement line.

    The movement of each pedestrian from one frame to the next is turned into
    a line segment which is tested against the measurement line. With
    ``count_on_line=True`` every movement that intersects the line is
    reported, including movements that end on the line. Otherwise movements
    whose end point is closer than 1e-5 to the line are not reported.

    Args:
        traj_data (TrajectoryData): trajectory data
        measurement_line (MeasurementLine): measurement line which is crossed
        count_on_line (bool): Count movement ending on line (True) or only if
            movement crosses line, but does not end on line.

    Returns:
        DataFrame containing the columns 'id', 'frame', where 'frame' is
        the frame where the measurement line is crossed.
    """
    step_movement = _compute_individual_movement(traj_data=traj_data, frame_step=1, bidirectional=False)

    # The start and end coordinates are stacked along a new middle axis so
    # that each row describes one line segment. The resulting array looks as
    # follows:
    # [[[x_0_start, y_0_start], [x_0_end, y_0_end]],
    #  [[x_1_start, y_1_start], [x_1_end, y_1_end]], ... ]
    start_coords = shapely.get_coordinates(step_movement.start_position)
    end_coords = shapely.get_coordinates(step_movement.end_position)
    step_movement["movement"] = shapely.linestrings(np.stack([start_coords, end_coords], axis=1))

    intersects_line = shapely.intersects(step_movement.movement, measurement_line.line)

    if count_on_line:
        return step_movement.loc[intersects_line][[ID_COL, FRAME_COL]]

    # A movement only counts as crossing when it does not end on the line.
    # End points closer to the line than this threshold are treated as lying
    # on the line.
    CROSSING_THRESHOLD: Final = 1e-5  # noqa: N806
    distance_to_line = shapely.distance(step_movement.end_position, measurement_line.line)
    ends_on_line = distance_to_line < CROSSING_THRESHOLD

    return step_movement.loc[(intersects_line) & (~ends_on_line)][[ID_COL, FRAME_COL]]
def _compute_individual_movement(
    *,
    traj_data: TrajectoryData,
    frame_step: int,
    bidirectional: bool = True,
    speed_border_method: SpeedCalculation = SpeedCalculation.BORDER_ADAPTIVE,
) -> pd.DataFrame:
    """Compute the individual movement with the selected border handling.

    Args:
        traj_data (TrajectoryData): trajectory data
        frame_step (int): how many frames back and forwards are used to compute
            the movement.
        bidirectional (bool): if True also the future frame_step points will
            be used to determine the movement
        speed_border_method (SpeedCalculation): how frames close to the start
            or end of a trajectory are handled

    Returns:
        The DataFrame produced by the helper that belongs to the selected
        border method.

    Raises:
        PedPyValueError: if ``speed_border_method`` is none of the known
            methods.
    """
    # Checked in order with ``==``, first match wins
    border_handlers = (
        (SpeedCalculation.BORDER_EXCLUDE, _compute_movement_exclude_border),
        (SpeedCalculation.BORDER_SINGLE_SIDED, _compute_movement_single_sided_border),
        (SpeedCalculation.BORDER_ADAPTIVE, _compute_movememnt_adaptive_border),
    )
    for method, handler in border_handlers:
        if speed_border_method == method:
            return handler(traj_data, frame_step, bidirectional)

    raise PedPyValueError("speed border method not accepted")
def _compute_movement_exclude_border(
    traj_data: TrajectoryData,
    frame_step: int,
    bidirectional: bool,
) -> pd.DataFrame:
    """Compute the individual movement, dropping frames at the borders.

    The movement is determined for the interval [frame - frame_step: frame +
    frame_step] (or [frame - frame_step: frame] if not bidirectional). Frames
    for which a boundary of this interval is not part of the pedestrian's
    trajectory are removed from the result.

    Args:
        traj_data (TrajectoryData): trajectory data
        frame_step (int): how many frames back and forwards are used to compute
            the movement.
        bidirectional (bool): if True also the future frame_step points will
            be used to determine the movement

    Returns:
        DataFrame containing the columns: 'id', 'frame', 'start_position',
        'end_position', 'window_size'. Where 'start_position'/'end_position' are
        the points where the movement start/ends, and 'window_size' is the
        number of frames between the movement start and end.
    """
    movement = traj_data.data.copy(deep=True)

    # Start of the window: the row ``frame_step`` positions earlier within
    # the same pedestrian (missing at the beginning of a trajectory).
    start_position = movement.groupby(by=ID_COL).point.shift(frame_step)
    start_frame = movement.groupby(by=ID_COL).frame.shift(frame_step)

    # End of the window: ``frame_step`` positions later, or the row itself
    if bidirectional:
        end_position = movement.groupby(movement.id).point.shift(-frame_step)
        end_frame = movement.groupby(movement.id).frame.shift(-frame_step)
    else:
        end_position = movement.point
        end_frame = movement.frame

    movement[START_POSITION_COL] = start_position
    movement["start_frame"] = start_frame
    movement[END_POSITION_COL] = end_position
    movement["end_frame"] = end_frame
    movement[WINDOW_SIZE_COL] = movement.end_frame - movement.start_frame

    result_columns = [
        ID_COL,
        FRAME_COL,
        START_POSITION_COL,
        END_POSITION_COL,
        WINDOW_SIZE_COL,
    ]
    # Rows with a missing boundary contain NaN and are removed here
    return movement[result_columns].dropna()
def _compute_movement_single_sided_border(
    traj_data: TrajectoryData,
    frame_step: int,
    bidirectional: bool,
) -> pd.DataFrame:
    """Compute the individual movement with one-sided windows at the borders.

    The movement is determined for the interval [frame - frame_step: frame +
    frame_step]. If a boundary of this interval is not part of the
    pedestrian's trajectory, the frame itself is used as that boundary, so
    the interval becomes [frame, frame + frame_step] or
    [frame - frame_step, frame] respectively.

    Args:
        traj_data (TrajectoryData): trajectory data
        frame_step (int): how many frames back and forwards are used to compute
            the movement.
        bidirectional (bool): if True also the future frame_step points will
            be used to determine the movement

    Returns:
        DataFrame containing the columns: 'id', 'frame', 'start_position',
        'end_position', 'window_size'. Where 'start_position'/'end_position' are
        the points where the movement start/ends, and 'window_size' is the
        number of frames between the movement start and end.
    """
    movement = traj_data.data.copy(deep=True)

    # Start of the window: ``frame_step`` rows earlier within the same
    # pedestrian, falling back to the row itself where that is missing.
    start_position = movement.groupby(by=ID_COL).point.shift(frame_step).fillna(movement.point)
    start_frame = movement.groupby(by=ID_COL).frame.shift(frame_step).fillna(movement.frame)

    # End of the window: ``frame_step`` rows later (same fallback), or the
    # row itself if only the past is used.
    if bidirectional:
        end_position = movement.groupby(movement.id).point.shift(-frame_step).fillna(movement.point)
        end_frame = movement.groupby(movement.id).frame.shift(-frame_step).fillna(movement.frame)
    else:
        end_position = movement.point
        end_frame = movement.frame

    movement[START_POSITION_COL] = start_position
    movement["start_frame"] = start_frame
    movement[END_POSITION_COL] = end_position
    movement["end_frame"] = end_frame
    movement[WINDOW_SIZE_COL] = movement.end_frame - movement.start_frame

    result_columns = [
        ID_COL,
        FRAME_COL,
        START_POSITION_COL,
        END_POSITION_COL,
        WINDOW_SIZE_COL,
    ]
    return movement[result_columns]
def _compute_movememnt_adaptive_border(
    traj_data: TrajectoryData,
    frame_step: int,
    bidirectional: bool,
) -> pd.DataFrame:
    """Compute the individual movement with a window adapted at the borders.

    If ``bidirectional`` is True the movement is computed for the interval
    [frame - frame_step: frame + frame_step]. If one of the boundaries is not
    contained in the trajectory, the maximum available number of frames on
    this side is used and the same number of frames also on the other side.

    If ``bidirectional`` is False the movement is computed for the interval
    [frame - frame_step: frame]. Only the past is used, hence the window is
    limited solely by the number of frames available since the first frame of
    the trajectory.

    Frames for which no window remains (window size 0) are not part of the
    result.

    Args:
        traj_data (TrajectoryData): trajectory data
        frame_step (int): how many frames back and forwards are used to compute
            the movement.
        bidirectional (bool): if True also the future frame_step points will
            be used to determine the movement

    Returns:
        DataFrame containing the columns: 'id', 'frame', 'start_position',
        'end_position', 'window_size'. Where 'start_position'/'end_position' are
        the points where the movement start/ends, and 'window_size' is the
        number of frames between the movement start and end.
    """
    df_movement = traj_data.data.copy(deep=True)

    # First and last frame of every pedestrian
    frame_infos = df_movement.groupby(by=ID_COL).agg(frame_min=(FRAME_COL, "min"), frame_max=(FRAME_COL, "max"))
    df_movement = df_movement.merge(frame_infos, on=ID_COL)

    df_movement["distance_min"] = np.abs(df_movement[FRAME_COL] - df_movement["frame_min"])
    df_movement["distance_max"] = np.abs(df_movement[FRAME_COL] - df_movement["frame_max"])

    if bidirectional:
        # Symmetric window: limited by the shorter side
        available_frames = np.minimum(df_movement["distance_min"].values, df_movement["distance_max"].values)
    else:
        # Only past frames are used, so the distance to the last frame must
        # not limit the window. Otherwise the last frame of every trajectory
        # would get a window of 0 and be dropped, and the final movement step
        # would never be evaluated.
        available_frames = df_movement["distance_min"].values
    df_movement[WINDOW_SIZE_COL] = np.minimum(frame_step, available_frames)

    df_movement["start_frame"] = df_movement[FRAME_COL] - df_movement[WINDOW_SIZE_COL]
    df_movement["end_frame"] = df_movement[FRAME_COL] + df_movement[WINDOW_SIZE_COL]

    # The right-hand frame column of the self-merges gets this name and is
    # only needed as merge key.
    dropped_frame_col = f"{FRAME_COL}_drop"

    # Look up the position at the start frame of each window
    start = (
        df_movement[[ID_COL, FRAME_COL, "start_frame", WINDOW_SIZE_COL]]
        .merge(
            df_movement[[ID_COL, FRAME_COL, POINT_COL]],
            left_on=[ID_COL, "start_frame"],
            right_on=[ID_COL, FRAME_COL],
            suffixes=("", "_drop"),
        )
        .drop(dropped_frame_col, axis=1)
        .rename({POINT_COL: START_POSITION_COL}, axis=1)
    )

    if bidirectional:
        # Look up the position at the end frame of each window
        end = (
            df_movement[[ID_COL, FRAME_COL, "end_frame"]]
            .merge(
                df_movement[[ID_COL, FRAME_COL, POINT_COL]],
                left_on=[ID_COL, "end_frame"],
                right_on=[ID_COL, FRAME_COL],
                suffixes=("", "_drop"),
            )
            .drop(dropped_frame_col, axis=1)
            .rename({POINT_COL: END_POSITION_COL}, axis=1)
        )
        # as the window is used on both sides
        start[WINDOW_SIZE_COL] = 2 * start[WINDOW_SIZE_COL]
    else:
        # The movement ends at the position of the frame itself
        df_movement[END_POSITION_COL] = df_movement[POINT_COL]
        end = df_movement[[ID_COL, FRAME_COL, END_POSITION_COL]].copy(deep=True)

    result = start.merge(end, on=[ID_COL, FRAME_COL])[
        [
            ID_COL,
            FRAME_COL,
            START_POSITION_COL,
            END_POSITION_COL,
            WINDOW_SIZE_COL,
        ]
    ]
    # Frames without any window carry no movement information
    return result[result[WINDOW_SIZE_COL] > 0]
def _compute_individual_movement_acceleration(
    *,
    traj_data: TrajectoryData,
    frame_step: int,
    acceleration_border_method: AccelerationCalculation = AccelerationCalculation.BORDER_EXCLUDE,
) -> pd.DataFrame:
    """Compute the movement for accelerations with the selected border handling.

    Args:
        traj_data (TrajectoryData): trajectory data
        frame_step (int): how many frames back and forwards are used to compute
            the movement.
        acceleration_border_method (AccelerationCalculation): how frames close
            to the start or end of a trajectory are handled

    Returns:
        The DataFrame produced by the helper that belongs to the selected
        border method.

    Raises:
        PedPyValueError: if ``acceleration_border_method`` is not a known
            method.
    """
    exclude_border = acceleration_border_method == AccelerationCalculation.BORDER_EXCLUDE
    if not exclude_border:
        raise PedPyValueError("acceleration border method not accepted")

    return _compute_movement_acceleration_exclude_border(traj_data, frame_step)
def _compute_movement_acceleration_exclude_border(
traj_data: TrajectoryData,
frame_step: int,
) -> pd.DataFrame:
"""Compute the individual movement in the time interval frame_step.
The movement is computed for the interval [frame - frame_step: frame +
frame_step], if one of the boundaries is not contained in the trajectory
frame these points will not be considered.
Args:
traj_data (pandas.DataFrame): trajectory data
frame_step (int): how many frames back and forwards are used to compute
the movement.
bidirectional (bool): if True also the future frame_step points
will be used to determine the movement