|
| 1 | +<?php |
| 2 | + |
| 3 | +declare(strict_types=1); |
| 4 | + |
| 5 | +namespace Tests\Feature\V2; |
| 6 | + |
| 7 | +use Illuminate\Support\Facades\Queue; |
| 8 | +use Tests\Fixtures\V2\TestAvroParityActivity; |
| 9 | +use Tests\Fixtures\V2\TestAvroParityWorkflow; |
| 10 | +use Tests\TestCase; |
| 11 | +use Workflow\Serializers\CodecRegistry; |
| 12 | +use Workflow\Serializers\Serializer; |
| 13 | +use Workflow\V2\Enums\TaskStatus; |
| 14 | +use Workflow\V2\Enums\TaskType; |
| 15 | +use Workflow\V2\Jobs\RunActivityTask; |
| 16 | +use Workflow\V2\Jobs\RunTimerTask; |
| 17 | +use Workflow\V2\Jobs\RunWorkflowTask; |
| 18 | +use Workflow\V2\Models\WorkflowRun; |
| 19 | +use Workflow\V2\Models\WorkflowTask; |
| 20 | +use Workflow\V2\Support\HistoryExport; |
| 21 | +use Workflow\V2\WorkflowStub; |
| 22 | + |
| 23 | +/** |
| 24 | + * Release-gating Avro parity suite (#362). |
| 25 | + */ |
| 26 | +final class V2AvroParitySuiteTest extends TestCase |
| 27 | +{ |
| 28 | + protected function setUp(): void |
| 29 | + { |
| 30 | + parent::setUp(); |
| 31 | + config()->set('workflows.serializer', 'avro'); |
| 32 | + config()->set('queue.default', 'redis'); |
| 33 | + config()->set('queue.connections.redis.driver', 'redis'); |
| 34 | + Queue::fake(); |
| 35 | + } |
| 36 | + |
| 37 | + public function testStartInputRoundTripsUnderAvro(): void |
| 38 | + { |
| 39 | + $workflow = WorkflowStub::make(TestAvroParityWorkflow::class, 'avro-start-input'); |
| 40 | + $workflow->start('ORD-1', 99.5, 3); |
| 41 | + |
| 42 | + $run = WorkflowRun::query()->where('workflow_instance_id', 'avro-start-input')->first(); |
| 43 | + |
| 44 | + $this->assertSame('avro', $run->payload_codec); |
| 45 | + |
| 46 | + $args = Serializer::unserializeWithCodec('avro', $run->arguments); |
| 47 | + $this->assertSame('ORD-1', $args[0]); |
| 48 | + $this->assertSame(99.5, $args[1]); |
| 49 | + $this->assertIsFloat($args[1], 'float must survive Avro round-trip'); |
| 50 | + $this->assertSame(3, $args[2]); |
| 51 | + $this->assertIsInt($args[2], 'int must survive Avro round-trip'); |
| 52 | + } |
| 53 | + |
| 54 | + public function testFloatIntFidelityThroughFullWorkflowLifecycle(): void |
| 55 | + { |
| 56 | + $workflow = WorkflowStub::make(TestAvroParityWorkflow::class, 'avro-fidelity'); |
| 57 | + $workflow->start('ORD-2', 3.14, 42); |
| 58 | + |
| 59 | + $this->drainReadyTasks(); |
| 60 | + |
| 61 | + $run = WorkflowRun::query()->where('workflow_instance_id', 'avro-fidelity')->first(); |
| 62 | + $this->assertSame('avro', $run->payload_codec); |
| 63 | + $this->assertSame('waiting', $workflow->refresh()->status()); |
| 64 | + |
| 65 | + $workflow->signal('order-updated', ['priority' => true, 'discount' => 0.15]); |
| 66 | + $this->drainReadyTasks(); |
| 67 | + $this->assertTrue($workflow->refresh()->completed()); |
| 68 | + |
| 69 | + $output = $workflow->output(); |
| 70 | + |
| 71 | + $this->assertSame('ORD-2', $output['order_id']); |
| 72 | + $this->assertSame(3.14, $output['input_amount']); |
| 73 | + $this->assertIsFloat($output['input_amount'], '3.14 must stay float'); |
| 74 | + $this->assertSame(42, $output['input_items_count']); |
| 75 | + $this->assertIsInt($output['input_items_count'], '42 must stay int'); |
| 76 | + $this->assertSame(3, $output['three_point_zero']); |
| 77 | + |
| 78 | + $activityResult = $output['activity_result']; |
| 79 | + $this->assertSame('ORD-2', $activityResult['order_id']); |
| 80 | + $this->assertIsFloat($activityResult['amount'], 'activity result float must survive'); |
| 81 | + $this->assertIsInt($activityResult['items_count'], 'activity result int must survive'); |
| 82 | + $this->assertSame(3, $activityResult['three_point_zero']); |
| 83 | + |
| 84 | + $this->assertSame(['priority' => true, 'discount' => 0.15], $output['signal_payload']); |
| 85 | + } |
| 86 | + |
| 87 | + public function testEveryPayloadRowIsTaggedAvro(): void |
| 88 | + { |
| 89 | + $workflow = WorkflowStub::make(TestAvroParityWorkflow::class, 'avro-tagging'); |
| 90 | + $workflow->start('ORD-3', 10.0, 1); |
| 91 | + $this->drainReadyTasks(); |
| 92 | + $workflow->signal('order-updated', ['tagged' => true]); |
| 93 | + $this->drainReadyTasks(); |
| 94 | + |
| 95 | + $run = WorkflowRun::query()->where('workflow_instance_id', 'avro-tagging')->first(); |
| 96 | + $this->assertSame('avro', $run->payload_codec); |
| 97 | + |
| 98 | + $commands = $run->commands()->get(); |
| 99 | + foreach ($commands as $command) { |
| 100 | + if ($command->payload_codec !== null) { |
| 101 | + $this->assertSame('avro', $command->payload_codec, |
| 102 | + "Command {$command->id} has codec {$command->payload_codec}, expected avro"); |
| 103 | + } |
| 104 | + } |
| 105 | + } |
| 106 | + |
| 107 | + public function testHistoryExportIncludesCodecMetadata(): void |
| 108 | + { |
| 109 | + $workflow = WorkflowStub::make(TestAvroParityWorkflow::class, 'avro-export'); |
| 110 | + $workflow->start('ORD-4', 55.5, 7); |
| 111 | + $this->drainReadyTasks(); |
| 112 | + $workflow->signal('order-updated', ['exported' => true]); |
| 113 | + $this->drainReadyTasks(); |
| 114 | + |
| 115 | + $run = WorkflowRun::query()->where('workflow_instance_id', 'avro-export')->first(); |
| 116 | + $export = HistoryExport::forRun($run); |
| 117 | + |
| 118 | + $this->assertIsArray($export); |
| 119 | + $this->assertArrayHasKey('payloads', $export); |
| 120 | + $this->assertSame('avro', $export['payloads']['codec'] ?? null, |
| 121 | + 'Export must tag the run codec as avro'); |
| 122 | + } |
| 123 | + |
| 124 | + public function testSchemaEvolutionDecodesV1PayloadWithV2ReaderSchema(): void |
| 125 | + { |
| 126 | + if (! class_exists(\Apache\Avro\Schema\AvroSchema::class)) { |
| 127 | + $this->markTestSkipped('apache/avro not installed'); |
| 128 | + } |
| 129 | + |
| 130 | + $writerSchemaJson = json_encode([ |
| 131 | + 'type' => 'record', 'name' => 'OrderPayload', 'namespace' => 'durable_workflow.test', |
| 132 | + 'fields' => [ |
| 133 | + ['name' => 'order_id', 'type' => 'string'], |
| 134 | + ['name' => 'amount', 'type' => 'double'], |
| 135 | + ['name' => 'items_count', 'type' => 'int'], |
| 136 | + ], |
| 137 | + ]); |
| 138 | + |
| 139 | + $readerSchemaJson = json_encode([ |
| 140 | + 'type' => 'record', 'name' => 'OrderPayload', 'namespace' => 'durable_workflow.test', |
| 141 | + 'fields' => [ |
| 142 | + ['name' => 'order_id', 'type' => 'string'], |
| 143 | + ['name' => 'amount', 'type' => 'double'], |
| 144 | + ['name' => 'items_count', 'type' => 'int'], |
| 145 | + ['name' => 'region', 'type' => 'string', 'default' => 'us-east'], |
| 146 | + ], |
| 147 | + ]); |
| 148 | + |
| 149 | + $writerSchema = \Apache\Avro\Schema\AvroSchema::parse($writerSchemaJson); |
| 150 | + $readerSchema = \Apache\Avro\Schema\AvroSchema::parse($readerSchemaJson); |
| 151 | + |
| 152 | + $io = new \Apache\Avro\IO\AvroStringIO(); |
| 153 | + $encoder = new \Apache\Avro\Datum\AvroIOBinaryEncoder($io); |
| 154 | + $writer = new \Apache\Avro\Datum\AvroIODatumWriter($writerSchema); |
| 155 | + $writer->write(['order_id' => 'ORD-EVOLVE', 'amount' => 42.0, 'items_count' => 3], $encoder); |
| 156 | + $v1Bytes = $io->string(); |
| 157 | + |
| 158 | + $readIo = new \Apache\Avro\IO\AvroStringIO($v1Bytes); |
| 159 | + $decoder = new \Apache\Avro\Datum\AvroIOBinaryDecoder($readIo); |
| 160 | + $reader = new \Apache\Avro\Datum\AvroIODatumReader($writerSchema, $readerSchema); |
| 161 | + $decoded = $reader->read($decoder); |
| 162 | + |
| 163 | + $this->assertSame('ORD-EVOLVE', $decoded['order_id']); |
| 164 | + $this->assertSame(42.0, $decoded['amount']); |
| 165 | + $this->assertIsFloat($decoded['amount']); |
| 166 | + $this->assertSame(3, $decoded['items_count']); |
| 167 | + $this->assertIsInt($decoded['items_count']); |
| 168 | + $this->assertSame('us-east', $decoded['region'], 'Added field must get default value'); |
| 169 | + } |
| 170 | + |
| 171 | + public function testNonAvroBytesUnderAvroCodecTagProducesTypedError(): void |
| 172 | + { |
| 173 | + $this->expectException(\Workflow\Serializers\CodecDecodeException::class); |
| 174 | + Serializer::unserializeWithCodec('avro', '{"this":"is json not avro"}'); |
| 175 | + } |
| 176 | + |
| 177 | + public function testJsonBytesUnderAvroTagGetsDiagnosticMessage(): void |
| 178 | + { |
| 179 | + try { |
| 180 | + Serializer::unserializeWithCodec('avro', '{"x":1}'); |
| 181 | + $this->fail('Should have thrown'); |
| 182 | + } catch (\Workflow\Serializers\CodecDecodeException $e) { |
| 183 | + $this->assertStringContainsString('json', strtolower($e->getMessage()), |
| 184 | + 'Error should mention the bytes look like JSON'); |
| 185 | + } |
| 186 | + } |
| 187 | + |
| 188 | + public function testAvroDecodeFailureNamesCodec(): void |
| 189 | + { |
| 190 | + try { |
| 191 | + Serializer::unserializeWithCodec('avro', base64_encode("\x07garbage")); |
| 192 | + $this->fail('Should have thrown'); |
| 193 | + } catch (\Workflow\Serializers\CodecDecodeException $e) { |
| 194 | + $this->assertStringContainsString('avro', strtolower($e->getMessage())); |
| 195 | + } |
| 196 | + } |
| 197 | + |
| 198 | + private function drainReadyTasks(): void |
| 199 | + { |
| 200 | + $deadline = microtime(true) + 10; |
| 201 | + |
| 202 | + while (microtime(true) < $deadline) { |
| 203 | + $task = WorkflowTask::query() |
| 204 | + ->where('status', TaskStatus::Ready->value) |
| 205 | + ->orderBy('created_at') |
| 206 | + ->first(); |
| 207 | + |
| 208 | + if ($task === null) { |
| 209 | + return; |
| 210 | + } |
| 211 | + |
| 212 | + $job = match ($task->task_type) { |
| 213 | + TaskType::Workflow => new RunWorkflowTask($task->id), |
| 214 | + TaskType::Activity => new RunActivityTask($task->id), |
| 215 | + TaskType::Timer => new RunTimerTask($task->id), |
| 216 | + }; |
| 217 | + |
| 218 | + $this->app->call([$job, 'handle']); |
| 219 | + } |
| 220 | + |
| 221 | + $this->fail('Timed out draining ready workflow tasks.'); |
| 222 | + } |
| 223 | +} |
0 commit comments