|
1 | 1 | import json |
2 | 2 | import logging |
| 3 | +from typing import Any |
3 | 4 |
|
4 | 5 | from account_v2.custom_exceptions import DuplicateData |
5 | 6 | from api_v2.exceptions import NoActiveAPIKeyError |
|
8 | 9 | from django.db import IntegrityError |
9 | 10 | from django.db.models import QuerySet |
10 | 11 | from django.http import HttpResponse |
11 | | -from permissions.permission import IsOwner |
| 12 | +from permissions.permission import IsOwner, IsOwnerOrSharedUser |
12 | 13 | from rest_framework import serializers, status, viewsets |
13 | 14 | from rest_framework.decorators import action |
14 | 15 | from rest_framework.request import Request |
|
29 | 30 | from pipeline_v2.serializers.execute import ( |
30 | 31 | PipelineExecuteSerializer as ExecuteSerializer, |
31 | 32 | ) |
| 33 | +from pipeline_v2.serializers.sharing import SharedUserListSerializer |
| 34 | + |
| 35 | +try: |
| 36 | + from plugins.notification.constants import ResourceType |
| 37 | + from plugins.notification.sharing_notification import SharingNotificationService |
| 38 | + |
| 39 | + NOTIFICATION_PLUGIN_AVAILABLE = True |
| 40 | + sharing_notification_service = SharingNotificationService() |
| 41 | +except ImportError: |
| 42 | + NOTIFICATION_PLUGIN_AVAILABLE = False |
| 43 | + sharing_notification_service = None |
32 | 44 |
|
33 | 45 | logger = logging.getLogger(__name__) |
34 | 46 |
|
35 | 47 |
|
36 | 48 | class PipelineViewSet(viewsets.ModelViewSet): |
37 | 49 | versioning_class = URLPathVersioning |
38 | 50 | queryset = Pipeline.objects.all() |
39 | | - permission_classes = [IsOwner] |
| 51 | + |
| 52 | + def get_permissions(self) -> list[Any]: |
| 53 | + if self.action in ["destroy", "partial_update", "update"]: |
| 54 | + return [IsOwner()] |
| 55 | + return [IsOwnerOrSharedUser()] |
| 56 | + |
40 | 57 | serializer_class = PipelineSerializer |
41 | 58 |
|
42 | 59 | def get_queryset(self) -> QuerySet: |
43 | | - queryset = Pipeline.objects.filter(created_by=self.request.user) |
| 60 | + # Use for_user manager method to include shared pipelines |
| 61 | + queryset = Pipeline.objects.for_user(self.request.user) |
44 | 62 |
|
45 | 63 | # Apply type filter if specified |
46 | 64 | pipeline_type = self.request.query_params.get(PipelineConstants.TYPE) |
@@ -101,6 +119,59 @@ def perform_destroy(self, instance: Pipeline) -> None: |
101 | 119 | super().perform_destroy(instance) |
102 | 120 | return SchedulerHelper.remove_job(pipeline_to_remove) |
103 | 121 |
|
| 122 | + @action(detail=True, methods=["get"], url_path="users", permission_classes=[IsOwner]) |
| 123 | + def list_of_shared_users(self, request: Request, pk: str | None = None) -> Response: |
| 124 | + """Returns the list of users the pipeline is shared with.""" |
| 125 | + pipeline = self.get_object() |
| 126 | + serializer = SharedUserListSerializer(pipeline) |
| 127 | + return Response(serializer.data, status=status.HTTP_200_OK) |
| 128 | + |
| 129 | + def partial_update(self, request: Request, *args: Any, **kwargs: Any) -> Response: |
| 130 | + """Override to handle sharing notifications.""" |
| 131 | + instance = self.get_object() |
| 132 | + current_shared_users = set(instance.shared_users.all()) |
| 133 | + |
| 134 | + response = super().partial_update(request, *args, **kwargs) |
| 135 | + |
| 136 | + if ( |
| 137 | + response.status_code == 200 |
| 138 | + and "shared_users" in request.data |
| 139 | + and NOTIFICATION_PLUGIN_AVAILABLE |
| 140 | + ): |
| 141 | + try: |
| 142 | + instance.refresh_from_db() |
| 143 | + new_shared_users = set(instance.shared_users.all()) |
| 144 | + newly_shared_users = new_shared_users - current_shared_users |
| 145 | + |
| 146 | + if ResourceType.ETL.value == instance.pipeline_type: |
| 147 | + resource_type = ResourceType.ETL.value |
| 148 | + elif ResourceType.TASK.value == instance.pipeline_type: |
| 149 | + resource_type = ResourceType.TASK.value |
| 150 | + |
| 151 | + if newly_shared_users: |
| 152 | + # Only send notifications if there are newly shared users |
| 153 | + sharing_notification_service.send_sharing_notification( |
| 154 | + resource_type=resource_type, |
| 155 | + resource_name=instance.pipeline_name, |
| 156 | + resource_id=str(instance.id), |
| 157 | + shared_by=request.user, |
| 158 | + shared_to=list(newly_shared_users), |
| 159 | + resource_instance=instance, |
| 160 | + ) |
| 161 | + |
| 162 | + logger.info( |
| 163 | + f"Sent sharing notifications for {instance.pipeline_type} " |
| 164 | + f"to {len(newly_shared_users)} users" |
| 165 | + ) |
| 166 | + |
| 167 | + except Exception as e: |
| 168 | + # Log error but don't fail the update operation |
| 169 | + logger.exception( |
| 170 | + f"Failed to send sharing notification, continuing update though: {str(e)}" |
| 171 | + ) |
| 172 | + |
| 173 | + return response |
| 174 | + |
104 | 175 | @action(detail=True, methods=["get"]) |
105 | 176 | def download_postman_collection( |
106 | 177 | self, request: Request, pk: str | None = None |
|
0 commit comments