diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 8e8d31550e8..ba83b70fb4e 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -3283,6 +3283,32 @@ object KyuubiConf { .toSet() .createWithDefault(Set.empty) + val SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST: ConfigEntry[Set[String]] = + buildConf("kyuubi.server.limit.connections.ip.allowlist") + .doc("When this list is not empty, only the client ip in the allow list will be" + + " permitted to connect to kyuubi server, all other ips will be denied." + + " If this list is empty (default), no ip allowlist restriction is applied." + + " Note: if a client ip is in both ip.allowlist and ip.deny.list," + + " the deny list takes higher priority.") + .version("1.10.0") + .serverOnly + .stringConf + .toSet() + .createWithDefault(Set.empty) + + val SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST: ConfigEntry[Set[String]] = + buildConf("kyuubi.server.limit.connections.user.allowlist") + .doc("When this list is not empty, only the user in the allow list will be" + + " permitted to connect to kyuubi server, all other users will be denied." + + " If this list is empty (default), no user allowlist restriction is applied." + + " Note: if a user is in both user.allowlist and user.deny.list," + + " the deny list takes higher priority.") + .version("1.10.0") + .serverOnly + .stringConf + .toSet() + .createWithDefault(Set.empty) + val SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] = buildConf("kyuubi.server.limit.batch.connections.per.user") .doc("Maximum kyuubi server batch connections per user." + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 0996472653b..67a0c0ba261 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -175,6 +175,22 @@ object KyuubiServer extends Logging { info(s"Refreshed deny client ips from $existingDenyIps to $refreshedDenyIps") } + private[kyuubi] def refreshIpAllowlist(): Unit = synchronized { + val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] + val existingIpAllowlist = sessionMgr.getIpAllowlist + sessionMgr.refreshIpAllowlist(createKyuubiConf()) + val refreshedIpAllowlist = sessionMgr.getIpAllowlist + info(s"Refreshed ip allowlist from $existingIpAllowlist to $refreshedIpAllowlist") + } + + private[kyuubi] def refreshUserAllowlist(): Unit = synchronized { + val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] + val existingUserAllowlist = sessionMgr.getUserAllowlist + sessionMgr.refreshUserAllowlist(createKyuubiConf()) + val refreshedUserAllowlist = sessionMgr.getUserAllowlist + info(s"Refreshed user allowlist from $existingUserAllowlist to $refreshedUserAllowlist") + } + private def createKyuubiConf(): KyuubiConf = { KyuubiConf().loadFileDefaults().loadFromArgs(commandArgs) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index c588ead7a82..5a208addfc2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -161,6 +161,44 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { Response.ok(s"Refresh the deny ips successfully.").build() } + @ApiResponse( + responseCode = "200", + content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)), + description = "refresh the ip allowlist") + @POST + @Path("refresh/ip_allowlist") + def refreshIpAllowlist(): Response = { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Receive refresh ip allowlist request from $userName/$ipAddress") + if (!fe.isAdministrator(userName)) { + throw new ForbiddenException( + s"$userName is not allowed to refresh the ip allowlist") + } + info(s"Reloading ip allowlist") + KyuubiServer.refreshIpAllowlist() + Response.ok(s"Refresh the ip allowlist successfully.").build() + } + + @ApiResponse( + responseCode = "200", + content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)), + description = "refresh the user allowlist") + @POST + @Path("refresh/user_allowlist") + def refreshUserAllowlist(): Response = { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Receive refresh user allowlist request from $userName/$ipAddress") + if (!fe.isAdministrator(userName)) { + throw new ForbiddenException( + s"$userName is not allowed to refresh the user allowlist") + } + info(s"Reloading user allowlist") + KyuubiServer.refreshUserAllowlist() + Response.ok(s"Refresh the user allowlist successfully.").build() + } + @ApiResponse( responseCode = "200", content = Array(new Content( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 344da0e71e8..d5291fd5d25 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -396,13 +396,17 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { conf.get(SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).filter(_.nonEmpty) val userDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_USER_DENY_LIST).filter(_.nonEmpty) val ipDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_IP_DENY_LIST).filter(_.nonEmpty) + val ipAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty) + val userAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty) limiter = applySessionLimiter( userLimit, ipAddressLimit, userIpAddressLimit, userUnlimitedList, userDenyList, - ipDenyList) + ipDenyList, + ipAllowlist, + userAllowlist) val userBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER).getOrElse(0) val ipAddressBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_IPADDRESS).getOrElse(0) @@ -414,7 +418,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { userIpAddressBatchLimit, userUnlimitedList, userDenyList, - ipDenyList) + ipDenyList, + ipAllowlist, + userAllowlist) } private[kyuubi] def getUnlimitedUsers: Set[String] = { @@ -448,22 +454,49 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { batchLimiter.foreach(SessionLimiter.resetDenyIps(_, denyIps)) } + private[kyuubi] def getIpAllowlist: Set[String] = { + limiter.orElse(batchLimiter).map(SessionLimiter.getIpAllowlist).getOrElse(Set.empty) + } + + private[kyuubi] def refreshIpAllowlist(conf: KyuubiConf): Unit = { + val ipAllowlist = + conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty) + limiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist)) + batchLimiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist)) + } + + private[kyuubi] def getUserAllowlist: Set[String] = { + limiter.orElse(batchLimiter).map(SessionLimiter.getUserAllowlist).getOrElse(Set.empty) + } + + private[kyuubi] def refreshUserAllowlist(conf: KyuubiConf): Unit = { + val userAllowlist = + conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty) + limiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist)) + batchLimiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist)) + } + private def applySessionLimiter( userLimit: Int, ipAddressLimit: Int, userIpAddressLimit: Int, userUnlimitedList: Set[String], userDenyList: Set[String], - ipDenyList: Set[String]): Option[SessionLimiter] = { + ipDenyList: Set[String], + ipAllowlist: Set[String] = Set.empty, + userAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = { if (Seq(userLimit, ipAddressLimit, userIpAddressLimit).exists(_ > 0) || - userDenyList.nonEmpty || ipDenyList.nonEmpty) { + userDenyList.nonEmpty || ipDenyList.nonEmpty || + ipAllowlist.nonEmpty || userAllowlist.nonEmpty) { Some(SessionLimiter( userLimit, ipAddressLimit, userIpAddressLimit, userUnlimitedList, userDenyList, - ipDenyList)) + ipDenyList, + ipAllowlist, + userAllowlist)) } else { None } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala index 688154d0099..4d6923b7a77 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala @@ -108,7 +108,9 @@ class SessionLimiterWithAccessControlListImpl( userIpAddressLimit: Int, var unlimitedUsers: Set[String], var denyUsers: Set[String], - var denyIps: Set[String]) + var denyIps: Set[String], + var ipAllowlist: Set[String] = Set.empty, + var userAllowlist: Set[String] = Set.empty) extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) { override def increment(userIpAddress: UserIpAddress): Unit = { val user = userIpAddress.user @@ -123,6 +125,19 @@ class SessionLimiterWithAccessControlListImpl( s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)" throw KyuubiSQLException(errorMsg) } + // user allowlist check: when allowlist is not empty, only allowed users can connect + if (userAllowlist.nonEmpty && StringUtils.isNotBlank(user) && + !userAllowlist.contains(user)) { + val errorMsg = + s"Connection denied because the user is not in the user allowlist. (user: $user)" + throw KyuubiSQLException(errorMsg) + } + // ip allowlist check: when allowlist is not empty, only allowed ips can connect + if (ipAllowlist.nonEmpty && StringUtils.isNotBlank(ip) && !ipAllowlist.contains(ip)) { + val errorMsg = + s"Connection denied because the client ip is not in the ip allowlist. (ipAddress: $ip)" + throw KyuubiSQLException(errorMsg) + } if (!unlimitedUsers.contains(user)) { super.increment(userIpAddress) @@ -140,6 +155,14 @@ class SessionLimiterWithAccessControlListImpl( private[kyuubi] def setDenyIps(denyIps: Set[String]): Unit = { this.denyIps = denyIps } + + private[kyuubi] def setIpAllowlist(ipAllowlist: Set[String]): Unit = { + this.ipAllowlist = ipAllowlist + } + + private[kyuubi] def setUserAllowlist(userAllowlist: Set[String]): Unit = { + this.userAllowlist = userAllowlist + } } object SessionLimiter { @@ -150,14 +173,18 @@ object SessionLimiter { userIpAddressLimit: Int, unlimitedUsers: Set[String] = Set.empty, denyUsers: Set[String] = Set.empty, - denyIps: Set[String] = Set.empty): SessionLimiter = { + denyIps: Set[String] = Set.empty, + ipAllowlist: Set[String] = Set.empty, + userAllowlist: Set[String] = Set.empty): SessionLimiter = { new SessionLimiterWithAccessControlListImpl( userLimit, ipAddressLimit, userIpAddressLimit, unlimitedUsers, denyUsers, - denyIps) + denyIps, + ipAllowlist, + userAllowlist) } def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit = @@ -192,4 +219,26 @@ object SessionLimiter { case l: SessionLimiterWithAccessControlListImpl => l.denyIps case _ => Set.empty } + + def resetIpAllowlist(limiter: SessionLimiter, ipAllowlist: Set[String]): Unit = + limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.setIpAllowlist(ipAllowlist) + case _ => + } + + def getIpAllowlist(limiter: SessionLimiter): Set[String] = limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.ipAllowlist + case _ => Set.empty + } + + def resetUserAllowlist(limiter: SessionLimiter, userAllowlist: Set[String]): Unit = + limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.setUserAllowlist(userAllowlist) + case _ => + } + + def getUserAllowlist(limiter: SessionLimiter): Set[String] = limiter match { + case l: SessionLimiterWithAccessControlListImpl => l.userAllowlist + case _ => Set.empty + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala index 47c33d63e1d..bd8c99e44b9 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala @@ -214,4 +214,237 @@ class SessionLimiterSuite extends KyuubiFunSuite { limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values .foreach(c => assert(c.get() == 0)) } + + test("test session limiter with ip allowlist") { + val allowedIp = "10.0.0.1" + val blockedIp = "192.168.1.100" + val ipAllowlist = Set(allowedIp) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + ipAllowlist) + + // allowed ip should be able to connect + limiter.increment(UserIpAddress("user001", allowedIp)) + + // blocked ip should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user001", blockedIp)) + } + assert(caught.getMessage.equals( + s"Connection denied because the client ip is not in the ip allowlist." + + s" (ipAddress: $blockedIp)")) + } + + test("test session limiter ip allowlist with multiple ips") { + val allowedIp1 = "10.0.0.1" + val allowedIp2 = "10.0.0.2" + val blockedIp = "192.168.1.100" + val ipAllowlist = Set(allowedIp1, allowedIp2) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + ipAllowlist) + + // both allowed ips should be able to connect + limiter.increment(UserIpAddress("user001", allowedIp1)) + limiter.increment(UserIpAddress("user002", allowedIp2)) + + // blocked ip should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user003", blockedIp)) + } + assert(caught.getMessage.contains("not in the ip allowlist")) + } + + test("test session limiter empty ip allowlist allows all ips") { + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty) + + // when allowlist is empty, all ips should be allowed + limiter.increment(UserIpAddress("user001", "10.0.0.1")) + limiter.increment(UserIpAddress("user002", "192.168.1.100")) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } + + test("test session limiter ip deny list has higher priority than ip allowlist") { + val ip = "10.0.0.1" + val denyIps = Set(ip) + val ipAllowlist = Set(ip) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + denyIps, + ipAllowlist) + + // deny ip list check happens before allowlist check + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user001", ip)) + } + assert(caught.getMessage.equals( + s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)")) + } + + test("test refresh ip allowlist") { + val allowedIp = "10.0.0.1" + val blockedIp = "192.168.1.100" + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set(allowedIp)) + + // initially only allowedIp can connect + limiter.increment(UserIpAddress("user001", allowedIp)) + intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress("user002", blockedIp)) + } + + // refresh allowlist to include blockedIp + SessionLimiter.resetIpAllowlist(limiter, Set(allowedIp, blockedIp)) + limiter.increment(UserIpAddress("user002", blockedIp)) + + // refresh allowlist to empty (allow all) + SessionLimiter.resetIpAllowlist(limiter, Set.empty) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } + + test("test session limiter with user allowlist") { + val allowedUser = "user001" + val blockedUser = "user002" + val userAllowlist = Set(allowedUser) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + userAllowlist) + + // allowed user should be able to connect + limiter.increment(UserIpAddress(allowedUser, "10.0.0.1")) + + // blocked user should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(blockedUser, "10.0.0.1")) + } + assert(caught.getMessage.equals( + s"Connection denied because the user is not in the user allowlist." + + s" (user: $blockedUser)")) + } + + test("test session limiter user allowlist with multiple users") { + val allowedUser1 = "user001" + val allowedUser2 = "user002" + val blockedUser = "user003" + val userAllowlist = Set(allowedUser1, allowedUser2) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + userAllowlist) + + // both allowed users should be able to connect + limiter.increment(UserIpAddress(allowedUser1, "10.0.0.1")) + limiter.increment(UserIpAddress(allowedUser2, "10.0.0.2")) + + // blocked user should be denied + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(blockedUser, "192.168.1.100")) + } + assert(caught.getMessage.contains("not in the user allowlist")) + } + + test("test session limiter empty user allowlist allows all users") { + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + Set.empty) + + // when allowlist is empty, all users should be allowed + limiter.increment(UserIpAddress("user001", "10.0.0.1")) + limiter.increment(UserIpAddress("user002", "192.168.1.100")) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } + + test("test session limiter user deny list has higher priority than user allowlist") { + val user = "user001" + val denyUsers = Set(user) + val userAllowlist = Set(user) + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + denyUsers, + Set.empty, + Set.empty, + userAllowlist) + + // deny user list check happens before allowlist check + val caught = intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(user, "10.0.0.1")) + } + assert(caught.getMessage.equals( + s"Connection denied because the user is in the deny user list. (user: $user)")) + } + + test("test refresh user allowlist") { + val allowedUser = "user001" + val blockedUser = "user002" + val limiter = SessionLimiter( + 100, + 100, + 100, + Set.empty, + Set.empty, + Set.empty, + Set.empty, + Set(allowedUser)) + + // initially only allowedUser can connect + limiter.increment(UserIpAddress(allowedUser, "10.0.0.1")) + intercept[KyuubiSQLException] { + limiter.increment(UserIpAddress(blockedUser, "10.0.0.1")) + } + + // refresh allowlist to include blockedUser + SessionLimiter.resetUserAllowlist(limiter, Set(allowedUser, blockedUser)) + limiter.increment(UserIpAddress(blockedUser, "10.0.0.1")) + + // refresh allowlist to empty (allow all) + SessionLimiter.resetUserAllowlist(limiter, Set.empty) + limiter.increment(UserIpAddress("user003", "172.16.0.1")) + } }