|
3 | 3 | from enum import Enum |
4 | 4 | from typing import Any, Dict, List, Optional, Union |
5 | 5 | from dataclasses import dataclass, asdict, field |
6 | | - |
| 6 | +from socketdev.exceptions import APIFailure |
7 | 7 |
|
8 | 8 | from ..utils import IntegrationType, Utils |
9 | 9 |
|
@@ -710,137 +710,137 @@ def create_params_string(self, params: dict) -> str: |
710 | 710 | def get(self, org_slug: str, params: dict, use_types: bool = False) -> Union[dict, GetFullScanMetadataResponse]: |
711 | 711 | params_arg = self.create_params_string(params) |
712 | 712 | path = "orgs/" + org_slug + "/full-scans" + str(params_arg) |
713 | | - response = self.api.do_request(path=path) |
714 | | - |
715 | | - if response.status_code == 200: |
716 | | - result = response.json() |
717 | | - if use_types: |
718 | | - return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
719 | | - return result |
720 | | - |
721 | | - error_message = response.json().get("error", {}).get("message", "Unknown error") |
722 | | - log.error(f"Error getting full scan metadata: {response.status_code}, message: {error_message}") |
723 | | - if use_types: |
724 | | - return GetFullScanMetadataResponse.from_dict( |
725 | | - {"success": False, "status": response.status_code, "message": error_message} |
726 | | - ) |
| 713 | + try: |
| 714 | + response = self.api.do_request(path=path) |
| 715 | + if response.status_code == 200: |
| 716 | + result = response.json() |
| 717 | + if use_types: |
| 718 | + return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
| 719 | + return result |
| 720 | + except APIFailure as e: |
| 721 | + log.error(f"Socket SDK: API failure while getting full scan metadata {e}") |
| 722 | + raise |
| 723 | + except Exception as e: |
| 724 | + log.error(f"Socket SDK: Unexpected error while getting full scan metadata {e}") |
| 725 | + raise |
| 726 | + |
727 | 727 | return {} |
728 | 728 |
|
729 | 729 | def post(self, files: list, params: FullScanParams, use_types: bool = False) -> Union[dict, CreateFullScanResponse]: |
730 | 730 | Utils.validate_integration_type(params.integration_type if params.integration_type else "api") |
731 | 731 | org_slug = str(params.org_slug) |
732 | 732 | params_dict = params.to_dict() |
733 | 733 | params_dict.pop("org_slug") |
734 | | - |
735 | 734 | params_arg = self.create_params_string(params_dict) |
736 | | - |
737 | 735 | path = "orgs/" + org_slug + "/full-scans" + str(params_arg) |
738 | 736 |
|
739 | | - response = self.api.do_request(path=path, method="POST", files=files) |
740 | | - |
741 | | - if response.status_code == 201: |
742 | | - result = response.json() |
743 | | - if use_types: |
744 | | - return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result}) |
745 | | - return result |
| 737 | + try: |
| 738 | + response = self.api.do_request(path=path, method="POST", files=files) |
| 739 | + if response.status_code == 201: |
| 740 | + result = response.json() |
| 741 | + if use_types: |
| 742 | + return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result}) |
| 743 | + return result |
| 744 | + except APIFailure as e: |
| 745 | + log.error(f"Socket SDK: API failure while posting full scan {e}") |
| 746 | + raise |
| 747 | + except Exception as e: |
| 748 | + log.error(f"Socket SDK: Unexpected error while posting full scan {e}") |
| 749 | + raise |
746 | 750 |
|
747 | | - error_message = response.json().get("error", {}).get("message", "Unknown error") |
748 | | - log.error(f"Error posting {files} to the Fullscans API: {response.status_code}, message: {error_message}") |
749 | | - if use_types: |
750 | | - return CreateFullScanResponse.from_dict( |
751 | | - {"success": False, "status": response.status_code, "message": error_message} |
752 | | - ) |
753 | 751 | return {} |
754 | 752 |
|
755 | 753 | def delete(self, org_slug: str, full_scan_id: str) -> dict: |
756 | 754 | path = "orgs/" + org_slug + "/full-scans/" + full_scan_id |
| 755 | + try: |
| 756 | + response = self.api.do_request(path=path, method="DELETE") |
| 757 | + if response.status_code == 200: |
| 758 | + return response.json() |
| 759 | + except APIFailure as e: |
| 760 | + log.error(f"Socket SDK: API failure while deleting full scan {e}") |
| 761 | + raise |
| 762 | + except Exception as e: |
| 763 | + log.error(f"Socket SDK: Unexpected error while deleting full scan {e}") |
| 764 | + raise |
757 | 765 |
|
758 | | - response = self.api.do_request(path=path, method="DELETE") |
759 | | - |
760 | | - if response.status_code == 200: |
761 | | - result = response.json() |
762 | | - return result |
763 | | - |
764 | | - error_message = response.json().get("error", {}).get("message", "Unknown error") |
765 | | - log.error(f"Error deleting full scan: {response.status_code}, message: {error_message}") |
766 | 766 | return {} |
767 | 767 |
|
768 | 768 | def stream_diff( |
769 | 769 | self, org_slug: str, before: str, after: str, use_types: bool = False |
770 | 770 | ) -> Union[dict, StreamDiffResponse]: |
771 | 771 | path = f"orgs/{org_slug}/full-scans/diff?before={before}&after={after}" |
| 772 | + try: |
| 773 | + response = self.api.do_request(path=path, method="GET") |
| 774 | + if response.status_code == 200: |
| 775 | + result = response.json() |
| 776 | + if use_types: |
| 777 | + return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result}) |
| 778 | + return result |
| 779 | + except APIFailure as e: |
| 780 | + log.error(f"Socket SDK: API failure while streaming diff {e}") |
| 781 | + raise |
| 782 | + except Exception as e: |
| 783 | + log.error(f"Socket SDK: Unexpected error while streaming diff {e}") |
| 784 | + raise |
772 | 785 |
|
773 | | - response = self.api.do_request(path=path, method="GET") |
774 | | - |
775 | | - if response.status_code == 200: |
776 | | - result = response.json() |
777 | | - if use_types: |
778 | | - return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result}) |
779 | | - return result |
780 | | - |
781 | | - error_message = response.json().get("error", {}).get("message", "Unknown error") |
782 | | - log.error(f"Error streaming diff: {response.status_code}, message: {error_message}") |
783 | | - if use_types: |
784 | | - return StreamDiffResponse.from_dict( |
785 | | - {"success": False, "status": response.status_code, "message": error_message} |
786 | | - ) |
787 | 786 | return {} |
788 | 787 |
|
789 | 788 | def stream(self, org_slug: str, full_scan_id: str, use_types: bool = False) -> Union[dict, FullScanStreamResponse]: |
790 | 789 | path = "orgs/" + org_slug + "/full-scans/" + full_scan_id |
791 | | - response = self.api.do_request(path=path, method="GET") |
792 | | - |
793 | | - if response.status_code == 200: |
794 | | - try: |
795 | | - stream_str = [] |
796 | | - artifacts = {} |
797 | | - result = response.text |
798 | | - result = result.strip('"').strip() |
799 | | - for line in result.split("\n"): |
800 | | - if line != '"' and line != "" and line is not None: |
801 | | - item = json.loads(line) |
802 | | - stream_str.append(item) |
803 | | - for val in stream_str: |
804 | | - artifacts[val["id"]] = val |
805 | | - |
806 | | - if use_types: |
807 | | - return FullScanStreamResponse.from_dict({"success": True, "status": 200, "artifacts": artifacts}) |
808 | | - return artifacts |
| 790 | + try: |
| 791 | + response = self.api.do_request(path=path, method="GET") |
| 792 | + if response.status_code == 200: |
| 793 | + try: |
| 794 | + stream_str = [] |
| 795 | + artifacts = {} |
| 796 | + result = response.text.strip('"').strip() |
| 797 | + for line in result.split("\n"): |
| 798 | + if line != '"' and line != "" and line is not None: |
| 799 | + item = json.loads(line) |
| 800 | + stream_str.append(item) |
| 801 | + for val in stream_str: |
| 802 | + artifacts[val["id"]] = val |
| 803 | + |
| 804 | + if use_types: |
| 805 | + return FullScanStreamResponse.from_dict( |
| 806 | + {"success": True, "status": 200, "artifacts": artifacts} |
| 807 | + ) |
| 808 | + return artifacts |
| 809 | + |
| 810 | + except Exception as e: |
| 811 | + error_message = f"Error parsing stream response: {str(e)}" |
| 812 | + log.error(error_message) |
| 813 | + if use_types: |
| 814 | + return FullScanStreamResponse.from_dict( |
| 815 | + {"success": False, "status": response.status_code, "message": error_message} |
| 816 | + ) |
| 817 | + return {} |
| 818 | + |
| 819 | + except APIFailure as e: |
| 820 | + log.error(f"Socket SDK: API failure while streaming full scan {e}") |
| 821 | + raise |
| 822 | + except Exception as e: |
| 823 | + log.error(f"Socket SDK: Unexpected error while streaming full scan {e}") |
| 824 | + raise |
809 | 825 |
|
810 | | - except Exception as e: |
811 | | - error_message = f"Error parsing stream response: {str(e)}" |
812 | | - log.error(error_message) |
813 | | - if use_types: |
814 | | - return FullScanStreamResponse.from_dict( |
815 | | - {"success": False, "status": response.status_code, "message": error_message} |
816 | | - ) |
817 | | - return {} |
818 | | - |
819 | | - error_message = response.json().get("error", {}).get("message", "Unknown error") |
820 | | - log.error(f"Error streaming full scan: {response.status_code}, message: {error_message}") |
821 | | - if use_types: |
822 | | - return FullScanStreamResponse.from_dict( |
823 | | - {"success": False, "status": response.status_code, "message": error_message} |
824 | | - ) |
825 | 826 | return {} |
826 | 827 |
|
827 | 828 | def metadata( |
828 | 829 | self, org_slug: str, full_scan_id: str, use_types: bool = False |
829 | 830 | ) -> Union[dict, GetFullScanMetadataResponse]: |
830 | 831 | path = "orgs/" + org_slug + "/full-scans/" + full_scan_id + "/metadata" |
| 832 | + try: |
| 833 | + response = self.api.do_request(path=path, method="GET") |
| 834 | + if response.status_code == 200: |
| 835 | + result = response.json() |
| 836 | + if use_types: |
| 837 | + return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
| 838 | + return result |
| 839 | + except APIFailure as e: |
| 840 | + log.error(f"Socket SDK: API failure while getting metadata {e}") |
| 841 | + raise |
| 842 | + except Exception as e: |
| 843 | + log.error(f"Socket SDK: Unexpected error while getting metadata {e}") |
| 844 | + raise |
831 | 845 |
|
832 | | - response = self.api.do_request(path=path, method="GET") |
833 | | - |
834 | | - if response.status_code == 200: |
835 | | - result = response.json() |
836 | | - if use_types: |
837 | | - return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
838 | | - return result |
839 | | - |
840 | | - error_message = response.json().get("error", {}).get("message", "Unknown error") |
841 | | - log.error(f"Error getting metadata: {response.status_code}, message: {error_message}") |
842 | | - if use_types: |
843 | | - return GetFullScanMetadataResponse.from_dict( |
844 | | - {"success": False, "status": response.status_code, "message": error_message} |
845 | | - ) |
846 | 846 | return {} |
0 commit comments