diff --git a/.circleci/config.yml b/.circleci/config.yml index 4faf18a9..26054001 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,7 +6,7 @@ version: 2.1 .job_defaults: &job_defaults docker: # specify the version you desire here - - image: cimg/clojure:1.11.1-openjdk-8.0 + - image: cimg/clojure:1.11.4-openjdk-8.0 # Specify service dependencies here if necessary # CircleCI maintains a library of pre-built images # documented at https://circleci.com/docs/2.0/circleci-images/ @@ -21,6 +21,18 @@ version: 2.1 # The resource_class feature allows configuring CPU and RAM resources for each job. Different resource classes are available for different executors. https://circleci.com/docs/2.0/configuration-reference/#resourceclass resource_class: large + +.job_defaults_jdk_21: &job_defaults_jdk_21 + docker: + - image: cimg/clojure:1.11.4-openjdk-21.0 + + working_directory: ~/repo + + environment: + LEIN_ROOT: "true" + JVM_OPTS: -Xmx3200m + + resource_class: large commands: restore_deps_cache: @@ -72,6 +84,36 @@ jobs: name: Run tests with dropped error deferred detection command: lein do clean, with-profile +dropped-error-deferred-detection test no_output_timeout: 20m + prepare_deps_cache_jdk_21: + <<: *job_defaults_jdk_21 + steps: + - checkout + - restore_deps_cache + - run: + name: Install bb + command: | + sudo bash < <(curl -s https://raw.githubusercontent.com/babashka/babashka/3d916df4a0c1e00df94100860b8eb5577e59c56a/install) + + - run: + name: Download and cache dependencies + command: lein with-profile pedantic,dev,test,jdk-21 deps + + - run: + name: Ensure deps.edn is in sync with project.clj + command: deps/ensure-deps-up-to-date + + - save_cache: + paths: + - ~/.m2 + key: v2-dependencies-jdk_21-{{ checksum "project.clj" }} + test_with_leak_detection_jdk_21: + <<: *job_defaults_jdk_21 + steps: + - checkout + - restore_deps_cache + - run: + name: Run tests with leak detection (JDK 21) + command: lein do clean, with-profile +leak-detection,+jdk-21 test :default+leak patch_hold_test_with_dropped_error_deferred_detection_status: <<: *job_defaults steps: @@ -136,6 +178,10 @@ workflows: - test_with_leak_detection: requires: - prepare_deps_cache + - prepare_deps_cache_jdk_21 + - test_with_leak_detection_jdk_21: + requires: + - prepare_deps_cache_jdk_21 - hold_test_with_dropped_error_deferred_detection: type: approval requires: diff --git a/CHANGES.md b/CHANGES.md index 8b52ef1f..024be3db 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,18 @@ +### Unreleased + +* BREAKING: Bump Netty to 4.2.10.Final +* Promote io_uring transport from incubator (`io.netty.incubator` → `io.netty`) +* Load io_uring transport classes lazily via reflection (requires Java 9+; gracefully unavailable on Java 8) +* Add kqueue `osx-aarch_64` native transport for Apple Silicon +* Update `self-signed-ssl-context` for JDK 24+ compatibility via Netty `CertificateBuilder`, with `SelfSignedCertificate` fallback +* Fix deprecated `Class.newInstance()` usage in `CertificateBuilder` reflection (use `Constructor.newInstance()`) +* Remove deprecated `ChannelOption/MAX_MESSAGES_PER_READ` usage +* Fix HTTP/2 connection preface not flushed on already-active pipelines (affected JDK 21 with unpooled allocator) + +Contributions by (in alphabetical order): + +Robin Lahtinen + ### 0.9.5 * Fix NPE in `wrap-exceptions` HTTP client middleware (#766, #767) diff --git a/deps.edn b/deps.edn index 1f8d8516..766a0372 100644 --- a/deps.edn +++ b/deps.edn @@ -10,24 +10,26 @@ org.clj-commons/dirigiste {:mvn/version "1.0.4"}, org.clj-commons/primitive-math {:mvn/version "1.0.1"}, potemkin/potemkin {:mvn/version "0.4.8"}, - io.netty/netty-transport {:mvn/version "4.1.130.Final"}, + io.netty/netty-transport {:mvn/version "4.2.10.Final"}, io.netty/netty-transport-native-epoll$linux-x86_64 - {:mvn/version "4.1.130.Final"}, + {:mvn/version "4.2.10.Final"}, io.netty/netty-transport-native-epoll$linux-aarch_64 - {:mvn/version "4.1.130.Final"}, + {:mvn/version "4.2.10.Final"}, io.netty/netty-transport-native-kqueue$osx-x86_64 - {:mvn/version "4.1.130.Final"}, - io.netty.incubator/netty-incubator-transport-native-io_uring$linux-x86_64 - {:mvn/version "0.0.26.Final"}, - io.netty.incubator/netty-incubator-transport-native-io_uring$linux-aarch_64 - {:mvn/version "0.0.26.Final"}, - io.netty/netty-codec {:mvn/version "4.1.130.Final"}, - io.netty/netty-codec-http {:mvn/version "4.1.130.Final"}, - io.netty/netty-codec-http2 {:mvn/version "4.1.130.Final"}, - io.netty/netty-handler {:mvn/version "4.1.130.Final"}, - io.netty/netty-handler-proxy {:mvn/version "4.1.130.Final"}, - io.netty/netty-resolver {:mvn/version "4.1.130.Final"}, - io.netty/netty-resolver-dns {:mvn/version "4.1.130.Final"}, + {:mvn/version "4.2.10.Final"}, + io.netty/netty-transport-native-kqueue$osx-aarch_64 + {:mvn/version "4.2.10.Final"}, + io.netty/netty-transport-native-io_uring$linux-x86_64 + {:mvn/version "4.2.10.Final"}, + io.netty/netty-transport-native-io_uring$linux-aarch_64 + {:mvn/version "4.2.10.Final"}, + io.netty/netty-codec-http {:mvn/version "4.2.10.Final"}, + io.netty/netty-codec-http2 {:mvn/version "4.2.10.Final"}, + io.netty/netty-codec-compression {:mvn/version "4.2.10.Final"}, + io.netty/netty-handler {:mvn/version "4.2.10.Final"}, + io.netty/netty-handler-proxy {:mvn/version "4.2.10.Final"}, + io.netty/netty-resolver {:mvn/version "4.2.10.Final"}, + io.netty/netty-resolver-dns {:mvn/version "4.2.10.Final"}, metosin/malli {:mvn/version "0.20.0", :exclusions [org.clojure/clojure]}}, :aliases diff --git a/project.clj b/project.clj index 325ac2b8..9ab518f7 100644 --- a/project.clj +++ b/project.clj @@ -1,5 +1,5 @@ ;; you'll need to run the script at `deps/lein-to-deps` after changing any dependencies -(def netty-version "4.1.130.Final") +(def netty-version "4.2.10.Final") (def brotli-version "1.20.0") @@ -18,16 +18,19 @@ [io.netty/netty-transport-native-epoll ~netty-version :classifier "linux-x86_64"] [io.netty/netty-transport-native-epoll ~netty-version :classifier "linux-aarch_64"] [io.netty/netty-transport-native-kqueue ~netty-version :classifier "osx-x86_64"] - [io.netty.incubator/netty-incubator-transport-native-io_uring "0.0.26.Final" :classifier "linux-x86_64"] - [io.netty.incubator/netty-incubator-transport-native-io_uring "0.0.26.Final" :classifier "linux-aarch_64"] - [io.netty/netty-codec ~netty-version] + [io.netty/netty-transport-native-kqueue ~netty-version :classifier "osx-aarch_64"] + [io.netty/netty-transport-native-io_uring ~netty-version :classifier "linux-x86_64"] + [io.netty/netty-transport-native-io_uring ~netty-version :classifier "linux-aarch_64"] [io.netty/netty-codec-http ~netty-version] [io.netty/netty-codec-http2 ~netty-version] + [io.netty/netty-codec-compression ~netty-version] [io.netty/netty-handler ~netty-version] [io.netty/netty-handler-proxy ~netty-version] [io.netty/netty-resolver ~netty-version] [io.netty/netty-resolver-dns ~netty-version] [metosin/malli "0.20.0" :exclusions [org.clojure/clojure]]] + :exclusions [org.bouncycastle/bcprov-jdk15on + org.bouncycastle/bcpkix-jdk15on] :profiles {:dev {:dependencies [[criterium "0.4.6"] [cheshire "6.1.0"] [org.slf4j/slf4j-simple "2.0.17"] @@ -37,7 +40,8 @@ [org.bouncycastle/bcprov-jdk18on "1.83"] [org.bouncycastle/bcpkix-jdk18on "1.83" :exclusions [org.bouncycastle/bcutil-jdk18on]] ;;[org.bouncycastle/bctls-jdk18on "1.75"] - [io.netty/netty-tcnative-boringssl-static "2.0.74.Final"] + [io.netty/netty-tcnative-boringssl-static "2.0.75.Final"] + [io.netty/netty-pkitesting ~netty-version] ;;[com.aayushatharva.brotli4j/all ~brotli-version] [com.aayushatharva.brotli4j/brotli4j ~brotli-version] [com.aayushatharva.brotli4j/service ~brotli-version] @@ -66,6 +70,8 @@ "-Dio.netty.allocator.type=unpooled"]} :dropped-error-deferred-detection {:jvm-opts ["-Dorg.slf4j.simpleLogger.log.manifold.debug=warn" "-Daleph.testutils.detect-dropped-error-deferreds=true"]} + :test-unsafe-deny {:jvm-opts ["--sun-misc-unsafe-memory-access=deny"]} + :jdk-21 {:javac-options ^:replace ["--release" "11"]} :pedantic {:pedantic? :abort} :trace {:jvm-opts ["-Dorg.slf4j.simpleLogger.defaultLogLevel=trace"]} :profile {:dependencies [[com.clojure-goes-fast/clj-async-profiler "1.7.0"]] diff --git a/src/aleph/http.clj b/src/aleph/http.clj index c1818cf0..6489bd10 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -249,6 +249,15 @@ (some? log-activity) (assoc :log-activity (netty/activity-logger "aleph-client" log-activity)) + ;; When insecure?, disable endpoint identification (hostname + ;; verification) unless the user explicitly set it. On JDK 8 + ;; with OpenSSL/tcnative, Netty 4.2's trust manager wrapping + ;; causes hostname verification to run even with + ;; InsecureTrustManagerFactory, leading to SSLHandshakeException. + (and insecure? + (not (contains? connection-options :ssl-endpoint-id-alg))) + (assoc :ssl-endpoint-id-alg nil) + true (update :ssl-context #(client/ssl-context % http-versions insecure?))) p (promise) diff --git a/src/aleph/http/http2.clj b/src/aleph/http/http2.clj index 425e749c..2b7322a6 100644 --- a/src/aleph/http/http2.clj +++ b/src/aleph/http/http2.clj @@ -1399,6 +1399,21 @@ true http2-conn-pipeline-transform) + ;; Flush the HTTP/2 connection preface (magic + initial SETTINGS) that was + ;; written by Http2ConnectionHandler.sendPreface() during handlerAdded. + ;; When Http2FrameCodec is added to an already-active pipeline (the normal + ;; case for both client post-SSL-handshake and server ALPN paths), + ;; channelActive is never replayed, so the flushPreface guard in + ;; PrefaceDecoder.channelActive never fires. Without this explicit flush, + ;; the preface sits in SslHandler.pendingUnencryptedWrites and under + ;; certain allocator/timing conditions (e.g., unpooled allocator on + ;; JDK 21), a SETTINGS ACK can be flushed before the initial SETTINGS, + ;; causing the peer to reject the connection. + ;; See https://github.com/netty/netty/issues/12089 + (let [ch (.channel pipeline)] + (when (.isActive ch) + (.flush ch))) + (log/debug "Conn chan pipeline:" pipeline) pipeline)) diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index 7f481aea..6efafa92 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -32,11 +32,13 @@ ChannelOutboundInvoker ChannelPipeline EventLoopGroup - FileRegion) + FileRegion + IoHandlerFactory + MultiThreadIoEventLoopGroup) (io.netty.channel.epoll Epoll EpollDatagramChannel - EpollEventLoopGroup + EpollIoHandler EpollServerSocketChannel EpollSocketChannel) (io.netty.channel.group @@ -45,10 +47,10 @@ (io.netty.channel.kqueue KQueue KQueueDatagramChannel - KQueueEventLoopGroup + KQueueIoHandler KQueueServerSocketChannel KQueueSocketChannel) - (io.netty.channel.nio NioEventLoopGroup) + (io.netty.channel.nio NioIoHandler) (io.netty.channel.socket ChannelInputShutdownReadComplete ServerSocketChannel) @@ -81,12 +83,6 @@ IdleState IdleStateEvent IdleStateHandler) - (io.netty.incubator.channel.uring - IOUring - IOUringDatagramChannel - IOUringEventLoopGroup - IOUringServerSocketChannel - IOUringSocketChannel) (io.netty.resolver AddressResolverGroup NoopAddressResolverGroup @@ -122,6 +118,7 @@ File IOException InputStream) + (java.lang.reflect Constructor) (java.net InetSocketAddress SocketAddress @@ -132,6 +129,7 @@ (java.util.concurrent CancellationException ConcurrentHashMap + Executor ScheduledFuture ThreadFactory TimeUnit) @@ -1053,7 +1051,13 @@ (.sessionTimeout session-timeout) (some? application-protocol-config) - (.applicationProtocolConfig application-protocol-config)) + (.applicationProtocolConfig application-protocol-config) + + ;; Netty 4.2 defaults to "HTTPS" endpoint identification at the builder level. + ;; Aleph manages hostname verification itself in ssl-handler, so disable the + ;; builder-level default to avoid double-verification. + true + (.endpointIdentificationAlgorithm nil)) (.build builder)))) @@ -1164,21 +1168,79 @@ (str/join ", " (.supportedProtocols application-protocol-config)))) (.build b))))) +(defn- try-certificate-builder + "Attempts to create a self-signed cert using Netty 4.2's CertificateBuilder + (from io.netty/netty-pkitesting). Returns a map of {:private-key :certificate-chain} + or nil if the class is unavailable." + [^String hostname] + (try + (let [builder-class (Class/forName "io.netty.pkitesting.CertificateBuilder") + ^Constructor + ctor (.getDeclaredConstructor builder-class (into-array Class [])) + builder (.newInstance ctor (object-array [])) + ;; CertificateBuilder.subject(String) -> CertificateBuilder + subject-m (.getMethod builder-class "subject" (into-array Class [String])) + _ (.invoke subject-m builder (object-array [(str "cn=" hostname)])) + ;; CertificateBuilder.setIsCertificateAuthority(boolean) -> CertificateBuilder + ;; Must be true: buildSelfSigned() requires root CA flag internally + set-ca-m (.getMethod builder-class "setIsCertificateAuthority" (into-array Class [Boolean/TYPE])) + _ (.invoke set-ca-m builder (object-array [(Boolean/valueOf true)])) + ;; CertificateBuilder.addSanDnsName(String) -> CertificateBuilder + ;; Required for HTTPS endpoint identification (RFC 6125: SAN takes precedence over CN) + san-m (.getMethod builder-class "addSanDnsName" (into-array Class [String])) + _ (.invoke san-m builder (object-array [hostname])) + ;; CertificateBuilder.buildSelfSigned() -> X509Bundle + build-m (.getMethod builder-class "buildSelfSigned" (into-array Class [])) + bundle (.invoke build-m builder (object-array [])) + ;; X509Bundle.getKeyPair() -> KeyPair + bundle-class (Class/forName "io.netty.pkitesting.X509Bundle") + get-kp-m (.getMethod bundle-class "getKeyPair" (into-array Class [])) + key-pair (.invoke get-kp-m bundle (object-array [])) + ;; KeyPair.getPrivate() -> PrivateKey + priv-key (.getPrivate ^java.security.KeyPair key-pair) + ;; X509Bundle.getCertificate() -> X509Certificate + get-cert-m (.getMethod bundle-class "getCertificate" (into-array Class [])) + certificate (.invoke get-cert-m bundle (object-array []))] + (log/debug "Using CertificateBuilder from netty-pkitesting for self-signed cert") + {:private-key priv-key + :certificate-chain [certificate]}) + (catch ClassNotFoundException _ + nil) + (catch Exception e + (log/debug e "CertificateBuilder reflection failed, falling back to SelfSignedCertificate") + nil))) + (defn self-signed-ssl-context "A self-signed SSL context for servers. Never use in production. Even if you control all clients, and want to use - a self-signed cert internally, do not use this fn, because Netty's - SelfSignedCertificate class is only for testing, and uses an insecure PRNG." + a self-signed cert internally, do not use this fn, because the generated + certificate uses weak algorithms and is only for testing. + + Prefers Netty 4.2's `CertificateBuilder` from `io.netty/netty-pkitesting` + when available (works on all JDKs including 24+). Falls back to Netty's + `SelfSignedCertificate` which does not work on JDK 24+ where the internal + certificate generator was removed. Add `io.netty/netty-pkitesting` to your + classpath for JDK 24+ support." ([] (self-signed-ssl-context "localhost")) ([hostname] (self-signed-ssl-context hostname {})) ([hostname opts] - (let [cert (SelfSignedCertificate. hostname)] - (ssl-server-context (assoc opts - :private-key (.privateKey cert) - :certificate-chain (.certificate cert)))))) + (if-let [cert-map (try-certificate-builder hostname)] + (ssl-server-context (merge opts cert-map)) + ;; Fallback to SelfSignedCertificate (pre-JDK 24) + (let [^SelfSignedCertificate cert (try + (SelfSignedCertificate. hostname) + (catch UnsupportedOperationException e + (throw (UnsupportedOperationException. + (str "SelfSignedCertificate is not supported on this JDK version. " + "Add io.netty/netty-pkitesting to your classpath, or use " + "ssl-server-context with pre-generated certificates instead.") + e))))] + (ssl-server-context (assoc opts + :private-key (.privateKey cert) + :certificate-chain (.certificate cert))))))) (defn insecure-ssl-client-context @@ -1236,8 +1298,16 @@ (defn ^:no-doc kqueue-available? [] (KQueue/isAvailable)) -(defn ^:no-doc io-uring-available? [] - (IOUring/isAvailable)) +(defn ^:no-doc io-uring-available? + "Returns true if io_uring transport is available. + Uses reflection to avoid loading io_uring classes (which require Java 9+) + at namespace load time." + [] + (try + (let [cls (Class/forName "io.netty.channel.uring.IoUring") + m (.getMethod cls "isAvailable" (into-array Class []))] + (boolean (.invoke m nil (object-array [])))) + (catch Throwable _ false))) (defn ^:no-doc determine-transport [transport epoll?] (or transport (if epoll? :epoll :nio))) @@ -1246,7 +1316,15 @@ (case transport :epoll [(Epoll/unavailabilityCause) "Epoll"] :kqueue [(KQueue/unavailabilityCause) "KQueue"] - :io-uring [(IOUring/unavailabilityCause) "IO-Uring"] + :io-uring (try + (let [cls (Class/forName "io.netty.channel.uring.IoUring") + m (.getMethod cls "unavailabilityCause" (into-array Class []))] + [(.invoke m nil (object-array [])) "IO-Uring"]) + (catch UnsupportedClassVersionError _ + [(UnsupportedClassVersionError. + "io_uring transport requires Java 9+") "IO-Uring"]) + (catch Throwable t + [t "IO-Uring"])) nil)) (defn ^:no-doc ensure-transport-available! [transport] @@ -1287,29 +1365,51 @@ (def ^:no-doc ^String client-event-thread-pool-name "aleph-netty-client-event-pool") +(defn- create-io-event-loop-group + "Creates a MultiThreadIoEventLoopGroup with the appropriate IoHandler for the given transport. + The third argument may be either a ThreadFactory or an Executor. When an Executor is + provided, Netty uses it directly for scheduling; when a ThreadFactory is provided, Netty + creates its own threads using the factory." + ^MultiThreadIoEventLoopGroup [transport ^long thread-count thread-factory-or-executor] + (let [^IoHandlerFactory handler-factory + (case transport + :nio (NioIoHandler/newFactory) + :epoll (EpollIoHandler/newFactory) + :kqueue (KQueueIoHandler/newFactory) + :io-uring (let [cls (Class/forName "io.netty.channel.uring.IoUringIoHandler") + m (.getMethod cls "newFactory" (into-array Class []))] + (.invoke m nil (object-array []))))] + (if (instance? Executor thread-factory-or-executor) + (MultiThreadIoEventLoopGroup. thread-count + ^Executor thread-factory-or-executor + handler-factory) + (MultiThreadIoEventLoopGroup. thread-count + ^ThreadFactory thread-factory-or-executor + handler-factory)))) + (def ^:no-doc epoll-client-group (delay (let [thread-count (get-default-event-loop-threads) thread-factory (enumerating-thread-factory client-event-thread-pool-name true)] - (EpollEventLoopGroup. (long thread-count) thread-factory)))) + (create-io-event-loop-group :epoll thread-count thread-factory)))) (def ^:no-doc nio-client-group (delay (let [thread-count (get-default-event-loop-threads) thread-factory (enumerating-thread-factory client-event-thread-pool-name true)] - (NioEventLoopGroup. (long thread-count) thread-factory)))) + (create-io-event-loop-group :nio thread-count thread-factory)))) (def ^:no-doc kqueue-client-group (delay (let [thread-count (get-default-event-loop-threads) thread-factory (enumerating-thread-factory client-event-thread-pool-name true)] - (KQueueEventLoopGroup. (long thread-count) thread-factory)))) + (create-io-event-loop-group :kqueue thread-count thread-factory)))) (def ^:no-doc io-uring-client-group (delay (let [thread-count (get-default-event-loop-threads) thread-factory (enumerating-thread-factory client-event-thread-pool-name true)] - (IOUringEventLoopGroup. (long thread-count) thread-factory)))) + (create-io-event-loop-group :io-uring thread-count thread-factory)))) (defn ^:no-doc transport-client-group [transport] (case transport @@ -1319,18 +1419,25 @@ :nio nio-client-group)) (defn ^:no-doc transport-event-loop-group [transport ^long num-threads ^ThreadFactory thread-factory] - (case transport - :epoll (EpollEventLoopGroup. num-threads thread-factory) - :kqueue (KQueueEventLoopGroup. num-threads thread-factory) - :io-uring (IOUringEventLoopGroup. num-threads thread-factory) - :nio (NioEventLoopGroup. num-threads thread-factory))) + (create-io-event-loop-group transport num-threads thread-factory)) + +;; Define channel classes as vars to avoid CLJ-2842 eager static init in `case`. +;; io_uring classes require Java 9+ (class file version 53.0), so they must be +;; resolved via Class/forName with try/catch to avoid UnsupportedClassVersionError +;; on Java 8. +(def ^:private epoll-server-channel-class EpollServerSocketChannel) +(def ^:private kqueue-server-channel-class KQueueServerSocketChannel) +(def ^:private io-uring-server-channel-class + (try (Class/forName "io.netty.channel.uring.IoUringServerSocketChannel") + (catch Throwable _ nil))) +(def ^:private nio-server-channel-class NioServerSocketChannel) (defn ^:no-doc transport-server-channel-class [transport] (case transport - :epoll EpollServerSocketChannel - :kqueue KQueueServerSocketChannel - :io-uring IOUringServerSocketChannel - :nio NioServerSocketChannel)) + :epoll epoll-server-channel-class + :kqueue kqueue-server-channel-class + :io-uring io-uring-server-channel-class + :nio nio-server-channel-class)) (defn- wrapping-channel-factory ^ChannelFactory [listen-socket transport] @@ -1396,19 +1503,35 @@ (SingletonDnsServerAddressStreamProvider. (first addresses)) (SequentialDnsServerAddressStreamProvider. ^Iterable addresses)))) +;; Define datagram/socket channel classes as vars to avoid CLJ-2842 eager static init in `case`. +;; io_uring classes are resolved lazily via Class/forName (require Java 9+). +(def ^:private epoll-datagram-channel-class EpollDatagramChannel) +(def ^:private kqueue-datagram-channel-class KQueueDatagramChannel) +(def ^:private io-uring-datagram-channel-class + (try (Class/forName "io.netty.channel.uring.IoUringDatagramChannel") + (catch Throwable _ nil))) +(def ^:private nio-datagram-channel-class NioDatagramChannel) + +(def ^:private epoll-socket-channel-class EpollSocketChannel) +(def ^:private kqueue-socket-channel-class KQueueSocketChannel) +(def ^:private io-uring-socket-channel-class + (try (Class/forName "io.netty.channel.uring.IoUringSocketChannel") + (catch Throwable _ nil))) +(def ^:private nio-socket-channel-class NioSocketChannel) + (defn ^:no-doc transport-channel-type [transport] (case transport - :epoll EpollDatagramChannel - :kqueue KQueueDatagramChannel - :io-uring IOUringDatagramChannel - :nio NioDatagramChannel)) + :epoll epoll-datagram-channel-class + :kqueue kqueue-datagram-channel-class + :io-uring io-uring-datagram-channel-class + :nio nio-datagram-channel-class)) (defn- transport-channel-class [transport] (case transport - :epoll EpollSocketChannel - :kqueue KQueueSocketChannel - :io-uring IOUringSocketChannel - :nio NioSocketChannel)) + :epoll epoll-socket-channel-class + :kqueue kqueue-socket-channel-class + :io-uring io-uring-socket-channel-class + :nio nio-socket-channel-class)) (defn dns-resolver-group-builder "Creates an instance of DnsAddressResolverGroupBuilder that is used to @@ -1599,7 +1722,6 @@ bootstrap (doto (Bootstrap.) (.option ChannelOption/SO_REUSEADDR true) (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout)) - #_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5 (.group client-event-loop-group) (.channel chan-class) (.handler initializer) @@ -1750,14 +1872,12 @@ (let [b (doto (ServerBootstrap.) (.option ChannelOption/SO_BACKLOG (int 1024)) (.option ChannelOption/SO_REUSEADDR true) - (.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) (.group group) (cond-> (nil? listen-socket) (.channel channel-class)) (cond-> (some? listen-socket) (.channelFactory (wrapping-channel-factory listen-socket transport))) ;;TODO: add a server (.handler) call to the bootstrap, for logging or something (.childHandler (pipeline-initializer pipeline-builder)) (.childOption ChannelOption/SO_REUSEADDR true) - (.childOption ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) bootstrap-transform) ^ServerSocketChannel diff --git a/test/aleph/http/pipeline_stress_test.clj b/test/aleph/http/pipeline_stress_test.clj new file mode 100644 index 00000000..0a603d18 --- /dev/null +++ b/test/aleph/http/pipeline_stress_test.clj @@ -0,0 +1,146 @@ +(ns aleph.http.pipeline-stress-test + "Stress tests for Netty 4.2 pipeline behavior under concurrent HTTP/2 streams. + Validates that pipeline modifications (handler add/remove) under load do not + cause race conditions or ConcurrentModificationExceptions with Netty 4.2's + stack-flattened pipeline. Tagged :stress - excluded from default test runs. + Run with: lein test :stress" + (:require + [aleph.http :as http] + [aleph.netty :as netty] + [aleph.resource-leak-detector] + [aleph.ssl :as test-ssl] + [aleph.testutils] + [clj-commons.byte-streams :as bs] + [clojure.test :refer [deftest is testing]] + [manifold.deferred :as d]) + (:import + (java.io Closeable))) + +(set! *warn-on-reflection* false) + +(def ^:private stress-port 18443) + +(def ^:private server-options + {:port stress-port + :shutdown-timeout 0 + :http-versions [:http2 :http1] + :ssl-context test-ssl/server-ssl-context-opts}) + +(defn- echo-handler + "Simple echo handler that returns the request body." + [req] + {:status 200 + :headers {"content-type" "text/plain"} + :body (or (some-> (:body req) (bs/to-string)) "ok")}) + +(defn- slow-handler + "Handler with artificial delay to increase overlap of concurrent streams." + [_req] + (let [d' (d/deferred)] + (future + (Thread/sleep (long (+ 1 (rand-int 10)))) + (d/success! d' {:status 200 + :headers {"content-type" "text/plain"} + :body "slow-ok"})) + d')) + +(defn- make-pool + "Creates an HTTP connection pool for stress testing." + [] + (http/connection-pool + {:connection-options {:insecure? true + :http-versions [:http2 :http1]}})) + +(defmacro ^:private with-stress-server + "Starts a server with the given handler and options, runs body, then cleans up." + [handler opts & body] + `(let [server# (http/start-server ~handler (merge server-options ~opts)) + pool# (make-pool)] + (try + (let [~'pool pool#] + ~@body) + (finally + (.close ^Closeable server#) + (netty/wait-for-close server#) + (.shutdown pool#))))) + +(deftest ^:stress test-concurrent-http2-streams + (testing "N concurrent HTTP/2 requests over a single connection" + (with-stress-server echo-handler {} + (let [n 100 + url (str "https://localhost:" stress-port "/echo") + results (doall + (for [i (range n)] + (http/get url + {:pool pool + :insecure? true + :body (str "req-" i)})))] + (doseq [[i result] (map-indexed vector results)] + (let [resp @(d/timeout! result 10000 ::timeout)] + (is (not= ::timeout resp) + (str "Request " i " timed out")) + (when (not= ::timeout resp) + (is (= 200 (:status resp)) + (str "Request " i " failed with status " (:status resp)))))))))) + +(deftest ^:stress test-concurrent-http2-with-slow-handlers + (testing "Concurrent HTTP/2 streams with variable-latency handlers" + (with-stress-server slow-handler {} + (let [n 50 + url (str "https://localhost:" stress-port "/slow") + results (doall + (for [_ (range n)] + (http/get url {:pool pool :insecure? true})))] + (doseq [[i result] (map-indexed vector results)] + (let [resp @(d/timeout! result 30000 ::timeout)] + (is (not= ::timeout resp) + (str "Slow request " i " timed out")) + (when (not= ::timeout resp) + (is (= 200 (:status resp))) + (is (= "slow-ok" (bs/to-string (:body resp))))))))))) + +(deftest ^:stress test-rapid-connect-disconnect + (testing "Rapid connection establishment and teardown" + (with-stress-server echo-handler {} + (let [n 20] + (dotimes [i n] + ;; Each iteration creates a fresh pool (new connection) + (let [pool' (make-pool)] + (try + (let [resp @(d/timeout! + (http/get (str "https://localhost:" stress-port "/ping") + {:pool pool' :insecure? true}) + 10000 ::timeout)] + (is (not= ::timeout resp) + (str "Rapid connect iteration " i " timed out")) + (when (not= ::timeout resp) + (is (= 200 (:status resp))))) + (finally + (.shutdown pool'))))))))) + +(deftest ^:stress test-mixed-http-versions + (testing "Mixed HTTP/1.1 and HTTP/2 requests to same server" + (with-stress-server echo-handler {} + (let [url (str "https://localhost:" stress-port "/mixed") + ;; HTTP/2 pool + h2-pool (http/connection-pool + {:connection-options {:insecure? true :http-versions [:http2]}}) + ;; HTTP/1.1 pool + h1-pool (http/connection-pool + {:connection-options {:insecure? true :http-versions [:http1]}})] + (try + (let [h2-results (doall (for [_ (range 20)] + (http/get url {:pool h2-pool :insecure? true}))) + h1-results (doall (for [_ (range 20)] + (http/get url {:pool h1-pool :insecure? true})))] + (doseq [r (concat h2-results h1-results)] + (let [resp @(d/timeout! r 10000 ::timeout)] + (is (not= ::timeout resp)) + (when (not= ::timeout resp) + (is (= 200 (:status resp))))))) + (finally + (.shutdown h2-pool) + (.shutdown h1-pool))))))) + +(aleph.resource-leak-detector/instrument-tests!) +(aleph.testutils/instrument-tests-with-dropped-error-deferred-detection!) diff --git a/test/aleph/resource_leak_detector.clj b/test/aleph/resource_leak_detector.clj index 40bda83b..8d0e2729 100644 --- a/test/aleph/resource_leak_detector.clj +++ b/test/aleph/resource_leak_detector.clj @@ -22,7 +22,8 @@ (io.netty.util ResourceLeakDetector ResourceLeakDetector$Level - ResourceLeakDetectorFactory))) + ResourceLeakDetectorFactory) + (java.lang.ref WeakReference))) (defn enabled? "Checks whether the resource leak detector is enabled. @@ -59,7 +60,7 @@ (def max-probe-gc-runs "Maximum number of times the GC will be run to detect a leaked probe." - 10) + 50) (def probe-hint-marker "ALEPH LEAK DETECTOR PROBE") @@ -91,9 +92,27 @@ ;; NOTE: Not setting to bare `nil` to appease `clj-kondo`. (def current-leaks (atom nil)) +(def gc-poll-interval-ms + "Milliseconds to sleep between GC polls in force-leak-detection!. + Gives the reference-processing thread time to run after System/gc." + 10) + +(def max-gc-polls + "Maximum number of GC polls before giving up in force-leak-detection!." + 10) + (defn force-leak-detection! [] - (System/gc) - (System/runFinalization) + ;; Use a WeakReference sentinel to confirm GC has actually run and + ;; processed references. This is the standard OpenJDK ForceGC pattern. + ;; PhantomReferences (used by Netty's ResourceLeakDetector) are enqueued + ;; asynchronously after GC; polling with a short sleep ensures the + ;; reference-processing thread has had time to run. + (let [sentinel (WeakReference. (Object.))] + (loop [n max-gc-polls] + (System/gc) + (when (and (.get sentinel) (pos? n)) + (Thread/sleep (long gc-poll-interval-ms)) + (recur (dec n))))) ;; Transitively trigger a track() invocation which in turn works ;; off the leaked references queue. (-> AbstractByteBufAllocator/DEFAULT (.buffer 1) .release)) @@ -104,6 +123,7 @@ (if (zero? n) (throw (RuntimeException. "Gave up awaiting leak probe. Try increasing `aleph.resource-leak-detector/max-probe-gc-runs`.")) (when-not (some (partial contains-hint? probe-hint) @current-leaks) + (Thread/sleep (long gc-poll-interval-ms)) (recur (dec n)))))) (defn with-leak-collection diff --git a/test/aleph/ssl.clj b/test/aleph/ssl.clj index 1f71fca2..a74e71b3 100644 --- a/test/aleph/ssl.clj +++ b/test/aleph/ssl.clj @@ -2,7 +2,6 @@ (:require [aleph.netty :as netty]) (:import - (io.netty.handler.ssl.util SelfSignedCertificate) (java.io ByteArrayInputStream) (java.security KeyFactory PrivateKey) (java.security.cert CertificateFactory X509Certificate) @@ -63,13 +62,16 @@ (def client-ssl-context (netty/ssl-client-context client-ssl-context-opts)) -(def wrong-hostname-cert - (SelfSignedCertificate. "some-random-hostname")) +(def ^X509Certificate wrong-hostname-cert + (gen-cert (read-string (slurp "test/wrong_hostname_cert.edn")))) + +(def wrong-hostname-key + (gen-key 65537 (read-string (slurp "test/wrong_hostname_key.edn")))) (def wrong-hostname-server-ssl-context-opts - {:private-key (.privateKey wrong-hostname-cert) - :certificate-chain (.certificate wrong-hostname-cert)}) + {:private-key wrong-hostname-key + :certificate-chain [wrong-hostname-cert]}) (def wrong-hostname-client-ssl-context-opts (assoc client-ssl-context-opts - :trust-store (.certificate wrong-hostname-cert))) + :trust-store [wrong-hostname-cert])) diff --git a/test/aleph/tcp_ssl_test.clj b/test/aleph/tcp_ssl_test.clj index ec9be044..2b4ff93f 100644 --- a/test/aleph/tcp_ssl_test.clj +++ b/test/aleph/tcp_ssl_test.clj @@ -7,10 +7,11 @@ [aleph.tcp-test :refer [with-server]] [aleph.testutils] [clj-commons.byte-streams :as bs] - [clojure.test :refer [deftest is]] + [clojure.test :refer [deftest is testing]] [manifold.deferred :as d] [manifold.stream :as s]) (:import + (io.netty.handler.ssl SslContext) (java.security.cert X509Certificate) (java.util.concurrent TimeoutException) (javax.net.ssl SSLHandshakeException))) @@ -149,5 +150,27 @@ (deliver continue-handshake true) (is (deref c 1000 false)))))) +(deftest test-self-signed-ssl-context + (testing "self-signed-ssl-context returns a valid server SslContext" + (let [ctx (netty/self-signed-ssl-context)] + (is (instance? SslContext ctx)) + (is (.isServer ^SslContext ctx)))) + + (testing "self-signed-ssl-context with custom hostname" + (let [ctx (netty/self-signed-ssl-context "example.test")] + (is (instance? SslContext ctx)) + (is (.isServer ^SslContext ctx)))) + + (testing "self-signed-ssl-context can serve TLS traffic" + (with-server (tcp/start-server (fn [s _] (s/connect s s)) + {:port 10001 + :shutdown-timeout 0 + :ssl-context (netty/self-signed-ssl-context "localhost")}) + (let [c @(tcp/client {:host "localhost" + :port 10001 + :ssl-context (netty/insecure-ssl-client-context)})] + (s/put! c "foo") + (is (= "foo" (bs/to-string @(s/take! c)))))))) + (aleph.resource-leak-detector/instrument-tests!) (aleph.testutils/instrument-tests-with-dropped-error-deferred-detection!) diff --git a/test/wrong_hostname_cert.edn b/test/wrong_hostname_cert.edn new file mode 100644 index 00000000..f72ec81e --- /dev/null +++ b/test/wrong_hostname_cert.edn @@ -0,0 +1 @@ +"MIIC5DCCAcygAwIBAgIJANJrncVPZseQMA0GCSqGSIb3DQEBCwUAMB8xHTAbBgNVBAMTFHNvbWUtcmFuZG9tLWhvc3RuYW1lMCAXDTI2MDIxMzEyNTI1OVoYDzIxMjYwMTIwMTI1MjU5WjAfMR0wGwYDVQQDExRzb21lLXJhbmRvbS1ob3N0bmFtZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAItpP3w3Kkp24JCDoZVJrgz+JfSAiv8hLUFqkwWBMiwzmHsARsBoezge5jQGK+Pin636lv0HuGuJv6cPgJXE5mDGNYsgHXp9tHjZ99oRMK8g7s6mFTdpako3VacWMQucsftz7o60h9C62eQA+3fzufHzessWXH+cvHmHazM9XmhR8f5OuW5VtBN4iMWjEnvJmf1NfxaC5+OXct/UfZNH9AY+66heuyAh0CtDPGfuzyWJWuRvz5+yzQS8ESsi0tdrfsqix1dIdLFWyyzIC5hmi8+RPYu4Gbrmn3ryYLkS+6DQKRhI+P+orVyoLiqrc/lp+nsrrOd/7j2z+OHiRlN3X9MCAwEAAaMhMB8wHQYDVR0OBBYEFEeUMQvjlmkjyedzj5gTfOG5JM3hMA0GCSqGSIb3DQEBCwUAA4IBAQB9qi177VxOAQ8VCWb0Cnwu/HggQv0zPtM/+P6E+kr3uZaMHypTEbWYKKY1lV2dRo2brqD4MlVhKLe+VG+TMc84UXD69YKsf3QFdW4nOQ5dYhEnrqaqGGSvOARvlcK0oN5RxjZIuZfxNMLcX/kLi8c+LIbRG03DoUfCjOa8/cji+EH9z4zPizah3lBwm/RIVN5wTCD+r7BPZVDA6adVGsI8eMTx6m6OXBvK+fN9ck+xkh8UlUYmbPGFcXlzVFB2NY003FOxwess7Q7HROzkGMlGhuNW7R3YyFmisn03EWSzNNo1IUfkywXz0Gfbl8YfFVXTiGsu2cQucf7F97G+5y8W" diff --git a/test/wrong_hostname_key.edn b/test/wrong_hostname_key.edn new file mode 100644 index 00000000..dc707aea --- /dev/null +++ b/test/wrong_hostname_key.edn @@ -0,0 +1,7 @@ +{:modulus "8b693f7c372a4a76e09083a19549ae0cfe25f4808aff212d416a930581322c33987b0046c0687b381ee634062be3e29fadfa96fd07b86b89bfa70f8095c4e660c6358b201d7a7db478d9f7da1130af20eecea61537696a4a3755a716310b9cb1fb73ee8eb487d0bad9e400fb77f3b9f1f37acb165c7f9cbc79876b333d5e6851f1fe4eb96e55b4137888c5a3127bc999fd4d7f1682e7e39772dfd47d9347f4063eeba85ebb2021d02b433c67eecf25895ae46fcf9fb2cd04bc112b22d2d76b7ecaa2c7574874b156cb2cc80b98668bcf913d8bb819bae69f7af260b912fba0d0291848f8ffa8ad5ca82e2aab73f969fa7b2bace77fee3db3f8e1e24653775fd3" + :privateExponent "118025d30f0dc50945497eecb2d3ec007f8afe3580bd6449cd1376df53079c5ae1c257178aa1d30e097af67eebd590a660edbb348a5a914d5a7cbb4a7602a0b4f74cd36935aa21cb9cfc27cbfd2a72619cf8de0a7dc942e98c26b971e4b84374684f765bfdb7aaf139d7107480eb5ca54dde554e3a505eb99e3becae22ff0df3440a8ae4972ebfc03e417807b4eebc4a2a3e790972c8549c479eb394f3dfb07b6eba6fe381248b3d8bf58b59d632ba54ea5d5f45a9a02b64f35652def75d66d3cbd609127a78e588553619db94c3d1c863121994588b4aa00497202431e6f9ab32495d8a71804ec5501a245c7c2baafbb0776685d892f12f274d28e17e1b0779" + :prime1 "c3cdab102a00b0f684cf149b4341187ea120852e3047592900ccb9da88e490df0f4f2f255ac97c3f06023c7e3da110ac1d8e1cf0c75d0469c1c34f178de9d037496622a32e9f9db2862d6b15a45059ce5238165f87252f105363cf03fcf5821ca787af82863b8743b930dcc653ea98675f4ef6fa101a0edb462b68cffc38d86d" + :prime2 "b645558da1f3e5dd019cce56599fc8c249547c722d4b24b0d79c3313143384a98f54d2a8070fe7dfe585f9fd1f88dd54d4f62eede0b5f73f010c4a18947ba661c73179a713b8c29319d31cceaa13f017afa6602432e14b8ad2925fb139e585c961fea27e20b829dfd6747fd1814346f7da13f19a321965149d4b651f5e7a713f" + :exponent1 "751586062b67b06a292e7f1efccf6f2b418465bf21e5783e59991245c8780cdd25956f16c23c1b8cc0361420550ffc1d3fab1c492a9dc7a6c9d3d576bb48a65679109703c73c711b30baf5fec1fbbe47eec85530986ab6c2e76a967ec703b3e31896bd6faa7eec3aa601bef2e57931db1695ec7fc9a31b61298b89a85d254cdd" + :exponent2 "72d7fad0d3a34648db343eba1f9c52b53fd8f0d44bff95adab1c6afdf173887a01aa735495a68af602bb48c9192e762e76446b4b4c93baa642f5c855f6707664134af418b68f6a619f1aea82b89cf9612336544b5a97a3ae638100b43d0386295e0a3fbdf2c5b6f4a73e1e2f16201abe1df504fc9fe66a4ef3d0c6755e650da7" + :coefficient "3782d9398feff2c35e79efebff579fc14069c9730c5a4a11276dbf3ec2465d77c86ee995881ddbc68863360e1d897c625c8cc39355cb7df7af730b634bd697d7ee03d17cb1b8b1da5903576ed647694e327ac8b47a9ba17d30cca47eaec79ab2de547dd487b8b23c53af5ee5ae57cb774034134b622006c2c28824314c3ec03f"}