|
| 1 | +"""Pass 2: Extract structured Workflow objects from episode transcripts. |
| 2 | +
|
| 3 | +Takes an EpisodeTranscript (Pass 1 output) and uses a VLM to identify |
| 4 | +high-level workflow steps, merging related transcript entries into |
| 5 | +WorkflowStep objects and returning a complete Workflow. |
| 6 | +
|
| 7 | +See docs/design/workflow_extraction_pipeline.md Section 4.4.2 for the |
| 8 | +design specification. |
| 9 | +""" |
| 10 | + |
| 11 | +from __future__ import annotations |
| 12 | + |
| 13 | +import logging |
| 14 | +from typing import Any |
| 15 | + |
| 16 | +from openadapt_evals.workflow.models import ( |
| 17 | + ActionType, |
| 18 | + EpisodeTranscript, |
| 19 | + RecordingSource, |
| 20 | + Workflow, |
| 21 | + WorkflowStep, |
| 22 | +) |
| 23 | + |
| 24 | +logger = logging.getLogger(__name__) |
| 25 | + |
# Single-call prompt for Pass 2: the entire transcript is inlined and the VLM
# is asked to return one JSON object describing the workflow and its steps.
# Doubled braces ({{ }}) escape literal JSON braces for str.format().
_EXTRACTION_PROMPT = """\
You are a workflow extraction expert. Given a transcript of a desktop \
recording session, extract the high-level workflow steps.

Task description: {task_description}
Episode summary: {episode_summary}
Primary goal: {primary_goal}
Applications used: {apps_used}

The transcript contains {entry_count} raw actions. Your job is to merge \
related actions into meaningful workflow steps. For example, "click cell A1" \
followed by "type 'Year'" should become a single step: "Enter the header \
'Year' in cell A1".

Transcript entries:
{transcript_text}

Output a JSON object with these fields:
- name: Short name for the workflow (e.g., "Enter spreadsheet headers")
- description: Detailed description of what this workflow accomplishes
- goal: The end goal of this workflow
- domain: Domain classification (e.g., "spreadsheet", "document", \
"web_browser", "file_management", "system_settings")
- complexity: "simple" (1-3 steps), "medium" (4-8 steps), or "complex" (9+)
- tags: List of relevant tags (e.g., ["data-entry", "headers"])
- steps: Array of step objects, each with:
  - description: What this step accomplishes (e.g., "Type the header 'Year' \
in cell A1")
  - think: The reasoning behind this step (e.g., "Need to label the first \
column")
  - action: The concrete action(s) taken (e.g., "Click cell A1, type 'Year'")
  - expect: Expected result after this step (e.g., "Cell A1 contains 'Year'")
  - action_type: One of: click, double_click, right_click, drag, type, key, \
key_combo, scroll, wait, app_switch, unknown
  - app_name: Application used for this step
  - ui_element: Target UI element (e.g., "Cell A1 in the spreadsheet")
  - is_prerequisite: Is this a setup step? (true/false)
  - is_verification: Is this checking a result? (true/false)
  - is_optional: Could this step be skipped? (true/false)
  - source_entry_indices: Array of transcript entry indices that this step \
was derived from
  - parameters: Object with extracted parameters (e.g., {{"cell": "A1", \
"value": "Year"}})

Merge consecutive related actions into single steps where it makes sense. \
Skip corrective actions (mistakes and their fixes) unless they are part of \
the core workflow. Focus on goal-directed actions.

Output ONLY the JSON object, no additional text.
"""
| 76 | + |
| 77 | + |
| 78 | +def _build_transcript_text(transcript: EpisodeTranscript) -> str: |
| 79 | + """Format transcript entries as numbered text for the VLM prompt.""" |
| 80 | + lines = [] |
| 81 | + for entry in transcript.entries: |
| 82 | + parts = [ |
| 83 | + f"[{entry.entry_index}]", |
| 84 | + f"t={entry.timestamp_start:.1f}s", |
| 85 | + f"[{entry.action_type.value}]", |
| 86 | + entry.narration, |
| 87 | + ] |
| 88 | + if entry.intent: |
| 89 | + parts.append(f"(intent: {entry.intent})") |
| 90 | + if entry.app_context: |
| 91 | + parts.append(f"[{entry.app_context}]") |
| 92 | + if entry.is_corrective: |
| 93 | + parts.append("[CORRECTIVE]") |
| 94 | + if entry.is_exploratory: |
| 95 | + parts.append("[EXPLORATORY]") |
| 96 | + |
| 97 | + lines.append(" ".join(parts)) |
| 98 | + return "\n".join(lines) |
| 99 | + |
| 100 | + |
def _parse_extraction_response(
    raw: str,
    transcript: EpisodeTranscript,
) -> dict[str, Any] | None:
    """Parse the VLM response into a workflow dict.

    Delegates to the shared ``extract_json`` utility for robust parsing.
    ``transcript`` is currently unused but kept for interface stability.

    Returns:
        The parsed dict, or ``None`` when no JSON object could be recovered.
    """
    # Local import mirrors the module's lazy-import convention for VLM utils.
    from openadapt_evals.vlm import extract_json

    parsed = extract_json(raw)
    if not isinstance(parsed, dict):
        logger.warning("Could not parse VLM extraction response")
        return None
    return parsed
| 118 | + |
| 119 | + |
def _build_workflow_from_parsed(
    parsed: dict[str, Any],
    transcript: EpisodeTranscript,
    recording_source: RecordingSource,
) -> Workflow:
    """Convert parsed VLM JSON into a Workflow with WorkflowSteps.

    Args:
        parsed: Dict parsed from the VLM's JSON response.
        transcript: Source transcript, used to resolve per-step timestamp
            bounds and screenshots and to supply fallback metadata.
        recording_source: Source of the original recording.

    Returns:
        A Workflow whose steps carry timestamps/screenshots resolved from
        the transcript entries they were derived from.
    """
    # VLMs sometimes emit explicit JSON nulls; ``dict.get(key, default)``
    # returns None in that case (the key exists), so normalize with ``or``
    # to avoid iterating/looking up on None.
    raw_steps = parsed.get("steps") or []
    steps: list[WorkflowStep] = []

    for i, raw_step in enumerate(raw_steps):
        # Resolve source entry indices for timestamp bounds; default to the
        # step's own index so a 1:1 mapping still lines up.
        source_indices = raw_step.get("source_entry_indices") or [i]
        source_entries = [
            e for e in transcript.entries if e.entry_index in source_indices
        ]

        if source_entries:
            ts_start = min(e.timestamp_start for e in source_entries)
            # Entries may lack an end timestamp; fall back to their start.
            ts_end = max(
                e.timestamp_end or e.timestamp_start for e in source_entries
            )
        else:
            ts_start = 0.0
            ts_end = 0.0

        # Parse action_type, tolerating unrecognized or missing values.
        try:
            action_type = ActionType(raw_step.get("action_type") or "unknown")
        except ValueError:
            action_type = ActionType.UNKNOWN

        # Pick the first available screenshot among source entries as the
        # representative image for this step.
        screenshot_path = next(
            (
                e.screenshot_before_path
                for e in source_entries
                if e.screenshot_before_path
            ),
            None,
        )

        steps.append(
            WorkflowStep(
                step_index=i,
                timestamp_start=ts_start,
                timestamp_end=ts_end,
                description=raw_step.get("description") or f"Step {i + 1}",
                think=raw_step.get("think") or "",
                action=raw_step.get("action") or "",
                expect=raw_step.get("expect") or "",
                action_type=action_type,
                is_prerequisite=bool(raw_step.get("is_prerequisite", False)),
                is_verification=bool(raw_step.get("is_verification", False)),
                is_optional=bool(raw_step.get("is_optional", False)),
                app_name=raw_step.get("app_name") or "",
                ui_element=raw_step.get("ui_element") or "",
                screenshot_path=screenshot_path,
                source_entry_indices=source_indices,
                parameters=raw_step.get("parameters") or {},
            )
        )

    # Total duration spans first to last entry start time (transcript
    # entries are assumed chronologically ordered -- TODO confirm).
    if transcript.entries:
        total_duration = (
            transcript.entries[-1].timestamp_start
            - transcript.entries[0].timestamp_start
        )
    else:
        total_duration = 0.0

    # Derive app_names from the extracted steps, falling back to the
    # transcript's own app list when no step names an app.
    step_apps = sorted({s.app_name for s in steps if s.app_name})
    app_names = step_apps if step_apps else list(transcript.apps_used)

    return Workflow(
        name=parsed.get("name") or transcript.task_description,
        description=parsed.get("description") or transcript.episode_summary,
        goal=parsed.get("goal") or transcript.primary_goal,
        app_names=app_names,
        domain=parsed.get("domain")
        or transcript.domain_classification
        or "unknown",
        complexity=parsed.get("complexity") or "medium",
        tags=parsed.get("tags") or [],
        steps=steps,
        total_duration_seconds=total_duration,
        session_id=transcript.session_id,
        transcript_id=transcript.transcript_id,
        recording_source=recording_source,
    )
| 203 | + |
| 204 | + |
def _build_fallback_workflow(
    transcript: EpisodeTranscript,
    recording_source: RecordingSource,
) -> Workflow:
    """Build a 1:1 fallback workflow when VLM parsing fails.

    Every non-corrective transcript entry is promoted to its own
    WorkflowStep so downstream passes still have material to work with.
    """
    # Drop corrective entries (mistakes and their fixes) up front so step
    # indices stay contiguous.
    usable = [e for e in transcript.entries if not e.is_corrective]

    steps = [
        WorkflowStep(
            step_index=idx,
            timestamp_start=entry.timestamp_start,
            timestamp_end=entry.timestamp_end or entry.timestamp_start,
            description=entry.narration,
            think=entry.intent,
            action=f"[{entry.action_type.value}] {entry.narration}",
            expect=entry.state_change,
            action_type=entry.action_type,
            app_name=entry.app_context or "",
            ui_element=entry.ui_element_description,
            screenshot_path=entry.screenshot_before_path,
            source_entry_indices=[entry.entry_index],
        )
        for idx, entry in enumerate(usable)
    ]

    # Duration spans the first to last entry start time of the FULL
    # transcript (including corrective entries), matching the VLM path.
    entries = transcript.entries
    total_duration = (
        entries[-1].timestamp_start - entries[0].timestamp_start
        if entries
        else 0.0
    )

    return Workflow(
        name=transcript.task_description,
        description=transcript.episode_summary,
        goal=transcript.primary_goal,
        app_names=list(transcript.apps_used),
        domain=transcript.domain_classification or "unknown",
        complexity="medium",
        tags=[],
        steps=steps,
        total_duration_seconds=total_duration,
        session_id=transcript.session_id,
        transcript_id=transcript.transcript_id,
        recording_source=recording_source,
    )
| 255 | + |
| 256 | + |
def extract_workflow(
    transcript: EpisodeTranscript,
    model: str = "gpt-4.1-mini",
    provider: str = "openai",
    recording_source: RecordingSource = RecordingSource.NATIVE_CAPTURE,
) -> Workflow:
    """Extract a structured Workflow from an EpisodeTranscript.

    The full transcript is sent to a VLM in one call, which merges related
    raw actions into high-level workflow steps. If the response cannot be
    parsed, a 1:1 fallback workflow is returned instead.

    Args:
        transcript: EpisodeTranscript from Pass 1.
        model: VLM model name.
        provider: VLM provider (``"openai"`` or ``"anthropic"``).
        recording_source: Source of the original recording.

    Returns:
        A Workflow with WorkflowStep entries derived from the transcript.
    """
    # Local import mirrors the module's lazy-import convention for VLM utils.
    from openadapt_evals.vlm import vlm_call

    entry_count = len(transcript.entries)
    logger.info(
        "Extracting workflow from transcript %s (%d entries)",
        transcript.transcript_id,
        entry_count,
    )

    prompt = _EXTRACTION_PROMPT.format(
        task_description=transcript.task_description,
        episode_summary=transcript.episode_summary or "N/A",
        primary_goal=transcript.primary_goal or transcript.task_description,
        apps_used=(
            ", ".join(transcript.apps_used) if transcript.apps_used else "Unknown"
        ),
        entry_count=entry_count,
        transcript_text=_build_transcript_text(transcript),
    )

    raw = vlm_call(prompt, model=model, provider=provider, max_tokens=4096)

    parsed = _parse_extraction_response(raw, transcript)
    if parsed is None:
        # Unparseable response: degrade gracefully to a 1:1 mapping so
        # downstream passes still receive a workflow.
        logger.warning(
            "VLM extraction failed for transcript %s, using fallback",
            transcript.transcript_id,
        )
        return _build_fallback_workflow(transcript, recording_source)

    workflow = _build_workflow_from_parsed(parsed, transcript, recording_source)
    logger.info(
        "Workflow extracted: %s (%d steps from %d transcript entries)",
        workflow.name,
        workflow.step_count,
        entry_count,
    )
    return workflow
0 commit comments