From 5097b7266ab4cfd7ccfe602af6d2d7a19b9f4c70 Mon Sep 17 00:00:00 2001 From: Anthony Malkoun Date: Tue, 2 Jun 2026 19:59:40 +1000 Subject: [PATCH 1/4] Add draft implementation to test connectivity --- .../key.key.data_pipeline_push_api_key.yml | 14 + .../src/Access/ApiKeyAccessCheck.php | 61 ++++ .../src/Controller/DatasetPushController.php | 82 +++++ .../DatasetSource/JsonEndpointSource.php | 128 ++++++++ ...line_json_endpoint_test.data_pipelines.yml | 2 + ..._data_pipeline_json_endpoint_test.info.yml | 8 + .../src/Kernel/ApiKeyAccessCheckTest.php | 150 +++++++++ .../src/Kernel/DatasetPushControllerTest.php | 289 ++++++++++++++++++ .../Kernel/JsonEndpointSourceKernelTest.php | 204 +++++++++++++ .../DatasetSource/JsonEndpointSourceTest.php | 61 ++++ .../tide_data_pipeline_json_endpoint.info.yml | 8 + ...ata_pipeline_json_endpoint.permissions.yml | 3 + ...de_data_pipeline_json_endpoint.routing.yml | 10 + ...e_data_pipeline_json_endpoint.services.yml | 7 + 14 files changed, 1027 insertions(+) create mode 100644 modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml create mode 100644 modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php create mode 100644 modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php create mode 100644 modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php create mode 100644 modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.data_pipelines.yml create mode 100644 modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.info.yml create mode 100644 modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php create mode 100644 modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php create mode 100644 
modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php create mode 100644 modules/tide_data_pipeline_json_endpoint/tests/src/Unit/Plugin/DatasetSource/JsonEndpointSourceTest.php create mode 100644 modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml create mode 100644 modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml create mode 100644 modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml create mode 100644 modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml diff --git a/modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml b/modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml new file mode 100644 index 000000000..3bc480042 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml @@ -0,0 +1,14 @@ +langcode: en +status: true +dependencies: {} +id: data_pipeline_push_api_key +label: 'Data Pipeline Push API Key' +description: 'API key for authenticating POST requests to the JSON endpoint dataset source. Set the DATA_PIPELINE_PUSH_API_KEY environment variable to provide the value.' +key_type: authentication +key_type_settings: {} +key_provider: env +key_provider_settings: + env_variable: DATA_PIPELINE_PUSH_API_KEY + strip_line_breaks: true +key_input: none +key_input_settings: {} diff --git a/modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php b/modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php new file mode 100644 index 000000000..d26f37e1c --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php @@ -0,0 +1,61 @@ +hasRequirement('_data_pipeline_api_key'); + } + + /** + * Allows the request if the X-Api-Key header matches the configured key. 
+ */ + public function access(Route $route, Request $request): AccessResultInterface { + $provided = $request->headers->get(static::HEADER_NAME, ''); + if (empty($provided)) { + return AccessResult::forbidden('Missing ' . static::HEADER_NAME . ' header.')->setCacheMaxAge(0); + } + + $key = $this->keyRepository->getKey(static::KEY_ID); + $expected = $key ? $key->getKeyValue() : NULL; + + if (empty($expected)) { + return AccessResult::forbidden('API key not configured.')->setCacheMaxAge(0); + } + + // Use hash_equals to prevent timing attacks. + if (!hash_equals($expected, $provided)) { + return AccessResult::forbidden('Invalid API key.')->setCacheMaxAge(0); + } + + return AccessResult::allowed()->setCacheMaxAge(0); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php b/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php new file mode 100644 index 000000000..bd32b99ab --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php @@ -0,0 +1,82 @@ +get('file_system')); + } + + public function push(Request $request, string $machine_name): JsonResponse { + if (!str_contains($request->headers->get('Content-Type', ''), 'application/json')) { + return new JsonResponse(['error' => 'Content-Type must be application/json.'], 415); + } + + $datasets = $this->entityTypeManager()->getStorage('data_pipelines') + ->loadByProperties(['machine_name' => $machine_name, 'source' => 'json_endpoint']); + + if (empty($datasets)) { + return new JsonResponse(['error' => 'Dataset not found.'], 404); + } + + /** @var \Drupal\data_pipelines\Entity\DatasetInterface $dataset */ + $dataset = reset($datasets); + + if (!$dataset->isPublished()) { + return new JsonResponse(['error' => 'Dataset is not published.'], 422); + } + + $body = $request->getContent(); + try { + json_decode($body, flags: JSON_THROW_ON_ERROR); + } + catch (\JsonException $e) { + return new 
JsonResponse(['error' => 'Invalid JSON: ' . $e->getMessage()], 400); + } + + $directory_uri = sprintf('%s://%s', JsonEndpointSource::STORAGE_SCHEME, JsonEndpointSource::STORAGE_DIR); + $this->fileSystem->prepareDirectory($directory_uri, FileSystemInterface::CREATE_DIRECTORY | FileSystemInterface::MODIFY_PERMISSIONS); + $this->fileSystem->saveData($body, JsonEndpointSource::buildStorageUri($machine_name), FileSystemInterface::EXISTS_REPLACE); + + if ($request->query->getBoolean('save_only')) { + return new JsonResponse(['status' => 'saved', 'machine_name' => $machine_name]); + } + + $dataset_id = (int) $dataset->id(); + + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + DatasetBatchOperations::operationQueueItem($dataset_id, $context); + } while ($context['finished'] < 1); + + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + DatasetBatchOperations::operationProcess($dataset_id, $context); + } while ($context['finished'] < 1); + + return new JsonResponse(['status' => 'processed', 'machine_name' => $machine_name]); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php b/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php new file mode 100644 index 000000000..95d02beac --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php @@ -0,0 +1,128 @@ +get('file_system'), + $container->get('logger.channel.data_pipelines'), + ); + } + + /** + * {@inheritdoc} + */ + public function buildFieldDefinitions(): array { + return [ + 'json_endpoint_path_to_data' => BaseFieldDefinition::create('string') + ->setLabel(new TranslatableMarkup('Path to data')) + ->setDescription(new TranslatableMarkup("Optional JSON path expression to select a sub-array from the received payload.", [ + '@link' => 'https://github.com/SoftCreatR/JSONPath', + ])) + 
->setSetting('max_length', 255) + ->setDisplayOptions('form', ['type' => 'string_textfield']) + ->setPropertyConstraints('value', ['JsonPath' => []]), + ]; + } + + /** + * {@inheritdoc} + */ + public function extractDataFromDataSet(DatasetInterface $dataset): \Generator { + $machine_name = $dataset->get('machine_name')->value; + $file_uri = static::buildStorageUri($machine_name); + $real_path = $this->fileSystem->realpath($file_uri); + + if (!$real_path || !file_exists($real_path)) { + $this->logger->warning('No payload file found for dataset @name at @uri.', [ + '@name' => $machine_name, + '@uri' => $file_uri, + ]); + return; + } + + try { + $json = json_decode(file_get_contents($real_path), TRUE, 512, JSON_THROW_ON_ERROR); + + $json_path = self::getFieldValue($dataset, 'json_endpoint_path_to_data'); + if (!empty($json_path)) { + $json = $this->createJsonPath($json)->find($json_path)->getData(); + } + + if (is_array($json) && array_values($json) === $json) { + foreach ($json as $record) { + yield new DatasetData(is_array($record) ? $record : [$record]); + } + } + else { + yield new DatasetData($json); + } + } + catch (\Exception $e) { + $this->logger->critical('Failed to process JSON payload for dataset @name: @message', [ + '@name' => $machine_name, + '@message' => $e->getMessage(), + ]); + } + } + + /** + * Returns the private filesystem URI for a dataset's stored JSON payload. 
+ */ + public static function buildStorageUri(string $machine_name): string { + return sprintf('%s://%s/%s.json', static::STORAGE_SCHEME, static::STORAGE_DIR, $machine_name); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.data_pipelines.yml b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.data_pipelines.yml new file mode 100644 index 000000000..5774fad46 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.data_pipelines.yml @@ -0,0 +1,2 @@ +test_json_endpoint_pipeline: + label: 'Test JSON Endpoint Pipeline' diff --git a/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.info.yml b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.info.yml new file mode 100644 index 000000000..b42dfff2d --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.info.yml @@ -0,0 +1,8 @@ +name: 'Tide Data Pipeline JSON Endpoint Test' +type: module +description: 'Test fixtures and pipelines for tide_data_pipeline_json_endpoint.' 
+core_version_requirement: ^10 || ^11 +package: Testing +dependencies: + - data_pipelines:data_pipelines + - tide_core:tide_data_pipeline_json_endpoint diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php new file mode 100644 index 000000000..7b133e59e --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php @@ -0,0 +1,150 @@ +installEntitySchema('user'); + $this->installConfig(['tide_data_pipeline_json_endpoint']); + $this->route = new Route('/api/datasets/{machine_name}/push', requirements: ['_data_pipeline_api_key' => 'TRUE']); + BypassFinals::enable(FALSE); + } + + /** + * Returns the access check instance from the container. + */ + private function check(): ApiKeyAccessCheck { + return $this->container->get('tide_data_pipeline_json_endpoint.api_key_access_check'); + } + + /** + * Returns a POST request optionally carrying an API key header. + */ + private function request(?string $api_key = NULL): Request { + $request = Request::create('/api/datasets/test/push', 'POST'); + if ($api_key !== NULL) { + $request->headers->set('X-Api-Key', $api_key); + } + return $request; + } + + /** + * Tests that applies() returns true for routes with _data_pipeline_api_key. + */ + public function testAppliesReturnsTrueForTaggedRoute(): void { + $this->assertTrue($this->check()->applies($this->route)); + } + + /** + * Tests that applies() returns false for routes without _data_pipeline_api_key. + */ + public function testAppliesReturnsFalseForUntaggedRoute(): void { + $this->assertFalse($this->check()->applies(new Route('/some/other/route'))); + } + + /** + * Tests that the correct key grants access. 
+ */ + public function testAccessAllowedWithCorrectKey(): void { + putenv('DATA_PIPELINE_PUSH_API_KEY=super-secret-test-key'); + + $result = $this->check()->access($this->route, $this->request('super-secret-test-key')); + + $this->assertTrue($result->isAllowed()); + + putenv('DATA_PIPELINE_PUSH_API_KEY='); + } + + /** + * Tests that a missing X-Api-Key header is forbidden. + */ + public function testAccessForbiddenWhenHeaderMissing(): void { + putenv('DATA_PIPELINE_PUSH_API_KEY=super-secret-test-key'); + + $result = $this->check()->access($this->route, $this->request()); + + $this->assertTrue($result->isForbidden()); + + putenv('DATA_PIPELINE_PUSH_API_KEY='); + } + + /** + * Tests that an incorrect API key is forbidden. + */ + public function testAccessForbiddenWithWrongKey(): void { + putenv('DATA_PIPELINE_PUSH_API_KEY=super-secret-test-key'); + + $result = $this->check()->access($this->route, $this->request('wrong-key')); + + $this->assertTrue($result->isForbidden()); + + putenv('DATA_PIPELINE_PUSH_API_KEY='); + } + + /** + * Tests that access is forbidden when the env variable is not set. + */ + public function testAccessForbiddenWhenEnvVariableNotSet(): void { + putenv('DATA_PIPELINE_PUSH_API_KEY='); + + $result = $this->check()->access($this->route, $this->request('any-key')); + + $this->assertTrue($result->isForbidden()); + } + + /** + * Tests that the result is never cached (access depends on the request header). 
+ */ + public function testAccessResultIsNeverCached(): void { + putenv('DATA_PIPELINE_PUSH_API_KEY=my-key'); + + $allowed = $this->check()->access($this->route, $this->request('my-key')); + $forbidden = $this->check()->access($this->route, $this->request('bad-key')); + + $this->assertSame(0, $allowed->getCacheMaxAge()); + $this->assertSame(0, $forbidden->getCacheMaxAge()); + + putenv('DATA_PIPELINE_PUSH_API_KEY='); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php new file mode 100644 index 000000000..0ea5d7fe9 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php @@ -0,0 +1,289 @@ +privatePath = $this->siteDirectory . '/private'; + mkdir($this->privatePath, 0777, TRUE); + $this->setSetting('file_private_path', $this->privatePath); + $this->installEntitySchema('user'); + $this->installEntitySchema('file'); + $this->installEntitySchema('data_pipelines'); + $this->installSchema('file', ['file_usage']); + $this->installConfig(['tide_data_pipeline_json_endpoint']); + $this->setUpCurrentUser(); + BypassFinals::enable(FALSE); + } + + /** + * Returns an instantiated controller with the test container. + */ + private function controller(): DatasetPushController { + return DatasetPushController::create($this->container); + } + + /** + * Builds a POST request with JSON content type. + */ + private function jsonRequest(mixed $body): Request { + return Request::create( + '/', + 'POST', + content: is_string($body) ? $body : json_encode($body), + server: ['CONTENT_TYPE' => 'application/json'], + ); + } + + /** + * Creates and saves a published json_endpoint dataset with a state destination. + */ + private function createPublishedDataset(string $machine_name): Dataset { + $destination = Destination::create([ + 'id' => $machine_name . 
'_dest', + 'label' => 'Test destination', + 'destination' => 'state', + 'destinationSettings' => ['state_key' => 'test_push_result'], + ]); + $destination->save(); + + $dataset = Dataset::create([ + 'name' => $machine_name, + 'machine_name' => $machine_name, + 'source' => 'json_endpoint', + 'pipeline' => 'test_json_endpoint_pipeline', + 'published' => TRUE, + 'destinations' => [$destination], + ]); + $dataset->save(); + assert($dataset instanceof Dataset); + return $dataset; + } + + /** + * Tests that a valid push returns 200 and processes the dataset. + */ + public function testPushProcessesDatasetAndReturns200(): void { + $machine_name = 'push_success'; + $this->createPublishedDataset($machine_name); + $payload = [['suburb' => 'Carlton'], ['suburb' => 'Fitzroy']]; + + $response = $this->controller()->push($this->jsonRequest($payload), $machine_name); + + $this->assertSame(200, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertSame('processed', $body['status']); + $this->assertSame($machine_name, $body['machine_name']); + } + + /** + * Tests that ?save_only=1 saves the file but skips reprocessing. + */ + public function testSaveOnlyReturnsSavedStatusWithoutProcessing(): void { + $machine_name = 'push_save_only'; + $this->createPublishedDataset($machine_name); + $payload = [['id' => 1]]; + + $request = $this->jsonRequest($payload); + $request->query->set('save_only', '1'); + $response = $this->controller()->push($request, $machine_name); + + $this->assertSame(200, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertSame('saved', $body['status']); + + // Confirm the file was written. + $file_path = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR . '/' . $machine_name . 
'.json'; + $this->assertFileExists($file_path); + $this->assertSame($payload, json_decode(file_get_contents($file_path), TRUE)); + + // Confirm no processing occurred (state destination was never written to). + $this->assertNull(\Drupal::state()->get('test_push_result')); + } + + /** + * Tests that a save_only push followed by manual reprocessing works end-to-end. + */ + public function testSaveOnlyThenManualReprocessProducesExpectedData(): void { + $machine_name = 'push_then_reprocess'; + $dataset = $this->createPublishedDataset($machine_name); + $payload = [['name' => 'Station A'], ['name' => 'Station B']]; + + // Step 1: save only. + $request = $this->jsonRequest($payload); + $request->query->set('save_only', '1'); + $this->controller()->push($request, $machine_name); + + // Step 2: trigger reprocessing via the batch operations directly. + $dataset_id = (int) $dataset->id(); + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + \Drupal\data_pipelines\Form\DatasetBatchOperations::operationQueueItem($dataset_id, $context); + } while ($context['finished'] !== 1); + + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + \Drupal\data_pipelines\Form\DatasetBatchOperations::operationProcess($dataset_id, $context); + } while ($context['finished'] !== 1); + + $stored = \Drupal::state()->get('test_push_result'); + $this->assertNotNull($stored); + } + + /** + * Tests that a non-JSON Content-Type returns 415. 
+ */ + public function testPushReturns415ForNonJsonContentType(): void { + $machine_name = 'push_415'; + $this->createPublishedDataset($machine_name); + + $request = Request::create('/', 'POST', content: 'some data', server: ['CONTENT_TYPE' => 'text/plain']); + $response = $this->controller()->push($request, $machine_name); + + $this->assertSame(415, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertArrayHasKey('error', $body); + } + + /** + * Tests that a malformed JSON body returns 400. + */ + public function testPushReturns400ForInvalidJson(): void { + $machine_name = 'push_400'; + $this->createPublishedDataset($machine_name); + + $response = $this->controller()->push( + Request::create('/', 'POST', content: '{not: valid json', server: ['CONTENT_TYPE' => 'application/json']), + $machine_name + ); + + $this->assertSame(400, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertStringContainsString('Invalid JSON', $body['error']); + } + + /** + * Tests that pushing to an unknown machine name returns 404. + */ + public function testPushReturns404ForUnknownDataset(): void { + $response = $this->controller()->push( + $this->jsonRequest(['data' => 'value']), + 'this_dataset_does_not_exist' + ); + + $this->assertSame(404, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertArrayHasKey('error', $body); + } + + /** + * Tests that pushing to an unpublished dataset returns 422. 
+ */ + public function testPushReturns422ForUnpublishedDataset(): void { + $machine_name = 'push_unpublished'; + $dataset = Dataset::create([ + 'name' => $machine_name, + 'machine_name' => $machine_name, + 'source' => 'json_endpoint', + 'pipeline' => 'test_json_endpoint_pipeline', + 'published' => FALSE, + ]); + $dataset->save(); + + $response = $this->controller()->push($this->jsonRequest(['data' => 'value']), $machine_name); + + $this->assertSame(422, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertArrayHasKey('error', $body); + } + + /** + * Tests that pushing to a non-json_endpoint source dataset returns 404. + * + * A dataset with a different source type must not be reachable via this + * endpoint even if the machine name matches. + */ + public function testPushReturns404ForDatasetWithDifferentSource(): void { + $name = mb_strtolower($this->randomMachineName()); + $dataset = Dataset::create([ + 'name' => $name, + 'machine_name' => $name, + 'source' => 'csv:text', + 'pipeline' => 'test_pipeline_1', + 'csv_text' => "a,b\n1,2", + ]); + $dataset->save(); + + $response = $this->controller()->push($this->jsonRequest(['data' => 'value']), $name); + + $this->assertSame(404, $response->getStatusCode()); + } + + /** + * Tests that a successful push overwrites any previously stored payload. + */ + public function testPushOverwritesPreviousPayload(): void { + $machine_name = 'push_overwrite'; + $this->createPublishedDataset($machine_name); + $file_path = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR . '/' . $machine_name . 
'.json'; + + $request1 = $this->jsonRequest([['v' => 'first']]); + $request1->query->set('save_only', '1'); + $this->controller()->push($request1, $machine_name); + $this->assertSame([['v' => 'first']], json_decode(file_get_contents($file_path), TRUE)); + + $request2 = $this->jsonRequest([['v' => 'second']]); + $request2->query->set('save_only', '1'); + $this->controller()->push($request2, $machine_name); + $this->assertSame([['v' => 'second']], json_decode(file_get_contents($file_path), TRUE)); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php new file mode 100644 index 000000000..6196e75c2 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php @@ -0,0 +1,204 @@ +privatePath = $this->siteDirectory . '/private'; + mkdir($this->privatePath, 0777, TRUE); + $this->setSetting('file_private_path', $this->privatePath); + $this->installEntitySchema('user'); + $this->installEntitySchema('file'); + $this->installEntitySchema('data_pipelines'); + $this->installSchema('file', ['file_usage']); + $this->setUpCurrentUser(); + BypassFinals::enable(FALSE); + } + + /** + * Returns the json_endpoint source plugin instance. + */ + private function getPlugin(): JsonEndpointSource { + $manager = \Drupal::service('plugin.manager.data_pipelines_source'); + assert($manager instanceof DatasetSourcePluginManager); + $plugin = $manager->createInstance('json_endpoint'); + assert($plugin instanceof JsonEndpointSource); + return $plugin; + } + + /** + * Creates an unsaved Dataset entity with the json_endpoint source. 
+ */ + private function createDataset(string $machine_name, array $extra = []): Dataset { + $dataset = Dataset::create([ + 'name' => $machine_name, + 'machine_name' => $machine_name, + 'source' => 'json_endpoint', + 'pipeline' => 'test_json_endpoint_pipeline', + ] + $extra); + assert($dataset instanceof Dataset); + return $dataset; + } + + /** + * Writes a JSON payload to the private filesystem for a given machine name. + */ + private function writePayload(string $machine_name, mixed $data): void { + $storage_dir = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR; + if (!is_dir($storage_dir)) { + mkdir($storage_dir, 0777, TRUE); + } + file_put_contents($storage_dir . '/' . $machine_name . '.json', json_encode($data)); + } + + /** + * Tests that an array-of-objects payload yields one DatasetData per element. + */ + public function testExtractArrayOfObjectsPayload(): void { + $machine_name = 'test_array_objects'; + $this->writePayload($machine_name, [ + ['name' => 'Station A', 'suburb' => 'Carlton'], + ['name' => 'Station B', 'suburb' => 'Fitzroy'], + ]); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEquals([ + new DatasetData(['name' => 'Station A', 'suburb' => 'Carlton']), + new DatasetData(['name' => 'Station B', 'suburb' => 'Fitzroy']), + ], $result); + } + + /** + * Tests that a root JSON object yields a single DatasetData. + */ + public function testExtractObjectPayload(): void { + $machine_name = 'test_object'; + $this->writePayload($machine_name, ['key1' => 'value1', 'key2' => 'value2']); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEquals([ + new DatasetData(['key1' => 'value1', 'key2' => 'value2']), + ], $result); + } + + /** + * Tests that an array of scalar values wraps each value in a DatasetData. 
+ */ + public function testExtractArrayOfScalarsPayload(): void { + $machine_name = 'test_scalars'; + $this->writePayload($machine_name, ['alpha', 'beta', 'gamma']); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEquals([ + new DatasetData(['alpha']), + new DatasetData(['beta']), + new DatasetData(['gamma']), + ], $result); + } + + /** + * Tests that the json_endpoint_path_to_data field applies a JSONPath filter. + */ + public function testExtractWithJsonPath(): void { + $machine_name = 'test_jsonpath'; + $this->writePayload($machine_name, [ + 'meta' => ['total' => 2], + 'records' => [ + ['id' => 1, 'title' => 'First'], + ['id' => 2, 'title' => 'Second'], + ], + ]); + + $dataset = $this->createDataset($machine_name, ['json_endpoint_path_to_data' => '$.records']); + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($dataset)); + + $this->assertEquals([ + new DatasetData(['id' => 1, 'title' => 'First']), + new DatasetData(['id' => 2, 'title' => 'Second']), + ], $result); + } + + /** + * Tests that extraction returns empty when no payload file exists. + */ + public function testExtractReturnsEmptyWhenNoFileExists(): void { + $result = iterator_to_array( + $this->getPlugin()->extractDataFromDataSet($this->createDataset('nonexistent_dataset')) + ); + + $this->assertEmpty($result); + } + + /** + * Tests that extraction returns empty when the stored file contains invalid JSON. + */ + public function testExtractReturnsEmptyOnInvalidJson(): void { + $machine_name = 'test_invalid_json'; + $storage_dir = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR; + mkdir($storage_dir, 0777, TRUE); + file_put_contents($storage_dir . '/' . $machine_name . 
'.json', '{not: valid json{{{'); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEmpty($result); + } + + /** + * Tests that a single-record payload is processed as one DatasetData. + */ + public function testExtractSingleRecordPayload(): void { + $machine_name = 'test_single'; + $this->writePayload($machine_name, [['station' => 'CBD', 'phone' => '9999-9999']]); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertCount(1, $result); + $this->assertEquals(new DatasetData(['station' => 'CBD', 'phone' => '9999-9999']), $result[0]); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Unit/Plugin/DatasetSource/JsonEndpointSourceTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Unit/Plugin/DatasetSource/JsonEndpointSourceTest.php new file mode 100644 index 000000000..e9d044dee --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Unit/Plugin/DatasetSource/JsonEndpointSourceTest.php @@ -0,0 +1,61 @@ +assertSame( + 'private://data_pipelines_json_endpoint/my_dataset.json', + JsonEndpointSource::buildStorageUri('my_dataset') + ); + } + + /** + * Tests buildStorageUri with various valid machine names. 
+ * + * @dataProvider machineNameProvider + */ + public function testBuildStorageUriHandlesVariousMachineNames(string $name, string $expected): void { + $this->assertSame($expected, JsonEndpointSource::buildStorageUri($name)); + } + + public static function machineNameProvider(): array { + return [ + 'underscored name' => [ + 'station_locator', + 'private://data_pipelines_json_endpoint/station_locator.json', + ], + 'name with numbers' => [ + 'dataset_2024', + 'private://data_pipelines_json_endpoint/dataset_2024.json', + ], + 'single character' => [ + 'a', + 'private://data_pipelines_json_endpoint/a.json', + ], + ]; + } + + /** + * Tests that the storage scheme and directory constants have expected values. + */ + public function testStorageConstants(): void { + $this->assertSame('private', JsonEndpointSource::STORAGE_SCHEME); + $this->assertSame('data_pipelines_json_endpoint', JsonEndpointSource::STORAGE_DIR); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml new file mode 100644 index 000000000..93cdd8f57 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml @@ -0,0 +1,8 @@ +name: 'Tide Data Pipeline JSON Endpoint' +type: module +description: 'Provides a JSON endpoint source for data pipelines, allowing external systems to push JSON data via an authenticated POST request.' 
+core_version_requirement: ^10 +package: Tide +dependencies: + - drupal:data_pipelines + - drupal:key diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml new file mode 100644 index 000000000..0518b4090 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml @@ -0,0 +1,3 @@ +# No permissions — access to the push endpoint is controlled by the API key +# stored in the key.key.data_pipeline_push_api_key configuration entity and +# validated via the ApiKeyAccessCheck service. diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml new file mode 100644 index 000000000..be59c3d8c --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml @@ -0,0 +1,10 @@ +tide_data_pipeline_json_endpoint.push: + path: '/api/datasets/{machine_name}/push' + defaults: + _controller: '\Drupal\tide_data_pipeline_json_endpoint\Controller\DatasetPushController::push' + _title: 'Push Dataset Data' + methods: [POST] + requirements: + _data_pipeline_api_key: 'TRUE' + options: + no_cache: TRUE diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml new file mode 100644 index 000000000..b52d1f29b --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml @@ -0,0 +1,7 @@ +services: + tide_data_pipeline_json_endpoint.api_key_access_check: + class: Drupal\tide_data_pipeline_json_endpoint\Access\ApiKeyAccessCheck + arguments: + - '@key.repository' + tags: + - { name: access_check, applies_to: _data_pipeline_api_key } From 
d95757eb7fe679a25a947883993cab0f5cbe21b6 Mon Sep 17 00:00:00 2001 From: Anthony Malkoun Date: Thu, 4 Jun 2026 09:41:00 +1000 Subject: [PATCH 2/4] Add in a flatten transform for nested JSON --- docker-compose.yml | 2 - .../src/Plugin/DatasetTransform/Flatten.php | 59 +++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 modules/tide_search/src/Plugin/DatasetTransform/Flatten.php diff --git a/docker-compose.yml b/docker-compose.yml index 5ea841ded..0770e3d1a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,8 +3,6 @@ # - Using a single file to work in local, CI and production environments. # - Local overrides are possible using docker-composer.override.yml file. # - Use inline comments starting with ### to have the line removed in CI. -version: '2.3' - x-bay-image-version: &bay-image-version ${BAY_IMAGE_VERSION:-6.x} diff --git a/modules/tide_search/src/Plugin/DatasetTransform/Flatten.php b/modules/tide_search/src/Plugin/DatasetTransform/Flatten.php new file mode 100644 index 000000000..a7d4420c7 --- /dev/null +++ b/modules/tide_search/src/Plugin/DatasetTransform/Flatten.php @@ -0,0 +1,59 @@ + [], + 'separator' => '_', + 'remove_source' => TRUE, + ]; + } + + /** + * {@inheritdoc} + */ + protected function doTransformRecord(DatasetData $record): DatasetData { + $record = parent::doTransformRecord($record); + if (!$this->configuration['fields']) { + return $record; + } + $separator = $this->configuration['separator']; + $remove_source = $this->configuration['remove_source']; + foreach ($this->configuration['fields'] as $field_name) { + if (!$record->offsetExists($field_name)) { + continue; + } + $value = $record[$field_name]; + if (!is_array($value)) { + continue; + } + foreach ($value as $key => $nested_value) { + $record["{$field_name}{$separator}{$key}"] = $nested_value; + } + if ($remove_source) { + unset($record[$field_name]); + } + } + return $record; + } + +} From 5234e286650a9adf657b00b30d06a15c38640aba Mon Sep 17 
00:00:00 2001 From: Anthony Malkoun Date: Thu, 4 Jun 2026 13:06:15 +1000 Subject: [PATCH 3/4] Change to OAuth instead of API key --- .../README.md | 226 ++++++++++++++++++ .../key.key.data_pipeline_push_api_key.yml | 14 -- .../user.role.data_pipeline_pusher.yml | 9 + .../src/Access/ApiKeyAccessCheck.php | 61 ----- .../src/Controller/DatasetPushController.php | 6 + .../DatasetSource/JsonEndpointSource.php | 4 +- .../src/Kernel/ApiKeyAccessCheckTest.php | 150 ------------ .../src/Kernel/DatasetPushControllerTest.php | 22 +- .../Kernel/JsonEndpointSourceKernelTest.php | 9 +- .../tide_data_pipeline_json_endpoint.info.yml | 2 +- .../tide_data_pipeline_json_endpoint.install | 38 +++ ...ata_pipeline_json_endpoint.permissions.yml | 6 +- ...de_data_pipeline_json_endpoint.routing.yml | 2 +- ...e_data_pipeline_json_endpoint.services.yml | 8 +- 14 files changed, 306 insertions(+), 251 deletions(-) create mode 100644 modules/tide_data_pipeline_json_endpoint/README.md delete mode 100644 modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml create mode 100644 modules/tide_data_pipeline_json_endpoint/config/install/user.role.data_pipeline_pusher.yml delete mode 100644 modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php delete mode 100644 modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php create mode 100644 modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.install diff --git a/modules/tide_data_pipeline_json_endpoint/README.md b/modules/tide_data_pipeline_json_endpoint/README.md new file mode 100644 index 000000000..7853867c0 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/README.md @@ -0,0 +1,226 @@ +# Tide Data Pipeline JSON Endpoint + +Provides a `json_endpoint` dataset source for the `data_pipelines` module. 
External systems push JSON payloads to a Drupal REST endpoint, which stores the data and optionally triggers immediate reprocessing of the dataset. + +## How it works + +1. An external system authenticates with Drupal using OAuth 2.0 (client credentials flow) to obtain a short-lived bearer token. +2. It POSTs a JSON payload to `/api/datasets/{machine_name}/push`. +3. Drupal saves the payload to the private filesystem and, by default, synchronously reprocesses the dataset. + +--- + +## Requirements + +- `data_pipelines` module +- `consumers` module +- `simple_oauth` module (provides the OAuth 2.0 token endpoint) +- `tide_oauth` module (provides the authentication provider that validates bearer tokens) +- A configured private filesystem (`$settings['file_private_path']` in `settings.php`) +- OAuth public/private keys generated (run `drush tide-oauth:keygen`) + +--- + +## Installation + +Enable the module: + +```bash +drush en tide_data_pipeline_json_endpoint +``` + +On install the module creates: + +- A **`data_pipeline_pusher` role** with the single permission `push data pipeline json endpoint`. +- A **`Data Pipeline Pusher` OAuth consumer** (`client_id: data_pipeline_pusher`) wired to that role. The consumer is created as confidential — it cannot issue tokens until a client secret is set (see [OAuth set up](#oauth-set-up) below). + +--- + +## OAuth set up + +### 1. Ensure OAuth keys exist + +If you have not already generated OAuth keys, run: + +```bash +drush tide-oauth:keygen +``` + +### 2. Set the consumer client secret + +The consumer is created without a secret so that no credential is ever stored in code. You must set one before the consumer can issue tokens. + +1. Go to **Admin > Configuration > Web services > Consumers** (`/admin/config/services/consumer`). +2. Open the **Data Pipeline Pusher** consumer. +3. Enter a strong random value in the **New Secret** field and save. + +Store the secret securely (e.g. 
in a secrets manager or CI/CD environment variable). Drupal stores only a bcrypt hash — the plaintext is never recoverable from the database. + +### 3. Verify the token endpoint + +Confirm that `/oauth/token` is accessible and returns a token: + +```bash +curl -s -X POST https://your-site.com/oauth/token \ + -d "grant_type=client_credentials" \ + -d "client_id=data_pipeline_pusher" \ + -d "client_secret=YOUR_CLIENT_SECRET" \ + | jq . +``` + +A successful response: + +```json +{ + "token_type": "Bearer", + "expires_in": 300, + "access_token": "eyJ0eXAiOiJKV1QiLCJhb..." +} +``` + +--- + +## Creating a dataset + +Datasets are content entities managed at **Admin > Content > Data Pipelines**. + +### Via the admin UI + +1. Go to `/admin/content/data-pipelines/add`. +2. Set **Source** to **JSON Endpoint**. +3. Enter a **Machine name** — this becomes the `{machine_name}` segment in the push URL. +4. Optionally set **Path to data** if your payload wraps the records in a nested key (see [Path to data](#path-to-data)). +5. Configure your pipeline and destination as normal. +6. **Publish** the dataset. Unpublished datasets reject push requests with `422`. + +### Via Drush / API + +```php +$dataset = \Drupal\data_pipelines\Entity\Dataset::create([ + 'name' => 'Suburbs', + 'machine_name' => 'suburbs', + 'source' => 'json_endpoint', + 'pipeline' => 'my_pipeline', + 'published' => TRUE, + 'destinations' => [$destination], +]); +$dataset->save(); +``` + +### Path to data + +If your payload nests records under a key rather than being a top-level array, use the **Path to data** field to provide a [JSONPath](https://github.com/SoftCreatR/JSONPath) expression. 
+ +| Payload shape | Path to data | +|---|---| +| `[{"id":1}, ...]` (top-level array) | *(leave empty)* | +| `{"data": [{"id":1}, ...]}` | `$.data` | +| `{"results": {"items": [...]}}` | `$.results.items` | + +--- + +## Pushing data + +### Endpoint + +``` +POST /api/datasets/{machine_name}/push +``` + +| Header | Value | +|---|---| +| `Authorization` | `Bearer <access_token>` | +| `Content-Type` | `application/json` | + +### Modes + +| Query string | Behaviour | +|---|---| +| *(none)* | Save the payload **and** immediately reprocess the dataset synchronously. | +| `?save_only=1` | Save the payload only. Reprocessing is deferred to the next scheduled run or a manual trigger. | + +### Response codes + +| Code | Meaning | +|---|---| +| `200` | Success. Body is `{"status":"processed","machine_name":"..."}` or `{"status":"saved","machine_name":"..."}`. | +| `400` | Request body is not valid JSON. | +| `401` | Missing or invalid bearer token. | +| `403` | Token is valid but the associated user lacks the `push data pipeline json endpoint` permission. | +| `404` | No published `json_endpoint` dataset with that machine name exists. | +| `415` | `Content-Type` is not `application/json`. | +| `422` | The dataset exists but is not published. | + +--- + +## Example curl + +The following example pushes a list of suburb records to a dataset with the machine name `suburbs`. + +### Step 1 — obtain a token + +```bash +TOKEN=$(curl -s -X POST https://your-site.com/oauth/token \ + -d "grant_type=client_credentials" \ + -d "client_id=data_pipeline_pusher" \ + -d "client_secret=YOUR_CLIENT_SECRET" \ + | jq -r '.access_token') +``` + +### Step 2 — push data + +```bash +curl -s -X POST https://your-site.com/api/datasets/suburbs/push \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '[ + {"id": 1, "name": "Carlton", "postcode": "3053"}, + {"id": 2, "name": "Fitzroy", "postcode": "3065"}, + {"id": 3, "name": "Collingwood", "postcode": "3066"} + ]' \ + | jq . 
+``` + +Expected response: + +```json +{ + "status": "processed", + "machine_name": "suburbs" +} +``` + +### Save only (defer processing) + +```bash +curl -s -X POST "https://your-site.com/api/datasets/suburbs/push?save_only=1" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '[{"id": 1, "name": "Carlton", "postcode": "3053"}]' \ + | jq . +``` + +```json +{ + "status": "saved", + "machine_name": "suburbs" +} +``` + +--- + +## Payload storage + +Each push overwrites the previous payload. The file is stored at: + +``` +private://data_pipelines_json_endpoint/{machine_name}.json +``` + +This path is inside Drupal's private filesystem and is not publicly accessible. + +--- + +## Token expiry + +Access tokens are short-lived (default 5 minutes in `simple_oauth`). Clients should request a new token when the current one is near expiry rather than reusing a cached token indefinitely. diff --git a/modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml b/modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml deleted file mode 100644 index 3bc480042..000000000 --- a/modules/tide_data_pipeline_json_endpoint/config/install/key.key.data_pipeline_push_api_key.yml +++ /dev/null @@ -1,14 +0,0 @@ -langcode: en -status: true -dependencies: {} -id: data_pipeline_push_api_key -label: 'Data Pipeline Push API Key' -description: 'API key for authenticating POST requests to the JSON endpoint dataset source. Set the DATA_PIPELINE_PUSH_API_KEY environment variable to provide the value.' 
-key_type: authentication -key_type_settings: {} -key_provider: env -key_provider_settings: - env_variable: DATA_PIPELINE_PUSH_API_KEY - strip_line_breaks: true -key_input: none -key_input_settings: {} diff --git a/modules/tide_data_pipeline_json_endpoint/config/install/user.role.data_pipeline_pusher.yml b/modules/tide_data_pipeline_json_endpoint/config/install/user.role.data_pipeline_pusher.yml new file mode 100644 index 000000000..482cba870 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/config/install/user.role.data_pipeline_pusher.yml @@ -0,0 +1,9 @@ +langcode: en +status: true +dependencies: {} +id: data_pipeline_pusher +label: 'Data Pipeline Pusher' +weight: 100 +is_admin: null +permissions: + - 'push data pipeline json endpoint' diff --git a/modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php b/modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php deleted file mode 100644 index d26f37e1c..000000000 --- a/modules/tide_data_pipeline_json_endpoint/src/Access/ApiKeyAccessCheck.php +++ /dev/null @@ -1,61 +0,0 @@ -hasRequirement('_data_pipeline_api_key'); - } - - /** - * Allows the request if the X-Api-Key header matches the configured key. - */ - public function access(Route $route, Request $request): AccessResultInterface { - $provided = $request->headers->get(static::HEADER_NAME, ''); - if (empty($provided)) { - return AccessResult::forbidden('Missing ' . static::HEADER_NAME . ' header.')->setCacheMaxAge(0); - } - - $key = $this->keyRepository->getKey(static::KEY_ID); - $expected = $key ? $key->getKeyValue() : NULL; - - if (empty($expected)) { - return AccessResult::forbidden('API key not configured.')->setCacheMaxAge(0); - } - - // Use hash_equals to prevent timing attacks. 
- if (!hash_equals($expected, $provided)) { - return AccessResult::forbidden('Invalid API key.')->setCacheMaxAge(0); - } - - return AccessResult::allowed()->setCacheMaxAge(0); - } - -} diff --git a/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php b/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php index bd32b99ab..f9d9cce08 100644 --- a/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php +++ b/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php @@ -25,10 +25,16 @@ class DatasetPushController extends ControllerBase { public function __construct(private readonly FileSystemInterface $fileSystem) {} + /** + * {@inheritdoc} + */ public static function create(ContainerInterface $container): static { return new static($container->get('file_system')); } + /** + * Handles an authenticated JSON payload push for a dataset. + */ public function push(Request $request, string $machine_name): JsonResponse { if (!str_contains($request->headers->get('Content-Type', ''), 'application/json')) { return new JsonResponse(['error' => 'Content-Type must be application/json.'], 415); diff --git a/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php b/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php index 95d02beac..8534e2746 100644 --- a/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php +++ b/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php @@ -37,7 +37,6 @@ class JsonEndpointSource extends PluginBase implements DatasetSourceInterface, C // DatasetSourceBase.__construct is final, so this plugin extends PluginBase // directly and implements DatasetSourceInterface without a source resource. 
- const STORAGE_SCHEME = 'private'; const STORAGE_DIR = 'data_pipelines_json_endpoint'; @@ -51,6 +50,9 @@ public function __construct( parent::__construct($configuration, $plugin_id, $plugin_definition); } + /** + * {@inheritdoc} + */ public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition): static { return new static( $configuration, diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php deleted file mode 100644 index 7b133e59e..000000000 --- a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/ApiKeyAccessCheckTest.php +++ /dev/null @@ -1,150 +0,0 @@ -installEntitySchema('user'); - $this->installConfig(['tide_data_pipeline_json_endpoint']); - $this->route = new Route('/api/datasets/{machine_name}/push', requirements: ['_data_pipeline_api_key' => 'TRUE']); - BypassFinals::enable(FALSE); - } - - /** - * Returns the access check instance from the container. - */ - private function check(): ApiKeyAccessCheck { - return $this->container->get('tide_data_pipeline_json_endpoint.api_key_access_check'); - } - - /** - * Returns a POST request optionally carrying an API key header. - */ - private function request(?string $api_key = NULL): Request { - $request = Request::create('/api/datasets/test/push', 'POST'); - if ($api_key !== NULL) { - $request->headers->set('X-Api-Key', $api_key); - } - return $request; - } - - /** - * Tests that applies() returns true for routes with _data_pipeline_api_key. - */ - public function testAppliesReturnsTrueForTaggedRoute(): void { - $this->assertTrue($this->check()->applies($this->route)); - } - - /** - * Tests that applies() returns false for routes without _data_pipeline_api_key. 
- */ - public function testAppliesReturnsFalseForUntaggedRoute(): void { - $this->assertFalse($this->check()->applies(new Route('/some/other/route'))); - } - - /** - * Tests that the correct key grants access. - */ - public function testAccessAllowedWithCorrectKey(): void { - putenv('DATA_PIPELINE_PUSH_API_KEY=super-secret-test-key'); - - $result = $this->check()->access($this->route, $this->request('super-secret-test-key')); - - $this->assertTrue($result->isAllowed()); - - putenv('DATA_PIPELINE_PUSH_API_KEY='); - } - - /** - * Tests that a missing X-Api-Key header is forbidden. - */ - public function testAccessForbiddenWhenHeaderMissing(): void { - putenv('DATA_PIPELINE_PUSH_API_KEY=super-secret-test-key'); - - $result = $this->check()->access($this->route, $this->request()); - - $this->assertTrue($result->isForbidden()); - - putenv('DATA_PIPELINE_PUSH_API_KEY='); - } - - /** - * Tests that an incorrect API key is forbidden. - */ - public function testAccessForbiddenWithWrongKey(): void { - putenv('DATA_PIPELINE_PUSH_API_KEY=super-secret-test-key'); - - $result = $this->check()->access($this->route, $this->request('wrong-key')); - - $this->assertTrue($result->isForbidden()); - - putenv('DATA_PIPELINE_PUSH_API_KEY='); - } - - /** - * Tests that access is forbidden when the env variable is not set. - */ - public function testAccessForbiddenWhenEnvVariableNotSet(): void { - putenv('DATA_PIPELINE_PUSH_API_KEY='); - - $result = $this->check()->access($this->route, $this->request('any-key')); - - $this->assertTrue($result->isForbidden()); - } - - /** - * Tests that the result is never cached (access depends on the request header). 
- */ - public function testAccessResultIsNeverCached(): void { - putenv('DATA_PIPELINE_PUSH_API_KEY=my-key'); - - $allowed = $this->check()->access($this->route, $this->request('my-key')); - $forbidden = $this->check()->access($this->route, $this->request('bad-key')); - - $this->assertSame(0, $allowed->getCacheMaxAge()); - $this->assertSame(0, $forbidden->getCacheMaxAge()); - - putenv('DATA_PIPELINE_PUSH_API_KEY='); - } - -} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php index 0ea5d7fe9..4a6723d11 100644 --- a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php @@ -5,10 +5,11 @@ namespace Drupal\Tests\tide_data_pipeline_json_endpoint\Kernel; use DG\BypassFinals; -use Drupal\KernelTests\KernelTestBase; -use Drupal\Tests\user\Traits\UserCreationTrait; use Drupal\data_pipelines\Entity\Dataset; use Drupal\data_pipelines\Entity\Destination; +use Drupal\data_pipelines\Form\DatasetBatchOperations; +use Drupal\KernelTests\KernelTestBase; +use Drupal\Tests\user\Traits\UserCreationTrait; use Drupal\tide_data_pipeline_json_endpoint\Controller\DatasetPushController; use Drupal\tide_data_pipeline_json_endpoint\Plugin\DatasetSource\JsonEndpointSource; use Symfony\Component\HttpFoundation\Request; @@ -17,8 +18,8 @@ * Tests the DatasetPushController business logic. * * These tests call the controller method directly, bypassing the routing layer. - * OAuth bearer token validation and the _permission access check are enforced - * by the routing system and should be covered by functional (browser) tests. + * OAuth bearer token authentication and the _permission access check are + * enforced by the routing system and should be covered by functional tests. 
* * @group tide_data_pipeline_json_endpoint * @@ -32,7 +33,6 @@ class DatasetPushControllerTest extends KernelTestBase { * {@inheritdoc} */ protected static $modules = [ - 'key', 'options', 'link', 'file', @@ -45,6 +45,9 @@ class DatasetPushControllerTest extends KernelTestBase { 'system', ]; + /** + * Absolute path to the private filesystem used during tests. + */ protected string $privatePath; /** @@ -59,7 +62,6 @@ protected function setUp(): void { $this->installEntitySchema('file'); $this->installEntitySchema('data_pipelines'); $this->installSchema('file', ['file_usage']); - $this->installConfig(['tide_data_pipeline_json_endpoint']); $this->setUpCurrentUser(); BypassFinals::enable(FALSE); } @@ -84,7 +86,7 @@ private function jsonRequest(mixed $body): Request { } /** - * Creates and saves a published json_endpoint dataset with a state destination. + * Creates and saves a published json_endpoint dataset with a state dest. */ private function createPublishedDataset(string $machine_name): Dataset { $destination = Destination::create([ @@ -150,7 +152,7 @@ public function testSaveOnlyReturnsSavedStatusWithoutProcessing(): void { } /** - * Tests that a save_only push followed by manual reprocessing works end-to-end. + * Tests that a save_only push followed by manual reprocessing works. 
*/ public function testSaveOnlyThenManualReprocessProducesExpectedData(): void { $machine_name = 'push_then_reprocess'; @@ -166,12 +168,12 @@ public function testSaveOnlyThenManualReprocessProducesExpectedData(): void { $dataset_id = (int) $dataset->id(); $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; do { - \Drupal\data_pipelines\Form\DatasetBatchOperations::operationQueueItem($dataset_id, $context); + DatasetBatchOperations::operationQueueItem($dataset_id, $context); } while ($context['finished'] !== 1); $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; do { - \Drupal\data_pipelines\Form\DatasetBatchOperations::operationProcess($dataset_id, $context); + DatasetBatchOperations::operationProcess($dataset_id, $context); } while ($context['finished'] !== 1); $stored = \Drupal::state()->get('test_push_result'); diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php index 6196e75c2..808783762 100644 --- a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php @@ -5,11 +5,11 @@ namespace Drupal\Tests\tide_data_pipeline_json_endpoint\Kernel; use DG\BypassFinals; -use Drupal\KernelTests\KernelTestBase; -use Drupal\Tests\user\Traits\UserCreationTrait; use Drupal\data_pipelines\DatasetData; use Drupal\data_pipelines\Entity\Dataset; use Drupal\data_pipelines\Source\DatasetSourcePluginManager; +use Drupal\KernelTests\KernelTestBase; +use Drupal\Tests\user\Traits\UserCreationTrait; use Drupal\tide_data_pipeline_json_endpoint\Plugin\DatasetSource\JsonEndpointSource; /** @@ -39,6 +39,9 @@ class JsonEndpointSourceKernelTest extends KernelTestBase { 'system', ]; + /** + * Absolute path to the private filesystem used during tests. 
+ */ protected string $privatePath; /** @@ -175,7 +178,7 @@ public function testExtractReturnsEmptyWhenNoFileExists(): void { } /** - * Tests that extraction returns empty when the stored file contains invalid JSON. + * Tests that extraction returns empty when the stored file has invalid JSON. */ public function testExtractReturnsEmptyOnInvalidJson(): void { $machine_name = 'test_invalid_json'; diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml index 93cdd8f57..e09d69a80 100644 --- a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml @@ -5,4 +5,4 @@ core_version_requirement: ^10 package: Tide dependencies: - drupal:data_pipelines - - drupal:key + - consumers:consumers diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.install b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.install new file mode 100644 index 000000000..6d08a5843 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.install @@ -0,0 +1,38 @@ + 'Data Pipeline Pusher', + 'client_id' => 'data_pipeline_pusher', + 'confidential' => 1, + 'is_default' => FALSE, + 'third_party' => 0, + 'status' => 1, + 'roles' => [['target_id' => 'data_pipeline_pusher']], + ])->save(); +} + +/** + * Implements hook_uninstall(). 
+ */ +function tide_data_pipeline_json_endpoint_uninstall(): void { + $consumers = \Drupal::entityTypeManager() + ->getStorage('consumer') + ->loadByProperties(['client_id' => 'data_pipeline_pusher']); + foreach ($consumers as $consumer) { + $consumer->delete(); + } +} diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml index 0518b4090..fbd053818 100644 --- a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml @@ -1,3 +1,3 @@ -# No permissions — access to the push endpoint is controlled by the API key -# stored in the key.key.data_pipeline_push_api_key configuration entity and -# validated via the ApiKeyAccessCheck service. +push data pipeline json endpoint: + title: 'Push data pipeline JSON endpoint' + description: 'Allows pushing data to the JSON endpoint dataset source via the authenticated POST endpoint.' 
diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml index be59c3d8c..06cd4f2ad 100644 --- a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml @@ -5,6 +5,6 @@ tide_data_pipeline_json_endpoint.push: _title: 'Push Dataset Data' methods: [POST] requirements: - _data_pipeline_api_key: 'TRUE' + _permission: 'push data pipeline json endpoint' options: no_cache: TRUE diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml index b52d1f29b..ad189ddbe 100644 --- a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml @@ -1,7 +1 @@ -services: - tide_data_pipeline_json_endpoint.api_key_access_check: - class: Drupal\tide_data_pipeline_json_endpoint\Access\ApiKeyAccessCheck - arguments: - - '@key.repository' - tags: - - { name: access_check, applies_to: _data_pipeline_api_key } +services: {} From a54659f2c9a7eb1907b3cca89307144901217ed1 Mon Sep 17 00:00:00 2001 From: Vincent Gao Date: Wed, 17 Jun 2026 15:25:43 +1000 Subject: [PATCH 4/4] Allow datasets to define a custom document `_id` Allow datasets to define a custom document `_id` instead of the positional delta --- composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 949154d4e..cdd88b0b3 100644 --- a/composer.json +++ b/composer.json @@ -413,7 +413,8 @@ }, "drupal/data_pipelines_elasticsearch-data_pipelines_elasticsearch": { "Complete the fix for bulk indexing on Elasticsearch endpoint - 
https://www.drupal.org/project/data_pipelines_elasticsearch/issues/3540879": "https://www.drupal.org/files/issues/2025-08-12/3511558-Change-bulk-deleting-on-Elasticsearch-endpoint.patch", - "False positive mappings defined at top level error triggered unconditionally in ElasticSearchDatasetPipeline constructor - https://www.drupal.org/project/data_pipelines_elasticsearch/issues/3541059#comment-16226097": "https://www.drupal.org/files/issues/2025-08-13/3541059-fix-elasticsearch-mapping-warning-in-constructor.patch" + "False positive mappings defined at top level error triggered unconditionally in ElasticSearchDatasetPipeline constructor - https://www.drupal.org/project/data_pipelines_elasticsearch/issues/3541059#comment-16226097": "https://www.drupal.org/files/issues/2025-08-13/3541059-fix-elasticsearch-mapping-warning-in-constructor.patch", + "Allow datasets to define a custom document `_id` instead of the positional delta": "https://www.drupal.org/files/issues/2026-06-17/allow_external_id_datasets_3601849_2.patch" }, "drupal/paragraphs": { "Add default paragraph count setting - https://www.drupal.org/project/paragraphs/issues/3089423#comment-14517270": "https://www.drupal.org/files/issues/2022-05-17/paragraphs-default-quantity-3089423-18.patch",