Skip to content

Commit 059efb8

Browse files
Evanthxyuandrew
andauthored
Sample code for Nexus messaging (#457)
* Added Nexus messaging sample code * Changes from code review * Fixes for lint * Update nexus-messaging/callerpattern/README.md Co-authored-by: Andrew Yuan <andrew.yuan@temporal.io> * Update nexus-messaging/callerpattern/README.md Co-authored-by: Andrew Yuan <andrew.yuan@temporal.io> * Update nexus-messaging/callerpattern/README.md Co-authored-by: Andrew Yuan <andrew.yuan@temporal.io> * Changes from code reviews --------- Co-authored-by: Andrew Yuan <andrew.yuan@temporal.io>
1 parent b28f5bd commit 059efb8

16 files changed

Lines changed: 1204 additions & 0 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,16 @@ These samples demonstrate some common control flow patterns using Temporal's Go
213213

214214
- [**Nexus Context Propagation**](./nexus-context-propagation): Demonstrates how to propagate context through client calls, workflows, and Nexus headers.
215215

216+
217+
216218
### Scenario based examples
217219

218220
- [**Safe Message Handler**](./safe_message_handler): This demonstrates how to safely handle concurrent update and signal requests.
219221

222+
- [**Nexus Messaging**](./nexus-messaging): Demonstrates how send signal, update and query messages through Nexus.
223+
This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus
224+
and sends messages to it.
225+
220226
- [**DSL Workflow**](./dsl): Demonstrates how to implement a
221227
DSL-based Workflow. This sample contains 2 yaml files that each define a custom "workflow" which instructs the
222228
Temporal Workflow. This is useful if you want to build in a "low code" layer.

nexus-messaging/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
This sample shows how to expose a long-running Workflow's queries, updates, and signals as Nexus
2+
operations. There are two self-contained examples, each in its own directory:
3+
4+
| | `callerpattern/` | `ondemandpattern/` |
5+
|--------------------------------|--------------------------------------|--------------------------------------------------------------|
6+
| **Pattern** | Signal an existing Workflow | Create and run Workflows on demand, and send signals to them |
7+
| **Who creates the Workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation |
8+
| **Who knows the Workflow ID?** | Only the handler | The caller chooses and passes it in every operation |
9+
| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` |
10+
11+
Each directory is fully self-contained for clarity. The
12+
`GreetingWorkflow`, `GreetingActivity`, and `Language` type are pretty much the same between the two -- only the
13+
Nexus service definition and its handler implementation differ. This highlights that the same Workflow can be
14+
exposed through Nexus in different ways depending on whether the caller needs lifecycle control.
15+
16+
See each directory's README for running instructions.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
## Caller pattern
2+
3+
The handler worker starts a `GreetingWorkflow` for a User ID.
4+
The Nexus handler holds that ID and routes every Nexus operation to it.
5+
The caller’s input doesn’t include the Workflow ID because it isn’t known. Instead, the caller provides the User ID, and the handler derives the Workflow ID from it (see `GetWorkflowID`).
6+
7+
The handler worker uses the same `GetWorkflowID` call to generate a Workflow ID from a User ID when it launches the Workflow.
8+
9+
The caller Workflow:
10+
1. Queries for supported languages (`getLanguages` -- backed by a query handler)
11+
2. Changes the language to Arabic (`setLanguage` -- backed by an update handler that calls an activity)
12+
3. Confirms the change via a second query (`getLanguage`)
13+
4. Approves the Workflow (`approve` -- backed by a signal handler)
14+
15+
### Running
16+
17+
Start a Temporal server:
18+
19+
```bash
20+
temporal server start-dev
21+
```
22+
23+
Create the namespaces and Nexus endpoint:
24+
25+
```bash
26+
temporal operator namespace create --namespace my-target-namespace
27+
temporal operator namespace create --namespace my-caller-namespace
28+
29+
temporal operator nexus endpoint create \
30+
--name my-nexus-endpoint-name \
31+
--target-namespace my-target-namespace \
32+
--target-task-queue my-handler-task-queue
33+
```
34+
35+
In one terminal, start the handler worker:
36+
37+
```bash
38+
go run ./nexus-messaging/callerpattern/handler/worker/main.go
39+
```
40+
41+
In a second terminal, start the caller worker:
42+
43+
```bash
44+
go run ./nexus-messaging/callerpattern/caller/worker/main.go
45+
```
46+
47+
In a third terminal, run the following command to start the example
48+
49+
```bash
50+
go run ./nexus-messaging/callerpattern/caller/starter/main.go
51+
```
52+
53+
Expected output:
54+
55+
```
56+
[1] getLanguages returned 2 languages
57+
[2] setLanguage(Arabic) returned previous language: English
58+
[3] getLanguage returned: Arabic (confirmed Arabic)
59+
[4] approve sent successfully
60+
```
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"go.temporal.io/sdk/client"
10+
11+
"github.com/temporalio/samples-go/nexus-messaging/callerpattern/caller"
12+
)
13+
14+
func main() {
15+
// Connect to the caller's namespace. For a non-local setup, provide additional
16+
// client options such as HostPort and TLS credentials.
17+
c, err := client.Dial(client.Options{Namespace: "my-caller-namespace"})
18+
if err != nil {
19+
log.Fatalln("Unable to create client", err)
20+
}
21+
defer c.Close()
22+
23+
ctx := context.Background()
24+
workflowOptions := client.StartWorkflowOptions{
25+
ID: "nexus-messaging-caller-workflow-" + time.Now().Format("20060102150405"),
26+
TaskQueue: caller.CallerTaskQueue,
27+
}
28+
29+
wr, err := c.ExecuteWorkflow(ctx, workflowOptions, caller.CallerWorkflow, "default-user")
30+
if err != nil {
31+
log.Fatalln("Unable to execute workflow", err)
32+
}
33+
log.Println("Started workflow", "WorkflowID", wr.GetID(), "RunID", wr.GetRunID())
34+
35+
var result []string
36+
if err := wr.Get(ctx, &result); err != nil {
37+
log.Fatalln("Unable to get workflow result", err)
38+
}
39+
for i, entry := range result {
40+
fmt.Printf("[%d] %s\n", i+1, entry)
41+
}
42+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"go.temporal.io/sdk/client"
7+
"go.temporal.io/sdk/worker"
8+
9+
"github.com/temporalio/samples-go/nexus-messaging/callerpattern/caller"
10+
)
11+
12+
func main() {
13+
// Connect to the caller's namespace. For a non-local setup, provide additional
14+
// client options such as HostPort and TLS credentials.
15+
c, err := client.Dial(client.Options{Namespace: "my-caller-namespace"})
16+
if err != nil {
17+
log.Fatalln("Unable to create client", err)
18+
}
19+
defer c.Close()
20+
21+
w := worker.New(c, caller.CallerTaskQueue, worker.Options{})
22+
w.RegisterWorkflow(caller.CallerWorkflow)
23+
24+
err = w.Run(worker.InterruptCh())
25+
if err != nil {
26+
log.Fatalln("Unable to start worker", err)
27+
}
28+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package caller
2+
3+
import (
4+
"fmt"
5+
6+
"go.temporal.io/sdk/workflow"
7+
8+
"github.com/temporalio/samples-go/nexus-messaging/callerpattern/service"
9+
)
10+
11+
const (
12+
CallerTaskQueue = "nexus-messaging-caller-task-queue"
13+
endpointName = "my-nexus-endpoint-name"
14+
)
15+
16+
// CallerWorkflow calls the NexusGreetingService operations for a given userID
17+
// and returns a log of actions taken.
18+
func CallerWorkflow(ctx workflow.Context, userID string) ([]string, error) {
19+
var log []string
20+
c := workflow.NewNexusClient(endpointName, service.ServiceName)
21+
22+
// 1. Get supported languages.
23+
fut := c.ExecuteOperation(ctx, service.GetLanguagesOperationName, service.GetLanguagesInput{
24+
IncludeUnsupported: false,
25+
UserID: userID,
26+
}, workflow.NexusOperationOptions{})
27+
var langsOut service.GetLanguagesOutput
28+
if err := fut.Get(ctx, &langsOut); err != nil {
29+
return nil, fmt.Errorf("getLanguages failed: %w", err)
30+
}
31+
log = append(log, fmt.Sprintf("getLanguages returned %d languages", len(langsOut.Languages)))
32+
33+
// 2. Set language to Arabic.
34+
fut = c.ExecuteOperation(ctx, service.SetLanguageOperationName, service.SetLanguageInput{
35+
Language: service.Arabic,
36+
UserID: userID,
37+
}, workflow.NexusOperationOptions{})
38+
var prevLang service.Language
39+
if err := fut.Get(ctx, &prevLang); err != nil {
40+
return nil, fmt.Errorf("setLanguage failed: %w", err)
41+
}
42+
log = append(log, fmt.Sprintf("setLanguage(Arabic) returned previous language: %s", prevLang))
43+
44+
// 3. Get current language (assert it is Arabic).
45+
fut = c.ExecuteOperation(ctx, service.GetLanguageOperationName, service.GetLanguageInput{
46+
UserID: userID,
47+
}, workflow.NexusOperationOptions{})
48+
var currentLang service.Language
49+
if err := fut.Get(ctx, &currentLang); err != nil {
50+
return nil, fmt.Errorf("getLanguage failed: %w", err)
51+
}
52+
if currentLang != service.Arabic {
53+
return nil, fmt.Errorf("expected Arabic, got %s", currentLang)
54+
}
55+
log = append(log, fmt.Sprintf("getLanguage returned: %s (confirmed Arabic)", currentLang))
56+
57+
// 4. Approve the workflow.
58+
fut = c.ExecuteOperation(ctx, service.ApproveOperationName, service.ApproveInput{
59+
Name: "CallerWorkflow",
60+
UserID: userID,
61+
}, workflow.NexusOperationOptions{})
62+
var approveOut service.ApproveOutput
63+
if err := fut.Get(ctx, &approveOut); err != nil {
64+
return nil, fmt.Errorf("approve failed: %w", err)
65+
}
66+
log = append(log, "approve sent successfully")
67+
68+
return log, nil
69+
}

0 commit comments

Comments
 (0)