-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfilebuf.h
More file actions
122 lines (102 loc) · 3.34 KB
/
filebuf.h
File metadata and controls
122 lines (102 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
* Copyright (C) “Hostcomm” LLC
* Copyright (C) Evgeniy Buevich
* Contact email: singularity@nic.ru
*/
#ifndef _FILEBUF_H
#define _FILEBUF_H
#include <fcntl.h>
#include <pthread.h>
#ifndef BUFFER_SIZE
#define BUFFER_SIZE 65536
#endif
#include "defines.h"
#include "allocator.h"
// 1 extra byte on operation '+'/'-' for diff, 1 for extra sym after key
#define KEY_BUFFER_SIZE ALIGN_UP(MAX_KEY_SOURCE + 2,CACHE_LINE_SIZE)
#define BUFF_STATE_FILL 0
#define BUFF_STATE_PROCESS 1
#define BUFF_STATE_STOP 2
#define BUFF_STATE_FREE 3
typedef struct FIOBufferStateTg
{
pthread_mutex_t buffer_lock;
pthread_cond_t state_changed;
unsigned state;
} FIOBufferState;
typedef struct FReadBufferTg
{
FIOBufferState state_data;
int size;
CACHE_LINE_PADDING(padding1,sizeof(FIOBufferState) + sizeof(unsigned));
char data[KEY_BUFFER_SIZE + BUFFER_SIZE + CACHE_LINE_SIZE];
} FReadBuffer;
typedef struct FReadBufferSetTg
{
FReadBuffer buffers[2];
pthread_t io_thread;
int fd;
int thread_started;
unsigned no_close;
unsigned mbuf_num; // Number of main thread buffer
int mbuf_pos; // Read position in main buffer
int eof;
FReadBuffer *mbuf;
} FReadBufferSet;
FReadBufferSet *fbr_create(const char *filename);
void fbr_finish(FReadBufferSet *set);
int fbr_first_block(FReadBufferSet *set);
int fbr_next_block(FReadBufferSet *set);
char *fbr_get_key_ref(FReadBufferSet *set);
static inline char *fbr_get_ref(FReadBufferSet *set)
{ return &set->mbuf->data[KEY_BUFFER_SIZE + set->mbuf_pos]; }
static inline int fbr_get_size(FReadBufferSet *set)
{ return set->mbuf->size - set->mbuf_pos; }
static inline int fbr_shift_pos(FReadBufferSet *set,int cnt) // After reading key, we should have something left to read if file is not at the end
{
if ((set->mbuf_pos += cnt) >= set->mbuf->size)
set->eof = 1;
return set->eof;
}
static inline char fbr_get_sym(FReadBufferSet *set)
{ return set->mbuf->data[KEY_BUFFER_SIZE + set->mbuf_pos]; }
static inline int fbr_inc_pos(FReadBufferSet *set)
{
if (++set->mbuf_pos >= set->mbuf->size)
return fbr_next_block(set);
return 0;
}
// 1 extra byte for op sign, 1 byte for divider between key and value, 1 byte for '\n' in output. Doubled for phantom keys
#define WRITE_BUFFER_GROW ALIGN_UP((MAX_KEY_SOURCE + MAX_VALUE_SOURCE + 3) * 2,DISK_PAGE_BYTES)
typedef struct FWriteBufferTg
{
FIOBufferState state_data;
unsigned total_size;
unsigned filled_size;
char *data;
CACHE_LINE_PADDING(padding1,sizeof(FIOBufferState) + sizeof(unsigned) * 2 + sizeof(char *));
} FWriteBuffer;
typedef struct FWriteBufferSetTg
{
FWriteBuffer buffers[2];
pthread_t io_thread;
int fd;
int thread_started;
unsigned no_close;
unsigned commited_pos;
unsigned mbuf_num; // Number of main thread buffer
FWriteBuffer *mbuf;
} FWriteBufferSet;
FWriteBufferSet *fbw_create(const char *filename);
void fbw_finish(FWriteBufferSet *set);
static inline char *fbw_get_ref(FWriteBufferSet *set)
{ return &set->mbuf->data[set->mbuf->filled_size]; }
static inline void fbw_shift_pos(FWriteBufferSet *set,int size)
{ set->mbuf->filled_size += size; }
static inline void fbw_add_sym(FWriteBufferSet *set,char sym)
{ set->mbuf->data[set->mbuf->filled_size++] = sym; }
void fbw_check_space(FWriteBufferSet *set);
static inline void fbw_rollback(FWriteBufferSet *set)
{ set->mbuf->filled_size = set->commited_pos; }
void fbw_commit(FWriteBufferSet *set);
#endif