|
24 | 24 | import org.apache.pulsar.client.api.PulsarClientException; |
25 | 25 | import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; |
26 | 26 | import org.apache.pulsar.client.impl.conf.ClientConfigurationData; |
| 27 | +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; |
| 28 | +import org.apache.pulsar.shade.com.google.common.collect.Maps; |
27 | 29 | import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; |
28 | 30 |
|
| 31 | +import java.util.Map; |
29 | 32 | import java.util.Properties; |
| 33 | +import java.util.stream.Collectors; |
30 | 34 |
|
31 | 35 | /** Utility to create Pulsar Admin Client from adminUrl and clientConfigurationData. */ |
32 | 36 | public class PulsarClientUtils { |
@@ -56,13 +60,18 @@ private static Authentication getAuth(ClientConfigurationData conf) |
56 | 60 | } |
57 | 61 |
|
58 | 62 | public static ClientConfigurationData newClientConf(String serviceUrl, Properties properties) { |
| 63 | + Map<String, Object> clientConfData = getClientParams(Maps.fromProperties(properties)); |
59 | 64 | ClientConfigurationData clientConf = new ClientConfigurationData(); |
| 65 | + clientConf = ConfigurationDataUtils.loadData(clientConfData, clientConf, ClientConfigurationData.class); |
60 | 66 | clientConf.setServiceUrl(serviceUrl); |
61 | | - if (properties != null) { |
62 | | - clientConf.setAuthParams(properties.getProperty(PulsarOptions.AUTH_PARAMS_KEY)); |
63 | | - clientConf.setAuthPluginClassName( |
64 | | - properties.getProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY)); |
65 | | - } |
66 | 67 | return clientConf; |
67 | 68 | } |
| 69 | + |
| 70 | + public static Map<String, Object> getClientParams(Map<String, String> parameters) { |
| 71 | + return parameters.keySet().stream() |
| 72 | + .filter(k -> k.startsWith(PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX)) |
| 73 | + .collect(Collectors.toMap( |
| 74 | + k -> k.substring(PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX.length()), |
| 75 | + k -> parameters.get(k))); |
| 76 | + } |
68 | 77 | } |
0 commit comments