-
Notifications
You must be signed in to change notification settings - Fork 241
Expand file tree
/
Copy pathworkflows_test.go
More file actions
139 lines (114 loc) · 4.44 KB
/
Copy pathworkflows_test.go
File metadata and controls
139 lines (114 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package streams
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/contrib/workflowstreams"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"
)
// The unit tests below cover the workflow side only. The client Subscribe path
// needs a live Temporal client, so it is exercised by Test_OrderWorkflow_DevServer.

// Test_OrderWorkflow runs OrderWorkflow in the SDK's workflow test environment
// with ChargeCard mocked, and checks that the workflow result equals the string
// the mock returns.
func Test_OrderWorkflow(t *testing.T) {
	const (
		orderID    = "order-42"
		wantResult = "charge-order-42"
	)

	suite := &testsuite.WorkflowTestSuite{}
	env := suite.NewTestWorkflowEnvironment()
	// The mock only matches calls whose argument is exactly orderID.
	env.OnActivity(ChargeCard, mock.Anything, orderID).Return(wantResult, nil)

	env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: orderID})

	check := require.New(t)
	check.True(env.IsWorkflowCompleted())
	check.NoError(env.GetWorkflowError())

	var got string
	check.NoError(env.GetWorkflowResult(&got))
	check.Equal(wantResult, got)
}
// Test_PipelineWorkflow runs PipelineWorkflow for pipeline "p1" in the workflow
// test environment and checks that it completes with the expected summary string.
func Test_PipelineWorkflow(t *testing.T) {
	suite := &testsuite.WorkflowTestSuite{}
	env := suite.NewTestWorkflowEnvironment()

	input := PipelineInput{PipelineID: "p1"}
	env.ExecuteWorkflow(PipelineWorkflow, input)

	check := require.New(t)
	check.True(env.IsWorkflowCompleted())
	check.NoError(env.GetWorkflowError())

	var got string
	check.NoError(env.GetWorkflowResult(&got))
	check.Equal("pipeline p1 done", got)
}
// Test_HubWorkflow runs HubWorkflow for hub "newsroom" and sends it CloseSignal
// through a delayed callback registered with the test environment, then checks
// that the workflow completes with the expected closing message.
func Test_HubWorkflow(t *testing.T) {
	suite := &testsuite.WorkflowTestSuite{}
	env := suite.NewTestWorkflowEnvironment()

	// Deliver the close signal after a simulated one-second delay; the signal
	// carries no payload.
	sendClose := func() {
		env.SignalWorkflow(CloseSignal, nil)
	}
	env.RegisterDelayedCallback(sendClose, time.Second)

	input := HubInput{HubID: "newsroom"}
	env.ExecuteWorkflow(HubWorkflow, input)

	check := require.New(t)
	check.True(env.IsWorkflowCompleted())
	check.NoError(env.GetWorkflowError())

	var got string
	check.NoError(env.GetWorkflowResult(&got))
	check.Equal("hub newsroom closed", got)
}
// Test_TickerWorkflow runs TickerWorkflow with Count=12, KeepLast=5 and
// TruncateEvery=5 and checks that it completes reporting 12 emitted events.
func Test_TickerWorkflow(t *testing.T) {
	suite := &testsuite.WorkflowTestSuite{}
	env := suite.NewTestWorkflowEnvironment()

	input := TickerInput{
		Count:         12,
		KeepLast:      5,
		TruncateEvery: 5,
	}
	env.ExecuteWorkflow(TickerWorkflow, input)

	check := require.New(t)
	check.True(env.IsWorkflowCompleted())
	check.NoError(env.GetWorkflowError())

	var got string
	check.NoError(env.GetWorkflowResult(&got))
	check.Equal("ticker emitted 12 events", got)
}
// Test_LLMWorkflow runs LLMWorkflow with the StreamCompletion activity mocked
// for any arguments, and checks that the workflow result equals the string the
// mock returns.
func Test_LLMWorkflow(t *testing.T) {
	const wantAnswer = "a streamed answer"

	suite := &testsuite.WorkflowTestSuite{}
	env := suite.NewTestWorkflowEnvironment()
	env.OnActivity(StreamCompletion, mock.Anything, mock.Anything).Return(wantAnswer, nil)

	input := LLMInput{Prompt: "hello"}
	env.ExecuteWorkflow(LLMWorkflow, input)

	check := require.New(t)
	check.True(env.IsWorkflowCompleted())
	check.NoError(env.GetWorkflowError())

	var got string
	check.NoError(env.GetWorkflowResult(&got))
	check.Equal(wantAnswer, got)
}
// Test_OrderWorkflow_DevServer runs the basic publish/subscribe scenario end to
// end against a local dev server: it exercises publishing from both the workflow
// and the activity (via the real client) and consuming via Subscribe.
//
// The workflow run and the subscription share a context with a deadline, and
// the test fails explicitly if the subscription ends before a "complete" status
// event is observed, so a prematurely closed stream cannot pass vacuously.
func Test_OrderWorkflow_DevServer(t *testing.T) {
	// Upper bound for starting the workflow and draining the stream. Kept well
	// below the default `go test` timeout so a stall is reported by this test.
	const scenarioTimeout = 2 * time.Minute

	server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
		ClientOptions: &client.Options{HostPort: ""},
	})
	require.NoError(t, err)
	// Best-effort shutdown: a Stop error is not actionable once the test is done.
	defer func() { _ = server.Stop() }()

	c := server.Client()
	w := worker.New(c, TaskQueue, worker.Options{})
	w.RegisterWorkflow(OrderWorkflow)
	w.RegisterActivity(ChargeCard)
	require.NoError(t, w.Start())
	defer w.Stop()

	// Bound everything below; assumes Subscribe honors context cancellation.
	ctx, cancel := context.WithTimeout(context.Background(), scenarioTimeout)
	defer cancel()

	workflowID := "workflow-streams-order-test"
	_, err = c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
		ID:        workflowID,
		TaskQueue: TaskQueue,
	}, OrderWorkflow, OrderInput{OrderID: "order-42"})
	require.NoError(t, err)

	dc := converter.GetDefaultDataConverter()
	stream := workflowstreams.NewClient(c, workflowID, workflowstreams.Options{})
	// Deferred after cancel, so Close runs before the context is cancelled.
	// Best-effort: a Close error does not affect the assertions.
	defer func() { _ = stream.Close(ctx) }()

	var statuses []string
	progressCount := 0
	for item, err := range stream.Subscribe(ctx, workflowstreams.SubscribeOptions{
		Topics: []string{TopicStatus, TopicProgress},
	}) {
		require.NoError(t, err)
		switch item.Topic {
		case TopicStatus:
			var evt StatusEvent
			require.NoError(t, dc.FromPayload(item.Data, &evt))
			statuses = append(statuses, evt.Kind)
			if evt.Kind == "complete" {
				// "complete" is the terminal status: verify the full sequence
				// and that progress events were interleaved before it.
				require.Equal(t, []string{"received", "shipped", "complete"}, statuses)
				require.GreaterOrEqual(t, progressCount, 2)
				return
			}
		case TopicProgress:
			// Decode to validate the payload shape; only the count is asserted.
			var evt ProgressEvent
			require.NoError(t, dc.FromPayload(item.Data, &evt))
			progressCount++
		}
	}

	// Reaching here means the iterator finished without delivering "complete";
	// without this the test would pass having asserted nothing.
	t.Fatalf("subscription ended before a %q status event: statuses=%v progressCount=%d",
		"complete", statuses, progressCount)
}