Skip to content

Commit b6825d8

Browse files
Rework Python Mock BPA Tests (#113)
* init * refactor init * pr fixes * Clear state before assert My comment was unclear, this is what I meant * A few very minor cleanups * Fix setUp state * Applying autopep8 to python sources --------- Co-authored-by: Brian Sipos <brian.sipos@jhuapl.edu>
1 parent de57f50 commit b6825d8

11 files changed

Lines changed: 1104 additions & 945 deletions

File tree

.github/workflows/formatting.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,6 @@ jobs:
4646
run: |
4747
sudo apt-get update && sudo apt-get install -y \
4848
clang-format python3-pip
49-
pip3 install licenseheaders
49+
pip3 install autopep8 licenseheaders
5050
- name: Check code formatting
5151
run: ./build.sh check-format

mock-bpa-test/_generate_simple_bundles.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
from Crypto.Random import get_random_bytes
2929

3030
# HMAC signs, encrypts, or decrypts payload string
31-
# This is a really messy function for now
32-
def sign_and_encrypt( payload_s:str,
33-
sign:bool=True, sign_key_s:str='00112233445566778899AABBCCDDEEFF',
34-
enc:bool=True, enc_key_s:str='00112233445566778899AABBCCDDEEFF',
35-
denc:bool=False, denc_key_s:str='71776572747975696f70617364666768', denc_tag_s:str='5d37d992dbc6fc795ea597ed7e8a6078'):
36-
37-
#print(f'{payload_s} | {sign} | {sign_key_s} | {enc} | {enc_key_s} | {denc} | {denc_key_s} | {denc_tag_s}\n')
31+
# This is a really messy function for now
32+
33+
34+
def sign_and_encrypt(payload_s: str,
35+
sign: bool = True, sign_key_s: str = '00112233445566778899AABBCCDDEEFF',
36+
enc: bool = True, enc_key_s: str = '00112233445566778899AABBCCDDEEFF',
37+
denc: bool = False, denc_key_s: str = '71776572747975696f70617364666768', denc_tag_s: str = '5d37d992dbc6fc795ea597ed7e8a6078'):
38+
39+
# print(f'{payload_s} | {sign} | {sign_key_s} | {enc} | {enc_key_s} | {denc} | {denc_key_s} | {denc_tag_s}\n')
3840

3941
# TODO only convert what's needed in if statements below
4042
payload = bytes.fromhex(payload_s)
@@ -64,7 +66,8 @@ def sign_and_encrypt( payload_s:str,
6466
cipher = AES.new(enc_key, AES.MODE_GCM, nonce=iv)
6567
cipher.update(aad)
6668
ciphertext, tag = cipher.encrypt_and_digest(payload)
67-
results[1] = (ciphertext.hex(), tag.hex(), '69c411276fecddc4780df42c8a2af89296fabf34d7fae700', iv.hex())
69+
results[1] = (ciphertext.hex(), tag.hex(),
70+
'69c411276fecddc4780df42c8a2af89296fabf34d7fae700', iv.hex())
6871

6972
if denc:
7073
cipher = AES.new(denc_key, AES.MODE_GCM, nonce=iv)
@@ -81,10 +84,10 @@ def add_bib_to_bundle_over_x(bundle, x):
8184
if x == 0:
8285
sign = sign_and_encrypt(cbor2.dumps(bundle[0]).hex(), sign=True, enc=False, denc=False)[0]
8386
asb = [
84-
[x], 1, 1, [2, [2, 1]],
85-
[[1, 7], [3, 0]],
86-
[[[1, bytes.fromhex(sign)]]]
87-
]
87+
[x], 1, 1, [2, [2, 1]],
88+
[[1, 7], [3, 0]],
89+
[[[1, bytes.fromhex(sign)]]]
90+
]
8891
buf = io.BytesIO()
8992
for b in asb:
9093
cbor2.dump(b, buf)
@@ -101,7 +104,7 @@ def add_bib_to_bundle_over_x(bundle, x):
101104
# find the target block
102105
if blk[1] == x:
103106

104-
# get the HMAC signature
107+
# get the HMAC signature
105108
sign = sign_and_encrypt(blk[4], sign=True, enc=False, denc=False)[0]
106109

107110
# create ASB and cbor dump as BIB btsd
@@ -119,6 +122,7 @@ def add_bib_to_bundle_over_x(bundle, x):
119122

120123
return bundle
121124

125+
122126
def add_bcb_to_bundle_over_x(bundle, x):
123127
# find max blk num, we will make new BCB max+1. This guarantees the block num is free
124128
mx_blk_num = max(bundle, key=lambda blk: blk[1])[1]
@@ -133,9 +137,10 @@ def add_bcb_to_bundle_over_x(bundle, x):
133137
# create ASB and cbor dump as BCB btsd
134138
asb = [
135139
[x], 2, 1, [2, [2, 1]],
136-
# IV # wrapped key
137-
[[1, bytes.fromhex(ciphertext[3])], [2, 1], [3, bytes.fromhex(ciphertext[2])], [4, 0]],
138-
# ciphertext
140+
# IV # wrapped key
141+
[[1, bytes.fromhex(ciphertext[3])], [2, 1], [
142+
3, bytes.fromhex(ciphertext[2])], [4, 0]],
143+
# ciphertext
139144
[[[1, bytes.fromhex(ciphertext[1])]]]
140145
]
141146
buf = io.BytesIO()
@@ -150,14 +155,15 @@ def add_bcb_to_bundle_over_x(bundle, x):
150155

151156
return bundle
152157

158+
153159
b = [
154160
[7, 0, 0, [2, [1, 2]], [2, [2, 1]], [2, [2, 1]], [0, 40], 1000000],
155161
[1, 1, 0, 0, '526561647920746F2067656E657261746520612033322D62797465207061796C6F6164'],
156162
]
157163

158164

159-
print (f"ORIGINAL BUNDLE: {b}")
165+
print(f"ORIGINAL BUNDLE: {b}")
160166
b = add_bib_to_bundle_over_x(b, 1)
161167
print(f'BUNDLE AFTER BIB: {b}')
162-
#b = add_bcb_to_bundle_over_x(b, 1)
168+
# b = add_bcb_to_bundle_over_x(b, 1)
163169
print(f'FINAL BUNDLE: {b}')

mock-bpa-test/_test_util.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,38 +21,35 @@
2121
#
2222
from enum import Enum
2323

24+
2425
class DataFormat(Enum):
25-
BUNDLEARRAY=0
26-
HEX=1
27-
ERR=2
28-
NONE=3
26+
BUNDLEARRAY = 0
27+
HEX = 1
28+
ERR = 2
29+
NONE = 3
2930

3031
# "structure" to hold a simple test case
32+
33+
3134
class _TestCase:
3235
'''
3336
@param input_data: list representation of bundle
3437
@param expected_output: either list representation of expected output bundle OR a string to search log output for match
35-
@param policy_config: decimal digit representing uint32 for policy configuration
38+
@param policy_config: decimal digit representing uint32 for policy configuration OR path to JSON-encoded ION-like policy rules
39+
@param key_set: path to JWK-encoded key set
3640
@param is_working: True if test working
3741
@param input/output_data_format: data format of input/output
3842
'''
39-
def __init__(self, input_data, expected_output, policy_config, is_working: bool,
40-
input_data_format : DataFormat, expected_output_format : DataFormat):
43+
44+
def __init__(self, input_data, expected_output: DataFormat, policy_config: str, key_set: str, is_working: bool,
45+
input_data_format: DataFormat, expected_output_format: DataFormat):
4146
self.input_data = input_data
4247
self.expected_output = expected_output
4348
self.policy_config = policy_config
49+
self.key_set = key_set
4450

45-
# can be removed once all tests are wworking
51+
# can be removed once all tests are working
4652
self.is_working = is_working
4753

4854
self.input_data_format = input_data_format
4955
self.expected_output_format = expected_output_format
50-
51-
52-
class _TestSet:
53-
def __init__(self):
54-
self.cases = {}
55-
56-
NO_OUTPUT = 0
57-
FAILURE_CODE = -1
58-
DELETION = -2

mock-bpa-test/ccsds_tests.py

Lines changed: 0 additions & 179 deletions
This file was deleted.

mock-bpa-test/helpers/runner.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def _write_stdin(self, stream):
147147
stream.flush()
148148
LOGGER.debug('Stopping stdin thread')
149149

150-
def wait_for_line(self, timeout:float=5) -> str:
150+
def wait_for_line(self, timeout: float = 5) -> str:
151151
''' Wait for any received stdout line.
152152
153153
:param timeout: The total time to wait for this line.
@@ -160,7 +160,7 @@ def wait_for_line(self, timeout:float=5) -> str:
160160
raise TimeoutError('no lines received before timeout')
161161
return text
162162

163-
def wait_for_text(self, pattern:str, timeout:float=5) -> str:
163+
def wait_for_text(self, pattern: str, timeout: float = 5) -> str:
164164
''' Iterate through the received stdout lines until a specific
165165
full matching line is seen.
166166
@@ -187,7 +187,7 @@ def wait_for_text(self, pattern:str, timeout:float=5) -> str:
187187
if expr.match(text) is not None:
188188
return text
189189

190-
def send_stdin(self, text:str):
190+
def send_stdin(self, text: str):
191191
''' Send an exact line of text to the process stdin.
192192
193193
:param text: The line to send, which should include a newline

0 commit comments

Comments
 (0)