From 1dbc51ba4320fdaf3dc382eae6c39c3d1ef8c4e3 Mon Sep 17 00:00:00 2001 From: Dallas Date: Wed, 26 Mar 2025 09:26:23 -0700 Subject: [PATCH] Add annotator workload balancer example --- .../annotator_workload_balancer/README.md | 85 ++++++ .../rebalance_annotator_workload.py | 260 ++++++++++++++++++ 2 files changed, 345 insertions(+) create mode 100644 examples/annotator_workload_balancer/README.md create mode 100644 examples/annotator_workload_balancer/rebalance_annotator_workload.py diff --git a/examples/annotator_workload_balancer/README.md b/examples/annotator_workload_balancer/README.md new file mode 100644 index 000000000..c2fb4aad2 --- /dev/null +++ b/examples/annotator_workload_balancer/README.md @@ -0,0 +1,85 @@ +# Annotator Workload Balancer + +This example demonstrates how to use the Label Studio SDK to automatically detect and rebalance workload differences between annotators in a Label Studio project. + +## Overview + +The script monitors the distribution of pending annotation tasks among annotators and automatically reassigns tasks when the workload becomes imbalanced. This helps maintain an even distribution of work, prevents annotator overload, and ensures more consistent project completion timelines. + +## Features + +- Periodically checks annotator workloads +- Identifies overloaded and underloaded annotators +- Automatically reassigns tasks when imbalance exceeds a configurable threshold +- Detailed logging of workload status and rebalancing operations +- Configurable check intervals and rebalance thresholds + +## Requirements + +- Python 3.7+ +- Label Studio SDK + +## Installation + +```bash +pip install label-studio-sdk +``` + +## Usage + +1. Configure the script with your Label Studio URL, API key, and project ID +2. Set the rebalance threshold and check interval as needed +3. 
Run the script: + +```bash +python rebalance_annotator_workload.py +``` + +## Configuration + +```python +# Configuration - Replace these with your actual details +LABEL_STUDIO_URL = "https://app.heartex.com" # Your Label Studio URL +API_KEY = "your_api_key_here" # Your API key +PROJECT_ID = 0 # Your project ID +REBALANCE_THRESHOLD = 2 # Trigger rebalance when difference > this value +CHECK_INTERVAL = 30 # Time between checks (in seconds) +``` + +## How It Works + +1. The script connects to your Label Studio instance using the SDK +2. It retrieves all tasks and analyzes their assignment and completion status +3. It calculates the pending workload for each annotator +4. If the difference between the most and least loaded annotators exceeds the threshold, it: + - Identifies a pending task from the overloaded annotator + - Reassigns it to the underloaded annotator + +## Example Output + +``` +Starting Label Studio workload balancer +Project ID: 123 +Rebalance threshold: 2 tasks +Check interval: 30 seconds +Press Ctrl+C to stop + +================================================== +Checking for workload imbalance at 2023-09-15 14:30:45 +================================================== +Retrieved 100 tasks from project 123 + +--- ASSIGNMENT SUMMARY --- +Annotator workloads (sorted by pending tasks): + Annotator 456: 40 total, 32 pending, 8 completed + Annotator 789: 35 total, 20 pending, 15 completed + Annotator 123: 40 total, 18 pending, 22 completed +------------------------ + +Most loaded annotator: 456 with 32 pending tasks +Least loaded annotator: 123 with 18 pending tasks +Difference: 14 tasks +Workload imbalance detected (difference > 2). Rebalancing... +Successfully reassigned task 78901 from annotator 456 to 123 +Next check in 30 seconds... 
# File: examples/annotator_workload_balancer/rebalance_annotator_workload.py
"""Annotator workload balancer example for Label Studio.

Periodically fetches every task of one project, counts how many assigned but
not-yet-annotated ("pending") tasks each annotator has, and moves a single
pending task from the most loaded annotator to the least loaded one whenever
the gap exceeds ``REBALANCE_THRESHOLD``.

Only annotators that are assigned to at least one task are visible to this
script; project members without any assignment are not considered.
"""

import json  # noqa: F401  -- unused here; kept from the original example
import time

try:
    import label_studio_sdk
except ImportError:
    # Keep the pure counting helpers importable without the SDK; main() and
    # rebalance_assignments() report the missing dependency explicitly.
    label_studio_sdk = None

# Configuration - Replace these with your actual details
LABEL_STUDIO_URL = "https://app.heartex.com"  # Your Label Studio URL
API_KEY = "your_api_key_here"  # Your API key
PROJECT_ID = 0  # Your project ID
REBALANCE_THRESHOLD = 2  # Trigger rebalance when difference > this value
CHECK_INTERVAL = 30  # Time between checks (in seconds)


def _extract_user_id(value, keys=("id", "user_id")):
    """Return the user ID carried by *value* as a string, or None if absent.

    *value* is either a bare ID or a dict holding the ID under one of *keys*
    (checked in order).
    """
    if isinstance(value, dict):
        for key in keys:
            if key in value:
                return str(value[key])
        return None
    return str(value)


def _completed_by_ids(task):
    """Return the set of user IDs (strings) that have annotated *task*."""
    done = set()
    # "annotations" may be missing or None; treat both as "no annotations".
    for annotation in task.get("annotations") or ():
        if not isinstance(annotation, dict) or "completed_by" not in annotation:
            continue
        user_id = _extract_user_id(annotation["completed_by"], keys=("id",))
        if user_id is not None:
            done.add(user_id)
    return done


def _assigned_annotator_ids(task):
    """Return the IDs (strings) of the annotators assigned to *task*, in order."""
    assigned = []
    # "annotators" may be missing or None; treat both as "nobody assigned".
    for annotator in task.get("annotators") or ():
        user_id = _extract_user_id(annotator)
        if user_id is not None:  # skip entries we cannot identify
            assigned.append(user_id)
    return assigned


def _to_api_user_id(user_id):
    """Convert a stringified user ID back to ``int`` when possible."""
    try:
        return int(user_id)
    except (TypeError, ValueError):
        return user_id


def get_pending_task_counts(tasks):
    """
    Count pending tasks per annotator and print an assignment summary.

    A task is pending for an annotator when the annotator is assigned to it
    but has not submitted an annotation for it.

    Args:
        tasks: List of task dicts from the Label Studio API

    Returns:
        Dictionary mapping annotator ID (string) to pending task count. Every
        annotator assigned to at least one task is present, including those
        with 0 pending tasks, so idle annotators can receive work.
    """
    total_counts = {}
    pending_counts = {}

    for task in tasks:
        completed_by = _completed_by_ids(task)
        for annotator_id in _assigned_annotator_ids(task):
            total_counts[annotator_id] = total_counts.get(annotator_id, 0) + 1
            # Register the annotator even when nothing is pending for them.
            pending_counts.setdefault(annotator_id, 0)
            if annotator_id not in completed_by:
                pending_counts[annotator_id] += 1

    print("\n--- ASSIGNMENT SUMMARY ---")
    print("Annotator workloads (sorted by pending tasks):")

    # Highest pending count first; ties keep first-seen order (stable sort).
    for annotator_id in sorted(pending_counts, key=pending_counts.get, reverse=True):
        total = total_counts[annotator_id]
        pending = pending_counts[annotator_id]
        completed = total - pending
        print(f"  Annotator {annotator_id}: {total} total, {pending} pending, {completed} completed")

    print("------------------------\n")

    return pending_counts


def rebalance_assignments():
    """
    Check the project for workload imbalance and reassign one task if needed.

    Connects to Label Studio, fetches all tasks, and, when the pending-count
    gap between the most and least loaded annotators exceeds
    ``REBALANCE_THRESHOLD``, moves one pending task between them. All
    failures are reported on stdout; nothing is raised to the caller.
    """
    print("Fetching tasks from Label Studio...")

    if label_studio_sdk is None:
        print("Error connecting to Label Studio: the label-studio-sdk package is not installed")
        return

    # Connect to Label Studio
    try:
        client = label_studio_sdk.Client(url=LABEL_STUDIO_URL, api_key=API_KEY)
        project = client.get_project(PROJECT_ID)
    except Exception as e:
        print(f"Error connecting to Label Studio: {e}")
        return

    # Retrieve all tasks
    try:
        tasks = project.get_tasks(only_ids=False)
    except Exception as e:
        print(f"Error retrieving tasks: {e}")
        return

    if not tasks:
        print("No tasks found in the project.")
        return
    print(f"Retrieved {len(tasks)} tasks from project {PROJECT_ID}")

    pending_counts = get_pending_task_counts(tasks)

    # The dict also lists idle annotators, so test the values, not emptiness.
    if not any(pending_counts.values()):
        print("No pending tasks found.")
        return

    if len(pending_counts) < 2:
        print("Need at least two assigned annotators to perform balancing.")
        return

    # Identify the overloaded and underloaded annotators
    overloaded = max(pending_counts, key=pending_counts.get)
    underloaded = min(pending_counts, key=pending_counts.get)
    max_count = pending_counts[overloaded]
    min_count = pending_counts[underloaded]

    print(f"Most loaded annotator: {overloaded} with {max_count} pending tasks")
    print(f"Least loaded annotator: {underloaded} with {min_count} pending tasks")
    print(f"Difference: {max_count - min_count} tasks")

    if max_count - min_count <= REBALANCE_THRESHOLD:
        print(f"Workload is balanced (difference ≤ {REBALANCE_THRESHOLD}); no reassignment needed.")
        return

    print(f"Workload imbalance detected (difference > {REBALANCE_THRESHOLD}). Rebalancing...")

    # Skip tasks the target already holds or has done: moving those would not
    # actually transfer any work.
    task_to_move = find_task_to_reassign(tasks, overloaded, exclude_annotator=underloaded)

    if task_to_move is None:
        print("No suitable task found for reassignment.")
        return

    try:
        reassign_task(client, task_to_move, overloaded, underloaded)
    except Exception as e:
        print(f"Error during reassignment: {e}")
    else:
        print(f"Successfully reassigned task {task_to_move} from annotator {overloaded} to {underloaded}")


def find_task_to_reassign(tasks, overloaded_annotator, exclude_annotator=None):
    """
    Find a pending task of the overloaded annotator that can be reassigned.

    Args:
        tasks: List of task dicts
        overloaded_annotator: ID of the annotator with too many tasks
        exclude_annotator: Optional ID of the receiving annotator; tasks this
            annotator is already assigned to, or has already annotated, are
            skipped

    Returns:
        ID of the first suitable task, or None if no suitable task is found
    """
    # IDs are compared as strings, so accept ints from callers as well.
    source_id = str(overloaded_annotator)
    target_id = None if exclude_annotator is None else str(exclude_annotator)

    for task in tasks:
        assigned = _assigned_annotator_ids(task)
        if source_id not in assigned:
            continue

        completed_by = _completed_by_ids(task)
        if source_id in completed_by:
            continue  # already done by the source annotator; not pending

        if target_id is not None and (target_id in assigned or target_id in completed_by):
            continue

        return task["id"]

    return None


def reassign_task(client, task_id, from_annotator, to_annotator, project_id=None):
    """
    Reassign a task from one annotator to another.

    Reads the task's current annotators, replaces ``from_annotator`` with
    ``to_annotator`` (keeping any other assignees), and posts the new list to
    the project's assignees endpoint.

    Args:
        client: Label Studio client exposing ``make_request(method, url, ...)``
        task_id: ID of the task to reassign
        from_annotator: ID of the annotator to remove
        to_annotator: ID of the annotator to add
        project_id: Project that owns the task; defaults to ``PROJECT_ID``
    """
    if project_id is None:
        project_id = PROJECT_ID

    # Get current task details
    response = client.make_request("get", f"/api/tasks/{task_id}")
    task_info = response.json()

    source_id = str(from_annotator)
    target_id = str(to_annotator)

    new_assign_ids = [a for a in _assigned_annotator_ids(task_info) if a != source_id]
    if target_id not in new_assign_ids:
        new_assign_ids.append(target_id)

    body = {
        "type": "AN",  # "AN" for annotators; "RE" for reviewers
        # IDs were stringified for comparison; send them back as integers.
        "users": [_to_api_user_id(a) for a in new_assign_ids],
        "selectedItems": {"all": False, "included": [task_id]},
        "filters": {"conjunction": "and", "items": []},
        "action": "set",
    }

    client.make_request("post", f"/api/projects/{project_id}/tasks/assignees", json=body)


def main():
    """
    Run the rebalancing check every ``CHECK_INTERVAL`` seconds until Ctrl+C.

    Raises:
        SystemExit: If the label-studio-sdk package is not installed.
    """
    if label_studio_sdk is None:
        raise SystemExit(
            "label-studio-sdk is not installed. Install it with: pip install label-studio-sdk"
        )

    print("Starting Label Studio workload balancer")
    print(f"Project ID: {PROJECT_ID}")
    print(f"Rebalance threshold: {REBALANCE_THRESHOLD} tasks")
    print(f"Check interval: {CHECK_INTERVAL} seconds")
    print("Press Ctrl+C to stop\n")

    # One outer handler so Ctrl+C stops cleanly at any point, including while
    # sleeping after a failed check.
    try:
        while True:
            print("\n" + "=" * 50)
            print(f"Checking for workload imbalance at {time.strftime('%Y-%m-%d %H:%M:%S')}")
            print("=" * 50)
            try:
                rebalance_assignments()
            except Exception as e:
                print(f"Error: {e}")
                print(f"Retrying in {CHECK_INTERVAL} seconds...")
            else:
                print(f"Next check in {CHECK_INTERVAL} seconds...")
            time.sleep(CHECK_INTERVAL)
    except KeyboardInterrupt:
        print("\nStopping workload balancer")


if __name__ == "__main__":
    main()