From 7027bf66a73191f4397ac59370cd4fa4068b71cd Mon Sep 17 00:00:00 2001 From: rockyyin Date: Thu, 9 Apr 2026 13:14:29 +0800 Subject: [PATCH 1/2] [KYUUBI #XXXX] Support IP client allowlist for connection access control ### _Why are the changes needed?_ Currently, Kyuubi supports IP deny list (ip.deny.list) to block specific IPs from connecting. However, in some security-sensitive environments, administrators need the opposite approach - only allowing specific trusted IPs to connect (allowlist/whitelist pattern). This is a common security requirement for production deployments. ### _How was this patch tested?_ - Added 5 unit test cases in SessionLimiterSuite: - test session limiter with ip allowlist - test session limiter ip allowlist with multiple ips - test session limiter empty ip allowlist allows all ips - test session limiter ip deny list has higher priority than ip allowlist - test refresh ip allowlist ### _Was this patch authored or co-authored using generative AI tooling?_ No ### Changes: - Add SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST config in KyuubiConf - Add ipAllowlist field in SessionLimiterWithAccessControlListImpl - Add ip allowlist check in SessionLimiter.increment() - Add getIpAllowlist/refreshIpAllowlist in KyuubiSessionManager - Add refreshIpAllowlist() in KyuubiServer - Add REST API endpoint POST /api/v1/admin/refresh/ip_allowlist - When ip.deny.list and ip.allowlist both contain the same IP, deny list takes higher priority --- .../org/apache/kyuubi/config/KyuubiConf.scala | 13 ++ .../apache/kyuubi/server/KyuubiServer.scala | 8 ++ .../kyuubi/server/api/v1/AdminResource.scala | 19 +++ .../kyuubi/session/KyuubiSessionManager.scala | 26 +++- .../kyuubi/session/SessionLimiter.scala | 30 ++++- .../kyuubi/session/SessionLimiterSuite.scala | 114 ++++++++++++++++++ 6 files changed, 202 insertions(+), 8 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 8e8d31550e8..6ec01ebf5b3 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -3283,6 +3283,19 @@ object KyuubiConf { .toSet() .createWithDefault(Set.empty) + val SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST: ConfigEntry[Set[String]] = + buildConf("kyuubi.server.limit.connections.ip.allowlist") + .doc("When this list is not empty, only the client ip in the allow list will be" + + " permitted to connect to kyuubi server, all other ips will be denied." + + " If this list is empty (default), no ip allowlist restriction is applied." + + " Note: if a client ip is in both ip.allowlist and ip.deny.list," + + " the deny list takes higher priority.") + .version("1.10.0") + .serverOnly + .stringConf + .toSet() + .createWithDefault(Set.empty) + val SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] = buildConf("kyuubi.server.limit.batch.connections.per.user") .doc("Maximum kyuubi server batch connections per user." + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 0996472653b..104b2d0f0f0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -175,6 +175,14 @@ object KyuubiServer extends Logging { info(s"Refreshed deny client ips from $existingDenyIps to $refreshedDenyIps") } + private[kyuubi] def refreshIpAllowlist(): Unit = synchronized { + val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] + val existingIpAllowlist = sessionMgr.getIpAllowlist + sessionMgr.refreshIpAllowlist(createKyuubiConf()) + val refreshedIpAllowlist = sessionMgr.getIpAllowlist + info(s"Refreshed ip allowlist from $existingIpAllowlist to $refreshedIpAllowlist") + } + private def createKyuubiConf(): KyuubiConf = { KyuubiConf().loadFileDefaults().loadFromArgs(commandArgs) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index c588ead7a82..68385a907df 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -161,6 +161,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { Response.ok(s"Refresh the deny ips successfully.").build() } + @ApiResponse( + responseCode = "200", + content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)), + description = "refresh the ip allowlist") + @POST + @Path("refresh/ip_allowlist") + def refreshIpAllowlist(): Response = { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Receive refresh ip allowlist request from $userName/$ipAddress") + if (!fe.isAdministrator(userName)) { + throw new ForbiddenException( + s"$userName is not allowed to refresh the ip allowlist") + } + info(s"Reloading ip allowlist") + KyuubiServer.refreshIpAllowlist() + Response.ok(s"Refresh the ip allowlist successfully.").build() + } + @ApiResponse( responseCode = "200", content = Array(new Content( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 344da0e71e8..c7ff7b83551 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -396,13 +396,15 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { conf.get(SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).filter(_.nonEmpty) val userDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_USER_DENY_LIST).filter(_.nonEmpty) val ipDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_IP_DENY_LIST).filter(_.nonEmpty) + val ipAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty) limiter = applySessionLimiter( userLimit, ipAddressLimit, userIpAddressLimit, userUnlimitedList, userDenyList, - ipDenyList) + ipDenyList, + ipAllowlist) val userBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER).getOrElse(0) val ipAddressBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_IPADDRESS).getOrElse(0) @@ -414,7 +416,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { userIpAddressBatchLimit, userUnlimitedList, userDenyList, - ipDenyList) + ipDenyList, + ipAllowlist) } private[kyuubi] def getUnlimitedUsers: Set[String] = { @@ -448,22 +451,35 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { batchLimiter.foreach(SessionLimiter.resetDenyIps(_, denyIps)) } + private[kyuubi] def getIpAllowlist: Set[String] = { + limiter.orElse(batchLimiter).map(SessionLimiter.getIpAllowlist).getOrElse(Set.empty) + } + + private[kyuubi] def refreshIpAllowlist(conf: KyuubiConf): Unit = { + val ipAllowlist = + conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty) + limiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist)) + batchLimiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist)) + } + private def applySessionLimiter( userLimit: Int, ipAddressLimit: Int, userIpAddressLimit: Int, userUnlimitedList: Set[String], userDenyList: Set[String], - ipDenyList: Set[String]): Option[SessionLimiter] = { + ipDenyList: Set[String], + ipAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = { if (Seq(userLimit, ipAddressLimit, userIpAddressLimit).exists(_ > 0) || - userDenyList.nonEmpty || ipDenyList.nonEmpty) { + userDenyList.nonEmpty || ipDenyList.nonEmpty || ipAllowlist.nonEmpty) { Some(SessionLimiter( userLimit, ipAddressLimit, userIpAddressLimit, userUnlimitedList, userDenyList, - ipDenyList)) + ipDenyList, + ipAllowlist)) } else { None } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala index 688154d0099..13f1d8f2fa0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala @@ -108,7 +108,8 @@ class SessionLimiterWithAccessControlListImpl( userIpAddressLimit: Int, var unlimitedUsers: Set[String], var denyUsers: Set[String], - var denyIps: Set[String]) + var denyIps: Set[String], + var ipAllowlist: Set[String] = Set.empty) extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) { override def increment(userIpAddress: UserIpAddress): Unit = { val user = userIpAddress.user @@ -123,6 +124,12 @@ class SessionLimiterWithAccessControlListImpl( s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)" throw KyuubiSQLException(errorMsg) } + // ip allowlist check: when allowlist is not empty, only allowed ips can connect + if (ipAllowlist.nonEmpty && StringUtils.isNotBlank(ip) && !ipAllowlist.contains(ip)) { + val errorMsg = + s"Connection denied because the client ip is not in the ip allowlist. (ipAddress: $ip)" + throw KyuubiSQLException(errorMsg) + } if (!unlimitedUsers.contains(user)) { super.increment(userIpAddress) @@ -140,6 +147,10 @@ class SessionLimiterWithAccessControlListImpl( private[kyuubi] def setDenyIps(denyIps: Set[String]): Unit = { this.denyIps = denyIps } + + private[kyuubi] def setIpAllowlist(ipAllowlist: Set[String]): Unit = { + this.ipAllowlist = ipAllowlist + } } object SessionLimiter { @@ -150,14 +161,16 @@ object SessionLimiter { userIpAddressLimit: Int, unlimitedUsers: Set[String] = Set.empty, denyUsers: Set[String] = Set.empty, - denyIps: Set[String] = Set.empty): SessionLimiter = { + denyIps: Set[String] = Set.empty, + ipAllowlist: Set[String] = Set.empty): SessionLimiter = { new SessionLimiterWithAccessControlListImpl( userLimit, ipAddressLimit, userIpAddressLimit, unlimitedUsers, denyUsers, - denyIps) + denyIps, + ipAllowlist) } def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit = @@ -192,4 +205,15 @@ object SessionLimiter { case l: SessionLimiterWithAccessControlListImpl => l.denyIps case _ => Set.empty } + + def resetIpAllowlist(limiter: SessionLimiter, ipAllowlist: Set[String]): Unit = + limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.setIpAllowlist(ipAllowlist) + case _ => + } + + def getIpAllowlist(limiter: SessionLimiter): Set[String] = limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.ipAllowlist + case _ => Set.empty + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala index 47c33d63e1d..c9fc46acf91 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala @@ -214,4 +214,118 @@ class SessionLimiterSuite extends KyuubiFunSuite { limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values .foreach(c => assert(c.get() == 0)) } + + test("test session limiter with ip allowlist") { + val allowedIp = "10.0.0.1" + val blockedIp = "192.168.1.100" + val ipAllowlist = Set(allowedIp) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + ipAllowlist) + + // allowed ip should be able to connect + limiter.increment(UserIpAddress("user001", allowedIp)) + + // blocked ip should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user001", blockedIp)) + } + assert(caught.getMessage.equals( + s"Connection denied because the client ip is not in the ip allowlist." + + s" (ipAddress: $blockedIp)")) + } + + test("test session limiter ip allowlist with multiple ips") { + val allowedIp1 = "10.0.0.1" + val allowedIp2 = "10.0.0.2" + val blockedIp = "192.168.1.100" + val ipAllowlist = Set(allowedIp1, allowedIp2) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + ipAllowlist) + + // both allowed ips should be able to connect + limiter.increment(UserIpAddress("user001", allowedIp1)) + limiter.increment(UserIpAddress("user002", allowedIp2)) + + // blocked ip should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user003", blockedIp)) + } + assert(caught.getMessage.contains("not in the ip allowlist")) + } + + test("test session limiter empty ip allowlist allows all ips") { + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty) + + // when allowlist is empty, all ips should be allowed + limiter.increment(UserIpAddress("user001", "10.0.0.1")) + limiter.increment(UserIpAddress("user002", "192.168.1.100")) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } + + test("test session limiter ip deny list has higher priority than ip allowlist") { + val ip = "10.0.0.1" + val denyIps = Set(ip) + val ipAllowlist = Set(ip) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + denyIps, + ipAllowlist) + + // deny ip list check happens before allowlist check + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user001", ip)) + } + assert(caught.getMessage.equals( + s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)")) + } + + test("test refresh ip allowlist") { + val allowedIp = "10.0.0.1" + val blockedIp = "192.168.1.100" + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set(allowedIp)) + + // initially only allowedIp can connect + limiter.increment(UserIpAddress("user001", allowedIp)) + intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user002", blockedIp)) + } + + // refresh allowlist to include blockedIp + SessionLimiter.resetIpAllowlist(limiter, Set(allowedIp, blockedIp)) + limiter.increment(UserIpAddress("user002", blockedIp)) + + // refresh allowlist to empty (allow all) + SessionLimiter.resetIpAllowlist(limiter, Set.empty) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } } From 4e5f7c51ef64132c6f65afedded8439cb30b05a3 Mon Sep 17 00:00:00 2001 From: rockyyin Date: Thu, 9 Apr 2026 13:21:58 +0800 Subject: [PATCH 2/2] Support user allowlist for connection access control Add user allowlist feature (kyuubi.server.limit.connections.user.allowlist) that restricts which users can connect to Kyuubi server. Changes: - Add SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST config in KyuubiConf - Add userAllowlist field in SessionLimiterWithAccessControlListImpl - Add user allowlist check in SessionLimiter.increment() - Add getUserAllowlist/refreshUserAllowlist in KyuubiSessionManager - Add refreshUserAllowlist() in KyuubiServer - Add REST API endpoint POST /api/v1/admin/refresh/user_allowlist - Add 5 test cases in SessionLimiterSuite - When user.deny.list and user.allowlist both contain the same user, deny list takes higher priority --- .../org/apache/kyuubi/config/KyuubiConf.scala | 13 ++ .../apache/kyuubi/server/KyuubiServer.scala | 8 ++ .../kyuubi/server/api/v1/AdminResource.scala | 19 +++ .../kyuubi/session/KyuubiSessionManager.scala | 27 +++- .../kyuubi/session/SessionLimiter.scala | 31 ++++- .../kyuubi/session/SessionLimiterSuite.scala | 119 ++++++++++++++++++ 6 files changed, 209 insertions(+), 8 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 6ec01ebf5b3..ba83b70fb4e 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -3296,6 +3296,19 @@ object KyuubiConf { .toSet() .createWithDefault(Set.empty) + val SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST: ConfigEntry[Set[String]] = + buildConf("kyuubi.server.limit.connections.user.allowlist") + .doc("When this list is not empty, only the user in the allow list will be" + + " permitted to connect to kyuubi server, all other users will be denied." + + " If this list is empty (default), no user allowlist restriction is applied." + + " Note: if a user is in both user.allowlist and user.deny.list," + + " the deny list takes higher priority.") + .version("1.10.0") + .serverOnly + .stringConf + .toSet() + .createWithDefault(Set.empty) + val SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] = buildConf("kyuubi.server.limit.batch.connections.per.user") .doc("Maximum kyuubi server batch connections per user." + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 104b2d0f0f0..67a0c0ba261 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -183,6 +183,14 @@ object KyuubiServer extends Logging { info(s"Refreshed ip allowlist from $existingIpAllowlist to $refreshedIpAllowlist") } + private[kyuubi] def refreshUserAllowlist(): Unit = synchronized { + val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] + val existingUserAllowlist = sessionMgr.getUserAllowlist + sessionMgr.refreshUserAllowlist(createKyuubiConf()) + val refreshedUserAllowlist = sessionMgr.getUserAllowlist + info(s"Refreshed user allowlist from $existingUserAllowlist to $refreshedUserAllowlist") + } + private def createKyuubiConf(): KyuubiConf = { KyuubiConf().loadFileDefaults().loadFromArgs(commandArgs) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 68385a907df..5a208addfc2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -180,6 +180,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { Response.ok(s"Refresh the ip allowlist successfully.").build() } + @ApiResponse( + responseCode = "200", + content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)), + description = "refresh the user allowlist") + @POST + @Path("refresh/user_allowlist") + def refreshUserAllowlist(): Response = { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Receive refresh user allowlist request from $userName/$ipAddress") + if (!fe.isAdministrator(userName)) { + throw new ForbiddenException( + s"$userName is not allowed to refresh the user allowlist") + } + info(s"Reloading user allowlist") + KyuubiServer.refreshUserAllowlist() + Response.ok(s"Refresh the user allowlist successfully.").build() + } + @ApiResponse( responseCode = "200", content = Array(new Content( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index c7ff7b83551..d5291fd5d25 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -397,6 +397,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { val userDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_USER_DENY_LIST).filter(_.nonEmpty) val ipDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_IP_DENY_LIST).filter(_.nonEmpty) val ipAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty) + val userAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty) limiter = applySessionLimiter( userLimit, ipAddressLimit, @@ -404,7 +405,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { userUnlimitedList, userDenyList, ipDenyList, - ipAllowlist) + ipAllowlist, + userAllowlist) val userBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER).getOrElse(0) val ipAddressBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_IPADDRESS).getOrElse(0) @@ -417,7 +419,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { userUnlimitedList, userDenyList, ipDenyList, - ipAllowlist) + ipAllowlist, + userAllowlist) } private[kyuubi] def getUnlimitedUsers: Set[String] = { @@ -462,6 +465,17 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { batchLimiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist)) } + private[kyuubi] def getUserAllowlist: Set[String] = { + limiter.orElse(batchLimiter).map(SessionLimiter.getUserAllowlist).getOrElse(Set.empty) + } + + private[kyuubi] def refreshUserAllowlist(conf: KyuubiConf): Unit = { + val userAllowlist = + conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty) + limiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist)) + batchLimiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist)) + } + private def applySessionLimiter( userLimit: Int, ipAddressLimit: Int, @@ -469,9 +483,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { userUnlimitedList: Set[String], userDenyList: Set[String], ipDenyList: Set[String], - ipAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = { + ipAllowlist: Set[String] = Set.empty, + userAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = { if (Seq(userLimit, ipAddressLimit, userIpAddressLimit).exists(_ > 0) || - userDenyList.nonEmpty || ipDenyList.nonEmpty || ipAllowlist.nonEmpty) { + userDenyList.nonEmpty || ipDenyList.nonEmpty || + ipAllowlist.nonEmpty || userAllowlist.nonEmpty) { Some(SessionLimiter( userLimit, ipAddressLimit, @@ -479,7 +495,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { userUnlimitedList, userDenyList, ipDenyList, - ipAllowlist)) + ipAllowlist, + userAllowlist)) } else { None } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala index 13f1d8f2fa0..4d6923b7a77 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala @@ -109,7 +109,8 @@ class SessionLimiterWithAccessControlListImpl( var unlimitedUsers: Set[String], var denyUsers: Set[String], var denyIps: Set[String], - var ipAllowlist: Set[String] = Set.empty) + var ipAllowlist: Set[String] = Set.empty, + var userAllowlist: Set[String] = Set.empty) extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) { override def increment(userIpAddress: UserIpAddress): Unit = { val user = userIpAddress.user @@ -124,6 +125,13 @@ class SessionLimiterWithAccessControlListImpl( s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)" throw KyuubiSQLException(errorMsg) } + // user allowlist check: when allowlist is not empty, only allowed users can connect + if (userAllowlist.nonEmpty && StringUtils.isNotBlank(user) && + !userAllowlist.contains(user)) { + val errorMsg = + s"Connection denied because the user is not in the user allowlist. (user: $user)" + throw KyuubiSQLException(errorMsg) + } // ip allowlist check: when allowlist is not empty, only allowed ips can connect if (ipAllowlist.nonEmpty && StringUtils.isNotBlank(ip) && !ipAllowlist.contains(ip)) { val errorMsg = @@ -151,6 +159,10 @@ class SessionLimiterWithAccessControlListImpl( private[kyuubi] def setIpAllowlist(ipAllowlist: Set[String]): Unit = { this.ipAllowlist = ipAllowlist } + + private[kyuubi] def setUserAllowlist(userAllowlist: Set[String]): Unit = { + this.userAllowlist = userAllowlist + } } object SessionLimiter { @@ -162,7 +174,8 @@ object SessionLimiter { unlimitedUsers: Set[String] = Set.empty, denyUsers: Set[String] = Set.empty, denyIps: Set[String] = Set.empty, - ipAllowlist: Set[String] = Set.empty): SessionLimiter = { + ipAllowlist: Set[String] = Set.empty, + userAllowlist: Set[String] = Set.empty): SessionLimiter = { new SessionLimiterWithAccessControlListImpl( userLimit, ipAddressLimit, @@ -170,7 +183,8 @@ object SessionLimiter { unlimitedUsers, denyUsers, denyIps, - ipAllowlist) + ipAllowlist, + userAllowlist) } def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit = @@ -216,4 +230,15 @@ object SessionLimiter { case l: SessionLimiterWithAccessControlListImpl => l.ipAllowlist case _ => Set.empty } + + def resetUserAllowlist(limiter: SessionLimiter, userAllowlist: Set[String]): Unit = + limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.setUserAllowlist(userAllowlist) + case _ => + } + + def getUserAllowlist(limiter: SessionLimiter): Set[String] = limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.userAllowlist + case _ => Set.empty + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala index c9fc46acf91..bd8c99e44b9 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala @@ -328,4 +328,123 @@ class SessionLimiterSuite extends KyuubiFunSuite { SessionLimiter.resetIpAllowlist(limiter, Set.empty) limiter.increment(UserIpAddress("user003", "172.16.0.1")) } + + test("test session limiter with user allowlist") { + val allowedUser = "user001" + val blockedUser = "user002" + val userAllowlist = Set(allowedUser) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + userAllowlist) + + // allowed user should be able to connect + limiter.increment(UserIpAddress(allowedUser, "10.0.0.1")) + + // blocked user should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(blockedUser, "10.0.0.1")) + } + assert(caught.getMessage.equals( + s"Connection denied because the user is not in the user allowlist." + + s" (user: $blockedUser)")) + } + + test("test session limiter user allowlist with multiple users") { + val allowedUser1 = "user001" + val allowedUser2 = "user002" + val blockedUser = "user003" + val userAllowlist = Set(allowedUser1, allowedUser2) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + userAllowlist) + + // both allowed users should be able to connect + limiter.increment(UserIpAddress(allowedUser1, "10.0.0.1")) + limiter.increment(UserIpAddress(allowedUser2, "10.0.0.2")) + + // blocked user should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(blockedUser, "192.168.1.100")) + } + assert(caught.getMessage.contains("not in the user allowlist")) + } + + test("test session limiter empty user allowlist allows all users") { + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + Set.empty) + + // when allowlist is empty, all users should be allowed + limiter.increment(UserIpAddress("user001", "10.0.0.1")) + limiter.increment(UserIpAddress("user002", "192.168.1.100")) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } + + test("test session limiter user deny list has higher priority than user allowlist") { + val user = "user001" + val denyUsers = Set(user) + val userAllowlist = Set(user) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + denyUsers, + Set.empty, + Set.empty, + userAllowlist) + + // deny user list check happens before allowlist check + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(user, "10.0.0.1")) + } + assert(caught.getMessage.equals( + s"Connection denied because the user is in the deny user list. (user: $user)")) + } + + test("test refresh user allowlist") { + val allowedUser = "user001" + val blockedUser = "user002" + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + Set(allowedUser)) + + // initially only allowedUser can connect + limiter.increment(UserIpAddress(allowedUser, "10.0.0.1")) + intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(blockedUser, "10.0.0.1")) + } + + // refresh allowlist to include blockedUser + SessionLimiter.resetUserAllowlist(limiter, Set(allowedUser, blockedUser)) + limiter.increment(UserIpAddress(blockedUser, "10.0.0.1")) + + // refresh allowlist to empty (allow all) + SessionLimiter.resetUserAllowlist(limiter, Set.empty) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } }