-
-
Notifications
You must be signed in to change notification settings - Fork 303
Expand file tree
/
Copy paththrottling.py
More file actions
80 lines (62 loc) · 2.66 KB
/
throttling.py
File metadata and controls
80 lines (62 loc) · 2.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
from django.core.exceptions import ImproperlyConfigured
from rest_framework.exceptions import Throttled
from rest_framework.throttling import UserRateThrottle
from rest_framework.views import exception_handler
class PermissionBasedUserRateThrottle(UserRateThrottle):
    """
    Throttles authenticated users based on their assigned permissions.
    If no throttling permission is assigned, defaults to `medium` throttling
    for authenticated users and `anon` for unauthenticated users.
    """

    # Holders of this permission are never throttled.
    _UNRESTRICTED_PERMISSION = "vulnerabilities.throttle_3_unrestricted"

    # (permission, tier) pairs, checked in order: the first permission the
    # user holds decides the tier, so the most generous tier comes first.
    _TIER_PERMISSIONS = (
        ("vulnerabilities.throttle_2_high", "high"),
        ("vulnerabilities.throttle_1_medium", "medium"),
        ("vulnerabilities.throttle_0_low", "low"),
    )

    def __init__(self):
        """
        Intentionally skip the parent initializer.

        The rate depends on the requesting user, so ``rate``,
        ``num_requests`` and ``duration`` are assigned per request in
        ``allow_request`` rather than once at construction time.
        """

    def allow_request(self, request, view):
        """
        Return True if the request is allowed, applying the rate of the
        throttling tier that matches the permissions of ``request.user``.

        Unauthenticated requests use the ``anon`` tier. Users holding the
        unrestricted permission are allowed immediately, without consulting
        any rate. Authenticated users holding none of the tier permissions
        fall back to the ``medium`` tier.

        Raises ``ImproperlyConfigured`` if the selected tier has no rate.
        """
        user = request.user

        if not user or not user.is_authenticated:
            throttling_tier = "anon"
        elif user.has_perm(self._UNRESTRICTED_PERMISSION):
            return True
        else:
            # The generator is lazy: permission checks stop at the first match.
            throttling_tier = next(
                (
                    tier
                    for permission, tier in self._TIER_PERMISSIONS
                    if user.has_perm(permission)
                ),
                "medium",
            )

        self.rate = self.get_throttle_rate(throttling_tier)
        self.num_requests, self.duration = self.parse_rate(self.rate)

        return super().allow_request(request, view)

    def get_throttle_rate(self, tier):
        """
        Return the rate configured in ``THROTTLE_RATES`` for ``tier``.

        Raises ``ImproperlyConfigured`` if no rate is set for ``tier``; the
        original ``KeyError`` is attached as the explicit cause.
        """
        try:
            return self.THROTTLE_RATES[tier]
        except KeyError as err:
            msg = f"No throttle rate set for {tier}."
            raise ImproperlyConfigured(msg) from err
class AnonUserUIThrottle(UserRateThrottle):
    """
    Throttle for the ``ui`` scope, keyed on the client identifier returned
    by ``get_ident`` rather than on the user account.
    """

    scope = "ui"

    def allow_request(self, request, view):
        """
        Refresh the ``ui`` rate from ``THROTTLE_RATES`` on every request,
        then defer the allow/deny decision to the parent class.
        """
        # ``.get`` yields None when no "ui" rate is configured instead of
        # raising here.
        ui_rate = self.THROTTLE_RATES.get("ui")
        self.rate = ui_rate
        request_limit, window = self.parse_rate(ui_rate)
        self.num_requests = request_limit
        self.duration = window
        return super().allow_request(request, view)

    def get_cache_key(self, request, view):
        """
        Return the cache key used to count requests for this client.
        """
        client_ident = self.get_ident(request)
        return f"throttle_ui_{client_ident}"
def throttled_exception_handler(exception, context):
    """
    Build the response for ``exception`` with the default exception handler
    and, when the request was throttled, replace the response body with a
    fixed message pointing to support.

    Responses for any other exception are returned unchanged.
    """
    response = exception_handler(exception, context)

    # Only throttled requests get the custom body.
    if not isinstance(exception, Throttled):
        return response

    response.data = {
        "message": "Your request has been throttled. Please contact support@nexb.com"
    }
    return response