Skip to content

Commit 89e4134

Browse files
authored
Merge pull request #7 from jsc0218/dev_gc
fix the parallel issue in utility functions
2 parents 227c4f1 + da80836 commit 89e4134

6 files changed

Lines changed: 205 additions & 11 deletions

File tree

src/kv_fdw.c

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11

22
#include "kv_fdw.h"
3-
#include "kv_storage.h"
43

54
#include <pthread.h>
65

@@ -54,6 +53,7 @@ static void GetForeignRelSize(PlannerInfo *root,
5453
* min & max will call GetForeignRelSize & GetForeignPaths multiple times,
5554
* we should open & close db multiple times.
5655
*/
56+
printf("\n-----------------%s open----------------------\n", __func__);
5757
ptr = OpenRequest(foreignTableId, ptr);
5858

5959
/* TODO better estimation */
@@ -146,6 +146,88 @@ static ForeignScan *GetForeignPlan(PlannerInfo *root,
146146
NULL);
147147
}
148148

149+
#ifdef VidarDB
150+
static void GetKeyRangeQual(Plan *plan,
151+
TupleDesc tupleDescriptor,
152+
TableReadState *readState) {
153+
if (!plan) {
154+
return;
155+
}
156+
157+
int qualSize = list_length(plan->qual);
158+
if (qualSize < 2) {
159+
return;
160+
}
161+
162+
bool startFlag = false;
163+
bool limitFlag = false;
164+
StringInfo startInfo = makeStringInfo();
165+
StringInfo limitInfo = makeStringInfo();
166+
167+
ListCell *lc;
168+
foreach (lc, plan->qual) {
169+
Expr *state = lfirst(lc);
170+
if (!state || !IsA(state, OpExpr)) {
171+
continue;
172+
}
173+
OpExpr *op = (OpExpr *) state;
174+
if (list_length(op->args) != 2) {
175+
continue;
176+
}
177+
178+
Node *left = list_nth(op->args, 0);
179+
if (!IsA(left, Var)) {
180+
continue;
181+
}
182+
183+
Node *right = list_nth(op->args, 1);
184+
if (!IsA(right, Const)) {
185+
continue;
186+
}
187+
188+
Index varattno = ((Var *) left)->varattno;
189+
if (varattno != 1) {
190+
continue;
191+
}
192+
193+
HeapTuple opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(op->opno));
194+
if (!HeapTupleIsValid(opertup)) {
195+
ereport(ERROR, (errmsg("cache lookup failed for operator %u", op->opno)));
196+
}
197+
Form_pg_operator operform = (Form_pg_operator) GETSTRUCT(opertup);
198+
char *oprname = NameStr(operform->oprname);
199+
if (strncmp(oprname, ">=", NAMEDATALEN) == 0) {
200+
startFlag = true;
201+
Const *constNode = ((Const *) right);
202+
Datum datum = constNode->constvalue;
203+
TypeCacheEntry *typeEntry = lookup_type_cache(constNode->consttype, 0);
204+
datum = ShortVarlena(datum, typeEntry->typlen, typeEntry->typstorage);
205+
SerializeAttribute(tupleDescriptor, varattno-1, datum, startInfo);
206+
ReleaseSysCache(opertup);
207+
}else if (strncmp(oprname, "<=", NAMEDATALEN) == 0) {
208+
limitFlag = true;
209+
Const *constNode = ((Const *) right);
210+
Datum datum = constNode->constvalue;
211+
TypeCacheEntry *typeEntry = lookup_type_cache(constNode->consttype, 0);
212+
datum = ShortVarlena(datum, typeEntry->typlen, typeEntry->typstorage);
213+
SerializeAttribute(tupleDescriptor, varattno-1, datum, limitInfo);
214+
ReleaseSysCache(opertup);
215+
}else{
216+
ReleaseSysCache(opertup);
217+
continue;
218+
}
219+
220+
if (startFlag && limitFlag) {
221+
readState->isRangeQueryUsed = true;
222+
readState->rangeSpec.start = startInfo->data;
223+
readState->rangeSpec.startLen = startInfo->len;
224+
readState->rangeSpec.limit = limitInfo->data;
225+
readState->rangeSpec.limitLen = limitInfo->len;
226+
}
227+
}
228+
}
229+
#endif
230+
149231
static void GetKeyBasedQual(Node *node,
150232
TupleDesc tupleDescriptor,
151233
TableReadState *readState) {
@@ -242,9 +324,13 @@ static void BeginForeignScan(ForeignScanState *scanState, int executorFlags) {
242324
return;
243325
}
244326

327+
//printf("\n-----------------Plan Type: %d----------------------\n", scanState->ss.ps.plan->type);
328+
245329
ListCell *lc;
246330
foreach (lc, scanState->ss.ps.plan->qual) {
247331
Expr *state = lfirst(lc);
332+
//printf("\n-----------------Qual Type: %d----------------------\n", state->type);
333+
248334
GetKeyBasedQual((Node *) state,
249335
scanState->ss.ss_currentRelation->rd_att,
250336
readState);
@@ -255,8 +341,18 @@ static void BeginForeignScan(ForeignScanState *scanState, int executorFlags) {
255341
}
256342

257343
if (!readState->isKeyBased) {
258-
Oid relationId = RelationGetRelid(scanState->ss.ss_currentRelation);
259-
GetIterRequest(relationId, ptr);
344+
#ifdef VidarDB
345+
GetKeyRangeQual(scanState->ss.ps.plan,
346+
scanState->ss.ss_currentRelation->rd_att,
347+
readState);
348+
#endif
349+
//if(readState->isRangeQueryUsed) {
350+
351+
//}else
352+
//{
353+
Oid relationId = RelationGetRelid(scanState->ss.ss_currentRelation);
354+
GetIterRequest(relationId, ptr);
355+
//}
260356
}
261357
}
262358

@@ -359,6 +455,9 @@ static TupleTableSlot *IterateForeignScan(ForeignScanState *scanState) {
359455
found = GetRequest(relationId, ptr, k, kLen, &v, &vLen);
360456
readState->done = true;
361457
}
458+
/*} else if (readState->isRangeQueryUsed)
459+
{*/
460+
362461
} else {
363462
found = NextRequest(relationId, ptr, &k, &kLen, &v, &vLen);
364463
}

src/kv_fdw.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#ifndef KV_FDW_H_
33
#define KV_FDW_H_
44

5+
#include "kv_storage.h"
56

67
#include <stdbool.h>
78
#include <semaphore.h>
@@ -73,6 +74,16 @@ typedef struct TableReadState {
7374
bool isKeyBased;
7475
bool done;
7576
StringInfo key;
77+
78+
#ifdef VidarDB
79+
bool isRangeQueryUsed;
80+
RangeSpec rangeSpec;
81+
void **rangeReadOptions;
82+
char* valArray;
83+
uint32* valLens;
84+
uint32 valArraySize;
85+
uint32 iterCount;
86+
#endif
7687
} TableReadState;
7788

7889
/*
@@ -97,6 +108,9 @@ typedef enum FuncName {
97108
GET,
98109
PUT,
99110
DELETE,
111+
#ifdef VidarDB
112+
RANGEQUERY,
113+
#endif
100114
TERMINATE
101115
} FuncName;
102116

@@ -155,6 +169,13 @@ extern void DeleteRequest(Oid relationId,
155169
char* key,
156170
uint32 keyLen);
157171

172+
#ifdef VidarDB
173+
extern void RangeQueryRequest(Oid relationId,
174+
SharedMem *ptr,
175+
void** readOptions,
176+
RangeSpec rangeSpec
177+
);
178+
#endif
158179

159180
/* global variables */
160181

src/kv_shm.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ static void KVWorkerMain(int argc, char *argv[]) {
328328
case DELETE:
329329
DeleteResponse(buf + sizeof(responseId));
330330
break;
331+
#ifdef VidarDB
332+
case RANGEQUERY:
333+
RangeQueryResponse(buff + sizeof(int));
334+
#endif
331335
default:
332336
ereport(ERROR, (errmsg("%s failed in switch", __func__)));
333337
}
@@ -427,6 +431,7 @@ static void OpenResponse(char *area) {
427431
char *pos = strrchr(path, '/');
428432
Oid relationId = atoi(pos + 1);
429433
bool found;
434+
430435
KVHashEntry *entry = hash_search(kvTableHash, &relationId, HASH_ENTER, &found);
431436
if (!found) {
432437
entry->relationId = relationId;
@@ -952,3 +957,13 @@ static void DeleteResponse(char *area) {
952957
}
953958
}
954959
}
960+
961+
#ifdef VidarDB
962+
void RangeQueryRequest(Oid relationId, SharedMem *ptr, void** readOptions, RangeSpec rangeSpec) {
963+
964+
}
965+
966+
static void RangeQueryResponse(char *area) {
967+
968+
}
969+
#endif

src/kv_storage.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,44 @@ bool Delete(void* db, char* key, uint32 keyLen) {
7979
return s.ok();
8080
}
8181

82+
#ifdef VidarDB
83+
bool RangeQuery(void* db, void** readOptions, RangeSpec range, char** valArray, uint32** valLens, uint32* valArraySize) {
84+
Range r(Slice(range.start, range.startLen), Slice(range.limit, range.limitLen));
85+
ReadOptions *options = static_cast<ReadOptions*>(*readOptions);
86+
if (options == nullptr)
87+
{
88+
options = (ReadOptions*) palloc0(sizeof(ReadOptions));
89+
}
90+
options->max_result_num = MAXRESULTNUM;
91+
*readOptions = options;
92+
std::vector<std::string> res;
93+
Status s;
94+
bool ret = static_cast<DB*>(db)->RangeQuery(*options, r, res, &s);
95+
96+
if (!s.ok()) {
97+
*valArraySize = 0;
98+
return false;
99+
}
100+
101+
*valArraySize = res.size();
102+
if( *valArraySize > 0) {
103+
uint32 *tmpLens = (uint32*) palloc0(*valArraySize * sizeof(uint32));
104+
*valLens = tmpLens;
105+
uint32 total = 0;
106+
for (auto it = res.begin(); it != res.end(); ++it) {
107+
*tmpLens = (*it).size();
108+
total += (*it).size();
109+
tmpLens++;
110+
}
111+
char *tmpVal = (char*) palloc0(total);
112+
*valArray = tmpVal;
113+
for (auto it = res.begin(); it != res.end(); ++it) {
114+
memcpy(tmpVal, (*it).c_str(), (*it).size());
115+
tmpVal = tmpVal + (*it).size();
116+
}
117+
}
118+
return ret;
119+
}
120+
#endif
121+
82122
}

src/kv_storage.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ bool Get(void* db, char* key, uint32 keyLen, char** val, uint32* valLen);
2828
bool Put(void* db, char* key, uint32 keyLen, char* val, uint32 valLen);
2929
bool Delete(void* db, char* key, uint32 keyLen);
3030

31+
#ifdef VidarDB
32+
#define MAXRESULTNUM 10000
33+
typedef struct RangeSpec {
34+
char* start;
35+
uint32 startLen;
36+
char* limit;
37+
uint32 limitLen;
38+
} RangeSpec;
39+
40+
bool RangeQuery(void* db, void** readOptions, RangeSpec range, char** valArray, uint32** valLens, uint32* valArraySize);
41+
#endif
3142

3243
#if defined(__cplusplus)
3344
}

src/kv_utility.c

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
PREVIOUS_UTILITY(plannedStmt, queryString, context, paramListInfo, \
3131
queryEnvironment, destReceiver, completionTag)
3232

33+
/* */
34+
static SharedMem *ptr = NULL;
3335

3436
/*
3537
* SQL functions
@@ -200,7 +202,7 @@ Datum kv_ddl_event_end_trigger(PG_FUNCTION_ARGS) {
200202
* directory for it during database creation time.
201203
*/
202204
KVCreateDatabaseDirectory(MyDatabaseId);
203-
205+
204206
StringInfo kvPath = makeStringInfo();
205207
appendStringInfo(kvPath,
206208
"%s/%s/%u/%u",
@@ -470,8 +472,10 @@ static uint64 KVCopyIntoTable(const CopyStmt *copyStmt,
470472
ALLOCSET_DEFAULT_SIZES);
471473

472474
Oid relationId = RelationGetRelid(relation);
473-
KVFdwOptions *fdwOptions = KVGetOptions(relationId);
474-
void *db = Open(fdwOptions->filename);
475+
//KVFdwOptions *fdwOptions = KVGetOptions(relationId);
476+
477+
//void *db = Open(fdwOptions->filename);
478+
ptr = OpenRequest(relationId, ptr);
475479

476480
TupleDesc tupleDescriptor = RelationGetDescr(relation);
477481
uint32 count = tupleDescriptor->natts;
@@ -514,9 +518,12 @@ static uint64 KVCopyIntoTable(const CopyStmt *copyStmt,
514518
}
515519
memcpy(value->data, buffer->data, bufLen);
516520

517-
if (!Put(db, key->data, key->len, value->data, value->len)) {
518-
ereport(ERROR, (errmsg("error from Copy")));
519-
}
521+
//if (!Put(db, key->data, key->len, value->data, value->len)) {
522+
// ereport(ERROR, (errmsg("error from Copy")));
523+
//}
524+
525+
526+
PutRequest(relationId, ptr, key->data, key->len, value->data, value->len);
520527
rowCount++;
521528
}
522529

@@ -527,7 +534,8 @@ static uint64 KVCopyIntoTable(const CopyStmt *copyStmt,
527534

528535
/* end read/write sessions and close the relation */
529536
EndCopyFrom(copyState);
530-
Close(db);
537+
//Close(db);
538+
CloseRequest(relationId, ptr);
531539
heap_close(relation, ShareUpdateExclusiveLock);
532540

533541
return rowCount;
@@ -657,7 +665,7 @@ static void KVProcessUtility(PlannedStmt *plannedStmt,
657665

658666
CopyStmt *copyStmt = (CopyStmt *) parseTree;
659667
if (KVCopyTableStatement(copyStmt)) {
660-
668+
661669
uint64 rowCount = 0;
662670
if (copyStmt->is_from) {
663671
rowCount = KVCopyIntoTable(copyStmt, queryString);

0 commit comments

Comments
 (0)