feat: add parallel snapshot read for OceanBase JDBC connector#135

Merged
yuanoOo merged 33 commits into
oceanbase:mainfrom
yuanoOo:feature/jdbc-parallel-source
Apr 13, 2026

Conversation

@yuanoOo
Collaborator

@yuanoOo yuanoOo commented Apr 1, 2026

Summary

  • Implement parallel snapshot read functionality for OceanBase JDBC connector
  • Support both MySQL and Oracle compatible modes
  • MySQL mode: default split by primary key, user can specify chunk key column
  • Oracle mode: default split by ROWID, user can specify chunk key column
  • Pure JDBC implementation without debezium dependency

Implementation

  • OceanBaseSource: Main source class implementing Flink Source API
  • OceanBaseSourceReader: Reads data from splits using JDBC
  • OceanBaseSplitEnumerator: Manages split discovery and assignment
  • OceanBaseSplit: Split definition for parallel reading
  • OceanBaseSourceConfig: Configuration for source connection
  • OceanBaseSplitSerializer/OceanBaseEnumeratorStateSerializer: Serializers for state
  • OceanBaseTableSource/TableSourceFactory: SQL API support

Test plan

  • Unit tests
  • E2E integration tests
  • GitHub CI passes

yuanoOo added 21 commits April 1, 2026 16:44
Implement parallel snapshot read functionality for OceanBase JDBC connector:
- OceanBaseSource: Main source class implementing Flink Source API
- OceanBaseSourceReader: Reads data from splits using JDBC
- OceanBaseSplitEnumerator: Manages split discovery and assignment
- OceanBaseSplit: Split definition for parallel reading
- OceanBaseSourceConfig: Configuration for source connection
- OceanBaseSplitSerializer/OceanBaseEnumeratorStateSerializer: Serializers for state
- OceanBaseTableSource/TableSourceFactory: SQL API support

Features:
- Support both MySQL and Oracle compatible modes
- MySQL mode: default split by primary key, user can specify chunk key column
- Oracle mode: default split by ROWID, user can specify chunk key column
- Pure JDBC implementation without debezium dependency
- Bounded source for snapshot-only reading
The convertValue method was returning Long for all integer types,
causing ClassCastException when Flink expected Integer for INTEGER type.
Now returns correct types: byte for TINYINT, short for SMALLINT,
int for INTEGER, long for BIGINT.
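
The type fix described above can be illustrated with a minimal sketch. It assumes the JDBC driver may hand back any `Number` subclass for an integer column; the class `IntegerConversionSketch` and its `Kind` enum are hypothetical stand-ins for illustration, not the connector's actual `convertValue` code or Flink's type classes.

```java
/** Illustrative only: narrows a JDBC numeric value to the Java class matching the declared type. */
final class IntegerConversionSketch {

    /** Hypothetical stand-in for the SQL integer types named in the commit message. */
    enum Kind { TINYINT, SMALLINT, INTEGER, BIGINT }

    static Object convert(Object jdbcValue, Kind kind) {
        if (jdbcValue == null) {
            return null;
        }
        // Drivers may return Long, Integer, Short, or BigDecimal; Number covers all of them.
        Number n = (Number) jdbcValue;
        switch (kind) {
            case TINYINT:
                return n.byteValue();   // boxed as Byte
            case SMALLINT:
                return n.shortValue();  // boxed as Short
            case INTEGER:
                return n.intValue();    // boxed as Integer
            case BIGINT:
                return n.longValue();   // boxed as Long
            default:
                throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
    }

    public static void main(String[] args) {
        Object v = convert(42L, Kind.INTEGER);
        System.out.println(v.getClass().getSimpleName() + " " + v);
    }
}
```

Note that the narrowing methods truncate silently when a value is out of range; the sketch assumes the column's declared type already bounds the value.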
Add unit tests for OceanBaseSplitEnumerator and OceanBaseSourceReader:
- Test Oracle mode default split column (ROWID)
- Test MySQL mode default split column (primary key)
- Test chunk-key-column override
- Test identifier quoting for MySQL and Oracle modes
- Test split point generation for numeric and string types
- Test split boundary conditions
- Test type conversion for all integer types (TINYINT, SMALLINT, INTEGER, BIGINT)
- Test type conversion for floating point types (FLOAT, DOUBLE)
- Test decimal conversion
- Test JDBC Long to Integer conversion fix
Add comprehensive tests for SQL query generation in both MySQL and Oracle modes:
- Test identifier quoting for MySQL (backticks) and Oracle (double quotes)
- Test query generation for single split, first split, middle split, last split
- Test ROWID-based splitting for Oracle mode
- Test split boundary detection
- Test value escaping for SQL strings
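
The query shapes exercised by these tests can be sketched as follows. This is an assumption-laden illustration: the class and method names are hypothetical, the bounds are assumed to be lower-inclusive and upper-exclusive, and the sketch binds values through `?` placeholders, whereas the tests above mention escaping values as SQL strings.

```java
/** Illustrative only: builds a per-split SELECT with mode-specific identifier quoting. */
final class SplitQuerySketch {

    /** Quotes with backticks (MySQL mode) or double quotes (Oracle mode), doubling embedded quotes. */
    static String quote(String identifier, boolean mysqlMode) {
        String q = mysqlMode ? "`" : "\"";
        return q + identifier.replace(q, q + q) + q;
    }

    /**
     * Single split: no bounds. First split: upper bound only.
     * Middle split: both bounds. Last split: lower bound only.
     */
    static String buildSplitQuery(String table, String splitColumn, boolean mysqlMode,
                                  boolean hasLowerBound, boolean hasUpperBound) {
        String col = quote(splitColumn, mysqlMode);
        StringBuilder sql = new StringBuilder("SELECT * FROM ").append(quote(table, mysqlMode));
        if (hasLowerBound || hasUpperBound) {
            sql.append(" WHERE ");
        }
        if (hasLowerBound) {
            sql.append(col).append(" >= ?");
        }
        if (hasLowerBound && hasUpperBound) {
            sql.append(" AND ");
        }
        if (hasUpperBound) {
            sql.append(col).append(" < ?");
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildSplitQuery("orders", "id", true, true, true));
    }
}
```

Doubling the quote character inside an identifier is one common way to keep a hostile table or column name from terminating the quoted identifier early.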
- Add OceanBaseParallelSourceOracleE2eITCase with environment-based configuration
- Test parallel snapshot read with ROWID (default for Oracle mode)
- Test parallel snapshot read with explicit chunk-key-column
- Tests are @Disabled by default (require OceanBase Enterprise Edition)
- Can be run locally with environment variables: HOST, PORT, USER_NAME, PASSWORD, SCHEMA_NAME
- Test connection to OceanBase Oracle mode
- Test ROWID query functionality
- Test ROWID range query for parallel splitting
- Test data read
- Test split query with ROWID range
- Tests are @Disabled by default (require OceanBase Oracle instance)
Ensure snapshot source readers emit END_OF_INPUT after no-more-splits, keep decimal split boundaries precise, and add comprehensive conversion tests across supported data types to prevent runtime type mismatches.

Made-with: Cursor
- Remove duplicate ConfigOptions from source factory, reuse ConnectorOptions
- Add helper.validate() and fix deprecated getSchema() API
- Centralize quoteIdentifier() in OceanBaseSourceConfig with SQL injection prevention
- Add volatile to dataSource fields for DCL thread safety
- Refactor pollNext() to cursor-based incremental reads (one row per call)
- Remove unnecessary parallelism cap on numSplits
- Reduce connection pool maxActive from 10 to 2
- Fix splitSize default from 8096 to 8192
- Cross-declare options in both factories for shared identifier tolerance
- Rewrite tests to verify actual production code paths
- Add factory, serializer edge case, and split boundary tests (57 total)
… precision, enumerator safety, Oracle quoting, JDBC defaults

- Add lastReadValue to OceanBaseSplit for checkpoint resume (uses > instead of >= with ORDER BY)
- Bump split/state serializer to v3 with v2 backward compatibility
- Fix integer split boundary precision loss by using long arithmetic for Long/Integer types
- Prevent premature noMoreSplits by checking inFlightSplits before signaling
- Fix ConcurrentHashMap iterator mutation in assignPendingSplits by copying set before iteration
- Align Oracle quoteIdentifier with sink-side: no quoting when case-insensitive (default)
- Add createConfiguredDataSource with default JDBC properties matching OceanBaseConnectionProvider
- Wire driverClassName, druidProperties, oracleTenantCaseInsensitive through config/factory/builder
Update connector docs to cover both Source and Sink capabilities,
add Source usage examples for MySQL and Oracle modes, and reorganize
configuration options into Common/Source/Sink sections.
Add OceanBaseMySQLSourceITCase covering 15 data types (BOOLEAN, TINYINT,
SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, CHAR, VARCHAR, DATE,
TIME, TIMESTAMP, BINARY, VARBINARY) through the full Source read -> Sink
write -> JDBC query pipeline with TestContainers.
Hex literal X'48656C6C6F000000000000000000000000' was 17 bytes,
exceeding BINARY(16) limit. Use plain strings instead - the database
handles zero-padding automatically.
BOOLEAN returns true/false (not 1/0) via JDBC. CHAR(10) trailing
spaces are trimmed after sink write-back.
Replace hand-written SQL queries (INFORMATION_SCHEMA for MySQL,
all_constraints for Oracle) with the standard JDBC
DatabaseMetaData.getPrimaryKeys() API. This aligns with flink-cdc's
approach (via Debezium) and guarantees correct column ordering for
composite primary keys across all compatibility modes.
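
A minimal sketch of the metadata lookup described above, assuming only the standard `java.sql` API. The JDBC contract orders the `getPrimaryKeys()` result by `COLUMN_NAME`, so the sketch re-sorts by `KEY_SEQ` to recover the declared order of a composite key. The class name `PrimaryKeySketch` is hypothetical and this is not the connector's code.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Illustrative only: reads primary key columns in their declared (KEY_SEQ) order. */
final class PrimaryKeySketch {

    static List<String> primaryKeyColumns(DatabaseMetaData meta, String catalog,
                                          String schema, String table) throws SQLException {
        // TreeMap keeps entries sorted by KEY_SEQ, the 1-based position within the key.
        TreeMap<Short, String> bySequence = new TreeMap<>();
        try (ResultSet rs = meta.getPrimaryKeys(catalog, schema, table)) {
            while (rs.next()) {
                bySequence.put(rs.getShort("KEY_SEQ"), rs.getString("COLUMN_NAME"));
            }
        }
        // An empty list means the table has no primary key.
        return new ArrayList<>(bySequence.values());
    }
}
```

A caller would typically obtain `meta` from `Connection.getMetaData()`; which of `catalog` and `schema` carries the database name differs between drivers and compatibility modes, so that mapping is left to the caller here.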
Split discovery now runs on a background thread, allowing readers to
start processing data before all splits have been calculated. This is
especially beneficial for tables with non-numeric primary keys (e.g.,
VARCHAR), where each split point requires an expensive ORDER BY + OFFSET
query. Numeric split points are still calculated in a single batch.

Key changes:
- Add ExecutorService for background split discovery thread
- Use context.callAsync() for periodic split assignment checks
- Add splitDiscoveryFinished flag to OceanBaseEnumeratorState (v4)
- Incremental split generation for non-numeric types
- Dedup logic to handle checkpoint restore during discovery
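
The background-discovery idea can be sketched with plain `java.util.concurrent`, without Flink's `SplitEnumeratorContext`. Everything here is hypothetical naming: splits are plain strings, and the `discoveryError` field is an assumption borrowed from the error-capture approach that a later commit message in this PR describes.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Illustrative only: splits are produced on a background thread and handed out as they appear. */
final class BackgroundDiscoverySketch implements AutoCloseable {
    private final ConcurrentLinkedQueue<String> pendingSplits = new ConcurrentLinkedQueue<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean discoveryFinished;
    private volatile Throwable discoveryError;

    /** Starts discovery; the producer pushes each split into the sink as soon as it is computed. */
    void start(Consumer<Consumer<String>> producer) {
        executor.submit(() -> {
            try {
                producer.accept(pendingSplits::add);
            } catch (Throwable t) {
                discoveryError = t; // recorded so the polling side can fail the job
            } finally {
                discoveryFinished = true; // written after all adds, so readers see a complete queue
            }
        });
    }

    /** Returns the next ready split, or null if none is ready yet; rethrows a discovery failure. */
    String poll() {
        Throwable error = discoveryError;
        if (error != null) {
            throw new IllegalStateException("Split discovery failed", error);
        }
        return pendingSplits.poll();
    }

    /** True only once discovery ended successfully and every split has been handed out. */
    boolean noMoreSplits() {
        return discoveryFinished && discoveryError == null && pendingSplits.isEmpty();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

In the sketch, a periodic caller would invoke `poll()` until `noMoreSplits()` returns true; checking the finished flag before the queue avoids signalling completion while discovery is still adding splits.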
…tion

Avoid MySQL deep pagination problem by using iterative next-chunk-max
queries instead of LIMIT 1 OFFSET N. Each query now scans only chunkSize
rows forward from the last boundary, giving O(chunkSize) per query
regardless of table position. Also update blog SQL examples accordingly.
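
A sketch of the iterative approach, under stated assumptions: the SQL shape below is one plausible form of a next-chunk-max query in MySQL mode, not necessarily the statement used in this PR, and the class `ChunkBoundarySketch` with its in-memory `simulateNextChunkMax` helper is hypothetical. Boundaries here are chunk maxima, so each split covers keys greater than the previous boundary up to and including its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.function.UnaryOperator;

/** Illustrative only: derives split boundaries by repeatedly asking for the next chunk's max key. */
final class ChunkBoundarySketch {

    /** Assumed query shape; identifiers are expected to be quoted by the caller. */
    static String nextChunkMaxSql(String table, String key, int chunkSize) {
        return "SELECT MAX(" + key + ") FROM (SELECT " + key + " FROM " + table
                + " WHERE " + key + " > ? ORDER BY " + key + " LIMIT " + chunkSize + ") AS t";
    }

    /** nextChunkMax stands in for running the query with the last boundary (null = table start). */
    static <K extends Comparable<K>> List<K> boundaries(K tableMax, UnaryOperator<K> nextChunkMax) {
        List<K> result = new ArrayList<>();
        K last = null;
        while (true) {
            K end = nextChunkMax.apply(last);
            // Stop when no rows remain or the chunk reaches the table max: the final split is open-ended.
            if (end == null || end.compareTo(tableMax) >= 0) {
                return result;
            }
            result.add(end);
            last = end;
        }
    }

    /** In-memory stand-in for the query: max of the next chunkSize keys strictly above lastBoundary. */
    static <K extends Comparable<K>> K simulateNextChunkMax(NavigableSet<K> keys, K lastBoundary,
                                                            int chunkSize) {
        NavigableSet<K> remaining = lastBoundary == null ? keys : keys.tailSet(lastBoundary, false);
        K max = null;
        int seen = 0;
        for (K k : remaining) {
            if (seen++ == chunkSize) {
                break;
            }
            max = k;
        }
        return max;
    }

    public static void main(String[] args) {
        NavigableSet<Integer> keys = new TreeSet<>();
        for (int i = 1; i <= 10; i++) {
            keys.add(i);
        }
        System.out.println(boundaries(10, last -> simulateNextChunkMax(keys, last, 3)));
    }
}
```

Because every query starts from the previous boundary, an index on the key lets each step read about `chunkSize` rows, whereas `LIMIT 1 OFFSET N` has to skip `N` rows first.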
@yuanoOo yuanoOo force-pushed the feature/jdbc-parallel-source branch from 6001aa8 to cc1095b Compare April 8, 2026 08:03
yuanoOo added 8 commits April 8, 2026 18:04
Read all rows from a split and emit them within a single pollNext()
call. Since Flink injects checkpoint barriers only between pollNext()
calls, this makes each split's processing atomic — no mid-split
checkpoint possible. Source-level exactly-once without lastReadValue.
- Simplify isNumericSplittable() by removing redundant Long/Integer check
- Unify dedupe key format to use splitBoundaryKey() consistently
- Propagate SQL exceptions instead of swallowing them in split queries
- Route addAllSplitsFromPoints() through addSplitToPending() for dedup
- Remove lastReadValue dead code from OceanBaseSplit and serializers
- Add __pk_increment hidden column support for MySQL no-PK tables
- Inject hidden_column_visible hint in all split and reader queries
- Fix blog parameter names (scan.split-size -> split-size)
- Add unit tests for hidden PK hint, E2E and integration tests for no-PK tables
…lit points

- Add volatile splitDiscoveryError field to capture background thread exceptions
  instead of silently swallowing them (modeled after Flink CDC's
  uncaughtSplitterException pattern). Errors are checked in callAsync callback
  and handleSplitRequest, ensuring the job fails instead of completing with
  partial/zero data.

- Use BigDecimal arithmetic for Long/Integer split point calculation to prevent
  overflow when maxVal - minVal exceeds Long.MAX_VALUE (e.g., full BIGINT range).
  This matches Flink CDC's ObjectUtils.minus() approach.

- Add overflow regression tests for full BIGINT range and large range scenarios.
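
The overflow issue can be reproduced in isolation: with `min = Long.MIN_VALUE` and `max = Long.MAX_VALUE`, the expression `max - min` wraps around to `-1` in `long` arithmetic. The following is a minimal sketch of evenly spaced split points computed through `BigDecimal`; the class and method names are hypothetical, and the rounding mode and deduplication are assumptions rather than the connector's actual behavior.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: interior split boundaries for a long range, immune to long overflow. */
final class NumericSplitPointSketch {

    /** Returns up to numSplits - 1 strictly increasing boundaries, each strictly above min. */
    static List<Long> splitPoints(long min, long max, int numSplits) {
        if (numSplits < 1 || max < min) {
            throw new IllegalArgumentException("Need numSplits >= 1 and max >= min");
        }
        BigDecimal lo = BigDecimal.valueOf(min);
        // Arbitrary precision: the range may exceed Long.MAX_VALUE without wrapping.
        BigDecimal range = BigDecimal.valueOf(max).subtract(lo);
        BigDecimal n = BigDecimal.valueOf(numSplits);
        List<Long> points = new ArrayList<>();
        long previous = min;
        for (int i = 1; i < numSplits; i++) {
            BigDecimal offset = range.multiply(BigDecimal.valueOf(i)).divide(n, 0, RoundingMode.FLOOR);
            // min + offset always lies within [min, max], so the exact conversion cannot fail.
            long point = lo.add(offset).longValueExact();
            if (point > previous) { // skip duplicates produced by narrow ranges
                points.add(point);
                previous = point;
            }
        }
        return points;
    }

    public static void main(String[] args) {
        System.out.println(splitPoints(Long.MIN_VALUE, Long.MAX_VALUE, 4));
    }
}
```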
Align with Flink CDC documentation practice by warning users that
the chunk-key-column must be a non-null column, as NULL values are
silently excluded from read results by SQL range comparison semantics.
- Delete OceanBaseSourceBuilder.java which has zero references in the
  codebase
- Clarify Oracle mode requires OceanBase JDBC driver (MySQL driver not
  supported)
Add MiniCluster-based failover tests to verify exactly-once semantics
during TM/JM failures in snapshot reading phase.
yuanoOo added 4 commits April 13, 2026 10:28
- Assert sink not full before triggering failover (job still running)
- Wait for job to recover to RUNNING after failover (proves restart happened)
- Log failover timing with sink progress
…ink-cdc

Replace INSERT INTO sink + TestValuesTableFactory approach with
SELECT + tableResult.collect() iterator pattern. The iterator blocks
naturally on hasNext(), providing a reliable failover window without
timing races. This mirrors the proven pattern from flink-cdc's
MySqlSourceITCase.
Add string PK test coverage for the incremental split generation path
(generateSplitsIncrementally) which was previously untested. Includes:
- IT test in OceanBaseMySQLSourceITCase with 10-row VARCHAR PK table
- Failover tests (NONE/TM/JM) with 30-row VARCHAR PK table
@yuanoOo yuanoOo merged commit bc36003 into oceanbase:main Apr 13, 2026
12 checks passed

2 participants