Skip to content

Commit e63c094

Browse files
committed
Use interface for storage interactions in Append-optimized TAM
We present an appendonly SMGR interface and use it for storage operations. Use of this interface allows implementing out-of-core extensions that store data not locally but in some external storage.
1 parent 734a8a1 commit e63c094

23 files changed

Lines changed: 205 additions & 64 deletions

src/backend/access/aocs/aocs_compaction.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ AOCSCompaction_DropSegmentFile(Relation aorel, int segno, AOVacuumRelStats *vacr
8686
if (fd >= 0)
8787
{
8888
TruncateAOSegmentFile(fd, aorel, pseudoSegNo, 0, vacrelstats);
89-
CloseAOSegmentFile(fd);
89+
CloseAOSegmentFile(fd, aorel);
9090
}
9191
else
9292
{
@@ -149,7 +149,7 @@ AOCSSegmentFileTruncateToEOF(Relation aorel, int segno, AOCSVPInfo *vpinfo, AOVa
149149
if (fd >= 0)
150150
{
151151
TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof, vacrelstats);
152-
CloseAOSegmentFile(fd);
152+
CloseAOSegmentFile(fd, aorel);
153153

154154
elogif(Debug_appendonly_print_compaction, LOG,
155155
"Successfully truncated AO COL relation \"%s.%s\", relation id %u, relfilenode %lu column #%d, logical segment #%d (physical segment file #%d, logical EOF " INT64_FORMAT ")",

src/backend/access/aocs/aocsam.c

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ aocs_delete_hook_type aocs_delete_hook = NULL;
7575
*/
7676
static void
7777
open_datumstreamread_segfile(
78-
char *basepath, RelFileNode node,
78+
char *basepath,
79+
const struct f_smgr_ao *smgrAO,
80+
RelFileNode node,
7981
AOCSFileSegInfo *segInfo,
8082
DatumStreamRead *ds,
8183
int colNo)
@@ -118,7 +120,7 @@ open_all_datumstreamread_segfiles(AOCSScanDesc scan, AOCSFileSegInfo *segInfo)
118120
{
119121
AttrNumber attno = proj_atts[i];
120122

121-
open_datumstreamread_segfile(basepath, rel->rd_node, segInfo, ds[attno], attno);
123+
open_datumstreamread_segfile(basepath, rel->rd_smgr->smgr_ao, rel->rd_node, segInfo, ds[attno], attno);
122124
datumstreamread_block(ds[attno], blockDirectory, attno);
123125

124126
AOCSScanDesc_UpdateTotalBytesRead(scan, attno);
@@ -141,6 +143,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds, TupleDesc relationTupleDesc,
141143
rnode.node = rel->rd_node;
142144
rnode.backend = rel->rd_backend;
143145

146+
RelationOpenSmgr(rel);
147+
144148
/* open datum streams. It will open segment file underneath */
145149
for (int i = 0; i < natts; ++i)
146150
{
@@ -181,7 +185,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds, TupleDesc relationTupleDesc,
181185
RelationGetRelationName(rel),
182186
/* title */ titleBuf.data,
183187
XLogIsNeeded() && RelationNeedsWAL(rel),
184-
&rnode);
188+
&rnode,
189+
rel->rd_smgr->smgr_ao);
185190

186191
}
187192
}
@@ -231,6 +236,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc relationTupleDesc,
231236
for (AttrNumber attno = 0; attno < relationTupleDesc->natts; attno++)
232237
ds[attno] = NULL;
233238

239+
RelationOpenSmgr(rel);
240+
234241
/* And then initialize the data streams for those columns we need */
235242
for (AttrNumber i = 0; i < num_proj_atts; i++)
236243
{
@@ -272,7 +279,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc relationTupleDesc,
272279
attr,
273280
RelationGetRelationName(rel),
274281
/* title */ titleBuf.data,
275-
&rel->rd_node);
282+
&rel->rd_node,
283+
rel->rd_smgr->smgr_ao);
276284
}
277285
}
278286

@@ -1409,7 +1417,7 @@ openFetchSegmentFile(AOCSFetchDesc aocsFetchDesc,
14091417
if (logicalEof == 0)
14101418
return false;
14111419

1412-
open_datumstreamread_segfile(aocsFetchDesc->basepath, aocsFetchDesc->relation->rd_node,
1420+
open_datumstreamread_segfile(aocsFetchDesc->basepath, aocsFetchDesc->relation->rd_smgr->smgr_ao, aocsFetchDesc->relation->rd_node,
14131421
fsInfo,
14141422
datumStreamFetchDesc->datumStream,
14151423
colNo);
@@ -1523,6 +1531,8 @@ aocs_fetch_init(Relation relation,
15231531
aocsFetchDesc->datumStreamFetchDesc = (DatumStreamFetchDesc *)
15241532
palloc0(relation->rd_att->natts * sizeof(DatumStreamFetchDesc));
15251533

1534+
RelationOpenSmgr(relation);
1535+
15261536
for (colno = 0; colno < relation->rd_att->natts; colno++)
15271537
{
15281538

@@ -1566,7 +1576,7 @@ aocs_fetch_init(Relation relation,
15661576
TupleDescAttr(tupleDesc, colno),
15671577
relation->rd_rel->relname.data,
15681578
/* title */ titleBuf.data,
1569-
&relation->rd_node);
1579+
&relation->rd_node, relation->rd_smgr->smgr_ao);
15701580

15711581
}
15721582
if (opts[colno])
@@ -1941,13 +1951,16 @@ aocs_begin_headerscan(Relation rel, int colno)
19411951
ao_attr.overflowSize = 0;
19421952
ao_attr.safeFSWriteSize = 0;
19431953
hdesc = palloc(sizeof(AOCSHeaderScanDescData));
1954+
1955+
RelationOpenSmgr(rel);
1956+
19441957
AppendOnlyStorageRead_Init(&hdesc->ao_read,
19451958
NULL, //current memory context
19461959
opts[colno]->blocksize,
19471960
RelationGetRelationName(rel),
19481961
"ALTER TABLE ADD COLUMN scan",
19491962
&ao_attr,
1950-
&rel->rd_node);
1963+
&rel->rd_node, rel->rd_smgr->smgr_ao);
19511964
hdesc->colno = colno;
19521965
return hdesc;
19531966
}
@@ -2032,6 +2045,9 @@ aocs_addcol_init(Relation rel,
20322045
NULL);
20332046

20342047
iattr = rel->rd_att->natts - num_newcols;
2048+
2049+
RelationOpenSmgr(rel);
2050+
20352051
for (i = 0; i < num_newcols; ++i, ++iattr)
20362052
{
20372053
Form_pg_attribute attr = TupleDescAttr(rel->rd_att, iattr);
@@ -2047,7 +2063,8 @@ aocs_addcol_init(Relation rel,
20472063
attr, RelationGetRelationName(rel),
20482064
titleBuf.data,
20492065
XLogIsNeeded() && RelationNeedsWAL(rel),
2050-
&rnode);
2066+
&rnode,
2067+
rel->rd_smgr->smgr_ao);
20512068
}
20522069
return desc;
20532070
}

src/backend/access/aocs/aocsam_handler.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1364,7 +1364,7 @@ aoco_relation_copy_data(Relation rel, const RelFileNode *newrnode)
13641364
*/
13651365
RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, SMGR_AO, rel);
13661366

1367-
copy_append_only_data(rel->rd_node, *newrnode, rel->rd_backend, rel->rd_rel->relpersistence);
1367+
copy_append_only_data(rel->rd_node, *newrnode, rel->rd_smgr, dstrel, rel->rd_backend, rel->rd_rel->relpersistence);
13681368

13691369
/*
13701370
* For append-optimized tables, no forks other than the main fork should

src/backend/access/aocs/test/aocsam_test.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include "postgres.h"
77
#include "utils/memutils.h"
8+
#include "storage/smgr.h"
89

910
#include "../aocsam.c"
1011

@@ -19,15 +20,22 @@ test__aocs_begin_headerscan(void **state)
1920
{
2021
AOCSHeaderScanDesc desc;
2122
RelationData reldata;
23+
SMgrRelationData smgrdata;
2224
FormData_pg_class pgclass;
2325

26+
memset(&reldata, 0, sizeof(RelationData));
27+
2428
reldata.rd_rel = &pgclass;
2529
reldata.rd_id = 12345;
2630
StdRdOptions opt;
2731

2832
opt.blocksize = 8192 * 5;
2933
StdRdOptions *opts[1];
3034

35+
smgrdata.smgr_ao = smgrAOGetDefault();
36+
reldata.rd_smgr = &smgrdata;
37+
reldata.rd_backend = InvalidBackendId;
38+
3139
opts[0] = &opt;
3240

3341
strncpy(&pgclass.relname.data[0], "mock_relation", 13);
@@ -63,6 +71,7 @@ test__aocs_addcol_init(void **state)
6371
{
6472
AOCSAddColumnDesc desc;
6573
RelationData reldata;
74+
SMgrRelationData smgrdata;
6675
int nattr = 5;
6776
StdRdOptions **opts =
6877
(StdRdOptions **) malloc(sizeof(StdRdOptions *) * nattr);
@@ -98,6 +107,8 @@ test__aocs_addcol_init(void **state)
98107
expect_value(create_datumstreamwrite, needsWAL, true);
99108
expect_any(create_datumstreamwrite, rnode);
100109
expect_any(create_datumstreamwrite, rnode);
110+
expect_any(create_datumstreamwrite, smgrAO);
111+
expect_any(create_datumstreamwrite, smgrAO);
101112
expect_any_count(create_datumstreamwrite, attr, 2);
102113
expect_any_count(create_datumstreamwrite, relname, 2);
103114
expect_any_count(create_datumstreamwrite, title, 2);
@@ -112,6 +123,9 @@ test__aocs_addcol_init(void **state)
112123
memset(reldata.rd_att->attrs, 0, sizeof(Form_pg_attribute *) * nattr);
113124
reldata.rd_att->natts = nattr;
114125

126+
smgrdata.smgr_ao = smgrAOGetDefault();
127+
reldata.rd_smgr = &smgrdata;
128+
115129
expect_value(GetAppendOnlyEntryAttributes, relid, 12345);
116130
expect_any(GetAppendOnlyEntryAttributes, blocksize);
117131
expect_any(GetAppendOnlyEntryAttributes, safefswritesize);

src/backend/access/appendonly/aomd.c

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ OpenAOSegmentFile(Relation rel,
156156
File fd;
157157

158158
errno = 0;
159-
fd = PathNameOpenFile(filepathname, fileFlags);
159+
160+
RelationOpenSmgr(rel);
161+
162+
fd = rel->rd_smgr->smgr_ao->smgr_AORelOpenSegFile(filepathname, fileFlags);
160163
if (fd < 0)
161164
{
162165
if (logicalEof == 0 && errno == ENOENT)
@@ -175,9 +178,12 @@ OpenAOSegmentFile(Relation rel,
175178
* Close an Append Only relation file segment
176179
*/
177180
void
178-
CloseAOSegmentFile(File fd)
181+
CloseAOSegmentFile(File fd, Relation rel)
179182
{
180-
FileClose(fd);
183+
Assert(fd > 0);
184+
RelationOpenSmgr(rel);
185+
186+
rel->rd_smgr->smgr_ao->smgr_FileClose(fd);
181187
}
182188

183189
/*
@@ -192,7 +198,9 @@ TruncateAOSegmentFile(File fd, Relation rel, int32 segFileNum, int64 offset, AOV
192198
Assert(fd > 0);
193199
Assert(offset >= 0);
194200

195-
filesize_before = FileSize(fd);
201+
RelationOpenSmgr(rel);
202+
203+
filesize_before = rel->rd_smgr->smgr_ao->smgr_FileSize(fd);
196204
if (filesize_before < offset)
197205
ereport(ERROR,
198206
(errmsg("\"%s\": file size smaller than logical eof: %m",
@@ -202,7 +210,7 @@ TruncateAOSegmentFile(File fd, Relation rel, int32 segFileNum, int64 offset, AOV
202210
* Call the 'fd' module with a 64-bit length since AO segment files
203211
* can be multi-gigabyte to the terabytes...
204212
*/
205-
if (FileTruncate(fd, offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0)
213+
if (rel->rd_smgr->smgr_ao->smgr_FileTruncate(fd, offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0)
206214
ereport(ERROR,
207215
(errmsg("\"%s\": failed to truncate data after eof: %m",
208216
relname)));
@@ -387,7 +395,8 @@ mdunlink_ao_perFile(const int segno, void *ctx)
387395

388396
static void
389397
copy_file(char *srcsegpath, char *dstsegpath,
390-
RelFileNode dst, int segfilenum, bool use_wal)
398+
RelFileNode dst, SMgrRelation srcSMGR, SMgrRelation dstSMGR,
399+
int segfilenum, bool use_wal)
391400
{
392401
File srcFile;
393402
File dstFile;
@@ -396,7 +405,7 @@ copy_file(char *srcsegpath, char *dstsegpath,
396405
char *buffer = palloc(BLCKSZ);
397406
int dstflags;
398407

399-
srcFile = PathNameOpenFile(srcsegpath, O_RDONLY | PG_BINARY);
408+
srcFile = srcSMGR->smgr_ao->smgr_AORelOpenSegFile(srcsegpath, O_RDONLY | PG_BINARY);
400409
if (srcFile < 0)
401410
ereport(ERROR,
402411
(errcode_for_file_access(),
@@ -411,13 +420,13 @@ copy_file(char *srcsegpath, char *dstsegpath,
411420
if (segfilenum)
412421
dstflags |= O_CREAT;
413422

414-
dstFile = PathNameOpenFile(dstsegpath, dstflags);
423+
dstFile = dstSMGR->smgr_ao->smgr_AORelOpenSegFile(dstsegpath, dstflags);
415424
if (dstFile < 0)
416425
ereport(ERROR,
417426
(errcode_for_file_access(),
418427
(errmsg("could not create destination file %s: %m", dstsegpath))));
419428

420-
left = FileDiskSize(srcFile);
429+
left = srcSMGR->smgr_ao->smgr_FileDiskSize(srcFile);
421430
if (left < 0)
422431
ereport(ERROR,
423432
(errcode_for_file_access(),
@@ -431,13 +440,13 @@ copy_file(char *srcsegpath, char *dstsegpath,
431440
CHECK_FOR_INTERRUPTS();
432441

433442
len = Min(left, BLCKSZ);
434-
if (FileRead(srcFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_READ) != len)
443+
if (srcSMGR->smgr_ao->smgr_FileRead(srcFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_READ) != len)
435444
ereport(ERROR,
436445
(errcode_for_file_access(),
437446
errmsg("could not read %d bytes from file \"%s\": %m",
438447
len, srcsegpath)));
439448

440-
if (FileWrite(dstFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_WRITE) != len)
449+
if (dstSMGR->smgr_ao->smgr_FileWrite(dstFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_WRITE) != len)
441450
ereport(ERROR,
442451
(errcode_for_file_access(),
443452
errmsg("could not write %d bytes to file \"%s\": %m",
@@ -449,19 +458,21 @@ copy_file(char *srcsegpath, char *dstsegpath,
449458
left -= len;
450459
}
451460

452-
if (FileSync(dstFile, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0)
461+
if (dstSMGR->smgr_ao->smgr_FileSync(dstFile, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0)
453462
ereport(ERROR,
454463
(errcode_for_file_access(),
455464
errmsg("could not fsync file \"%s\": %m",
456465
dstsegpath)));
457-
FileClose(srcFile);
458-
FileClose(dstFile);
466+
srcSMGR->smgr_ao->smgr_FileClose(srcFile);
467+
dstSMGR->smgr_ao->smgr_FileClose(dstFile);
459468
pfree(buffer);
460469
}
461470

462471
struct copy_append_only_data_callback_ctx {
463472
char *srcPath;
464473
char *dstPath;
474+
SMgrRelation srcSMGR;
475+
SMgrRelation dstSMGR;
465476
RelFileNode src;
466477
RelFileNode dst;
467478
bool useWal;
@@ -473,6 +484,7 @@ struct copy_append_only_data_callback_ctx {
473484
*/
474485
void
475486
copy_append_only_data(RelFileNode src, RelFileNode dst,
487+
SMgrRelation srcSMGR, SMgrRelation dstSMGR,
476488
BackendId backendid, char relpersistence)
477489
{
478490
char *srcPath;
@@ -488,10 +500,12 @@ copy_append_only_data(RelFileNode src, RelFileNode dst,
488500
srcPath = relpathbackend(src, backendid, MAIN_FORKNUM);
489501
dstPath = relpathbackend(dst, backendid, MAIN_FORKNUM);
490502

491-
copy_file(srcPath, dstPath, dst, 0, useWal);
503+
copy_file(srcPath, dstPath, dst, srcSMGR, dstSMGR, 0, useWal);
492504

493505
copyFiles.srcPath = srcPath;
494506
copyFiles.dstPath = dstPath;
507+
copyFiles.srcSMGR = srcSMGR;
508+
copyFiles.dstSMGR = dstSMGR;
495509
copyFiles.src = src;
496510
copyFiles.dst = dst;
497511
copyFiles.useWal = useWal;
@@ -526,7 +540,7 @@ copy_append_only_data_perFile(const int segno, void *ctx)
526540
return false;
527541
}
528542
sprintf(dstSegPath, "%s.%u", copyFiles->dstPath, segno);
529-
copy_file(srcSegPath, dstSegPath, copyFiles->dst, segno, copyFiles->useWal);
543+
copy_file(srcSegPath, dstSegPath, copyFiles->dst, copyFiles->srcSMGR, copyFiles->dstSMGR, segno, copyFiles->useWal);
530544

531545
return true;
532546
}
@@ -595,7 +609,7 @@ truncate_ao_perFile(const int segno, void *ctx)
595609
if (fd >= 0)
596610
{
597611
TruncateAOSegmentFile(fd, aorel, segno, 0, NULL);
598-
CloseAOSegmentFile(fd);
612+
CloseAOSegmentFile(fd, aorel);
599613
}
600614
else
601615
{

src/backend/access/appendonly/appendonly_compaction.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ AppendOnlyCompaction_DropSegmentFile(Relation aorel, int segno, AOVacuumRelStats
9494
if (fd >= 0)
9595
{
9696
TruncateAOSegmentFile(fd, aorel, fileSegNo, 0, vacrelstats);
97-
CloseAOSegmentFile(fd);
97+
CloseAOSegmentFile(fd, aorel);
9898
}
9999
else
100100
{
@@ -259,7 +259,7 @@ AppendOnlySegmentFileTruncateToEOF(Relation aorel, int segno, int64 segeof, AOVa
259259
if (fd >= 0)
260260
{
261261
TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof, vacrelstats);
262-
CloseAOSegmentFile(fd);
262+
CloseAOSegmentFile(fd, aorel);
263263

264264
elogif(Debug_appendonly_print_compaction, LOG,
265265
"Successfully truncated AO ROW relation \"%s.%s\", relation id %u, relfilenode %lu (physical segment file #%d, logical EOF " INT64_FORMAT ")",

0 commit comments

Comments
 (0)