Skip to content

Commit 414fe38

Browse files
committed
test reliability, shutdown race fixes and continued work on monitoring
1 parent 2fb5bdd commit 414fe38

15 files changed

Lines changed: 210 additions & 122 deletions

.github/workflows/build-test.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
name: build and test
22

3-
on: [push]
3+
on:
4+
push:
5+
schedule:
6+
- cron: '*/10 * * * *'
47

58

69
jobs:
@@ -24,7 +27,7 @@ jobs:
2427
run: |
2528
PWD=$(pwd)
2629
sudo apt-get install valgrind
27-
valgrind --leak-check=full ./tests/${{ matrix.release }}/tests $PWD/src/server/${{ matrix.release }}/scache $PWD/testcases/
30+
valgrind --leak-check=full --show-leak-kinds=all ./tests/${{ matrix.release }}/tests $PWD/src/server/${{ matrix.release }}/scache $PWD/testcases/
2831
- name: Spam test
2932
run: |
3033
echo "todo"
@@ -49,4 +52,4 @@ jobs:
4952
5053
query &
5154
52-
timeout 2m valgrind --leak-check=full --show-leak-kinds=all src/server/${{ matrix.release }}/scache -B 127.0.0.1:8082
55+
timeout --preserve-status 2m valgrind --leak-check=full --show-leak-kinds=all src/server/${{ matrix.release }}/scache -B 127.0.0.1:8082

src/core/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ error:
3434
$(error Invalid configuration, please check your inputs)
3535
endif
3636

37-
SOURCEFILES := connection.cpp db.cpp hash.cpp http.cpp http_parse.cpp http_parse_cache.cpp http_parse_mon.c http_respond.cpp read_buffer.cpp settings.cpp timer.cpp
37+
SOURCEFILES := connection.cpp db.cpp hash.cpp http.cpp http_parse.cpp http_parse_cache.cpp http_parse_mon.c http_respond.cpp read_buffer.cpp settings.cpp timer.cpp signal_handle.cpp
3838
EXTERNAL_LIBS :=
3939
EXTERNAL_LIBS_COPIED := $(foreach lib, $(EXTERNAL_LIBS),$(BINARYDIR)/$(notdir $(lib)))
4040

src/core/connection.cpp

Lines changed: 93 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ struct scache_connection_node ctable[CONNECTION_HASH_ENTRIES] = { 0 };
3939
int epfd;
4040
volatile sig_atomic_t stop_soon = 0;
4141

42+
bool connection_stop_soon(){
43+
return stop_soon != 0;
44+
}
4245

4346
struct connection_thread_arg
4447
{
@@ -128,6 +131,8 @@ static int connection_non_blocking(int fd)
128131

129132
void connection_close_listeners() {
130133
int fd;
134+
135+
WARN("Closing %d listeners", scache_listeners.listener_count);
131136

132137
for (uint32_t i = 0; i < scache_listeners.listener_count; i++)
133138
{
@@ -242,23 +247,39 @@ int connection_open_listener(struct scache_bind ibind) {
242247
return -1;
243248
}
244249

245-
static scache_connection* connection_add(int fd) {
250+
static scache_connection* connection_add(int fd, listener_type client_type) {
246251
scache_connection_node* node = &ctable[CONNECTION_HASH_KEY(fd)];
252+
scache_connection_node* newNode = NULL;
253+
247254
if (node->connection.client_sock != -1) {
248255
while (node->next != NULL) {
249256
assert(node->connection.client_sock != -1);
250257
node = node->next;
251258
}
252259

253-
scache_connection_node* newNode = (scache_connection_node*)malloc(sizeof(scache_connection_node));
254-
node->next = newNode;
255-
node = newNode;
260+
newNode = (scache_connection_node*)malloc(sizeof(scache_connection_node));
261+
}else{
262+
newNode = node;
256263
}
257264

258265
// Initialize connection
259-
memset(node, 0, sizeof(node)); /* .connection = {}, .next = NULL */
260-
rbuf_init(&(node->connection.input));
261-
node->connection.client_sock = fd;
266+
memset(newNode, 0, sizeof(*newNode)); /* .connection = {}, .next = NULL */
267+
268+
// safe to set first
269+
newNode->connection.ltype = client_type;
270+
if(client_type == cache_listener){
271+
newNode->connection.cache.target.key.fd = -1;
272+
}
273+
rbuf_init(&(newNode->connection.input));
274+
275+
// do last as marks connection slot as used
276+
newNode->connection.client_sock = fd;
277+
278+
// this is a chained connection
279+
if(node != newNode){
280+
node->next = newNode;
281+
node = newNode;
282+
}
262283

263284
return &(node->connection);
264285
}
@@ -383,6 +404,7 @@ static void* connection_handle_accept(void *arg)
383404
goto end;
384405
}
385406
} while(nfds == 0 && !stop_soon);
407+
assert(nfds >= 0);
386408
int n = 0;
387409
int state = 1;
388410
while (n < nfds) {
@@ -394,67 +416,63 @@ static void* connection_handle_accept(void *arg)
394416
else if (events[n].events & EPOLLIN)
395417
{
396418
DEBUG("[#] Accepting connection from fd %d of type %s\n", fd, listener_type_string(our_type));
397-
int client_sock;
398-
399-
do
400-
{
401-
client_sock = accept(fd, NULL, NULL);
419+
int client_sock = accept(fd, NULL, NULL);
402420

403-
if (client_sock < 0) {
404-
if (errno == EAGAIN || errno == EWOULDBLOCK)
405-
{
406-
break;
407-
}
421+
if (client_sock < 0) {
422+
if (errno != EAGAIN && errno != EWOULDBLOCK)
423+
{
408424
WARN("[#] accept() failed on fd %d of type %s. Error: %s", fd, listener_type_string(our_type), strerror(errno));
409-
break;
410-
}
411-
else {
412-
DEBUG("[#] Accepted connection %d on fd %d of type %s\n", client_sock, fd, listener_type_string(our_type));
413-
414-
//Connection will be non-blocking
415-
if (connection_non_blocking(client_sock) < 0)
416-
PFATAL("Setting connection to non blocking failed on fd %d of type %s.", fd, listener_type_string(our_type));
417-
418-
//Enable TCP CORK
419-
setsockopt(client_sock, IPPROTO_TCP, TCP_CORK, &state, sizeof(state));
420-
421-
connections_queued* q = (connections_queued*)malloc(sizeof(connections_queued));
422-
if(q == NULL){
423-
close(client_sock);
424-
WARN("[#] failed to allocate memory for connection. Abandoning incoming connection.");
425-
continue;
426-
}
427-
q->client_sock = client_sock;
428-
q->client_type = our_type;
429-
q->next = NULL;
430-
431-
// Insert connection into queue
432-
pthread_mutex_lock(&cq_lock);
433-
if (cq_tail == NULL)
434-
{
435-
assert(cq_head == NULL);
436-
cq_tail = q;
437-
cq_head = q;
438-
}
439-
else
440-
{
441-
cq_tail->next = q;
442-
cq_tail = q;
443-
}
444-
pthread_mutex_unlock(&cq_lock);
445-
446-
//Write a signal
447-
int res;
448-
do {
449-
res = write(eventfd, &u, sizeof(u));
450-
if(res == -1){
451-
PFATAL("Unable to write to eventfd");
452-
}
453-
assert(res == sizeof(u));
454-
} while(!res);
455425
}
456426
n++;
457-
} while (!stop_soon);
427+
continue;
428+
}
429+
else {
430+
DEBUG("[#] Accepted connection %d on fd %d of type %s\n", client_sock, fd, listener_type_string(our_type));
431+
432+
//Connection will be non-blocking
433+
if (connection_non_blocking(client_sock) < 0)
434+
PFATAL("Setting connection to non blocking failed on fd %d of type %s.", fd, listener_type_string(our_type));
435+
436+
//Enable TCP CORK
437+
setsockopt(client_sock, IPPROTO_TCP, TCP_CORK, &state, sizeof(state));
438+
439+
connections_queued* q = (connections_queued*)malloc(sizeof(connections_queued));
440+
if(q == NULL){
441+
close(client_sock);
442+
n++;
443+
WARN("[#] failed to allocate memory for connection. Abandoning incoming connection.");
444+
continue;
445+
}
446+
q->client_sock = client_sock;
447+
q->client_type = our_type;
448+
q->next = NULL;
449+
450+
// Insert connection into queue
451+
pthread_mutex_lock(&cq_lock);
452+
if (cq_tail == NULL)
453+
{
454+
assert(cq_head == NULL);
455+
cq_tail = q;
456+
cq_head = q;
457+
}
458+
else
459+
{
460+
cq_tail->next = q;
461+
cq_tail = q;
462+
}
463+
pthread_mutex_unlock(&cq_lock);
464+
465+
//Write a signal
466+
int res;
467+
do {
468+
res = write(eventfd, &u, sizeof(u));
469+
if(res == -1){
470+
PFATAL("Unable to write to eventfd");
471+
}
472+
assert(res == sizeof(u));
473+
} while(!res);
474+
}
475+
n++;
458476
}
459477
}
460478
}
@@ -481,7 +499,7 @@ void connection_event_loop(void (*connection_handler)(scache_connection* connect
481499
}
482500

483501
//Init Acceptor thread
484-
connection_thread_arg* thread_arg = (connection_thread_arg*)malloc(sizeof(connection_thread_arg) * 2) ;
502+
connection_thread_arg thread_arg[2];
485503
efd = eventfd(0, 0);
486504
thread_arg[0].type = cache_listener;
487505
thread_arg[1].type = mon_listener;
@@ -509,7 +527,6 @@ void connection_event_loop(void (*connection_handler)(scache_connection* connect
509527
continue;
510528
}
511529
PFATAL("epoll_wait() failed");
512-
goto end;
513530
}
514531
} while(nfds == 0 && !stop_soon);
515532

@@ -521,7 +538,7 @@ void connection_event_loop(void (*connection_handler)(scache_connection* connect
521538
{
522539
res = read(fd, &u, sizeof(u));
523540

524-
while (true)
541+
while (!stop_soon)
525542
{
526543
//Dequeue
527544
pthread_mutex_lock(&cq_lock);
@@ -549,9 +566,8 @@ void connection_event_loop(void (*connection_handler)(scache_connection* connect
549566

550567
//Handle connection
551568
DEBUG("[#%d] A new %s socket was accepted %d\n", fd, listener_type_string(client_type), client_sock);
552-
scache_connection* connection = connection_add(client_sock);
569+
scache_connection* connection = connection_add(client_sock, client_type);
553570
assert(connection->client_sock == client_sock);
554-
connection->ltype = client_type;
555571
connection_handler(connection);
556572

557573
//Add socket to epoll
@@ -628,12 +644,16 @@ void connection_event_loop(void (*connection_handler)(scache_connection* connect
628644
end:
629645
close(epfd);
630646

631-
pthread_join(tid[0], NULL);
632-
pthread_join(tid[1], NULL);
647+
errno = pthread_join(tid[0], NULL);
648+
if(errno != 0) {
649+
PFATAL("failed to join cache thread");
650+
}
651+
errno = pthread_join(tid[1], NULL);
652+
if(errno != 0) {
653+
PFATAL("failed to join mon thread");
654+
}
633655

634656
close(efd);
635-
636-
free(thread_arg);
637657
}
638658

639659
/*

src/core/connection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ void connection_event_loop(void(*connection_handler)(scache_connection* connecti
1212
void connection_setup(struct scache_binds cache_binds, struct scache_binds cache_monitor);
1313
void connection_cleanup();
1414
bool connection_remove(int fd);
15+
bool connection_stop_soon();
1516

1617
#define _DEBUG_CONNECTION_HANDLER
1718
#ifdef DEBUG_CONNECTION_HANDLER

src/core/connection_structures.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ struct cache_target {
88
struct cache_entry* entry;
99
off64_t position;
1010
off64_t end_position;
11-
int fd;
11+
int fd; // fd can be init to -1 because its not overlapping with table_target
1212
};
1313

1414
struct table_target {

src/core/db.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ LRU
3636
#include "hash.h"
3737
#include "settings.h"
3838
#include "timer.h"
39+
#include "signal_handle.h"
3940

4041
#ifdef DEBUG_BUILD
4142
#include <set>
@@ -734,17 +735,32 @@ bool db_open(const char* path) {
734735

735736
// Calculate the number of blocks in the blockfile
736737
off64_t size = lseek64(db.fd_blockfile, 0L, SEEK_END);
738+
if(size < 0){
739+
PWARN("seek to end failed");
740+
size = 0;
741+
if(ftruncate(db.fd_blockfile, size) == -1){
742+
PFATAL("Failed to completely truncate blockfile %s", db.path_blockfile);
743+
}
744+
}
737745
db.blocks_exist = (uint32_t)(size / BLOCK_LENGTH);
746+
lseek64(db.fd_blockfile, 0L, SEEK_SET);
738747

739748
// Mark all blocks that already exist in the block file as non-allocated
740749
if(will_black){
741750
if(size > (BLOCK_MAX_LOAD * BLOCK_LENGTH)) {
742751
size = BLOCK_MAX_LOAD * BLOCK_LENGTH;
743752
if(ftruncate(db.fd_blockfile, size) == -1){
744-
PFATAL("Failed to truncate blockfile: %s", db.path_blockfile);
753+
PFATAL("Failed to truncate blockfile %s", db.path_blockfile);
745754
}
746755
db.blocks_exist = (uint32_t)(size / BLOCK_LENGTH);
747756
}
757+
if((size % BLOCK_LENGTH) != 0){
758+
WARN("Block file was strange size of %d", size);
759+
760+
if(ftruncate(db.fd_blockfile, (size / BLOCK_LENGTH) * BLOCK_LENGTH) == -1){
761+
PFATAL("Failed to truncate blockfile %s", db.path_blockfile);
762+
}
763+
}
748764
for (off64_t i = 0; i < size; i += BLOCK_LENGTH) {
749765
db_block_free((uint32_t)(i / BLOCK_LENGTH));
750766
}
@@ -1363,6 +1379,7 @@ static pid_t db_index_flush(bool copyOnWrite){
13631379
if(copyOnWrite){
13641380
pid = fork();
13651381
if(pid != 0) return pid; // includes -1
1382+
signal_handler_remove();
13661383
}
13671384

13681385
//Open temporary index file

src/core/http_parse_mon.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,11 @@ void monitoring_destroy(scache_connection* connection){
295295
if(t != NULL){
296296
assert(t->monitoring.prev == connection);
297297
t->monitoring.prev = connection->monitoring.prev;
298-
if(t->monitoring.prev == NULL) {
298+
if(t->monitoring.prev == NULL && mon_head == connection) {
299299
assert(mon_head == connection);
300300
mon_head = t;
301301
}
302-
}else{
302+
}else if(mon_tail == connection){
303303
// Only if we are destroying the tail t will be NULL
304304
assert(connection == mon_tail);
305305
mon_tail = t;
@@ -312,13 +312,12 @@ void monitoring_destroy(scache_connection* connection){
312312
if(t != NULL){
313313
assert(t->monitoring.next == connection);
314314
t->monitoring.next = connection->monitoring.next;
315-
if(t->monitoring.next == NULL) {
315+
if(t->monitoring.next == NULL && mon_tail == connection) {
316316
assert(mon_tail == connection);
317317
mon_tail = t;
318318
}
319-
}else{
319+
}else if(mon_head == connection){
320320
// Only if we are destroying the head will t be null
321-
assert(mon_head == connection);
322321
mon_head = t;
323322
}
324323
}

src/core/http_respond.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,10 @@ state_action http_respond_contentlength(scache_connection* connection) {
6262
}
6363

6464
bool http_register_read(scache_connection* connection) {
65-
int extern stop_soon;
6665

6766
bool res = connection_register_read(connection->client_sock);
6867

69-
if (!stop_soon && rbuf_write_remaining(&connection->input)) {
68+
if (!connection_stop_soon() && rbuf_write_remaining(&connection->input)) {
7069
http_read_handle(connection);
7170
}
7271

0 commit comments

Comments
 (0)