Skip to content

SOCKS5: support looking up names remotely#2666

Merged
dpkp merged 4 commits into
dpkp:masterfrom
jschwartzenberg:feature/socks5h
Nov 20, 2025
Merged

SOCKS5: support looking up names remotely#2666
dpkp merged 4 commits into
dpkp:masterfrom
jschwartzenberg:feature/socks5h

Conversation

@jschwartzenberg

@jschwartzenberg jschwartzenberg commented Jul 23, 2025

Copy link
Copy Markdown
Contributor

Hi all, I'm dealing with a Kafka cluster behind a SOCKS proxy where the broker names cannot locally be resolved.

This PR implements resolving the name remotely. It is probably rather hacky though, so I appreciate any feedback to improve it and will make changes accordingly.

@jschwartzenberg jschwartzenberg force-pushed the feature/socks5h branch 2 times, most recently from 5e712e4 to 8771bee Compare July 25, 2025 13:46
@jschwartzenberg jschwartzenberg changed the title WIP: support looking up remote name resolving with SOCKS5 support looking up remote name resolving with SOCKS5 Jul 25, 2025
@jschwartzenberg jschwartzenberg changed the title support looking up remote name resolving with SOCKS5 SOCKS5: support looking up names remotely Jul 25, 2025
Comment thread kafka/conn.py Outdated
else:
self._sock = self._socks5_proxy.socket(None, socket.SOCK_STREAM)
self._sock_afi = None
self._sock_addr = [self.host.encode('ascii'), self.port]

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recommend you defer str.encode() until needed by socks code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this as you suggested. Let me know what you think!

Comment thread kafka/conn.py Outdated
return self.state
else:
if self.config["socks5_proxy"] is None or not self._socks5_proxy.use_remote_lookup():
next_lookup = self._next_afi_sockaddr()

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler to move the self._socks5_proxy.use_remote_lookup() inside of _next_afi_sockaddr() ? For socks proxy w/ remote_lookup, it should return (socket.AF_UNSPEC, (self.host, self.port)). With that change I think most of these other changes would be unnecessary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that indeed reduces the changes quite a bit! Let me know what you think of the current version!

Comment thread kafka/conn.py Outdated
"""
if self.disconnected() or self.connecting():
if len(self._gai) > 0:
if len(self._gai) > 0 or self._socks5_proxy.use_remote_lookup():

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't this crash if/when self._socks5_proxy is None and self._gai is empty?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're completely right. I added some extra logic now to check whether self._socks5_proxy is set.

Comment thread kafka/conn.py Outdated

self._sock.setblocking(False)
self.config['state_change_callback'](self.node_id, self._sock, self)
if self._sock_afi != None:

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally should check is or is not for None, not equality.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got rid of the None variant for the socket family now.

Comment thread kafka/conn.py Outdated
return self._api_version

def __str__(self):
if self._sock_afi != None:

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implied suggestion above is to use socket.AF_UNSPEC instead of None

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, makes sense :)

@jschwartzenberg

Copy link
Copy Markdown
Contributor Author

Many thanks for your review! I'll rework and push a better commit!

Comment thread kafka/conn.py
self._sock_afi, self._sock_addr = next_lookup
try:
if self.config["socks5_proxy"] is not None:
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether I could avoid this change. But next_lookup = self._next_afi_sockaddr() on line 377 will fail then because it uses the initialized self._socks5_proxy to check if it should look up hostnames remotely.

Let me know if you prefer this to be handled differently than it's now.

@dpkp

dpkp commented Nov 18, 2025

Copy link
Copy Markdown
Owner

I added a couple commits to fix a few things. Let me know if it looks ok. I did some basic local testing to validate.

Comment thread kafka/conn.py Outdated
if self.disconnected() or self.connecting():
if len(self._gai) > 0:
return 0
elif Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]):

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Socks5Wrapper.use_remote_lookup(None) returns False, so this works in the default case.

Comment thread kafka/socks5_wrapper.py
"!{}s".format(addr_len),
socket.inet_pton(self._target_afi, addr[0]),
)
self._buffer_out += struct.pack("!H", addr[1]) # port

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the pack format from h to H. Unsigned int is required for ports >32k

@dpkp dpkp merged commit c75f97c into dpkp:master Nov 20, 2025
18 checks passed
dpkp pushed a commit that referenced this pull request Nov 20, 2025
@jschwartzenberg

Copy link
Copy Markdown
Contributor Author

The new 2.3.0 release is working great! Many thanks for incorporating this!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants