From ec8b2ad950425ddf66b25773dcdeb89e648078e1 Mon Sep 17 00:00:00 2001 From: vaibhav Date: Thu, 30 Apr 2026 17:11:00 +0530 Subject: [PATCH 1/8] feat: addition of iam walk strategy --- drivers/mssql/internal/backfill.go | 183 +++++++++++++++++++++++++++-- pkg/jdbc/jdbc.go | 44 +++++++ 2 files changed, 220 insertions(+), 7 deletions(-) diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 8df3b2a82..4169f6df9 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "database/sql" + "encoding/binary" + "encoding/hex" "fmt" "math" "sort" @@ -20,6 +22,30 @@ import ( "github.com/datazip-inc/olake/utils/typeutils" ) +// physLocChunkValueLen is the hex-encoded BINARY(8) %%physloc%% shape: +// "0x" + 16 hex chars. Used to route chunks to physloc scans. +const physLocChunkValueLen = 18 + +// usableBytesPerPage is an upper bound for in-row payload per 8KB page +// (IN_ROW_DATA max row size). Using the ceiling yields smaller chunks. +const usableBytesPerPage = 8060 + +// isPhysLocChunk reports whether the chunk's boundaries are %%physloc%% +// hex literals (the shape produced by IAM walk and the iterative physloc +// planner). Either Min or Max is sufficient to identify the format because +// both planners always set at least one boundary. +func isPhysLocChunk(chunk types.Chunk) bool { + check := func(v any) bool { + s, ok := v.(string) + if !ok || len(s) != physLocChunkValueLen || !strings.HasPrefix(s, "0x") { + return false + } + _, err := hex.DecodeString(s[2:]) + return err == nil + } + return check(chunk.Min) || check(chunk.Max) +} + // ChunkIterator implements snapshot iteration over MSSQL chunks. 
func (m *MSSQL) ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, onMessage abstract.BackfillMsgFn) error { opts := jdbc.DriverOptions{ @@ -48,11 +74,14 @@ func (m *MSSQL) ChunkIterator(ctx context.Context, stream types.StreamInterface, // Build query for the chunk stmt := "" - if chunkColumn != "" { + switch { + case chunkColumn != "": stmt = jdbc.MSSQLChunkScanQuery(stream, []string{chunkColumn}, chunk, filter) - } else if len(pkColumns) > 0 { + case isPhysLocChunk(chunk): + stmt = jdbc.MSSQLPhysLocChunkScanQuery(stream, chunk, filter) + case len(pkColumns) > 0: stmt = jdbc.MSSQLChunkScanQuery(stream, pkColumns, chunk, filter) - } else { + default: stmt = jdbc.MSSQLPhysLocChunkScanQuery(stream, chunk, filter) } @@ -270,17 +299,157 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo }) } - if len(pkColumns) > 0 { - logger.Debugf("Stream %s: Using PK-based chunking with columns: %v", stream.ID(), pkColumns) + switch { + case chunkColumn != "": + logger.Debugf("Stream %s: chunkColumn=%s set, using PK-based chunking", stream.ID(), chunkColumn) err = splitViaPrimaryKey(stream, chunks, pkColumns) - } else { - logger.Debugf("Stream %s: Using %%physloc%% chunking (no PK or chunkColumn available)", stream.ID()) + case m.probeIAMWalkCapability(ctx): + logger.Debugf("Stream %s: Attempting IAM walk chunking", stream.ID()) + err = m.splitViaIAMWalk(ctx, stream, chunks) + if err != nil || chunks.Len() == 0 { + logger.Warnf("Stream %s: IAM walk failed (%v, chunks=%d), falling back to existing strategy", stream.ID(), err, chunks.Len()) + chunks = types.NewSet[types.Chunk]() + if len(pkColumns) > 0 { + err = splitViaPrimaryKey(stream, chunks, pkColumns) + } else { + err = splitViaPhysLoc(stream, chunks) + } + } else { + logger.Infof("Stream %s: IAM walk produced %d chunks", stream.ID(), chunks.Len()) + } + case len(pkColumns) > 0: + logger.Debugf("Stream %s: IAM walk unavailable, using PK-based chunking with 
columns: %v", stream.ID(), pkColumns) + err = splitViaPrimaryKey(stream, chunks, pkColumns) + default: + logger.Debugf("Stream %s: IAM walk unavailable and no PK, using %%physloc%% iterative", stream.ID()) err = splitViaPhysLoc(stream, chunks) } return chunks, err } +// packPhysLoc encodes a (file_id, page_id) pair as a uint64 whose +// unsigned-integer comparison matches SQL Server's binary comparison of +// the equivalent BINARY(8) %%physloc%% value. +// +// %%physloc%% wire layout: bytes 0..7 = [page_id LE 4B][file_id LE 2B][slot_id LE 2B]. +// SQL Server compares BINARY(N) byte-by-byte starting from byte 0, so we +// pack physloc byte 0 into the highest 8 bits of the uint64 and physloc +// byte 7 into the lowest. slot_id is fixed at 0xFFFF so each boundary +// represents "after the last possible row on the page": chunk filter +// `%%physloc%% <= b` includes every row on b's page, and `> b` starts +// cleanly on the next page. +// +// This lets us sort []uint64 with native uint64 comparison (instead of +// allocating an 8-byte slice per page and sorting with bytes.Compare), +// halving memory on large tables and skipping the per-page encode step +// entirely; we encode only the few sampled boundaries. +func packPhysLoc(fileID, pageID int32) uint64 { + p := uint32(pageID) + f := uint32(uint16(fileID)) + return uint64(p&0xFF)<<56 | + uint64((p>>8)&0xFF)<<48 | + uint64((p>>16)&0xFF)<<40 | + uint64((p>>24)&0xFF)<<32 | + uint64(f&0xFF)<<24 | + uint64((f>>8)&0xFF)<<16 | + 0xFFFF +} + +// physLocBytes converts a packed uint64 boundary back to its 8-byte +// %%physloc%% wire representation. Because packPhysLoc places physloc +// byte 0 at the uint64's most significant 8 bits, writing the uint64 in +// big-endian order reproduces the wire layout exactly. 
+func physLocBytes(packed uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, packed) + return b +} + +// splitViaIAMWalk plans chunks for any heap or clustered table by reading +// only the table's Index Allocation Map pages via +// sys.dm_db_database_page_allocations. +func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterface, chunks *types.Set[types.Chunk]) error { + var objectID int64 + err := m.client.QueryRowContext(ctx, jdbc.MSSQLObjectIDQuery(), stream.Namespace(), stream.Name()).Scan(&objectID) + if err != nil { + return fmt.Errorf("failed to resolve object_id for IAM walk: %s", err) + } + + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkQuery(), objectID) + if err != nil { + return fmt.Errorf("failed to run IAM walk query: %s", err) + } + defer rows.Close() + + pages := make([]uint64, 0, 1024) + for rows.Next() { + var fileID, pageID int32 + if err := rows.Scan(&fileID, &pageID); err != nil { + return fmt.Errorf("failed to scan IAM walk page: %s", err) + } + pages = append(pages, packPhysLoc(fileID, pageID)) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to iterate IAM walk rows: %s", err) + } + + total := int64(len(pages)) + if total == 0 { + return fmt.Errorf("IAM walk returned no allocated pages") + } + + // Sort defensively — the DMF does not guarantee any output order. + sort.Slice(pages, func(i, j int) bool { return pages[i] < pages[j] }) + + pagesPerChunk := constants.EffectiveParquetSize / usableBytesPerPage + pagesPerChunk = max(pagesPerChunk, 1) + + // Walk the sorted page list, emitting an open→closed chunk every + // pagesPerChunk pages. The trailing chunk is open-ended on the high + // side. If the table fits in a single chunk (total ≤ pagesPerChunk) + // this naturally produces just {nil, nil}. 
+ var prev any = nil + for i := pagesPerChunk; i < total; i += pagesPerChunk { + max := utils.HexEncode(physLocBytes(pages[i])) + chunks.Insert(types.Chunk{Min: prev, Max: max}) + prev = max + } + chunks.Insert(types.Chunk{Min: prev, Max: nil}) + + return nil +} + +// probeIAMWalkCapability checks if IAM walk +func (m *MSSQL) probeIAMWalkCapability(ctx context.Context) bool { + var majorVersion, engineEdition int + err := m.client.QueryRowContext(ctx, jdbc.MSSQLIAMWalkServerPropertiesQuery()).Scan(&majorVersion, &engineEdition) + if err != nil { + logger.Debugf("IAM walk probe: failed to read server properties: %s", err) + return false + } + if majorVersion < 11 { + logger.Debugf("IAM walk probe: SQL Server major version %d < 11, IAM walk unsupported", majorVersion) + return false + } + // EngineEdition 5 = Azure SQL Database, 8 = Azure SQL Managed Instance. + // sys.dm_db_database_page_allocations is blocked on both. + if engineEdition == 5 || engineEdition == 8 { + logger.Debugf("IAM walk probe: EngineEdition %d (Azure SQL DB/MI) blocks the DMF", engineEdition) + return false + } + + // Permission probe: TOP 0 evaluates the DMF without returning any rows. + // Failure here means the current login lacks VIEW DATABASE STATE. + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkPermissionQuery()) + if err != nil { + logger.Debugf("IAM walk probe: permission test failed (likely missing VIEW DATABASE STATE): %s", err) + return false + } + rows.Close() + return true +} + // getColumnTypeMSSQL returns SQL data type for the requested column. 
func (m *MSSQL) getColumnTypeMSSQL(ctx context.Context, stream types.StreamInterface, column string, tx *sql.Tx) (string, error) { var dataType string diff --git a/pkg/jdbc/jdbc.go b/pkg/jdbc/jdbc.go index 00bd8d3fc..30b15968f 100644 --- a/pkg/jdbc/jdbc.go +++ b/pkg/jdbc/jdbc.go @@ -660,6 +660,50 @@ func MSSQLPhysLocNextChunkEndQuery(stream types.StreamInterface, chunkSize int64 `, quotedTable, chunkSize) } +// MSSQLIAMWalkQuery returns a streaming IAM-walk page list for a table: +// (file_id, page_id) from sys.dm_db_database_page_allocations in LIMITED mode. +// Params: @p1 = object_id. Requires VIEW DATABASE STATE; SQL Server 2012+; +// blocked on Azure SQL DB/MI. +func MSSQLIAMWalkQuery() string { + return ` + SELECT + allocated_page_file_id AS file_id, + allocated_page_page_id AS page_id + FROM sys.dm_db_database_page_allocations( + DB_ID(), + @p1, + NULL, + NULL, + 'LIMITED' + ) + WHERE is_allocated = 1 + AND is_iam_page = 0 + AND index_id IN (0, 1) + AND allocation_unit_type = 1 + ` +} + +// MSSQLObjectIDQuery returns the query to resolve a fully qualified +// schema + table name to its object_id using QUOTENAME for safety. +// Parameters: @p1 = schema name, @p2 = table name +func MSSQLObjectIDQuery() string { + return "SELECT OBJECT_ID(QUOTENAME(@p1) + '.' + QUOTENAME(@p2))" +} + +// MSSQLIAMWalkServerPropertiesQuery returns the query used by the IAM walk +// capability probe to fetch server major version and engine edition. +func MSSQLIAMWalkServerPropertiesQuery() string { + return "SELECT CAST(SERVERPROPERTY('ProductMajorVersion') AS INT), CAST(SERVERPROPERTY('EngineEdition') AS INT)" +} + +// MSSQLIAMWalkPermissionQuery returns a query that evaluates +// sys.dm_db_database_page_allocations without returning any rows. +// Failure indicates the login likely lacks VIEW DATABASE STATE or the DMF +// is blocked by the platform (Azure SQL DB/MI). 
+func MSSQLIAMWalkPermissionQuery() string { + return "SELECT TOP 0 1 FROM sys.dm_db_database_page_allocations(DB_ID(), OBJECT_ID('sys.objects'), NULL, NULL, 'LIMITED')" +} + // MSSQLCDCSupportQuery returns the query to check if CDC is enabled for the current database func MSSQLCDCSupportQuery() string { return ` From c102a9349e8a5f37fc3ba230039642b24723dec4 Mon Sep 17 00:00:00 2001 From: vaibhav Date: Thu, 30 Apr 2026 18:56:49 +0530 Subject: [PATCH 2/8] feat: add page based sampling for mssql --- constants/constants.go | 11 ++++ drivers/abstract/backfill.go | 10 ++++ drivers/mssql/internal/backfill.go | 78 +++++++++++++++++++++++++---- drivers/oracle/internal/backfill.go | 17 +------ pkg/jdbc/jdbc.go | 11 ++++ 5 files changed, 100 insertions(+), 27 deletions(-) diff --git a/constants/constants.go b/constants/constants.go index 658318dff..3422ec750 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -40,6 +40,17 @@ const ( // MysqlChunkAcceptanceRatio defines the minimum ratio of expected chunks that must be generated // for the split to be considered valid. MysqlChunkAcceptanceRatio = float64(0.8) + // SamplePercentMin / SamplePercentMax define the clamped range for TABLESAMPLE / + // SAMPLE BLOCK percentage used by physloc and ROWID chunk boundary estimation. + // 0.01 is the practical floor below which page-level sampling may return zero + // rows; 50 caps worst-case I/O so a bad row-count estimate cannot escalate to a + // near-full scan. + SamplePercentMin = float64(0.01) + SamplePercentMax = float64(50.0) + // SampleRowsPerChunkMultiplier controls sample density: each target chunk gets + // ~10 sample points to pick a boundary from, producing even spacing even when + // blocks/pages are clustered (e.g. freshly inserted rows land on adjacent pages). 
+ SampleRowsPerChunkMultiplier = int64(10) ) type DriverType string diff --git a/drivers/abstract/backfill.go b/drivers/abstract/backfill.go index a9048e6f1..417323e41 100644 --- a/drivers/abstract/backfill.go +++ b/drivers/abstract/backfill.go @@ -97,3 +97,13 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha utils.ConcurrentInGroupWithRetry(a.GlobalConnGroup, chunks, a.driver.MaxRetries(), chunkProcessor) return nil } + +// ComputeSamplePercent returns the TABLESAMPLE / SAMPLE BLOCK percentage to use +// for chunk boundary estimation. The result is clamped to +// [constants.SamplePercentMin, constants.SamplePercentMax] so callers never +// request a near-zero or near-full scan regardless of row-count estimates. +func ComputeSamplePercent(approxRowCount, numberOfChunks int64) float64 { + minSampleRows := numberOfChunks * constants.SampleRowsPerChunkMultiplier + samplingPercentage := float64(minSampleRows) / float64(approxRowCount) * 100.0 + return max(constants.SamplePercentMin, min(constants.SamplePercentMax, samplingPercentage)) +} diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 4169f6df9..2f652413a 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -132,6 +132,7 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo return nil, fmt.Errorf("failed to get avg row size: %s", err) } chunkSize := int64(math.Ceil(float64(constants.EffectiveParquetSize) / avgRowSizeFloat)) + numberOfChunks := max(int64(math.Ceil(float64(approxRowCount)/float64(chunkSize))), int64(1)) chunks := types.NewSet[types.Chunk]() chunkColumn := stream.Self().StreamMetadata.ChunkColumn pkColumns := stream.GetStream().SourceDefinedPrimaryKey.Array() @@ -299,6 +300,23 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo }) } + // physLocSampleThenFallback tries TABLESAMPLE physloc sampling and, if that + // produces no 
usable chunks, falls back to PK or iterative physloc scan. + physLocSampleThenFallback := func() error { + sampleChunks, sampleErr := m.splitViaPhysLocSample(ctx, stream, approxRowCount, numberOfChunks) + if sampleErr == nil && sampleChunks.Len() > 0 { + logger.Infof("Stream %s: TABLESAMPLE physloc sampling produced %d chunks", stream.ID(), sampleChunks.Len()) + chunks = sampleChunks + return nil + } + logger.Debugf("Stream %s: TABLESAMPLE physloc sampling failed %s", stream.ID(), sampleErr) + chunks = types.NewSet[types.Chunk]() + if len(pkColumns) > 0 { + return splitViaPrimaryKey(stream, chunks, pkColumns) + } + return splitViaPhysLoc(stream, chunks) + } + switch { case chunkColumn != "": logger.Debugf("Stream %s: chunkColumn=%s set, using PK-based chunking", stream.ID(), chunkColumn) @@ -307,22 +325,15 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo logger.Debugf("Stream %s: Attempting IAM walk chunking", stream.ID()) err = m.splitViaIAMWalk(ctx, stream, chunks) if err != nil || chunks.Len() == 0 { - logger.Warnf("Stream %s: IAM walk failed (%v, chunks=%d), falling back to existing strategy", stream.ID(), err, chunks.Len()) + logger.Warnf("Stream %s: IAM walk failed %s", stream.ID(), err) chunks = types.NewSet[types.Chunk]() - if len(pkColumns) > 0 { - err = splitViaPrimaryKey(stream, chunks, pkColumns) - } else { - err = splitViaPhysLoc(stream, chunks) - } + err = physLocSampleThenFallback() } else { logger.Infof("Stream %s: IAM walk produced %d chunks", stream.ID(), chunks.Len()) } - case len(pkColumns) > 0: - logger.Debugf("Stream %s: IAM walk unavailable, using PK-based chunking with columns: %v", stream.ID(), pkColumns) - err = splitViaPrimaryKey(stream, chunks, pkColumns) default: - logger.Debugf("Stream %s: IAM walk unavailable and no PK, using %%physloc%% iterative", stream.ID()) - err = splitViaPhysLoc(stream, chunks) + logger.Debugf("Stream %s: IAM walk unavailable, trying TABLESAMPLE physloc sampling", stream.ID()) 
+ err = physLocSampleThenFallback() } return chunks, err @@ -366,6 +377,51 @@ func physLocBytes(packed uint64) []byte { return b } +// splitViaPhysLocSample estimates chunk boundaries using TABLESAMPLE SYSTEM. +// It reads a small percentage of data pages (no full table scan), sorts the +// sampled %%physloc%% values, and picks evenly-spaced boundaries in Go. +func (m *MSSQL) splitViaPhysLocSample(ctx context.Context, stream types.StreamInterface, approxRowCount int64, numberOfChunks int64) (*types.Set[types.Chunk], error) { + samplePercent := abstract.ComputeSamplePercent(approxRowCount, numberOfChunks) + + logger.Debugf("TABLESAMPLE sampling %.4f%% of pages from [%s.%s] for chunk boundaries (approxRows=%d, chunks=%d)", + samplePercent, stream.Namespace(), stream.Name(), approxRowCount, numberOfChunks) + + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLPhysLocSampleBoundaryQuery(stream, samplePercent)) + if err != nil { + return nil, fmt.Errorf("TABLESAMPLE %%%%physloc%%%% query failed: %s", err) + } + defer rows.Close() + + var sampledLocs [][]byte + for rows.Next() { + var loc []byte + if err := rows.Scan(&loc); err != nil { + return nil, fmt.Errorf("failed to scan sampled %%%%physloc%%%%: %s", err) + } + sampledLocs = append(sampledLocs, loc) + } + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate sampled %%%%physloc%%%% rows: %s", err) + } + + if int64(len(sampledLocs)) < numberOfChunks { + return nil, fmt.Errorf("TABLESAMPLE returned %d rows, need at least %d for %d chunks", + len(sampledLocs), numberOfChunks, numberOfChunks) + } + + chunks := types.NewSet[types.Chunk]() + step := float64(len(sampledLocs)) / float64(numberOfChunks) + var prev any = nil + for i := int64(0); i < numberOfChunks; i++ { + curr := utils.HexEncode(sampledLocs[int(float64(i)*step)]) + chunks.Insert(types.Chunk{Min: prev, Max: curr}) + prev = curr + } + chunks.Insert(types.Chunk{Min: prev, Max: nil}) + + return chunks, nil +} + // splitViaIAMWalk plans 
chunks for any heap or clustered table by reading // only the table's Index Allocation Map pages via // sys.dm_db_database_page_allocations. diff --git a/drivers/oracle/internal/backfill.go b/drivers/oracle/internal/backfill.go index bfab8963e..873ec48e6 100644 --- a/drivers/oracle/internal/backfill.go +++ b/drivers/oracle/internal/backfill.go @@ -16,19 +16,6 @@ import ( "github.com/datazip-inc/olake/utils/logger" ) -// Oracle SAMPLE BLOCK clamp bounds for boundary estimation. Oracle allows -// (0.000001, 100) but 0.01 is the practical floor; staging tests on a ~500 GB -// table showed it improves chunk boundary quality at negligible runtime cost -// (a couple of seconds). 50 caps worst-case block scanning so a misconfigured -// stats row count cannot escalate to a full table scan equivalent. -const ( - sampleBlockPercentMin = 0.01 - sampleBlockPercentMax = 50.0 - // sampleRowsPerChunkMultiplier gives each chunk boundary ~10x sampled ROWIDs - // to pick from, which produces evenly spaced boundaries even when block - // sampling is clustered (e.g. freshly inserted rows land in adjacent blocks). - sampleRowsPerChunkMultiplier int64 = 10 -) // ChunkIterator implements the abstract.DriverInterface func (o *Oracle) ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, OnMessage abstract.BackfillMsgFn) error { @@ -178,9 +165,7 @@ func (o *Oracle) splitViaTableIterationLoop(ctx context.Context, stream types.St // full-table sort that NTILE requires, making it safe on tables with billions of // rows where NTILE would spill to temp tablespace and risk ORA-1652. 
func (o *Oracle) splitViaTableIterationSample(ctx context.Context, stream types.StreamInterface, approxRowCount int64, numberOfChunks int64) (*types.Set[types.Chunk], error) { - minSampleRows := numberOfChunks * sampleRowsPerChunkMultiplier - samplePercent := float64(minSampleRows) / float64(approxRowCount) * 100.0 - samplePercent = math.Max(sampleBlockPercentMin, math.Min(sampleBlockPercentMax, samplePercent)) + samplePercent := abstract.ComputeSamplePercent(approxRowCount, numberOfChunks) logger.Debugf("Sampling %.4f%% of blocks from [%s.%s] for chunk boundaries (approxRows=%d, chunks=%d)", samplePercent, stream.Namespace(), stream.Name(), approxRowCount, numberOfChunks) diff --git a/pkg/jdbc/jdbc.go b/pkg/jdbc/jdbc.go index 30b15968f..ea012e5ef 100644 --- a/pkg/jdbc/jdbc.go +++ b/pkg/jdbc/jdbc.go @@ -1029,6 +1029,17 @@ func MSSQLTableRowStatsQuery() string { ` } +// MSSQLPhysLocSampleBoundaryQuery returns a query that uses TABLESAMPLE SYSTEM to +// sample a percentage of data pages and return sorted %%physloc%% binary values. 
+func MSSQLPhysLocSampleBoundaryQuery(stream types.StreamInterface, samplePercent float64) string { + quotedTable := QuoteTable(stream.Namespace(), stream.Name(), constants.MSSQL) + return fmt.Sprintf(` + SELECT %%%%physloc%%%% + FROM %s TABLESAMPLE SYSTEM (%.6f PERCENT) WITH (NOLOCK) + ORDER BY %%%%physloc%%%% + `, quotedTable, samplePercent) +} + // OracleDB Specific Queries // OracleTableDiscoveryQuery returns the query to fetch the username and table name of all the tables which the current user has access to in OracleDB From ff69e09d7c0b83479c09a59935b3e13492bdf182 Mon Sep 17 00:00:00 2001 From: vaibhav Date: Tue, 12 May 2026 19:26:01 +0530 Subject: [PATCH 3/8] feat: add caching for iam walk compatibility --- drivers/mssql/internal/backfill.go | 91 +++++++++++++++++------------- drivers/mssql/internal/mssql.go | 1 + 2 files changed, 52 insertions(+), 40 deletions(-) diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 2f652413a..65074e6d9 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -10,6 +10,7 @@ import ( "math" "sort" "strings" + "sync" "time" "github.com/datazip-inc/olake/constants" @@ -30,20 +31,10 @@ const physLocChunkValueLen = 18 // (IN_ROW_DATA max row size). Using the ceiling yields smaller chunks. const usableBytesPerPage = 8060 -// isPhysLocChunk reports whether the chunk's boundaries are %%physloc%% -// hex literals (the shape produced by IAM walk and the iterative physloc -// planner). Either Min or Max is sufficient to identify the format because -// both planners always set at least one boundary. 
-func isPhysLocChunk(chunk types.Chunk) bool { - check := func(v any) bool { - s, ok := v.(string) - if !ok || len(s) != physLocChunkValueLen || !strings.HasPrefix(s, "0x") { - return false - } - _, err := hex.DecodeString(s[2:]) - return err == nil - } - return check(chunk.Min) || check(chunk.Max) +// iamWalkCache holds the one-time probe result for IAM walk capability. +type iamWalkCache struct { + once sync.Once + capable bool } // ChunkIterator implements snapshot iteration over MSSQL chunks. @@ -476,34 +467,38 @@ func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterfac return nil } -// probeIAMWalkCapability checks if IAM walk +// probeIAMWalkCapability checks whether the connected SQL Server instance +// supports IAM walk chunking via sys.dm_db_database_page_allocations. func (m *MSSQL) probeIAMWalkCapability(ctx context.Context) bool { - var majorVersion, engineEdition int - err := m.client.QueryRowContext(ctx, jdbc.MSSQLIAMWalkServerPropertiesQuery()).Scan(&majorVersion, &engineEdition) - if err != nil { - logger.Debugf("IAM walk probe: failed to read server properties: %s", err) - return false - } - if majorVersion < 11 { - logger.Debugf("IAM walk probe: SQL Server major version %d < 11, IAM walk unsupported", majorVersion) - return false - } - // EngineEdition 5 = Azure SQL Database, 8 = Azure SQL Managed Instance. - // sys.dm_db_database_page_allocations is blocked on both. 
- if engineEdition == 5 || engineEdition == 8 { - logger.Debugf("IAM walk probe: EngineEdition %d (Azure SQL DB/MI) blocks the DMF", engineEdition) - return false - } + m.once.Do(func() { + var majorVersion, engineEdition int + err := m.client.QueryRowContext(ctx, jdbc.MSSQLIAMWalkServerPropertiesQuery()).Scan(&majorVersion, &engineEdition) + if err != nil { + logger.Debugf("IAM walk probe: failed to read server properties: %s", err) + return + } + if majorVersion < 11 { + logger.Debugf("IAM walk probe: SQL Server major version %d < 11, IAM walk unsupported", majorVersion) + return + } + // EngineEdition 5 = Azure SQL Database, 8 = Azure SQL Managed Instance. + // sys.dm_db_database_page_allocations is blocked on both. + if engineEdition == 5 || engineEdition == 8 { + logger.Debugf("IAM walk probe: EngineEdition %d (Azure SQL DB/MI) blocks the DMF", engineEdition) + return + } - // Permission probe: TOP 0 evaluates the DMF without returning any rows. - // Failure here means the current login lacks VIEW DATABASE STATE. - rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkPermissionQuery()) - if err != nil { - logger.Debugf("IAM walk probe: permission test failed (likely missing VIEW DATABASE STATE): %s", err) - return false - } - rows.Close() - return true + // Permission probe: TOP 0 evaluates the DMF without returning any rows. + // Failure here means the current login lacks VIEW DATABASE STATE. + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkPermissionQuery()) + if err != nil { + logger.Debugf("IAM walk probe: permission test failed (likely missing VIEW DATABASE STATE): %s", err) + return + } + rows.Close() + m.capable = true + }) + return m.capable } // getColumnTypeMSSQL returns SQL data type for the requested column. 
@@ -585,3 +580,19 @@ func formatUniqueIdentifierBytes(v []byte) (string, bool) { v[10], v[11], v[12], v[13], v[14], v[15], // last 6 bytes (big-endian) ), true } + +// isPhysLocChunk reports whether the chunk's boundaries are %%physloc%% +// hex literals (the shape produced by IAM walk and the iterative physloc +// planner). Either Min or Max is sufficient to identify the format because +// both planners always set at least one boundary. +func isPhysLocChunk(chunk types.Chunk) bool { + check := func(v any) bool { + s, ok := v.(string) + if !ok || len(s) != physLocChunkValueLen || !strings.HasPrefix(s, "0x") { + return false + } + _, err := hex.DecodeString(s[2:]) + return err == nil + } + return check(chunk.Min) || check(chunk.Max) +} diff --git a/drivers/mssql/internal/mssql.go b/drivers/mssql/internal/mssql.go index 8e654d5df..b7f312a14 100644 --- a/drivers/mssql/internal/mssql.go +++ b/drivers/mssql/internal/mssql.go @@ -32,6 +32,7 @@ type MSSQL struct { streams []types.StreamInterface cdcSupported bool sshClient *ssh.Client + iamWalkCache // backfill-related capability cache for iam walk strategy } // GetConfigRef implements abstract.DriverInterface. From f6d66b477a56a5a2cea6771a61ad979ffeaf9cbf Mon Sep 17 00:00:00 2001 From: vaibhav Date: Thu, 14 May 2026 15:52:40 +0530 Subject: [PATCH 4/8] chore: address review comments --- drivers/mssql/internal/backfill.go | 383 ++++++++++++++--------------- drivers/mssql/internal/mssql.go | 1 - 2 files changed, 180 insertions(+), 204 deletions(-) diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 65074e6d9..90e423312 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -10,7 +10,6 @@ import ( "math" "sort" "strings" - "sync" "time" "github.com/datazip-inc/olake/constants" @@ -31,12 +30,6 @@ const physLocChunkValueLen = 18 // (IN_ROW_DATA max row size). Using the ceiling yields smaller chunks. 
const usableBytesPerPage = 8060 -// iamWalkCache holds the one-time probe result for IAM walk capability. -type iamWalkCache struct { - once sync.Once - capable bool -} - // ChunkIterator implements snapshot iteration over MSSQL chunks. func (m *MSSQL) ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, onMessage abstract.BackfillMsgFn) error { opts := jdbc.DriverOptions{ @@ -138,159 +131,6 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo logger.Debugf("Stream %s: No PK or chunkColumn, will use %%physloc%% chunking", stream.ID()) } - // Split via primary key when available - splitViaPrimaryKey := func(stream types.StreamInterface, chunks *types.Set[types.Chunk], pkCols []string) error { - return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { - sort.Strings(pkCols) - - if len(pkCols) == 0 { - return nil - } - // Get the minimum and maximum values for the primary key columns - minVal, maxVal, err := m.getTableExtremes(ctx, stream, pkCols, tx) - if err != nil { - return fmt.Errorf("failed to get table extremes: %s", err) - } - // Skip if table is empty - if minVal == nil { - return nil - } - - columnType := "" - if len(pkCols) == 1 { - columnType, err = m.getColumnTypeMSSQL(ctx, stream, pkCols[0], tx) - if err != nil { - return fmt.Errorf("failed to get table column type: %s", err) - } - } - - // Create the first chunk from the beginning up to the minimum value - chunks.Insert(types.Chunk{ - Min: nil, - Max: normalizeBoundaryValue(minVal, pkCols, columnType), - }) - - logger.Infof( - "Stream %s extremes - min: %v, max: %v", stream.ID(), - normalizeBoundaryValue(minVal, pkCols, columnType), - normalizeBoundaryValue(maxVal, pkCols, columnType), - ) - - // Build query to find the next chunk boundary - query := jdbc.MSSQLNextChunkEndQuery(stream, pkCols, chunkSize) - currentVal := minVal - - for { - // Split the current composite key value into individual column parts - columns := 
strings.Split(normalizeBoundaryValue(currentVal, pkCols, columnType), ",") - - // Build query arguments for composite key comparison - args := make([]interface{}, 0) - for colIdx := 0; colIdx < len(pkCols); colIdx++ { - for partIdx := 0; partIdx <= colIdx && partIdx < len(columns); partIdx++ { - args = append(args, strings.TrimSpace(columns[partIdx])) - } - } - - // Query for the next chunk boundary value - var nextValRaw interface{} - err := tx.QueryRowContext(ctx, query, args...).Scan(&nextValRaw) - // Stop if we've reached the end of the table - if err == sql.ErrNoRows || nextValRaw == nil { - break - } - if err != nil { - return fmt.Errorf("failed to get next chunk end: %s", err) - } - - // Create a chunk between current and next boundary - if currentVal != nil { - chunks.Insert(types.Chunk{ - Min: normalizeBoundaryValue(currentVal, pkCols, columnType), - Max: normalizeBoundaryValue(nextValRaw, pkCols, columnType), - }) - } - - currentVal = nextValRaw - } - - // Create the final chunk from the last value to the end - if currentVal != nil { - chunks.Insert(types.Chunk{ - Min: normalizeBoundaryValue(currentVal, pkCols, columnType), - Max: nil, - }) - } - - return nil - }) - } - - // Split using physical location when no primary key is available. - // %%physloc%% returns the physical location (file_id, page_id, slot_id) of a row as binary. - // We iteratively find chunk boundaries by querying for the N-th row (N = chunkSize) where - // physloc > current, creating evenly-sized chunks: [nil, min], [min, next1], ..., [last, nil] - // - // All physloc values are hex-encoded before storing in chunks to ensure valid UTF-8 chunk values. 
- splitViaPhysLoc := func(stream types.StreamInterface, chunks *types.Set[types.Chunk]) error { - // SQL Server doesn't support read-only transactions - // Use repeatable read isolation without read-only flag - return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { - // Get the minimum and maximum physical location values - // These define the boundaries of our table for chunking - minVal, maxVal, err := m.getPhysLocExtremes(ctx, stream, tx) - if err != nil { - return fmt.Errorf("failed to get %%physloc%% extremes: %s", err) - } - // Skip if table is empty (no rows to chunk) - if minVal == nil || maxVal == nil { - return nil - } - - // Start from the minimum physloc value - current := minVal - chunks.Insert(types.Chunk{ - Min: nil, - Max: utils.HexEncode(minVal), - }) - - // Iteratively find chunk boundaries until we reach the end of the table - for { - var next []byte - // This gives us the next chunk boundary, ensuring each chunk has ~chunkSize rows - query := jdbc.MSSQLPhysLocNextChunkEndQuery(stream, chunkSize) - - err := tx.QueryRowContext(ctx, query, current).Scan(&next) - // End of table reached: no more rows with physloc > current - if err == sql.ErrNoRows || next == nil { - chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) - break - } - if err != nil { - return fmt.Errorf("failed to get next %%physloc%% chunk end: %s", err) - } - - if bytes.Equal(current, next) { - chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) - break - } - - // Create a chunk between current and next boundary - // This chunk will contain approximately chunkSize rows - // Example: If current = A and next = D, chunk [A, D) contains rows A, B, C - chunks.Insert(types.Chunk{ - Min: utils.HexEncode(current), - Max: utils.HexEncode(next), - }) - - // Move to the next boundary for the next iteration - current = next - } - - return nil - }) - } - // physLocSampleThenFallback tries TABLESAMPLE physloc sampling and, if that // produces no 
usable chunks, falls back to PK or iterative physloc scan. physLocSampleThenFallback := func() error { @@ -301,23 +141,21 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo return nil } logger.Debugf("Stream %s: TABLESAMPLE physloc sampling failed %s", stream.ID(), sampleErr) - chunks = types.NewSet[types.Chunk]() if len(pkColumns) > 0 { - return splitViaPrimaryKey(stream, chunks, pkColumns) + return m.splitViaPrimaryKey(ctx, stream, chunks, pkColumns, chunkSize) } - return splitViaPhysLoc(stream, chunks) + return m.splitViaPhysLoc(ctx, stream, chunks, chunkSize) } switch { case chunkColumn != "": logger.Debugf("Stream %s: chunkColumn=%s set, using PK-based chunking", stream.ID(), chunkColumn) - err = splitViaPrimaryKey(stream, chunks, pkColumns) + err = m.splitViaPrimaryKey(ctx, stream, chunks, pkColumns, chunkSize) case m.probeIAMWalkCapability(ctx): logger.Debugf("Stream %s: Attempting IAM walk chunking", stream.ID()) err = m.splitViaIAMWalk(ctx, stream, chunks) if err != nil || chunks.Len() == 0 { logger.Warnf("Stream %s: IAM walk failed %s", stream.ID(), err) - chunks = types.NewSet[types.Chunk]() err = physLocSampleThenFallback() } else { logger.Infof("Stream %s: IAM walk produced %d chunks", stream.ID(), chunks.Len()) @@ -368,6 +206,148 @@ func physLocBytes(packed uint64) []byte { return b } +// splitViaPrimaryKey divides the table into chunks by walking PK boundaries +// under REPEATABLE READ (see jdbc.WithIsolation). 
+func (m *MSSQL) splitViaPrimaryKey(ctx context.Context, stream types.StreamInterface, chunks *types.Set[types.Chunk], pkCols []string, chunkSize int64) error { + return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { + sort.Strings(pkCols) + + if len(pkCols) == 0 { + return nil + } + minVal, maxVal, err := m.getTableExtremes(ctx, stream, pkCols, tx) + if err != nil { + return fmt.Errorf("failed to get table extremes: %s", err) + } + if minVal == nil { + return nil + } + + columnType := "" + if len(pkCols) == 1 { + columnType, err = m.getColumnTypeMSSQL(ctx, stream, pkCols[0], tx) + if err != nil { + return fmt.Errorf("failed to get table column type: %s", err) + } + } + + chunks.Insert(types.Chunk{ + Min: nil, + Max: normalizeBoundaryValue(minVal, pkCols, columnType), + }) + + logger.Infof( + "Stream %s extremes - min: %v, max: %v", stream.ID(), + normalizeBoundaryValue(minVal, pkCols, columnType), + normalizeBoundaryValue(maxVal, pkCols, columnType), + ) + + query := jdbc.MSSQLNextChunkEndQuery(stream, pkCols, chunkSize) + currentVal := minVal + + for { + // Split the current composite key value into individual column parts + columns := strings.Split(normalizeBoundaryValue(currentVal, pkCols, columnType), ",") + + // Build query arguments for composite key comparison + args := make([]interface{}, 0) + for colIdx := 0; colIdx < len(pkCols); colIdx++ { + for partIdx := 0; partIdx <= colIdx && partIdx < len(columns); partIdx++ { + args = append(args, strings.TrimSpace(columns[partIdx])) + } + } + + // Query for the next chunk boundary value + var nextValRaw interface{} + err := tx.QueryRowContext(ctx, query, args...).Scan(&nextValRaw) + // Stop if we've reached the end of the table + if err == sql.ErrNoRows || nextValRaw == nil { + break + } + if err != nil { + return fmt.Errorf("failed to get next chunk end: %s", err) + } + + // Create a chunk between current and next boundary + if currentVal != nil { + chunks.Insert(types.Chunk{ + Min: 
normalizeBoundaryValue(currentVal, pkCols, columnType), + Max: normalizeBoundaryValue(nextValRaw, pkCols, columnType), + }) + } + + currentVal = nextValRaw + } + + // Create the final chunk from the last value to the end + if currentVal != nil { + chunks.Insert(types.Chunk{ + Min: normalizeBoundaryValue(currentVal, pkCols, columnType), + Max: nil, + }) + } + + return nil + }) +} + +// Split using physical location when no primary key is available. +// %%physloc%% returns the physical location (file_id, page_id, slot_id) of a row as binary. +// We iteratively find chunk boundaries by querying for the N-th row (N = chunkSize) where +// physloc > current, creating evenly-sized chunks: [nil, min], [min, next1], ..., [last, nil] +// +// All physloc values are hex-encoded before storing in chunks to ensure valid UTF-8 chunk values. +func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterface, chunks *types.Set[types.Chunk], chunkSize int64) error { + // SQL Server doesn't support read-only transactions + // Use repeatable read isolation without read-only flag + return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { + // Get the minimum and maximum physical location values + // These define the boundaries of our table for chunking + minVal, maxVal, err := m.getPhysLocExtremes(ctx, stream, tx) + if err != nil { + return fmt.Errorf("failed to get %%physloc%% extremes: %s", err) + } + // Skip if table is empty (no rows to chunk) + if minVal == nil || maxVal == nil { + return nil + } + + current := minVal + chunks.Insert(types.Chunk{ + Min: nil, + Max: utils.HexEncode(minVal), + }) + + for { + var next []byte + query := jdbc.MSSQLPhysLocNextChunkEndQuery(stream, chunkSize) + + err := tx.QueryRowContext(ctx, query, current).Scan(&next) + if err == sql.ErrNoRows || next == nil { + chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) + break + } + if err != nil { + return fmt.Errorf("failed to get next %%physloc%% chunk end: 
%s", err) + } + + if bytes.Equal(current, next) { + chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) + break + } + + chunks.Insert(types.Chunk{ + Min: utils.HexEncode(current), + Max: utils.HexEncode(next), + }) + + current = next + } + + return nil + }) +} + // splitViaPhysLocSample estimates chunk boundaries using TABLESAMPLE SYSTEM. // It reads a small percentage of data pages (no full table scan), sorts the // sampled %%physloc%% values, and picks evenly-spaced boundaries in Go. @@ -383,28 +363,28 @@ func (m *MSSQL) splitViaPhysLocSample(ctx context.Context, stream types.StreamIn } defer rows.Close() - var sampledLocs [][]byte + var physLocSamples [][]byte for rows.Next() { var loc []byte if err := rows.Scan(&loc); err != nil { return nil, fmt.Errorf("failed to scan sampled %%%%physloc%%%%: %s", err) } - sampledLocs = append(sampledLocs, loc) + physLocSamples = append(physLocSamples, loc) } if err = rows.Err(); err != nil { return nil, fmt.Errorf("failed to iterate sampled %%%%physloc%%%% rows: %s", err) } - if int64(len(sampledLocs)) < numberOfChunks { - return nil, fmt.Errorf("TABLESAMPLE returned %d rows, need at least %d for %d chunks", - len(sampledLocs), numberOfChunks, numberOfChunks) + if int64(len(physLocSamples)) < numberOfChunks { + return nil, fmt.Errorf("TABLESAMPLE returned %d rows, need at least %d", + len(physLocSamples), numberOfChunks) } chunks := types.NewSet[types.Chunk]() - step := float64(len(sampledLocs)) / float64(numberOfChunks) + step := float64(len(physLocSamples)) / float64(numberOfChunks) var prev any = nil - for i := int64(0); i < numberOfChunks; i++ { - curr := utils.HexEncode(sampledLocs[int(float64(i)*step)]) + for i := int64(0); i < numberOfChunks; i++ { + curr := utils.HexEncode(physLocSamples[int(float64(i)*step)]) chunks.Insert(types.Chunk{Min: prev, Max: curr}) prev = curr } @@ -467,38 +447,35 @@ func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterfac return nil } -// 
probeIAMWalkCapability checks whether the connected SQL Server instance -// supports IAM walk chunking via sys.dm_db_database_page_allocations. +// probeIAMWalkCapability checks whether IAM walk can run on this server/login. +// It is cheap (two round-trips) so we call it per stream instead of caching. func (m *MSSQL) probeIAMWalkCapability(ctx context.Context) bool { - m.once.Do(func() { - var majorVersion, engineEdition int - err := m.client.QueryRowContext(ctx, jdbc.MSSQLIAMWalkServerPropertiesQuery()).Scan(&majorVersion, &engineEdition) - if err != nil { - logger.Debugf("IAM walk probe: failed to read server properties: %s", err) - return - } - if majorVersion < 11 { - logger.Debugf("IAM walk probe: SQL Server major version %d < 11, IAM walk unsupported", majorVersion) - return - } - // EngineEdition 5 = Azure SQL Database, 8 = Azure SQL Managed Instance. - // sys.dm_db_database_page_allocations is blocked on both. - if engineEdition == 5 || engineEdition == 8 { - logger.Debugf("IAM walk probe: EngineEdition %d (Azure SQL DB/MI) blocks the DMF", engineEdition) - return - } + var majorVersion, engineEdition int + err := m.client.QueryRowContext(ctx, jdbc.MSSQLIAMWalkServerPropertiesQuery()).Scan(&majorVersion, &engineEdition) + if err != nil { + logger.Debugf("IAM walk probe: failed to read server properties: %s", err) + return false + } + if majorVersion < 11 { + logger.Debugf("IAM walk probe: SQL Server major version %d < 11, IAM walk unsupported", majorVersion) + return false + } + // EngineEdition 5 = Azure SQL Database, 8 = Azure SQL Managed Instance. + // sys.dm_db_database_page_allocations is blocked on both. + if engineEdition == 5 || engineEdition == 8 { + logger.Debugf("IAM walk probe: EngineEdition %d (Azure SQL DB/MI) blocks the DMF", engineEdition) + return false + } - // Permission probe: TOP 0 evaluates the DMF without returning any rows. - // Failure here means the current login lacks VIEW DATABASE STATE. 
- rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkPermissionQuery()) - if err != nil { - logger.Debugf("IAM walk probe: permission test failed (likely missing VIEW DATABASE STATE): %s", err) - return - } - rows.Close() - m.capable = true - }) - return m.capable + // Permission probe: TOP 0 evaluates the DMF without returning any rows. + // Failure here means the current login lacks VIEW DATABASE STATE. + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkPermissionQuery()) + if err != nil { + logger.Debugf("IAM walk probe: permission test failed (likely missing VIEW DATABASE STATE): %s", err) + return false + } + rows.Close() + return true } // getColumnTypeMSSQL returns SQL data type for the requested column. diff --git a/drivers/mssql/internal/mssql.go b/drivers/mssql/internal/mssql.go index b7f312a14..8e654d5df 100644 --- a/drivers/mssql/internal/mssql.go +++ b/drivers/mssql/internal/mssql.go @@ -32,7 +32,6 @@ type MSSQL struct { streams []types.StreamInterface cdcSupported bool sshClient *ssh.Client - iamWalkCache // backfill-related capability cache for iam walk strategy } // GetConfigRef implements abstract.DriverInterface. From 2b95c59b6abecc22b10b2ccc8cd17b94f1448827 Mon Sep 17 00:00:00 2001 From: vaibhav Date: Mon, 18 May 2026 17:49:23 +0530 Subject: [PATCH 5/8] fix: physloc iteration detection correction --- drivers/mssql/internal/backfill.go | 97 +++++++++++------------------- 1 file changed, 36 insertions(+), 61 deletions(-) diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 90e423312..233336909 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -5,9 +5,9 @@ import ( "context" "database/sql" "encoding/binary" - "encoding/hex" "fmt" "math" + "slices" "sort" "strings" "time" @@ -22,9 +22,8 @@ import ( "github.com/datazip-inc/olake/utils/typeutils" ) -// physLocChunkValueLen is the hex-encoded BINARY(8) %%physloc%% shape: -// "0x" + 16 hex chars. 
Used to route chunks to physloc scans. -const physLocChunkValueLen = 18 +// physLocBoundary is a sentinel type for %%physloc%% hex-literal chunk boundaries. +type physLocBoundary string // usableBytesPerPage is an upper bound for in-row payload per 8KB page // (IN_ROW_DATA max row size). Using the ceiling yields smaller chunks. @@ -168,41 +167,23 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo return chunks, err } -// packPhysLoc encodes a (file_id, page_id) pair as a uint64 whose -// unsigned-integer comparison matches SQL Server's binary comparison of -// the equivalent BINARY(8) %%physloc%% value. -// -// %%physloc%% wire layout: bytes 0..7 = [page_id LE 4B][file_id LE 2B][slot_id LE 2B]. -// SQL Server compares BINARY(N) byte-by-byte starting from byte 0, so we -// pack physloc byte 0 into the highest 8 bits of the uint64 and physloc -// byte 7 into the lowest. slot_id is fixed at 0xFFFF so each boundary -// represents "after the last possible row on the page": chunk filter -// `%%physloc%% <= b` includes every row on b's page, and `> b` starts -// cleanly on the next page. -// -// This lets us sort []uint64 with native uint64 comparison (instead of -// allocating an 8-byte slice per page and sorting with bytes.Compare), -// halving memory on large tables and skipping the per-page encode step -// entirely; we encode only the few sampled boundaries. -func packPhysLoc(fileID, pageID int32) uint64 { - p := uint32(pageID) - f := uint32(uint16(fileID)) - return uint64(p&0xFF)<<56 | - uint64((p>>8)&0xFF)<<48 | - uint64((p>>16)&0xFF)<<40 | - uint64((p>>24)&0xFF)<<32 | - uint64(f&0xFF)<<24 | - uint64((f>>8)&0xFF)<<16 | - 0xFFFF +// physlocSortKey encodes (file_id, page_id) as a uint64 that sorts identically +// to SQL Server's byte-by-byte BINARY(8) comparison of the equivalent %%physloc%%. +// slot_id is fixed at 0xFFFF ("end of page") so chunk predicates split cleanly between pages. 
+// Sorting []uint64 with < is cheaper than sorting [][]byte with bytes.Compare. +func physlocSortKey(fileID, pageID int32) uint64 { + var b [8]byte + binary.LittleEndian.PutUint32(b[0:4], uint32(pageID)) + binary.LittleEndian.PutUint16(b[4:6], uint16(fileID)) + binary.LittleEndian.PutUint16(b[6:8], 0xFFFF) + return binary.BigEndian.Uint64(b[:]) } -// physLocBytes converts a packed uint64 boundary back to its 8-byte -// %%physloc%% wire representation. Because packPhysLoc places physloc -// byte 0 at the uint64's most significant 8 bits, writing the uint64 in -// big-endian order reproduces the wire layout exactly. -func physLocBytes(packed uint64) []byte { +// physLocBytes converts a sort key back to the 8-byte %%physloc%% wire format +// that SQL Server understands in chunk boundary predicates. +func physLocBytes(key uint64) []byte { b := make([]byte, 8) - binary.BigEndian.PutUint64(b, packed) + binary.BigEndian.PutUint64(b, key) return b } @@ -315,7 +296,7 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac current := minVal chunks.Insert(types.Chunk{ Min: nil, - Max: utils.HexEncode(minVal), + Max: physLocBoundary(utils.HexEncode(minVal)), }) for { @@ -324,7 +305,7 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac err := tx.QueryRowContext(ctx, query, current).Scan(&next) if err == sql.ErrNoRows || next == nil { - chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) + chunks.Insert(types.Chunk{Min: physLocBoundary(utils.HexEncode(current)), Max: nil}) break } if err != nil { @@ -332,13 +313,13 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac } if bytes.Equal(current, next) { - chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) + chunks.Insert(types.Chunk{Min: physLocBoundary(utils.HexEncode(current)), Max: nil}) break } chunks.Insert(types.Chunk{ - Min: utils.HexEncode(current), - Max: utils.HexEncode(next), + Min: 
physLocBoundary(utils.HexEncode(current)), + Max: physLocBoundary(utils.HexEncode(next)), }) current = next @@ -383,8 +364,8 @@ func (m *MSSQL) splitViaPhysLocSample(ctx context.Context, stream types.StreamIn chunks := types.NewSet[types.Chunk]() step := float64(len(physLocSamples)) / float64(numberOfChunks) var prev any = nil - for i := int64(0); i < numberOfChunks; i++ { - curr := utils.HexEncode(physLocSamples[int(float64(i)*step)]) + for i := int64(0); i < numberOfChunks; i++ { + curr := physLocBoundary(utils.HexEncode(physLocSamples[int(float64(i)*step)])) chunks.Insert(types.Chunk{Min: prev, Max: curr}) prev = curr } @@ -415,7 +396,7 @@ func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterfac if err := rows.Scan(&fileID, &pageID); err != nil { return fmt.Errorf("failed to scan IAM walk page: %s", err) } - pages = append(pages, packPhysLoc(fileID, pageID)) + pages = append(pages, physlocSortKey(fileID, pageID)) } if err := rows.Err(); err != nil { return fmt.Errorf("failed to iterate IAM walk rows: %s", err) @@ -427,7 +408,7 @@ func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterfac } // Sort defensively — the DMF does not guarantee any output order. - sort.Slice(pages, func(i, j int) bool { return pages[i] < pages[j] }) + slices.Sort(pages) pagesPerChunk := constants.EffectiveParquetSize / usableBytesPerPage pagesPerChunk = max(pagesPerChunk, 1) @@ -438,9 +419,9 @@ func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterfac // this naturally produces just {nil, nil}. 
var prev any = nil for i := pagesPerChunk; i < total; i += pagesPerChunk { - max := utils.HexEncode(physLocBytes(pages[i])) - chunks.Insert(types.Chunk{Min: prev, Max: max}) - prev = max + boundary := physLocBoundary(utils.HexEncode(physLocBytes(pages[i]))) + chunks.Insert(types.Chunk{Min: prev, Max: boundary}) + prev = boundary } chunks.Insert(types.Chunk{Min: prev, Max: nil}) @@ -456,6 +437,7 @@ func (m *MSSQL) probeIAMWalkCapability(ctx context.Context) bool { logger.Debugf("IAM walk probe: failed to read server properties: %s", err) return false } + // SQL Server 2012 (version 11) is the first version to support IAM walk. if majorVersion < 11 { logger.Debugf("IAM walk probe: SQL Server major version %d < 11, IAM walk unsupported", majorVersion) return false @@ -558,18 +540,11 @@ func formatUniqueIdentifierBytes(v []byte) (string, bool) { ), true } -// isPhysLocChunk reports whether the chunk's boundaries are %%physloc%% -// hex literals (the shape produced by IAM walk and the iterative physloc -// planner). Either Min or Max is sufficient to identify the format because -// both planners always set at least one boundary. +// isPhysLocChunk reports whether the chunk was produced by a physloc-based +// planner (IAM walk, iterative, or TABLESAMPLE). It uses a sentinel type +// instead of a string-length heuristic to avoid false positives. 
func isPhysLocChunk(chunk types.Chunk) bool { - check := func(v any) bool { - s, ok := v.(string) - if !ok || len(s) != physLocChunkValueLen || !strings.HasPrefix(s, "0x") { - return false - } - _, err := hex.DecodeString(s[2:]) - return err == nil - } - return check(chunk.Min) || check(chunk.Max) + _, minOK := chunk.Min.(physLocBoundary) + _, maxOK := chunk.Max.(physLocBoundary) + return minOK || maxOK } From 3a3f8206e51bbf2b465a5168bae925b51eb923ff Mon Sep 17 00:00:00 2001 From: vaibhav Date: Tue, 19 May 2026 19:41:39 +0530 Subject: [PATCH 6/8] chore: change code placement --- drivers/abstract/backfill.go | 10 ---------- drivers/mssql/internal/backfill.go | 25 +++++++++++++++++++++---- drivers/mssql/internal/config.go | 16 ++++++++++++++++ drivers/mssql/resources/spec.json | 10 ++++++++++ drivers/oracle/internal/backfill.go | 2 +- utils/spec/uischema.go | 10 +++++++++- utils/utils.go | 10 ++++++++++ 7 files changed, 67 insertions(+), 16 deletions(-) diff --git a/drivers/abstract/backfill.go b/drivers/abstract/backfill.go index c6c33414e..dc8060b72 100644 --- a/drivers/abstract/backfill.go +++ b/drivers/abstract/backfill.go @@ -97,13 +97,3 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha utils.ConcurrentInGroupWithRetry(a.GlobalConnGroup, chunks, a.driver.MaxRetries(), chunkProcessor) return nil } - -// ComputeSamplePercent returns the TABLESAMPLE / SAMPLE BLOCK percentage to use -// for chunk boundary estimation. The result is clamped to -// [constants.SamplePercentMin, constants.SamplePercentMax] so callers never -// request a near-zero or near-full scan regardless of row-count estimates. 
-func ComputeSamplePercent(approxRowCount, numberOfChunks int64) float64 { - minSampleRows := numberOfChunks * constants.SampleRowsPerChunkMultiplier - samplingPercentage := float64(minSampleRows) / float64(approxRowCount) * 100.0 - return max(constants.SamplePercentMin, min(constants.SamplePercentMax, samplingPercentage)) -} diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 233336909..2d8b74930 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -146,21 +146,26 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo return m.splitViaPhysLoc(ctx, stream, chunks, chunkSize) } + // useIAMWalk returns true when the config allows IAM walk and the server + // supports it. Setting chunking_strategy=sampling bypasses IAM walk + // entirely, which avoids the schema-stability lock it takes on the table. + useIAMWalk := m.config.ChunkingStrategy == IAMWalkStrategy && m.probeIAMWalkCapability(ctx) + switch { case chunkColumn != "": logger.Debugf("Stream %s: chunkColumn=%s set, using PK-based chunking", stream.ID(), chunkColumn) err = m.splitViaPrimaryKey(ctx, stream, chunks, pkColumns, chunkSize) - case m.probeIAMWalkCapability(ctx): + case useIAMWalk: logger.Debugf("Stream %s: Attempting IAM walk chunking", stream.ID()) err = m.splitViaIAMWalk(ctx, stream, chunks) if err != nil || chunks.Len() == 0 { - logger.Warnf("Stream %s: IAM walk failed %s", stream.ID(), err) + logger.Warnf("Stream %s: IAM walk failed (%s), falling back to sampling", stream.ID(), err) err = physLocSampleThenFallback() } else { logger.Infof("Stream %s: IAM walk produced %d chunks", stream.ID(), chunks.Len()) } default: - logger.Debugf("Stream %s: IAM walk unavailable, trying TABLESAMPLE physloc sampling", stream.ID()) + logger.Debugf("Stream %s: using TABLESAMPLE physloc sampling (strategy=%s)", stream.ID(), m.config.ChunkingStrategy) err = physLocSampleThenFallback() } @@ -196,10 +201,12 @@ func 
(m *MSSQL) splitViaPrimaryKey(ctx context.Context, stream types.StreamInter if len(pkCols) == 0 { return nil } + // Get the minimum and maximum values for the primary key columns minVal, maxVal, err := m.getTableExtremes(ctx, stream, pkCols, tx) if err != nil { return fmt.Errorf("failed to get table extremes: %s", err) } + // Skip if table is empty if minVal == nil { return nil } @@ -212,6 +219,7 @@ func (m *MSSQL) splitViaPrimaryKey(ctx context.Context, stream types.StreamInter } } + // Create the first chunk from the beginning up to the minimum value chunks.Insert(types.Chunk{ Min: nil, Max: normalizeBoundaryValue(minVal, pkCols, columnType), @@ -223,6 +231,7 @@ func (m *MSSQL) splitViaPrimaryKey(ctx context.Context, stream types.StreamInter normalizeBoundaryValue(maxVal, pkCols, columnType), ) + // Build query to find the next chunk boundary query := jdbc.MSSQLNextChunkEndQuery(stream, pkCols, chunkSize) currentVal := minVal @@ -293,17 +302,21 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac return nil } + // Start from the minimum physloc value current := minVal chunks.Insert(types.Chunk{ Min: nil, Max: physLocBoundary(utils.HexEncode(minVal)), }) + // Iteratively find chunk boundaries until we reach the end of the table for { var next []byte + // This gives us the next chunk boundary, ensuring each chunk has ~chunkSize rows query := jdbc.MSSQLPhysLocNextChunkEndQuery(stream, chunkSize) err := tx.QueryRowContext(ctx, query, current).Scan(&next) + // End of table reached: no more rows with physloc > current if err == sql.ErrNoRows || next == nil { chunks.Insert(types.Chunk{Min: physLocBoundary(utils.HexEncode(current)), Max: nil}) break @@ -317,11 +330,15 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac break } + // Create a chunk between current and next boundary + // This chunk will contain approximately chunkSize rows + // Example: If current = A and next = D, chunk [A, D) contains rows A, 
B, C chunks.Insert(types.Chunk{ Min: physLocBoundary(utils.HexEncode(current)), Max: physLocBoundary(utils.HexEncode(next)), }) + // Move to the next boundary for the next iteration current = next } @@ -333,7 +350,7 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac // It reads a small percentage of data pages (no full table scan), sorts the // sampled %%physloc%% values, and picks evenly-spaced boundaries in Go. func (m *MSSQL) splitViaPhysLocSample(ctx context.Context, stream types.StreamInterface, approxRowCount int64, numberOfChunks int64) (*types.Set[types.Chunk], error) { - samplePercent := abstract.ComputeSamplePercent(approxRowCount, numberOfChunks) + samplePercent := utils.ComputeSamplePercent(approxRowCount, numberOfChunks) logger.Debugf("TABLESAMPLE sampling %.4f%% of pages from [%s.%s] for chunk boundaries (approxRows=%d, chunks=%d)", samplePercent, stream.Namespace(), stream.Name(), approxRowCount, numberOfChunks) diff --git a/drivers/mssql/internal/config.go b/drivers/mssql/internal/config.go index ba21b4253..39237daa0 100644 --- a/drivers/mssql/internal/config.go +++ b/drivers/mssql/internal/config.go @@ -9,6 +9,21 @@ import ( "github.com/datazip-inc/olake/utils" ) +// ChunkingStrategy controls how MSSQL tables are divided into parallel read chunks. +// - "iam_walk" (default): uses sys.dm_db_database_page_allocations to plan +// page-aligned chunks in O(1) without scanning table data. Requires SQL Server +// 2012+, VIEW DATABASE STATE permission, and is unavailable on Azure SQL DB/MI. +// Takes a schema-stability lock during planning, which blocks DDL on +// the target table. Use when DDL is not running concurrently during sync. +// - "sampling": uses TABLESAMPLE SYSTEM for boundary estimation; no DDL lock, +// no special permissions, works on Azure SQL DB/MI. 
+type ChunkingStrategy string + +const ( + IAMWalkStrategy ChunkingStrategy = "iam_walk" + SamplingStrategy ChunkingStrategy = "sampling" +) + // Config represents the configuration for connecting to a MSSQL database. type Config struct { Host string `json:"host"` @@ -22,6 +37,7 @@ type Config struct { SSLConfiguration *utils.SSLConfig `json:"ssl"` ManageCaptureInstances bool `json:"manage_capture_instances"` SSHConfig *utils.SSHConfig `json:"ssh_config"` + ChunkingStrategy ChunkingStrategy `json:"chunking_strategy,omitempty"` } // Validate checks and normalises MSSQL configuration. diff --git a/drivers/mssql/resources/spec.json b/drivers/mssql/resources/spec.json index 881f805c2..6251ee73f 100644 --- a/drivers/mssql/resources/spec.json +++ b/drivers/mssql/resources/spec.json @@ -72,6 +72,16 @@ "description": "Automatically manage MSSQL CDC capture instances for schema evolution", "default": false }, + "chunking_strategy": { + "type": "string", + "title": "Chunking Strategy", + "description": "Strategy used to split tables into parallel read chunks. Use iam_walk for faster, evenly distributed chunks (requires SQL Server 2012+, VIEW DATABASE STATE; takes a brief DDL lock). Use sampling if DDL runs during sync , might be slower than iam walk strategy.", + "default": "iam_walk", + "enum": [ + "iam_walk", + "sampling" + ] + }, "ssh_config": { "title": "SSH Config", "description": "OLake uses SSH to connect to the database.", diff --git a/drivers/oracle/internal/backfill.go b/drivers/oracle/internal/backfill.go index 873ec48e6..77ff607f7 100644 --- a/drivers/oracle/internal/backfill.go +++ b/drivers/oracle/internal/backfill.go @@ -165,7 +165,7 @@ func (o *Oracle) splitViaTableIterationLoop(ctx context.Context, stream types.St // full-table sort that NTILE requires, making it safe on tables with billions of // rows where NTILE would spill to temp tablespace and risk ORA-1652. 
func (o *Oracle) splitViaTableIterationSample(ctx context.Context, stream types.StreamInterface, approxRowCount int64, numberOfChunks int64) (*types.Set[types.Chunk], error) { - samplePercent := abstract.ComputeSamplePercent(approxRowCount, numberOfChunks) + samplePercent := utils.ComputeSamplePercent(approxRowCount, numberOfChunks) logger.Debugf("Sampling %.4f%% of blocks from [%s.%s] for chunk boundaries (approxRows=%d, chunks=%d)", samplePercent, stream.Namespace(), stream.Name(), approxRowCount, numberOfChunks) diff --git a/utils/spec/uischema.go b/utils/spec/uischema.go index 6daee7218..561b85390 100644 --- a/utils/spec/uischema.go +++ b/utils/spec/uischema.go @@ -198,7 +198,8 @@ const MSSQLUISchema = `{ { "port": 12, "max_threads": 12 }, { "retry_count": 12, "jdbc_url_params": 12 }, { "ssl": 12, "manage_capture_instances": 12 }, - { "update_method": 12, "ssh_config": 12 } + { "update_method": 12, "ssh_config": 12 }, + { "chunking_strategy": 12 } ], "ssl": { "ui:options": { @@ -216,6 +217,13 @@ const MSSQLUISchema = `{ "manage_capture_instances": { "ui:widget": "boolean" }, + "chunking_strategy": { + "ui:widget": "select", + "ui:enumNames": [ + "IAM Walk", + "Sampling" + ] + }, "ssh_config": { "ui:options": { "title": false, diff --git a/utils/utils.go b/utils/utils.go index 9ccf3f865..fab289c01 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -543,3 +543,13 @@ func ExtractColumnName(groups ...string) string { } return "" } + +// ComputeSamplePercent returns the TABLESAMPLE / SAMPLE BLOCK percentage to use +// for chunk boundary estimation. The result is clamped to +// [constants.SamplePercentMin, constants.SamplePercentMax] so callers never +// request a near-zero or near-full scan regardless of row-count estimates. 
+func ComputeSamplePercent(approxRowCount, numberOfChunks int64) float64 { + minSampleRows := numberOfChunks * constants.SampleRowsPerChunkMultiplier + samplingPercentage := float64(minSampleRows) / float64(approxRowCount) * 100.0 + return max(constants.SamplePercentMin, min(constants.SamplePercentMax, samplingPercentage)) +} From a97fffe29f8850500a87113f578133be167f8c6f Mon Sep 17 00:00:00 2001 From: vaibhav Date: Wed, 20 May 2026 11:51:11 +0530 Subject: [PATCH 7/8] chore: remove chunking strategy drop down --- drivers/mssql/internal/backfill.go | 11 +++-------- drivers/mssql/internal/config.go | 16 ---------------- drivers/mssql/resources/spec.json | 10 ---------- utils/spec/uischema.go | 10 +--------- 4 files changed, 4 insertions(+), 43 deletions(-) diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 2d8b74930..4c4e8ee4b 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -146,26 +146,21 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo return m.splitViaPhysLoc(ctx, stream, chunks, chunkSize) } - // useIAMWalk returns true when the config allows IAM walk and the server - // supports it. Setting chunking_strategy=sampling bypasses IAM walk - // entirely, which avoids the schema-stability lock it takes on the table. 
- useIAMWalk := m.config.ChunkingStrategy == IAMWalkStrategy && m.probeIAMWalkCapability(ctx) - switch { case chunkColumn != "": logger.Debugf("Stream %s: chunkColumn=%s set, using PK-based chunking", stream.ID(), chunkColumn) err = m.splitViaPrimaryKey(ctx, stream, chunks, pkColumns, chunkSize) - case useIAMWalk: + case m.probeIAMWalkCapability(ctx): logger.Debugf("Stream %s: Attempting IAM walk chunking", stream.ID()) err = m.splitViaIAMWalk(ctx, stream, chunks) if err != nil || chunks.Len() == 0 { - logger.Warnf("Stream %s: IAM walk failed (%s), falling back to sampling", stream.ID(), err) + logger.Warnf("Stream %s: IAM walk failed (%s)", stream.ID(), err) err = physLocSampleThenFallback() } else { logger.Infof("Stream %s: IAM walk produced %d chunks", stream.ID(), chunks.Len()) } default: - logger.Debugf("Stream %s: using TABLESAMPLE physloc sampling (strategy=%s)", stream.ID(), m.config.ChunkingStrategy) + logger.Debugf("Stream %s: IAM walk unavailable, trying TABLESAMPLE physloc sampling", stream.ID()) err = physLocSampleThenFallback() } diff --git a/drivers/mssql/internal/config.go b/drivers/mssql/internal/config.go index 39237daa0..ba21b4253 100644 --- a/drivers/mssql/internal/config.go +++ b/drivers/mssql/internal/config.go @@ -9,21 +9,6 @@ import ( "github.com/datazip-inc/olake/utils" ) -// ChunkingStrategy controls how MSSQL tables are divided into parallel read chunks. -// - "iam_walk" (default): uses sys.dm_db_database_page_allocations to plan -// page-aligned chunks in O(1) without scanning table data. Requires SQL Server -// 2012+, VIEW DATABASE STATE permission, and is unavailable on Azure SQL DB/MI. -// Takes a schema-stability lock during planning, which blocks DDL on -// the target table. Use when DDL is not running concurrently during sync. -// - "sampling": uses TABLESAMPLE SYSTEM for boundary estimation; no DDL lock, -// no special permissions, works on Azure SQL DB/MI. 
-type ChunkingStrategy string - -const ( - IAMWalkStrategy ChunkingStrategy = "iam_walk" - SamplingStrategy ChunkingStrategy = "sampling" -) - // Config represents the configuration for connecting to a MSSQL database. type Config struct { Host string `json:"host"` @@ -37,7 +22,6 @@ type Config struct { SSLConfiguration *utils.SSLConfig `json:"ssl"` ManageCaptureInstances bool `json:"manage_capture_instances"` SSHConfig *utils.SSHConfig `json:"ssh_config"` - ChunkingStrategy ChunkingStrategy `json:"chunking_strategy,omitempty"` } // Validate checks and normalises MSSQL configuration. diff --git a/drivers/mssql/resources/spec.json b/drivers/mssql/resources/spec.json index 6251ee73f..881f805c2 100644 --- a/drivers/mssql/resources/spec.json +++ b/drivers/mssql/resources/spec.json @@ -72,16 +72,6 @@ "description": "Automatically manage MSSQL CDC capture instances for schema evolution", "default": false }, - "chunking_strategy": { - "type": "string", - "title": "Chunking Strategy", - "description": "Strategy used to split tables into parallel read chunks. Use iam_walk for faster, evenly distributed chunks (requires SQL Server 2012+, VIEW DATABASE STATE; takes a brief DDL lock). 
Use sampling if DDL runs during sync , might be slower than iam walk strategy.", - "default": "iam_walk", - "enum": [ - "iam_walk", - "sampling" - ] - }, "ssh_config": { "title": "SSH Config", "description": "OLake uses SSH to connect to the database.", diff --git a/utils/spec/uischema.go b/utils/spec/uischema.go index 561b85390..6daee7218 100644 --- a/utils/spec/uischema.go +++ b/utils/spec/uischema.go @@ -198,8 +198,7 @@ const MSSQLUISchema = `{ { "port": 12, "max_threads": 12 }, { "retry_count": 12, "jdbc_url_params": 12 }, { "ssl": 12, "manage_capture_instances": 12 }, - { "update_method": 12, "ssh_config": 12 }, - { "chunking_strategy": 12 } + { "update_method": 12, "ssh_config": 12 } ], "ssl": { "ui:options": { @@ -217,13 +216,6 @@ const MSSQLUISchema = `{ "manage_capture_instances": { "ui:widget": "boolean" }, - "chunking_strategy": { - "ui:widget": "select", - "ui:enumNames": [ - "IAM Walk", - "Sampling" - ] - }, "ssh_config": { "ui:options": { "title": false, From c61513445d5a09fe6318dbfa9629ed1f274b9d7a Mon Sep 17 00:00:00 2001 From: vaibhav Date: Wed, 20 May 2026 18:32:50 +0530 Subject: [PATCH 8/8] fix: physloc detection for resumable sync --- constants/constants.go | 3 +++ drivers/mssql/internal/backfill.go | 37 ++++++++++++++++++------------ pkg/jdbc/jdbc.go | 12 ++++------ 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/constants/constants.go b/constants/constants.go index 3422ec750..90b9e842c 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -51,6 +51,9 @@ const ( // ~10 sample points to pick a boundary from, producing even spacing even when // blocks/pages are clustered (e.g. freshly inserted rows land on adjacent pages). SampleRowsPerChunkMultiplier = int64(10) + // PhysLocBoundaryPrefix is prepended to every %%physloc%% hex boundary stored + // in a types.Chunk in MSSQL. It is used to identify physloc reading chunks. 
+ PhysLocBoundaryPrefix = "olake_physloc_" ) type DriverType string diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 4c4e8ee4b..33e49c77d 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -22,9 +22,6 @@ import ( "github.com/datazip-inc/olake/utils/typeutils" ) -// physLocBoundary is a sentinel type for %%physloc%% hex-literal chunk boundaries. -type physLocBoundary string - // usableBytesPerPage is an upper bound for in-row payload per 8KB page // (IN_ROW_DATA max row size). Using the ceiling yields smaller chunks. const usableBytesPerPage = 8060 @@ -301,7 +298,7 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac current := minVal chunks.Insert(types.Chunk{ Min: nil, - Max: physLocBoundary(utils.HexEncode(minVal)), + Max: physLocBoundary(minVal), }) // Iteratively find chunk boundaries until we reach the end of the table @@ -313,7 +310,7 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac err := tx.QueryRowContext(ctx, query, current).Scan(&next) // End of table reached: no more rows with physloc > current if err == sql.ErrNoRows || next == nil { - chunks.Insert(types.Chunk{Min: physLocBoundary(utils.HexEncode(current)), Max: nil}) + chunks.Insert(types.Chunk{Min: physLocBoundary(current), Max: nil}) break } if err != nil { @@ -321,7 +318,7 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac } if bytes.Equal(current, next) { - chunks.Insert(types.Chunk{Min: physLocBoundary(utils.HexEncode(current)), Max: nil}) + chunks.Insert(types.Chunk{Min: physLocBoundary(current), Max: nil}) break } @@ -329,8 +326,8 @@ func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterfac // This chunk will contain approximately chunkSize rows // Example: If current = A and next = D, chunk [A, D) contains rows A, B, C chunks.Insert(types.Chunk{ - Min: 
physLocBoundary(utils.HexEncode(current)), - Max: physLocBoundary(utils.HexEncode(next)), + Min: physLocBoundary(current), + Max: physLocBoundary(next), }) // Move to the next boundary for the next iteration @@ -377,7 +374,8 @@ func (m *MSSQL) splitViaPhysLocSample(ctx context.Context, stream types.StreamIn step := float64(len(physLocSamples)) / float64(numberOfChunks) var prev any = nil for i := int64(0); i < numberOfChunks; i++ { - curr := physLocBoundary(utils.HexEncode(physLocSamples[int(float64(i)*step)])) + idx := min(int(float64(i)*step), len(physLocSamples)-1) + curr := physLocBoundary(physLocSamples[idx]) chunks.Insert(types.Chunk{Min: prev, Max: curr}) prev = curr } @@ -431,7 +429,7 @@ func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterfac // this naturally produces just {nil, nil}. var prev any = nil for i := pagesPerChunk; i < total; i += pagesPerChunk { - boundary := physLocBoundary(utils.HexEncode(physLocBytes(pages[i]))) + boundary := physLocBoundary(physLocBytes(pages[i])) chunks.Insert(types.Chunk{Min: prev, Max: boundary}) prev = boundary } @@ -553,10 +551,19 @@ func formatUniqueIdentifierBytes(v []byte) (string, bool) { } // isPhysLocChunk reports whether the chunk was produced by a physloc-based -// planner (IAM walk, iterative, or TABLESAMPLE). It uses a sentinel type -// instead of a string-length heuristic to avoid false positives. +// planner (IAM walk, iterative, or TABLESAMPLE). Detection uses the +// PhysLocBoundaryPrefix embedded in the boundary value. func isPhysLocChunk(chunk types.Chunk) bool { - _, minOK := chunk.Min.(physLocBoundary) - _, maxOK := chunk.Max.(physLocBoundary) - return minOK || maxOK + hasPrefix := func(v any) bool { + s, ok := v.(string) + return ok && strings.HasPrefix(s, constants.PhysLocBoundaryPrefix) + } + return hasPrefix(chunk.Min) || hasPrefix(chunk.Max) +} + +// physLocBoundary encodes raw %%physloc%% bytes as a prefixed hex string for +// storage in a types.Chunk. 
The prefix is prepended to the hex-encoded value. +// This helps to uniquely identify the chunk for physloc reading. +func physLocBoundary(b []byte) string { + return constants.PhysLocBoundaryPrefix + utils.HexEncode(b) } diff --git a/pkg/jdbc/jdbc.go b/pkg/jdbc/jdbc.go index 623dd67a3..72b442e59 100644 --- a/pkg/jdbc/jdbc.go +++ b/pkg/jdbc/jdbc.go @@ -971,18 +971,14 @@ func MSSQLNextChunkEndQuery(stream types.StreamInterface, orderingColumns []stri func MSSQLPhysLocChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string { tableName := QuoteTable(stream.Namespace(), stream.Name(), constants.MSSQL) - // Format %%physloc%% value as a hex literal + // formatPhysLocValue strips the physloc prefix and returns the raw hex + // literal that SQL Server expects in %%physloc%% predicates. formatPhysLocValue := func(val any) string { if val == nil { return "NULL" } - - // chunk stores boundary (min and max) values in hex format - if value, ok := val.(string); ok { - return value - } - - return fmt.Sprintf("%v", val) + s := fmt.Sprintf("%v", val) + return strings.TrimPrefix(s, constants.PhysLocBoundaryPrefix) } var chunkCond string