Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ config :concentrate,
Concentrate.GroupFilter.TimeOutOfRange,
Concentrate.GroupFilter.RemoveUnneededTimes,
Concentrate.GroupFilter.VehiclePastStop,
Concentrate.GroupFilter.VehicleBeforeStop,
Concentrate.GroupFilter.Shuttle,
Concentrate.GroupFilter.SkippedDepartures,
Concentrate.GroupFilter.CancelledTrip,
Expand All @@ -38,7 +39,8 @@ config :concentrate,
reporters: [
Concentrate.Reporter.VehicleLatency,
Concentrate.Reporter.StopTimeUpdateLatency,
Concentrate.Reporter.Latency
Concentrate.Reporter.Latency,
Concentrate.Reporter.VehicleGoingFirstStop
],
encoders: [
files: [
Expand Down
101 changes: 101 additions & 0 deletions lib/concentrate/group_filter/cache/vehicle_before_stop.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule Concentrate.GroupFilter.Cache.VehicleBeforeStop do
@moduledoc """
Server to maintain a cache of previously seen StopTimeUpdates for a given trip.

As the vehicle moves through, we'll remove the older updates. Periodically,
we'll scan for StopTimeUpdates in the past and remove them.
"""
use GenServer
alias Concentrate.{VehiclePosition, StopTimeUpdate}

@table __MODULE__
# 5 minutes
@stale_timeout_seconds 300

@spec stop_time_updates_for_vehicle(VehiclePosition.t(), [StopTimeUpdate.t()]) :: [
StopTimeUpdate.t()
]
def stop_time_updates_for_vehicle(vehicle_position, stop_time_updates) do
if is_integer(VehiclePosition.stop_sequence(vehicle_position)) do
insert_new_updates!(stop_time_updates)
delete_old_updates(vehicle_position)
fetch_updates_with_stop_sequence_ge_than_vehicle(vehicle_position)
else
stop_time_updates
end
rescue
ArgumentError ->
stop_time_updates
end

defp insert_new_updates!(stop_time_updates) do
inserts =
for stu <- stop_time_updates,
stop_sequence <- List.wrap(StopTimeUpdate.stop_sequence(stu)) do
trip_id = StopTimeUpdate.trip_id(stu)
time = StopTimeUpdate.time(stu)
:ets.match_delete(@table, {trip_id, stop_sequence, :_, :_})
{trip_id, stop_sequence, time, stu}
end

:ets.insert(@table, inserts)
end

defp fetch_updates_with_stop_sequence_ge_than_vehicle(vp) do
unsorted =
:ets.select(@table, [
{
{VehiclePosition.trip_id(vp), :"$1", :_, :"$2"},
[{:>=, :"$1", VehiclePosition.stop_sequence(vp)}],
[:"$2"]
}
])

Enum.sort_by(unsorted, &StopTimeUpdate.stop_sequence/1)
end

defp delete_old_updates(vp) do
:ets.select_delete(@table, [
{
{VehiclePosition.trip_id(vp), :"$1", :_, :_},
[{:<, :"$1", VehiclePosition.stop_sequence(vp)}],
[true]
}
])
end

def start_link([]) do
GenServer.start_link(__MODULE__, [])
end

@impl GenServer
def init([]) do
@table = :ets.new(@table, [:bag, :named_table, :public])
schedule_clear!()
{:ok, []}
end

@impl GenServer
def handle_info(:clear, state) do
now = System.system_time(:seconds)
minimum_time = now - @stale_timeout_seconds

:ets.select_delete(@table, [
{
{:_, :_, :"$1", :_},
[{:<, :"$1", minimum_time}],
[true]
}
])

{:noreply, state}
end

def handle_info(message, state) do
super(message, state)
end

defp schedule_clear! do
send(self(), :clear)
end
end
17 changes: 17 additions & 0 deletions lib/concentrate/group_filter/vehicle_before_stop.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Concentrate.GroupFilter.VehicleBeforeStop do
@moduledoc """
Adds a historic StopTimeUpdate for the trip if the vehicle hasn't moved past it yet.
"""
@behaviour Concentrate.GroupFilter
alias Concentrate.TripUpdate
alias Concentrate.GroupFilter.Cache.VehicleBeforeStop, as: Cache

@impl Concentrate.GroupFilter
def filter({%TripUpdate{} = tu, [vp], stus}) do
stus = Cache.stop_time_updates_for_vehicle(vp, stus)

{tu, [vp], stus}
end

def filter(other), do: other
end
31 changes: 31 additions & 0 deletions lib/concentrate/reporter/vehicle_going_first_stop.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Concentrate.Reporter.VehicleGoingFirstStop do
@moduledoc false
@behaviour Concentrate.Reporter
require Logger
alias Concentrate.{VehiclePosition, StopTimeUpdate, TripUpdate}

@impl Concentrate.Reporter
def init do
:ok
end

@impl Concentrate.Reporter
def log(groups, state) do
for {tu, vps, stus} <- groups,
tu != nil,
stus != [],
vp <- vps do
if VehiclePosition.status(vp) != :STOPPED_AT do
stu = hd(stus)
first_stop_sequence = StopTimeUpdate.stop_sequence(stu)
vp_stop_sequence = VehiclePosition.stop_sequence(vp)

if vp_stop_sequence < first_stop_sequence do
Logger.error("#{inspect(TripUpdate.route_id(tu))}")
end
end
end

{[], state}
end
end
9 changes: 5 additions & 4 deletions lib/concentrate/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ defmodule Concentrate.Supervisor do
end

def children(config) do
pool = pool()
misc = misc()
alerts = alerts(config[:alerts])
gtfs = gtfs(config[:gtfs])
pipeline = pipeline(config)
Enum.concat([pool, alerts, gtfs, pipeline])
Enum.concat([misc, alerts, gtfs, pipeline])
end

def pool do
def misc do
[
:hackney_pool.child_spec(:http_producer_pool, timeout: 30_000, max_connections: 100)
:hackney_pool.child_spec(:http_producer_pool, timeout: 30_000, max_connections: 100),
Concentrate.GroupFilter.Cache.VehicleBeforeStop
]
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Concentrate.GroupFilter.Cache.VehicleBeforeStopTest do
@moduledoc false
use ExUnit.Case
import Concentrate.GroupFilter.Cache.VehicleBeforeStop
alias Concentrate.{VehiclePosition, StopTimeUpdate}

defp supervised(_) do
{:ok, _} = start_supervised(Concentrate.GroupFilter.Cache.VehicleBeforeStop)
:ok
end

describe "stop_time_updates_for_vehicle/2" do
setup :supervised

test "restores older StopTimeUpdate values if the vehicle hasn't reached them" do
trip_id = "before_stop_test"

vp =
VehiclePosition.new(
id: "vehicle",
trip_id: trip_id,
stop_sequence: 1,
latitude: 1.0,
longitude: 1.0
)

stus =
for stop_sequence <- 1..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence)
end

assert stop_time_updates_for_vehicle(vp, stus) == stus
# restores the first two StopTimeUpdates since the vehicle hasn't
# reached them
assert stop_time_updates_for_vehicle(vp, Enum.drop(stus, 2)) == stus

# restores the second StopTimeUpdate since the vehicle is past the
# first one
vp = VehiclePosition.update_stop_sequence(vp, 2)
assert stop_time_updates_for_vehicle(vp, Enum.drop(stus, 2)) == Enum.drop(stus, 1)
end

test "uses updated stop time updates for future changes" do
trip_id = "before_stop_test"

vp =
VehiclePosition.new(
id: "vehicle",
trip_id: trip_id,
stop_sequence: 1,
latitude: 1.0,
longitude: 1.0
)

stus =
for stop_sequence <- 1..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence)
end

assert stop_time_updates_for_vehicle(vp, stus) == stus
vp = VehiclePosition.update_stop_sequence(vp, 2)

new_stus =
for stop_sequence <- 3..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence, arrival_time: 5)
end

# we expect one old update, plus the two new ones
expected = Enum.slice(stus, 1..1) ++ new_stus
assert stop_time_updates_for_vehicle(vp, new_stus) == expected
end
end

describe "missing ETS table" do
test "stop_time_updates_for_vehicle returns same updates" do
vp = VehiclePosition.new(latitude: 1, longitude: 1)
stu = StopTimeUpdate.new([])
assert stop_time_updates_for_vehicle(vp, [stu]) == [stu]
end
end
end
47 changes: 47 additions & 0 deletions test/concentrate/group_filter/vehicle_before_stop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Concentrate.GroupFilter.VehicleBeforeStopTest do
@moduledoc false
use ExUnit.Case
import Concentrate.GroupFilter.VehicleBeforeStop
alias Concentrate.{TripUpdate, VehiclePosition, StopTimeUpdate}

describe "filter/1" do
setup do
{:ok, _} = start_supervised(Concentrate.GroupFilter.Cache.VehicleBeforeStop)
:ok
end

test "restores older StopTimeUpdate values if the vehicle hasn't reached them yet" do
trip_id = "before_stop_test"
tu = TripUpdate.new(trip_id: trip_id)

vp =
VehiclePosition.new(
id: "vehicle",
trip_id: trip_id,
stop_sequence: 1,
latitude: 1.0,
longitude: 1.0
)

stus =
for stop_sequence <- 1..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence)
end

group = {tu, [vp], stus}
assert filter(group) == group
# restores the first two StopTimeUpdates since the vehicle hasn't
# reached them
assert filter({tu, [vp], Enum.drop(stus, 2)}) == group

# restores the second StopTimeUpdate since the vehicle is past the
# first one
vp = VehiclePosition.update_stop_sequence(vp, 2)
assert filter({tu, [vp], Enum.drop(stus, 2)}) == {tu, [vp], Enum.drop(stus, 1)}
end

test "ignores unknown values" do
assert filter(:value) == :value
end
end
end
4 changes: 2 additions & 2 deletions test/concentrate/supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ defmodule Concentrate.SupervisorTest do

describe "children/1" do
test "builds the right number of children" do
# currently, the right number is 4: HTTP pool, alerts, GTFS, pipeline
# currently, the right number is 5: HTTP pool, alerts, GTFS, cache, pipeline
actual = children([])

assert length(actual) == 4
assert length(actual) == 5
end
end
end