-
|
I'm trying to use Context passing to keep a transaction object across a Mutiny pipeline. It works great even across several I've tried to make some simple examples. I would have hoped that in all examples, the same context (including the I'm guessing that Note regarding alternatives to Mutiny context passing: I cannot use public static void somethingEquivalentToMain() {
contextAvailable_MultiOnItem().onFailure().recoverWithNull().await().indefinitely();
contextMissing_MultiOnCompletion().onFailure().recoverWithNull().await().indefinitely();
contextMissing_MultiOnFailure().onFailure().recoverWithNull().await().indefinitely();
contextAvailable_UniOnItem().onFailure().recoverWithNull().await().indefinitely();
contextMissing_UniOnTermination().onFailure().recoverWithNull().await().indefinitely();
contextAvailable_UniOnFailure().onFailure().recoverWithNull().await().indefinitely();
}
public Uni<List<String>> contextAvailable_MultiOnItem() {
log("\ncontextAvailable_MultiOnItem");
return beginTransaction()
// get items from db
.onItem().transformToMulti(ignore -> Multi.createFrom().items("a", "b", "c", "d"))
.invoke(item -> log("Item: " + item))
// run some database queries
.collect().asList()
// commit transaction if there was no error
.toMulti()
.onItem().call(this::commitTransaction)
.toUni()
// some other async step
.call(items -> Uni.createFrom().item(items))
.log();
}
public Uni<List<String>> contextMissing_MultiOnCompletion() {
log("\ncontextMissing_MultiOnCompletion");
return beginTransaction()
// get items from db
.onItem().transformToMulti(ignore -> Multi.createFrom().items("a", "b", "c", "d"))
.invoke(item -> log("Item: " + item))
// run some database queries
.collect().asList()
// commit transaction if there was no error (onCompletion)
.toMulti()
.onCompletion().call(this::commitTransaction)
.toUni()
// some other async step
.call((items) -> Uni.createFrom().item(items))
.log();
}
public Uni<List<String>> contextMissing_MultiOnFailure() {
log("\ncontextMissing_MultiOnFailure");
return beginTransaction()
// get items from db
.onItem().transformToMulti(ignore -> Multi.createFrom().items("a", "b", "c", "d"))
.invoke(item -> log("Item: " + item))
// cause failure
.invoke(item -> {
if (item.equals("b")) {
throw new RuntimeException("fail");
}
})
// run some database queries
.collect().asList()
// commit transaction if there was an error (onFailure)
.toMulti()
.onFailure()
.call(this::commitTransaction)
.toUni()
// some other async step
.call(items -> Uni.createFrom().item(items))
.log();
}
public Uni<List<String>> contextAvailable_UniOnItem() {
log("\ncontextAvailable_UniOnItem");
return beginTransaction()
// get items from db
.onItem().transformToMulti(ignore -> Multi.createFrom().items("a", "b", "c", "d"))
.invoke(item -> log("Item: " + item))
// run some database queries
.collect().asList()
// commit transaction if there was no error
.onItem().call(this::commitTransaction)
// some other async step
.call(items -> Uni.createFrom().item(items))
.log();
}
public Uni<List<String>> contextMissing_UniOnTermination() {
log("\ncontextMissing_UniOnCompletion");
return beginTransaction()
// get items from db
.onItem().transformToMulti(ignore -> Multi.createFrom().items("a", "b", "c", "d"))
.invoke(item -> log("Item: " + item))
// run some database queries
.collect().asList()
// commit transaction in any case
.onTermination().call(this::commitTransaction)
// some other async step
.call(items -> Uni.createFrom().item(items))
.log();
}
public Uni<List<String>> contextAvailable_UniOnFailure() {
log("\ncontextMissing_UniOnFailure");
return beginTransaction()
// get items from db
.onItem().transformToMulti(ignore -> Multi.createFrom().items("a", "b", "c", "d"))
.invoke(item -> log("Item: " + item))
// cause failure
.invoke(item -> {
if (item.equals("b")) {
throw new RuntimeException("fail");
}
})
// run some database queries
.collect().asList()
// rollback if there was an error
.onFailure().call(this::rollbackTransaction)
// some other async step
.call(items -> Uni.createFrom().item(items))
.log();
}
public Uni<Void> beginTransaction() {
return Uni.createFrom().context(ctx -> {
log("context when beginning:" + ctx);
// open transaction (placeholder example)
ctx.put("transaction", new Object());
return Uni.createFrom().voidItem();
});
}
public Uni<Void> commitTransaction() {
return Uni.createFrom().context(ctx -> {
log("context when committing:" + ctx);
// commit transaction
// example code:
return Uni.createFrom().item(() -> ctx.get("transaction")).replaceWithVoid();
// real code:
//return Uni.createFrom().publisher(ctx.get("transaction").commit())
});
}
public Uni<Void> rollbackTransaction() {
return Uni.createFrom().context(ctx -> {
log("context when rolling back:" + ctx);
// rollback transaction
// example code:
return Uni.createFrom().item(() -> ctx.get("transaction")).replaceWithVoid();
// real code:
//return Uni.createFrom().publisher(ctx.get("transaction").rollback())
});
}This is the output: I have had success with the following workaround. However, this requires "hiding" the choice of public <T> Multi<T> commitTransactionCtx(final Multi<T> multi, final Context ctx) {
return multi.onTermination().call(() -> {
log("commitTransactionCtx: " + ctx);
// commit transaction ...
return Uni.createFrom().voidItem();
});
}
public Uni<List<String>> contextAvailable_MultiWithContextOnTermination() {
return beginTransaction()
// get items from db
.onItem().transformToMulti(ignore -> Multi.createFrom().items("a", "b", "c", "d"))
.invoke(item -> log("Item: " + item))
// run some database queries
.collect().asList()
// commit transaction if there was no error (onTermination)
.toMulti()
.withContext(this::commitTransactionCtx)
.toUni()
// some other async step
.call((items) -> Uni.createFrom().item(items))
.log();
} |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
|
The I think you should try to come up with a unit test even smaller than that to possibly illustrate a faulty context passing, if any. The more atomic, the better. |
Beta Was this translation helpful? Give feedback.
Fixed in #1985