Skip to content

Commit f20b17d

Browse files
committed
Fix lint issues, apply ruff formatting
1 parent 62ed0df commit f20b17d

14 files changed

Lines changed: 1613 additions & 786 deletions

blockbuster/blockbuster.py

Lines changed: 680 additions & 356 deletions
Large diffs are not rendered by default.

tests/conftest.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
import base64
44
import io
5-
import os
65
import sys
7-
import asyncio
86

97
import httpx
108
import pytest
@@ -17,6 +15,7 @@
1715
# AES-CBC padding oracle simulator
1816
# ---------------------------------------------------------------------------
1917

18+
2019
class PaddingOracle:
2120
"""A simulated AES-CBC padding oracle for deterministic offline testing."""
2221

@@ -55,8 +54,8 @@ def check_padding(self, ciphertext_with_iv: bytes) -> bool:
5554

5655

5756
# Fixed test key / IV for reproducible tests
58-
AES_KEY = bytes(range(16)) # 0x00..0x0f
59-
AES_IV = bytes(range(16, 32)) # 0x10..0x1f
57+
AES_KEY = bytes(range(16)) # 0x00..0x0f
58+
AES_IV = bytes(range(16, 32)) # 0x10..0x1f
6059
ORACLE = PaddingOracle(AES_KEY, AES_IV, blocksize=16)
6160

6261

@@ -130,8 +129,14 @@ def encrypted_hello(oracle):
130129
# respx-based mock HTTP oracle
131130
# ---------------------------------------------------------------------------
132131

133-
def _make_oracle_route(oracle_obj, encoding_mode="base64", input_mode="parameter",
134-
vuln_param="token", oracle_text="Invalid padding"):
132+
133+
def _make_oracle_route(
134+
oracle_obj,
135+
encoding_mode="base64",
136+
input_mode="parameter",
137+
vuln_param="token",
138+
oracle_text="Invalid padding",
139+
):
135140
"""Return a respx side-effect function that simulates a padding oracle."""
136141

137142
def _handler(request: httpx.Request) -> httpx.Response:
@@ -141,24 +146,27 @@ def _handler(request: httpx.Request) -> httpx.Response:
141146

142147
if input_mode == "parameter":
143148
from urllib.parse import parse_qs, urlparse
149+
144150
qs = parse_qs(urlparse(url).query)
145151
token = qs.get(vuln_param, [None])[0]
146152
elif input_mode == "querystring":
147153
from urllib.parse import urlparse
154+
148155
token = urlparse(url).query
149156
elif input_mode == "cookie":
150157
cookie_header = request.headers.get("cookie", "")
151158
for part in cookie_header.split(";"):
152159
part = part.strip()
153160
if part.startswith(vuln_param + "="):
154-
token = part[len(vuln_param) + 1:]
161+
token = part[len(vuln_param) + 1 :]
155162
break
156163

157164
if token is None:
158165
return httpx.Response(400, text="Missing token")
159166

160167
# Decode
161168
import urllib.parse as up
169+
162170
if encoding_mode == "base64":
163171
raw = up.unquote_plus(token)
164172
raw += "=" * (len(raw) % 4)

tests/test_block_orchestration.py

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
"""Tests for nextBlock, block retry logic, and encrypt sanity check."""
22

3-
import asyncio
4-
import base64
5-
import io
6-
import sys
73
from types import SimpleNamespace
8-
from unittest.mock import patch, AsyncMock, MagicMock
4+
from unittest.mock import patch, AsyncMock
95

106
import pytest
117

12-
from tests.conftest import make_job, PaddingOracle, AES_KEY, AES_IV, ORACLE
8+
from tests.conftest import make_job
139

1410

1511
class TestNextBlockDecrypt:
@@ -24,8 +20,9 @@ async def test_successful_decrypt_returns_0(self):
2420
j.blocks = [[0] * 16]
2521
j.iv = [0] * 16
2622

27-
with patch.object(j, 'decryptBlock', new_callable=AsyncMock,
28-
return_value=b"decrypted_block!"):
23+
with patch.object(
24+
j, "decryptBlock", new_callable=AsyncMock, return_value=b"decrypted_block!"
25+
):
2926
result = await j.nextBlock()
3027

3128
assert result == 0
@@ -40,8 +37,12 @@ async def test_failed_decrypt_returns_1(self):
4037
j.blocks = [[0] * 16]
4138
j.iv = [0] * 16
4239

43-
with patch.object(j, 'decryptBlock', new_callable=AsyncMock,
44-
side_effect=Exception("Block failed")):
40+
with patch.object(
41+
j,
42+
"decryptBlock",
43+
new_callable=AsyncMock,
44+
side_effect=Exception("Block failed"),
45+
):
4546
result = await j.nextBlock()
4647

4748
assert result == 1
@@ -53,7 +54,9 @@ class TestNextBlockEncrypt:
5354

5455
@pytest.mark.asyncio
5556
async def test_successful_encrypt_with_passing_sanity_check(self):
56-
j = make_job(mode="encrypt", oracleMode="negative", oracleText="Invalid padding")
57+
j = make_job(
58+
mode="encrypt", oracleMode="negative", oracleText="Invalid padding"
59+
)
5760
j.initialize_client()
5861
j.blockCount = 1
5962
j.currentBlock = 0
@@ -65,16 +68,20 @@ async def test_successful_encrypt_with_passing_sanity_check(self):
6568
def mock_request(token):
6669
return SimpleNamespace(text="OK", status_code=200)
6770

68-
with patch.object(j, 'encryptBlock', new_callable=AsyncMock, return_value=block_result):
69-
with patch.object(j, 'makeRequest', side_effect=mock_request):
71+
with patch.object(
72+
j, "encryptBlock", new_callable=AsyncMock, return_value=block_result
73+
):
74+
with patch.object(j, "makeRequest", side_effect=mock_request):
7075
result = await j.nextBlock()
7176

7277
assert result == 0
7378
assert j.solvedBlocks[0] == block_result
7479

7580
@pytest.mark.asyncio
7681
async def test_failed_sanity_check_backs_out_block(self):
77-
j = make_job(mode="encrypt", oracleMode="negative", oracleText="Invalid padding")
82+
j = make_job(
83+
mode="encrypt", oracleMode="negative", oracleText="Invalid padding"
84+
)
7885
j.initialize_client()
7986
j.blockCount = 1
8087
j.currentBlock = 0
@@ -87,8 +94,10 @@ def mock_request(token):
8794
# Sanity check fails — the oracle text IS found (negative mode: pass = text absent)
8895
return SimpleNamespace(text="Invalid padding", status_code=200)
8996

90-
with patch.object(j, 'encryptBlock', new_callable=AsyncMock, return_value=block_result):
91-
with patch.object(j, 'makeRequest', side_effect=mock_request):
97+
with patch.object(
98+
j, "encryptBlock", new_callable=AsyncMock, return_value=block_result
99+
):
100+
with patch.object(j, "makeRequest", side_effect=mock_request):
92101
result = await j.nextBlock()
93102

94103
assert result == 1
@@ -103,8 +112,12 @@ async def test_encrypt_exception_returns_1(self):
103112
j.blocks = [[0] * 16]
104113
j.iv = [0] * 16
105114

106-
with patch.object(j, 'encryptBlock', new_callable=AsyncMock,
107-
side_effect=Exception("Encrypt failed")):
115+
with patch.object(
116+
j,
117+
"encryptBlock",
118+
new_callable=AsyncMock,
119+
side_effect=Exception("Encrypt failed"),
120+
):
108121
result = await j.nextBlock()
109122

110123
assert result == 1
@@ -136,8 +149,8 @@ async def mock_next_block():
136149
max_block_retries = 3
137150
block_failures = 0
138151

139-
with patch.object(j, 'nextBlock', side_effect=mock_next_block):
140-
with patch('blockbuster.blockbuster.saveState'):
152+
with patch.object(j, "nextBlock", side_effect=mock_next_block):
153+
with patch("blockbuster.blockbuster.saveState"):
141154
while j.currentBlock < j.blockCount:
142155
result = await j.nextBlock()
143156
if result == 0:
@@ -167,7 +180,7 @@ async def always_fail():
167180
max_block_retries = 3
168181
block_failures = 0
169182

170-
with patch.object(j, 'nextBlock', side_effect=always_fail):
183+
with patch.object(j, "nextBlock", side_effect=always_fail):
171184
while j.currentBlock < j.blockCount:
172185
result = await j.nextBlock()
173186
if result == 0:
@@ -188,8 +201,13 @@ class TestNextBlockEncryptKnownIV:
188201
@pytest.mark.asyncio
189202
async def test_encrypt_knowniv_appends_iv(self):
190203
iv = list(range(16))
191-
j = make_job(mode="encrypt", oracleMode="negative", oracleText="Invalid padding",
192-
ivMode="knownIV", iv=iv)
204+
j = make_job(
205+
mode="encrypt",
206+
oracleMode="negative",
207+
oracleText="Invalid padding",
208+
ivMode="knownIV",
209+
iv=iv,
210+
)
193211
j.initialize_client()
194212
j.blockCount = 1
195213
j.currentBlock = 0
@@ -202,8 +220,10 @@ def mock_request(token):
202220
captured_tokens.append(token)
203221
return SimpleNamespace(text="OK", status_code=200)
204222

205-
with patch.object(j, 'encryptBlock', new_callable=AsyncMock, return_value=block_result):
206-
with patch.object(j, 'makeRequest', side_effect=mock_request):
223+
with patch.object(
224+
j, "encryptBlock", new_callable=AsyncMock, return_value=block_result
225+
):
226+
with patch.object(j, "makeRequest", side_effect=mock_request):
207227
result = await j.nextBlock()
208228

209229
assert result == 0

tests/test_config_parsing.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
"""Tests for argument parsing and config validation in async_main."""
22

33
import configparser
4-
import io
54
import json
6-
import os
7-
import sys
8-
import tempfile
95

106
import pytest
117

@@ -101,11 +97,16 @@ def test_invalid_http_method(self, tmp_path):
10197
def test_invalid_post_format(self, tmp_path):
10298
path = self._write_config(tmp_path, {"httpMethod": "POST", "postFormat": "xml"})
10399
config = self._read_config(path)
104-
assert config["default"]["postFormat"] not in ("form-urlencoded", "multipart", "json")
100+
assert config["default"]["postFormat"] not in (
101+
"form-urlencoded",
102+
"multipart",
103+
"json",
104+
)
105105

106106
def test_invalid_proxy_ip(self):
107107
"""socket.inet_aton should reject invalid IPs."""
108108
import socket
109+
109110
with pytest.raises(socket.error):
110111
socket.inet_aton("not.an.ip")
111112

@@ -131,12 +132,22 @@ def test_valid_iv_modes(self, tmp_path):
131132
for mode in ("firstblock", "knownIV", "unknown", "anchorBlock"):
132133
path = self._write_config(tmp_path, {"ivMode": mode})
133134
config = self._read_config(path)
134-
assert config["default"]["ivMode"] in ("firstblock", "knownIV", "unknown", "anchorBlock")
135+
assert config["default"]["ivMode"] in (
136+
"firstblock",
137+
"knownIV",
138+
"unknown",
139+
"anchorBlock",
140+
)
135141

136142
def test_invalid_iv_mode(self, tmp_path):
137143
path = self._write_config(tmp_path, {"ivMode": "randomIV"})
138144
config = self._read_config(path)
139-
assert config["default"]["ivMode"] not in ("firstblock", "knownIV", "unknown", "anchorBlock")
145+
assert config["default"]["ivMode"] not in (
146+
"firstblock",
147+
"knownIV",
148+
"unknown",
149+
"anchorBlock",
150+
)
140151

141152
def test_knowniv_requires_iv(self, tmp_path):
142153
"""In knownIV mode, iv must be provided, correct length, all ints."""
@@ -146,7 +157,9 @@ def test_knowniv_requires_iv(self, tmp_path):
146157
assert len(iv) == 0 # Would fail validation
147158

148159
def test_knowniv_iv_length_mismatch(self, tmp_path):
149-
path = self._write_config(tmp_path, {"ivMode": "knownIV", "iv": "[1,2,3]", "blocksize": "16"})
160+
path = self._write_config(
161+
tmp_path, {"ivMode": "knownIV", "iv": "[1,2,3]", "blocksize": "16"}
162+
)
150163
config = self._read_config(path)
151164
iv = json.loads(config["default"]["iv"])
152165
blocksize = int(config["default"]["blocksize"])

0 commit comments

Comments
 (0)