Skip to content

Commit e177256

Browse files
committed
implement pod exec v5
1 parent a49d85d commit e177256

3 files changed

Lines changed: 212 additions & 7 deletions

File tree

kubernetes/base/stream/ws_client.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
ERROR_CHANNEL = 3
4141
RESIZE_CHANNEL = 4
4242

43+
V4_CHANNEL_PROTOCOL = "v4.channel.k8s.io"
44+
V5_CHANNEL_PROTOCOL = "v5.channel.k8s.io"
45+
4346
class _IgnoredIO:
4447
def write(self, _x):
4548
pass
@@ -59,13 +62,16 @@ def __init__(self, configuration, url, headers, capture_all, binary=False):
5962
"""
6063
self._connected = False
6164
self._channels = {}
65+
self._closed_channels = set()
66+
self.subprotocol = None
6267
self.binary = binary
6368
self.newline = '\n' if not self.binary else b'\n'
6469
if capture_all:
6570
self._all = StringIO() if not self.binary else BytesIO()
6671
else:
6772
self._all = _IgnoredIO()
6873
self.sock = create_websocket(configuration, url, headers)
74+
self.subprotocol = getattr(self.sock, 'subprotocol', None)
6975
self._connected = True
7076
self._returncode = None
7177

@@ -93,6 +99,7 @@ def readline_channel(self, channel, timeout=None):
9399
timeout = float("inf")
94100
start = time.time()
95101
while self.is_open() and time.time() - start < timeout:
102+
# Always try to drain the channel first
96103
if channel in self._channels:
97104
data = self._channels[channel]
98105
if self.newline in data:
@@ -104,6 +111,14 @@ def readline_channel(self, channel, timeout=None):
104111
else:
105112
del self._channels[channel]
106113
return ret
114+
115+
if channel in self._closed_channels:
116+
if channel in self._channels:
117+
ret = self._channels[channel]
118+
del self._channels[channel]
119+
return ret
120+
return b"" if self.binary else ""
121+
107122
self.update(timeout=(timeout - time.time() + start))
108123

109124
def write_channel(self, channel, data):
@@ -119,6 +134,14 @@ def write_channel(self, channel, data):
119134
payload = channel_prefix + data
120135
self.sock.send(payload, opcode=opcode)
121136

137+
def close_channel(self, channel):
138+
"""Close a channel (v5 protocol only)."""
139+
if self.subprotocol != V5_CHANNEL_PROTOCOL:
140+
return
141+
data = bytes([255, channel])
142+
self.sock.send(data, opcode=ABNF.OPCODE_BINARY)
143+
self._closed_channels.add(channel)
144+
122145
def peek_stdout(self, timeout=0):
123146
"""Same as peek_channel with channel=1."""
124147
return self.peek_channel(STDOUT_CHANNEL, timeout=timeout)
@@ -200,13 +223,24 @@ def update(self, timeout=0):
200223
return
201224
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
202225
data = frame.data
203-
if six.PY3 and not self.binary:
204-
data = data.decode("utf-8", "replace")
205-
if len(data) > 1:
226+
if len(data) > 0:
227+
# Parse channel from raw bytes to support v5 CLOSE signal AND avoid charset issues
206228
channel = data[0]
207-
if six.PY3 and not self.binary:
208-
channel = ord(channel)
229+
# In Py3, iterating bytes gives int, but indexing bytes gives int.
230+
# websocket-client frame.data might be bytes.
231+
232+
if channel == 255 and self.subprotocol == V5_CHANNEL_PROTOCOL: # v5 CLOSE
233+
if len(data) > 1:
234+
# data[1] is already int in Py3 bytes
235+
close_chan = data[1]
236+
self._closed_channels.add(close_chan)
237+
return
238+
209239
data = data[1:]
240+
# Decode data if expected text
241+
if not self.binary:
242+
data = data.decode("utf-8", "replace")
243+
210244
if data:
211245
if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:
212246
# keeping all messages in the order they received
@@ -469,7 +503,7 @@ def create_websocket(configuration, url, headers=None):
469503
header.append("sec-websocket-protocol: %s" %
470504
headers['sec-websocket-protocol'])
471505
else:
472-
header.append("sec-websocket-protocol: v4.channel.k8s.io")
506+
header.append("sec-websocket-protocol: %s,%s" % (V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL))
473507

474508
if url.startswith('wss://') and configuration.verify_ssl:
475509
ssl_opts = {

kubernetes/base/stream/ws_client_test.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@
1313
# limitations under the License.
1414

1515
import unittest
16+
from unittest.mock import MagicMock, patch
1617

17-
from .ws_client import get_websocket_url
18+
from . import ws_client as ws_client_module
19+
from .ws_client import get_websocket_url, WSClient, V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL
1820
from .ws_client import websocket_proxycare
1921
from kubernetes.client.configuration import Configuration
2022
import os
2123
import socket
2224
import threading
2325
import pytest
2426
from kubernetes import stream, client, config
27+
import websocket
2528

2629
try:
2730
import urllib3
@@ -123,6 +126,117 @@ def test_websocket_proxycare(self):
123126
assert dictval(connect_opts, 'http_proxy_auth') == expect_auth
124127
assert dictval(connect_opts, 'http_no_proxy') == expect_noproxy
125128

129+
130+
class WSClientProtocolTest(unittest.TestCase):
131+
"""Tests for WSClient V5 protocol handling"""
132+
133+
def setUp(self):
134+
# Mock configuration to avoid real connections in WSClient.__init__
135+
self.config_mock = MagicMock()
136+
self.config_mock.assert_hostname = False
137+
self.config_mock.api_key = {}
138+
self.config_mock.proxy = None
139+
self.config_mock.ssl_ca_cert = None
140+
self.config_mock.cert_file = None
141+
self.config_mock.key_file = None
142+
self.config_mock.verify_ssl = True
143+
144+
def test_create_websocket_header(self):
145+
"""Verify sec-websocket-protocol header requests v5 first"""
146+
# Patch WebSocket class in the module
147+
with patch.object(ws_client_module, 'WebSocket', autospec=True) as mock_ws_cls:
148+
mock_ws = mock_ws_cls.return_value
149+
150+
WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
151+
152+
mock_ws.connect.assert_called_once()
153+
call_args = mock_ws.connect.call_args
154+
# connect(url, **options)
155+
# check kwargs for 'header'
156+
kwargs = call_args[1]
157+
self.assertIn('header', kwargs)
158+
expected_header = f"sec-websocket-protocol: {V5_CHANNEL_PROTOCOL},{V4_CHANNEL_PROTOCOL}"
159+
self.assertIn(expected_header, kwargs['header'])
160+
161+
def test_close_channel_v5(self):
162+
"""Verify close_channel sends correct frame when v5 is negotiated"""
163+
with patch.object(ws_client_module, 'create_websocket') as mock_create:
164+
mock_ws = MagicMock()
165+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
166+
mock_ws.connected = True
167+
mock_create.return_value = mock_ws
168+
169+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
170+
client.close_channel(0)
171+
172+
mock_ws.send.assert_called_with(b'\xff\x00', opcode=websocket.ABNF.OPCODE_BINARY)
173+
174+
def test_close_channel_v4(self):
175+
"""Verify close_channel does nothing when v4 is negotiated"""
176+
with patch.object(ws_client_module, 'create_websocket') as mock_create:
177+
mock_ws = MagicMock()
178+
mock_ws.subprotocol = V4_CHANNEL_PROTOCOL
179+
mock_ws.connected = True
180+
mock_create.return_value = mock_ws
181+
182+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
183+
client.close_channel(0)
184+
185+
mock_ws.send.assert_not_called()
186+
187+
def test_update_receives_close_v5(self):
188+
"""Verify update processes close signal when v5 is negotiated"""
189+
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
190+
patch('select.select') as mock_select:
191+
192+
mock_ws = MagicMock()
193+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
194+
mock_ws.connected = True
195+
mock_ws.sock.fileno.return_value = 10
196+
197+
# Setup frame with close signal for channel 0
198+
frame = MagicMock()
199+
frame.data = b'\xff\x00'
200+
mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame)
201+
202+
mock_create.return_value = mock_ws
203+
# Make select return ready
204+
mock_select.return_value = ([mock_ws.sock], [], [])
205+
206+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
207+
client.update(timeout=0)
208+
209+
self.assertIn(0, client._closed_channels)
210+
211+
def test_update_ignores_close_signal_v4(self):
212+
"""Verify update treats 0xFF as regular data (or ignores signal interpretation) when v4"""
213+
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
214+
patch('select.select') as mock_select:
215+
216+
mock_ws = MagicMock()
217+
mock_ws.subprotocol = V4_CHANNEL_PROTOCOL
218+
mock_ws.connected = True
219+
mock_ws.sock.fileno.return_value = 10
220+
221+
# Setup frame that looks like close signal but should be treated as data
222+
frame = MagicMock()
223+
frame.data = b'\xff\x00'
224+
mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame)
225+
226+
mock_create.return_value = mock_ws
227+
mock_select.return_value = ([mock_ws.sock], [], [])
228+
229+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=True) # binary=True to avoid decode errors
230+
client.update(timeout=0)
231+
232+
# Should NOT be in closed channels
233+
self.assertNotIn(0, client._closed_channels)
234+
# Should be in data channels (channel 255 with data \x00)
235+
# Code: channel = data[0] (255), data = data[1:] (\x00)
236+
# if channel (255) not in _channels...
237+
self.assertIn(255, client._channels)
238+
self.assertEqual(client._channels[255], b'\x00')
239+
126240
@pytest.fixture(scope="module")
127241
def dummy_proxy():
128242
#Dummy Proxy

kubernetes/e2e_test/test_client.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,63 @@ def test_pod_apis(self):
201201
resp = api.delete_namespaced_pod(name=name, body={},
202202
namespace='default')
203203

204+
def test_pod_exec_close_channel(self):
205+
"""Test sending CLOSE signal for a channel (v5 protocol)."""
206+
client = api_client.ApiClient(configuration=self.config)
207+
api = core_v1_api.CoreV1Api(client)
208+
209+
name = 'busybox-test-' + short_uuid()
210+
pod_manifest = manifest_with_command(
211+
name, "while true;do date;sleep 5; done")
212+
213+
resp = api.create_namespaced_pod(body=pod_manifest, namespace='default')
214+
self.assertEqual(name, resp.metadata.name)
215+
216+
# Wait for pod to be running
217+
timeout = time.time() + 60
218+
while True:
219+
resp = api.read_namespaced_pod(name=name, namespace='default')
220+
if resp.status.phase == 'Running':
221+
break
222+
if time.time() > timeout:
223+
self.fail("Timeout waiting for pod to be running")
224+
time.sleep(1)
225+
226+
# Use cat to echo stdin to stdout.
227+
# When stdin is closed, cat should exit, terminating the command.
228+
resp = stream(api.connect_post_namespaced_pod_exec, name, 'default',
229+
command=['/bin/sh', '-c', 'cat'],
230+
stderr=True, stdin=True,
231+
stdout=True, tty=False,
232+
_preload_content=False)
233+
234+
if resp.subprotocol != "v5.channel.k8s.io":
235+
resp.close()
236+
api.delete_namespaced_pod(name=name, body={}, namespace='default')
237+
self.skipTest("Skipping test: v5.channel.k8s.io subprotocol not negotiated")
238+
239+
try:
240+
resp.write_stdin("test-close\n")
241+
line = resp.readline_stdout(timeout=5)
242+
self.assertEqual("test-close", line)
243+
244+
# Close stdin (channel 0)
245+
# This should send EOF to cat, causing it to exit.
246+
resp.close_channel(0)
247+
248+
# Wait for process to exit
249+
resp.update(timeout=5)
250+
start = time.time()
251+
while resp.is_open() and time.time() - start < 10:
252+
resp.update(timeout=1)
253+
254+
self.assertFalse(resp.is_open(), "Connection should close after cat exits")
255+
self.assertEqual(resp.returncode, 0)
256+
finally:
257+
if resp.is_open():
258+
resp.close()
259+
api.delete_namespaced_pod(name=name, body={}, namespace='default')
260+
204261
def test_exit_code(self):
205262
client = api_client.ApiClient(configuration=self.config)
206263
api = core_v1_api.CoreV1Api(client)

0 commit comments

Comments
 (0)