2020import org .apache .fluss .config .Configuration ;
2121import org .apache .fluss .exception .AuthenticationException ;
2222import org .apache .fluss .security .auth .ClientAuthenticator ;
23+ import org .apache .fluss .security .auth .sasl .jaas .JaasConfig ;
2324import org .apache .fluss .security .auth .sasl .jaas .JaasContext ;
2425import org .apache .fluss .security .auth .sasl .jaas .LoginManager ;
26+ import org .apache .fluss .security .auth .sasl .plain .PlainLoginModule ;
2527import org .apache .fluss .security .auth .sasl .plain .PlainSaslServer ;
2628
2729import javax .annotation .Nullable ;
30+ import javax .security .auth .login .AppConfigurationEntry ;
2831import javax .security .auth .login .LoginException ;
2932import javax .security .sasl .SaslClient ;
3033
@@ -50,7 +53,13 @@ public class SaslClientAuthenticator implements ClientAuthenticator {
5053 public SaslClientAuthenticator (Configuration configuration ) {
5154 this .mechanism = configuration .get (CLIENT_SASL_MECHANISM ).toUpperCase ();
5255 String jaasConfigStr = configuration .getString (CLIENT_SASL_JAAS_CONFIG );
53- if (jaasConfigStr == null && mechanism .equals (PlainSaslServer .PLAIN_MECHANISM )) {
56+ if (jaasConfigStr != null ) {
57+ // Validate that only PlainLoginModule is allowed in the JAAS config.
58+ // Fluss uses a plugin-based authentication system and does not support
59+ // custom SASL mechanisms. The jaas.config option is retained for backward
60+ // compatibility only.
61+ validatePlainLoginModule (jaasConfigStr );
62+ } else if (mechanism .equals (PlainSaslServer .PLAIN_MECHANISM )) {
5463 String username = configuration .get (CLIENT_SASL_JAAS_USERNAME );
5564 String password = configuration .get (CLIENT_SASL_JAAS_PASSWORD );
5665 if (username != null || password != null ) {
@@ -68,6 +77,39 @@ public SaslClientAuthenticator(Configuration configuration) {
6877 this .pros = configuration .toMap ();
6978 }
7079
80+ /**
81+ * Validates that the provided JAAS configuration string only uses {@link PlainLoginModule}.
82+ *
83+ * @param jaasConfigStr the JAAS configuration string to validate
84+ * @throws AuthenticationException if the JAAS config uses a login module other than
85+ * PlainLoginModule
86+ */
87+ private static void validatePlainLoginModule (String jaasConfigStr ) {
88+ JaasConfig jaasConfig = new JaasConfig ("FlussClient" , jaasConfigStr );
89+ AppConfigurationEntry [] entries = jaasConfig .getAppConfigurationEntry ("FlussClient" );
90+ if (entries == null || entries .length == 0 ) {
91+ throw new AuthenticationException (
92+ "JAAS config property does not contain any login modules" );
93+ }
94+ if (entries .length != 1 ) {
95+ throw new AuthenticationException (
96+ "JAAS config property contains "
97+ + entries .length
98+ + " login modules, should be 1 module" );
99+ }
100+ String loginModuleName = entries [0 ].getLoginModuleName ();
101+ if (!PlainLoginModule .class .getName ().equals (loginModuleName )) {
102+ throw new AuthenticationException (
103+ String .format (
104+ "Only '%s' is supported in '%s'. "
105+ + "Fluss uses a plugin-based authentication system and does not support "
106+ + "custom SASL mechanisms. Got: '%s'" ,
107+ PlainLoginModule .class .getName (),
108+ CLIENT_SASL_JAAS_CONFIG .key (),
109+ loginModuleName ));
110+ }
111+ }
112+
71113 @ Override
72114 public String protocol () {
73115 return mechanism ;
0 commit comments