Skip to content

Commit 2c638aa

Browse files
committed
fix stream on deploys
1 parent d1a5941 commit 2c638aa

7 files changed

Lines changed: 28 additions & 46 deletions

File tree

lib/actors/actor/caller_consumer.ex

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -956,10 +956,7 @@ defmodule Actors.Actor.CallerConsumer do
956956
kind == :UNNAMED ->
957957
false
958958

959-
match?(true, stateful) ->
960-
true
961-
962-
not is_nil(channel_group) and length(channel_group) > 0 ->
959+
stateful == true ->
963960
true
964961

965962
true ->

lib/actors/actor/entity/lifecycle.ex

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
7878
actor.id
7979
)
8080

81-
schedule_deactivate(deactivation_strategy, get_jitter())
81+
schedule_deactivate(actor, deactivation_strategy, get_jitter())
8282

8383
state =
8484
case maybe_schedule_snapshot_advance(snapshot_strategy) do
@@ -252,7 +252,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
252252
%ActorDeactivationStrategy{strategy: deactivation_strategy} =
253253
_actor_deactivation_strategy
254254
}
255-
} = _actor
255+
} = actor
256256
} = state
257257
) do
258258
queue_length = Process.info(self(), :message_queue_len)
@@ -265,7 +265,7 @@ defmodule Actors.Actor.Entity.Lifecycle do
265265
{:stop, :shutdown, state}
266266

267267
_ ->
268-
schedule_deactivate(deactivation_strategy)
268+
schedule_deactivate(actor, deactivation_strategy)
269269
{:noreply, state}
270270
end
271271
end
@@ -423,7 +423,16 @@ defmodule Actors.Actor.Entity.Lifecycle do
423423

424424
defp maybe_schedule_snapshot_advance(_), do: :ok
425425

426-
defp schedule_deactivate(deactivation_strategy, timeout_factor \\ 0) do
426+
defp schedule_deactivate(actor, deactivation_strategy, timeout_factor \\ 0)
427+
428+
defp schedule_deactivate(
429+
%Actor{settings: %ActorSettings{kind: :PROJECTION}},
430+
deactivation_strategy,
431+
timeout_factor
432+
),
433+
do: :ok
434+
435+
defp schedule_deactivate(_actor, deactivation_strategy, timeout_factor) do
427436
strategy = maybe_get_default_deactivation_strategy(deactivation_strategy)
428437

429438
Process.send_after(

lib/actors/actor/entity/lifecycle/projection_consumers.ex

Lines changed: 0 additions & 28 deletions
This file was deleted.

lib/actors/actor/entity/lifecycle/stream_consumer.ex

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule Actors.Actor.Entity.Lifecycle.StreamConsumer do
66
alias Spawn.Utils.Nats
77
alias Spawn.Fact
88
alias Google.Protobuf.Timestamp
9+
alias Sidecar.GracefulShutdown
910

1011
@type fact :: %Fact{}
1112

@@ -47,9 +48,14 @@ defmodule Actors.Actor.Entity.Lifecycle.StreamConsumer do
4748

4849
@spec handle_message(any(), Message.t(), any()) :: Message.t()
4950
def handle_message(_processor_name, message, _context) do
50-
message
51-
|> build_fact()
52-
|> Message.configure_ack(on_success: :term)
51+
if GracefulShutdown.running?() do
52+
message
53+
|> build_fact()
54+
|> Message.configure_ack(on_success: :term)
55+
else
56+
message
57+
|> Message.failed("Failed to deliver because app is draining")
58+
end
5359
end
5460

5561
@spec handle_batch(any(), Message.t(), any(), opts()) :: list(Message.t())

lib/actors/actor/entity/lifecycle/stream_initiator.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Actors.Actor.Entity.Lifecycle.StreamInitiator do
44
"""
55
require Logger
66

7-
alias Actors.Actor.Entity.Lifecycle.ProjectionConsumers
7+
alias Actors.Actor.Entity.Lifecycle.StreamConsumer
88

99
alias Spawn.Actors.Actor
1010
alias Spawn.Actors.ProjectionSettings
@@ -245,7 +245,7 @@ defmodule Actors.Actor.Entity.Lifecycle.StreamInitiator do
245245
end
246246

247247
defp start_pipeline(actor) do
248-
ProjectionConsumers.new(%{
248+
StreamConsumer.start_link(%{
249249
actor_name: stream_name(actor),
250250
projection_pid: self(),
251251
strict_ordering: actor.settings.projection_settings.strict_events_ordering

lib/actors/supervisors/actor_supervisor.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,9 @@ defmodule Actors.Supervisors.ActorSupervisor do
4343
supervisor_process_logger(__MODULE__),
4444
get_pubsub_adapter(opts),
4545
Actors.Actor.Pubsub,
46-
Actors.Actor.Entity.Lifecycle.ProjectionConsumers,
4746
Actors.Actor.Entity.Supervisor.child_spec(opts)
4847
] ++
49-
maybe_add_invocation_scheduler(opts) ++
50-
[{CallerProducer, []}] ++ consumers
48+
maybe_add_invocation_scheduler(opts) ++ [{CallerProducer, []}] ++ consumers
5149

5250
Supervisor.init(children, strategy: :one_for_one)
5351
end

lib/sidecar/supervisor.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ defmodule Sidecar.Supervisor do
1717
[
1818
supervisor_process_logger(__MODULE__),
1919
{Sidecar.GracefulShutdown, opts},
20-
{Sidecar.ProcessSupervisor, opts},
21-
{Sidecar.GRPC.Supervisor, opts}
20+
{Sidecar.GRPC.Supervisor, opts},
21+
{Sidecar.ProcessSupervisor, opts}
2222
]
2323
|> Enum.reject(&is_nil/1)
2424

0 commit comments

Comments
 (0)