2929
3030from collections import namedtuple
3131import codecs
32+ import importlib
3233from io import BytesIO
34+ import os
35+ import sys
3336from urllib .parse import urlencode , urlparse
3437
35- from werkzeug . datastructures import MultiDict
38+ from tests . support . suppconst import MIG_BASE
3639
40+ OBJECTS_TYPE = 'objects'
3741
38- # named type representing the tuple that is passed to WSGI handlers
39- _PreparedWsgi = namedtuple ('_PreparedWsgi' , ['environ' , 'start_response' ])
42+
43+ def _import_forcibly (module_name , relative_module_dir = None ):
44+ """Custom import function to allow an import of a file for testing
45+ that resides within a non-module directory."""
46+
47+ module_path = os .path .join (MIG_BASE , 'mig' )
48+ if relative_module_dir is not None :
49+ module_path = os .path .join (module_path , relative_module_dir )
50+ sys .path .append (module_path )
51+ mod = importlib .import_module (module_name )
52+ sys .path .pop (- 1 ) # do not leave the forced module path
53+ return mod
54+
55+
56+ migwsgi = _import_forcibly ('migwsgi' , relative_module_dir = 'wsgi-bin' )
4057
4158
4259class FakeWsgiStartResponse :
@@ -51,7 +68,29 @@ def __call__(self, status, headers, exc=None):
5168 self .calls .append ((status , headers , exc ))
5269
5370
54- def create_wsgi_environ (configuration , wsgi_url , method = 'GET' , query = None , headers = None , form = None ):
71+ def _urlencode_form (form_content ):
72+ """
73+ Convert a data structure describing form contents to byte string
74+ that can be directly sent as the body of an HTTP request.
75+ """
76+
77+ field_key_and_value_pairs = []
78+ if isinstance (form_content , dict ):
79+ for key , value in form_content .items ():
80+ if isinstance (value , list ):
81+ for item in value :
82+ field_key_and_value_pairs .append ((key , item ))
83+ continue
84+ field_key_and_value_pairs .append ((key , value ))
85+ elif isinstance (form_content , list ):
86+ field_key_and_value_pairs = form_content
87+ else :
88+ raise AssertionError ("invalid form content" )
89+ return urlencode (field_key_and_value_pairs , doseq = True ).encode ('ascii' )
90+
91+
92+ def create_wsgi_environ (configuration , wsgi_url , method = None ,
93+ query = None , headers = None , form = None , mig_user_dn = None ):
5594 """Populate the necessary variables that will constitute a valid WSGI
5695 environment given a URL to which we will make a requests under test and
5796 various other options that set up the nature of that request."""
@@ -67,7 +106,7 @@ def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, heade
67106 method = 'POST'
68107 request_query = ''
69108
70- body = urlencode ( MultiDict ( form )). encode ( 'ascii' )
109+ body = _urlencode_form ( form )
71110
72111 headers = headers or {}
73112 if not 'Content-Type' in headers :
@@ -76,6 +115,7 @@ def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, heade
76115 headers ['Content-Length' ] = str (len (body ))
77116 wsgi_input = BytesIO (body )
78117 else :
118+ assert method is not None , "method required with no payload specified"
79119 request_query = parsed_url .query
80120 wsgi_input = ()
81121
@@ -99,6 +139,16 @@ def close(self, *ars, **kwargs):
99139 environ ['SCRIPT_URI' ] = '' .join (
100140 ('http://' , environ ['HTTP_HOST' ], environ ['PATH_INFO' ]))
101141
142+ if mig_user_dn :
143+ environ ['REMOTE_USER' ] = mig_user_dn
144+
145+ path_parts = parsed_url .path .split ('/' )
146+ maybe_script_name = path_parts [- 1 ]
147+ _ , script_ext = os .path .splitext (path_parts [- 1 ])
148+ if script_ext != '' :
149+ # the script has an extension, so treat it as a functionality file
150+ environ ['SCRIPT_NAME' ] = maybe_script_name
151+
102152 if headers :
103153 for k , v in headers .items ():
104154 header_key = k .replace ('-' , '_' ).upper ()
@@ -112,38 +162,74 @@ def close(self, *ars, **kwargs):
112162 return environ
113163
114164
115- def create_wsgi_start_response ():
116- return FakeWsgiStartResponse ()
165+ class _PreparedWsgi :
166+ """
167+ Object representing a simulated WSGI request to be exercised by a test case.
168+ """
117169
170+ def __init__ (self , configuration , url , ** kwargs ):
171+ self .configuration = configuration
172+ self .environ = create_wsgi_environ (configuration , url , ** kwargs )
173+ self .start_response = FakeWsgiStartResponse ()
118174
119- def prepare_wsgi (configuration , url , ** kwargs ):
120- return _PreparedWsgi (
121- create_wsgi_environ (configuration , url , ** kwargs ),
122- create_wsgi_start_response ()
123- )
175+ def __iter__ (self ):
176+ return iter ((self .environ , self .start_response ))
177+
178+ def _bind_invocation (self ):
179+ self .application_args = (
180+ self .environ ,
181+ self .start_response ,
182+ )
183+
184+ self .application_kwargs = dict (
185+ configuration = self .configuration ,
186+ _set_os_environ = False ,
187+ )
124188
189+ return migwsgi .application (
190+ * self .application_args ,
191+ ** self .application_kwargs
192+ )
125193
126- def _trigger_and_unpack_result (wsgi_result ):
127- chunks = list (wsgi_result )
128- assert len (chunks ) > 0 , "invocation returned no output"
129- complete_value = b'' .join (chunks )
130- decoded_value = codecs .decode (complete_value , 'utf8' )
131- return decoded_value
194+ @staticmethod
195+ def trigger_and_unpack_result (wsgi_result ):
196+ chunks = list (wsgi_result )
197+ assert len (chunks ) > 0 , "invocation returned no output"
198+ complete_value = b'' .join (chunks )
199+ decoded_value = codecs .decode (complete_value , 'utf8' )
200+ return decoded_value
201+
202+
203+ def prepare_wsgi (configuration , url , ** kwargs ):
204+ if 'method' not in kwargs :
205+ kwargs ['method' ] = 'GET'
206+ return _PreparedWsgi (configuration , url , ** kwargs )
132207
133208
134209class WsgiAssertMixin :
135210 """Custom assertions for verifying server code executed under test."""
136211
137- def assertWsgiResponse (self , wsgi_result , fake_wsgi , expected_status_code ):
138- assert isinstance (fake_wsgi , _PreparedWsgi )
212+ def prepareWsgiAssert (self , configuration , url , ** kwargs ):
213+ return _PreparedWsgi (configuration , url , ** kwargs )
214+
215+ def assertWsgiResponse (self , wsgi_result , prepared_wsgi ,
216+ expected_status_code = None ,
217+ expected_content_type = None ,
218+ content_format = None ):
219+ assert isinstance (prepared_wsgi , _PreparedWsgi )
139220
140- content = _trigger_and_unpack_result (wsgi_result )
221+ if wsgi_result :
222+ # legacy codepath
223+ pass
224+ else :
225+ wsgi_result = prepared_wsgi ._bind_invocation ()
226+ content = _PreparedWsgi .trigger_and_unpack_result (wsgi_result )
141227
142228 def called_once (fake ):
143229 assert hasattr (fake , 'calls' )
144230 return len (fake .calls ) == 1
145231
146- fake_start_response = fake_wsgi .start_response
232+ fake_start_response = prepared_wsgi .start_response
147233
148234 try :
149235 self .assertTrue (called_once (fake_start_response ))
@@ -155,11 +241,16 @@ def called_once(fake):
155241
156242 wsgi_call = fake_start_response .calls [0 ]
157243
158- # check for expected HTTP status code
159- wsgi_status = wsgi_call [0 ]
160- actual_status_code = int (wsgi_status [0 :3 ])
161- self .assertEqual (actual_status_code , expected_status_code )
244+ if expected_status_code :
245+ # check for expected HTTP status code
246+ wsgi_status = wsgi_call [0 ]
247+ actual_status_code = int (wsgi_status [0 :3 ])
248+ self .assertEqual (actual_status_code , expected_status_code )
162249
163250 headers = dict (wsgi_call [1 ])
164251
252+ if expected_content_type :
253+ actual_content_type = headers .get ('Content-Type' , 'none/none' )
254+ self .assertEqual (actual_content_type , expected_content_type , "mismatched Content-Type" )
255+
165256 return content , headers
0 commit comments