|
17 | 17 |
|
18 | 18 | import java.util.ArrayList; |
19 | 19 | import java.util.List; |
| 20 | +import java.util.Objects; |
20 | 21 | import java.util.concurrent.CopyOnWriteArrayList; |
21 | 22 |
|
| 23 | +import javax.sql.DataSource; |
| 24 | + |
22 | 25 | import org.junit.jupiter.api.Assertions; |
23 | 26 | import org.junit.jupiter.api.Test; |
24 | 27 |
|
|
33 | 36 | import org.springframework.batch.core.step.StepExecution; |
34 | 37 | import org.springframework.batch.core.step.builder.ChunkOrientedStepBuilder; |
35 | 38 | import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; |
| 39 | +import org.springframework.batch.infrastructure.item.ItemWriter; |
36 | 40 | import org.springframework.batch.infrastructure.item.support.ListItemReader; |
37 | 41 | import org.springframework.context.ApplicationContext; |
38 | 42 | import org.springframework.context.annotation.AnnotationConfigApplicationContext; |
39 | 43 | import org.springframework.context.annotation.Bean; |
40 | 44 | import org.springframework.context.annotation.Configuration; |
41 | 45 | import org.springframework.core.task.SimpleAsyncTaskExecutor; |
42 | 46 | import org.springframework.jdbc.core.JdbcTemplate; |
| 47 | +import org.springframework.jdbc.datasource.ConnectionHolder; |
43 | 48 | import org.springframework.jdbc.support.JdbcTransactionManager; |
44 | 49 | import org.springframework.test.jdbc.JdbcTestUtils; |
| 50 | +import org.springframework.transaction.support.TransactionSynchronization; |
| 51 | +import org.springframework.transaction.support.TransactionSynchronizationManager; |
45 | 52 |
|
46 | 53 | /** |
47 | 54 | * Tests for scan mode functionality in {@link ChunkOrientedStep}. |
48 | 55 | * |
49 | 56 | * @author KMGeon |
| 57 | + * @author MinChul Son |
50 | 58 | */ |
51 | 59 | class ChunkOrientedStepScanModeIntegrationTests { |
52 | 60 |
|
@@ -282,6 +290,265 @@ public Step step(JobRepository jobRepository, JdbcTransactionManager transaction |
282 | 290 |
|
283 | 291 | } |
284 | 292 |
|
| 293 | + // Issue https://github.com/spring-projects/spring-batch/issues/5377 |
| 294 | + @Test |
| 295 | + void testSkipPolicyWithJpaLikeRollbackOnlyBehaviorInSequentialMode() throws Exception { |
| 296 | + // given |
| 297 | + ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class, |
| 298 | + JpaLikeRollbackOnlySequentialStepConfiguration.class); |
| 299 | + JobOperator jobOperator = context.getBean(JobOperator.class); |
| 300 | + Job job = context.getBean(Job.class); |
| 301 | + JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class); |
| 302 | + |
| 303 | + // when |
| 304 | + JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); |
| 305 | + JobExecution jobExecution = jobOperator.start(job, jobParameters); |
| 306 | + |
| 307 | + // then |
| 308 | + // Without the fix, item "3" would fail with "Transaction silently rolled back |
| 309 | + // because it has been marked as rollback-only" (JPA-like behavior) causing the |
| 310 | + // step to fail with a NonSkippableWriteException. With the fix, each scan item |
| 311 | + // runs in its own transaction so item "3" succeeds. |
| 312 | + Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode()); |
| 313 | + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); |
| 314 | + Assertions.assertEquals(3, stepExecution.getReadCount()); |
| 315 | + Assertions.assertEquals(2, stepExecution.getWriteCount()); |
| 316 | + Assertions.assertEquals(1, stepExecution.getWriteSkipCount()); |
| 317 | + Assertions.assertEquals(1, |
| 318 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '1'", Integer.class)); |
| 319 | + Assertions.assertEquals(1, |
| 320 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '3'", Integer.class)); |
| 321 | + Assertions.assertEquals(2, JdbcTestUtils.countRowsInTable(jdbcTemplate, "delivery")); |
| 322 | + } |
| 323 | + |
| 324 | + // Issue https://github.com/spring-projects/spring-batch/issues/5377 |
| 325 | + @Test |
| 326 | + void testSkipPolicyWithJpaLikeRollbackOnlyBehaviorInConcurrentMode() throws Exception { |
| 327 | + // given |
| 328 | + ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class, |
| 329 | + JpaLikeRollbackOnlyConcurrentStepConfiguration.class); |
| 330 | + JobOperator jobOperator = context.getBean(JobOperator.class); |
| 331 | + Job job = context.getBean(Job.class); |
| 332 | + JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class); |
| 333 | + |
| 334 | + // when |
| 335 | + JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); |
| 336 | + JobExecution jobExecution = jobOperator.start(job, jobParameters); |
| 337 | + |
| 338 | + // then |
| 339 | + Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode()); |
| 340 | + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); |
| 341 | + Assertions.assertEquals(3, stepExecution.getReadCount()); |
| 342 | + Assertions.assertEquals(2, stepExecution.getWriteCount()); |
| 343 | + Assertions.assertEquals(1, stepExecution.getWriteSkipCount()); |
| 344 | + Assertions.assertEquals(1, |
| 345 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '1'", Integer.class)); |
| 346 | + Assertions.assertEquals(1, |
| 347 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '3'", Integer.class)); |
| 348 | + Assertions.assertEquals(2, JdbcTestUtils.countRowsInTable(jdbcTemplate, "delivery")); |
| 349 | + } |
| 350 | + |
| 351 | + /** |
| 352 | + * Simulates JPA-like rollback-only behavior: when a write fails, the transaction is |
| 353 | + * marked as rollback-only (as JPA/Hibernate does after a flush failure). Subsequent |
| 354 | + * writes in the same transaction would then throw "Transaction silently rolled back |
| 355 | + * because it has been marked as rollback-only". |
| 356 | + */ |
| 357 | + private static ItemWriter<String> jpaLikeWriter(JdbcTemplate jdbcTemplate) { |
| 358 | + ThreadLocal<Boolean> rollbackOnly = ThreadLocal.withInitial(() -> false); |
| 359 | + return chunk -> { |
| 360 | + // Simulate JPA behavior: if the current "session" is rollback-only, |
| 361 | + // throw as JPA/Hibernate would before even attempting the write |
| 362 | + if (rollbackOnly.get()) { |
| 363 | + throw new RuntimeException( |
| 364 | + "Transaction silently rolled back because it has been marked as rollback-only"); |
| 365 | + } |
| 366 | + for (String item : chunk) { |
| 367 | + if ("2".equals(item)) { |
| 368 | + // Simulate JPA flush failure: mark session rollback-only and register |
| 369 | + // cleanup synchronization (reset after transaction completion) |
| 370 | + rollbackOnly.set(true); |
| 371 | + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { |
| 372 | + @Override |
| 373 | + public void afterCompletion(int status) { |
| 374 | + rollbackOnly.set(false); |
| 375 | + } |
| 376 | + }); |
| 377 | + throw new RuntimeException("Simulated JPA constraint violation for item: " + item); |
| 378 | + } |
| 379 | + jdbcTemplate.update("INSERT INTO delivery (item_number) VALUES (?)", item); |
| 380 | + } |
| 381 | + }; |
| 382 | + } |
| 383 | + |
| 384 | + /** |
| 385 | + * Simulates JPA-like global rollback-only behavior by marking the underlying |
| 386 | + * {@link ConnectionHolder} as rollback-only when an item fails. This causes |
| 387 | + * {@link org.springframework.transaction.support.AbstractPlatformTransactionManager} |
| 388 | + * to detect a globally-rolled-back transaction during commit and, without the fix in |
| 389 | + * {@code ChunkOrientedStep.doExecute()}, throw an |
| 390 | + * {@link org.springframework.transaction.UnexpectedRollbackException}. The fix |
| 391 | + * explicitly calls {@code transactionStatus.setRollbackOnly()} before returning from |
| 392 | + * the lambda so that the local rollback-only flag is set and the transaction manager |
| 393 | + * performs a clean rollback instead. |
| 394 | + */ |
| 395 | + private static ItemWriter<String> globalRollbackOnlyWriter(JdbcTemplate jdbcTemplate) { |
| 396 | + DataSource dataSource = Objects.requireNonNull(jdbcTemplate.getDataSource()); |
| 397 | + return chunk -> { |
| 398 | + for (String item : chunk) { |
| 399 | + if ("2".equals(item)) { |
| 400 | + // Simulate JPA/Hibernate flush failure: mark the underlying |
| 401 | + // connection |
| 402 | + // holder as rollback-only (global), exactly as Hibernate does via |
| 403 | + // JdbcResourceLocalTransactionCoordinatorImpl after a flush error. |
| 404 | + ConnectionHolder holder = (ConnectionHolder) TransactionSynchronizationManager |
| 405 | + .getResource(dataSource); |
| 406 | + if (holder != null) { |
| 407 | + holder.setRollbackOnly(); |
| 408 | + } |
| 409 | + throw new RuntimeException("Simulated JPA flush failure for item: " + item); |
| 410 | + } |
| 411 | + jdbcTemplate.update("INSERT INTO delivery (item_number) VALUES (?)", item); |
| 412 | + } |
| 413 | + }; |
| 414 | + } |
| 415 | + |
| 416 | + // Issue https://github.com/spring-projects/spring-batch/issues/5377 |
| 417 | + @Test |
| 418 | + void testUnexpectedRollbackExceptionPreventedInSequentialScanMode() throws Exception { |
| 419 | + // given |
| 420 | + ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class, |
| 421 | + GlobalRollbackOnlySequentialStepConfiguration.class); |
| 422 | + JobOperator jobOperator = context.getBean(JobOperator.class); |
| 423 | + Job job = context.getBean(Job.class); |
| 424 | + JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class); |
| 425 | + |
| 426 | + // when |
| 427 | + JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); |
| 428 | + JobExecution jobExecution = jobOperator.start(job, jobParameters); |
| 429 | + |
| 430 | + // then |
| 431 | + // Without the fix in doExecute(), the scan transaction for item "2" is globally |
| 432 | + // rollback-only (ConnectionHolder.setRollbackOnly()) but not locally |
| 433 | + // rollback-only. |
| 434 | + // AbstractPlatformTransactionManager.commit() would then call |
| 435 | + // processRollback(defStatus, unexpected=true) and throw |
| 436 | + // UnexpectedRollbackException, |
| 437 | + // causing the step to fail. With the fix, doExecute() calls |
| 438 | + // transactionStatus.setRollbackOnly() before returning from the lambda so that |
| 439 | + // the transaction manager performs a clean rollback and the step completes |
| 440 | + // normally. |
| 441 | + Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode()); |
| 442 | + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); |
| 443 | + Assertions.assertEquals(3, stepExecution.getReadCount()); |
| 444 | + Assertions.assertEquals(2, stepExecution.getWriteCount()); |
| 445 | + Assertions.assertEquals(1, stepExecution.getWriteSkipCount()); |
| 446 | + Assertions.assertEquals(2, stepExecution.getRollbackCount()); |
| 447 | + Assertions.assertEquals(1, |
| 448 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '1'", Integer.class)); |
| 449 | + Assertions.assertEquals(1, |
| 450 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '3'", Integer.class)); |
| 451 | + Assertions.assertEquals(2, JdbcTestUtils.countRowsInTable(jdbcTemplate, "delivery")); |
| 452 | + } |
| 453 | + |
| 454 | + // Issue https://github.com/spring-projects/spring-batch/issues/5377 |
| 455 | + @Test |
| 456 | + void testUnexpectedRollbackExceptionPreventedInConcurrentScanMode() throws Exception { |
| 457 | + // given |
| 458 | + ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class, |
| 459 | + GlobalRollbackOnlyConcurrentStepConfiguration.class); |
| 460 | + JobOperator jobOperator = context.getBean(JobOperator.class); |
| 461 | + Job job = context.getBean(Job.class); |
| 462 | + JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class); |
| 463 | + |
| 464 | + // when |
| 465 | + JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); |
| 466 | + JobExecution jobExecution = jobOperator.start(job, jobParameters); |
| 467 | + |
| 468 | + // then |
| 469 | + Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode()); |
| 470 | + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); |
| 471 | + Assertions.assertEquals(3, stepExecution.getReadCount()); |
| 472 | + Assertions.assertEquals(2, stepExecution.getWriteCount()); |
| 473 | + Assertions.assertEquals(1, stepExecution.getWriteSkipCount()); |
| 474 | + Assertions.assertEquals(2, stepExecution.getRollbackCount()); |
| 475 | + Assertions.assertEquals(1, |
| 476 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '1'", Integer.class)); |
| 477 | + Assertions.assertEquals(1, |
| 478 | + jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '3'", Integer.class)); |
| 479 | + Assertions.assertEquals(2, JdbcTestUtils.countRowsInTable(jdbcTemplate, "delivery")); |
| 480 | + } |
| 481 | + |
| 482 | + @Configuration |
| 483 | + static class GlobalRollbackOnlySequentialStepConfiguration { |
| 484 | + |
| 485 | + @Bean |
| 486 | + public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager, |
| 487 | + JdbcTemplate jdbcTemplate) { |
| 488 | + List<String> items = List.of("1", "2", "3"); |
| 489 | + return new ChunkOrientedStepBuilder<String, String>(jobRepository, 3).reader(new ListItemReader<>(items)) |
| 490 | + .writer(globalRollbackOnlyWriter(jdbcTemplate)) |
| 491 | + .transactionManager(transactionManager) |
| 492 | + .faultTolerant() |
| 493 | + .skipPolicy(new AlwaysSkipItemSkipPolicy()) |
| 494 | + .build(); |
| 495 | + } |
| 496 | + |
| 497 | + } |
| 498 | + |
| 499 | + @Configuration |
| 500 | + static class GlobalRollbackOnlyConcurrentStepConfiguration { |
| 501 | + |
| 502 | + @Bean |
| 503 | + public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager, |
| 504 | + JdbcTemplate jdbcTemplate) { |
| 505 | + List<String> items = List.of("1", "2", "3"); |
| 506 | + return new ChunkOrientedStepBuilder<String, String>(jobRepository, 3).reader(new ListItemReader<>(items)) |
| 507 | + .writer(globalRollbackOnlyWriter(jdbcTemplate)) |
| 508 | + .transactionManager(transactionManager) |
| 509 | + .taskExecutor(new SimpleAsyncTaskExecutor()) |
| 510 | + .faultTolerant() |
| 511 | + .skipPolicy(new AlwaysSkipItemSkipPolicy()) |
| 512 | + .build(); |
| 513 | + } |
| 514 | + |
| 515 | + } |
| 516 | + |
| 517 | + @Configuration |
| 518 | + static class JpaLikeRollbackOnlySequentialStepConfiguration { |
| 519 | + |
| 520 | + @Bean |
| 521 | + public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager, |
| 522 | + JdbcTemplate jdbcTemplate) { |
| 523 | + List<String> items = List.of("1", "2", "3"); |
| 524 | + return new ChunkOrientedStepBuilder<String, String>(jobRepository, 3).reader(new ListItemReader<>(items)) |
| 525 | + .writer(jpaLikeWriter(jdbcTemplate)) |
| 526 | + .transactionManager(transactionManager) |
| 527 | + .faultTolerant() |
| 528 | + .skipPolicy(new AlwaysSkipItemSkipPolicy()) |
| 529 | + .build(); |
| 530 | + } |
| 531 | + |
| 532 | + } |
| 533 | + |
| 534 | + @Configuration |
| 535 | + static class JpaLikeRollbackOnlyConcurrentStepConfiguration { |
| 536 | + |
| 537 | + @Bean |
| 538 | + public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager, |
| 539 | + JdbcTemplate jdbcTemplate) { |
| 540 | + List<String> items = List.of("1", "2", "3"); |
| 541 | + return new ChunkOrientedStepBuilder<String, String>(jobRepository, 3).reader(new ListItemReader<>(items)) |
| 542 | + .writer(jpaLikeWriter(jdbcTemplate)) |
| 543 | + .transactionManager(transactionManager) |
| 544 | + .taskExecutor(new SimpleAsyncTaskExecutor()) |
| 545 | + .faultTolerant() |
| 546 | + .skipPolicy(new AlwaysSkipItemSkipPolicy()) |
| 547 | + .build(); |
| 548 | + } |
| 549 | + |
| 550 | + } |
| 551 | + |
285 | 552 | @Configuration |
286 | 553 | static class TrackingWriterStepConfiguration { |
287 | 554 |
|
|
0 commit comments