|
9 | 9 | import com.uber.m3.tally.Scope; |
10 | 10 | import com.uber.m3.tally.Stopwatch; |
11 | 11 | import com.uber.m3.util.ImmutableMap; |
| 12 | +import io.grpc.StatusRuntimeException; |
12 | 13 | import io.temporal.api.common.v1.WorkflowExecution; |
| 14 | +import io.temporal.api.enums.v1.QueryResultType; |
13 | 15 | import io.temporal.api.enums.v1.TaskQueueKind; |
14 | 16 | import io.temporal.api.enums.v1.WorkflowTaskFailedCause; |
| 17 | +import io.temporal.api.failure.v1.Failure; |
15 | 18 | import io.temporal.api.workflowservice.v1.*; |
| 19 | +import io.temporal.failure.ApplicationFailure; |
16 | 20 | import io.temporal.internal.logging.LoggerTag; |
| 21 | +import io.temporal.internal.retryer.GrpcMessageTooLargeException; |
17 | 22 | import io.temporal.internal.retryer.GrpcRetryer; |
| 23 | +import io.temporal.payload.context.WorkflowSerializationContext; |
18 | 24 | import io.temporal.serviceclient.MetricsTag; |
19 | 25 | import io.temporal.serviceclient.RpcRetryOptions; |
20 | 26 | import io.temporal.serviceclient.WorkflowServiceStubs; |
@@ -394,73 +400,125 @@ public void handle(WorkflowTask task) throws Exception { |
394 | 400 | PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get(); |
395 | 401 | nextWFTResponse = Optional.empty(); |
396 | 402 | WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope); |
| 403 | + WorkflowTaskFailedCause taskFailedCause = null; |
397 | 404 | try { |
398 | 405 | RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted(); |
399 | 406 | RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed(); |
400 | 407 | RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted(); |
401 | 408 |
|
402 | | - if (taskCompleted != null) { |
403 | | - RespondWorkflowTaskCompletedRequest.Builder requestBuilder = |
404 | | - taskCompleted.toBuilder(); |
405 | | - try (EagerActivitySlotsReservation activitySlotsReservation = |
406 | | - new EagerActivitySlotsReservation(eagerActivityDispatcher)) { |
407 | | - activitySlotsReservation.applyToRequest(requestBuilder); |
408 | | - RespondWorkflowTaskCompletedResponse response = |
409 | | - sendTaskCompleted( |
410 | | - currentTask.getTaskToken(), |
411 | | - requestBuilder, |
412 | | - result.getRequestRetryOptions(), |
413 | | - workflowTypeScope); |
414 | | - // If we were processing a speculative WFT the server may instruct us that the task |
415 | | - // was dropped by resting out event ID. |
416 | | - long resetEventId = response.getResetHistoryEventId(); |
417 | | - if (resetEventId != 0) { |
418 | | - result.getResetEventIdHandle().apply(resetEventId); |
| 409 | + if (queryCompleted != null) { |
| 410 | + try { |
| 411 | + sendDirectQueryCompletedResponse( |
| 412 | + currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope); |
| 413 | + } catch (StatusRuntimeException e) { |
| 414 | + GrpcMessageTooLargeException tooLargeException = |
| 415 | + GrpcMessageTooLargeException.tryWrap(e); |
| 416 | + if (tooLargeException == null) { |
| 417 | + throw e; |
419 | 418 | } |
420 | | - nextWFTResponse = |
421 | | - response.hasWorkflowTask() |
422 | | - ? Optional.of(response.getWorkflowTask()) |
423 | | - : Optional.empty(); |
424 | | - // TODO we don't have to do this under the runId lock |
425 | | - activitySlotsReservation.handleResponse(response); |
| 419 | + Failure failure = |
| 420 | + grpcMessageTooLargeFailure( |
| 421 | + workflowExecution.getWorkflowId(), |
| 422 | + tooLargeException, |
| 423 | + "Failed to send query response"); |
| 424 | + RespondQueryTaskCompletedRequest.Builder queryFailedBuilder = |
| 425 | + RespondQueryTaskCompletedRequest.newBuilder() |
| 426 | + .setTaskToken(currentTask.getTaskToken()) |
| 427 | + .setNamespace(namespace) |
| 428 | + .setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED) |
| 429 | + .setErrorMessage(failure.getMessage()) |
| 430 | + .setFailure(failure); |
| 431 | + sendDirectQueryCompletedResponse( |
| 432 | + currentTask.getTaskToken(), queryFailedBuilder, workflowTypeScope); |
| 433 | + } |
| 434 | + } else { |
| 435 | + try { |
| 436 | + if (taskCompleted != null) { |
| 437 | + RespondWorkflowTaskCompletedRequest.Builder requestBuilder = |
| 438 | + taskCompleted.toBuilder(); |
| 439 | + try (EagerActivitySlotsReservation activitySlotsReservation = |
| 440 | + new EagerActivitySlotsReservation(eagerActivityDispatcher)) { |
| 441 | + activitySlotsReservation.applyToRequest(requestBuilder); |
| 442 | + RespondWorkflowTaskCompletedResponse response = |
| 443 | + sendTaskCompleted( |
| 444 | + currentTask.getTaskToken(), |
| 445 | + requestBuilder, |
| 446 | + result.getRequestRetryOptions(), |
| 447 | + workflowTypeScope); |
| 448 | + // If we were processing a speculative WFT the server may instruct us that the |
| 449 | + // task was dropped by resting out event ID. |
| 450 | + long resetEventId = response.getResetHistoryEventId(); |
| 451 | + if (resetEventId != 0) { |
| 452 | + result.getResetEventIdHandle().apply(resetEventId); |
| 453 | + } |
| 454 | + nextWFTResponse = |
| 455 | + response.hasWorkflowTask() |
| 456 | + ? Optional.of(response.getWorkflowTask()) |
| 457 | + : Optional.empty(); |
| 458 | + // TODO we don't have to do this under the runId lock |
| 459 | + activitySlotsReservation.handleResponse(response); |
| 460 | + } |
| 461 | + } else if (taskFailed != null) { |
| 462 | + taskFailedCause = taskFailed.getCause(); |
| 463 | + sendTaskFailed( |
| 464 | + currentTask.getTaskToken(), |
| 465 | + taskFailed.toBuilder(), |
| 466 | + result.getRequestRetryOptions(), |
| 467 | + workflowTypeScope); |
| 468 | + } |
| 469 | + } catch (GrpcMessageTooLargeException e) { |
| 470 | + // Only fail workflow task on the first attempt, subsequent failures of the same |
| 471 | + // workflow task should timeout. |
| 472 | + if (currentTask.getAttempt() > 1) { |
| 473 | + throw e; |
| 474 | + } |
| 475 | + |
| 476 | + releaseReason = SlotReleaseReason.error(e); |
| 477 | + handleReportingFailure( |
| 478 | + e, currentTask, result, workflowExecution, workflowTypeScope); |
| 479 | + // setting/replacing failure cause for metrics purposes |
| 480 | + taskFailedCause = |
| 481 | + WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE; |
| 482 | + |
| 483 | + String messagePrefix = |
| 484 | + String.format( |
| 485 | + "Failed to send workflow task %s", |
| 486 | + taskFailed == null ? "completion" : "failure"); |
| 487 | + RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder = |
| 488 | + RespondWorkflowTaskFailedRequest.newBuilder() |
| 489 | + .setFailure( |
| 490 | + grpcMessageTooLargeFailure( |
| 491 | + workflowExecution.getWorkflowId(), e, messagePrefix)) |
| 492 | + .setCause( |
| 493 | + WorkflowTaskFailedCause |
| 494 | + .WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE); |
| 495 | + sendTaskFailed( |
| 496 | + currentTask.getTaskToken(), |
| 497 | + taskFailedBuilder, |
| 498 | + result.getRequestRetryOptions(), |
| 499 | + workflowTypeScope); |
426 | 500 | } |
427 | | - } else if (taskFailed != null) { |
428 | | - sendTaskFailed( |
429 | | - currentTask.getTaskToken(), |
430 | | - taskFailed.toBuilder(), |
431 | | - result.getRequestRetryOptions(), |
432 | | - workflowTypeScope); |
433 | | - } else if (queryCompleted != null) { |
434 | | - sendDirectQueryCompletedResponse( |
435 | | - currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope); |
436 | 501 | } |
437 | 502 | } catch (Exception e) { |
438 | | - logExceptionDuringResultReporting(e, currentTask, result); |
439 | 503 | releaseReason = SlotReleaseReason.error(e); |
440 | | - // if we failed to report the workflow task completion back to the server, |
441 | | - // our cached version of the workflow may be more advanced than the server is aware of. |
442 | | - // We should discard this execution and perform a clean replay based on what server |
443 | | - // knows next time. |
444 | | - cache.invalidate( |
445 | | - workflowExecution, workflowTypeScope, "Failed result reporting to the server", e); |
| 504 | + handleReportingFailure(e, currentTask, result, workflowExecution, workflowTypeScope); |
446 | 505 | throw e; |
447 | 506 | } |
448 | 507 |
|
449 | | - if (result.getTaskFailed() != null) { |
450 | | - Scope workflowTaskFailureScope = workflowTypeScope; |
451 | | - if (result |
452 | | - .getTaskFailed() |
453 | | - .getCause() |
454 | | - .equals( |
455 | | - WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) { |
456 | | - workflowTaskFailureScope = |
457 | | - workflowTaskFailureScope.tagged( |
458 | | - ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError")); |
459 | | - } else { |
460 | | - workflowTaskFailureScope = |
461 | | - workflowTaskFailureScope.tagged( |
462 | | - ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError")); |
| 508 | + if (taskFailedCause != null) { |
| 509 | + String taskFailureType; |
| 510 | + switch (taskFailedCause) { |
| 511 | + case WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR: |
| 512 | + taskFailureType = "NonDeterminismError"; |
| 513 | + break; |
| 514 | + case WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE: |
| 515 | + taskFailureType = "GrpcMessageTooLarge"; |
| 516 | + break; |
| 517 | + default: |
| 518 | + taskFailureType = "WorkflowError"; |
463 | 519 | } |
| 520 | + Scope workflowTaskFailureScope = |
| 521 | + workflowTypeScope.tagged(ImmutableMap.of(TASK_FAILURE_TYPE, taskFailureType)); |
464 | 522 | // we don't trigger the counter in case of the legacy query |
465 | 523 | // (which never has taskFailed set) |
466 | 524 | workflowTaskFailureScope |
@@ -617,5 +675,34 @@ private void logExceptionDuringResultReporting( |
617 | 675 | e); |
618 | 676 | } |
619 | 677 | } |
| 678 | + |
| 679 | + private void handleReportingFailure( |
| 680 | + Exception e, |
| 681 | + PollWorkflowTaskQueueResponse currentTask, |
| 682 | + WorkflowTaskHandler.Result result, |
| 683 | + WorkflowExecution workflowExecution, |
| 684 | + Scope workflowTypeScope) { |
| 685 | + logExceptionDuringResultReporting(e, currentTask, result); |
| 686 | + // if we failed to report the workflow task completion back to the server, |
| 687 | + // our cached version of the workflow may be more advanced than the server is aware of. |
| 688 | + // We should discard this execution and perform a clean replay based on what server |
| 689 | + // knows next time. |
| 690 | + cache.invalidate( |
| 691 | + workflowExecution, workflowTypeScope, "Failed result reporting to the server", e); |
| 692 | + } |
| 693 | + |
| 694 | + private Failure grpcMessageTooLargeFailure( |
| 695 | + String workflowId, GrpcMessageTooLargeException e, String messagePrefix) { |
| 696 | + ApplicationFailure applicationFailure = |
| 697 | + ApplicationFailure.newBuilder() |
| 698 | + .setMessage(messagePrefix + ": " + e.getMessage()) |
| 699 | + .setType(GrpcMessageTooLargeException.class.getSimpleName()) |
| 700 | + .build(); |
| 701 | + applicationFailure.setStackTrace(new StackTraceElement[0]); // don't serialize stack trace |
| 702 | + return options |
| 703 | + .getDataConverter() |
| 704 | + .withContext(new WorkflowSerializationContext(namespace, workflowId)) |
| 705 | + .exceptionToFailure(applicationFailure); |
| 706 | + } |
620 | 707 | } |
621 | 708 | } |
0 commit comments