Skip to content

Commit f08db8e

Browse files
committed
deliver messages asynchronously
1 parent bef401d commit f08db8e

5 files changed

Lines changed: 48 additions & 0 deletions

File tree

graphql-anycable.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Gem::Specification.new do |spec|
3030
spec.add_dependency "anyway_config", ">= 1.3", "< 3"
3131
spec.add_dependency "graphql", ">= 1.11", "< 3"
3232
spec.add_dependency "redis", ">= 4.2.0"
33+
spec.add_dependency "activejob", ">= 5.0.0"
3334

3435
spec.add_development_dependency "anycable-rails"
3536
spec.add_development_dependency "bundler", "~> 2.0"

lib/graphql-anycable.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require_relative "graphql/anycable/cleaner"
77
require_relative "graphql/anycable/config"
88
require_relative "graphql/anycable/railtie" if defined?(Rails)
9+
require_relative "graphql/subscriptions/adapters/base_job"
910
require_relative "graphql/anycable/stats"
1011
require_relative "graphql/subscriptions/anycable_subscriptions"
1112

lib/graphql/anycable/config.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ class Config < Anyway::Config
1212
attr_config use_redis_object_on_cleanup: true
1313
attr_config use_client_provided_uniq_id: true
1414
attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side.
15+
16+
attr_config use_async_broadcasting: true
17+
attr_config async_broadcasting: { queue: "broadcasting", class: "GraphQL::Adapters::BaseJob" }
1518
end
1619
end
1720
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
require "active_job"
4+
5+
module GraphQL
6+
module Adapters
7+
class BaseJob < ActiveJob::Base
8+
DEFAULT_QUEUE_NAME = :default
9+
10+
queue_as { GraphQL::AnyCable.config.async_broadcasting["queue"] || DEFAULT_QUEUE_NAME }
11+
12+
def perform(subscription_object, execute_method, event, object)
13+
Marshal.load(subscription_object).public_send(execute_method, Marshal.load(event), Marshal.load(object))
14+
end
15+
end
16+
end
17+
end

lib/graphql/subscriptions/anycable_subscriptions.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class AnyCableSubscriptions < GraphQL::Subscriptions
6060
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
6161
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
6262
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
63+
EXECUTOR_METHOD_NAME = "execute_synchronically" # method, who execute the main logic
6364

6465
# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
6566
def initialize(serializer: Serialize, **rest)
@@ -73,6 +74,13 @@ def execute_all(event, object)
7374
fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
7475
return if fingerprints.empty?
7576

77+
perform(event, object)
78+
end
79+
80+
def execute_synchronically(event, object)
81+
fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
82+
return if fingerprints.empty?
83+
7684
fingerprint_subscription_ids = Hash[fingerprints.zip(
7785
redis.pipelined do |pipeline|
7886
fingerprints.map do |fingerprint|
@@ -243,6 +251,24 @@ def fetch_channel_istate(channel)
243251
def redis_key(prefix)
244252
"#{config.redis_prefix}-#{prefix}"
245253
end
254+
255+
def executor_class_job
256+
custom_class = config.async_broadcasting["class"]
257+
258+
return Adapters::BaseJob unless custom_class
259+
260+
Object.const_get(config.async_broadcasting["class"])
261+
end
262+
263+
def perform(event, object)
264+
unless config.use_async_broadcasting
265+
return public_send(EXECUTOR_METHOD_NAME, event, object)
266+
end
267+
268+
args = [Marshal.dump(self), EXECUTOR_METHOD_NAME, Marshal.dump(event), Marshal.dump(object)]
269+
270+
executor_class_job.perform_later(*args)
271+
end
246272
end
247273
end
248274
end

0 commit comments

Comments
 (0)