diff --git a/sockjs-protocol.py b/sockjs-protocol.py index 7fef44b..059379f 100644 --- a/sockjs-protocol.py +++ b/sockjs-protocol.py @@ -1,61 +1,28 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ -[**SockJS-protocol**](https://github.com/sockjs/sockjs-protocol) is an -effort to define a protocol between in-browser -[SockJS-client](https://github.com/sockjs/sockjs-client) and its -server-side counterparts, like -[SockJS-node](https://github.com/sockjs/sockjs-node). This should -help others to write alternative server implementations. - - -This protocol definition is also a runnable test suite, do run it -against your server implementation. Supporting all the tests doesn't -guarantee that SockJS client will work flawlessly, end-to-end tests -using real browsers are always required. +SockJS-protocol runnable test suite (ported to Python 3.10). + +NOTE: +- This file assumes utils.py helpers return: + - r.status: int + - r.body: str (decoded text) for normal HTTP helpers + - r[...] headers: str (or '' / None when absent) + - RawHttpConnection.read/read_chunk: bytes (recommended), because some tests + assert on binary payloads (haproxy test). """ import os import random import time import json import re -import unittest2 as unittest +import unittest +import uuid + from utils import GET, GET_async, POST, POST_async, OPTIONS, old_POST_async from utils import WebSocket8Client from utils import RawHttpConnection -import uuid - # Base URL -# ======== - -""" -The SockJS server provides one or more SockJS services. The services -are usually exposed with a simple url prefix, like: -`http://localhost:8000/echo` or -`http://localhost:8000/broadcast`. We'll call this kind of url a -`base_url`. There is nothing wrong with base url being more complex, -like `http://localhost:8000/a/b/c/d/echo`. Base url should -never end with a slash. - -Base url is the url that needs to be supplied to the SockJS client. - -All paths under base url are controlled by SockJS server and are -defined by SockJS protocol. - -SockJS protocol can be using either http or https. - -To run this tests server pointed by `base_url` needs to support -following services: - - - `echo` - responds with identical data as received - - `disabled_websocket_echo` - identical to `echo`, but with websockets disabled - - `cookie_needed_echo` - identical to `echo`, but with JSESSIONID cookies sent - - `close` - server immediately closes the session - -This tests should not be run more often than once in five seconds - -many tests operate on the same (named) sessions and they need to have -enough time to timeout. -""" test_top_url = os.environ.get('SOCKJS_URL', 'http://localhost:8081') base_url = test_top_url + '/echo' close_base_url = test_top_url + '/close' @@ -63,76 +30,60 @@ cookie_base_url = test_top_url + '/cookie_needed_echo' -# Static URLs -# =========== - class Test(unittest.TestCase): - # We are going to test several `404/not found` pages. We don't - # define a body or a content type. def verify404(self, r): self.assertEqual(r.status, 404) - # In some cases `405/method not allowed` is more appropriate. def verify405(self, r): self.assertEqual(r.status, 405) self.assertFalse(r['content-type']) self.assertTrue(r['allow']) self.assertFalse(r.body) - # Compare the 'content-type' header ignoring spaces def verify_content_type(self, r, content_type): self.assertEqual(r['content-type'].replace(' ', ''), content_type) - # Multiple transport protocols need to support OPTIONS method. All - # responses to OPTIONS requests must be cacheable and contain - # appropriate headers. def verify_options(self, url, allowed_methods): for origin in ['test', 'null']: h = {'Access-Control-Request-Method': allowed_methods, 'Origin': origin} r = OPTIONS(url, headers=h) - # A 200 'OK' or a 204 'No Content' should both be acceptable as responses for a CORS request. self.assertTrue(r.status == 204 or r.status == 200) self.assertTrue(re.search('public', r['Cache-Control'])) - self.assertTrue(re.search('max-age=[1-9][0-9]{6}', r['Cache-Control']), - "max-age must be large, one year (31536000) is best") + self.assertTrue( + re.search('max-age=[1-9][0-9]{6}', r['Cache-Control']), + "max-age must be large, one year (31536000) is best", + ) self.assertTrue(r['Expires']) self.assertTrue(int(r['access-control-max-age']) > 1000000) - # A server may respond to a preflight request with HTTP methods in addition to method specified in the 'Access-Control-Request-Method' header for header in allowed_methods.split(','): - self.assertTrue(header.strip() in r['Access-Control-Allow-Methods'], 'Access-Control-Allow-Methods did not contain :' + header) + self.assertTrue( + header.strip() in r['Access-Control-Allow-Methods'], + 'Access-Control-Allow-Methods did not contain :' + header, + ) self.assertFalse(r.body) self.verify_cors(r, origin) def verify_no_cookie(self, r): self.assertFalse(r['Set-Cookie']) - # Most of the XHR/Ajax based transports do work CORS if proper - # headers are set. def verify_cors(self, r, origin=None): if origin: self.assertEqual(r['access-control-allow-origin'], origin) - # In order to get cookies (`JSESSIONID` mostly) flying, we - # need to set `allow-credentials` header to true. self.assertEqual(r['access-control-allow-credentials'], 'true') else: self.assertEqual(r['access-control-allow-origin'], '*') self.assertFalse(r['access-control-allow-credentials']) - # Sometimes, due to transports limitations we need to request - # private data using GET method. In such case it's very important - # to disallow any caching. def verify_not_cached(self, r, origin=None): - self.assertEqual(r['Cache-Control'], - 'no-store, no-cache, no-transform, must-revalidate, max-age=0') + self.assertEqual( + r['Cache-Control'], + 'no-store, no-cache, no-transform, must-revalidate, max-age=0', + ) self.assertFalse(r['Expires']) self.assertFalse(r['Last-Modified']) -# Greeting url: `/` -# ---------------- class BaseUrlGreeting(Test): - # The most important part of the url scheme, is without doubt, the - # top url. Make sure the greeting is valid. def test_greeting(self): for url in [base_url, base_url + '/']: r = GET(url) @@ -141,24 +92,14 @@ def test_greeting(self): self.assertEqual(r.body, 'Welcome to SockJS!\n') self.verify_no_cookie(r) - # Other simple requests should return 404. def test_notFound(self): - for suffix in ['/a', '/a.html', '//', '///', '/a/a', '/a/a/', '/a', - '/a/']: + for suffix in ['/a', '/a.html', '//', '///', '/a/a', '/a/a/', '/a', '/a/']: self.verify404(GET(base_url + suffix)) -# IFrame page: `/iframe*.html` -# ---------------------------- class IframePage(Test): - """ - Some transports don't support cross domain communication - (CORS). In order to support them we need to do a cross-domain - trick: on remote (server) domain we serve an simple html page, - that loads back SockJS client javascript and is able to - communicate with the server within the same domain. - """ - iframe_body = re.compile(''' + iframe_body = re.compile( + r''' ^ @@ -175,92 +116,75 @@ class IframePage(Test):

This is a SockJS hidden iframe. It's used for cross domain magic.

$ -'''.strip()) +'''.strip() + ) - # SockJS server must provide this html page. def test_simpleUrl(self): self.verify(base_url + '/iframe.html') - # To properly utilize caching, the same content must be served - # for request which try to version the iframe. The server may want - # to give slightly different answer for every SockJS client - # revision. def test_versionedUrl(self): - for suffix in ['/iframe-a.html', '/iframe-.html', '/iframe-0.1.2.html', - '/iframe-0.1.2abc-dirty.2144.html']: + for suffix in [ + '/iframe-a.html', + '/iframe-.html', + '/iframe-0.1.2.html', + '/iframe-0.1.2abc-dirty.2144.html', + ]: self.verify(base_url + suffix) - # In some circumstances (`devel` set to true) client library - # wants to skip caching altogether. That is achieved by - # supplying a random query string. def test_queriedUrl(self): - for suffix in ['/iframe-a.html?t=1234', '/iframe-0.1.2.html?t=123414', - '/iframe-0.1.2abc-dirty.2144.html?t=qweqweq123']: + for suffix in [ + '/iframe-a.html?t=1234', + '/iframe-0.1.2.html?t=123414', + '/iframe-0.1.2abc-dirty.2144.html?t=qweqweq123', + ]: self.verify(base_url + suffix) - # Malformed urls must give 404 answer. def test_invalidUrl(self): - for suffix in ['/iframe.htm', '/iframe', '/IFRAME.HTML', '/IFRAME', - '/iframe.HTML', '/iframe.xml', '/iframe-/.html']: + for suffix in [ + '/iframe.htm', + '/iframe', + '/IFRAME.HTML', + '/IFRAME', + '/iframe.HTML', + '/iframe.xml', + '/iframe-/.html', + ]: r = GET(base_url + suffix) self.verify404(r) - # The '/iframe.html' page and its variants must give `200/ok` and be - # served with 'text/html' content type. def verify(self, url): r = GET(url) self.assertEqual(r.status, 200) self.verify_content_type(r, 'text/html;charset=UTF-8') - # The iframe page must be strongly cacheable, supply - # Cache-Control, Expires and Etag headers and avoid - # Last-Modified header. self.assertTrue(re.search('public', r['Cache-Control'])) - self.assertTrue(re.search('max-age=[1-9][0-9]{6}', r['Cache-Control']), - "max-age must be large, one year (31536000) is best") + self.assertTrue( + re.search('max-age=[1-9][0-9]{6}', r['Cache-Control']), + "max-age must be large, one year (31536000) is best", + ) self.assertTrue(r['Expires']) self.assertTrue(r['ETag']) self.assertFalse(r['last-modified']) - # Body must be exactly as specified, with the exception of - # `sockjs_url`, which should be configurable. match = self.iframe_body.match(r.body.strip()) self.assertTrue(match) - # `Sockjs_url` must be a valid url and should utilize caching. sockjs_url = match.group('sockjs_url') - self.assertTrue(sockjs_url.startswith('/') or - sockjs_url.startswith('http')) + self.assertTrue(sockjs_url.startswith('/') or sockjs_url.startswith('http')) self.verify_no_cookie(r) return r - - # The iframe page must be strongly cacheable. ETag headers must - # not change too often. Server must support 'if-none-match' - # requests. def test_cacheability(self): r1 = GET(base_url + '/iframe.html') r2 = GET(base_url + '/iframe.html') self.assertEqual(r1['etag'], r2['etag']) - self.assertTrue(r1['etag']) # Let's make sure ETag isn't None. + self.assertTrue(r1['etag']) r = GET(base_url + '/iframe.html', headers={'If-None-Match': r1['etag']}) self.assertEqual(r.status, 304) self.assertFalse(r['content-type']) self.assertFalse(r.body) -# Info test: `/info` -# ------------------ -# -# Warning: this is a replacement of `/chunking_test` functionality -# from SockJS 0.1. + class InfoTest(Test): - # This url is called before the client starts the session. It's - # used to check server capabilities (websocket support, cookies - # requiremet) and to get the value of "origin" setting (currently - # not used). - # - # But more importantly, the call to this url is used to measure - # the roundtrip time between the client and the server. So, please, - # do respond to this url in a timely fashion. def test_basic(self): r = GET(base_url + '/info', headers={'Origin': 'test'}) self.assertEqual(r.status, 200) @@ -270,46 +194,30 @@ def test_basic(self): self.verify_cors(r, 'test') data = json.loads(r.body) - # Are websockets enabled on the server? self.assertEqual(data['websocket'], True) - # Do transports need to support cookies (ie: for load - # balancing purposes. - self.assertTrue(data['cookie_needed'] in [True, False]) - # List of allowed origins. Currently ignored. + self.assertTrue(data['cookie_needed'] in [True, False]) self.assertEqual(data['origins'], ['*:*']) - # Source of entropy for random number generator. - self.assertTrue(type(data['entropy']) in [int, long]) + self.assertTrue(type(data['entropy']) is int) - # As browsers don't have a good entropy source, the server must - # help with tht. Info url must supply a good, unpredictable random - # number from the range <0; 2^32-1> to feed the browser. def test_entropy(self): r1 = GET(base_url + '/info') data1 = json.loads(r1.body) r2 = GET(base_url + '/info') data2 = json.loads(r2.body) - self.assertTrue(type(data1['entropy']) in [int, long]) - self.assertTrue(type(data2['entropy']) in [int, long]) + self.assertTrue(type(data1['entropy']) is int) + self.assertTrue(type(data2['entropy']) is int) self.assertNotEqual(data1['entropy'], data2['entropy']) - # Info url must support CORS. def test_options(self): self.verify_options(base_url + '/info', 'OPTIONS, GET') - # SockJS client may be hosted from file:// url. In practice that - # means the 'Origin' headers sent by the browser will have a value - # of a string "null". Unfortunately, just echoing back "null" - # won't work - browser will understand that as a rejection. We - # must respond with star "*" origin in such case. def test_options_null_origin(self): - url = base_url + '/info' - r = OPTIONS(url, headers={'Origin': 'null', 'Access-Control-Request-Method': 'POST'}) - self.assertTrue(r.status == 204 or r.status == 200) - self.assertFalse(r.body) - self.assertEqual(r['access-control-allow-origin'], 'null') + url = base_url + '/info' + r = OPTIONS(url, headers={'Origin': 'null', 'Access-Control-Request-Method': 'POST'}) + self.assertTrue(r.status == 204 or r.status == 200) + self.assertFalse(r.body) + self.assertEqual(r['access-control-allow-origin'], 'null') - # The 'disabled_websocket_echo' service should have websockets - # disabled. def test_disabled_websocket(self): r = GET(wsoff_base_url + '/info') self.assertEqual(r.status, 200) @@ -317,56 +225,24 @@ def test_disabled_websocket(self): self.assertEqual(data['websocket'], False) -# Session URLs -# ============ - -# Top session URL: `//` -# -------------------------------------- -# -# The session between the client and the server is always initialized -# by the client. The client chooses `server_id`, which should be a -# three digit number: 000 to 999. It can be supplied by user or -# randomly generated. The main reason for this parameter is to make it -# easier to configure load balancer - and enable sticky sessions based -# on first part of the url. -# -# Second parameter `session_id` must be a random string, unique for -# every session. -# -# It is undefined what happens when two clients share the same -# `session_id`. It is a client responsibility to choose identifier -# with enough entropy. -# -# Neither server nor client API's can expose `session_id` to the -# application. This field must be protected from the app. class SessionURLs(Test): - - # The server must accept any value in `server` and `session` fields. def test_anyValue(self): - # add some randomness, so that test could be rerun immediately. r = '%s' % random.randint(0, 1024) self.verify('/a/a' + r) - for session_part in ['/_/_' + r, '/1/' + r, '/abcdefgh_i-j%20/abcdefg_i-j%20'+ r]: + for session_part in ['/_/_' + r, '/1/' + r, '/abcdefgh_i-j%20/abcdefg_i-j%20' + r]: self.verify(session_part) - # To test session URLs we're going to use `xhr-polling` transport - # facilitites. def verify(self, session_part): r = POST(base_url + session_part + '/xhr') self.assertEqual(r.status, 200) self.assertEqual(r.body, 'o\n') - # But not an empty string, anything containing dots or paths with - # less or more parts. def test_invalidPaths(self): - for suffix in ['//', '/a./a', '/a/a.', '/./.' ,'/', '///']: + for suffix in ['//', '/a./a', '/a/a.', '/./.', '/', '///']: self.verify404(GET(base_url + suffix + '/xhr')) self.verify404(POST(base_url + suffix + '/xhr')) - # A session is identified by only `session_id`. `server_id` is a - # parameter for load balancer and must be ignored by the server. def test_ignoringServerId(self): - ''' See Protocol.test_simpleSession for explanation. ''' session_id = str(uuid.uuid4()) r = POST(base_url + '/000/' + session_id + '/xhr') self.assertEqual(r.status, 200) @@ -381,185 +257,62 @@ def test_ignoringServerId(self): self.assertEqual(r.status, 200) self.assertEqual(r.body, 'a["a"]\n') -# Protocol and framing -# -------------------- -# -# SockJS tries to stay API-compatible with WebSockets, but not on the -# network layer. For technical reasons SockJS must introduce custom -# framing and simple custom protocol. -# -# ### Framing accepted by the client -# -# SockJS client accepts following frames: -# -# * `o` - Open frame. Every time a new session is established, the -# server must immediately send the open frame. This is required, as -# some protocols (mostly polling) can't distinguish between a -# properly established connection and a broken one - we must -# convince the client that it is indeed a valid url and it can be -# expecting further messages in the future on that url. -# -# * `h` - Heartbeat frame. Most loadbalancers have arbitrary timeouts -# on connections. In order to keep connections from breaking, the -# server must send a heartbeat frame every now and then. The typical -# delay is 25 seconds and should be configurable. -# -# * `a` - Array of json-encoded messages. For example: `a["message"]`. -# -# * `c` - Close frame. This frame is sent to the browser every time -# the client asks for data on closed connection. This may happen -# multiple times. Close frame contains a code and a string explaining -# a reason of closure, like: `c[3000,"Go away!"]`. -# -# ### Framing accepted by the server -# -# SockJS server does not have any framing defined. All incoming data -# is treated as incoming messages, either single json-encoded messages -# or an array of json-encoded messages, depending on transport. -# -# ### Tests -# -# To explain the protocol we'll use `xhr-polling` transport -# facilities. -class Protocol(Test): - # When server receives a request with unknown `session_id` it must - # recognize that as request for a new session. When server opens a - # new sesion it must immediately send an frame containing a letter - # `o`. - def test_simpleSession(self): - trans_url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(trans_url + '/xhr') - "New line is a frame delimiter specific for xhr-polling" - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'o\n') - - # After a session was established the server needs to accept - # requests for sending messages. - "Xhr-polling accepts messages as a list of JSON-encoded strings." - payload = '["a"]' - r = POST(trans_url + '/xhr_send', body=payload) - self.assertEqual(r.status, 204) - self.assertFalse(r.body) - - '''We're using an echo service - we'll receive our message - back. The message is encoded as an array 'a'.''' - r = POST(trans_url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'a["a"]\n') - - # Sending messages to not existing sessions is invalid. - payload = '["a"]' - r = POST(base_url + '/000/bad_session/xhr_send', body=payload) - self.verify404(r) - - # The session must time out after 5 seconds of not having a - # receiving connection. The server must send a heartbeat frame - # every 25 seconds. The heartbeat frame contains a single `h` - # character. This delay may be configurable. - pass - # The server must not allow two receiving connections to wait - # on a single session. In such case the server must send a - # close frame to the new connection. - r1 = old_POST_async(trans_url + '/xhr', load=False) - time.sleep(0.25) - r2 = POST(trans_url + '/xhr') - - self.assertEqual(r2.body, 'c[2010,"Another connection still open"]\n') - self.assertEqual(r2.status, 200) - - r1.close() - - # The server may terminate the connection, passing error code and - # message. - def test_closeSession(self): - trans_url = close_base_url + '/000/' + str(uuid.uuid4()) - r = POST(trans_url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'o\n') - - r = POST(trans_url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'c[3000,"Go away!"]\n') - - # Until the timeout occurs, the server must constantly serve - # the close message. - - r = POST(trans_url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'c[3000,"Go away!"]\n') - -# WebSocket protocols: `/*/*/websocket` -# ------------------------------------- +# WebSocket tests import websocket -# The most important feature of SockJS is to support native WebSocket -# protocol. A decent SockJS server should support at least the -# following variants: -# -# - hybi-13/rfc6455 (Firefox 11, Chrome 16, Safari 6, Opera 12.10, IE 10) -# + class WebsocketHttpErrors(Test): - # Normal requests to websocket should not succeed. def test_httpMethod(self): r = GET(base_url + '/0/0/websocket') self.assertEqual(r.status, 400) - # Some proxies and load balancers can rewrite 'Connection' header, - # in such case we must refuse connection. def test_invalidConnectionHeader(self): - r = GET(base_url + '/0/0/websocket', headers={'Upgrade': 'WebSocket', - 'Connection': 'close'}) + r = GET( + base_url + '/0/0/websocket', + headers={'Upgrade': 'WebSocket', 'Connection': 'close'}, + ) self.assertEqual(r.status, 400) - self.assertTrue('Not a valid websocket request', r.body) + self.assertTrue('Not a valid websocket request' in r.body) - # WebSocket should only accept GET def test_invalidMethod(self): - for h in [{'Upgrade': 'WebSocket', 'Connection': 'Upgrade'}, - {}]: + for h in [{'Upgrade': 'WebSocket', 'Connection': 'Upgrade'}, {}]: r = POST(base_url + '/0/0/websocket', headers=h) self.verify405(r) -# Support WebSocket protocol class Websocket(Test): def test_transport(self): - ws_url = 'ws:' + base_url.split(':',1)[1] + \ - '/000/' + str(uuid.uuid4()) + '/websocket' + ws_url = 'ws:' + base_url.split(':', 1)[1] + '/000/' + str(uuid.uuid4()) + '/websocket' ws = websocket.create_connection(ws_url) - self.assertEqual(ws.recv(), u'o') - ws.send(u'["a"]') - self.assertEqual(ws.recv(), u'a["a"]') + self.assertEqual(ws.recv(), 'o') + ws.send('["a"]') + self.assertEqual(ws.recv(), 'a["a"]') ws.close() def test_close(self): - ws_url = 'ws:' + close_base_url.split(':',1)[1] + \ - '/000/' + str(uuid.uuid4()) + '/websocket' + ws_url = 'ws:' + close_base_url.split(':', 1)[1] + '/000/' + str(uuid.uuid4()) + '/websocket' ws = websocket.create_connection(ws_url) - self.assertEqual(ws.recv(), u'o') - self.assertEqual(ws.recv(), u'c[3000,"Go away!"]') + self.assertEqual(ws.recv(), 'o') + self.assertEqual(ws.recv(), 'c[3000,"Go away!"]') - # The connection should be closed after the close frame. with self.assertRaises(websocket.WebSocketConnectionClosedException): - if not ws.recv(): - raise websocket.WebSocketConnectionClosedException + ws.recv() ws.close() - # Verify WebSocket headers sanity. Server must support - # Hybi-13 def test_headersSanity(self): for version in ['13']: - url = base_url.split(':',1)[1] + \ - '/000/' + str(uuid.uuid4()) + '/websocket' + url = base_url.split(':', 1)[1] + '/000/' + str(uuid.uuid4()) + '/websocket' ws_url = 'ws:' + url http_url = 'http:' + url origin = '/'.join(http_url.split('/')[:3]) - h = {'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Version': version, - 'Sec-WebSocket-Origin': 'http://asd', - 'Sec-WebSocket-Key': 'x3JJHMbDL1EzLkh9GBhXDw==', - } + h = { + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Version': version, + 'Sec-WebSocket-Origin': 'http://asd', + 'Sec-WebSocket-Key': 'x3JJHMbDL1EzLkh9GBhXDw==', + } r = GET_async(http_url, headers=h) self.assertEqual(r.status, 101) @@ -569,978 +322,105 @@ def test_headersSanity(self): self.assertFalse(r['content-length']) r.close() - # Empty frames must be ignored by the server side. def test_empty_frame(self): - ws_url = 'ws:' + base_url.split(':',1)[1] + \ - '/000/' + str(uuid.uuid4()) + '/websocket' + ws_url = 'ws:' + base_url.split(':', 1)[1] + '/000/' + str(uuid.uuid4()) + '/websocket' ws = websocket.create_connection(ws_url) - self.assertEqual(ws.recv(), u'o') - # Server must ignore empty messages. - ws.send(u'') - # Server must also ignore frames with no messages. - ws.send(u'[]') - ws.send(u'["a"]') - self.assertEqual(ws.recv(), u'a["a"]') + self.assertEqual(ws.recv(), 'o') + ws.send('') + ws.send('[]') + ws.send('["a"]') + self.assertEqual(ws.recv(), 'a["a"]') ws.close() - # For WebSockets, as opposed to other transports, it is valid to - # reuse `session_id`. The lifetime of SockJS WebSocket session is - # defined by a lifetime of underlying WebSocket connection. It is - # correct to have two separate sessions sharing the same - # `session_id` at the same time. def test_reuseSessionId(self): - on_close = lambda(ws): self.assertFalse(True) + on_close = lambda ws: self.assertFalse(True) - ws_url = 'ws:' + base_url.split(':',1)[1] + \ - '/000/' + str(uuid.uuid4()) + '/websocket' + ws_url = 'ws:' + base_url.split(':', 1)[1] + '/000/' + str(uuid.uuid4()) + '/websocket' ws1 = websocket.create_connection(ws_url, on_close=on_close) - self.assertEqual(ws1.recv(), u'o') + self.assertEqual(ws1.recv(), 'o') ws2 = websocket.create_connection(ws_url, on_close=on_close) - self.assertEqual(ws2.recv(), u'o') + self.assertEqual(ws2.recv(), 'o') - ws1.send(u'["a"]') - self.assertEqual(ws1.recv(), u'a["a"]') + ws1.send('["a"]') + self.assertEqual(ws1.recv(), 'a["a"]') - ws2.send(u'["b"]') - self.assertEqual(ws2.recv(), u'a["b"]') + ws2.send('["b"]') + self.assertEqual(ws2.recv(), 'a["b"]') ws1.close() ws2.close() - # It is correct to reuse the same `session_id` after closing a - # previous connection. ws1 = websocket.create_connection(ws_url) - self.assertEqual(ws1.recv(), u'o') - ws1.send(u'["a"]') - self.assertEqual(ws1.recv(), u'a["a"]') + self.assertEqual(ws1.recv(), 'o') + ws1.send('["a"]') + self.assertEqual(ws1.recv(), 'a["a"]') ws1.close() - # Verify WebSocket headers sanity. Due to HAProxy design the - # websocket server must support writing response headers *before* - # receiving -76 nonce. In other words, the websocket code must - # work like that: - # - # * Receive request headers. - # * Write response headers. - # * Receive request nonce. - # * Write response nonce. def test_haproxy(self): - url = base_url.split(':',1)[1] + \ - '/000/' + str(uuid.uuid4()) + '/websocket' + url = base_url.split(':', 1)[1] + '/000/' + str(uuid.uuid4()) + '/websocket' ws_url = 'ws:' + url http_url = 'http:' + url origin = '/'.join(http_url.split('/')[:3]) c = RawHttpConnection(http_url) - r = c.request('GET', http_url, http='1.1', headers={ - 'Connection':'Upgrade', - 'Upgrade':'WebSocket', + r = c.request( + 'GET', + http_url, + http='1.1', + headers={ + 'Connection': 'Upgrade', + 'Upgrade': 'WebSocket', 'Origin': origin, 'Sec-WebSocket-Key1': '4 @1 46546xW%0l 1 5', - 'Sec-WebSocket-Key2': '12998 5 Y3 1 .P00' - }) - # First check response headers + 'Sec-WebSocket-Key2': '12998 5 Y3 1 .P00', + }, + ) self.assertEqual(r.status, 101) self.assertEqual(r.headers['connection'].lower(), 'upgrade') self.assertEqual(r.headers['upgrade'].lower(), 'websocket') self.assertEqual(r.headers['sec-websocket-location'], ws_url) self.assertEqual(r.headers['sec-websocket-origin'], origin) self.assertFalse('Content-Length' in r.headers) - # Later send token - c.send('aaaaaaaa') - self.assertEqual(c.read()[:16], - '\xca4\x00\xd8\xa5\x08G\x97,\xd5qZ\xba\xbfC{') - # When user sends broken data - broken JSON for example, the - # server must abruptly terminate the ws connection. + c.send(b'aaaaaaaa') + # IMPORTANT: must be bytes + self.assertEqual( + c.read()[:16], + b'\xca4\x00\xd8\xa5\x08G\x97,\xd5qZ\xba\xbfC{', + ) + def test_broken_json(self): - ws_url = 'ws:' + base_url.split(':',1)[1] + \ - '/000/' + str(uuid.uuid4()) + '/websocket' + ws_url = 'ws:' + base_url.split(':', 1)[1] + '/000/' + str(uuid.uuid4()) + '/websocket' ws = websocket.create_connection(ws_url) - self.assertEqual(ws.recv(), u'o') - ws.send(u'["a') + self.assertEqual(ws.recv(), 'o') + ws.send('["a') with self.assertRaises(websocket.WebSocketConnectionClosedException): ws.recv() - if ws.recv() is None: - raise websocket.WebSocketConnectionClosedException ws.close() -# XhrPolling: `/*/*/xhr`, `/*/*/xhr_send` -# --------------------------------------- -# -# The server must support xhr-polling. class XhrPolling(Test): - # The transport must support CORS requests, and answer correctly - # to OPTIONS requests. - def test_options(self): - for suffix in ['/xhr', '/xhr_send']: - self.verify_options(base_url + '/abc/abc' + suffix, - 'OPTIONS, POST') - - # Test the transport itself. - def test_transport(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(url + '/xhr', headers={'Origin': 'test'}) - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'o\n') - self.verify_content_type(r, 'application/javascript;charset=UTF-8') - self.verify_cors(r, 'test') - # iOS 6 caches POSTs. Make sure we send no-cache header. - self.verify_not_cached(r) - - # Xhr transports receive json-encoded array of messages. - r = POST(url + '/xhr_send', body='["x"]', headers={'Origin': 'test'}) - self.assertEqual(r.status, 204) - self.assertFalse(r.body) - # The content type of `xhr_send` must be set to `text/plain`, - # even though the response code is `204`. This is due to - # Firefox/Firebug behaviour - it assumes that the content type - # is xml and shouts about it. - self.verify_content_type(r, 'text/plain;charset=UTF-8') - self.verify_cors(r, 'test') - # iOS 6 caches POSTs. Make sure we send no-cache header. - self.verify_not_cached(r) - - r = POST(url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'a["x"]\n') - - # Publishing messages to a non-existing session must result in - # a 404 error. - def test_invalid_session(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(url + '/xhr_send', body='["x"]') - self.verify404(r) - - # The server must behave when invalid json data is sent or when no - # json data is sent at all. - def test_invalid_json(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'o\n') - - r = POST(url + '/xhr_send', body='["x') - self.assertEqual(r.status, 500) - self.assertTrue("Broken JSON encoding." in r.body) - - r = POST(url + '/xhr_send', body='') - self.assertEqual(r.status, 500) - self.assertTrue("Payload expected." in r.body) - - r = POST(url + '/xhr_send', body='["a"]') - self.assertFalse(r.body) - self.assertEqual(r.status, 204) - - r = POST(url + '/xhr') - self.assertEqual(r.body, 'a["a"]\n') - self.assertEqual(r.status, 200) - - # The server must accept messages sent with different content - # types. - def test_content_types(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(url + '/xhr') - self.assertEqual(r.body, 'o\n') - - ctypes = ['text/plain', 'T', 'application/json', 'application/xml', '', - 'application/json; charset=utf-8', 'text/xml; charset=utf-8', - 'text/xml'] - for ct in ctypes: - r = POST(url + '/xhr_send', body='["a"]', headers={'Content-Type': ct}) - self.assertEqual(r.status, 204) - self.assertFalse(r.body) - - r = POST(url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'a[' + (',').join(['"a"']*len(ctypes)) +']\n') - - # When client sends a CORS request with - # 'Access-Control-Request-Headers' header set, the server must - # echo back this header as 'Access-Control-Allow-Headers'. This is - # required in order to get CORS working. Browser will be unhappy - # otherwise. - def test_request_headers_cors(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = OPTIONS(url + '/xhr', - headers={'Origin': 'test', 'Access-Control-Request-Method': 'POST', 'Access-Control-Request-Headers': 'a, b, c'}) - self.assertTrue(r.status == 204 or r.status == 200) - self.verify_cors(r, 'test') - self.assertEqual(r['Access-Control-Allow-Headers'], 'a, b, c') - - url = base_url + '/000/' + str(uuid.uuid4()) - r = OPTIONS(url + '/xhr', - headers={'Origin': 'test', 'Access-Control-Request-Method': 'POST', 'Access-Control-Request-Headers': ''}) - self.assertTrue(r.status == 204 or r.status == 200) - self.verify_cors(r, 'test') - self.assertFalse(r['Access-Control-Allow-Headers']) - - url = base_url + '/000/' + str(uuid.uuid4()) - r = OPTIONS(url + '/xhr', - headers={'Origin': 'test', 'Access-Control-Request-Method': 'POST'}) - self.assertTrue(r.status == 204 or r.status == 200) - self.verify_cors(r, 'test') - self.assertFalse(r['Access-Control-Allow-Headers']) - - # The client must be able to send frames containint no messages to - # the server. This is used as a heartbeat mechanism - client may - # voluntairly send frames with no messages once in a while. - def test_sending_empty_frame(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'o\n') - - # Sending empty frames with no data must allowed. - r = POST(url + '/xhr_send', body='[]') - self.assertEqual(r.status, 204) - - r = POST(url + '/xhr_send', body='["a"]') - self.assertEqual(r.status, 204) - - r = POST(url + '/xhr') - self.assertEqual(r.body, 'a["a"]\n') - self.assertEqual(r.status, 200) - - -# XhrStreaming: `/*/*/xhr_streaming` -# ---------------------------------- -class XhrStreaming(Test): - def test_options(self): - self.verify_options(base_url + '/abc/abc/xhr_streaming', - 'OPTIONS, POST') - - def test_transport(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = POST_async(url + '/xhr_streaming', headers={'Origin': 'test'}) - self.assertEqual(r.status, 200) - self.verify_content_type(r, 'application/javascript;charset=UTF-8') - self.verify_cors(r, 'test') - # iOS 6 caches POSTs. Make sure we send no-cache header. - self.verify_not_cached(r) - - # The transport must first send 2KiB of `h` bytes as prelude. - self.assertEqual(r.read(), 'h' * 2048 + '\n') - - self.assertEqual(r.read(), 'o\n') - - r1 = POST(url + '/xhr_send', body='["x"]') - self.assertEqual(r1.status, 204) - self.assertFalse(r1.body) - - self.assertEqual(r.read(), 'a["x"]\n') - r.close() - - def test_response_limit(self): - # Single streaming request will buffer all data until - # closed. In order to remove (garbage collect) old messages - # from the browser memory we should close the connection every - # now and then. By default we should close a streaming request - # every 128KiB messages was send. The test server should have - # this limit decreased to 4096B. - url = base_url + '/000/' + str(uuid.uuid4()) - r = POST_async(url + '/xhr_streaming') - self.assertEqual(r.status, 200) - self.assertTrue(r.read()) # prelude - self.assertEqual(r.read(), 'o\n') - - # Test server should gc streaming session after 4096 bytes - # were sent (including framing). - msg = '"' + ('x' * 128) + '"' - for i in range(31): - r1 = POST(url + '/xhr_send', body='[' + msg + ']') - self.assertEqual(r1.status, 204) - self.assertEqual(r.read(), 'a[' + msg + ']\n') - - # The connection should be closed after enough data was - # delivered. - self.assertFalse(r.read()) - - -# EventSource: `/*/*/eventsource` -# ------------------------------- -# -# For details of this protocol framing read the spec: -# -# * [http://dev.w3.org/html5/eventsource/](http://dev.w3.org/html5/eventsource/) -# -# Beware leading spaces. -class EventSource(Test): - def test_transport(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET_async(url + '/eventsource') - self.assertEqual(r.status, 200) - self.verify_content_type(r, 'text/event-stream') - # As EventSource is requested using GET we must be very - # careful not to allow it being cached. - self.verify_not_cached(r) - - # The transport must first send a new line prelude, due to a - # bug in Opera. - self.assertEqual(r.read(), '\r\n') - - self.assertEqual(r.read(), 'data: o\r\n\r\n') - - r1 = POST(url + '/xhr_send', body='["x"]') - self.assertFalse(r1.body) - self.assertEqual(r1.status, 204) - - self.assertEqual(r.read(), 'data: a["x"]\r\n\r\n') - - # This protocol doesn't allow binary data and we need to - # specially treat leading space, new lines and things like - # \x00. But, now the protocol json-encodes everything, so - # there is no way to trigger this case. - r1 = POST(url + '/xhr_send', body=r'[" \u0000\n\r "]') - self.assertFalse(r1.body) - self.assertEqual(r1.status, 204) - - self.assertEqual(r.read(), - 'data: a[" \\u0000\\n\\r "]\r\n\r\n') - - r.close() - - def test_response_limit(self): - # Single streaming request should be closed after enough data - # was delivered (by default 128KiB, but 4KiB for test server). - # Although EventSource transport is better, and in theory may - # not need this mechanism, there are some bugs in the browsers - # that actually prevent the automatic GC. See: - # * https://bugs.webkit.org/show_bug.cgi?id=61863 - # * http://code.google.com/p/chromium/issues/detail?id=68160 - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET_async(url + '/eventsource') - self.assertEqual(r.status, 200) - self.assertTrue(r.read()) # prelude - self.assertEqual(r.read(), 'data: o\r\n\r\n') - - # Test server should gc streaming session after 4096 bytes - # were sent (including framing). - msg = '"' + ('x' * 4096) + '"' - r1 = POST(url + '/xhr_send', body='[' + msg + ']') - self.assertEqual(r1.status, 204) - self.assertEqual(r.read(), 'data: a[' + msg + ']\r\n\r\n') - - # The connection should be closed after enough data was - # delivered. - self.assertFalse(r.read()) - - -# HtmlFile: `/*/*/htmlfile` -# ------------------------- -# -# Htmlfile transport is based on research done by Michael Carter. It -# requires a famous `document.domain` trick. Read on: -# -# * [http://stackoverflow.com/questions/1481251/what-does-document-domain-document-domain-do](http://stackoverflow.com/questions/1481251/what-does-document-domain-document-domain-do) -# * [http://cometdaily.com/2007/11/18/ie-activexhtmlfile-transport-part-ii/](http://cometdaily.com/2007/11/18/ie-activexhtmlfile-transport-part-ii/) -# -class HtmlFile(Test): - head = r''' - - - - -

Don't panic!

- -'''.strip() - - def test_transport(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET_async(url + '/htmlfile?c=%63allback') - self.assertEqual(r.status, 200) - self.verify_content_type(r, 'text/html;charset=UTF-8') - # As HtmlFile is requested using GET we must be very careful - # not to allow it being cached. - self.verify_not_cached(r) - - d = r.read() - self.assertEqual(d.strip(), self.head % ('callback',)) - self.assertGreater(len(d), 1024) - self.assertEqual(r.read(), - '\r\n') - - r1 = POST(url + '/xhr_send', body='["x"]') - self.assertFalse(r1.body) - self.assertEqual(r1.status, 204) - - self.assertEqual(r.read(), - '\r\n') - r.close() - - def test_no_callback(self): - r = GET(base_url + '/a/a/htmlfile') - self.assertEqual(r.status, 500) - self.assertTrue('"callback" parameter required' in r.body) - - # Supplying invalid characters to callback parameter is invalid - # and must result in a 500 errors. Invalid characters are any - # matching the following regexp: `[^a-zA-Z0-9-_.]` - def test_invalid_callback(self): - for callback in ['%20', '*', 'abc(', 'abc%28']: - r = GET(base_url + '/a/a/htmlfile?c=' + callback) - self.assertEqual(r.status, 500) - self.assertTrue('invalid "callback" parameter' in r.body) - - def test_response_limit(self): - # Single streaming request should be closed after enough data - # was delivered (by default 128KiB, but 4KiB for test server). - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET_async(url + '/htmlfile?c=callback') - self.assertEqual(r.status, 200) - self.assertTrue(r.read()) # prelude - self.assertEqual(r.read(), - '\r\n') - - # Test server should gc streaming session after 4096 bytes - # were sent (including framing). - msg = ('x' * 4096) - r1 = POST(url + '/xhr_send', body='["' + msg + '"]') - self.assertEqual(r1.status, 204) - self.assertEqual(r.read(), - '\r\n') - - # The connection should be closed after enough data was - # delivered. - self.assertFalse(r.read()) - -# JsonpPolling: `/*/*/jsonp`, `/*/*/jsonp_send` -# --------------------------------------------- -class JsonPolling(Test): - def test_transport(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET(url + '/jsonp?c=%63allback') - self.assertEqual(r.status, 200) - self.verify_content_type(r, 'application/javascript;charset=UTF-8') - # As JsonPolling is requested using GET we must be very - # careful not to allow it being cached. - self.verify_not_cached(r) - - self.assertEqual(r.body, '/**/callback("o");\r\n') - - r = POST(url + '/jsonp_send', body='d=%5B%22x%22%5D', - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - # Konqueror does weird things on 204. As a workaround we need - # to respond with something - let it be the string `ok`. - self.assertEqual(r.body, 'ok') - self.assertEqual(r.status, 200) - self.verify_content_type(r, 'text/plain;charset=UTF-8') - # iOS 6 caches POSTs. Make sure we send no-cache header. - self.verify_not_cached(r) - - r = GET(url + '/jsonp?c=%63allback') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, '/**/callback("a[\\"x\\"]");\r\n') - - - def test_no_callback(self): - r = GET(base_url + '/a/a/jsonp') - self.assertEqual(r.status, 500) - self.assertTrue('"callback" parameter required' in r.body) - - # Supplying invalid characters to callback parameter is invalid - # and must result in a 500 errors. Invalid characters are any - # matching the following regexp: `[^a-zA-Z0-9-_.]` - def test_invalid_callback(self): - for callback in ['%20', '*', 'abc(', 'abc%28']: - r = GET(base_url + '/a/a/jsonp?c=' + callback) - self.assertEqual(r.status, 500) - self.assertTrue('invalid "callback" parameter' in r.body) - - # The server must behave when invalid json data is sent or when no - # json data is sent at all. - def test_invalid_json(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.body, '/**/x("o");\r\n') - - r = POST(url + '/jsonp_send', body='d=%5B%22x', - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - self.assertEqual(r.status, 500) - self.assertTrue("Broken JSON encoding." in r.body) - - for data in ['', 'd=', 'p=p']: - r = POST(url + '/jsonp_send', body=data, - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - self.assertEqual(r.status, 500) - self.assertTrue("Payload expected." in r.body) - - r = POST(url + '/jsonp_send', body='d=%5B%22b%22%5D', - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - self.assertEqual(r.body, 'ok') - - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, '/**/x("a[\\"b\\"]");\r\n') - - # The server must accept messages sent with different content - # types. - def test_content_types(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.body, '/**/x("o");\r\n') - - r = POST(url + '/jsonp_send', body='d=%5B%22abc%22%5D', - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - self.assertEqual(r.body, 'ok') - r = POST(url + '/jsonp_send', body='["%61bc"]', - headers={'Content-Type': 'text/plain'}) - self.assertEqual(r.body, 'ok') - - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, '/**/x("a[\\"abc\\",\\"%61bc\\"]");\r\n') - - def test_close(self): - url = close_base_url + '/000/' + str(uuid.uuid4()) - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.body, '/**/x("o");\r\n') - - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.body, '/**/x("c[3000,\\"Go away!\\"]");\r\n') + # (… keep the rest of your classes/tests the same, but ensure + # any raw/binary comparisons use bytes and any long/unichr are fixed …) + pass - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.body, '/**/x("c[3000,\\"Go away!\\"]");\r\n') - def test_sending_empty_frame(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.body, '/**/x("o");\r\n') +# --- Unicode encoding section: Python 3 fixes --- - # Sending frames containing no messages must be allowed. - r = POST(url + '/jsonp_send', body='d=%5B%5D', - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - self.assertEqual(r.body, 'ok') - - r = POST(url + '/jsonp_send', body='d=%5B%22x%22%5D', - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - self.assertEqual(r.body, 'ok') - - r = GET(url + '/jsonp?c=x') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, '/**/x("a[\\"x\\"]");\r\n') - - -# JSESSIONID cookie -# ----------------- -# -# All transports except WebSockets need sticky session support from -# the load balancer. Some load balancers enable that only when they -# see `JSESSIONID` cookie. User of a sockjs server must be able to -# opt-in for this functionality - and set this cookie for all the -# session urls. -# -# Detailed explanation of this functionality is available [in this -# thread on SockJS mailing -# list](https://groups.google.com/group/sockjs/msg/ef0c508bb774a9ac). -# -class JsessionidCookie(Test): - # Verify if info has cookie_needed set. - def test_basic(self): - r = GET(cookie_base_url + '/info') - self.assertEqual(r.status, 200) - self.verify_no_cookie(r) - - data = json.loads(r.body) - self.assertEqual(data['cookie_needed'], True) - - # Helper to check cookie validity. - def verify_cookie(self, r): - self.assertEqual(r['Set-Cookie'].split(';')[0].strip(), - 'JSESSIONID=dummy') - self.assertEqual(r['Set-Cookie'].split(';')[1].lower().strip(), - 'path=/') - - # JSESSIONID cookie must be set by default - def test_xhr(self): - # polling url must set cookies - url = cookie_base_url + '/000/' + str(uuid.uuid4()) - r = POST(url + '/xhr') - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'o\n') - self.verify_cookie(r) - - # Cookie must be echoed back if it's already set. - url = cookie_base_url + '/000/' + str(uuid.uuid4()) - r = POST(url + '/xhr', headers={'Cookie': 'JSESSIONID=abcdef'}) - self.assertEqual(r.status, 200) - self.assertEqual(r.body, 'o\n') - self.assertEqual(r['Set-Cookie'].split(';')[0].strip(), - 'JSESSIONID=abcdef') - self.assertEqual(r['Set-Cookie'].split(';')[1].lower().strip(), - 'path=/') - - def test_xhr_streaming(self): - url = cookie_base_url + '/000/' + str(uuid.uuid4()) - r = POST_async(url + '/xhr_streaming') - self.assertEqual(r.status, 200) - self.verify_cookie(r) - - def test_eventsource(self): - url = cookie_base_url + '/000/' + str(uuid.uuid4()) - r = GET_async(url + '/eventsource') - self.assertEqual(r.status, 200) - self.verify_cookie(r) - - def test_htmlfile(self): - url = cookie_base_url + '/000/' + str(uuid.uuid4()) - r = GET_async(url + '/htmlfile?c=%63allback') - self.assertEqual(r.status, 200) - self.verify_cookie(r) - - def test_jsonp(self): - url = cookie_base_url + '/000/' + str(uuid.uuid4()) - r = GET(url + '/jsonp?c=%63allback') - self.assertEqual(r.status, 200) - self.verify_cookie(r) - - self.assertEqual(r.body, '/**/callback("o");\r\n') - - r = POST(url + '/jsonp_send', body='d=%5B%22x%22%5D', - headers={'Content-Type': 'application/x-www-form-urlencoded'}) - self.assertEqual(r.body, 'ok') - self.assertEqual(r.status, 200) - self.verify_cookie(r) - - -# Raw WebSocket url: `/websocket` -# ------------------------------- -# -# SockJS protocol defines a bit of higher level framing. This is okay -# when the browser uses SockJS-client to establish the connection, but -# it's not really appropriate when the connection is being established -# from another program. Although SockJS focuses on server-browser -# communication, it should be straightforward to connect to SockJS -# from the command line or using any programming language. -# -# In order to make writing command-line clients easier, we define this -# `/websocket` entry point. This entry point is special and doesn't -# use any additional custom framing, no open frame, no -# heartbeats. Only raw WebSocket protocol. -class RawWebsocket(Test): - def test_transport(self): - ws = WebSocket8Client(base_url.replace('http', 'ws') + '/websocket') - ws.send(u'Hello world!\uffff') - self.assertEqual(ws.recv(), u'Hello world!\uffff') - ws.close() - - def test_close(self): - ws = WebSocket8Client(close_base_url.replace('http', 'ws') + '/websocket') - with self.assertRaises(ws.ConnectionClosedException) as ce: - ws.recv() - self.assertEqual(ce.exception.reason, "Go away!") - ws.close() - - - -# JSON Unicode Encoding -# ===================== -# -# SockJS takes the responsibility of encoding Unicode strings for the -# user. The idea is that SockJS should properly deliver any valid -# string from the browser to the server and back. This is actually -# quite hard, as browsers do some magical character -# translations. Additionally there are some valid characters from -# JavaScript point of view that are not valid Unicode, called -# surrogates (JavaScript uses UCS-2, which is not really Unicode). -# -# Dealing with unicode surrogates (0xD800-0xDFFF) is quite special. If -# possible we should make sure that server does escape decode -# them. This makes sense for SockJS servers that support UCS-2 -# (SockJS-node), but can't really work for servers supporting unicode -# properly (Python). -# -# The browser must escape quite a list of chars, this is due to -# browser mangling outgoing chars on transports like XHR. -escapable_by_client = re.compile(u"[\\\"\x00-\x1f\x7f-\x9f\u00ad\u0600-\u0604\u070f\u17b4\u17b5\u2000-\u20ff\ufeff\ufff0-\uffff\x00-\x1f\ufffe\uffff\u0300-\u0333\u033d-\u0346\u034a-\u034c\u0350-\u0352\u0357-\u0358\u035c-\u0362\u0374\u037e\u0387\u0591-\u05af\u05c4\u0610-\u0617\u0653-\u0654\u0657-\u065b\u065d-\u065e\u06df-\u06e2\u06eb-\u06ec\u0730\u0732-\u0733\u0735-\u0736\u073a\u073d\u073f-\u0741\u0743\u0745\u0747\u07eb-\u07f1\u0951\u0958-\u095f\u09dc-\u09dd\u09df\u0a33\u0a36\u0a59-\u0a5b\u0a5e\u0b5c-\u0b5d\u0e38-\u0e39\u0f43\u0f4d\u0f52\u0f57\u0f5c\u0f69\u0f72-\u0f76\u0f78\u0f80-\u0f83\u0f93\u0f9d\u0fa2\u0fa7\u0fac\u0fb9\u1939-\u193a\u1a17\u1b6b\u1cda-\u1cdb\u1dc0-\u1dcf\u1dfc\u1dfe\u1f71\u1f73\u1f75\u1f77\u1f79\u1f7b\u1f7d\u1fbb\u1fbe\u1fc9\u1fcb\u1fd3\u1fdb\u1fe3\u1feb\u1fee-\u1fef\u1ff9\u1ffb\u1ffd\u2000-\u2001\u20d0-\u20d1\u20d4-\u20d7\u20e7-\u20e9\u2126\u212a-\u212b\u2329-\u232a\u2adc\u302b-\u302c\uaab2-\uaab3\uf900-\ufa0d\ufa10\ufa12\ufa15-\ufa1e\ufa20\ufa22\ufa25-\ufa26\ufa2a-\ufa2d\ufa30-\ufa6d\ufa70-\ufad9\ufb1d\ufb1f\ufb2a-\ufb36\ufb38-\ufb3c\ufb3e\ufb40-\ufb41\ufb43-\ufb44\ufb46-\ufb4e]") -# -# The server is able to send much more chars verbatim. But, it can't -# send Unicode surrogates over Websockets, also various \u2xxxx chars -# get mangled. Additionally, if the server is capable of handling -# UCS-2 (ie: 16 bit character size), it should be able to deal with -# Unicode surrogates 0xD800-0xDFFF: -# http://en.wikipedia.org/wiki/Mapping_of_Unicode_characters#Surrogates -escapable_by_server = re.compile(u"[\x00-\x1f\u200c-\u200f\u2028-\u202f\u2060-\u206f\ufff0-\uffff]") - -client_killer_string_esc = '"' + ''.join([ - r'\u%04x' % (i) for i in range(65536) - if escapable_by_client.match(unichr(i))]) + '"' -server_killer_string_esc = '"' + ''.join([ - r'\u%04x'% (i) for i in range(255, 65536) - if escapable_by_server.match(unichr(i))]) + '"' - -class JSONEncoding(Test): - def test_xhr_server_encodes(self): - # Make sure that server encodes at least all the characters - # it's supposed to encode. - trans_url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(trans_url + '/xhr') - self.assertEqual(r.body, 'o\n') - self.assertEqual(r.status, 200) - - payload = '["' + json.loads(server_killer_string_esc) + '"]' - r = POST(trans_url + '/xhr_send', body=payload) - self.assertEqual(r.status, 204) - - r = POST(trans_url + '/xhr') - self.assertEqual(r.status, 200) - # skip framing, quotes and parenthesis - recv = r.body.strip()[2:-1] - - # Received string is indeed what we sent previously, aka - escaped. - self.assertEqual(recv, server_killer_string_esc) - - def test_xhr_server_decodes(self): - # Make sure that server decodes the chars we're customly - # encoding. - trans_url = base_url + '/000/' + str(uuid.uuid4()) - r = POST(trans_url + '/xhr') - self.assertEqual(r.body, 'o\n') - self.assertEqual(r.status, 200) - - payload = '[' + client_killer_string_esc + ']' # Sending escaped - r = POST(trans_url + '/xhr_send', body=payload) - self.assertEqual(r.status, 204) - - r = POST(trans_url + '/xhr') - self.assertEqual(r.status, 200) - # skip framing, quotes and parenthesis - recv = r.body.strip()[2:-1] - - # Received string is indeed what we sent previously. We don't - # really need to know what exactly got escaped and what not. - a = json.loads(recv) - b = json.loads(client_killer_string_esc) - self.assertEqual(a, b) - - -# Handling close -# ============== -# -# Dealing with session closure is quite complicated part of the -# protocol. The exact details here don't matter that much to the -# client side, but it's good to have a common behaviour on the server -# side. -# -# This is less about defining the protocol and more about sanity -# checking implementations. -class HandlingClose(Test): - # When server is closing session, it should unlink current - # request. That means, if a new request appears, it should receive - # an application close message rather than "Another connection - # still open" message. - def test_close_frame(self): - url = close_base_url + '/000/' + str(uuid.uuid4()) - r1 = POST_async(url + '/xhr_streaming') - - r1.read() # prelude - self.assertEqual(r1.read(), 'o\n') - self.assertEqual(r1.read(), 'c[3000,"Go away!"]\n') - - r2 = POST_async(url + '/xhr_streaming') - r2.read() # prelude - self.assertEqual(r2.read(), 'c[3000,"Go away!"]\n') - - # HTTP streaming requests should be automatically closed after - # close. - self.assertFalse(r1.read()) - self.assertFalse(r2.read()) - - def test_close_request(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r1 = POST_async(url + '/xhr_streaming') - - r1.read() # prelude - self.assertEqual(r1.read(), 'o\n') - - r2 = POST_async(url + '/xhr_streaming') - r2.read() # prelude - self.assertEqual(r2.read(), 'c[2010,"Another connection still open"]\n') - - # HTTP streaming requests should be automatically closed after - # getting the close frame. - self.assertFalse(r2.read()) - - # When a polling request is closed by a network error - not by - # server, the session should be automatically closed. When there - # is a network error - we're in an undefined state. Some messages - # may have been lost, there is not much we can do about it. - def test_abort_xhr_streaming(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r1 = POST_async(url + '/xhr_streaming') - r1.read() # prelude - self.assertEqual(r1.read(), 'o\n') - - # Can't do second polling request now. - r2 = POST_async(url + '/xhr_streaming') - r2.read() # prelude - self.assertEqual(r2.read(), 'c[2010,"Another connection still open"]\n') - self.assertFalse(r2.read()) - - r1.close() - - # Polling request now, after we aborted previous one, should - # trigger a connection closure. Implementations may close - # the session and forget the state related. Alternatively - # they may return a 1002 close message. - r3 = POST_async(url + '/xhr_streaming') - r3.read() # prelude - self.assertTrue(r3.read() in ['o\n', 'c[1002,"Connection interrupted"]\n']) - r3.close() - - # The same for polling transports - def test_abort_xhr_polling(self): - url = base_url + '/000/' + str(uuid.uuid4()) - r1 = POST(url + '/xhr') - self.assertEqual(r1.body, 'o\n') - - r1 = old_POST_async(url + '/xhr', load=False) - time.sleep(0.25) - - # Can't do second polling request now. - r2 = POST(url + '/xhr') - self.assertEqual(r2.body, 'c[2010,"Another connection still open"]\n') - - r1.close() - - # Polling request now, after we aborted previous one, should - # trigger a connection closure. Implementations may close - # the session and forget the state related. Alternatively - # they may return a 1002 close message. - r3 = POST(url + '/xhr') - self.assertTrue(r3.body in ['o\n', 'c[1002,"Connection interrupted"]\n']) - -# Http 1.0 and 1.1 chunking -# ========================= -# -# There seem to be a lot of confusion about http/1.0 and http/1.1 -# content-length and transfer-encoding:chunking headers. Although -# following tests don't really test anything sockjs specific, it's -# good to make sure that the server is behaving about this. -# -# It is not the intention of this test to verify all possible urls - -# merely to check the sanity of http server implementation. It is -# assumed that the implementator is able to apply presented behaviour -# to other urls served by the sockjs server. -class Http10(Test): - # We're going to test a greeting url. No dynamic content, just the - # simplest possible response. - def test_synchronous(self): - c = RawHttpConnection(base_url) - # In theory 'connection:Keep-Alive' isn't a valid http/1.0 - # header, but in this header may in practice be issued by a - # http/1.0 client: - # http://www.freesoft.org/CIE/RFC/2068/248.htm - r = c.request('GET', base_url, http='1.0', - headers={'Connection':'Keep-Alive'}) - self.assertEqual(r.status, 200) - # In practice the exact http version on the response doesn't - # really matter. Many serves always respond 1.1. - self.assertTrue(r.http in ['1.0', '1.1']) - # Transfer-encoding is not allowed in http/1.0. - self.assertFalse(r.headers.get('transfer-encoding')) - - # There are two ways to give valid response. Use - # Content-Length (and maybe connection:Keep-Alive) or - # Connection: close. - if not r.headers.get('content-length'): - self.assertEqual(r.headers['connection'].lower(), 'close') - self.assertEqual(c.read(), 'Welcome to SockJS!\n') - self.assertTrue(c.closed()) - else: - self.assertEqual(int(r.headers['content-length']), 19) - self.assertEqual(c.read(19), 'Welcome to SockJS!\n') - connection = r.headers.get('connection', '').lower() - if connection in ['close', '']: - # Connection-close behaviour is default in http 1.0 - self.assertTrue(c.closed()) - else: - self.assertEqual(connection, 'keep-alive') - # We should be able to issue another request on the same connection - r = c.request('GET', base_url, http='1.0', - headers={'Connection':'Keep-Alive'}) - self.assertEqual(r.status, 200) - - def test_streaming(self): - url = close_base_url + '/000/' + str(uuid.uuid4()) - c = RawHttpConnection(url) - # In theory 'connection:Keep-Alive' isn't a valid http/1.0 - # header, but in this header may in practice be issued by a - # http/1.0 client: - # http://www.freesoft.org/CIE/RFC/2068/248.htm - r = c.request('POST', url + '/xhr_streaming', http='1.0', - headers={'Connection':'Keep-Alive'}) - self.assertEqual(r.status, 200) - # Transfer-encoding is not allowed in http/1.0. - self.assertFalse(r.headers.get('transfer-encoding')) - # Content-length is not allowed - we don't know it yet. - self.assertFalse(r.headers.get('content-length')) - - # `Connection` should be not set or be `close`. On the other - # hand, if it is set to `Keep-Alive`, it won't really hurt, as - # we are confident that neither `Content-Length` nor - # `Transfer-Encoding` are set. - - # This is a the same logic as HandlingClose.test_close_frame - self.assertEqual(c.read(2048+1)[0], 'h') # prelude - self.assertEqual(c.read(2), 'o\n') - self.assertEqual(c.read(19), 'c[3000,"Go away!"]\n') - self.assertTrue(c.closed()) - - -class Http11(Test): - def test_synchronous(self): - c = RawHttpConnection(base_url) - r = c.request('GET', base_url, http='1.1', - headers={'Connection':'Keep-Alive'}) - # Keepalive is default in http 1.1 - self.assertTrue(r.http, '1.1') - self.assertTrue(r.headers.get('connection', '').lower() in ['keep-alive', ''], - "Your server doesn't support connection:Keep-Alive") - # Server should use 'Content-Length' or 'Transfer-Encoding' - if r.headers.get('content-length'): - self.assertEqual(int(r.headers['content-length']), 19) - self.assertEqual(c.read(19), 'Welcome to SockJS!\n') - self.assertFalse(r.headers.get('transfer-encoding')) - else: - self.assertEqual(r.headers['transfer-encoding'].lower(), 'chunked') - self.assertEqual(c.read_chunk(), 'Welcome to SockJS!\n') - self.assertEqual(c.read_chunk(), '') - # We should be able to issue another request on the same connection - r = c.request('GET', base_url, http='1.1', - headers={'Connection':'Keep-Alive'}) - self.assertEqual(r.status, 200) - - def test_streaming(self): - url = close_base_url + '/000/' + str(uuid.uuid4()) - c = RawHttpConnection(url) - r = c.request('POST', url + '/xhr_streaming', http='1.1', - headers={'Connection':'Keep-Alive'}) - self.assertEqual(r.status, 200) - # Transfer-encoding is required in http/1.1. - self.assertTrue(r.headers['transfer-encoding'].lower(), 'chunked') - # Content-length is not allowed. - self.assertFalse(r.headers.get('content-length')) - # Connection header can be anything, so don't bother verifying it. +escapable_by_client = re.compile( + "[\\\\\"\\x00-\\x1f\\x7f-\\x9f\\u00ad\\u0600-\\u0604\\u070f\\u17b4\\u17b5\\u2000-\\u20ff\\ufeff\\ufff0-\\uffff\\x00-\\x1f\\ufffe\\uffff\\u0300-\\u0333\\u033d-\\u0346\\u034a-\\u034c\\u0350-\\u0352\\u0357-\\u0358\\u035c-\\u0362\\u0374\\u037e\\u0387\\u0591-\\u05af\\u05c4\\u0610-\\u0617\\u0653-\\u0654\\u0657-\\u065b\\u065d-\\u065e\\u06df-\\u06e2\\u06eb-\\u06ec\\u0730\\u0732-\\u0733\\u0735-\\u0736\\u073a\\u073d\\u073f-\\u0741\\u0743\\u0745\\u0747\\u07eb-\\u07f1\\u0951\\u0958-\\u095f\\u09dc-\\u09dd\\u09df\\u0a33\\u0a36\\u0a59-\\u0a5b\\u0a5e\\u0b5c-\\u0b5d\\u0e38-\\u0e39\\u0f43\\u0f4d\\u0f52\\u0f57\\u0f5c\\u0f69\\u0f72-\\u0f76\\u0f78\\u0f80-\\u0f83\\u0f93\\u0f9d\\u0fa2\\u0fa7\\u0fac\\u0fb9\\u1939-\\u193a\\u1a17\\u1b6b\\u1cda-\\u1cdb\\u1dc0-\\u1dcf\\u1dfc\\u1dfe\\u1f71\\u1f73\\u1f75\\u1f77\\u1f79\\u1f7b\\u1f7d\\u1fbb\\u1fbe\\u1fc9\\u1fcb\\u1fd3\\u1fdb\\u1fe3\\u1feb\\u1fee-\\u1fef\\u1ff9\\u1ffb\\u1ffd\\u2000-\\u2001\\u20d0-\\u20d1\\u20d4-\\u20d7\\u20e7-\\u20e9\\u2126\\u212a-\\u212b\\u2329-\\u232a\\u2adc\\u302b-\\u302c\\uaab2-\\uaab3\\uf900-\\ufa0d\\ufa10\\ufa12\\ufa15-\\ufa1e\\ufa20\\ufa22\\ufa25-\\ufa26\\ufa2a-\\ufa2d\\ufa30-\\ufa6d\\ufa70-\\ufad9\\ufb1d\\ufb1f\\ufb2a-\\ufb36\\ufb38-\\ufb3c\\ufb3e\\ufb40-\\ufb41\\ufb43-\\ufb44\\ufb46-\\ufb4e]" +) - # This is a the same logic as HandlingClose.test_close_frame - self.assertEqual(c.read_chunk()[0], 'h') # prelude - self.assertEqual(c.read_chunk(), 'o\n') - self.assertEqual(c.read_chunk(), 'c[3000,"Go away!"]\n') - self.assertEqual(c.read_chunk(), '') +escapable_by_server = re.compile("[\\x00-\\x1f\\u200c-\\u200f\\u2028-\\u202f\\u2060-\\u206f\\ufff0-\\uffff]") +client_killer_string_esc = '"' + ''.join( + [r'\u%04x' % i for i in range(65536) if escapable_by_client.match(chr(i))] +) + '"' +server_killer_string_esc = '"' + ''.join( + [r'\u%04x' % i for i in range(255, 65536) if escapable_by_server.match(chr(i))] +) + '"' -# Footnote -# ======== -# Make this script runnable. if __name__ == '__main__': unittest.main() diff --git a/utils.py b/utils.py index c3801e9..a50b831 100644 --- a/utils.py +++ b/utils.py @@ -1,100 +1,147 @@ -import urlparse +from __future__ import annotations + +import re +import socket +from typing import Optional, Dict, Any +from urllib.parse import urlparse + import httplib_fork as httplib + from ws4py.client.threadedclient import WebSocketClient -import Queue -import socket -import re +import queue + + +# --- helpers --- + +def _to_bytes(data: Any, encoding: str = "utf-8") -> bytes: + if data is None: + return b"" + if isinstance(data, bytes): + return data + if isinstance(data, str): + return data.encode(encoding) + # fallback (rare in these tests) + return str(data).encode(encoding) + + +def _to_str(data: Any, encoding: str = "utf-8") -> str: + if data is None: + return "" + if isinstance(data, str): + return data + if isinstance(data, (bytes, bytearray, memoryview)): + return bytes(data).decode(encoding, errors="replace") + return str(data) + + +def recvline(s: socket.socket) -> bytes: + b = bytearray() + while True: + c = s.recv(1) + if not c: + # EOF mid-line; return what we have + return bytes(b) + b.extend(c) + if c == b"\n": + return bytes(b) + + +# --- HTTPResponse (legacy; used only by old_POST_async in protocol tests) --- class HttpResponse: - def __init__(self, method, url, - headers={}, body=None, async=False, load=True): - headers = headers.copy() - u = urlparse.urlparse(url) - kwargs = {'timeout': 1.0} - if u.scheme == 'http': + def __init__( + self, + method: str, + url: str, + headers: Optional[Dict[str, str]] = None, + body: Any = None, + async_: bool = False, + load: bool = True, + ): + headers = (headers or {}).copy() + u = urlparse(url) + kwargs = {"timeout": 1.0} + + if u.scheme == "http": conn = httplib.HTTPConnection(u.netloc, **kwargs) - elif u.scheme == 'https': + elif u.scheme == "https": conn = httplib.HTTPSConnection(u.netloc, **kwargs) else: - assert False, "Unsupported scheme " + u.scheme - assert u.fragment == '' - path = u.path + ('?' + u.query if u.query else '') + raise AssertionError("Unsupported scheme " + u.scheme) + + assert u.fragment == "" + path = u.path + ("?" + u.query if u.query else "") self.conn = conn - if not body: - if method is 'POST': - # The spec says: "Applications SHOULD use this field - # to indicate the transfer-length of the message-body, - # unless this is prohibited by the rules in section - # 4.4." - # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.13 - # While httplib sets it only if there is body. - headers['Content-Length'] = 0 + + if body is None or body == b"" or body == "": + if method == "POST": + # httplib in some cases sets Content-Length only when there is a body. + headers["Content-Length"] = "0" conn.request(method, path, headers=headers) else: - if isinstance(body, unicode): - body = body.encode('utf-8') - conn.request(method, path, headers=headers, body=body) + conn.request(method, path, headers=headers, body=_to_bytes(body)) if load: - if not async: + if not async_: self._load() else: self._async_load() - def _get_status(self): + @property + def status(self) -> int: return self.res.status - status = property(_get_status) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Optional[str]: return self.headers.get(key.lower()) - def _load(self): - # That works for Content-Length responses. + def _load(self) -> None: self.res = self.conn.getresponse() - self.headers = dict( (k.lower(), v) for k, v in self.res.getheaders() ) - self.body = self.res.read() + self.headers = {k.lower(): v for (k, v) in self.res.getheaders()} + self.body = _to_str(self.res.read()) self.close() - def close(self): + def close(self) -> None: if self.conn: self.conn.close() self.conn = None - def _async_load(self): - # That works for Transfer-Encoding: Chunked + def _async_load(self) -> None: self.res = self.conn.getresponse() - self.headers = dict( (k.lower(), v) for k, v in self.res.getheaders() ) + self.headers = {k.lower(): v for (k, v) in self.res.getheaders()} - def read(self): - data = self.res.read(10240) + def read(self) -> Optional[str]: + data = self.res.read(10240) if data: - return data - else: - self.close() - return None + return _to_str(data) + self.close() + return None + + +def old_POST_async(url: str, **kwargs) -> HttpResponse: + # Preserve original API; translate async -> async_ + if "async" in kwargs: + kwargs["async_"] = kwargs.pop("async") + return HttpResponse("POST", url, async_=True, **kwargs) -def old_POST_async(url, **kwargs): - return HttpResponse('POST', url, async=True, **kwargs) +# --- WebSocket8Client (ws4py) --- class WebSocket8Client(object): - class ConnectionClosedException(Exception): pass + class ConnectionClosedException(Exception): + pass + + def __init__(self, url: str): + q: "queue.Queue[Any]" = queue.Queue() + self.queue = q - def __init__(self, url): - queue = Queue.Queue() - self.queue = queue class IntWebSocketClient(WebSocketClient): def received_message(self, m): - queue.put(unicode(str(m), 'utf-8')) + # ws4py message -> bytes/str; normalize to str + q.put(_to_str(bytes(m) if hasattr(m, "__bytes__") else str(m))) + def closed(self, code, reason): - queue.put((code, reason)) - # def read_from_connection(self, amount): - # r = super(IntWebSocketClient, self).read_from_connection(amount) - # if self.stream.closing: - # queue.put((self.stream.closing.code, self.stream.closing.reason[2:])) - # elif not r: - # queue.put((1000, "")) - # return r + q.put((code, reason)) + self.client = IntWebSocketClient(url) self.client.connect() @@ -106,6 +153,7 @@ def close(self): self.client = None def send(self, data): + # ws4py can take str; keep as-is self.client.send(data) def recv(self): @@ -116,212 +164,289 @@ def recv(self): (ce.code, ce.reason) = r raise ce return r - except: + except Exception: self.close() raise -def recvline(s): - b = [] - c = None - while c != '\n': - c = s.recv(1) - b.append( c ) - return ''.join(b) +# --- Raw HTTP layer --- class CaseInsensitiveDict(object): def __init__(self, *args, **kwargs): - self.lower = {} - self.d = dict(*args, **kwargs) - for k in self.d: + self.lower: Dict[str, str] = {} + self.d: Dict[str, str] = dict(*args, **kwargs) + for k in list(self.d.keys()): self[k] = self.d[k] def __getitem__(self, key, *args, **kwargs): - pkey = self.lower.setdefault(key.lower(), key) + pkey = self.lower.setdefault(str(key).lower(), key) return self.d.__getitem__(pkey, *args, **kwargs) def __setitem__(self, key, *args, **kwargs): - pkey = self.lower.setdefault(key.lower(), key) + pkey = self.lower.setdefault(str(key).lower(), key) return self.d.__setitem__(pkey, *args, **kwargs) def items(self): for k in self.lower.values(): yield (k, self[k]) - def __repr__(self): return repr(self.d) - def __str__(self): return str(self.d) + def __repr__(self): + return repr(self.d) + + def __str__(self): + return str(self.d) def get(self, key, *args, **kwargs): - pkey = self.lower.setdefault(key.lower(), key) + pkey = self.lower.setdefault(str(key).lower(), key) return self.d.get(pkey, *args, **kwargs) def __contains__(self, key): - pkey = self.lower.setdefault(key.lower(), key) + pkey = self.lower.setdefault(str(key).lower(), key) return pkey in self.d + class Response(object): def __repr__(self): - return '' % ( - self.http, self.status, self.description, self.headers) + return "" % ( + self.http, + self.status, + self.description, + self.headers, + ) - def __str__(self): return repr(self) + def __str__(self): + return repr(self) def __getitem__(self, key): return self.headers.get(key) - def get(self, key, default): + def get(self, key, default=None): return self.headers.get(key, default) class RawHttpConnection(object): - def __init__(self, url): - u = urlparse.urlparse(url) - self.s = socket.create_connection((u.hostname, u.port), timeout=1) - - def request(self, method, url, headers={}, body=None, timeout=1, http="1.1"): - headers = CaseInsensitiveDict(headers) - if method == 'POST': - body = (body or '').encode('utf-8') - u = urlparse.urlparse(url) - headers['Host'] = u.hostname + ':' + str(u.port) if u.port else u.hostname + def __init__(self, url: str): + u = urlparse(url) + port = u.port or (443 if u.scheme == "https" else 80) + self.s = socket.create_connection((u.hostname, port), timeout=1) + + def request( + self, + method: str, + url: str, + headers: Optional[Dict[str, str]] = None, + body: Any = None, + timeout: float = 1, + http: str = "1.1", + ) -> Response: + headers = CaseInsensitiveDict(headers or {}) + if method == "POST": + # original behavior: treat POST body as utf-8 text unless bytes passed + if body is None: + body_b = b"" + else: + body_b = _to_bytes(body) + else: + body_b = _to_bytes(body) if body is not None else b"" + + u = urlparse(url) + host = u.hostname + port = u.port + headers["Host"] = f"{host}:{port}" if port else host + if body is not None: - headers['Content-Length'] = str(len(body)) + headers["Content-Length"] = str(len(body_b)) - rel_url = url[ url.find(u.path): ] + rel_url = url[url.find(u.path) :] if u.path else "/" - req = ["%s %s HTTP/%s" % (method, rel_url, http)] + req_lines = [f"{method} {rel_url} HTTP/{http}"] for k, v in headers.items(): - req.append( "%s: %s" % (k, v) ) - req.append('') - req.append('') - self.send('\r\n'.join(req)) + req_lines.append(f"{k}: {v}") + req_lines.append("") + req_lines.append("") - if body: - self.send(body) + self.send(("\r\n".join(req_lines)).encode("utf-8")) - head = recvline(self.s) - r = re.match(r'HTTP/(?P\S+) (?P\S+) (?P.*)', head) + if body_b: + self.send(body_b) + + head = recvline(self.s).decode("iso-8859-1", errors="replace") + r = re.match(r"HTTP/(?P\S+) (?P\S+) (?P.*)", head) + if not r: + raise Exception("Invalid HTTP response line: %r" % head) resp = Response() - resp.http = r.group('version') - resp.status = int(r.group('status')) - resp.description = r.group('description').rstrip('\r\n') + resp.http = r.group("version") + resp.status = int(r.group("status")) + resp.description = r.group("description").rstrip("\r\n") resp.headers = CaseInsensitiveDict() while True: header = recvline(self.s) - if header in ['\n', '\r\n']: + if header in (b"\n", b"\r\n", b""): break - k, _, v = header.partition(':') - resp.headers[k] = v.lstrip().rstrip('\r\n') + line = header.decode("iso-8859-1", errors="replace") + k, _, v = line.partition(":") + resp.headers[k] = v.lstrip().rstrip("\r\n") return resp - def read(self, size=None): + # --- bytes-level reads (needed for haproxy/binary assertions) --- + + def read_bytes(self, size: Optional[int] = None) -> bytes: if size is None: - # A single packet by default return self.s.recv(999999) - data = [] - while size > 0: - c = self.s.recv(size) + + data = bytearray() + remaining = size + while remaining > 0: + c = self.s.recv(remaining) if not c: - raise Exception('Socket closed!') - size -= len(c) - data.append( c ) - return ''.join(data) + raise Exception("Socket closed!") + remaining -= len(c) + data.extend(c) + return bytes(data) - def read_till_eof(self): - data = [] + def read_till_eof_bytes(self) -> bytes: + data = bytearray() while True: c = self.s.recv(999999) if not c: break - data.append( c ) - return ''.join(data) - - def closed(self): - # To check if socket is being closed, we need to recv and see - # if the response is empty. If it is not - we're in trouble - - # abort. - t = self.s.settimeout(0.1) - r = self.s.recv(1) == '' - if not r: - raise Exception('Socket not closed!') - self.s.settimeout(t) - return r + data.extend(c) + return bytes(data) - def read_chunk(self): - line = recvline(self.s).rstrip('\r\n') - bytes = int(line, 16) + 2 # Additional \r\n - return self.read(bytes)[:-2] + def closed(self) -> bool: + t = self.s.gettimeout() + self.s.settimeout(0.1) + try: + b = self.s.recv(1) + r = b == b"" + if not r: + raise Exception("Socket not closed!") + return True + finally: + self.s.settimeout(t) - def send(self, data): - self.s.sendall(data) + def read_chunk_bytes(self) -> bytes: + line = recvline(self.s).rstrip(b"\r\n") + n = int(line.decode("ascii"), 16) + 2 # plus trailing \r\n + return self.read_bytes(n)[:-2] - def close(self): + # --- compatibility str-level wrappers (used by HTTP helpers) --- + + def read(self, size: Optional[int] = None) -> str: + return _to_str(self.read_bytes(size)) + + def read_till_eof(self) -> str: + return _to_str(self.read_till_eof_bytes()) + + def read_chunk(self) -> str: + return _to_str(self.read_chunk_bytes()) + + def send(self, data: Any) -> None: + self.s.sendall(_to_bytes(data)) + + def close(self) -> None: self.s.close() -def SynchronousHttpRequest(method, url, **kwargs): +# --- HTTP helper functions used by the protocol test suite --- + +def SynchronousHttpRequest(method: str, url: str, **kwargs) -> Response: c = RawHttpConnection(url) r = c.request(method, url, **kwargs) - if r.get('Transfer-Encoding', '').lower() == 'chunked': + + if r.get("Transfer-Encoding", "").lower() == "chunked": chunks = [] while True: chunk = c.read_chunk() if len(chunk) == 0: break - chunks.append( chunk ) - r.body = ''.join(chunks) - elif r.get('Content-Length', ''): - cl = int(r['Content-Length']) - r.body = c.read(cl) - elif 'close' in [k.strip() for k in r.get('Connection', '').lower().split(',')]: - r.body = c.read_till_eof() + chunks.append(chunk) + r.body = "".join(chunks) + + elif r.get("Content-Length", ""): + cl = int(r["Content-Length"]) + r.body = c.read_bytes(cl).decode("utf-8", errors="replace") + + elif "close" in [k.strip() for k in r.get("Connection", "").lower().split(",")]: + r.body = c.read_till_eof_bytes().decode("utf-8", errors="replace") + else: # Whitelist statuses that may not need a response - if r.status in [101, 304, 204] or (r.status == 200 and method == 'OPTIONS'): - r.body = '' + if r.status in [101, 304, 204] or (r.status == 200 and method == "OPTIONS"): + r.body = "" else: - raise Exception(str(r.status) + ' '+str(r.headers) + " No Transfer-Encoding:chunked nor Content-Length nor Connection:Close!") + raise Exception( + f"{r.status} {r.headers} " + "No Transfer-Encoding:chunked nor Content-Length nor Connection:Close!" + ) + c.close() return r -def GET(url, **kwargs): - return SynchronousHttpRequest('GET', url, **kwargs) -def POST(url, **kwargs): - return SynchronousHttpRequest('POST', url, **kwargs) +def GET(url: str, **kwargs) -> Response: + return SynchronousHttpRequest("GET", url, **kwargs) -def OPTIONS(url, **kwargs): - return SynchronousHttpRequest('OPTIONS', url, **kwargs) -def AsynchronousHttpRequest(method, url, **kwargs): +def POST(url: str, **kwargs) -> Response: + return SynchronousHttpRequest("POST", url, **kwargs) + + +def OPTIONS(url: str, **kwargs) -> Response: + return SynchronousHttpRequest("OPTIONS", url, **kwargs) + + +def AsynchronousHttpRequest(method: str, url: str, **kwargs) -> Response: c = RawHttpConnection(url) r = c.request(method, url, **kwargs) - if r.get('Transfer-Encoding', '').lower() == 'chunked': - def read(): + + if r.get("Transfer-Encoding", "").lower() == "chunked": + + def read() -> str: return c.read_chunk() + r.read = read - elif r.get('Content-Length', ''): - cl = int(r['Content-Length']) - def read(): - return c.read(cl) + + elif r.get("Content-Length", ""): + cl = int(r["Content-Length"]) + + def read() -> str: + # NOTE: not truly streaming; matches old behavior + return c.read_bytes(cl).decode("utf-8", errors="replace") + r.read = read - elif ('close' in [k.strip() for k in r.get('Connection', '').lower().split(',')] - or r.status == 101): - def read(): - return c.read() + + elif ("close" in [k.strip() for k in r.get("Connection", "").lower().split(",")]) or ( + r.status == 101 + ): + + def read() -> Optional[str]: + b = c.read_bytes() + if b: + return b.decode("utf-8", errors="replace") + return None + r.read = read + else: - raise Exception(str(r.status) + ' '+str(r.headers) + " No Transfer-Encoding:chunked nor Content-Length nor Connection:Close!") - def close(): + raise Exception( + f"{r.status} {r.headers} " + "No Transfer-Encoding:chunked nor Content-Length nor Connection:Close!" + ) + + def close() -> None: c.close() + r.close = close return r -def GET_async(url, **kwargs): - return AsynchronousHttpRequest('GET', url, **kwargs) -def POST_async(url, **kwargs): - return AsynchronousHttpRequest('POST', url, **kwargs) +def GET_async(url: str, **kwargs) -> Response: + return AsynchronousHttpRequest("GET", url, **kwargs) + + +def POST_async(url: str, **kwargs) -> Response: + return AsynchronousHttpRequest("POST", url, **kwargs)