diff --git a/constants/constants.go b/constants/constants.go index 658318dff..90b9e842c 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -40,6 +40,20 @@ const ( // MysqlChunkAcceptanceRatio defines the minimum ratio of expected chunks that must be generated // for the split to be considered valid. MysqlChunkAcceptanceRatio = float64(0.8) + // SamplePercentMin / SamplePercentMax define the clamped range for TABLESAMPLE / + // SAMPLE BLOCK percentage used by physloc and ROWID chunk boundary estimation. + // 0.01 is the practical floor below which page-level sampling may return zero + // rows; 50 caps worst-case I/O so a bad row-count estimate cannot escalate to a + // near-full scan. + SamplePercentMin = float64(0.01) + SamplePercentMax = float64(50.0) + // SampleRowsPerChunkMultiplier controls sample density: each target chunk gets + // ~10 sample points to pick a boundary from, producing even spacing even when + // blocks/pages are clustered (e.g. freshly inserted rows land on adjacent pages). + SampleRowsPerChunkMultiplier = int64(10) + // PhysLocBoundaryPrefix is prepended to every %%physloc%% hex boundary stored + // in a types.Chunk in MSSQL. It is used to identify physloc reading chunks. + PhysLocBoundaryPrefix = "olake_physloc_" ) type DriverType string diff --git a/drivers/mssql/internal/backfill.go b/drivers/mssql/internal/backfill.go index 8df3b2a82..33e49c77d 100644 --- a/drivers/mssql/internal/backfill.go +++ b/drivers/mssql/internal/backfill.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "database/sql" + "encoding/binary" "fmt" "math" + "slices" "sort" "strings" "time" @@ -20,6 +22,10 @@ import ( "github.com/datazip-inc/olake/utils/typeutils" ) +// usableBytesPerPage is an upper bound for in-row payload per 8KB page +// (IN_ROW_DATA max row size). Using the ceiling yields smaller chunks. +const usableBytesPerPage = 8060 + // ChunkIterator implements snapshot iteration over MSSQL chunks. 
func (m *MSSQL) ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, onMessage abstract.BackfillMsgFn) error { opts := jdbc.DriverOptions{ @@ -48,11 +54,14 @@ func (m *MSSQL) ChunkIterator(ctx context.Context, stream types.StreamInterface, // Build query for the chunk stmt := "" - if chunkColumn != "" { + switch { + case chunkColumn != "": stmt = jdbc.MSSQLChunkScanQuery(stream, []string{chunkColumn}, chunk, filter) - } else if len(pkColumns) > 0 { + case isPhysLocChunk(chunk): + stmt = jdbc.MSSQLPhysLocChunkScanQuery(stream, chunk, filter) + case len(pkColumns) > 0: stmt = jdbc.MSSQLChunkScanQuery(stream, pkColumns, chunk, filter) - } else { + default: stmt = jdbc.MSSQLPhysLocChunkScanQuery(stream, chunk, filter) } @@ -103,6 +112,7 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo return nil, fmt.Errorf("failed to get avg row size: %s", err) } chunkSize := int64(math.Ceil(float64(constants.EffectiveParquetSize) / avgRowSizeFloat)) + numberOfChunks := max(int64(math.Ceil(float64(approxRowCount)/float64(chunkSize))), int64(1)) chunks := types.NewSet[types.Chunk]() chunkColumn := stream.Self().StreamMetadata.ChunkColumn pkColumns := stream.GetStream().SourceDefinedPrimaryKey.Array() @@ -117,168 +127,347 @@ func (m *MSSQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo logger.Debugf("Stream %s: No PK or chunkColumn, will use %%physloc%% chunking", stream.ID()) } - // Split via primary key when available - splitViaPrimaryKey := func(stream types.StreamInterface, chunks *types.Set[types.Chunk], pkCols []string) error { - return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { - sort.Strings(pkCols) + // physLocSampleThenFallback tries TABLESAMPLE physloc sampling and, if that + // produces no usable chunks, falls back to PK or iterative physloc scan. 
+ physLocSampleThenFallback := func() error { + sampleChunks, sampleErr := m.splitViaPhysLocSample(ctx, stream, approxRowCount, numberOfChunks) + if sampleErr == nil && sampleChunks.Len() > 0 { + logger.Infof("Stream %s: TABLESAMPLE physloc sampling produced %d chunks", stream.ID(), sampleChunks.Len()) + chunks = sampleChunks + return nil + } + logger.Debugf("Stream %s: TABLESAMPLE physloc sampling failed %s", stream.ID(), sampleErr) + if len(pkColumns) > 0 { + return m.splitViaPrimaryKey(ctx, stream, chunks, pkColumns, chunkSize) + } + return m.splitViaPhysLoc(ctx, stream, chunks, chunkSize) + } - if len(pkCols) == 0 { - return nil - } - // Get the minimum and maximum values for the primary key columns - minVal, maxVal, err := m.getTableExtremes(ctx, stream, pkCols, tx) - if err != nil { - return fmt.Errorf("failed to get table extremes: %s", err) - } - // Skip if table is empty - if minVal == nil { - return nil - } + switch { + case chunkColumn != "": + logger.Debugf("Stream %s: chunkColumn=%s set, using PK-based chunking", stream.ID(), chunkColumn) + err = m.splitViaPrimaryKey(ctx, stream, chunks, pkColumns, chunkSize) + case m.probeIAMWalkCapability(ctx): + logger.Debugf("Stream %s: Attempting IAM walk chunking", stream.ID()) + err = m.splitViaIAMWalk(ctx, stream, chunks) + if err != nil || chunks.Len() == 0 { + logger.Warnf("Stream %s: IAM walk failed (%s)", stream.ID(), err) + err = physLocSampleThenFallback() + } else { + logger.Infof("Stream %s: IAM walk produced %d chunks", stream.ID(), chunks.Len()) + } + default: + logger.Debugf("Stream %s: IAM walk unavailable, trying TABLESAMPLE physloc sampling", stream.ID()) + err = physLocSampleThenFallback() + } - columnType := "" - if len(pkCols) == 1 { - columnType, err = m.getColumnTypeMSSQL(ctx, stream, pkCols[0], tx) - if err != nil { - return fmt.Errorf("failed to get table column type: %s", err) - } - } + return chunks, err +} - // Create the first chunk from the beginning up to the minimum value - 
chunks.Insert(types.Chunk{ - Min: nil, - Max: normalizeBoundaryValue(minVal, pkCols, columnType), - }) +// physlocSortKey encodes (file_id, page_id) as a uint64 that sorts identically +// to SQL Server's byte-by-byte BINARY(8) comparison of the equivalent %%physloc%%. +// slot_id is fixed at 0xFFFF ("end of page") so chunk predicates split cleanly between pages. +// Sorting []uint64 with < is cheaper than sorting [][]byte with bytes.Compare. +func physlocSortKey(fileID, pageID int32) uint64 { + var b [8]byte + binary.LittleEndian.PutUint32(b[0:4], uint32(pageID)) + binary.LittleEndian.PutUint16(b[4:6], uint16(fileID)) + binary.LittleEndian.PutUint16(b[6:8], 0xFFFF) + return binary.BigEndian.Uint64(b[:]) +} - logger.Infof( - "Stream %s extremes - min: %v, max: %v", stream.ID(), - normalizeBoundaryValue(minVal, pkCols, columnType), - normalizeBoundaryValue(maxVal, pkCols, columnType), - ) - - // Build query to find the next chunk boundary - query := jdbc.MSSQLNextChunkEndQuery(stream, pkCols, chunkSize) - currentVal := minVal - - for { - // Split the current composite key value into individual column parts - columns := strings.Split(normalizeBoundaryValue(currentVal, pkCols, columnType), ",") - - // Build query arguments for composite key comparison - args := make([]interface{}, 0) - for colIdx := 0; colIdx < len(pkCols); colIdx++ { - for partIdx := 0; partIdx <= colIdx && partIdx < len(columns); partIdx++ { - args = append(args, strings.TrimSpace(columns[partIdx])) - } - } +// physLocBytes converts a sort key back to the 8-byte %%physloc%% wire format +// that SQL Server understands in chunk boundary predicates. 
+func physLocBytes(key uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, key) + return b +} - // Query for the next chunk boundary value - var nextValRaw interface{} - err := tx.QueryRowContext(ctx, query, args...).Scan(&nextValRaw) - // Stop if we've reached the end of the table - if err == sql.ErrNoRows || nextValRaw == nil { - break - } - if err != nil { - return fmt.Errorf("failed to get next chunk end: %s", err) - } +// splitViaPrimaryKey divides the table into chunks by walking PK boundaries +// under REPEATABLE READ (see jdbc.WithIsolation). +func (m *MSSQL) splitViaPrimaryKey(ctx context.Context, stream types.StreamInterface, chunks *types.Set[types.Chunk], pkCols []string, chunkSize int64) error { + return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { + sort.Strings(pkCols) - // Create a chunk between current and next boundary - if currentVal != nil { - chunks.Insert(types.Chunk{ - Min: normalizeBoundaryValue(currentVal, pkCols, columnType), - Max: normalizeBoundaryValue(nextValRaw, pkCols, columnType), - }) + if len(pkCols) == 0 { + return nil + } + // Get the minimum and maximum values for the primary key columns + minVal, maxVal, err := m.getTableExtremes(ctx, stream, pkCols, tx) + if err != nil { + return fmt.Errorf("failed to get table extremes: %s", err) + } + // Skip if table is empty + if minVal == nil { + return nil + } + + columnType := "" + if len(pkCols) == 1 { + columnType, err = m.getColumnTypeMSSQL(ctx, stream, pkCols[0], tx) + if err != nil { + return fmt.Errorf("failed to get table column type: %s", err) + } + } + + // Create the first chunk from the beginning up to the minimum value + chunks.Insert(types.Chunk{ + Min: nil, + Max: normalizeBoundaryValue(minVal, pkCols, columnType), + }) + + logger.Infof( + "Stream %s extremes - min: %v, max: %v", stream.ID(), + normalizeBoundaryValue(minVal, pkCols, columnType), + normalizeBoundaryValue(maxVal, pkCols, columnType), + ) + + // Build query to find 
the next chunk boundary + query := jdbc.MSSQLNextChunkEndQuery(stream, pkCols, chunkSize) + currentVal := minVal + + for { + // Split the current composite key value into individual column parts + columns := strings.Split(normalizeBoundaryValue(currentVal, pkCols, columnType), ",") + + // Build query arguments for composite key comparison + args := make([]interface{}, 0) + for colIdx := 0; colIdx < len(pkCols); colIdx++ { + for partIdx := 0; partIdx <= colIdx && partIdx < len(columns); partIdx++ { + args = append(args, strings.TrimSpace(columns[partIdx])) } + } - currentVal = nextValRaw + // Query for the next chunk boundary value + var nextValRaw interface{} + err := tx.QueryRowContext(ctx, query, args...).Scan(&nextValRaw) + // Stop if we've reached the end of the table + if err == sql.ErrNoRows || nextValRaw == nil { + break + } + if err != nil { + return fmt.Errorf("failed to get next chunk end: %s", err) } - // Create the final chunk from the last value to the end + // Create a chunk between current and next boundary if currentVal != nil { chunks.Insert(types.Chunk{ Min: normalizeBoundaryValue(currentVal, pkCols, columnType), - Max: nil, + Max: normalizeBoundaryValue(nextValRaw, pkCols, columnType), }) } + currentVal = nextValRaw + } + + // Create the final chunk from the last value to the end + if currentVal != nil { + chunks.Insert(types.Chunk{ + Min: normalizeBoundaryValue(currentVal, pkCols, columnType), + Max: nil, + }) + } + + return nil + }) +} + +// Split using physical location when no primary key is available. +// %%physloc%% returns the physical location (file_id, page_id, slot_id) of a row as binary. +// We iteratively find chunk boundaries by querying for the N-th row (N = chunkSize) where +// physloc > current, creating evenly-sized chunks: [nil, min], [min, next1], ..., [last, nil] +// +// All physloc values are hex-encoded before storing in chunks to ensure valid UTF-8 chunk values. 
+func (m *MSSQL) splitViaPhysLoc(ctx context.Context, stream types.StreamInterface, chunks *types.Set[types.Chunk], chunkSize int64) error { + // SQL Server doesn't support read-only transactions + // Use repeatable read isolation without read-only flag + return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { + // Get the minimum and maximum physical location values + // These define the boundaries of our table for chunking + minVal, maxVal, err := m.getPhysLocExtremes(ctx, stream, tx) + if err != nil { + return fmt.Errorf("failed to get %%physloc%% extremes: %s", err) + } + // Skip if table is empty (no rows to chunk) + if minVal == nil || maxVal == nil { return nil + } + + // Start from the minimum physloc value + current := minVal + chunks.Insert(types.Chunk{ + Min: nil, + Max: physLocBoundary(minVal), }) - } - // Split using physical location when no primary key is available. - // %%physloc%% returns the physical location (file_id, page_id, slot_id) of a row as binary. - // We iteratively find chunk boundaries by querying for the N-th row (N = chunkSize) where - // physloc > current, creating evenly-sized chunks: [nil, min], [min, next1], ..., [last, nil] - // - // All physloc values are hex-encoded before storing in chunks to ensure valid UTF-8 chunk values. 
- splitViaPhysLoc := func(stream types.StreamInterface, chunks *types.Set[types.Chunk]) error { - // SQL Server doesn't support read-only transactions - // Use repeatable read isolation without read-only flag - return jdbc.WithIsolation(ctx, m.client, false, func(tx *sql.Tx) error { - // Get the minimum and maximum physical location values - // These define the boundaries of our table for chunking - minVal, maxVal, err := m.getPhysLocExtremes(ctx, stream, tx) + // Iteratively find chunk boundaries until we reach the end of the table + for { + var next []byte + // This gives us the next chunk boundary, ensuring each chunk has ~chunkSize rows + query := jdbc.MSSQLPhysLocNextChunkEndQuery(stream, chunkSize) + + err := tx.QueryRowContext(ctx, query, current).Scan(&next) + // End of table reached: no more rows with physloc > current + if err == sql.ErrNoRows || next == nil { + chunks.Insert(types.Chunk{Min: physLocBoundary(current), Max: nil}) + break + } if err != nil { - return fmt.Errorf("failed to get %%physloc%% extremes: %s", err) + return fmt.Errorf("failed to get next %%physloc%% chunk end: %s", err) } - // Skip if table is empty (no rows to chunk) - if minVal == nil || maxVal == nil { - return nil + + if bytes.Equal(current, next) { + chunks.Insert(types.Chunk{Min: physLocBoundary(current), Max: nil}) + break } - // Start from the minimum physloc value - current := minVal + // Create a chunk between current and next boundary + // This chunk will contain approximately chunkSize rows + // Example: If current = A and next = D, chunk [A, D) contains rows A, B, C chunks.Insert(types.Chunk{ - Min: nil, - Max: utils.HexEncode(minVal), + Min: physLocBoundary(current), + Max: physLocBoundary(next), }) - // Iteratively find chunk boundaries until we reach the end of the table - for { - var next []byte - // This gives us the next chunk boundary, ensuring each chunk has ~chunkSize rows - query := jdbc.MSSQLPhysLocNextChunkEndQuery(stream, chunkSize) - - err := 
tx.QueryRowContext(ctx, query, current).Scan(&next) - // End of table reached: no more rows with physloc > current - if err == sql.ErrNoRows || next == nil { - chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) - break - } - if err != nil { - return fmt.Errorf("failed to get next %%physloc%% chunk end: %s", err) - } + // Move to the next boundary for the next iteration + current = next + } - if bytes.Equal(current, next) { - chunks.Insert(types.Chunk{Min: utils.HexEncode(current), Max: nil}) - break - } + return nil + }) +} - // Create a chunk between current and next boundary - // This chunk will contain approximately chunkSize rows - // Example: If current = A and next = D, chunk [A, D) contains rows A, B, C - chunks.Insert(types.Chunk{ - Min: utils.HexEncode(current), - Max: utils.HexEncode(next), - }) +// splitViaPhysLocSample estimates chunk boundaries using TABLESAMPLE SYSTEM. +// It reads a small percentage of data pages (no full table scan), sorts the +// sampled %%physloc%% values, and picks evenly-spaced boundaries in Go. 
+func (m *MSSQL) splitViaPhysLocSample(ctx context.Context, stream types.StreamInterface, approxRowCount int64, numberOfChunks int64) (*types.Set[types.Chunk], error) { + samplePercent := utils.ComputeSamplePercent(approxRowCount, numberOfChunks) - // Move to the next boundary for the next iteration - current = next - } + logger.Debugf("TABLESAMPLE sampling %.4f%% of pages from [%s.%s] for chunk boundaries (approxRows=%d, chunks=%d)", + samplePercent, stream.Namespace(), stream.Name(), approxRowCount, numberOfChunks) - return nil - }) + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLPhysLocSampleBoundaryQuery(stream, samplePercent)) + if err != nil { + return nil, fmt.Errorf("TABLESAMPLE %%%%physloc%%%% query failed: %s", err) } + defer rows.Close() - if len(pkColumns) > 0 { - logger.Debugf("Stream %s: Using PK-based chunking with columns: %v", stream.ID(), pkColumns) - err = splitViaPrimaryKey(stream, chunks, pkColumns) - } else { - logger.Debugf("Stream %s: Using %%physloc%% chunking (no PK or chunkColumn available)", stream.ID()) - err = splitViaPhysLoc(stream, chunks) + var physLocSamples [][]byte + for rows.Next() { + var loc []byte + if err := rows.Scan(&loc); err != nil { + return nil, fmt.Errorf("failed to scan sampled %%%%physloc%%%%: %s", err) + } + physLocSamples = append(physLocSamples, loc) + } + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate sampled %%%%physloc%%%% rows: %s", err) } - return chunks, err + if int64(len(physLocSamples)) < numberOfChunks { + return nil, fmt.Errorf("TABLESAMPLE returned %d rows, need at least %d", + len(physLocSamples), numberOfChunks) + } + + chunks := types.NewSet[types.Chunk]() + step := float64(len(physLocSamples)) / float64(numberOfChunks) + var prev any = nil + for i := int64(0); i < numberOfChunks; i++ { + idx := min(int(float64(i)*step), len(physLocSamples)-1) + curr := physLocBoundary(physLocSamples[idx]) + chunks.Insert(types.Chunk{Min: prev, Max: curr}) + prev = curr + } + 
chunks.Insert(types.Chunk{Min: prev, Max: nil}) + + return chunks, nil +} + +// splitViaIAMWalk plans chunks for any heap or clustered table by reading +// only the table's Index Allocation Map pages via +// sys.dm_db_database_page_allocations. +func (m *MSSQL) splitViaIAMWalk(ctx context.Context, stream types.StreamInterface, chunks *types.Set[types.Chunk]) error { + var objectID int64 + err := m.client.QueryRowContext(ctx, jdbc.MSSQLObjectIDQuery(), stream.Namespace(), stream.Name()).Scan(&objectID) + if err != nil { + return fmt.Errorf("failed to resolve object_id for IAM walk: %s", err) + } + + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkQuery(), objectID) + if err != nil { + return fmt.Errorf("failed to run IAM walk query: %s", err) + } + defer rows.Close() + + pages := make([]uint64, 0, 1024) + for rows.Next() { + var fileID, pageID int32 + if err := rows.Scan(&fileID, &pageID); err != nil { + return fmt.Errorf("failed to scan IAM walk page: %s", err) + } + pages = append(pages, physlocSortKey(fileID, pageID)) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to iterate IAM walk rows: %s", err) + } + + total := int64(len(pages)) + if total == 0 { + return fmt.Errorf("IAM walk returned no allocated pages") + } + + // Sort defensively — the DMF does not guarantee any output order. + slices.Sort(pages) + + pagesPerChunk := constants.EffectiveParquetSize / usableBytesPerPage + pagesPerChunk = max(pagesPerChunk, 1) + + // Walk the sorted page list, emitting an open→closed chunk every + // pagesPerChunk pages. The trailing chunk is open-ended on the high + // side. If the table fits in a single chunk (total ≤ pagesPerChunk) + // this naturally produces just {nil, nil}. 
+ var prev any = nil + for i := pagesPerChunk; i < total; i += pagesPerChunk { + boundary := physLocBoundary(physLocBytes(pages[i])) + chunks.Insert(types.Chunk{Min: prev, Max: boundary}) + prev = boundary + } + chunks.Insert(types.Chunk{Min: prev, Max: nil}) + + return nil +} + +// probeIAMWalkCapability checks whether IAM walk can run on this server/login. +// It is cheap (two round-trips) so we call it per stream instead of caching. +func (m *MSSQL) probeIAMWalkCapability(ctx context.Context) bool { + var majorVersion, engineEdition int + err := m.client.QueryRowContext(ctx, jdbc.MSSQLIAMWalkServerPropertiesQuery()).Scan(&majorVersion, &engineEdition) + if err != nil { + logger.Debugf("IAM walk probe: failed to read server properties: %s", err) + return false + } + // SQL Server 2012 (version 11) is the first version to support IAM walk. + if majorVersion < 11 { + logger.Debugf("IAM walk probe: SQL Server major version %d < 11, IAM walk unsupported", majorVersion) + return false + } + // EngineEdition 5 = Azure SQL Database, 8 = Azure SQL Managed Instance. + // sys.dm_db_database_page_allocations is blocked on both. + if engineEdition == 5 || engineEdition == 8 { + logger.Debugf("IAM walk probe: EngineEdition %d (Azure SQL DB/MI) blocks the DMF", engineEdition) + return false + } + + // Permission probe: TOP 0 evaluates the DMF without returning any rows. + // Failure here means the current login lacks VIEW DATABASE STATE. + rows, err := m.client.QueryContext(ctx, jdbc.MSSQLIAMWalkPermissionQuery()) + if err != nil { + logger.Debugf("IAM walk probe: permission test failed (likely missing VIEW DATABASE STATE): %s", err) + return false + } + rows.Close() + return true } // getColumnTypeMSSQL returns SQL data type for the requested column. 
@@ -360,3 +549,21 @@ func formatUniqueIdentifierBytes(v []byte) (string, bool) {
 		v[10], v[11], v[12], v[13], v[14], v[15], // last 6 bytes (big-endian)
 	), true
 }
+
+// isPhysLocChunk reports whether the chunk was produced by a physloc-based
+// planner (IAM walk, iterative, or TABLESAMPLE). Detection uses the
+// PhysLocBoundaryPrefix embedded in the boundary value.
+func isPhysLocChunk(chunk types.Chunk) bool {
+	hasPrefix := func(v any) bool {
+		s, ok := v.(string)
+		return ok && strings.HasPrefix(s, constants.PhysLocBoundaryPrefix)
+	}
+	return hasPrefix(chunk.Min) || hasPrefix(chunk.Max)
+}
+
+// physLocBoundary encodes raw %%physloc%% bytes for storage in a types.Chunk:
+// it hex-encodes b and prepends constants.PhysLocBoundaryPrefix. isPhysLocChunk
+// looks for that prefix to recognise a boundary produced by a physloc planner.
+func physLocBoundary(b []byte) string {
+	return constants.PhysLocBoundaryPrefix + utils.HexEncode(b)
+}
diff --git a/drivers/oracle/internal/backfill.go b/drivers/oracle/internal/backfill.go
index bfab8963e..77ff607f7 100644
--- a/drivers/oracle/internal/backfill.go
+++ b/drivers/oracle/internal/backfill.go
@@ -16,19 +16,6 @@ import (
 	"github.com/datazip-inc/olake/utils/logger"
 )
 
-// Oracle SAMPLE BLOCK clamp bounds for boundary estimation. Oracle allows
-// (0.000001, 100) but 0.01 is the practical floor; staging tests on a ~500 GB
-// table showed it improves chunk boundary quality at negligible runtime cost
-// (a couple of seconds). 50 caps worst-case block scanning so a misconfigured
-// stats row count cannot escalate to a full table scan equivalent.
-const (
-	sampleBlockPercentMin = 0.01
-	sampleBlockPercentMax = 50.0
-	// sampleRowsPerChunkMultiplier gives each chunk boundary ~10x sampled ROWIDs
-	// to pick from, which produces evenly spaced boundaries even when block
-	// sampling is clustered (e.g. freshly inserted rows land in adjacent blocks).
- sampleRowsPerChunkMultiplier int64 = 10 -) // ChunkIterator implements the abstract.DriverInterface func (o *Oracle) ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, OnMessage abstract.BackfillMsgFn) error { @@ -178,9 +165,7 @@ func (o *Oracle) splitViaTableIterationLoop(ctx context.Context, stream types.St // full-table sort that NTILE requires, making it safe on tables with billions of // rows where NTILE would spill to temp tablespace and risk ORA-1652. func (o *Oracle) splitViaTableIterationSample(ctx context.Context, stream types.StreamInterface, approxRowCount int64, numberOfChunks int64) (*types.Set[types.Chunk], error) { - minSampleRows := numberOfChunks * sampleRowsPerChunkMultiplier - samplePercent := float64(minSampleRows) / float64(approxRowCount) * 100.0 - samplePercent = math.Max(sampleBlockPercentMin, math.Min(sampleBlockPercentMax, samplePercent)) + samplePercent := utils.ComputeSamplePercent(approxRowCount, numberOfChunks) logger.Debugf("Sampling %.4f%% of blocks from [%s.%s] for chunk boundaries (approxRows=%d, chunks=%d)", samplePercent, stream.Namespace(), stream.Name(), approxRowCount, numberOfChunks) diff --git a/pkg/jdbc/jdbc.go b/pkg/jdbc/jdbc.go index 68e1a6ac9..72b442e59 100644 --- a/pkg/jdbc/jdbc.go +++ b/pkg/jdbc/jdbc.go @@ -660,6 +660,50 @@ func MSSQLPhysLocNextChunkEndQuery(stream types.StreamInterface, chunkSize int64 `, quotedTable, chunkSize) } +// MSSQLIAMWalkQuery returns a streaming IAM-walk page list for a table: +// (file_id, page_id) from sys.dm_db_database_page_allocations in LIMITED mode. +// Params: @p1 = object_id. Requires VIEW DATABASE STATE; SQL Server 2012+; +// blocked on Azure SQL DB/MI. 
+func MSSQLIAMWalkQuery() string { + return ` + SELECT + allocated_page_file_id AS file_id, + allocated_page_page_id AS page_id + FROM sys.dm_db_database_page_allocations( + DB_ID(), + @p1, + NULL, + NULL, + 'LIMITED' + ) + WHERE is_allocated = 1 + AND is_iam_page = 0 + AND index_id IN (0, 1) + AND allocation_unit_type = 1 + ` +} + +// MSSQLObjectIDQuery returns the query to resolve a fully qualified +// schema + table name to its object_id using QUOTENAME for safety. +// Parameters: @p1 = schema name, @p2 = table name +func MSSQLObjectIDQuery() string { + return "SELECT OBJECT_ID(QUOTENAME(@p1) + '.' + QUOTENAME(@p2))" +} + +// MSSQLIAMWalkServerPropertiesQuery returns the query used by the IAM walk +// capability probe to fetch server major version and engine edition. +func MSSQLIAMWalkServerPropertiesQuery() string { + return "SELECT CAST(SERVERPROPERTY('ProductMajorVersion') AS INT), CAST(SERVERPROPERTY('EngineEdition') AS INT)" +} + +// MSSQLIAMWalkPermissionQuery returns a query that evaluates +// sys.dm_db_database_page_allocations without returning any rows. +// Failure indicates the login likely lacks VIEW DATABASE STATE or the DMF +// is blocked by the platform (Azure SQL DB/MI). +func MSSQLIAMWalkPermissionQuery() string { + return "SELECT TOP 0 1 FROM sys.dm_db_database_page_allocations(DB_ID(), OBJECT_ID('sys.objects'), NULL, NULL, 'LIMITED')" +} + // MSSQLCDCSupportQuery returns the query to check if CDC is enabled for the current database func MSSQLCDCSupportQuery() string { return ` @@ -927,18 +971,14 @@ func MSSQLNextChunkEndQuery(stream types.StreamInterface, orderingColumns []stri func MSSQLPhysLocChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string { tableName := QuoteTable(stream.Namespace(), stream.Name(), constants.MSSQL) - // Format %%physloc%% value as a hex literal + // formatPhysLocValue strips the physloc prefix and returns the raw hex + // literal that SQL Server expects in %%physloc%% predicates. 
formatPhysLocValue := func(val any) string { if val == nil { return "NULL" } - - // chunk stores boundary (min and max) values in hex format - if value, ok := val.(string); ok { - return value - } - - return fmt.Sprintf("%v", val) + s := fmt.Sprintf("%v", val) + return strings.TrimPrefix(s, constants.PhysLocBoundaryPrefix) } var chunkCond string @@ -997,6 +1037,17 @@ func MSSQLTableRowStatsQuery() string { ` } +// MSSQLPhysLocSampleBoundaryQuery returns a query that uses TABLESAMPLE SYSTEM to +// sample a percentage of data pages and return sorted %%physloc%% binary values. +func MSSQLPhysLocSampleBoundaryQuery(stream types.StreamInterface, samplePercent float64) string { + quotedTable := QuoteTable(stream.Namespace(), stream.Name(), constants.MSSQL) + return fmt.Sprintf(` + SELECT %%%%physloc%%%% + FROM %s TABLESAMPLE SYSTEM (%.6f PERCENT) WITH (NOLOCK) + ORDER BY %%%%physloc%%%% + `, quotedTable, samplePercent) +} + // OracleDB Specific Queries // OracleTableDiscoveryQuery returns the query to fetch the username and table name of all the tables which the current user has access to in OracleDB diff --git a/utils/utils.go b/utils/utils.go index 9ccf3f865..fab289c01 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -543,3 +543,13 @@ func ExtractColumnName(groups ...string) string { } return "" } + +// ComputeSamplePercent returns the TABLESAMPLE / SAMPLE BLOCK percentage to use +// for chunk boundary estimation. The result is clamped to +// [constants.SamplePercentMin, constants.SamplePercentMax] so callers never +// request a near-zero or near-full scan regardless of row-count estimates. +func ComputeSamplePercent(approxRowCount, numberOfChunks int64) float64 { + minSampleRows := numberOfChunks * constants.SampleRowsPerChunkMultiplier + samplingPercentage := float64(minSampleRows) / float64(approxRowCount) * 100.0 + return max(constants.SamplePercentMin, min(constants.SamplePercentMax, samplingPercentage)) +}