|
16 | 16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | 17 | # |
18 | 18 | ######################################################################### |
19 | | - |
20 | | -from geonode.base.models import ResourceBase |
21 | | -from geonode.geoserver.helpers import gs_catalog |
22 | 19 | import os |
23 | | -import shutil |
24 | 20 | import logging |
25 | | -import tempfile |
26 | | -from io import IOBase |
27 | | -from urllib.request import urljoin |
28 | | - |
29 | | -from django.conf import settings |
30 | 21 |
|
31 | 22 | from django.urls import reverse |
32 | | -from django.contrib.auth import authenticate, get_user_model |
33 | | -from django.test.utils import override_settings |
34 | | - |
35 | | -from requests_toolbelt.multipart.encoder import MultipartEncoder |
| 23 | +from django.contrib.auth import get_user_model |
36 | 24 |
|
37 | 25 | from rest_framework.test import APITestCase |
38 | 26 |
|
39 | | -from seleniumrequests import Firefox |
40 | | - |
41 | | -# from selenium.common import exceptions |
42 | | -# from selenium.webdriver.common.by import By |
43 | | -from selenium.webdriver import FirefoxOptions |
44 | | -from selenium.webdriver.support.wait import WebDriverWait |
45 | | -from selenium.webdriver.support import expected_conditions as EC |
46 | | -from selenium.webdriver.firefox.firefox_binary import FirefoxBinary |
47 | | - |
48 | | -from webdriver_manager.firefox import GeckoDriverManager |
49 | | - |
50 | | -from geonode.tests.base import GeoNodeLiveTestSupport |
51 | 27 | from geonode.geoserver.helpers import ogc_server_settings |
52 | 28 | from geonode.upload.models import UploadSizeLimit, UploadParallelismLimit |
53 | 29 |
|
|
60 | 36 | logger = logging.getLogger("importer") |
61 | 37 |
|
62 | 38 |
|
63 | | -@override_settings( |
64 | | - DEBUG=True, |
65 | | - ALLOWED_HOSTS=["*"], |
66 | | - SITEURL=LIVE_SERVER_URL, |
67 | | - CSRF_COOKIE_SECURE=False, |
68 | | - CSRF_COOKIE_HTTPONLY=False, |
69 | | - CORS_ORIGIN_ALLOW_ALL=True, |
70 | | - SESSION_COOKIE_SECURE=False, |
71 | | - DEFAULT_MAX_PARALLEL_UPLOADS_PER_USER=5, |
72 | | -) |
73 | | -class UploadApiTests(GeoNodeLiveTestSupport, APITestCase): |
74 | | - port = 0 |
75 | | - |
76 | | - @classmethod |
77 | | - def setUpClass(cls): |
78 | | - super().setUpClass() |
79 | | - |
80 | | - try: |
81 | | - """Instantiate selenium driver instance""" |
82 | | - binary = FirefoxBinary("/usr/bin/firefox") |
83 | | - opts = FirefoxOptions() |
84 | | - opts.add_argument("--headless") |
85 | | - executable_path = GeckoDriverManager().install() |
86 | | - cls.selenium = Firefox(firefox_binary=binary, firefox_options=opts, executable_path=executable_path) |
87 | | - cls.selenium.implicitly_wait(10) |
88 | | - except Exception as e: |
89 | | - logger.error(e) |
90 | | - |
91 | | - @classmethod |
92 | | - def tearDownClass(cls): |
93 | | - """Quit selenium driver instance""" |
94 | | - try: |
95 | | - cls.selenium.quit() |
96 | | - except Exception as e: |
97 | | - logger.debug(e) |
98 | | - super().tearDownClass() |
99 | | - |
100 | | - def setUp(self): |
101 | | - super().setUp() |
102 | | - self.temp_folder = tempfile.mkdtemp(dir=CURRENT_LOCATION) |
103 | | - self.session_id = None |
104 | | - self.csrf_token = None |
105 | | - |
106 | | - def tearDown(self): |
107 | | - shutil.rmtree(self.temp_folder, ignore_errors=True) |
108 | | - return super().tearDown() |
109 | | - |
110 | | - def set_session_cookies(self, url=None): |
111 | | - # selenium will set cookie domain based on current page domain |
112 | | - self.selenium.get(url or f"{self.live_server_url}/") |
113 | | - self.csrf_token = self.selenium.get_cookie("csrftoken")["value"] |
114 | | - self.session_id = self.selenium.get_cookie(settings.SESSION_COOKIE_NAME)["value"] |
115 | | - self.selenium.add_cookie( |
116 | | - {"name": settings.SESSION_COOKIE_NAME, "value": self.session_id, "secure": False, "path": "/"} |
117 | | - ) |
118 | | - self.selenium.add_cookie({"name": "csrftoken", "value": self.csrf_token, "secure": False, "path": "/"}) |
119 | | - |
120 | | - def click_button(self, label): |
121 | | - selector = f"//button[contains(., '{label}')]" |
122 | | - self.selenium.find_element_by_xpath(selector).click() |
123 | | - |
124 | | - def do_login(self, username="admin", password="admin"): |
125 | | - """Method to login the GeoNode site""" |
126 | | - assert authenticate(username=username, password=password) |
127 | | - self.assertTrue(self.client.login(username=username, password=password)) # Native django test client |
128 | | - |
129 | | - url = urljoin(settings.SITEURL, f"{reverse('account_login')}?next=/layers") |
130 | | - self.set_session_cookies(url) |
131 | | - self.selenium.save_screenshot(os.path.join(self.temp_folder, "login.png")) |
132 | | - |
133 | | - title = self.selenium.title |
134 | | - current_url = self.selenium.current_url |
135 | | - logger.debug(f" ---- title: {title} / current_url: {current_url}") |
136 | | - |
137 | | - username_input = self.selenium.find_element_by_xpath('//input[@id="id_login"][@type="text"]') |
138 | | - username_input.send_keys(username) |
139 | | - password_input = self.selenium.find_element_by_xpath('//input[@id="id_password"][@type="password"]') |
140 | | - password_input.send_keys(password) |
141 | | - |
142 | | - self.selenium.save_screenshot(os.path.join(self.temp_folder, "login-set_fields.png")) |
143 | | - self.click_button("Sign In") |
144 | | - self.selenium.save_screenshot(os.path.join(self.temp_folder, "login-sign_in.png")) |
145 | | - |
146 | | - title = self.selenium.title |
147 | | - current_url = self.selenium.current_url |
148 | | - logger.debug(f" ---- title: {title} / current_url: {current_url}") |
149 | | - |
150 | | - # Wait until the response is received |
151 | | - WebDriverWait(self.selenium, 10).until(EC.title_contains("Explore Layers")) |
152 | | - self.set_session_cookies(url) |
153 | | - |
154 | | - def do_logout(self): |
155 | | - url = urljoin(settings.SITEURL, f"{reverse('account_logout')}") |
156 | | - self.selenium.get(url) |
157 | | - self.click_button("Log out") |
158 | | - |
159 | | - def do_upload_step(self, step=None): |
160 | | - step = urljoin(settings.SITEURL, reverse("data_upload", args=[step] if step else [])) |
161 | | - return step |
162 | | - |
163 | | - def live_upload_file(self, _file): |
164 | | - """function that uploads a file, or a collection of files, to |
165 | | - the GeoNode""" |
166 | | - spatial_files = ("dbf_file", "shx_file", "prj_file") |
167 | | - base, ext = os.path.splitext(_file) |
168 | | - params = { |
169 | | - # make public since wms client doesn't do authentication |
170 | | - "csrfmiddlewaretoken": self.csrf_token, |
171 | | - "permissions": '{ "users": {"AnonymousUser": ["view_resourcebase"]} , "groups":{}}', |
172 | | - "time": "false", |
173 | | - "charset": "UTF-8", |
174 | | - } |
175 | | - cookies = {settings.SESSION_COOKIE_NAME: self.session_id, "csrftoken": self.csrf_token} |
176 | | - headers = { |
177 | | - "X-CSRFToken": self.csrf_token, |
178 | | - "X-Requested-With": "XMLHttpRequest", |
179 | | - "Set-Cookie": f"csrftoken={self.csrf_token}; sessionid={self.session_id}", |
180 | | - } |
181 | | - url = self.do_upload_step() |
182 | | - logger.debug(f" ---- UPLOAD URL: {url} / cookies: {cookies} / headers: {headers}") |
183 | | - |
184 | | - # deal with shapefiles |
185 | | - if ext.lower() == ".shp": |
186 | | - for spatial_file in spatial_files: |
187 | | - ext, _ = spatial_file.split("_") |
188 | | - file_path = f"{base}.{ext}" |
189 | | - # sometimes a shapefile is missing an extra file, |
190 | | - # allow for that |
191 | | - if os.path.exists(file_path): |
192 | | - params[spatial_file] = open(file_path, "rb") |
193 | | - |
194 | | - with open(_file, "rb") as base_file: |
195 | | - params["base_file"] = base_file |
196 | | - for name, value in params.items(): |
197 | | - if isinstance(value, IOBase): |
198 | | - params[name] = (os.path.basename(value.name), value) |
199 | | - |
200 | | - # refresh to exchange cookies with the server. |
201 | | - self.selenium.refresh() |
202 | | - self.selenium.get(url) |
203 | | - self.selenium.save_screenshot(os.path.join(self.temp_folder, "upload-page.png")) |
204 | | - logger.debug(f" ------------ UPLOAD FORM: {params}") |
205 | | - encoder = MultipartEncoder(fields=params) |
206 | | - headers["Content-Type"] = encoder.content_type |
207 | | - response = self.selenium.request("POST", url, data=encoder, headers=headers) |
208 | | - |
209 | | - # Closes the files |
210 | | - for spatial_file in spatial_files: |
211 | | - if isinstance(params.get(spatial_file), IOBase): |
212 | | - params[spatial_file].close() |
213 | | - |
214 | | - try: |
215 | | - logger.error(f" -- response: {response.status_code} / {response.json()}") |
216 | | - return response, response.json() |
217 | | - except ValueError: |
218 | | - logger.exception(ValueError(f"probably not json, status {response.status_code} / {response.content}")) |
219 | | - return response, response.content |
220 | | - |
221 | | - def _cleanup_layer(self, layer_name): |
222 | | - # removing the layer from geonode |
223 | | - x = ResourceBase.objects.filter(alternate__icontains=layer_name) |
224 | | - if x.exists(): |
225 | | - for el in x.iterator(): |
226 | | - el.delete() |
227 | | - # removing the layer from geoserver |
228 | | - dataset = gs_catalog.get_layer(layer_name) |
229 | | - if dataset: |
230 | | - gs_catalog.delete(dataset, purge="all", recurse=True) |
231 | | - # removing the layer from geoserver |
232 | | - store = gs_catalog.get_store(layer_name, workspace="geonode") |
233 | | - if store: |
234 | | - gs_catalog.delete(store, purge="all", recurse=True) |
235 | | - |
236 | | - |
237 | 39 | class UploadSizeLimitTests(APITestCase): |
238 | 40 | fixtures = [ |
239 | 41 | "group_test_data.json", |
|
0 commit comments