@@ -387,6 +387,7 @@ function _Cluster.new(opts)
387387 or require (' resty.cassandra.policies.reconnection.exp' ).new (1000 , 60000 ),
388388 retry_policy = opts .retry_policy
389389 or require (' resty.cassandra.policies.retry.simple' ).new (3 ),
390+ stream_ids = nil ,
390391 }, _Cluster )
391392end
392393
@@ -550,6 +551,15 @@ function _Cluster:refresh()
550551 -- initiate the load balancing policy
551552 self .lb_policy :init (peers )
552553
554+ if self .stream_ids == nil then
555+ -- Initialize the list of available seed ids (only once)
556+ self .stream_ids = {}
557+ local max_id = protocol_version < 3 and 2 ^ 7 - 1 or 2 ^ 15 - 1
558+ for i = 1 ,max_id do
559+ self .stream_ids [i ] = i
560+ end
561+ end
562+
553563 -- cluster is ready to be queried
554564 self .init = true
555565
@@ -748,8 +758,45 @@ local function handle_error(self, err, cql_code, coordinator, request)
748758 return nil , err , cql_code
749759end
750760
761+ local function get_stream_id (self )
762+ local stream_id = 0
763+
764+ local lock = resty_lock :new (self .dict_name , self .lock_opts )
765+ local elapsed , err = lock :lock (' stream_id' )
766+ if not elapsed then return stream_id , ' failed to acquire stream_id lock: ' .. err end
767+
768+ if table .getn (self .stream_ids ) > 0 then
769+ stream_id = table.remove (self .stream_ids )
770+ else
771+ err = ' Too many inflight requests. No new stream ids available.'
772+ end
773+
774+ local ok , err = lock :unlock ()
775+ if not ok then return stream_id , ' failed to unlock: ' .. err end
776+
777+ return stream_id , nil
778+ end
779+
780+ local function release_stream_id (self , stream_id )
781+ local lock = resty_lock :new (self .dict_name , self .lock_opts )
782+ local elapsed , err = lock :lock (' stream_id' )
783+ if not elapsed then return stream_id , ' failed to acquire stream_id lock: ' .. err end
784+
785+ table.insert (self .stream_ids , 1 , stream_id )
786+
787+ local ok , err = lock :unlock ()
788+ if not ok then return stream_id , ' failed to unlock: ' .. err end
789+ end
790+
751791send_request = function (self , coordinator , request )
792+ local err
793+ request .opts .stream_id , err = get_stream_id (self )
794+ if err ~= nil and self .logging then
795+ log (WARN , _log_prefix , err )
796+ end
797+
752798 local res , err , cql_code = coordinator :send (request )
799+ release_stream_id (self , request .opts .stream_id )
753800 if not res then
754801 return handle_error (self , err , cql_code , coordinator , request )
755802 elseif res .warnings and self .logging then
0 commit comments