@@ -842,38 +842,38 @@ void testFitStreamNotifyCrossStream() {
842842 @ Test
843843 @ DisplayName ("流程实例异常处理流转逻辑" )
844844 void testExceptionHandleForFitStream () {
845- // AtomicReference<TestData> output = new AtomicReference<>();
846- // // 单节点错误处理
847- // Flows.<TestData>create(repo, messenger, locks).just(data -> data.first(100)).just(data -> {
848- // if (data.first < 120) {
849- // throw new IllegalArgumentException();
850- // } else {
851- // data.second(100);
852- // }
853- // }).error((error, retryable, contexts) -> {
854- // contexts.get(0).getData().first(120);
855- // contexts.forEach(context -> context.setStatus(READY));
856- // retryable.retry(contexts);
857- // }).close(callback -> output.set(callback.get().getData())).offer(new TestData());
858- // FlowsTestUtil.waitUntil(() -> output.get() != null, 2000);
859- // assertEquals(120, output.get().first);
860- // assertEquals(100, output.get().second);
861- //
862- // // 整体错误处理
863- // Flows.<TestData>create(repo, messenger, locks).just(data -> data.first(100)).just(data -> {
864- // if (data.first < 120) {
865- // throw new IllegalArgumentException();
866- // } else {
867- // data.second(100);
868- // }
869- // }).close(callback -> output.set(callback.get().getData()), (exception, retryable, contexts) -> {
870- // ObjectUtils.<TestData>cast(contexts.get(0).getData()).first(120);
871- // contexts.forEach(context -> context.setStatus(READY));
872- // retryable.retry(contexts);
873- // }).offer(new TestData());
874- // FlowsTestUtil.waitFortyMillis(Collections::emptyList);
875- // assertEquals(120, output.get().first);
876- // assertEquals(100, output.get().second);
845+ AtomicReference <TestData > output = new AtomicReference <>();
846+ // 单节点错误处理
847+ Flows .<TestData >create (repo , messenger , locks ).just (data -> data .first (100 )).just (data -> {
848+ if (data .first < 120 ) {
849+ throw new IllegalArgumentException ();
850+ } else {
851+ data .second (100 );
852+ }
853+ }).error ((error , retryable , contexts ) -> {
854+ contexts .get (0 ).getData ().first (120 );
855+ contexts .forEach (context -> context .setStatus (READY ));
856+ retryable .retry (error , contexts );
857+ }).close (callback -> output .set (callback .get ().getData ())).offer (new TestData ());
858+ FlowsTestUtil .waitUntil (() -> output .get () != null , 2000 );
859+ assertEquals (120 , output .get ().first );
860+ assertEquals (100 , output .get ().second );
861+
862+ // 整体错误处理
863+ Flows .<TestData >create (repo , messenger , locks ).just (data -> data .first (100 )).just (data -> {
864+ if (data .first < 120 ) {
865+ throw new IllegalArgumentException ();
866+ } else {
867+ data .second (100 );
868+ }
869+ }).close (callback -> output .set (callback .get ().getData ()), (exception , retryable , contexts ) -> {
870+ ObjectUtils .<TestData >cast (contexts .get (0 ).getData ()).first (120 );
871+ contexts .forEach (context -> context .setStatus (READY ));
872+ retryable .retry (exception , contexts );
873+ }).offer (new TestData ());
874+ FlowsTestUtil .waitFortyMillis (Collections ::emptyList );
875+ assertEquals (120 , output .get ().first );
876+ assertEquals (100 , output .get ().second );
877877 }
878878
879879 @ Test
0 commit comments