Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Unreleased
* Add work item filtering support for `DurableTaskGrpcWorker` to enable worker-side filtering of orchestration and activity work items ([#275](https://github.com/microsoft/durabletask-java/pull/275))
* Add support for calls to HTTP endpoints ([#271](https://github.com/microsoft/durabletask-java/pull/271))
* Add getSuspendPostUri and getResumePostUri getters to HttpManagementPayload ([#264](https://github.com/microsoft/durabletask-java/pull/264))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final DataConverter dataConverter;
private final Duration maximumTimerInterval;
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
private final WorkItemFilter workItemFilter;
private final GetWorkItemsRequest getWorkItemsRequest;

private final TaskHubSidecarServiceBlockingStub sidecarClient;

DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder, WorkItemFilter workItemFilter) {
this.orchestrationFactories.putAll(builder.orchestrationFactories);
this.activityFactories.putAll(builder.activityFactories);

Expand Down Expand Up @@ -70,6 +72,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.versioningOptions = builder.versioningOptions;
this.workItemFilter = workItemFilter;
this.getWorkItemsRequest = buildGetWorkItemsRequest();
}

/**
Expand Down Expand Up @@ -132,8 +136,7 @@ public void startAndBlock() {
// TODO: How do we interrupt manually?
while (true) {
try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(this.getWorkItemsRequest);
while (workItemStream.hasNext()) {
WorkItem workItem = workItemStream.next();
RequestCase requestType = workItem.getRequestCase();
Expand Down Expand Up @@ -408,4 +411,38 @@ else if (requestType == RequestCase.HEALTHPING)
public void stop() {
this.close();
}
}

/**
* Returns the work item filter configured for this worker, or {@code null} if none.
*/
WorkItemFilter getWorkItemFilter() {
return this.workItemFilter;
}

private GetWorkItemsRequest buildGetWorkItemsRequest() {
GetWorkItemsRequest.Builder builder = GetWorkItemsRequest.newBuilder();
if (this.workItemFilter != null) {
builder.setWorkItemFilters(toProtoWorkItemFilters(this.workItemFilter));
Comment thread
bachuv marked this conversation as resolved.
}
return builder.build();
}

static WorkItemFilters toProtoWorkItemFilters(WorkItemFilter filter) {
WorkItemFilters.Builder builder = WorkItemFilters.newBuilder();
for (WorkItemFilter.OrchestrationFilter orch : filter.getOrchestrations()) {
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.Builder orchBuilder =
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.newBuilder()
.setName(orch.getName());
orchBuilder.addAllVersions(orch.getVersions());
builder.addOrchestrations(orchBuilder.build());
}
for (WorkItemFilter.ActivityFilter activity : filter.getActivities()) {
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.Builder actBuilder =
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.newBuilder()
.setName(activity.getName());
actBuilder.addAllVersions(activity.getVersions());
builder.addActivities(actBuilder.build());
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import io.grpc.Channel;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

/**
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.
Expand All @@ -18,6 +21,8 @@ public final class DurableTaskGrpcWorkerBuilder {
DataConverter dataConverter;
Duration maximumTimerInterval;
DurableTaskGrpcWorkerVersioningOptions versioningOptions;
private WorkItemFilter workItemFilter;
private boolean autoGenerateWorkItemFilters;

/**
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
Expand Down Expand Up @@ -125,11 +130,75 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin
return this;
}

/**
* Sets explicit work item filters for this worker. When set, only work items matching the filters
* will be dispatched to this worker by the backend.
* <p>
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
* only receives work items it can handle. However, if an orchestration calls a task type
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
* the call may hang indefinitely instead of failing with an error.
*
* @param workItemFilter the work item filter to use, or {@code null} to disable filtering
* @return this builder object
*/
public DurableTaskGrpcWorkerBuilder useWorkItemFilters(WorkItemFilter workItemFilter) {
this.workItemFilter = workItemFilter;
this.autoGenerateWorkItemFilters = false;
return this;
}

/**
* Enables automatic work item filtering by generating filters from the registered
* orchestrations and activities. When enabled, the backend will only dispatch work items
* for registered orchestrations and activities to this worker.
* <p>
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
* only receives work items it can handle. However, if an orchestration calls a task type
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
* the call may hang indefinitely instead of failing with an error.
* <p>
* Only use this method when all task types referenced by orchestrations are guaranteed to be
* registered with at least one connected worker.
*
* @return this builder object
*/
public DurableTaskGrpcWorkerBuilder useWorkItemFilters() {
this.autoGenerateWorkItemFilters = true;
this.workItemFilter = null;
return this;
}

/**
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskGrpcWorker} object
*/
public DurableTaskGrpcWorker build() {
return new DurableTaskGrpcWorker(this);
WorkItemFilter resolvedFilter = this.autoGenerateWorkItemFilters
? buildAutoWorkItemFilter()
: this.workItemFilter;
return new DurableTaskGrpcWorker(this, resolvedFilter);
}

private WorkItemFilter buildAutoWorkItemFilter() {
List<String> versions = Collections.emptyList();
if (this.versioningOptions != null
&& this.versioningOptions.getMatchStrategy() == DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.STRICT
&& this.versioningOptions.getVersion() != null) {
versions = Collections.singletonList(this.versioningOptions.getVersion());
}

WorkItemFilter.Builder builder = WorkItemFilter.newBuilder();
List<String> orchestrationNames = new ArrayList<>(this.orchestrationFactories.keySet());
Collections.sort(orchestrationNames);
for (String name : orchestrationNames) {
builder.addOrchestration(name, versions);
}
List<String> activityNames = new ArrayList<>(this.activityFactories.keySet());
Collections.sort(activityNames);
for (String name : activityNames) {
builder.addActivity(name, versions);
}
Comment thread
bachuv marked this conversation as resolved.
return builder.build();
}
}
221 changes: 221 additions & 0 deletions client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Represents work item filters for a Durable Task worker. These filters are passed to the backend
* and only work items matching the filters will be processed by the worker. If no filters are provided,
* the worker will process all work items.
* <p>
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
* only receives work items it can handle. However, if an orchestration calls a task type
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
* the call may hang indefinitely instead of failing with an error.
* <p>
* Use {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters(WorkItemFilter)} to provide explicit filters,
* or {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters()} to auto-generate filters from the
* registered orchestrations and activities.
*/
public final class WorkItemFilter {

private final List<OrchestrationFilter> orchestrations;
private final List<ActivityFilter> activities;

private WorkItemFilter(List<OrchestrationFilter> orchestrations, List<ActivityFilter> activities) {
this.orchestrations = Collections.unmodifiableList(new ArrayList<OrchestrationFilter>(orchestrations));
this.activities = Collections.unmodifiableList(new ArrayList<ActivityFilter>(activities));
}

/**
* Gets the orchestration filters.
*
* @return an unmodifiable list of orchestration filters
*/
public List<OrchestrationFilter> getOrchestrations() {
Comment thread
bachuv marked this conversation as resolved.
Dismissed
return this.orchestrations;
}

/**
* Gets the activity filters.
*
* @return an unmodifiable list of activity filters
*/
public List<ActivityFilter> getActivities() {
return this.activities;
}

/**
* Creates a new {@link Builder} for constructing {@link WorkItemFilter} instances.
*
* @return a new builder
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder for constructing {@link WorkItemFilter} instances.
*/
public static final class Builder {
private final List<OrchestrationFilter> orchestrations = new ArrayList<OrchestrationFilter>();
private final List<ActivityFilter> activities = new ArrayList<ActivityFilter>();

Builder() {
}

/**
* Adds an orchestration filter with the specified name and no version constraint.
*
* @param name the orchestration name to filter on
* @return this builder
*/
public Builder addOrchestration(String name) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Orchestration filter name must not be null or empty.");
}
this.orchestrations.add(new OrchestrationFilter(name, Collections.<String>emptyList()));
return this;
}

/**
* Adds an orchestration filter with the specified name and versions.
*
* @param name the orchestration name to filter on
* @param versions the versions to filter on
* @return this builder
*/
public Builder addOrchestration(String name, List<String> versions) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Orchestration filter name must not be null or empty.");
}
List<String> versionsCopy;
if (versions == null) {
versionsCopy = Collections.<String>emptyList();
} else {
for (String version : versions) {
if (version == null || version.isEmpty()) {
throw new IllegalArgumentException("Orchestration filter versions must not contain null or empty entries.");
}
}
versionsCopy = Collections.unmodifiableList(new ArrayList<String>(versions));
}
this.orchestrations.add(new OrchestrationFilter(name, versionsCopy));
return this;
}

/**
* Adds an activity filter with the specified name and no version constraint.
*
* @param name the activity name to filter on
* @return this builder
*/
public Builder addActivity(String name) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Activity filter name must not be null or empty.");
}
this.activities.add(new ActivityFilter(name, Collections.<String>emptyList()));
return this;
}

/**
* Adds an activity filter with the specified name and versions.
*
* @param name the activity name to filter on
* @param versions the versions to filter on
* @return this builder
*/
public Builder addActivity(String name, List<String> versions) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Activity filter name must not be null or empty.");
}
List<String> versionsCopy;
if (versions == null) {
versionsCopy = Collections.<String>emptyList();
} else {
for (String version : versions) {
if (version == null || version.isEmpty()) {
throw new IllegalArgumentException("Activity filter versions must not contain null or empty entries.");
}
}
versionsCopy = Collections.unmodifiableList(new ArrayList<String>(versions));
}
this.activities.add(new ActivityFilter(name, versionsCopy));
return this;
Comment thread
bachuv marked this conversation as resolved.
}

/**
* Builds a new {@link WorkItemFilter} from the configured filters.
*
* @return a new {@link WorkItemFilter} instance
*/
public WorkItemFilter build() {
return new WorkItemFilter(this.orchestrations, this.activities);
}
}

/**
* Specifies an orchestration filter with a name and optional versions.
*/
public static final class OrchestrationFilter {
private final String name;
private final List<String> versions;

OrchestrationFilter(String name, List<String> versions) {
this.name = name;
this.versions = Collections.unmodifiableList(new ArrayList<String>(versions));
}

/**
* Gets the name of the orchestration to filter.
*
* @return the orchestration name
*/
public String getName() {
return this.name;
}

/**
* Gets the versions of the orchestration to filter.
*
* @return an unmodifiable list of versions, or an empty list if no version constraint
*/
public List<String> getVersions() {
return this.versions;
}
}

/**
* Specifies an activity filter with a name and optional versions.
*/
public static final class ActivityFilter {
private final String name;
private final List<String> versions;

ActivityFilter(String name, List<String> versions) {
this.name = name;
this.versions = Collections.unmodifiableList(new ArrayList<String>(versions));
}

/**
* Gets the name of the activity to filter.
*
* @return the activity name
*/
public String getName() {
return this.name;
}

/**
* Gets the versions of the activity to filter.
*
* @return an unmodifiable list of versions, or an empty list if no version constraint
*/
public List<String> getVersions() {
return this.versions;
}
}
}
Loading
Loading