@@ -110,10 +110,25 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
110110 defaultValue = "" ,
111111 help = "Path to the native Amazon Kinesis Producer Library (KPL) binary.\n "
112112 + "Only use this setting if you want to use a custom build of the native code.\n "
113- + "This setting can also be set with the environment variable `PULSAR_IO_KINESIS_KPL_PATH`.\n "
113+ + "This setting can also be set with the environment variable `PULSAR_IO_KINESIS_KPL_1_0_PATH`"
114+ + "or `PULSAR_IO_KINESIS_KPL_PATH`.\n "
114115 + "If not set, the Kinesis sink will use the built-in native executable."
115116 )
116- private String nativeExecutable = System .getenv ("PULSAR_IO_KINESIS_KPL_PATH" );
117+ private String nativeExecutable = resolveDefaultKinesisProducerLibraryPath ();
118+
119+ private static String resolveDefaultKinesisProducerLibraryPath () {
120+ // Prefer PULSAR_IO_KINESIS_KPL_1_0_PATH environment variable over PULSAR_IO_KINESIS_KPL_PATH.
121+ // This setting supports building a Pulsar Functions base image that is used to run different Pulsar IO Kinesis
122+ // sink versions. The older versions of Pulsar IO Kinesis sink can continue to use the binary configured with
123+ // PULSAR_IO_KINESIS_KPL_PATH, pointing to a 0.15.12 native executable. The newer versions of Pulsar IO Kinesis
124+ // sink can use the binary configured with PULSAR_IO_KINESIS_KPL_1_0_PATH, pointing to a 1.0.4
125+ // native executable.
126+ String kplPath = System .getenv ("PULSAR_IO_KINESIS_KPL_1_0_PATH" );
127+ if (isNotBlank (kplPath )) {
128+ return kplPath ;
129+ }
130+ return System .getenv ("PULSAR_IO_KINESIS_KPL_PATH" );
131+ }
117132
118133 public static KinesisSinkConfig load (Map <String , Object > config , SinkContext sinkContext ) {
119134 KinesisSinkConfig kinesisSinkConfig = IOConfigUtils .loadWithSecrets (config , KinesisSinkConfig .class , sinkContext );
0 commit comments