1+ # frozen_string_literal: true
12# SPDX-License-Identifier: Proprietary
23#
34# ASRFacet-Rb: Attack Surface Reconnaissance Framework
1112# This file is part of ASRFacet-Rb and is subject to the terms
1213# and conditions defined in the LICENSE file.
1314
15+ require "concurrent"
1416require "thread"
1517require "time"
1618
1719module ASRFacet
1820 class EventBus
1921 DEFAULT_MAX_QUEUE = 1_000
2022
21- EVENT_TYPES = %i[
22- domain
23- subdomain
24- dns_record
25- ip_address
26- open_port
27- http_response
28- ssl_cert
29- finding
30- error
31- asn
32- crawl
33- js_endpoint
34- correlation
35- service
36- stage
37- ] . freeze
38-
3923 def initialize ( max_queue : DEFAULT_MAX_QUEUE , logger : ASRFacet ::Core ::ThreadSafe )
40- @subscribers = Hash . new { |hash , key | hash [ key ] = [ ] }
24+ @handlers = Concurrent :: Map . new { |hash , key | hash [ key ] = [ ] }
4125 @max_queue = max_queue . to_i . positive? ? max_queue . to_i : DEFAULT_MAX_QUEUE
4226 @queue = SizedQueue . new ( @max_queue )
4327 @mutex = Mutex . new
@@ -49,59 +33,41 @@ def initialize(max_queue: DEFAULT_MAX_QUEUE, logger: ASRFacet::Core::ThreadSafe)
4933 blocked_pushes : 0
5034 }
5135 @accepting = true
52- rescue StandardError
53- @subscribers = Hash . new { |hash , key | hash [ key ] = [ ] }
54- @queue = Queue . new
55- @mutex = Mutex . new
56- @logger = logger
57- @max_queue = DEFAULT_MAX_QUEUE
58- @stats = { emitted : 0 , dispatched : 0 , dropped : 0 , blocked_pushes : 0 }
59- @accepting = true
60- end
61-
62- def stats
63- @mutex . synchronize do
64- @stats . merge (
65- subscribers : @subscribers . transform_values ( &:size ) ,
66- queue_depth : queue_depth ,
67- max_queue : @max_queue ,
68- accepting : @accepting
69- )
70- end
71- rescue StandardError
72- { emitted : 0 , dispatched : 0 , dropped : 0 , blocked_pushes : 0 , subscribers : { } , queue_depth : 0 , max_queue : @max_queue , accepting : false }
7336 end
7437
75- def subscribe ( event_type , &block )
76- return nil unless EVENT_TYPES . include? ( event_type . to_sym ) && block
38+ def on ( event , priority : 50 , &block )
39+ return nil unless block
7740
7841 @mutex . synchronize do
79- @subscribers [ event_type . to_sym ] << block
42+ entries = Array ( @handlers [ event . to_sym ] ) . dup
43+ entries << { priority : priority . to_i , handler : block }
44+ entries . sort_by! { |entry | entry [ :priority ] }
45+ @handlers [ event . to_sym ] = entries
8046 end
8147 true
82- rescue StandardError
83- nil
8448 end
8549
86- def emit ( event_type , data , dispatch_now : false , non_block : false )
87- return nil unless EVENT_TYPES . include? ( event_type . to_sym )
50+ def subscribe ( event , priority : 50 , &block )
51+ on ( event , priority : priority , &block )
52+ end
53+
54+ def emit ( event , payload = { } , dispatch_now : false , non_block : false )
8855 return nil unless accepting?
8956
90- event = {
91- type : event_type . to_sym ,
92- data : data ,
93- timestamp : Time . now . iso8601
57+ entry = {
58+ type : event . to_sym ,
59+ data : payload ,
60+ timestamp : Time . now . utc . iso8601
9461 }
9562 increment_stat ( :emitted )
63+
9664 if dispatch_now
97- dispatched = dispatch ( event )
98- dispatched ? event : nil
65+ dispatch ( entry )
66+ entry
9967 else
100- queued = queue_event ( event , non_block : non_block )
101- queued ? event : nil
68+ queued = queue_event ( entry , non_block : non_block )
69+ queued ? entry : nil
10270 end
103- rescue StandardError
104- nil
10571 end
10672
10773 def process_all
@@ -111,49 +77,57 @@ def process_all
11177 end
11278 rescue ThreadError
11379 true
114- rescue StandardError
115- nil
11680 end
11781
11882 def drain_async ( workers : 10 )
119- count = [ workers . to_i , 1 ] . max
120- Array . new ( count ) do
83+ Array . new ( [ workers . to_i , 1 ] . max ) do
12184 Thread . new do
12285 loop do
12386 event = @queue . pop
12487 break if event . nil?
12588
12689 dispatch ( event )
127- rescue StandardError
128- nil
12990 end
13091 end
13192 end
132- rescue StandardError
133- [ ]
13493 end
13594
13695 def stop ( workers :)
13796 @mutex . synchronize { @accepting = false }
13897 [ workers . to_i , 1 ] . max . times { @queue << nil }
13998 true
140- rescue StandardError
141- nil
99+ end
100+
101+ def handler_count ( event )
102+ Array ( @handlers [ event . to_sym ] ) . size
103+ end
104+
105+ def stats
106+ @mutex . synchronize do
107+ {
108+ emitted : @stats [ :emitted ] ,
109+ dispatched : @stats [ :dispatched ] ,
110+ dropped : @stats [ :dropped ] ,
111+ blocked_pushes : @stats [ :blocked_pushes ] ,
112+ subscribers : @handlers . each_pair . each_with_object ( { } ) { |( event , entries ) , memo | memo [ event ] = entries . size } ,
113+ queue_depth : queue_depth ,
114+ max_queue : @max_queue ,
115+ accepting : @accepting
116+ }
117+ end
142118 end
143119
144120 private
145121
146122 def dispatch ( event )
147- handlers = @mutex . synchronize { @subscribers [ event [ :type ] ] . dup }
148- handlers . each do |handler |
149- handler . call ( event [ :data ] )
150- rescue StandardError
151- nil
123+ handlers = Array ( @handlers [ event [ :type ] ] ) . dup
124+ handlers . each do |entry |
125+ entry [ : handler] . call ( event [ :data ] )
126+ rescue ASRFacet :: Error => e
127+ @logger &. print_warning ( "Event handler error for #{ event [ :type ] } : #{ e . message } " )
152128 end
153129 increment_stat ( :dispatched )
154130 true
155- rescue StandardError
156- nil
157131 end
158132
159133 def queue_event ( event , non_block : false )
@@ -167,35 +141,25 @@ def queue_event(event, non_block: false)
167141 rescue ThreadError
168142 increment_stat ( :dropped )
169143 @logger &.print_warning ( "Event bus queue is full; dropping #{ event [ :type ] } event." )
170- nil
171- rescue StandardError
172- nil
144+ false
173145 end
174146
175147 def increment_stat ( key )
176- @mutex . synchronize { @stats [ key ] = @stats [ key ] . to_i + 1 }
177- rescue StandardError
178- nil
148+ @mutex . synchronize do
149+ @stats [ key ] = @stats [ key ] . to_i + 1
150+ end
179151 end
180152
181153 def queue_depth
182154 @queue . respond_to? ( :length ) ? @queue . length : @queue . size
183- rescue StandardError
184- 0
185155 end
186156
187157 def queue_full?
188- return false unless @queue . is_a? ( SizedQueue )
189-
190158 queue_depth >= @max_queue
191- rescue StandardError
192- false
193159 end
194160
195161 def accepting?
196162 @mutex . synchronize { @accepting }
197- rescue StandardError
198- false
199163 end
200164 end
201165end
0 commit comments