Skip to content

Commit 03fdc8f

Browse files
fultonjclaude
authored andcommitted
[cephx_key] Add aes256k cipher support
Add an optional `cipher` parameter (choices: aes, aes256k; default: aes) to the `cephx_key` Ansible module so CI jobs can generate AES-256k (32-byte, type=2) CephX keys. - Refactor __create_cephx_key() to accept cipher argument; use key_type=2 and os.urandom(32) for aes256k, key_type=1 and os.urandom(16) for aes (default, backward compatible). - Update DOCUMENTATION, EXAMPLES and RETURN docstrings. - Update the "Generate a cephx key" task in hooks/playbooks/ceph.yml to pass `cipher: "{{ cifmw_ceph_key_cipher | default('aes') }}"`, allowing scenarios to opt in via a single variable. - Add tests/unit/modules/test_cephx_key.py with 8 tests covering both cipher modes, invalid input, base64 validity, and key randomness. Jira: OSPRH-29667 Signed-off-by: John Fulton <fulton@redhat.com> Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 04a26f4 commit 03fdc8f

3 files changed

Lines changed: 183 additions & 10 deletions

File tree

hooks/playbooks/ceph.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@
353353

354354
- name: Generate a cephx key
355355
cephx_key:
356+
cipher: "{{ cifmw_ceph_key_cipher | default('aes') }}"
356357
register: cephx
357358
no_log: "{{ cifmw_nolog | default(true) | bool }}"
358359

plugins/modules/cephx_key.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,53 @@
1414
short_description: Generate a random CephX authentication key
1515
1616
description:
17-
- Generate a random CephX authentication key and return it
17+
- Generate a random CephX authentication key and return it.
18+
- Supports AES-128 (default, type=1, 16-byte key) and AES-256k (type=2, 32-byte key) ciphers.
19+
20+
options:
21+
cipher:
22+
description:
23+
- The cipher to use when generating the CephX key.
24+
- Use C(aes) for AES-128 (16-byte key, 40-char base64, type=1). This is the default.
25+
- Use C(aes256k) for AES-256k (32-byte key, 60-char base64, type=2).
26+
type: str
27+
default: aes
28+
choices: [aes, aes256k]
1829
1930
author:
2031
- John Fulton (@fultonj)
2132
"""
2233

2334
EXAMPLES = r"""
24-
- name: Generate a cephx key
35+
- name: Generate a cephx key (AES-128, backward compatible default)
2536
cifmw.general.cephx_key:
2637
register: cephx
2738
39+
- name: Generate a cephx key with explicit AES-128 cipher
40+
cifmw.general.cephx_key:
41+
cipher: aes
42+
register: cephx
43+
44+
- name: Generate a cephx key with AES-256k cipher
45+
cifmw.general.cephx_key:
46+
cipher: aes256k
47+
register: cephx
48+
2849
- name: Show cephx key
2950
debug:
3051
msg: "{{ cephx.key }}"
3152
"""
3253

3354
RETURN = r"""
3455
key:
35-
description: A random cephx authentication key
36-
type: dict
56+
description:
57+
- A random CephX authentication key encoded as base64.
58+
- AES-128 keys are 40 characters long (ending with ==).
59+
- AES-256k keys are 60 characters long (ending with =).
60+
type: str
3761
returned: success
3862
sample:
39-
- KEY: AQC+vYNXgDAgAhAAc8UoYt+OTz5uhV7ItLdwUw==
63+
- AQC+vYNXgDAgAhAAc8UoYt+OTz5uhV7ItLdwUw==
4064
"""
4165

4266

@@ -47,21 +71,33 @@
4771
import time
4872

4973

50-
def __create_cephx_key():
74+
def __create_cephx_key(cipher="aes"):
5175
# NOTE(fultonj): Taken from
5276
# https://github.com/ceph/ceph-deploy/blob/master/ceph_deploy/new.py#L21
53-
key = os.urandom(16)
54-
header = struct.pack("<hiih", 1, int(time.time()), 0, len(key))
77+
if cipher == "aes256k":
78+
key_type = 2
79+
key = os.urandom(32)
80+
else:
81+
key_type = 1
82+
key = os.urandom(16)
83+
header = struct.pack("<hiih", key_type, int(time.time()), 0, len(key))
5584
return base64.b64encode(header + key).decode("utf-8")
5685

5786

5887
def main():
59-
mod_args = {}
88+
mod_args = {
89+
"cipher": {
90+
"type": "str",
91+
"default": "aes",
92+
"choices": ["aes", "aes256k"],
93+
}
94+
}
6095
module = AnsibleModule(argument_spec=mod_args, supports_check_mode=False)
6196

6297
result = {"changed": False, "error": ""}
6398

64-
cephx_key = __create_cephx_key()
99+
cipher = module.params["cipher"]
100+
cephx_key = __create_cephx_key(cipher)
65101
if not cephx_key:
66102
result["msg"] = "Error: unable to create cephx key"
67103
module.fail_json(**result)
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# Copyright: (c) 2026, Red Hat
2+
3+
# GNU General Public License v3.0+ (see COPYING or
4+
# https://www.gnu.org/licenses/gpl-3.0.txt)
5+
from __future__ import absolute_import, division, print_function
6+
7+
import base64
8+
import struct
9+
import unittest.mock # noqa: F401 — required so unittest.mock is loaded for utils.py setUp
10+
11+
from ansible_collections.cifmw.general.tests.unit.utils import (
12+
AnsibleExitJson,
13+
AnsibleFailJson,
14+
ModuleBaseTestCase,
15+
set_module_args,
16+
)
17+
from ansible_collections.cifmw.general.plugins.modules import cephx_key
18+
19+
20+
class TestCephxKey(ModuleBaseTestCase):
21+
"""Unit tests for the cephx_key Ansible module."""
22+
23+
def _decode_key(self, key_b64):
24+
"""Decode a base64 CephX key and return (header_tuple, key_bytes)."""
25+
raw = base64.b64decode(key_b64)
26+
# Header is 12 bytes: struct.pack("<hiih", type, sec, nsec, key_len)
27+
header = struct.unpack("<hiih", raw[:12])
28+
key_bytes = raw[12:]
29+
return header, key_bytes
30+
31+
def test_default_cipher_returns_aes128_key(self):
32+
"""No args: default cipher produces a 28-byte (AES-128) encoded key."""
33+
set_module_args({})
34+
with self.assertRaises(AnsibleExitJson) as ctx:
35+
cephx_key.main()
36+
result = ctx.exception.args[0]
37+
self.assertIn("key", result)
38+
key_b64 = result["key"]
39+
raw = base64.b64decode(key_b64)
40+
self.assertEqual(len(raw), 28)
41+
header, key_bytes = self._decode_key(key_b64)
42+
# type=1, key_len=16
43+
self.assertEqual(header[0], 1)
44+
self.assertEqual(header[3], 16)
45+
self.assertEqual(len(key_bytes), 16)
46+
47+
def test_aes_cipher_returns_aes128_key(self):
48+
"""cipher=aes produces a 28-byte (AES-128) encoded key."""
49+
set_module_args({"cipher": "aes"})
50+
with self.assertRaises(AnsibleExitJson) as ctx:
51+
cephx_key.main()
52+
result = ctx.exception.args[0]
53+
key_b64 = result["key"]
54+
raw = base64.b64decode(key_b64)
55+
self.assertEqual(len(raw), 28)
56+
header, key_bytes = self._decode_key(key_b64)
57+
self.assertEqual(header[0], 1)
58+
self.assertEqual(header[3], 16)
59+
self.assertEqual(len(key_bytes), 16)
60+
61+
def test_aes256k_cipher_returns_aes256k_key(self):
62+
"""cipher=aes256k produces a 44-byte (AES-256k) encoded key."""
63+
set_module_args({"cipher": "aes256k"})
64+
with self.assertRaises(AnsibleExitJson) as ctx:
65+
cephx_key.main()
66+
result = ctx.exception.args[0]
67+
key_b64 = result["key"]
68+
raw = base64.b64decode(key_b64)
69+
self.assertEqual(len(raw), 44)
70+
header, key_bytes = self._decode_key(key_b64)
71+
# type=2, key_len=32
72+
self.assertEqual(header[0], 2)
73+
self.assertEqual(header[3], 32)
74+
self.assertEqual(len(key_bytes), 32)
75+
76+
def test_invalid_cipher_fails(self):
77+
"""cipher=invalid raises AnsibleFailJson (AnsibleModule enforces choices)."""
78+
set_module_args({"cipher": "invalid"})
79+
with self.assertRaises(AnsibleFailJson) as ctx:
80+
cephx_key.main()
81+
result = ctx.exception.args[0]
82+
self.assertTrue(result["failed"])
83+
84+
def test_aes_key_is_valid_base64(self):
85+
"""AES-128 key is valid base64 and ends with ==."""
86+
set_module_args({"cipher": "aes"})
87+
with self.assertRaises(AnsibleExitJson) as ctx:
88+
cephx_key.main()
89+
key_b64 = ctx.exception.args[0]["key"]
90+
# Should not raise
91+
decoded = base64.b64decode(key_b64)
92+
self.assertIsInstance(decoded, bytes)
93+
self.assertTrue(
94+
key_b64.endswith("=="), f"Expected == suffix, got: {key_b64[-2:]}"
95+
)
96+
97+
def test_aes256k_key_is_valid_base64(self):
98+
"""AES-256k key is valid base64 and ends with a single =."""
99+
set_module_args({"cipher": "aes256k"})
100+
with self.assertRaises(AnsibleExitJson) as ctx:
101+
cephx_key.main()
102+
key_b64 = ctx.exception.args[0]["key"]
103+
decoded = base64.b64decode(key_b64)
104+
self.assertIsInstance(decoded, bytes)
105+
self.assertTrue(
106+
key_b64.endswith("=") and not key_b64.endswith("=="),
107+
f"Expected single = suffix, got: {key_b64[-2:]}",
108+
)
109+
110+
def test_key_changes_on_each_call(self):
111+
"""Two successive calls produce different keys (randomness check)."""
112+
set_module_args({"cipher": "aes"})
113+
with self.assertRaises(AnsibleExitJson) as ctx1:
114+
cephx_key.main()
115+
key1 = ctx1.exception.args[0]["key"]
116+
117+
set_module_args({"cipher": "aes"})
118+
with self.assertRaises(AnsibleExitJson) as ctx2:
119+
cephx_key.main()
120+
key2 = ctx2.exception.args[0]["key"]
121+
122+
self.assertNotEqual(key1, key2)
123+
124+
def test_aes256k_key_changes_on_each_call(self):
125+
"""Two successive aes256k calls produce different keys (randomness check)."""
126+
set_module_args({"cipher": "aes256k"})
127+
with self.assertRaises(AnsibleExitJson) as ctx1:
128+
cephx_key.main()
129+
key1 = ctx1.exception.args[0]["key"]
130+
131+
set_module_args({"cipher": "aes256k"})
132+
with self.assertRaises(AnsibleExitJson) as ctx2:
133+
cephx_key.main()
134+
key2 = ctx2.exception.args[0]["key"]
135+
136+
self.assertNotEqual(key1, key2)

0 commit comments

Comments
 (0)