
Commit edd91e9

muhammad-ali-e, claude, ritwik-g, and pre-commit-ci[bot] authored and committed
UN-2470 [FEAT] Remove Django dependency from Celery workers with internal APIs (#1494)
* UN-2470 [MISC] Remove Django dependency from Celery workers

  This commit introduces a new worker architecture that decouples Celery workers from Django where possible, enabling support for gevent/eventlet pool types and reducing worker startup overhead.

  Key changes:
  - Created separate worker modules (api-deployment, callback, file_processing, general)
  - Added internal API endpoints for worker communication
  - Implemented Django-free task execution where appropriate
  - Added shared utilities and client facades
  - Updated container configurations for new worker architecture

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* Fix pre-commit issues: file permissions and ruff errors

  Setup the docker for new workers
  - Add executable permissions to worker entrypoint files
  - Fix import order in namespace package __init__.py
  - Remove unused variable api_status in general worker
  - Address ruff E402 and F841 errors

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* refactored, Dockerfiles, fixes
* flexibility on celery run commands
* added debug logs
* handled file history for API
* cleanup
* cleanup
* cloud plugin structure
* minor changes in import plugin
* added notification and logger workers under new worker module
* add docker compatibility for new workers
* handled docker issues
* log consumer worker fixes
* added scheduler worker
* minor env changes
* cleanup the logs
* minor changes in logs
* resolved scheduler worker issues
* cleanup and refactor
* ensuring backward compatibility with existing workers
* added configuration internal apis and cache utils
* optimization
* Fix API client singleton pattern to share HTTP sessions

  - Fix flawed singleton implementation that was trying to share BaseAPIClient instances
  - Now properly shares HTTP sessions between specialized clients
  - Eliminates 6x BaseAPIClient initialization by reusing the same underlying session
  - Should reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session)
  - Added debug logging to verify singleton pattern activation

* cleanup and structuring
* cleanup in callback
* file system connectors issue
* celery env values changes
* optional gossip
* variables for sync, mingle and gossip
* Fix for file type check
* Task pipeline issue resolving
* api deployment failed response handled
* Task pipeline fixes
* updated file history cleanup with active file execution
* pipeline status update and workflow ui page execution
* cleanup and resolving conflicts
* remove unstract-core from connectors
* Commit uv.lock changes
* uv locks updates
* resolve migration issues
* defer connector-metadata
* Fix connector migration for production scale

  - Add encryption key handling with defer() to prevent decryption failures
  - Add final cleanup step to fix duplicate connector names
  - Optimize for large datasets with batch processing and bulk operations
  - Ensure unique constraint in migration 0004 can be created successfully

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* hitl fixes
* minor fixes on hitl
* api_hub related changes
* dockerfile fixes
* api client cache fixes with actual response class
* fix: tags and llm_profile_id
* optimized clear cache
* cleanup
* enhanced logs
* added more handling on is file dir and added loggers
* cleanup the runplatform script
* internal apis are exempted from csrf
* sonar cloud issues
* sonar cloud issues
* resolving sonar cloud issues
* resolving sonar cloud issues
* Delta: added Batch size fix in workers
* comments addressed
* celery configuration changes for new workers
* fixes in callback regarding the pipeline type check
* change internal url registry logic
* gitignore changes
* gitignore changes
* addressing PR comments and cleaning up the code
* adding missed profiles for v2
* sonar cloud blocker issues resolved
* implement otel
* Commit uv.lock changes
* handle execution time and some cleanup
* adding user_data in metadata PR: #1544
* scheduler backward compatibility
* replace user_data with custom_data
* Commit uv.lock changes
* celery worker command issue resolved
* enhance package imports in connectors by changing to lazy imports
* Update runner.py by removing the otel from it

  Update runner.py by removing the otel from it

  Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>

* added delta changes
* handle error to destination db
* resolve tool instances id validation and hitl queue name in API
* handled direct execution from workflow page to worker and logs
* handle cost logs
* Update health.py

  Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

* minor log changes
* introducing log consumer scheduler to bulk create, and socket .emit from worker for ws
* Commit uv.lock changes
* time limit or timeout celery config cleanup
* implemented redis client class in worker
* pipeline status enum mismatch
* notification worker fixes
* resolve uv lock conflicts
* workflow log fixes
* ws channel name issue resolved, handling redis down in status tracker, and removing redis keys
* default TTL changed for unified logs
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

---------

Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
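The singleton fix described in the message (specialized clients sharing one HTTP session instead of each initializing its own) can be sketched roughly as below. This is an illustration only, not the code from this commit: `FakeSession`, `get_shared_session`, and the client class names are hypothetical, and the stand-in session replaces whatever HTTP library the workers actually use.

```python
import threading
from typing import Optional


class FakeSession:
    """Stand-in for an HTTP session object (hypothetical, for illustration)."""

    instances_created = 0

    def __init__(self) -> None:
        FakeSession.instances_created += 1


_session_lock = threading.Lock()
_shared_session: Optional[FakeSession] = None


def get_shared_session() -> FakeSession:
    """Create the session on first use and return the same one afterwards."""
    global _shared_session
    with _session_lock:
        if _shared_session is None:
            _shared_session = FakeSession()
        return _shared_session


class BaseClient:
    """Hypothetical base class: every client is its own object, but all of
    them hold a reference to the same underlying session."""

    def __init__(self) -> None:
        self.session = get_shared_session()


class ExecutionClient(BaseClient):
    pass


class FileClient(BaseClient):
    pass


clients = [ExecutionClient(), FileClient(), ExecutionClient()]
```

The point of the design is that the client objects stay distinct while the expensive resource is created once per process.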
1 parent b20df3c commit edd91e9

312 files changed

Lines changed: 68874 additions & 384 deletions


.gitignore

Lines changed: 10 additions & 0 deletions
@@ -1,6 +1,14 @@
 # Created by https://www.toptal.com/developers/gitignore/api/windows,macos,linux,pycharm,pycharm+all,pycharm+iml,python,visualstudiocode,react,django
 # Edit at https://www.toptal.com/developers/gitignore?templates=windows,macos,linux,pycharm,pycharm+all,pycharm+iml,python,visualstudiocode,react,django
 
+# Development helper scripts
+*.sh
+# list Exceptional files with ! like !fix-and-test.sh
+!run-platform.sh
+!workers/run-worker.sh
+!workers/run-worker-docker.sh
+!workers/log_consumer/scheduler.sh
+
 ### Django ###
 *.log
 *.pot
@@ -622,6 +630,7 @@ backend/plugins/processor/*
 # Subscription Plugins
 backend/plugins/subscription/*
 
+
 # API Deployment Plugins
 backend/plugins/api/**
 
@@ -685,6 +694,7 @@ backend/requirements.txt
 backend/backend/*_urls.py
 !backend/backend/base_urls.py
 !backend/backend/public_urls.py
+!backend/backend/internal_base_urls.py
 # TODO: Remove after v2 migration is completed
 backend/backend/*_urls_v2.py
 !backend/backend/public_urls_v2.py

backend/account_v2/custom_auth_middleware.py

Lines changed: 5 additions & 0 deletions
@@ -8,6 +8,7 @@
 from account_v2.authentication_service import AuthenticationService
 from account_v2.constants import Common
 from backend.constants import RequestHeader
+from backend.internal_api_constants import INTERNAL_API_PREFIX
 
 
 class CustomAuthMiddleware:
@@ -22,6 +23,10 @@ def __call__(self, request: HttpRequest) -> HttpResponse:
         if any(request.path.startswith(path) for path in settings.WHITELISTED_PATHS):
             return self.get_response(request)
 
+        # Skip internal API paths - they are handled by InternalAPIAuthMiddleware
+        if request.path.startswith(f"{INTERNAL_API_PREFIX}/"):
+            return self.get_response(request)
+
         # Authenticating With API_KEY
         x_api_key = request.headers.get(RequestHeader.X_API_KEY)
         if (
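The early return added in this hunk can be illustrated without Django. The sketch below is a simplified stand-in, not the project's middleware: the value of `INTERNAL_API_PREFIX` is assumed to be `"/internal"` (the constant's real value is not shown in this diff), the example paths are made up, and `SkipInternalMiddleware` is a hypothetical name. It also shows why the check appends `/` to the prefix: a path such as `/internal-tools/` should not be treated as internal.

```python
INTERNAL_API_PREFIX = "/internal"  # assumed value; the real constant is not shown


def is_internal_path(path: str, prefix: str = INTERNAL_API_PREFIX) -> bool:
    """Mirror the middleware check: match only paths under `<prefix>/`."""
    return path.startswith(f"{prefix}/")


class SkipInternalMiddleware:
    """Django-free stand-in for the middleware's early return (hypothetical)."""

    def __init__(self, get_response, authenticate):
        self.get_response = get_response
        self.authenticate = authenticate

    def __call__(self, path: str) -> str:
        if is_internal_path(path):
            # Internal API paths bypass this middleware's authentication.
            return self.get_response(path)
        self.authenticate(path)
        return self.get_response(path)


authenticated = []
middleware = SkipInternalMiddleware(
    get_response=lambda path: f"response for {path}",
    authenticate=authenticated.append,
)
middleware("/internal/v1/health/")  # skipped: under the internal prefix
middleware("/internal-tools/")      # authenticated: only shares a text prefix
middleware("/api/v1/workflows/")    # authenticated: ordinary path
```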
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+"""Account Internal API Serializers
+Handles serialization for organization context related endpoints.
+"""
+
+from rest_framework import serializers
+
+
+class OrganizationContextSerializer(serializers.Serializer):
+    """Serializer for organization context information."""
+
+    organization_id = serializers.CharField()
+    organization_name = serializers.CharField()
+    organization_slug = serializers.CharField(required=False, allow_blank=True)
+    created_at = serializers.CharField(required=False, allow_blank=True)
+    settings = serializers.DictField(required=False)
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+"""Internal API URLs for Organization Context
+URL patterns for organization-related internal APIs.
+"""
+
+from django.urls import path
+
+from .internal_views import OrganizationContextAPIView
+
+urlpatterns = [
+    # Organization context endpoint (backward compatibility)
+    path(
+        "<str:org_id>/", OrganizationContextAPIView.as_view(), name="organization-context"
+    ),
+    # Organization context endpoint (explicit path)
+    path(
+        "<str:org_id>/context/",
+        OrganizationContextAPIView.as_view(),
+        name="organization-context-explicit",
+    ),
+]
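To see why both routes can coexist, recall that Django's built-in `str` path converter matches any non-empty string that contains no `/`. The sketch below imitates the two patterns with plain regular expressions; it is not Django's resolver, and `resolve` and the sample organization ID are hypothetical.

```python
import re

# Each regex imitates one route: `<str:org_id>` becomes "one or more
# characters other than '/'", and the pattern must match the whole path.
ROUTES = [
    ("organization-context", re.compile(r"^(?P<org_id>[^/]+)/$")),
    ("organization-context-explicit", re.compile(r"^(?P<org_id>[^/]+)/context/$")),
]


def resolve(path: str):
    """Return (route name, org_id) for the first matching route, else None."""
    for name, pattern in ROUTES:
        match = pattern.match(path)
        if match:
            return name, match.group("org_id")
    return None


short_form = resolve("org_123/")
explicit_form = resolve("org_123/context/")
unknown = resolve("org_123/other/")
```

Because the converter cannot consume a `/`, the shorter pattern never swallows the `context/` suffix, so the order of the two entries does not change which one matches.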
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+"""Account Internal API Views
+Handles organization context related endpoints for internal services.
+"""
+
+import logging
+
+from rest_framework import status
+from rest_framework.response import Response
+from rest_framework.views import APIView
+from utils.organization_utils import get_organization_context, resolve_organization
+
+from .internal_serializers import OrganizationContextSerializer
+
+logger = logging.getLogger(__name__)
+
+
+class OrganizationContextAPIView(APIView):
+    """Internal API endpoint for getting organization context."""
+
+    def get(self, request, org_id):
+        """Get organization context information."""
+        try:
+            # Use shared utility to resolve organization
+            organization = resolve_organization(org_id, raise_on_not_found=True)
+
+            # Use shared utility to get context data
+            context_data = get_organization_context(organization)
+
+            serializer = OrganizationContextSerializer(context_data)
+
+            logger.info(f"Retrieved organization context for {org_id}")
+
+            return Response(serializer.data)
+
+        except Exception as e:
+            logger.error(f"Failed to get organization context for {org_id}: {str(e)}")
+            return Response(
+                {"error": "Failed to get organization context", "detail": str(e)},
+                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            )
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+"""Account Internal API URLs
+Defines internal API endpoints for organization operations.
+"""
+
+from django.urls import path
+
+from .internal_views import OrganizationContextAPIView
+
+urlpatterns = [
+    # Organization context API
+    path(
+        "<str:org_id>/context/",
+        OrganizationContextAPIView.as_view(),
+        name="organization-context",
+    ),
+]

backend/account_v2/subscription_loader.py

Lines changed: 23 additions & 3 deletions
@@ -21,15 +21,28 @@ class SubscriptionConfig:
     METADATA_IS_ACTIVE = "is_active"
 
 
+# Cache for loaded plugins to avoid repeated loading
+_subscription_plugins_cache: list[Any] = []
+_plugins_loaded = False
+
+
 def load_plugins() -> list[Any]:
     """Iterate through the subscription plugins and register them."""
+    global _subscription_plugins_cache, _plugins_loaded
+
+    # Return cached plugins if already loaded
+    if _plugins_loaded:
+        return _subscription_plugins_cache
+
     plugins_app = apps.get_app_config(SubscriptionConfig.PLUGINS_APP)
     package_path = plugins_app.module.__package__
     subscription_dir = os.path.join(plugins_app.path, SubscriptionConfig.PLUGIN_DIR)
     subscription_package_path = f"{package_path}.{SubscriptionConfig.PLUGIN_DIR}"
     subscription_plugins: list[Any] = []
 
     if not os.path.exists(subscription_dir):
+        _subscription_plugins_cache = subscription_plugins
+        _plugins_loaded = True
         return subscription_plugins
 
     for item in os.listdir(subscription_dir):
@@ -56,10 +69,13 @@ def load_plugins() -> list[Any]:
                         SubscriptionConfig.METADATA: module.metadata,
                     }
                 )
+                name = metadata.get(
+                    SubscriptionConfig.METADATA_NAME,
+                    getattr(module, "__name__", "unknown"),
+                )
+                is_active = metadata.get(SubscriptionConfig.METADATA_IS_ACTIVE, False)
                 logger.info(
-                    "Loaded subscription plugin: %s, is_active: %s",
-                    module.metadata[SubscriptionConfig.METADATA_NAME],
-                    module.metadata[SubscriptionConfig.METADATA_IS_ACTIVE],
+                    "Loaded subscription plugin: %s, is_active: %s", name, is_active
                 )
             else:
                 logger.info(
@@ -75,6 +91,10 @@ def load_plugins() -> list[Any]:
     if len(subscription_plugins) == 0:
         logger.info("No subscription plugins found.")
 
+    # Cache the results for future requests
+    _subscription_plugins_cache = subscription_plugins
+    _plugins_loaded = True
+
     return subscription_plugins
 
 
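The caching added to `load_plugins()` follows a simple load-once pattern, sketched below in isolation. `discover_plugins` is a hypothetical stand-in for the directory scan and the variable names are shortened; only the shape of the logic is taken from the diff. A separate boolean flag is used because an empty list is a valid result, so checking the truthiness of the cached list would trigger a rescan on every call when no plugins exist.

```python
from typing import Any

_plugins_cache: list[Any] = []
_plugins_loaded = False
discovery_calls = 0


def discover_plugins() -> list[Any]:
    """Hypothetical stand-in for the directory scan; finds no plugins."""
    global discovery_calls
    discovery_calls += 1
    return []


def load_plugins() -> list[Any]:
    """Run discovery once, then serve every later call from the cache."""
    global _plugins_cache, _plugins_loaded
    if _plugins_loaded:
        return _plugins_cache
    _plugins_cache = discover_plugins()
    _plugins_loaded = True
    return _plugins_cache


first = load_plugins()
second = load_plugins()
```

Note that a module-level cache like this lives per process and, as written here, takes no lock, so two threads racing on the first call could both run discovery.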
Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
+"""Internal API Views for API v2
+
+This module provides internal API endpoints for worker communication,
+specifically optimized for type-aware pipeline data fetching.
+
+Since we know the context from worker function calls:
+- process_batch_callback_api -> APIDeployment model
+- process_batch_callback -> Pipeline model (handled in workflow_manager)
+
+This provides direct access to APIDeployment model data without
+the overhead of checking both Pipeline and APIDeployment models.
+"""
+
+import logging
+
+from rest_framework import status
+from rest_framework.response import Response
+from rest_framework.views import APIView
+
+from api_v2.models import APIDeployment
+from api_v2.serializers import APIDeploymentSerializer
+
+logger = logging.getLogger(__name__)
+
+
+class APIDeploymentDataView(APIView):
+    """Internal API endpoint for fetching APIDeployment data.
+
+    This endpoint is optimized for callback workers that know they're dealing
+    with API deployments. It directly queries the APIDeployment model without
+    checking the Pipeline model, improving performance.
+
+    Endpoint: GET /v2/api-deployments/{api_id}/data/
+    """
+
+    def get(self, request, api_id):
+        """Get APIDeployment model data by API ID.
+
+        Args:
+            request: HTTP request object
+            api_id: APIDeployment UUID
+
+        Returns:
+            Response with APIDeployment model data
+        """
+        try:
+            logger.debug(f"Fetching APIDeployment data for ID: {api_id}")
+
+            # Query APIDeployment model directly (organization-scoped via DefaultOrganizationMixin)
+            api_deployment = APIDeployment.objects.get(id=api_id)
+
+            # Serialize the APIDeployment model
+            serializer = APIDeploymentSerializer(api_deployment)
+
+            # Use consistent response format with pipeline endpoint
+            response_data = {"status": "success", "pipeline": serializer.data}
+
+            logger.info(
+                f"Found APIDeployment {api_id}: name='{api_deployment.api_name}', display_name='{api_deployment.display_name}'"
+            )
+            return Response(response_data, status=status.HTTP_200_OK)
+
+        except APIDeployment.DoesNotExist:
+            logger.warning(f"APIDeployment not found for ID: {api_id}")
+            return Response(
+                {"error": f"APIDeployment with ID {api_id} not found"},
+                status=status.HTTP_404_NOT_FOUND,
+            )
+        except Exception as e:
+            logger.error(f"Error fetching APIDeployment data for {api_id}: {str(e)}")
+            return Response(
+                {"error": f"Failed to fetch APIDeployment data: {str(e)}"},
+                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            )
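On the calling side, a worker would need to unwrap the envelope this view returns: `{"status": "success", "pipeline": ...}` on success and `{"error": ...}` on failure. The sketch below shows one way that could look. It is an assumption about the consumer, not code from this commit: `unwrap_deployment_response` and `DeploymentFetchError` are hypothetical names, and the sample payload fields are invented apart from the envelope keys.

```python
from typing import Any


class DeploymentFetchError(Exception):
    """Hypothetical error raised when the internal API reports a failure."""


def unwrap_deployment_response(status_code: int, body: dict[str, Any]) -> dict[str, Any]:
    """Return the serialized deployment from the view's response envelope."""
    if status_code == 200 and body.get("status") == "success":
        # The deployment data is carried under the "pipeline" key, matching
        # the format the view shares with the pipeline endpoint.
        return body["pipeline"]
    raise DeploymentFetchError(body.get("error", f"unexpected status {status_code}"))


ok = unwrap_deployment_response(
    200, {"status": "success", "pipeline": {"api_name": "demo", "display_name": "Demo"}}
)

try:
    unwrap_deployment_response(404, {"error": "APIDeployment with ID 123 not found"})
except DeploymentFetchError as exc:
    not_found_message = str(exc)
```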

backend/api_v2/internal_urls.py

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+"""Internal API URLs for API v2
+
+Internal endpoints for worker communication, specifically optimized
+for type-aware pipeline data fetching.
+"""
+
+from django.urls import path
+from rest_framework.urlpatterns import format_suffix_patterns
+
+from api_v2.internal_api_views import APIDeploymentDataView
+
+urlpatterns = format_suffix_patterns(
+    [
+        path(
+            "<uuid:api_id>/",
+            APIDeploymentDataView.as_view(),
+            name="api_deployment_data_internal",
+        ),
+    ]
+)

backend/backend/base_urls.py

Lines changed: 2 additions & 0 deletions
@@ -23,4 +23,6 @@
         include("pipeline_v2.public_api_urls"),
     ),
     path("", include("health.urls")),
+    # Internal API for worker communication
+    path("internal/", include("backend.internal_base_urls")),
 ]
