Skip to content

Commit 0b4ffeb

Browse files
authored
Merge pull request #613 from llvilanova/fix-unix-socket
drivers/unix_socket: Let clients to freely connect/disconnect
2 parents 425ce73 + 8cd0de6 commit 0b4ffeb

3 files changed

Lines changed: 117 additions & 78 deletions

File tree

bessctl/conf/testing/module_tests/vlan.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ def gen_untagged_packet():
8181
'output_port': default_gate,
8282
'input_packet': q,
8383
'output_packet': q})
84-
return [VLANSplit(), 1, 150, expected]
84+
return [VLANSplit(), 1, 30, expected]
8585

86-
OUTPUT_TEST_INPUTS.append(output_test([1, 100, 77, -1, 149, 50, 100, -1]))
87-
OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70]))
88-
OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70], True))
86+
OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7]))
87+
OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7], True))

core/drivers/unix_socket.cc

Lines changed: 94 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@
2828
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
2929
// POSSIBILITY OF SUCH DAMAGE.
3030

31+
32+
#include <glog/logging.h>
33+
#include <poll.h>
34+
#include <signal.h>
35+
36+
#include <cerrno>
37+
#include <cstring>
38+
3139
#include "unix_socket.h"
3240

3341
// TODO(barath): Clarify these comments.
@@ -38,53 +46,64 @@
3846
// TODO: Revise this once the interrupt mode is implemented.
3947

4048
#define RECV_SKIP_TICKS 256
41-
#define MAX_TX_FRAGS 8
49+
#define SIG_THREAD_EXIT SIGUSR2
4250

43-
void UnixSocketPort::AcceptNewClient() {
44-
int ret;
4551

46-
for (;;) {
47-
ret = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK);
48-
if (ret >= 0) {
49-
break;
50-
}
52+
void UnixSocketPort::AcceptThread() {
53+
sigset_t sigset;
54+
sigfillset(&sigset);
55+
sigdelset(&sigset, SIG_THREAD_EXIT);
5156

52-
if (errno != EINTR) {
53-
PLOG(ERROR) << "[UnixSocket]:accept4()";
54-
}
55-
}
57+
struct pollfd fds[2];
58+
memset(fds, 0, sizeof(fds));
59+
fds[0].fd = listen_fd_;
60+
fds[0].events = POLLIN;
61+
fds[1].events = POLLRDHUP;
5662

57-
recv_skip_cnt_ = 0;
63+
while (true) {
64+
// negative FDs are ignored by ppoll()
65+
fds[1].fd = client_fd_;
66+
int res = ppoll(fds, 2, nullptr, &sigset);
5867

59-
if (old_client_fd_ != kNotConnectedFd) {
60-
// Reuse the old file descriptor number by atomically exchanging the new fd
61-
// with the
62-
// old one. The zombie socket is closed silently (see dup2).
63-
dup2(ret, client_fd_);
64-
close(ret);
65-
} else {
66-
client_fd_ = ret;
67-
}
68-
}
68+
if (accept_thread_stop_req_) {
69+
return;
70+
71+
} else if (res < 0) {
72+
if (errno == EINTR) {
73+
continue;
74+
} else {
75+
PLOG(ERROR) << "ppoll()";
76+
}
6977

70-
// This accept thread terminates once a new client is connected.
71-
void *AcceptThreadMain(void *arg) {
72-
UnixSocketPort *p = reinterpret_cast<UnixSocketPort *>(arg);
73-
p->AcceptNewClient();
74-
return nullptr;
78+
} else if (fds[0].revents & POLLIN) {
79+
// new client connected
80+
int fd;
81+
while (true) {
82+
fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK);
83+
if (fd >= 0 || errno != EINTR) {
84+
break;
85+
}
86+
}
87+
if (fd < 0) {
88+
PLOG(ERROR) << "accept4()";
89+
} else if (client_fd_ != kNotConnectedFd) {
90+
LOG(WARNING) << "Ignoring additional client\n";
91+
close(fd);
92+
} else {
93+
client_fd_ = fd;
94+
}
95+
96+
} else if (fds[1].revents & (POLLRDHUP | POLLHUP)) {
97+
// connection dropped by client
98+
int fd = client_fd_;
99+
client_fd_ = kNotConnectedFd;
100+
close(fd);
101+
}
102+
}
75103
}
76104

77-
// The file descriptor for the connection will not be closed, until we have a
78-
// new client.
79-
// This is to avoid race condition in TX process.
80-
void UnixSocketPort::CloseConnection() {
81-
// Keep client_fd, since it may be being used in unix_send_pkts().
82-
old_client_fd_ = client_fd_;
83-
client_fd_ = kNotConnectedFd;
84-
85-
// Relaunch the accept thread.
86-
std::thread accept_thread(AcceptThreadMain, reinterpret_cast<void *>(this));
87-
accept_thread.detach();
105+
static void AcceptThreadHandler(int) {
106+
// empty handler, we only care about blocking syscalls being interrupted
88107
}
89108

90109
CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {
@@ -96,15 +115,13 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {
96115

97116
int ret;
98117

99-
client_fd_ = kNotConnectedFd;
100-
old_client_fd_ = kNotConnectedFd;
101-
102118
if (num_txq > 1 || num_rxq > 1) {
103119
return CommandFailure(EINVAL, "Cannot have more than 1 queue per RX/TX");
104120
}
105121

106122
listen_fd_ = socket(AF_UNIX, SOCK_SEQPACKET, 0);
107123
if (listen_fd_ < 0) {
124+
DeInit();
108125
return CommandFailure(errno, "socket(AF_UNIX) failed");
109126
}
110127

@@ -130,32 +147,54 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {
130147

131148
ret = bind(listen_fd_, reinterpret_cast<struct sockaddr *>(&addr_), addrlen);
132149
if (ret < 0) {
150+
DeInit();
133151
return CommandFailure(errno, "bind(%s) failed", addr_.sun_path);
134152
}
135153

136154
ret = listen(listen_fd_, 1);
137155
if (ret < 0) {
156+
DeInit();
138157
return CommandFailure(errno, "listen() failed");
139158
}
140159

141-
std::thread accept_thread(AcceptThreadMain, reinterpret_cast<void *>(this));
142-
accept_thread.detach();
160+
161+
struct sigaction sa;
162+
memset(&sa, 0, sizeof(sa));
163+
sa.sa_handler = AcceptThreadHandler;
164+
if (sigaction(SIG_THREAD_EXIT, &sa, NULL) < 0) {
165+
DeInit();
166+
return CommandFailure(errno, "sigaction(SIG_THREAD_EXIT) failed");
167+
}
168+
169+
170+
accept_thread_ = std::thread([this]() {
171+
this->AcceptThread();
172+
});
143173

144174
return CommandSuccess();
145175
}
146176

147177
void UnixSocketPort::DeInit() {
148-
close(listen_fd_);
178+
if (accept_thread_.joinable()) {
179+
accept_thread_stop_req_ = true;
180+
pthread_kill(accept_thread_.native_handle(), SIG_THREAD_EXIT);
181+
accept_thread_.join();
182+
}
149183

150-
if (client_fd_ >= 0) {
184+
if (listen_fd_ != kNotConnectedFd) {
185+
close(listen_fd_);
186+
}
187+
if (client_fd_ != kNotConnectedFd) {
151188
close(client_fd_);
152189
}
153190
}
154191

155192
int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
193+
int client_fd = client_fd_;
194+
156195
DCHECK_EQ(qid, 0);
157196

158-
if (client_fd_ == kNotConnectedFd) {
197+
if (client_fd == kNotConnectedFd) {
159198
return 0;
160199
}
161200

@@ -174,7 +213,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
174213
}
175214

176215
// Datagrams larger than 2KB will be truncated.
177-
ret = recv(client_fd_, pkt->data(), SNBUF_DATA, 0);
216+
ret = recv(client_fd, pkt->data(), SNBUF_DATA, 0);
178217

179218
if (ret > 0) {
180219
pkt->append(ret);
@@ -185,7 +224,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
185224
bess::Packet::Free(pkt);
186225

187226
if (ret < 0) {
188-
if (errno == EAGAIN || errno == EWOULDBLOCK) {
227+
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EBADF) {
189228
break;
190229
}
191230

@@ -195,7 +234,6 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
195234
}
196235

197236
// Connection closed.
198-
CloseConnection();
199237
break;
200238
}
201239

@@ -208,9 +246,14 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
208246

209247
int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) {
210248
int sent = 0;
249+
int client_fd = client_fd_;
211250

212251
DCHECK_EQ(qid, 0);
213252

253+
if (client_fd == kNotConnectedFd) {
254+
return 0;
255+
}
256+
214257
for (int i = 0; i < cnt; i++) {
215258
bess::Packet *pkt = pkts[i];
216259

@@ -229,7 +272,7 @@ int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) {
229272
pkt = pkt->next();
230273
}
231274

232-
ret = sendmsg(client_fd_, &msg, 0);
275+
ret = sendmsg(client_fd, &msg, 0);
233276
if (ret < 0) {
234277
break;
235278
}

core/drivers/unix_socket.h

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,12 @@
3131
#ifndef BESS_DRIVERS_UNIXSOCKET_H_
3232
#define BESS_DRIVERS_UNIXSOCKET_H_
3333

34-
#include <assert.h>
35-
#include <errno.h>
36-
#include <poll.h>
37-
#include <stdio.h>
38-
#include <string.h>
3934
#include <sys/socket.h>
4035
#include <sys/un.h>
41-
#include <thread>
4236
#include <unistd.h>
4337

44-
#include <glog/logging.h>
38+
#include <atomic>
39+
#include <thread>
4540

4641
#include "../message.h"
4742
#include "../port.h"
@@ -55,10 +50,10 @@ class UnixSocketPort final : public Port {
5550
UnixSocketPort()
5651
: Port(),
5752
recv_skip_cnt_(),
58-
listen_fd_(),
53+
accept_thread_stop_req_(false),
54+
listen_fd_(kNotConnectedFd),
5955
addr_(),
60-
client_fd_(),
61-
old_client_fd_() {}
56+
client_fd_(kNotConnectedFd) {}
6257

6358
/*!
6459
* Initialize the port, ie, open the socket.
@@ -107,27 +102,32 @@ class UnixSocketPort final : public Port {
107102
*/
108103
int SendPackets(queue_t qid, bess::Packet **pkts, int cnt) override;
109104

110-
/*!
111-
* Waits for a client to connect to the socket.
112-
*/
113-
void AcceptNewClient();
114-
115105
private:
116106
// Value for a disconnected socket.
117107
static const int kNotConnectedFd = -1;
118108

119-
/*!
120-
* Closes the client connection but does not shut down the listener fd.
121-
*/
122-
void CloseConnection();
123-
124109
/*!
125110
* Calling recv() system call is expensive so we only do it every
126111
* RECV_SKIP_TICKS times -- this counter keeps track of how many ticks its been
127112
* since we last called recv().
128113
* */
129114
uint32_t recv_skip_cnt_;
130115

116+
/*!
117+
* Function for the thread accepting and monitoring clients (accept thread).
118+
*/
119+
void AcceptThread();
120+
121+
/*!
122+
* Accept thread handle.
123+
*/
124+
std::thread accept_thread_;
125+
126+
/*!
127+
* Sent stop request to accept thread.
128+
*/
129+
std::atomic<bool> accept_thread_stop_req_;
130+
131131
/*!
132132
* The listener fd -- listen for new connections here.
133133
*/
@@ -142,9 +142,6 @@ class UnixSocketPort final : public Port {
142142
// volatile.
143143
/* FD for client connection.*/
144144
volatile int client_fd_;
145-
/* If client FD is not connected, what was the fd the last time we were
146-
* connected to a client? */
147-
int old_client_fd_;
148145
};
149146

150147
#endif // BESS_DRIVERS_UNIXSOCKET_H_

0 commit comments

Comments
 (0)