Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions perfkitbenchmarker/ai_agent_benchmark_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def Prepare(self) -> None:
self.agent_service.agent_config = self._GetDefaultAgentConfig()

self.BeforeCreateAgent()
self.spec.always_call_cleanup = True
self.agent_service.Create()
self.UploadValidatorScript()
self.PostPrepare()
Expand Down
15 changes: 14 additions & 1 deletion perfkitbenchmarker/data/agents/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc
import collections.abc
import json
from typing import Any
from typing import Any, Self
import urllib.parse

from absl import logging
Expand Down Expand Up @@ -38,6 +38,19 @@ class PromptConfig[AgentConfigT](BasePromptConfig):
run_uri: str
agent_config: AgentConfigT

@classmethod
def create_for_initial_prompt(cls, deployment_config: Any) -> Self:
return cls(
agent=deployment_config.agent,
framework=deployment_config.framework,
prompt=deployment_config.initial_prompt,
output_dir="",
session_id="timetoreadysession",
user_id="timetoreadyuser",
run_uri=deployment_config.run_uri,
agent_config=deployment_config.agent_config,
)


class BaseEndpoint(abc.ABC):
"""An abstract interface for executing agents across different environments.
Expand Down
41 changes: 41 additions & 0 deletions perfkitbenchmarker/data/agents/deploy_agent_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
"""

import argparse
import asyncio
import importlib
import os
import time
from typing import Any

import common_utils
import pydantic
import vertexai
import yaml
Expand All @@ -33,6 +36,7 @@ class DeploymentConfig[AgentConfigT](BaseDeploymentConfig):
staging_bucket: str
run_uri: str
agent_config: AgentConfigT
initial_prompt: str | None = None


def _import_agent_module(agent: str, framework: str) -> Any:
Expand All @@ -41,6 +45,31 @@ def _import_agent_module(agent: str, framework: str) -> Any:
return importlib.import_module(module_name)


async def _measure_time_to_ready[AgentConfigT](
remote_agent: Any,
handler: Any,
config: DeploymentConfig[AgentConfigT],
) -> float | None:
"""Measures the time to first chunk."""
print("Sending initial prompt...")
endpoint = handler.create_endpoint(remote_agent)
prompt_config = common_utils.PromptConfig[
AgentConfigT
].create_for_initial_prompt(config)
first_chunk_time = None
try:
async for _ in endpoint.stream_execute(prompt_config=prompt_config):
if first_chunk_time is None:
first_chunk_time = time.monotonic()
return first_chunk_time
except Exception as e: # pylint: disable=broad-exception-caught
# Since the agent has already been created, it's better to let this script
# finish normally, so the resource is marked as created and can be cleaned
# up normally.
print(f"Error measuring time to ready: {e}")
return None


def run_deployment[AgentConfigT](
config: DeploymentConfig[AgentConfigT], module: Any
) -> None:
Expand Down Expand Up @@ -75,13 +104,25 @@ def run_deployment[AgentConfigT](
"display_name": display_name,
}

create_start = time.monotonic()
remote_agent = client.agent_engines.create(
agent=agent_to_deploy,
config=deploy_config,
)
create_time = time.monotonic() - create_start
print(f"Time to Create: {create_time}")

print("Successfully deployed Agent Engine!")
print(f"Resource name: {remote_agent.api_resource.name}")

if config.initial_prompt:
first_chunk_time = asyncio.run(
_measure_time_to_ready(remote_agent, handler, config)
)
if first_chunk_time is not None:
ready_time = first_chunk_time - create_start
print(f"Time to Ready: {ready_time}")


def main() -> None:
parser = argparse.ArgumentParser(
Expand Down
60 changes: 58 additions & 2 deletions perfkitbenchmarker/providers/gcp/gcp_ai_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from perfkitbenchmarker import context
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.providers.gcp import gce_virtual_machine
from perfkitbenchmarker.providers.gcp import gcs
Expand Down Expand Up @@ -363,6 +364,8 @@ def __init__(self, client_vm, ai_agent_spec):
self._remote_agent_name = None
self._staging_bucket = self.base_dir
self.spec = ai_agent_spec
self._time_to_create: float | None = None
self._time_to_ready: float | None = None

@override
def _StageAgentCode(self):
Expand Down Expand Up @@ -419,6 +422,9 @@ def _GetDeploymentConfig(self) -> dict[str, Any]:
'staging_bucket': self._staging_bucket,
'agent_config': self.agent_config,
})
initial_prompt = self._GetInitialPromptText()
if initial_prompt:
config['initial_prompt'] = initial_prompt
return config

def _Create(self):
Expand Down Expand Up @@ -446,12 +452,17 @@ def _Create(self):
command = ' && '.join(command_parts)
stdout, _ = self.client_vm.RemoteCommand(command)

# 5. Parse output to get remote agent name
# 5. Parse output to get remote agent name and latencies
for line in stdout.split('\n'):
if line.startswith('Resource name: '):
_, _, agent_name = line.partition('Resource name: ')
self._remote_agent_name = agent_name.strip()
break
elif line.startswith('Time to Create: '):
_, _, time_str = line.partition('Time to Create: ')
self._time_to_create = float(time_str.strip())
elif line.startswith('Time to Ready: '):
_, _, time_str = line.partition('Time to Ready: ')
self._time_to_ready = float(time_str.strip())

if not self._remote_agent_name:
raise errors.Benchmarks.PrepareException(
Expand All @@ -464,6 +475,16 @@ def _Create(self):
self._remote_agent_name,
)

def _PostCreate(self):
if (
ai_agent_service.AI_AGENT_INITIAL_PROMPT_URL.value
and not self._time_to_ready
):
raise errors.Benchmarks.PrepareException(
'Initial prompt was explictly passed, but failed to get remote ready'
' time from deploy script output.'
)

def _Delete(self):
"""Deletes the remote agent."""
if not self._remote_agent_name:
Expand Down Expand Up @@ -600,3 +621,38 @@ def Execute(
'Agent execution finished. Raw output:\n%s',
stdout,
)

@override
def GetSamples(self):
samples = super().GetSamples()
create_time_sample = [s for s in samples if s.metric == 'Time to Create'][0]
resource_type = create_time_sample.metadata.get('resource_type')
resource_class = create_time_sample.metadata.get('resource_class')
metadata = {
'resource_type': resource_type,
'resource_class': resource_class,
}

# Remove existing samples for 'Time to Create' and 'Time to Ready' if any
samples = [
s
for s in samples
if s.metric not in ('Time to Create', 'Time to Ready')
]

if self._time_to_create is not None:
samples.append(
sample.Sample(
'Time to Create',
self._time_to_create,
'seconds',
metadata=metadata,
)
)
if self._time_to_ready is not None:
samples.append(
sample.Sample(
'Time to Ready', self._time_to_ready, 'seconds', metadata=metadata
)
)
return samples
17 changes: 17 additions & 0 deletions perfkitbenchmarker/resources/ai_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

FLAGS = flags.FLAGS

AI_AGENT_INITIAL_PROMPT_URL = flags.DEFINE_string(
'ai_agent_initial_prompt_url',
None,
'Object storage URL (e.g. gs://bucket/prompt.txt) to an initial prompt.',
)


def GetAiAgentServiceClass(cloud: str, deployment_type: str):
"""Returns the correct AI agent service class based on cloud/type."""
Expand Down Expand Up @@ -66,6 +72,17 @@ def _GetDeploymentConfig(self) -> dict[str, Any]:
"""Gets config dict for deployment/creation."""
return {'run_uri': FLAGS.run_uri}

def _GetInitialPromptText(self) -> str | None:
"""Fetches the initial prompt text from object storage."""
url = AI_AGENT_INITIAL_PROMPT_URL.value
if not url:
return None

local_path = vm_util.PrependTempDir('initial_prompt.txt')
self.storage_service.Copy(url, local_path)
with open(local_path, 'r') as f:
return f.read().strip()

def _GetRunConfig(
self,
output_dir: str,
Expand Down
Loading