Skip to content

Commit 22645d2

Browse files
authored
[DX-3609] migrate GRPC source tests to ChIP test sink (#21795)
migrate GRPC source tests to ChIP test sink
1 parent 93e5aae commit 22645d2

2 files changed

Lines changed: 105 additions & 63 deletions

File tree

system-tests/tests/smoke/cre/v2_grpc_source_test.go

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/hex"
77
"os"
88
"path/filepath"
9+
"strings"
910
"testing"
1011
"time"
1112

@@ -134,14 +135,15 @@ func ExecuteGRPCSourceLifecycleTest(t *testing.T, testEnv *ttypes.TestEnvironmen
134135
require.NoError(t, err, "failed to decode owner hex")
135136
artifacts := compileAndCopyWorkflow(t, testEnv, grpcWorkflowName, ownerHex)
136137

137-
// Start Beholder listener for workflow events
138-
testLogger.Info().Msg("Starting Beholder listener for workflow lifecycle events...")
139-
beholderCtx, messageChan, errChan := startWorkflowEventBeholder(t, testEnv)
138+
// Start CHiP sink listener for workflow events
139+
testLogger.Info().Msg("Starting CHiP sink listener for workflow lifecycle events...")
140+
eventsCtx, messageChan := startWorkflowEventSink(t)
141+
grpcWorkflowID := hex.EncodeToString(artifacts.WorkflowID[:])
140142

141143
// Step 1: (Optional) Verify contract workflow is activated
142144
if runIsolationChecks {
143-
testLogger.Info().Str("workflowName", contractWorkflowName).Msg("Step 1: Verifying contract-source workflow is active...")
144-
assertWorkflowActivated(t, beholderCtx, messageChan, errChan, contractWorkflowName, 2*grpcSourceTestSyncerInterval)
145+
testLogger.Info().Str("workflowName", contractWorkflowName).Msg("Step 1: Verifying contract workflow isolation baseline...")
146+
assertWorkflowStillExecuting(t, testEnv, contractWorkflowName)
145147
} else {
146148
testLogger.Info().Msg("Skipping contract workflow isolation checks (no contract workflow configured)")
147149
}
@@ -162,7 +164,7 @@ func ExecuteGRPCSourceLifecycleTest(t *testing.T, testEnv *ttypes.TestEnvironmen
162164
require.NoError(t, err, "failed to add workflow via private registry API")
163165

164166
// Verify gRPC workflow activation
165-
assertWorkflowActivated(t, beholderCtx, messageChan, errChan, grpcWorkflowName, 2*grpcSourceTestSyncerInterval)
167+
assertWorkflowActivated(t, eventsCtx, messageChan, grpcWorkflowName, grpcWorkflowID, 2*grpcSourceTestSyncerInterval)
166168

167169
// Step 3: (Optional) Verify contract workflow is still running (isolation check)
168170
if runIsolationChecks {
@@ -176,7 +178,7 @@ func ExecuteGRPCSourceLifecycleTest(t *testing.T, testEnv *ttypes.TestEnvironmen
176178
require.NoError(t, err, "failed to pause workflow via private registry API")
177179

178180
// Verify gRPC workflow paused
179-
assertWorkflowPaused(t, beholderCtx, messageChan, errChan, grpcWorkflowName, 2*grpcSourceTestSyncerInterval)
181+
assertWorkflowPaused(t, eventsCtx, messageChan, grpcWorkflowName, grpcWorkflowID, 2*grpcSourceTestSyncerInterval)
180182

181183
// Step 5: (Optional) Verify contract workflow is still running (isolation check)
182184
if runIsolationChecks {
@@ -190,15 +192,15 @@ func ExecuteGRPCSourceLifecycleTest(t *testing.T, testEnv *ttypes.TestEnvironmen
190192
require.NoError(t, err, "failed to resume workflow via private registry API")
191193

192194
// Verify gRPC workflow reactivated
193-
assertWorkflowActivated(t, beholderCtx, messageChan, errChan, grpcWorkflowName, 2*grpcSourceTestSyncerInterval)
195+
assertWorkflowActivated(t, eventsCtx, messageChan, grpcWorkflowName, grpcWorkflowID, 2*grpcSourceTestSyncerInterval)
194196

195197
// Step 7: Delete gRPC workflow
196198
testLogger.Info().Str("workflowName", grpcWorkflowName).Msg("Step 7: Deleting gRPC workflow...")
197199
err = mockServer.PrivateRegistryService().DeleteWorkflow(ctx, artifacts.WorkflowID)
198200
require.NoError(t, err, "failed to delete workflow via private registry API")
199201

200202
// Verify gRPC workflow deleted
201-
assertWorkflowDeleted(t, beholderCtx, messageChan, errChan, grpcWorkflowName, 2*grpcSourceTestSyncerInterval)
203+
assertWorkflowDeleted(t, eventsCtx, messageChan, grpcWorkflowName, grpcWorkflowID, 2*grpcSourceTestSyncerInterval)
202204

203205
// Step 8: (Optional) Final isolation check - contract workflow still running
204206
if runIsolationChecks {
@@ -242,12 +244,13 @@ func ExecuteGRPCSourceAuthRejectionTest(t *testing.T, testEnv *ttypes.TestEnviro
242244
err = mockServer.PrivateRegistryService().AddWorkflow(ctx, registration)
243245
require.NoError(t, err, "failed to add workflow via private registry API")
244246

245-
// Start Beholder listener
246-
beholderCtx, messageChan, errChan := startWorkflowEventBeholder(t, testEnv)
247+
// Start CHiP sink listener
248+
eventsCtx, messageChan := startWorkflowEventSink(t)
249+
workflowIDHex := hex.EncodeToString(registration.WorkflowID[:])
247250

248251
// Wait for 2 sync intervals - workflow should NOT be activated
249252
testLogger.Info().Msg("Waiting to verify workflow is NOT activated (auth rejection)...")
250-
assertNoWorkflowActivated(t, beholderCtx, messageChan, errChan, registration.WorkflowName, 2*grpcSourceTestSyncerInterval)
253+
assertNoWorkflowActivated(t, eventsCtx, messageChan, registration.WorkflowName, workflowIDHex, 2*grpcSourceTestSyncerInterval)
251254

252255
// Verify nodes are still healthy (no panics)
253256
testLogger.Info().Msg("Verifying nodes are still healthy after auth rejection...")
@@ -258,35 +261,23 @@ func ExecuteGRPCSourceAuthRejectionTest(t *testing.T, testEnv *ttypes.TestEnviro
258261

259262
// Helper functions
260263

261-
func startWorkflowEventBeholder(t *testing.T, testEnv *ttypes.TestEnvironment) (context.Context, <-chan proto.Message, <-chan error) {
264+
func startWorkflowEventSink(t *testing.T) (context.Context, <-chan proto.Message) {
262265
t.Helper()
263266

264-
beholder, err := t_helpers.NewBeholder(framework.L, testEnv.TestConfig)
265-
require.NoError(t, err, "failed to create beholder instance")
266-
267-
// Register for workflow deployment events
268-
messageTypes := map[string]func() proto.Message{
269-
"workflows.v2.WorkflowActivated": func() proto.Message { return &workflowsv2.WorkflowActivated{} },
270-
"workflows.v2.WorkflowPaused": func() proto.Message { return &workflowsv2.WorkflowPaused{} },
271-
"workflows.v2.WorkflowDeleted": func() proto.Message { return &workflowsv2.WorkflowDeleted{} },
272-
}
267+
messageChan := make(chan proto.Message, 1000)
268+
server := t_helpers.StartChipTestSink(t, t_helpers.GetWorkflowV2LifecyclePublishFn(framework.L, messageChan))
269+
t.Cleanup(func() {
270+
server.Shutdown(t.Context())
271+
close(messageChan)
272+
})
273273

274274
timeout := 5 * time.Minute
275-
beholderCtx, cancelListener := context.WithTimeout(t.Context(), timeout)
275+
eventsCtx, cancelListener := context.WithTimeout(t.Context(), timeout)
276276
t.Cleanup(func() {
277277
cancelListener()
278278
})
279279

280-
messageChan, errChan := beholder.SubscribeToBeholderMessages(beholderCtx, messageTypes)
281-
282-
// Fail fast if there's an immediate error
283-
select {
284-
case err := <-errChan:
285-
require.NoError(t, err, "Beholder subscription failed during initialization")
286-
default:
287-
}
288-
289-
return beholderCtx, messageChan, errChan
280+
return eventsCtx, messageChan
290281
}
291282

292283
// workflowEvent is an interface that abstracts common fields across workflow lifecycle events
@@ -308,37 +299,44 @@ type workflowEventMatcher struct {
308299
}
309300

310301
// assertWorkflowEvent is a generic function to wait for and validate a workflow lifecycle event.
311-
// It listens on messageChan for messages matching the specified matcher and workflowName.
302+
// It listens on messageChan for messages matching the specified matcher and workflowID.
312303
func assertWorkflowEvent(
313304
t *testing.T,
314305
ctx context.Context, //nolint:revive // test helper conventionally has t first
315306
messageChan <-chan proto.Message,
316-
errChan <-chan error,
317307
workflowName string,
308+
expectedWorkflowID string,
318309
timeout time.Duration,
319310
matcher workflowEventMatcher,
320311
) {
321312
t.Helper()
322313
testLogger := framework.L
314+
normalizedExpectedWorkflowID := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(expectedWorkflowID)), "0x")
315+
timer := time.NewTimer(timeout)
316+
defer timer.Stop()
323317

324318
for {
325319
select {
326-
case msg := <-messageChan:
320+
case msg, ok := <-messageChan:
321+
if !ok || msg == nil {
322+
continue
323+
}
327324
if event, ok := matcher.tryMatch(msg); ok {
328325
wfKey := event.GetWorkflow().GetWorkflowKey()
329-
if wfKey.GetWorkflowName() == workflowName {
326+
normalizedEventWorkflowID := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(wfKey.GetWorkflowID())), "0x")
327+
if normalizedEventWorkflowID == normalizedExpectedWorkflowID {
330328
require.Empty(t, event.GetErrorMessage(), matcher.errorAssertionMsg)
331329
testLogger.Info().
332330
Str("workflowName", wfKey.GetWorkflowName()).
333331
Str("workflowID", wfKey.GetWorkflowID()).
332+
Str("expectedWorkflowName", workflowName).
333+
Str("expectedWorkflowID", expectedWorkflowID).
334334
Msgf("%s event received", matcher.eventName)
335335
return
336336
}
337337
}
338-
case err := <-errChan:
339-
require.NoError(t, err, "Beholder error during %s assertion", matcher.eventName)
340-
case <-time.After(timeout):
341-
t.Fatalf("Timeout waiting for %s event for workflow %s", matcher.eventName, workflowName)
338+
case <-timer.C:
339+
t.Fatalf("Timeout waiting for %s event for workflowID %s (workflowName: %s)", matcher.eventName, expectedWorkflowID, workflowName)
342340
case <-ctx.Done():
343341
t.Fatalf("Context cancelled while waiting for %s event", matcher.eventName)
344342
}
@@ -384,49 +382,59 @@ var (
384382
// assertWorkflowActivated waits for a WorkflowActivated event whose workflow ID matches workflowID; workflowName is used only in log and failure messages.
385383
//
386384
//nolint:revive // test helper conventionally has t first
387-
func assertWorkflowActivated(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, errChan <-chan error, workflowName string, timeout time.Duration) {
385+
func assertWorkflowActivated(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, workflowName string, workflowID string, timeout time.Duration) {
388386
t.Helper()
389-
assertWorkflowEvent(t, ctx, messageChan, errChan, workflowName, timeout, workflowActivatedMatcher)
387+
assertWorkflowEvent(t, ctx, messageChan, workflowName, workflowID, timeout, workflowActivatedMatcher)
390388
}
391389

392390
// assertWorkflowPaused waits for a WorkflowPaused event whose workflow ID matches workflowID; workflowName is used only in log and failure messages.
393391
//
394392
//nolint:revive // test helper conventionally has t first
395-
func assertWorkflowPaused(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, errChan <-chan error, workflowName string, timeout time.Duration) {
393+
func assertWorkflowPaused(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, workflowName string, workflowID string, timeout time.Duration) {
396394
t.Helper()
397-
assertWorkflowEvent(t, ctx, messageChan, errChan, workflowName, timeout, workflowPausedMatcher)
395+
assertWorkflowEvent(t, ctx, messageChan, workflowName, workflowID, timeout, workflowPausedMatcher)
398396
}
399397

400398
// assertWorkflowDeleted waits for a WorkflowDeleted event whose workflow ID matches workflowID; workflowName is used only in log and failure messages.
401399
//
402400
//nolint:revive // test helper conventionally has t first
403-
func assertWorkflowDeleted(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, errChan <-chan error, workflowName string, timeout time.Duration) {
401+
func assertWorkflowDeleted(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, workflowName string, workflowID string, timeout time.Duration) {
404402
t.Helper()
405-
assertWorkflowEvent(t, ctx, messageChan, errChan, workflowName, timeout, workflowDeletedMatcher)
403+
assertWorkflowEvent(t, ctx, messageChan, workflowName, workflowID, timeout, workflowDeletedMatcher)
406404
}
407405

408406
//nolint:revive // test helper conventionally has t first
409-
func assertNoWorkflowActivated(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, errChan <-chan error, workflowName string, timeout time.Duration) {
407+
func assertNoWorkflowActivated(t *testing.T, ctx context.Context, messageChan <-chan proto.Message, workflowName string, expectedWorkflowID string, timeout time.Duration) {
410408
t.Helper()
411409
testLogger := framework.L
410+
normalizedExpectedWorkflowID := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(expectedWorkflowID)), "0x")
411+
timer := time.NewTimer(timeout)
412+
defer timer.Stop()
412413

413-
select {
414-
case msg := <-messageChan:
415-
if activated, ok := msg.(*workflowsv2.WorkflowActivated); ok {
416-
wfKey := activated.GetWorkflow().GetWorkflowKey()
417-
if wfKey.GetWorkflowName() == workflowName {
418-
t.Fatalf("Workflow %s should NOT be activated when auth is rejected", workflowName)
414+
for {
415+
select {
416+
case msg, ok := <-messageChan:
417+
if !ok || msg == nil {
418+
continue
419+
}
420+
if activated, ok := msg.(*workflowsv2.WorkflowActivated); ok {
421+
wfKey := activated.GetWorkflow().GetWorkflowKey()
422+
normalizedEventWorkflowID := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(wfKey.GetWorkflowID())), "0x")
423+
if normalizedEventWorkflowID == normalizedExpectedWorkflowID {
424+
t.Fatalf("Workflow %s (workflowID %s) should NOT be activated when auth is rejected", workflowName, expectedWorkflowID)
425+
}
419426
}
427+
case <-timer.C:
428+
// Success - no activation received
429+
testLogger.Info().
430+
Str("workflowName", workflowName).
431+
Str("workflowID", expectedWorkflowID).
432+
Msg("Confirmed: No WorkflowActivated event received (expected for auth rejection)")
433+
return
434+
case <-ctx.Done():
435+
// Context cancelled, which is fine
436+
return
420437
}
421-
case err := <-errChan:
422-
require.NoError(t, err, "Beholder error during assertNoWorkflowActivated")
423-
case <-time.After(timeout):
424-
// Success - no activation received
425-
testLogger.Info().
426-
Str("workflowName", workflowName).
427-
Msg("Confirmed: No WorkflowActivated event received (expected for auth rejection)")
428-
case <-ctx.Done():
429-
// Context cancelled, which is fine
430438
}
431439
}
432440

system-tests/tests/test-helpers/chip_testsink_helpers.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,15 @@ func safeSendBaseMessage(ch chan *commonevents.BaseMessage, msg *commonevents.Ba
150150
ch <- msg
151151
}
152152

153+
func safeSendProtoMessage(ch chan proto.Message, msg proto.Message) {
154+
// Same race as safeSendUserLogs; avoid panic on send to closed channel.
155+
defer func() { _ = recover() }()
156+
if ch == nil {
157+
return
158+
}
159+
ch <- msg
160+
}
161+
153162
func ChipSinkFanoutEnabled() bool {
154163
v := strings.TrimSpace(strings.ToLower(os.Getenv("CRE_TEST_CHIP_SINK_FANOUT_ENABLED")))
155164
return v == "1" || v == "true" || v == "yes"
@@ -324,6 +333,31 @@ func GetPublishFn(testLogger zerolog.Logger, userLogsCh chan *workflowevents.Use
324333
return publishFn
325334
}
326335

336+
// GetWorkflowV2LifecyclePublishFn returns a CHiP publish handler that demuxes workflow lifecycle v2 events.
337+
func GetWorkflowV2LifecyclePublishFn(testLogger zerolog.Logger, workflowEventCh chan proto.Message) chiptestsink.PublishFn {
338+
return func(ctx context.Context, event *pb.CloudEvent) (*chippb.PublishResponse, error) {
339+
var typedMsg proto.Message
340+
switch event.Type {
341+
case "workflows.v2.WorkflowActivated":
342+
typedMsg = &workfloweventsv2.WorkflowActivated{}
343+
case "workflows.v2.WorkflowPaused":
344+
typedMsg = &workfloweventsv2.WorkflowPaused{}
345+
case "workflows.v2.WorkflowDeleted":
346+
typedMsg = &workfloweventsv2.WorkflowDeleted{}
347+
default:
348+
return &chippb.PublishResponse{}, nil
349+
}
350+
351+
if err := proto.Unmarshal(event.GetProtoData().GetValue(), typedMsg); err != nil {
352+
testLogger.Error().Err(err).Str("ce_type", event.Type).Msg("Failed to unmarshal protobuf; skipping")
353+
return &chippb.PublishResponse{}, nil
354+
}
355+
356+
safeSendProtoMessage(workflowEventCh, typedMsg)
357+
return &chippb.PublishResponse{}, nil
358+
}
359+
}
360+
327361
// GetLoggingPublishFn returns a CHiP publish handler that demuxes events into the provided channels and saves all events to a file.
328362
// Useful when debugging failures of tests that depend on workflow logs.
329363
func GetLoggingPublishFn(

0 commit comments

Comments
 (0)