feat: add parallel snapshot read for OceanBase JDBC connector#135
Merged
Conversation
Implement parallel snapshot read functionality for OceanBase JDBC connector: - OceanBaseSource: Main source class implementing Flink Source API - OceanBaseSourceReader: Reads data from splits using JDBC - OceanBaseSplitEnumerator: Manages split discovery and assignment - OceanBaseSplit: Split definition for parallel reading - OceanBaseSourceConfig: Configuration for source connection - OceanBaseSplitSerializer/OceanBaseEnumeratorStateSerializer: Serializers for state - OceanBaseTableSource/TableSourceFactory: SQL API support Features: - Support both MySQL and Oracle compatible modes - MySQL mode: default split by primary key, user can specify chunk key column - Oracle mode: default split by ROWID, user can specify chunk key column - Pure JDBC implementation without debezium dependency - Bounded source for snapshot-only reading
The convertValue method was returning Long for all integer types, causing ClassCastException when Flink expected Integer for INTEGER type. Now returns correct types: byte for TINYINT, short for SMALLINT, int for INTEGER, long for BIGINT.
Add unit tests for OceanBaseSplitEnumerator and OceanBaseSourceReader: - Test Oracle mode default split column (ROWID) - Test MySQL mode default split column (primary key) - Test chunk-key-column override - Test identifier quoting for MySQL and Oracle modes - Test split point generation for numeric and string types - Test split boundary conditions - Test type conversion for all integer types (TINYINT, SMALLINT, INTEGER, BIGINT) - Test type conversion for floating point types (FLOAT, DOUBLE) - Test decimal conversion - Test JDBC Long to Integer conversion fix
Add comprehensive tests for SQL query generation in both MySQL and Oracle modes: - Test identifier quoting for MySQL (backticks) and Oracle (double quotes) - Test query generation for single split, first split, middle split, last split - Test ROWID-based splitting for Oracle mode - Test split boundary detection - Test value escaping for SQL strings
- Add OceanBaseParallelSourceOracleE2eITCase with environment-based configuration - Test parallel snapshot read with ROWID (default for Oracle mode) - Test parallel snapshot read with explicit chunk-key-column - Tests are @disabled by default (require OceanBase Enterprise Edition) - Can be run locally with environment variables: HOST, PORT, USER_NAME, PASSWORD, SCHEMA_NAME
- Test connection to OceanBase Oracle mode - Test ROWID query functionality - Test ROWID range query for parallel splitting - Test data read - Test split query with ROWID range - Tests are @disabled by default (require OceanBase Oracle instance)
Made-with: Cursor
Ensure snapshot source readers emit END_OF_INPUT after no-more-splits, keep decimal split boundaries precise, and add comprehensive conversion tests across supported data types to prevent runtime type mismatches. Made-with: Cursor
- Remove duplicate ConfigOptions from source factory, reuse ConnectorOptions - Add helper.validate() and fix deprecated getSchema() API - Centralize quoteIdentifier() in OceanBaseSourceConfig with SQL injection prevention - Add volatile to dataSource fields for DCL thread safety - Refactor pollNext() to cursor-based incremental reads (one row per call) - Remove unnecessary parallelism cap on numSplits - Reduce connection pool maxActive from 10 to 2 - Fix splitSize default from 8096 to 8192 - Cross-declare options in both factories for shared identifier tolerance - Rewrite tests to verify actual production code paths - Add factory, serializer edge case, and split boundary tests (57 total)
… precision, enumerator safety, Oracle quoting, JDBC defaults - Add lastReadValue to OceanBaseSplit for checkpoint resume (uses > instead of >= with ORDER BY) - Bump split/state serializer to v3 with v2 backward compatibility - Fix integer split boundary precision loss by using long arithmetic for Long/Integer types - Prevent premature noMoreSplits by checking inFlightSplits before signaling - Fix ConcurrentHashMap iterator mutation in assignPendingSplits by copying set before iteration - Align Oracle quoteIdentifier with sink-side: no quoting when case-insensitive (default) - Add createConfiguredDataSource with default JDBC properties matching OceanBaseConnectionProvider - Wire driverClassName, druidProperties, oracleTenantCaseInsensitive through config/factory/builder
Update connector docs to cover both Source and Sink capabilities, add Source usage examples for MySQL and Oracle modes, and reorganize configuration options into Common/Source/Sink sections.
Add OceanBaseMySQLSourceITCase covering 15 data types (BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, CHAR, VARCHAR, DATE, TIME, TIMESTAMP, BINARY, VARBINARY) through the full Source read -> Sink write -> JDBC query pipeline with TestContainers.
Hex literal X'48656C6C6F000000000000000000000000' was 17 bytes, exceeding BINARY(16) limit. Use plain strings instead - the database handles zero-padding automatically.
BOOLEAN returns true/false (not 1/0) via JDBC. CHAR(10) trailing spaces are trimmed after sink write-back.
Replace hand-written SQL queries (INFORMATION_SCHEMA for MySQL, all_constraints for Oracle) with the standard JDBC DatabaseMetaData.getPrimaryKeys() API. This aligns with flink-cdc's approach (via Debezium) and guarantees correct column ordering for composite primary keys across all compatibility modes.
Split discovery now runs on a background thread, allowing readers to start processing data before all splits have been calculated. This is especially beneficial for tables with non-numeric primary keys (e.g., VARCHAR), where each split point requires an expensive ORDER BY + OFFSET query. Numeric split points are still calculated in a single batch. Key changes: - Add ExecutorService for background split discovery thread - Use context.callAsync() for periodic split assignment checks - Add splitDiscoveryFinished flag to OceanBaseEnumeratorState (v4) - Incremental split generation for non-numeric types - Dedup logic to handle checkpoint restore during discovery
…tion Avoid MySQL deep pagination problem by using iterative next-chunk-max queries instead of LIMIT 1 OFFSET N. Each query now scans only chunkSize rows forward from the last boundary, giving O(chunkSize) per query regardless of table position. Also update blog SQL examples accordingly.
6001aa8 to
cc1095b
Compare
Read all rows from a split and emit them within a single pollNext() call. Since Flink injects checkpoint barriers only between pollNext() calls, this makes each split's processing atomic — no mid-split checkpoint possible. Source-level exactly-once without lastReadValue.
- Simplify isNumericSplittable() by removing redundant Long/Integer check - Unify dedupe key format to use splitBoundaryKey() consistently - Propagate SQL exceptions instead of swallowing them in split queries - Route addAllSplitsFromPoints() through addSplitToPending() for dedup - Remove lastReadValue dead code from OceanBaseSplit and serializers - Add __pk_increment hidden column support for MySQL no-PK tables - Inject hidden_column_visible hint in all split and reader queries - Fix blog parameter names (scan.split-size -> split-size) - Add unit tests for hidden PK hint, E2E and integration tests for no-PK tables
…lit points - Add volatile splitDiscoveryError field to capture background thread exceptions instead of silently swallowing them (modeled after Flink CDC's uncaughtSplitterException pattern). Errors are checked in callAsync callback and handleSplitRequest, ensuring the job fails instead of completing with partial/zero data. - Use BigDecimal arithmetic for Long/Integer split point calculation to prevent overflow when maxVal - minVal exceeds Long.MAX_VALUE (e.g., full BIGINT range). This matches Flink CDC's ObjectUtils.minus() approach. - Add overflow regression tests for full BIGINT range and large range scenarios.
Align with Flink CDC documentation practice by warning users that the chunk-key-column must be a non-null column, as NULL values are silently excluded from read results by SQL range comparison semantics.
- Delete OceanBaseSourceBuilder.java which has zero references in the codebase - Clarify Oracle mode requires OceanBase JDBC driver (MySQL driver not supported)
Add MiniCluster-based failover tests to verify exactly-once semantics during TM/JM failures in snapshot reading phase.
- Assert sink not full before triggering failover (job still running) - Wait for job to recover to RUNNING after failover (proves restart happened) - Log failover timing with sink progress
…ink-cdc Replace INSERT INTO sink + TestValuesTableFactory approach with SELECT + tableResult.collect() iterator pattern. The iterator blocks naturally on hasNext(), providing a reliable failover window without timing races. This mirrors the proven pattern from flink-cdc's MySqlSourceITCase.
Add string PK test coverage for the incremental split generation path (generateSplitsIncrementally) which was previously untested. Includes: - IT test in OceanBaseMySQLSourceITCase with 10-row VARCHAR PK table - Failover tests (NONE/TM/JM) with 30-row VARCHAR PK table
davidzhangbj
approved these changes
Apr 13, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implementation
Test plan