33require "anycable"
44require "graphql/subscriptions"
55require "graphql/anycable/errors"
6-
76# rubocop: disable Metrics/AbcSize, Metrics/LineLength, Metrics/MethodLength
87
98# A subscriptions implementation that sends data as AnyCable broadcastings.
@@ -56,10 +55,11 @@ class AnyCableSubscriptions < GraphQL::Subscriptions
5655
5756 def_delegators :"GraphQL::AnyCable" , :redis , :config
5857
59- SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
60- FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
61- SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
62- CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
58+ SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
59+ FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
60+ SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
61+ CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
62+ CREATED_AT_KEY = "objects:list-created-times" # HASH: Stores name and created_time of object
6363
6464 # @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
6565 def initialize ( serializer : Serialize , **rest )
@@ -131,7 +131,6 @@ def write_subscription(query, events)
131131 # Store subscription_id in the channel state to cleanup on disconnect
132132 write_subscription_id ( channel , channel_uniq_id )
133133
134-
135134 events . each do |event |
136135 channel . stream_from ( redis_key ( SUBSCRIPTIONS_PREFIX ) + event . fingerprint )
137136 end
@@ -145,8 +144,13 @@ def write_subscription(query, events)
145144 }
146145
147146 redis . multi do |pipeline |
147+ full_subscription_id = "#{ redis_key ( SUBSCRIPTION_PREFIX ) } #{ subscription_id } "
148+
148149 pipeline . sadd ( redis_key ( CHANNEL_PREFIX ) + channel_uniq_id , [ subscription_id ] )
149- pipeline . mapped_hmset ( redis_key ( SUBSCRIPTION_PREFIX ) + subscription_id , data )
150+ pipeline . mapped_hmset ( full_subscription_id , data )
151+
152+ pipeline . hset ( redis_key ( CREATED_AT_KEY ) , full_subscription_id , Time . now . to_s )
153+
150154 events . each do |event |
151155 pipeline . zincrby ( redis_key ( FINGERPRINTS_PREFIX ) + event . topic , 1 , event . fingerprint )
152156 pipeline . sadd ( redis_key ( SUBSCRIPTIONS_PREFIX ) + event . fingerprint , [ subscription_id ] )
@@ -182,7 +186,10 @@ def delete_subscription(subscription_id)
182186 fingerprint_subscriptions [ redis_key ( FINGERPRINTS_PREFIX ) + topic ] = score
183187 end
184188 # Delete subscription itself
185- pipeline . del ( redis_key ( SUBSCRIPTION_PREFIX ) + subscription_id )
189+ full_subscription_id = "#{ redis_key ( SUBSCRIPTION_PREFIX ) } #{ subscription_id } "
190+
191+ pipeline . del ( full_subscription_id )
192+ pipeline . hdel ( redis_key ( CREATED_AT_KEY ) , full_subscription_id )
186193 end
187194 # Clean up fingerprints that doesn't have any subscriptions left
188195 redis . pipelined do |pipeline |
0 commit comments