Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
## Unreleased
* Add support for tags when creating new orchestrations ([#231](https://github.com/microsoft/durabletask-java/pull/230))
* Add versioning support for client and worker ([#224](https://github.com/microsoft/durabletask-java/pull/224))

## v1.5.2
* Add distributed tracing support for Azure Functions client scenarios ([#211](https://github.com/microsoft/durabletask-java/pull/211))


## v1.5.1
* Improve logging for unexpected connection failures in DurableTaskGrpcWorker ([#216](https://github.com/microsoft/durabletask-java/pull/216/files))
* Add User-Agent Header to gRPC Metadata ([#213](https://github.com/microsoft/durabletask-java/pull/213))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
private final DataConverter dataConverter;
private final ManagedChannel managedSidecarChannel;
private final TaskHubSidecarServiceBlockingStub sidecarClient;
private final String defaultVersion;

DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.defaultVersion = builder.defaultVersion;

Channel sidecarGrpcChannel;
if (builder.channel != null) {
Expand Down Expand Up @@ -100,6 +102,8 @@ public String scheduleNewOrchestrationInstance(
String version = options.getVersion();
if (version != null) {
builder.setVersion(StringValue.of(version));
} else if (this.defaultVersion != null) {
builder.setVersion(StringValue.of(this.defaultVersion));
}

Object input = options.getInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public final class DurableTaskGrpcClientBuilder {
DataConverter dataConverter;
int port;
Channel channel;
String defaultVersion;

/**
* Sets the {@link DataConverter} to use for converting serializable data payloads.
Expand Down Expand Up @@ -53,6 +54,17 @@ public DurableTaskGrpcClientBuilder port(int port) {
return this;
}

/**
* Sets the default version that orchestrations will be created with.
*
* @param defaultVersion the default version to create orchestrations with
* @return this builder object
*/
public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) {
this.defaultVersion = defaultVersion;
return this;
}

/**
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskClient} object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
import com.microsoft.durabletask.util.VersionUtils;

import io.grpc.*;

Expand All @@ -16,6 +17,7 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

/**
* Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
Expand All @@ -31,6 +33,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final ManagedChannel managedSidecarChannel;
private final DataConverter dataConverter;
private final Duration maximumTimerInterval;
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;

private final TaskHubSidecarServiceBlockingStub sidecarClient;

Expand Down Expand Up @@ -61,6 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.versioningOptions = builder.versioningOptions;
}

/**
Expand Down Expand Up @@ -113,7 +117,8 @@ public void startAndBlock() {
this.orchestrationFactories,
this.dataConverter,
this.maximumTimerInterval,
logger);
logger,
this.versioningOptions);
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
this.activityFactories,
this.dataConverter,
Expand All @@ -130,20 +135,87 @@ public void startAndBlock() {
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();

// If versioning is set, process it first to see if the orchestration should be executed.
boolean versioningFailed = false;
if (versioningOptions != null && versioningOptions.getVersion() != null) {
String version = Stream.concat(orchestratorRequest.getPastEventsList().stream(), orchestratorRequest.getNewEventsList().stream())
.filter(event -> event.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONSTARTED)
.map(event -> event.getExecutionStarted().getVersion().getValue())
.findFirst()
.orElse(null);

if (version != null) {
int comparison = VersionUtils.compareVersions(version, versioningOptions.getVersion());

switch (versioningOptions.getMatchStrategy()) {
Comment thread
halspang marked this conversation as resolved.
Outdated
case NONE:
break;
case STRICT:
if (comparison != 0) {
logger.log(Level.WARNING, String.format("The orchestration version '%s' does not match the worker version '%s'.", version, versioningOptions.getVersion()));
versioningFailed = true;
}
break;
case CURRENTOROLDER:
if (comparison > 0) {
logger.log(Level.WARNING, String.format("The orchestration version '%s' is greater than the worker version '%s'.", version, versioningOptions.getVersion()));
versioningFailed = true;
}
break;
default:
logger.log(Level.SEVERE, String.format("Unknown version match strategy '%s'.", versioningOptions.getMatchStrategy()));
versioningFailed = true;
break;
}
}
}

// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
// TODO: Error handling
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
if (!versioningFailed) {
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.setCompletionToken(workItem.getCompletionToken())
.build();
OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.setCompletionToken(workItem.getCompletionToken())
.build();

this.sidecarClient.completeOrchestratorTask(response);
} else {
switch(versioningOptions.getFailureStrategy()) {
case FAIL:
CompleteOrchestrationAction completeAction = CompleteOrchestrationAction.newBuilder()
.setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED)
.setFailureDetails(TaskFailureDetails.newBuilder()
.setErrorType("VersionMismatch")
.setErrorMessage("The orchestration version does not match the worker version.")
.build())
.build();

OrchestratorAction action = OrchestratorAction.newBuilder()
.setCompleteOrchestration(completeAction)
.build();

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.setCompletionToken(workItem.getCompletionToken())
.addActions(action)
.build();

this.sidecarClient.completeOrchestratorTask(response);
this.sidecarClient.completeOrchestratorTask(response);
break;
// Reject and default share the same behavior as it does not change the orchestration to a terminal state.
case REJECT:
default:
this.sidecarClient.abandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest.newBuilder()
.setCompletionToken(workItem.getCompletionToken())
.build());
}
}
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
ActivityRequest activityRequest = workItem.getActivityRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class DurableTaskGrpcWorkerBuilder {
Channel channel;
DataConverter dataConverter;
Duration maximumTimerInterval;
DurableTaskGrpcWorkerVersioningOptions versioningOptions;

/**
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
Expand Down Expand Up @@ -113,6 +114,17 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn
return this;
}

/**
* Sets the versioning options for this worker.
*
* @param options the versioning options to use
* @return this builder object
*/
public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersioningOptions options) {
this.versioningOptions = options;
return this;
}

/**
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskGrpcWorker} object
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask;

/**
* Options for configuring versioning behavior in the DurableTaskGrpcWorker.
*/
public final class DurableTaskGrpcWorkerVersioningOptions {

/**
* Strategy for matching versions.
* NONE: No version matching is performed.
* STRICT: The version must match exactly.
* CURRENTOROLDER: The version must be the current version or older.
*/
public enum VersionMatchStrategy {
NONE,
STRICT,
CURRENTOROLDER;
}

/**
* Strategy for handling version mismatches.
* REJECT: Reject the orchestration if the version does not match. The orchestration will be retried later.
* FAIL: Fail the orchestration if the version does not match.
*/
public enum VersionFailureStrategy {
REJECT,
FAIL;
}

private final String version;
private final String defaultVersion;
private final VersionMatchStrategy matchStrategy;
private final VersionFailureStrategy failureStrategy;

/**
* Constructor for DurableTaskGrpcWorkerVersioningOptions.
* @param version the version that is matched against orchestrations
* @param defaultVersion the default version used when starting sub orchestrations from this worker
* @param matchStrategy the strategy for matching versions
* @param failureStrategy the strategy for handling version mismatches
*/
public DurableTaskGrpcWorkerVersioningOptions(String version, String defaultVersion, VersionMatchStrategy matchStrategy, VersionFailureStrategy failureStrategy) {
this.version = version;
this.defaultVersion = defaultVersion;
this.matchStrategy = matchStrategy;
this.failureStrategy = failureStrategy;
}

/**
* Gets the version that is matched against orchestrations.
* @return the version that is matched against orchestrations
*/
public String getVersion() {
return version;
}

/**
* Gets the default version used when starting sub orchestrations from this worker.
* @return the default version used when starting sub orchestrations from this worker
*/
public String getDefaultVersion() {
return defaultVersion;
}

/**
* Gets the strategy for matching versions.
* @return the strategy for matching versions
*/
public VersionMatchStrategy getMatchStrategy() {
return matchStrategy;
}

/**
* Gets the strategy for handling version mismatches.
* @return the strategy for handling version mismatches
*/
public VersionFailureStrategy getFailureStrategy() {
return failureStrategy;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.microsoft.durabletask;

/**
* Options for starting a new sub-orchestration instance.
*/
public class NewSubOrchestrationInstanceOptions extends TaskOptions {

private String instanceId;
private String version;

/**
* Creates options with a retry policy for the sub-orchestration.
* @param retryPolicy The retry policy to use for the sub-orchestration.
*/
public NewSubOrchestrationInstanceOptions(RetryPolicy retryPolicy) {
super(retryPolicy);
}

/**
* Creates options with a retry handler for the sub-orchestration.
* @param retryHandler The retry handler to use for the sub-orchestration.
*/
public NewSubOrchestrationInstanceOptions(RetryHandler retryHandler) {
super(retryHandler);
}

/**
* Sets the version for the sub-orchestration instance.
* @param version The version string to use.
* @return This options object for chaining.
*/
public NewSubOrchestrationInstanceOptions setVersion(String version) {
this.version = version;
return this;
}

/**
* Gets the version for the sub-orchestration instance.
* @return The version string, or null if not set.
*/
public String getVersion() {
return version;
}

/**
* Sets a custom instance ID for the sub-orchestration.
* @param instanceId The custom instance ID to use.
* @return This options object for chaining.
*/
public NewSubOrchestrationInstanceOptions setInstanceId(String instanceId) {
this.instanceId = instanceId;
return this;
}

/**
* Gets the custom instance ID for the sub-orchestration.
* @return The instance ID, or null if not set.
*/
public String getInstanceId() {
return instanceId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public TaskOrchestration create() {
orchestrationFactories,
new JacksonDataConverter(),
DEFAULT_MAXIMUM_TIMER_INTERVAL,
logger);
logger,
null);

// TODO: Error handling
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* Options that can be used to control the behavior of orchestrator and activity task execution.
*/
public final class TaskOptions {
public class TaskOptions {
private final RetryPolicy retryPolicy;
private final RetryHandler retryHandler;

Expand Down
Loading
Loading