@@ -30,47 +30,90 @@ struct stream_queue_t {
3030 static constexpr int DefaultNumComputeStreams = CS ;
3131 static constexpr int DefaultNumTransferStreams = TS ;
3232
33+ // Mutex to guard modifications to the ComputeStreams vector, and
34+ // NumComputeStreams.
35+ std::mutex ComputeStreamMutex;
3336 std::vector<native_type> ComputeStreams;
37+ // Number of compute streams that have been created
38+ unsigned int NumComputeStreams{0 };
39+
40+ // Mutex to guard modifications to the TransferStreams vector, and
41+ // NumTransferStreams.
42+ std::mutex TransferStreamMutex;
3443 std::vector<native_type> TransferStreams;
44+ // Number of transfer streams that have been created
45+ unsigned int NumTransferStreams{0 };
46+
47+ // The stream indices are incremented every time we return a stream. This
48+ // means that they encode both the index of the next stream in the round
49+ // robin, as well as which iteration of the round robin we're on. Dividing
50+ // the stream index by the size of the associated stream vector will give the
51+ // number of round robins we've done as quotient, and the index of the next
52+ // stream to use as remainder.
53+ std::atomic_uint32_t ComputeStreamIndex{0 };
54+ std::atomic_uint32_t TransferStreamIndex{0 };
55+
56+ // The LastSync indices keep track of the index based on ComputeStreamIndex
57+ // or TransferStreamIndex of the last stream that was synchronized during a
58+ // syncStreams operation.
59+ unsigned int LastSyncComputeStreams{0 };
60+ unsigned int LastSyncTransferStreams{0 };
61+
3562 // Stream used for recording EvQueue, which holds information about when the
3663 // command in question is enqueued on host, as opposed to started. It is
3764 // created only if profiling is enabled - either for queue or per event.
3865 native_type HostSubmitTimeStream{0 };
66+ // Flag to keep track of the creation of HostSubmitTimeStream. The stream is
67+ // created either in the queue constructor when profiling is enabled or
68+ // whenever it is requested for the first time through timestamp entry points.
3969 std::once_flag HostSubmitTimeStreamFlag;
40- // delay_compute_ keeps track of which streams have been recently reused and
70+
71+ // DelayCompute keeps track of which streams have been recently reused and
4172 // their next use should be delayed. If a stream has been recently reused it
4273 // will be skipped the next time it would be selected round-robin style. When
4374 // skipped, its delay flag is cleared.
4475 std::vector<bool > DelayCompute;
45- // keep track of which streams have applied barrier
76+
77+ // ComputeStreamSyncMutex is used to guard compute streams when they are
78+ // being re-used.
79+ //
80+ // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
81+ // locked at the same time, ComputeStreamSyncMutex should be locked first
82+ // to avoid deadlocks.
83+ std::mutex ComputeStreamSyncMutex;
84+
85+ // Guards barrier insertion in urEnqueueEventsWaitWithBarrier.
86+ std::mutex BarrierMutex;
87+ BarrierEventT BarrierEvent = nullptr ;
88+ BarrierEventT BarrierTmpEvent = nullptr ;
89+
90+ // Keep track of which streams have applied barrier.
4691 std::vector<bool > ComputeAppliedBarrier;
4792 std::vector<bool > TransferAppliedBarrier;
48- ur_context_handle_t_ *Context;
49- ur_device_handle_t_ *Device;
93+
94+ ur_context_handle_t Context;
95+ ur_device_handle_t Device;
96+
97+ // Reference count for the queue object.
5098 ur::RefCount RefCount;
99+
100+ // Event count used to give events an ordering used in the event class
101+ // forLatestEvents.
51102 std::atomic_uint32_t EventCount{0 };
52- std::atomic_uint32_t ComputeStreamIndex{0 };
53- std::atomic_uint32_t TransferStreamIndex{0 };
54- unsigned int NumComputeStreams{0 };
55- unsigned int NumTransferStreams{0 };
56- unsigned int LastSyncComputeStreams{0 };
57- unsigned int LastSyncTransferStreams{0 };
103+
104+ // Queue flags in the native API format as well as UR format.
58105 unsigned int Flags;
59106 ur_queue_flags_t URFlags;
107+
108+ // Priority of this queue, matches underlying API priority.
60109 int Priority;
61- // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
62- // locked at the same time, ComputeStreamSyncMutex should be locked first
63- // to avoid deadlocks
64- std::mutex ComputeStreamSyncMutex;
65- std::mutex ComputeStreamMutex;
66- std::mutex TransferStreamMutex;
67- std::mutex BarrierMutex;
110+
111+ // Tracks if the queue owns the underlying native streams, this may happen
112+ // for queues created from interop.
68113 bool HasOwnership;
69- BarrierEventT BarrierEvent = nullptr ;
70- BarrierEventT BarrierTmpEvent = nullptr ;
71114
72- stream_queue_t (bool IsOutOfOrder, ur_context_handle_t_ * Context,
73- ur_device_handle_t_ * Device, unsigned int Flags,
115+ stream_queue_t (bool IsOutOfOrder, ur_context_handle_t Context,
116+ ur_device_handle_t Device, unsigned int Flags,
74117 ur_queue_flags_t URFlags, int Priority)
75118 : ComputeStreams(IsOutOfOrder ? DefaultNumComputeStreams : 1 ),
76119 TransferStreams (IsOutOfOrder ? DefaultNumTransferStreams : 0 ),
@@ -87,16 +130,16 @@ struct stream_queue_t {
87130 }
88131 }
89132
90- // Create a queue from a native handle
91- stream_queue_t (native_type stream, ur_context_handle_t_ * Context,
92- ur_device_handle_t_ * Device, unsigned int Flags,
133+ // Create a queue from a native handle.
134+ stream_queue_t (native_type stream, ur_context_handle_t Context,
135+ ur_device_handle_t Device, unsigned int Flags,
93136 ur_queue_flags_t URFlags, bool BackendOwns)
94- : ComputeStreams(1 , stream), TransferStreams(0 ),
137+ : ComputeStreams(1 , stream), NumComputeStreams{ 1 }, TransferStreams(0 ),
95138 DelayCompute (this ->ComputeStreams.size(), false ),
96139 ComputeAppliedBarrier (this ->ComputeStreams.size()),
97140 TransferAppliedBarrier (this ->TransferStreams.size()), Context{Context},
98- Device{Device}, NumComputeStreams{ 1 }, Flags(Flags), URFlags(URFlags),
99- Priority ( 0 ), HasOwnership{BackendOwns} {
141+ Device{Device}, Flags(Flags), URFlags(URFlags), Priority( 0 ),
142+ HasOwnership{BackendOwns} {
100143 urContextRetain (Context);
101144
102145 // Create timing stream if profiling is enabled.
@@ -107,6 +150,7 @@ struct stream_queue_t {
107150
// Destructor: the body only calls urContextRelease(Context), dropping the
// context reference held by this queue (the native-handle constructor shown
// in this hunk takes one with urContextRetain(Context)).
// NOTE(review): the native streams are not destroyed here; presumably the
// adapter tears them down elsewhere before the queue is deleted - confirm.
108151 ~stream_queue_t () { urContextRelease (Context); }
109152
153+ // Methods defined by the specific adapters.
// Declaration only: no body is visible here, and the preceding comment states
// that these methods are defined by the specific adapters. Takes a native
// stream handle and a uint32_t (presumably the stream's index - confirm
// against the adapter definitions).
// NOTE(review): `Strean` looks like a typo for `Stream`; the sibling transfer
// declaration spells it `Stream`. Parameter names in a declaration are not
// visible to callers, so renaming it would be safe.
110154 void computeStreamWaitForBarrierIfNeeded (native_type Strean,
111155 uint32_t StreamI);
112156 void transferStreamWaitForBarrierIfNeeded (native_type Stream,
@@ -206,9 +250,6 @@ struct stream_queue_t {
206250 return Result;
207251 }
208252
209- native_type get () { return getNextComputeStream (); };
210- ur_device_handle_t getDevice () const noexcept { return Device; };
211-
// Returns the HostSubmitTimeStream member exactly as stored. This accessor
// does not create the stream: the member is brace-initialized to 0, so that
// is the value returned if the stream has not been set yet.
212253 native_type getHostSubmitTimeStream () { return HostSubmitTimeStream; }
213254
214255 bool hasBeenSynchronized (uint32_t StreamToken) {
@@ -345,7 +386,8 @@ struct stream_queue_t {
345386 }
346387 }
347388
348- ur_context_handle_t_ *getContext () const { return Context; };
// Read-only, non-throwing accessors that return the stored Device and Context
// handles by value.
// NOTE(review): the `;` after each function body is redundant; it is harmless
// but is reported by -Wextra-semi.
389+ ur_device_handle_t getDevice () const noexcept { return Device; };
390+ ur_context_handle_t getContext () const noexcept { return Context; };
349391
// Pre-increments the atomic EventCount and returns the incremented value.
// EventCount is a std::atomic_uint32_t initialized to 0, so the first id
// handed out is 1 and the increment is a single atomic read-modify-write.
350392 uint32_t getNextEventId () noexcept { return ++EventCount; }
351393
0 commit comments