Skip to content

Commit a83213f

Browse files
feat: add Cloudberry gang-parallel support to PXF FDW
- Introduced virtual segment ID handling for parallel execution in Cloudberry. - Added PxfBridgeImportStartVirtual function to manage imports with virtual segment IDs. - Updated PxfFdwScanState structure to include fields for gang-parallel execution. - Enhanced foreign scan functions to support gang-parallel mode, ensuring unique fragment distribution among workers. - Implemented initialization and cleanup routines for gang-parallel state management.
1 parent 72ed3e1 commit a83213f

3 files changed

Lines changed: 362 additions & 46 deletions

File tree

fdw/pxf_bridge.c

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
#include "cdb/cdbtm.h"
2525
#include "cdb/cdbvars.h"
26+
#include "miscadmin.h"
27+
#include "storage/lock.h"
2628
#include "utils/builtins.h"
2729

2830
/* helper function declarations */
@@ -392,3 +394,66 @@ PxfBridgeImportStartFragment(PxfFdwScanState *pxfsstate, int fragmentIndex)
392394
/* Update current fragment tracking */
393395
pxfsstate->current_fragment = fragmentIndex;
394396
}
397+
398+
/*
399+
* ============================================================================
400+
* Cloudberry Gang-Parallel Support (Virtual Segment ID)
401+
*
402+
* In Cloudberry/Greenplum, parallel execution uses "gang expansion" where
403+
* multiple processes share the same physical segment ID. PostgreSQL's DSM
404+
* callbacks (InitializeDSMForeignScan, InitializeWorkerForeignScan) are
405+
* NOT invoked in this model.
406+
*
407+
* Instead of fragment-by-fragment coordination, we use "virtual segment IDs":
408+
* each gang worker sends a unique virtual segment ID to PXF, so PXF's
409+
* existing round-robin fragment distribution splits the data among workers
410+
* automatically — no PXF server changes needed.
411+
*
412+
* Example: 3 physical segments × 4 workers = 12 virtual segments.
413+
* Worker i on physical segment S sends virtual_seg_id = S + i * seg_count,
414+
* with virtual_seg_count = seg_count * workers.
415+
* ============================================================================
416+
*/
417+
418+
/*
419+
* PxfBridgeImportStartVirtual
420+
* Start import with virtual segment ID for Cloudberry gang-parallel mode.
421+
*
422+
* Same as PxfBridgeImportStart, but after building the standard HTTP headers,
423+
* overrides X-GP-SEGMENT-ID and X-GP-SEGMENT-COUNT with the virtual values.
424+
* This makes PXF's round-robin assign a unique subset of fragments to each
425+
* gang worker, eliminating data duplication.
426+
*/
427+
void
428+
PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate,
429+
int virtualSegId, int virtualSegCount)
430+
{
431+
char seg_id_str[16];
432+
char seg_count_str[16];
433+
434+
pxfsstate->churl_headers = churl_headers_init();
435+
436+
BuildUriForRead(pxfsstate);
437+
BuildHttpHeaders(pxfsstate->churl_headers,
438+
pxfsstate->options,
439+
pxfsstate->relation,
440+
pxfsstate->filter_str,
441+
pxfsstate->retrieved_attrs,
442+
pxfsstate->projectionInfo);
443+
444+
/* Override physical segment ID/count with virtual values */
445+
pg_ltoa(virtualSegId, seg_id_str);
446+
pg_ltoa(virtualSegCount, seg_count_str);
447+
churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-ID", seg_id_str);
448+
churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-COUNT", seg_count_str);
449+
450+
elog(DEBUG3, "pxf_fdw: PxfBridgeImportStartVirtual physical_seg=%d "
451+
"virtual_seg_id=%d virtual_seg_count=%d",
452+
PXF_SEGMENT_ID, virtualSegId, virtualSegCount);
453+
454+
pxfsstate->churl_handle = churl_init_download(pxfsstate->uri.data,
455+
pxfsstate->churl_headers);
456+
457+
/* read some bytes to make sure the connection is established */
458+
churl_read_check_connectivity(pxfsstate->churl_handle);
459+
}

fdw/pxf_bridge.h

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,25 @@ typedef struct PxfFdwScanState
7575
CopyFromState cstate;
7676
ProjectionInfo *projectionInfo;
7777

78-
/* Parallel execution state */
79-
bool is_parallel; /* true if running in parallel mode */
78+
/* Parallel execution state (PG DSM-based) */
79+
bool is_parallel; /* true if running in DSM parallel mode */
8080
PxfParallelScanState *pstate; /* pointer to shared state in DSM */
8181
PxfFragmentData *fragments; /* array of fragment metadata */
8282
int num_fragments; /* total number of fragments */
8383
int current_fragment; /* current fragment being processed */
84+
85+
/*
86+
* Cloudberry gang-parallel state (virtual segment ID based).
87+
* Used when Cloudberry's gang expansion creates multiple processes per
88+
* segment but PG's DSM callbacks are not invoked. Each gang worker gets
89+
* a unique virtual segment ID so PXF's round-robin distributes fragments
90+
* evenly without data duplication.
91+
*/
92+
bool gang_parallel; /* true when using virtual segment IDs */
93+
bool plan_parallel_aware; /* plan node's parallel_aware flag */
94+
int worker_index; /* this worker's index within the segment gang */
95+
int virtual_seg_id; /* virtual segment ID sent to PXF */
96+
int virtual_seg_count; /* virtual segment count sent to PXF */
8497
} PxfFdwScanState;
8598

8699
/*
@@ -123,4 +136,10 @@ int PxfBridgeGetNextFragment(PxfParallelScanState *pstate);
123136
/* Start import for a specific fragment in parallel mode */
124137
void PxfBridgeImportStartFragment(PxfFdwScanState *pxfsstate, int fragmentIndex);
125138

139+
/* Cloudberry gang-parallel support (virtual segment ID based) */
140+
141+
/* Start import with virtual segment ID for gang-parallel mode */
142+
void PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate,
143+
int virtualSegId, int virtualSegCount);
144+
126145
#endif /* _PXFBRIDGE_H */

0 commit comments

Comments
 (0)