@@ -762,3 +762,185 @@ def test_owner_can_delete_own_task(self, client: TestClient):
762762 client .post ("/api/v1/tasks" , json = _cron_task ("t1" ), headers = _user_headers ("alice@example.com" ))
763763 resp = client .delete ("/api/v1/tasks/t1" , headers = _user_headers ("alice@example.com" ))
764764 assert resp .status_code == 204
765+
766+ def test_non_admin_cannot_update_another_users_task (self , client : TestClient ):
767+ """PUT /tasks/{id} returns 403 when task belongs to a different user."""
768+ client .post (
769+ "/api/v1/tasks" ,
770+ json = _cron_task ("t1" ),
771+ headers = _user_headers ("alice@example.com" ),
772+ )
773+ updated_payload = _cron_task ("t1" )
774+ updated_payload ["name" ] = "renamed by bob"
775+ resp = client .put (
776+ "/api/v1/tasks/t1" ,
777+ json = updated_payload ,
778+ headers = _user_headers ("bob@example.com" ),
779+ )
780+ assert resp .status_code == 403
781+
782+ def test_non_admin_cannot_trigger_another_users_task (self , client : TestClient ):
783+ """POST /tasks/{id}/run returns 403 when task belongs to a different user."""
784+ client .post (
785+ "/api/v1/tasks" ,
786+ json = _cron_task ("t1" ),
787+ headers = _user_headers ("alice@example.com" ),
788+ )
789+ resp = client .post (
790+ "/api/v1/tasks/t1/run" ,
791+ headers = _user_headers ("bob@example.com" ),
792+ )
793+ assert resp .status_code == 403
794+
795+ def test_admin_can_update_another_users_task (self , client : TestClient ):
796+ """An admin can PUT another user's task."""
797+ client .post (
798+ "/api/v1/tasks" ,
799+ json = _cron_task ("t1" ),
800+ headers = _user_headers ("bob@example.com" ),
801+ )
802+ updated_payload = _cron_task ("t1" )
803+ updated_payload ["name" ] = "renamed by admin"
804+ resp = client .put (
805+ "/api/v1/tasks/t1" ,
806+ json = updated_payload ,
807+ headers = _admin_headers (),
808+ )
809+ assert resp .status_code == 200
810+ # Ownership is preserved across an admin-initiated update.
811+ listed = client .get ("/api/v1/tasks" , headers = _admin_headers ()).json ()
812+ owners = {t ["id" ]: t ["owner_id" ] for t in listed }
813+ assert owners ["t1" ] == "bob@example.com"
814+
815+ def test_admin_can_delete_another_users_task (self , client : TestClient ):
816+ """An admin can DELETE another user's task."""
817+ client .post (
818+ "/api/v1/tasks" ,
819+ json = _cron_task ("t1" ),
820+ headers = _user_headers ("bob@example.com" ),
821+ )
822+ resp = client .delete ("/api/v1/tasks/t1" , headers = _admin_headers ())
823+ assert resp .status_code == 204
824+
825+
826+ class TestAdminAuditLogging :
827+ """Section 4.4: admin actions on someone else's task emit a log line."""
828+
829+ def test_admin_update_emits_audit_log (
830+ self , client : TestClient , caplog : pytest .LogCaptureFixture
831+ ):
832+ """Admin PUT on another user's task emits a log line with both emails + action."""
833+ client .post (
834+ "/api/v1/tasks" ,
835+ json = _cron_task ("t1" ),
836+ headers = _user_headers ("bob@example.com" ),
837+ )
838+ updated_payload = _cron_task ("t1" )
839+ updated_payload ["name" ] = "renamed by admin"
840+ caplog .clear ()
841+ with caplog .at_level ("INFO" , logger = "autonomous_agents" ):
842+ resp = client .put (
843+ "/api/v1/tasks/t1" ,
844+ json = updated_payload ,
845+ headers = {
846+ "X-Authenticated-User-Email" : "admin@example.com" ,
847+ "X-Authenticated-User-Is-Admin" : "true" ,
848+ },
849+ )
850+ assert resp .status_code == 200
851+ admin_log_lines = [
852+ r .getMessage ()
853+ for r in caplog .records
854+ if "Admin " in r .getMessage () and "acted on task" in r .getMessage ()
855+ ]
856+ assert admin_log_lines , "expected an admin audit log line on PUT"
857+ msg = admin_log_lines [0 ]
858+ assert "admin@example.com" in msg
859+ assert "bob@example.com" in msg
860+ assert "t1" in msg
861+ assert "update" in msg
862+
863+ def test_admin_delete_emits_audit_log (
864+ self , client : TestClient , caplog : pytest .LogCaptureFixture
865+ ):
866+ """Admin DELETE on another user's task emits a log line with both emails + action."""
867+ client .post (
868+ "/api/v1/tasks" ,
869+ json = _cron_task ("t1" ),
870+ headers = _user_headers ("bob@example.com" ),
871+ )
872+ caplog .clear ()
873+ with caplog .at_level ("INFO" , logger = "autonomous_agents" ):
874+ resp = client .delete (
875+ "/api/v1/tasks/t1" ,
876+ headers = {
877+ "X-Authenticated-User-Email" : "admin@example.com" ,
878+ "X-Authenticated-User-Is-Admin" : "true" ,
879+ },
880+ )
881+ assert resp .status_code == 204
882+ admin_log_lines = [
883+ r .getMessage ()
884+ for r in caplog .records
885+ if "Admin " in r .getMessage () and "acted on task" in r .getMessage ()
886+ ]
887+ assert admin_log_lines , "expected an admin audit log line on DELETE"
888+ msg = admin_log_lines [0 ]
889+ assert "admin@example.com" in msg
890+ assert "bob@example.com" in msg
891+ assert "delete" in msg
892+
893+ def test_admin_trigger_emits_audit_log (
894+ self , client : TestClient , caplog : pytest .LogCaptureFixture
895+ ):
896+ """Admin manual trigger on another user's task emits a log line."""
897+ client .post (
898+ "/api/v1/tasks" ,
899+ json = _cron_task ("t1" ),
900+ headers = _user_headers ("bob@example.com" ),
901+ )
902+ caplog .clear ()
903+ with caplog .at_level ("INFO" , logger = "autonomous_agents" ):
904+ resp = client .post (
905+ "/api/v1/tasks/t1/run" ,
906+ headers = {
907+ "X-Authenticated-User-Email" : "admin@example.com" ,
908+ "X-Authenticated-User-Is-Admin" : "true" ,
909+ },
910+ )
911+ assert resp .status_code == 200
912+ admin_log_lines = [
913+ r .getMessage ()
914+ for r in caplog .records
915+ if "Admin " in r .getMessage () and "acted on task" in r .getMessage ()
916+ ]
917+ assert admin_log_lines , "expected an admin audit log line on manual trigger"
918+ msg = admin_log_lines [0 ]
919+ assert "admin@example.com" in msg
920+ assert "bob@example.com" in msg
921+ assert "trigger" in msg
922+
923+ def test_owner_action_does_not_emit_admin_audit_log (
924+ self , client : TestClient , caplog : pytest .LogCaptureFixture
925+ ):
926+ """Owner acting on their own task must not emit the cross-user admin audit log line."""
927+ client .post (
928+ "/api/v1/tasks" ,
929+ json = _cron_task ("t1" ),
930+ headers = _user_headers ("bob@example.com" ),
931+ )
932+ caplog .clear ()
933+ with caplog .at_level ("INFO" , logger = "autonomous_agents" ):
934+ resp = client .delete (
935+ "/api/v1/tasks/t1" ,
936+ headers = _user_headers ("bob@example.com" ),
937+ )
938+ assert resp .status_code == 204
939+ admin_log_lines = [
940+ r .getMessage ()
941+ for r in caplog .records
942+ if "Admin " in r .getMessage () and "acted on task" in r .getMessage ()
943+ ]
944+ assert admin_log_lines == [], (
945+ "owner deleting their own task should not emit the admin audit line"
946+ )
0 commit comments