|
| 1 | +import asyncio |
| 2 | + |
| 3 | +import pytest |
| 4 | + |
| 5 | +from deepset_mcp.api.client import AsyncDeepsetClient |
| 6 | +from deepset_mcp.api.pipeline.models import DeepsetPipeline, PipelineLogList |
| 7 | +from deepset_mcp.api.pipeline.resource import PipelineResource |
| 8 | + |
| 9 | +pytestmark = pytest.mark.integration |
| 10 | + |
| 11 | + |
@pytest.fixture
def pipeline_resource(
    client: AsyncDeepsetClient,
    test_workspace: str,
) -> PipelineResource:
    """Create a PipelineResource instance for testing.

    The fixture is synchronous on purpose: building the resource performs no
    awaits, and a plain ``@pytest.fixture`` coroutine is only resolved when
    pytest-asyncio runs in auto mode.

    Args:
        client: Client handed to the resource (supplied by another fixture).
        test_workspace: Workspace name the resource is bound to (supplied by
            another fixture).

    Returns:
        A PipelineResource constructed from ``client`` and ``test_workspace``.
    """
    return PipelineResource(client=client, workspace=test_workspace)
| 19 | + |
| 20 | + |
@pytest.fixture
def simple_yaml_config() -> str:
    """Return a simple YAML configuration that should deploy quickly.

    The configuration declares three components (a prompt builder, an OpenAI
    generator and an answer builder), the connections between them, and the
    pipeline inputs and outputs.

    NOTE(review): the nesting below was reconstructed from the structure of
    the document (components -> component -> init_parameters, list items under
    ``connections`` and ``inputs.query``); confirm it against the committed
    file.
    """
    # Two-space YAML nesting; the block scalar under ``template`` keeps its
    # inner blank line so the prompt reads "question", blank line, "Answer:".
    yaml_config = """
components:
  openai_generator:
    type: haystack.components.generators.openai.OpenAIGenerator
    init_parameters:
      api_key: {"type": "env_var", "env_vars": ["OPENAI_API_KEY"], "strict": false}
      model: "gpt-4o-mini"
      generation_kwargs:
        temperature: 0.1
        max_tokens: 300

  prompt_builder:
    type: haystack.components.builders.prompt_builder.PromptBuilder
    init_parameters:
      template: |
        Answer the following question: {{question}}

        Answer:

  answer_builder:
    type: haystack.components.builders.answer_builder.AnswerBuilder
    init_parameters: {}

connections:
  - sender: prompt_builder.prompt
    receiver: openai_generator.prompt
  - sender: openai_generator.replies
    receiver: answer_builder.replies

inputs:
  query:
    - "prompt_builder.question"
    - "answer_builder.query"

outputs:
  answers: "answer_builder.answers"
"""
    return yaml_config
| 61 | + |
| 62 | + |
async def wait_for_pipeline_deployment(
    pipeline_resource: PipelineResource,
    pipeline_name: str,
    timeout_seconds: int = 300,
    poll_interval: int = 10,
) -> DeepsetPipeline:
    """
    Wait for a pipeline to reach DEPLOYED status by polling its status.

    Args:
        pipeline_resource: The pipeline resource to use for API calls
        pipeline_name: Name of the pipeline to monitor
        timeout_seconds: Maximum time to wait for deployment (default: 5 minutes)
        poll_interval: Time between status checks in seconds (default: 10 seconds)

    Returns:
        The deployed pipeline object

    Raises:
        TimeoutError: If the pipeline doesn't deploy within the timeout period
        RuntimeError: If the pipeline reports FAILED status
    """
    # We are always inside a coroutine here, so ask for the running loop
    # directly and reuse it instead of looking it up on every iteration.
    loop = asyncio.get_running_loop()
    start_time = loop.time()

    while True:
        if loop.time() - start_time > timeout_seconds:
            raise TimeoutError(
                f"Pipeline '{pipeline_name}' did not reach DEPLOYED status within {timeout_seconds} seconds"
            )

        # Fetch only the metadata; the YAML is not needed to read the status.
        pipeline = await pipeline_resource.get(pipeline_name=pipeline_name, include_yaml=False)

        if pipeline.status == "DEPLOYED":
            return pipeline
        if pipeline.status == "FAILED":
            # RuntimeError is still caught by callers using ``except Exception``.
            raise RuntimeError(f"Pipeline '{pipeline_name}' deployment failed")

        # Never sleep past the deadline, so the timeout is not overshot by up
        # to a full poll interval.
        remaining = timeout_seconds - (loop.time() - start_time)
        await asyncio.sleep(min(poll_interval, max(remaining, 0)))
| 104 | + |
| 105 | + |
@pytest.mark.extra_slow
@pytest.mark.asyncio
async def test_get_logs_for_deployed_pipeline(
    pipeline_resource: PipelineResource,
    simple_yaml_config: str,
) -> None:
    """
    Fetch logs from a pipeline once it has been deployed.

    Flow:
    1. Create the pipeline from the simple YAML configuration
    2. Request deployment and check the request was accepted as valid
    3. Poll until the pipeline reports DEPLOYED (up to 5 minutes, every 15 seconds)
    4. Fetch the logs and check the shape of the response
    """
    name = "test-logs-pipeline"

    await pipeline_resource.create(name=name, yaml_config=simple_yaml_config)

    deployment = await pipeline_resource.deploy(pipeline_name=name)
    assert deployment.valid is True, f"Pipeline deployment failed: {deployment.errors}"

    live_pipeline = await wait_for_pipeline_deployment(
        pipeline_resource=pipeline_resource,
        pipeline_name=name,
        timeout_seconds=300,
        poll_interval=15,
    )
    assert live_pipeline.status == "DEPLOYED"

    log_page = await pipeline_resource.get_logs(pipeline_name=name)

    # Envelope of the response.
    assert isinstance(log_page, PipelineLogList)
    assert isinstance(log_page.data, list)
    assert isinstance(log_page.has_more, bool)
    assert isinstance(log_page.total, int)

    # Log content cannot be predicted, so only the presence of each field is
    # checked; an empty log list passes as well.
    expected_fields = ("log_id", "message", "logged_at", "level", "origin")
    for entry in log_page.data:
        for field_name in expected_fields:
            assert hasattr(entry, field_name)
| 157 | + |
| 158 | + |
@pytest.mark.asyncio
async def test_get_logs_for_non_deployed_pipeline(
    pipeline_resource: PipelineResource,
    simple_yaml_config: str,
) -> None:
    """
    Fetch logs from a pipeline that exists but was never deployed.

    The call is expected to succeed and return a well-formed (possibly empty)
    log list.
    """
    name = "test-logs-non-deployed-pipeline"

    # Only create the pipeline; deployment is intentionally skipped.
    await pipeline_resource.create(name=name, yaml_config=simple_yaml_config)

    log_page = await pipeline_resource.get_logs(pipeline_name=name)

    # The envelope must be valid regardless of how many entries came back.
    assert isinstance(log_page, PipelineLogList)
    assert isinstance(log_page.data, list)
    assert isinstance(log_page.has_more, bool)
    assert isinstance(log_page.total, int)
| 181 | + |
| 182 | + |
@pytest.mark.extra_slow
@pytest.mark.asyncio
async def test_deployment_timeout_handling(
    pipeline_resource: PipelineResource,
    simple_yaml_config: str,
) -> None:
    """
    Check that the deployment polling helper raises on timeout.

    A one-second timeout is used so the helper gives up almost immediately.
    The test assumes the deployment is not already finished by the first
    status check.
    """
    name = "test-timeout-pipeline"

    # A deployment has to be in flight for the polling helper to wait on.
    await pipeline_resource.create(name=name, yaml_config=simple_yaml_config)
    deployment = await pipeline_resource.deploy(pipeline_name=name)
    assert deployment.valid is True

    expected_message = "did not reach DEPLOYED status within 1 seconds"
    with pytest.raises(TimeoutError, match=expected_message):
        await wait_for_pipeline_deployment(
            pipeline_resource=pipeline_resource,
            pipeline_name=name,
            timeout_seconds=1,
            poll_interval=1,
        )
0 commit comments