-
Notifications
You must be signed in to change notification settings - Fork 850
Expand file tree
/
Copy pathcsrf_test_client.py
More file actions
148 lines (120 loc) · 4.74 KB
/
csrf_test_client.py
File metadata and controls
148 lines (120 loc) · 4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import re
import flask
from flask import current_app, testing
from werkzeug.datastructures import Headers
from werkzeug.test import EnvironBuilder
from flask_wtf.csrf import generate_csrf
import config
import sys
class RequestShim():
"""
A fake request that proxies cookie-related methods to a Flask test client.
"""
def __init__(self, client):
self.client = client
def set_cookie(self, key, value='', *args, **kwargs):
"Set the cookie on the Flask test client."
if kwargs['domain'] is None:
kwargs['domain'] = current_app.config["SERVER_NAME"] or "localhost"
return self.client.set_cookie(
key=key, value=value, *args, **kwargs
)
def delete_cookie(self, key, *args, **kwargs):
"Delete the cookie on the Flask test client."
server_name = current_app.config["SERVER_NAME"] or "localhost"
return self.client.delete_cookie(
server_name, key=key, *args, **kwargs
)
class TestClient(testing.FlaskClient):
def __init__(self, *args, **kwargs):
self.csrf_token = None
self.app = None
super().__init__(*args, **kwargs)
def setApp(self, _app):
self.app = _app
def open(self, *args, **kwargs):
if len(args) > 0 and isinstance(args[0], (EnvironBuilder, dict)):
return super().open(*args, **kwargs)
data = kwargs.get('data', {})
if self.csrf_token is not None and not (
'email' in data and
'password' in data and
'csrf_token' in data
):
api_key_headers = Headers({})
api_key_headers[
getattr(config, 'WTF_CSRF_HEADERS', ['X-CSRFToken'])[0]
] = self.csrf_token
headers = kwargs.pop('headers', Headers())
headers.extend(api_key_headers)
kwargs['headers'] = headers
return super().open(*args, **kwargs)
def fetch_csrf(self, res):
m = re.search(
b'<input id="csrf_token" name="csrf_token" type="hidden"'
b' value="([^"]*)">', res.data
)
if m is None:
# When login through Kerberos, we won't find the CSRF
return None
return m.group(1).decode("utf-8")
def generate_csrf_token(self, *args, **kwargs):
# First, we'll wrap our request shim around the test client, so
# that it will work correctly when Flask asks it to set a cookie.
request = RequestShim(self)
# Next, we need to look up any cookies that might already exist on
# this test client, such as the secure cookie that
# powers `flask.session`,
# and make a test request context that has those cookies in it.
server_name = self.app.config.get("SERVER_NAME") or "localhost"
environ_overrides = {
'wsgi.url_scheme': 'http',
'HTTP_HOST': server_name
}
self._add_cookies_to_wsgi(environ_overrides)
with self.app.test_request_context(
environ_overrides=environ_overrides
):
# Now, we call Flask-WTF's method of generating a CSRF token...
csrf_token = generate_csrf()
# ...which also sets a value in `flask.session`, so we need to
# ask Flask to save that value to the cookie jar in the test
# client. This is where we actually use that request shim we
# made!
self.app.session_interface.save_session(
self.app, flask.session, request)
return csrf_token
def login(self, email, password, _follow_redirects=False,
headers=None, extra_form_data=dict()):
csrf_token = None
if config.SERVER_MODE is True:
res = self.get('/login',
follow_redirects=_follow_redirects)
csrf_token = self.fetch_csrf(res)
if csrf_token is None:
csrf_token = self.generate_csrf_token()
form_data = dict(
email=email,
password=password,
csrf_token=csrf_token
)
if extra_form_data:
form_data.update(extra_form_data)
res = self.post(
'/authenticate/login', data=form_data,
follow_redirects=_follow_redirects,
headers=headers
)
self.csrf_token = csrf_token
return res
def logout(self):
self.get('/logout?next=/browser/', follow_redirects=False)
self.csrf_token = None