Skip to content

Commit f14a73b

Browse files
committed
move receiver thread to video_rxtx
sender thread is dispatched here as well so to be consistent Moreover the thread init is at multiple places so it cannot be forgotten.
1 parent 9c3fc10 commit f14a73b

5 files changed

Lines changed: 36 additions & 50 deletions

File tree

src/hd-rum-translator/hd-rum-decompress.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ struct state_transcoder_decompress final : public frame_recv_delegate {
8888

8989
struct display *display = nullptr;
9090
struct control_state *control = nullptr;
91-
thread receiver_thread;
9291

9392
void frame_arrived(struct video_frame *f, struct audio_frame *a) override;
9493

@@ -228,7 +227,6 @@ void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf co
228227
assert(s->video_rxtx);
229228

230229
s->worker_thread = thread(&state_transcoder_decompress::worker, s);
231-
s->receiver_thread = thread(&video_rxtx::receiver_thread, s->video_rxtx);
232230
display_run_new_thread(s->display);
233231

234232
if (capture_filter_init(parent, capture_filter, &s->capture_filter_state) != 0) {
@@ -246,8 +244,6 @@ void *hd_rum_decompress_init(struct module *parent, struct hd_rum_output_conf co
246244
void hd_rum_decompress_done(void *state) {
247245
auto *s = static_cast<state_transcoder_decompress *>(state);
248246

249-
s->receiver_thread.join();
250-
251247
{
252248
unique_lock<mutex> l(s->lock);
253249
s->received_frame.push({});

src/main.cpp

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,7 +1335,6 @@ int main(int argc, char *argv[])
13351335
#endif
13361336
ug_options opt{};
13371337

1338-
pthread_t receiver_thread_id = PTHREAD_NULL;
13391338
pthread_t capture_thread_id = PTHREAD_NULL;
13401339

13411340
unsigned display_flags = 0;
@@ -1500,24 +1499,6 @@ int main(int argc, char *argv[])
15001499
goto cleanup;
15011500
}
15021501

1503-
if ((opt.video.rxtx_mode & MODE_RECEIVER) != 0U) {
1504-
if (!uv.state_video_rxtx->supports_receiving()) {
1505-
fprintf(
1506-
stderr,
1507-
"Selected RX/TX mode doesn't support receiving.\n");
1508-
exit_uv(EXIT_FAILURE);
1509-
goto cleanup;
1510-
}
1511-
// init module here so as it is capable of receiving messages
1512-
if (pthread_create(&receiver_thread_id, NULL,
1513-
video_rxtx::receiver_thread,
1514-
(void *) uv.state_video_rxtx) != 0) {
1515-
perror("Unable to create display thread!\n");
1516-
exit_uv(EXIT_FAILURE);
1517-
goto cleanup;
1518-
}
1519-
}
1520-
15211502
if ((opt.video.rxtx_mode & MODE_SENDER) != 0U) {
15221503
if (pthread_create(&capture_thread_id, NULL, capture_thread,
15231504
(void *) &uv) != 0) {
@@ -1541,10 +1522,6 @@ int main(int argc, char *argv[])
15411522
display_run_mainloop(uv.display_device);
15421523

15431524
cleanup:
1544-
if (!pthread_equal(receiver_thread_id, PTHREAD_NULL)) {
1545-
pthread_join(receiver_thread_id, NULL);
1546-
}
1547-
15481525
if (!pthread_equal(capture_thread_id, PTHREAD_NULL)) {
15491526
pthread_join(capture_thread_id, NULL);
15501527
}

src/video_capture/ug_input.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ struct ug_input_state final : public frame_recv_delegate {
7979
struct display *display = nullptr;
8080

8181
void frame_arrived(struct video_frame *f, struct audio_frame *a) override;
82-
thread receiver_thread;
8382
unique_ptr<struct video_rxtx> video_rxtx;
8483
struct state_audio *audio = nullptr;
8584

@@ -204,7 +203,6 @@ static int vidcap_ug_input_init(const struct vidcap_params *cap_params, void **s
204203
}
205204
s->t0 = steady_clock::now();
206205

207-
s->receiver_thread = thread(&video_rxtx::receiver_thread, s->video_rxtx.get());
208206
display_run_new_thread(s->display);
209207

210208
*state = s;
@@ -216,7 +214,6 @@ static void vidcap_ug_input_done(void *state)
216214
auto s = (ug_input_state *) state;
217215

218216
audio_join(s->audio);
219-
s->receiver_thread.join();
220217

221218
display_put_frame(s->display, NULL, 0);
222219
display_join(s->display);

src/video_rxtx.cpp

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,12 @@ video_rxtx::video_rxtx(const char *protocol_name,
119119

120120
video_rxtx::~video_rxtx() noexcept
121121
{
122+
if (!pthread_equal(m_receiver_thread_id, PTHREAD_NULL)) {
123+
pthread_join(m_receiver_thread_id, nullptr);
124+
}
125+
122126
join();
123-
if (!m_poisoned && m_compression) {
127+
if (!m_sender_poisoned && m_compression != nullptr) {
124128
send(NULL);
125129
compress_pop(m_compression);
126130
}
@@ -135,15 +139,20 @@ video_rxtx::~video_rxtx() noexcept
135139
void
136140
video_rxtx::join() noexcept
137141
{
138-
if (m_joined) {
142+
if (!pthread_equal(m_receiver_thread_id, PTHREAD_NULL)) {
143+
pthread_join(m_receiver_thread_id, nullptr);
144+
m_receiver_thread_id = PTHREAD_NULL;
145+
}
146+
147+
if (m_sender_joined) {
139148
return;
140149
}
141150
send(NULL); // pass poisoned pill
142-
pthread_join(m_thread_id, NULL);
151+
pthread_join(m_sender_thread_id, NULL);
143152
if (m_impl_funcs != nullptr && m_impl_funcs->join_sender != nullptr) {
144153
m_impl_funcs->join_sender(m_impl_state);
145154
}
146-
m_joined = true;
155+
m_sender_joined = true;
147156
}
148157

149158
const char *
@@ -162,11 +171,11 @@ video_rxtx::get_long_name(string const &short_name) noexcept
162171
void
163172
video_rxtx::send(shared_ptr<video_frame> frame) noexcept
164173
{
165-
if (!frame && m_poisoned) {
174+
if (!frame && m_sender_poisoned) {
166175
return;
167176
}
168177
if (!frame) {
169-
m_poisoned = true;
178+
m_sender_poisoned = true;
170179
} else {
171180
m_input_codec = frame->color_spec;
172181
}
@@ -257,10 +266,23 @@ video_rxtx::create(string const &proto, const struct vrxtx_params *params,
257266
return nullptr;
258267
}
259268

260-
int rc = pthread_create(&ret->m_thread_id, nullptr, video_rxtx::sender_thread,
269+
if ((params->rxtx_mode & MODE_RECEIVER) != 0U) {
270+
if (ret->m_impl_funcs->receiver_routine == nullptr) {
271+
MSG(ERROR,
272+
"Selected RX/TX mode doesn't support receiving.\n");
273+
delete ret;
274+
return nullptr;
275+
}
276+
int rc = pthread_create(&ret->m_receiver_thread_id, nullptr,
277+
ret->m_impl_funcs->receiver_routine,
278+
ret->m_impl_state);
279+
assert(rc == 0);
280+
}
281+
282+
int rc = pthread_create(&ret->m_sender_thread_id, nullptr, video_rxtx::sender_thread,
261283
(void *) ret);
262284
assert(rc == 0);
263-
ret->m_joined = false;
285+
ret->m_sender_joined = false;
264286

265287
return ret;
266288
}

src/video_rxtx.hpp

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ struct video_rxtx_info {
102102
void *(*create)(const struct vrxtx_params *params,
103103
const struct common_opts *common);
104104
void (*done)(void *state);
105+
/// this may be set optional if we had some receive-only mod
105106
void (*send_frame)(void *state, std::shared_ptr<video_frame>);
106107

107108
// following callbacks are optional
@@ -118,15 +119,6 @@ struct video_rxtx {
118119
virtual ~video_rxtx() noexcept;
119120
void send(std::shared_ptr<struct video_frame>) noexcept;
120121
static const char *get_long_name(std::string const &short_name) noexcept;
121-
static void *receiver_thread(void *arg) noexcept
122-
{
123-
video_rxtx *rxtx = static_cast<video_rxtx *>(arg);
124-
return rxtx->m_impl_funcs->receiver_routine(rxtx->m_impl_state);
125-
}
126-
bool supports_receiving() noexcept
127-
{
128-
return m_impl_funcs->receiver_routine != nullptr;
129-
}
130122
/**
131123
* If overridden, children must call also video_rxtx::join()
132124
*/
@@ -160,12 +152,14 @@ struct video_rxtx {
160152
struct compress_state *m_compression = nullptr;
161153
pthread_mutex_t m_lock;
162154

163-
pthread_t m_thread_id = PTHREAD_NULL;
164-
bool m_poisoned = false;
165-
bool m_joined = true;
155+
pthread_t m_sender_thread_id = PTHREAD_NULL;
156+
bool m_sender_poisoned = false;
157+
bool m_sender_joined = true;
158+
pthread_t m_receiver_thread_id = PTHREAD_NULL;
166159

167160
video_desc m_video_desc{};
168161
std::atomic<codec_t> m_input_codec{};
162+
169163
};
170164

171165
const char *vrxtx_get_compression(const char *video_protocol,

0 commit comments

Comments
 (0)