1717
1818package org .apache .flink .kubernetes .operator .admission ;
1919
20+ import org .apache .flink .annotation .VisibleForTesting ;
2021import org .apache .flink .kubernetes .operator .admission .informer .InformerManager ;
2122import org .apache .flink .kubernetes .operator .admission .mutator .FlinkMutator ;
2223import org .apache .flink .kubernetes .operator .api .FlinkStateSnapshot ;
4546import org .slf4j .Logger ;
4647import org .slf4j .LoggerFactory ;
4748
49+ import javax .annotation .Nullable ;
50+
4851import java .net .InetAddress ;
4952import java .net .InetSocketAddress ;
5053import java .nio .file .Path ;
@@ -58,26 +61,38 @@ public class FlinkOperatorWebhook {
5861
5962 private static FileSystemWatchService fileSystemWatchService ;
6063
61- public static void main (String [] args ) throws Exception {
62- EnvUtils .logEnvironmentInfo (LOG , "Flink Kubernetes Webhook" , args );
63- var informerManager = new InformerManager (new KubernetesClientBuilder ().build ());
64- var configManager =
65- new FlinkConfigManager (
66- informerManager ::setNamespaces ,
67- KubernetesClientUtils .isCrdInstalled (FlinkStateSnapshot .class ));
64+ @ VisibleForTesting final Set <FlinkResourceValidator > validators ;
65+ @ VisibleForTesting final Set <FlinkResourceMutator > mutators ;
66+ @ VisibleForTesting final AdmissionHandler admissionHandler ;
67+
68+ @ VisibleForTesting
69+ FlinkOperatorWebhook (
70+ @ Nullable InformerManager informerManager , @ Nullable FlinkConfigManager configManager ) {
71+ if (informerManager == null ) {
72+ informerManager = new InformerManager (new KubernetesClientBuilder ().build ());
73+ }
74+ if (configManager == null ) {
75+ configManager =
76+ new FlinkConfigManager (
77+ informerManager ::setNamespaces ,
78+ KubernetesClientUtils .isCrdInstalled (FlinkStateSnapshot .class ));
79+ }
80+
6881 var operatorConfig = configManager .getOperatorConfiguration ();
6982 if (!operatorConfig .isDynamicNamespacesEnabled ()) {
7083 informerManager .setNamespaces (operatorConfig .getWatchedNamespaces ());
7184 }
72- Set <FlinkResourceValidator > validators = ValidatorUtils .discoverValidators (configManager );
73- Set <FlinkResourceMutator > mutators = MutatorUtils .discoverMutators (configManager );
7485
75- AdmissionHandler endpoint =
86+ this .validators = ValidatorUtils .discoverValidators (configManager );
87+ this .mutators = MutatorUtils .discoverMutators (configManager );
88+ this .admissionHandler =
7689 new AdmissionHandler (
7790 new FlinkValidator (validators , informerManager ),
7891 new FlinkMutator (mutators , informerManager ));
92+ }
7993
80- ChannelInitializer <SocketChannel > initializer = createChannelInitializer (endpoint );
94+ public void run () throws Exception {
95+ ChannelInitializer <SocketChannel > initializer = createChannelInitializer (admissionHandler );
8196 NioEventLoopGroup bossGroup = new NioEventLoopGroup (1 );
8297 NioEventLoopGroup workerGroup = new NioEventLoopGroup ();
8398 try {
@@ -103,12 +118,18 @@ public static void main(String[] args) throws Exception {
103118 }
104119 }
105120
121+ public static void main (String [] args ) throws Exception {
122+ EnvUtils .logEnvironmentInfo (LOG , "Flink Kubernetes Webhook" , args );
123+ new FlinkOperatorWebhook (null , null ).run ();
124+ }
125+
106126 private static int getPort () {
107127 String portString = EnvUtils .getRequired (EnvUtils .ENV_WEBHOOK_SERVER_PORT );
108128 return Integer .parseInt (portString );
109129 }
110130
111- private static ChannelInitializer <SocketChannel > createChannelInitializer (
131+ @ VisibleForTesting
132+ static ChannelInitializer <SocketChannel > createChannelInitializer (
112133 AdmissionHandler admissionHandler ) throws Exception {
113134 SslContext sslContext = createSslContext ();
114135
@@ -150,7 +171,7 @@ private static SslContext createSslContext() throws Exception {
150171 stopFileSystemWatchService ();
151172 final String realKeystoreFileName =
152173 Path .of (keystorePathOpt .get ()).toRealPath ().getFileName ().toString ();
153- LOG .info ("Keystore path is resolved to real filename: " + realKeystoreFileName );
174+ LOG .info ("Keystore path is resolved to real filename: {}" , realKeystoreFileName );
154175 fileSystemWatchService =
155176 new FileSystemWatchService (Path .of (keystorePathOpt .get ()).getParent ().toString ()) {
156177 @ Override
@@ -160,7 +181,8 @@ protected void onFileOrDirectoryModified(Path relativePath) {
160181 reloadableSslContext .reload ();
161182 LOG .info ("SSL context reloaded successfully" );
162183 } catch (Exception e ) {
163- LOG .error ("SSL context reload received exception: " + e );
184+ LOG .error (
185+ "SSL context reload received exception: {}" , String .valueOf (e ));
164186 }
165187 }
166188 };
0 commit comments