diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 8ea6222e72..543500d7aa 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -43,7 +43,7 @@ jobs: not-ignore: ${{ steps.filter.outputs.not-ignore }} steps: - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v3.0.2 + - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 id: filter with: filters: | diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 225181d3cc..f64d7e7eef 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -43,7 +43,7 @@ jobs: not-ignore: ${{ steps.filter.outputs.not-ignore }} steps: - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v3.0.2 + - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 id: filter with: filters: | diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 4582dafc02..3da664d980 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -38,7 +38,7 @@ jobs: not-ignore: ${{ steps.filter.outputs.not-ignore }} steps: - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v3.0.2 + - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 id: filter with: filters: | diff --git a/.github/workflows/frontend.yml b/.github/workflows/frontend.yml index 8691618318..ce01c69e52 100644 --- a/.github/workflows/frontend.yml +++ b/.github/workflows/frontend.yml @@ -38,7 +38,7 @@ jobs: not-ignore: ${{ steps.filter.outputs.not-ignore }} steps: - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v3.0.2 + - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 id: filter with: filters: | diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 3359cde54f..ee604d4f0b 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -43,7 +43,7 @@ jobs: not-ignore: ${{ steps.filter.outputs.not-ignore }} steps: - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v3.0.2 + - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 id: filter with: filters: | diff --git a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh index 5045c78926..4d9ec7caad 100755 --- a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh +++ b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh @@ -360,7 +360,7 @@ start() { fi if [[ "${HADOOP_HOME}"x == ""x ]]; then - echo_y "WARN: HADOOP_HOME is undefined on your system env,please check it." + echo_y "WARN: HADOOP_HOME is undefined on your system env, please check it." else echo_w "Using HADOOP_HOME: ${HADOOP_HOME}" fi @@ -426,7 +426,7 @@ start_docker() { fi if [[ "${HADOOP_HOME}"x == ""x ]]; then - echo_y "WARN: HADOOP_HOME is undefined on your system env,please check it." + echo_y "WARN: HADOOP_HOME is undefined on your system env, please check it." else echo_w "Using HADOOP_HOME: ${HADOOP_HOME}" fi diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java index b1189c290d..adc87f2dd8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java @@ -129,4 +129,5 @@ public static void main(String[] args) throws IOException { break; } } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java index d7e81e5734..cebfaa0cf3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java @@ -17,8 +17,6 @@ package org.apache.streampark.console.system.authentication; -import org.apache.streampark.console.base.util.EncryptUtils; - import org.apache.shiro.authz.UnauthorizedException; import org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter; @@ -58,7 +56,7 @@ protected boolean executeLogin(ServletRequest request, ServletResponse response) HttpServletRequest httpServletRequest = (HttpServletRequest) request; String token = httpServletRequest.getHeader(TOKEN); try { - token = EncryptUtils.decrypt(token); + token = JWTUtil.decrypt(token); JWTToken jwtToken = new JWTToken(token); getSubject(request, response).login(jwtToken); return true; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTSecret.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTSecret.java new file mode 100644 index 0000000000..bb8fb73091 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTSecret.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.system.authentication; + +import org.apache.streampark.common.util.FileUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.PosixFilePermissions; +import java.security.SecureRandom; +import java.util.Base64; + +@Slf4j +public class JWTSecret { + + private static final int KEY_LENGTH = 32; + + public static byte[] getJWTSecret() { + Path keyPath = Paths.get(System.getProperty("user.home"), "streampark.jwt.key"); + File keyFile = keyPath.toFile(); + + // Try to load existing key + byte[] keyBytes = loadExistingKey(keyFile); + if (keyBytes != null) { + return keyBytes; + } + + // Generate new key + keyBytes = generateNewKey(); + saveNewKey(keyBytes, keyPath); + return keyBytes; + } + + private static byte[] loadExistingKey(File keyFile) { + if (!keyFile.exists()) { + return null; + } + + try { + String secret = FileUtils.readFile(keyFile).trim(); + byte[] keyBytes = Base64.getDecoder().decode(secret); + + if (keyBytes.length != KEY_LENGTH) { + log.error("Invalid HMAC key length: {} bytes (expected {} bytes)", keyBytes.length, KEY_LENGTH); + return null; + } + return keyBytes; + } catch (Exception e) { + log.error("Failed to read JWT key file", e); + } + // Clean up invalid file + safelyDeleteFile(keyFile); + return null; + } + + private static byte[] generateNewKey() { + byte[] key = new byte[KEY_LENGTH]; + new SecureRandom().nextBytes(key); + return key; + } + + private static void saveNewKey(byte[] keyBytes, Path keyPath) { + String encodedKey = Base64.getEncoder().encodeToString(keyBytes); + try { + // Ensure the directory exists + Files.createDirectories(keyPath.getParent()); + // Safely write to a temporary file before renaming + Path tempFile = Files.createTempFile(keyPath.getParent(), "streampark", ".tmp"); + Files.write(tempFile, encodedKey.getBytes(StandardCharsets.UTF_8)); + + // Atomically move after setting permissions + setStrictPermissions(tempFile); + Files.move(tempFile, keyPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + + } catch (Exception e) { + throw new SecurityException("Failed to generate JWT key", e); + } + } + + private static void setStrictPermissions(Path path) { + try { + Files.setPosixFilePermissions(path, + PosixFilePermissions.fromString("rw-------")); + } catch (UnsupportedOperationException e) { + log.warn("POSIX permissions not supported for {}", path); + } catch (IOException e) { + log.error("Failed to set permissions for {}", path, e); + } + } + + private static void safelyDeleteFile(File keyFile) { + try { + if (keyFile.exists() && !keyFile.delete()) { + log.warn("Failed to delete invalid key file: {}", keyFile.getAbsolutePath()); + } + } catch (SecurityException e) { + log.error("Security exception when deleting key file", e); + } + } + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java index f51c276f30..ac87684ae3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java @@ -17,7 +17,6 @@ package org.apache.streampark.console.system.authentication; -import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import org.apache.streampark.console.system.entity.User; @@ -28,6 +27,14 @@ import com.auth0.jwt.interfaces.DecodedJWT; import lombok.extern.slf4j.Slf4j; +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.Base64; import java.util.Date; import java.util.regex.Pattern; @@ -36,45 +43,25 @@ public class JWTUtil { private static Long ttlOfSecond; + private static final String ALGORITHM = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_IV_LENGTH = 12; private static final String JWT_USERID = "userId"; private static final String JWT_USERNAME = "userName"; private static final String JWT_TYPE = "type"; private static final String JWT_TIMESTAMP = "timestamp"; - /** - * verify token - * - * @param token token - * @return is valid token - */ - public static boolean verify(String token, String username, String secret) { - try { - Algorithm algorithm = Algorithm.HMAC256(secret); - JWTVerifier verifier = JWT.require(algorithm).withClaim(JWT_USERNAME, username).build(); - verifier.verify(token); - return true; - } catch (Exception ignored) { - return false; - } - } + private static byte[] JWT_KEY = JWTSecret.getJWTSecret(); // Used for HMAC256 /** get username from token */ public static String getUserName(String token) { - try { - DecodedJWT jwt = JWT.decode(token); - return jwt.getClaim(JWT_USERNAME).asString(); - } catch (Exception ignored) { - return null; - } + DecodedJWT jwt = decode(token); + return jwt != null ? jwt.getClaim(JWT_USERNAME).asString() : null; } public static Long getUserId(String token) { - try { - DecodedJWT jwt = JWT.decode(token); - return jwt.getClaim(JWT_USERID).asLong(); - } catch (Exception ignored) { - return null; - } + DecodedJWT jwt = decode(token); + return jwt != null ? jwt.getClaim(JWT_USERID).asLong() : null; } /** @@ -82,12 +69,8 @@ public static Long getUserId(String token) { * @return */ public static Long getTimestamp(String token) { - try { - DecodedJWT jwt = JWT.decode(token); - return jwt.getClaim(JWT_TIMESTAMP).asLong(); - } catch (Exception ignored) { - return 0L; - } + DecodedJWT jwt = decode(token); + return jwt != null ? jwt.getClaim(JWT_TIMESTAMP).asLong() : 0L; } /** @@ -95,13 +78,12 @@ public static Long getTimestamp(String token) { * @return */ public static AuthenticationType getAuthType(String token) { - try { - DecodedJWT jwt = JWT.decode(token); - int type = jwt.getClaim(JWT_TYPE).asInt(); - return AuthenticationType.of(type); - } catch (Exception ignored) { + DecodedJWT jwt = decode(token); + if (jwt == null) { return null; } + int type = jwt.getClaim(JWT_TYPE).asInt(); + return AuthenticationType.of(type); } /** @@ -125,7 +107,7 @@ public static String sign(User user, AuthenticationType authType) throws Excepti */ public static String sign(User user, AuthenticationType authType, Long expireTime) throws Exception { Date date = new Date(expireTime); - Algorithm algorithm = Algorithm.HMAC256(user.getPassword()); + Algorithm algorithm = Algorithm.HMAC256(JWT_KEY); JWTCreator.Builder builder = JWT.create() @@ -139,7 +121,7 @@ public static String sign(User user, AuthenticationType authType, Long expireTim } String token = builder.sign(algorithm); - return EncryptUtils.encrypt(token); + return encrypt(token); } public static Long getTTLOfSecond() { @@ -167,4 +149,77 @@ public static Long getTTLOfSecond() { } return ttlOfSecond; } + + private static DecodedJWT decode(String token) { + try { + Algorithm algorithm = Algorithm.HMAC256(JWT_KEY); + JWTVerifier verifier = JWT.require(algorithm).build(); + return verifier.verify(token); + } catch (Exception e) { + return null; + } + } + + public static boolean verify(String token) { + try { + // Decode the signing key using Base64 + Algorithm algorithm = Algorithm.HMAC256(JWT_KEY); + JWTVerifier verifier = JWT.require(algorithm).build(); + verifier.verify(token); + return true; + } catch (Exception e) { + log.warn("Invalid JWT: {}", e.getMessage()); + return false; + } + } + + /** + * Encrypts the given content using AES-GCM with a randomly generated IV. + * The IV is prepended to the ciphertext and the result is Base64-encoded. + * This allows the decrypt method to extract the IV and correctly decrypt the content. + * + * @param content the plaintext string to encrypt + * @return the Base64-encoded string containing the IV and ciphertext + * @throws Exception if encryption fails + */ + public static String encrypt(String content) throws Exception { + // Generate a random IV + byte[] iv = new byte[GCM_IV_LENGTH]; + SecureRandom.getInstanceStrong().nextBytes(iv); + + SecretKeySpec keySpec = new SecretKeySpec(JWT_KEY, "AES"); + + // Initialize the cipher + Cipher cipher = Cipher.getInstance(ALGORITHM); + cipher.init(Cipher.ENCRYPT_MODE, keySpec, new GCMParameterSpec(GCM_TAG_LENGTH, iv)); + + // Encrypt data + byte[] encrypted = cipher.doFinal(content.getBytes(StandardCharsets.UTF_8)); + + // Combine IV and ciphertext + ByteBuffer buffer = ByteBuffer.allocate(iv.length + encrypted.length); + buffer.put(iv); + buffer.put(encrypted); + + return Base64.getEncoder().encodeToString(buffer.array()); + } + + public static String decrypt(String content) throws Exception { + byte[] data = Base64.getDecoder().decode(content); + ByteBuffer buffer = ByteBuffer.wrap(data); + + byte[] iv = new byte[GCM_IV_LENGTH]; + buffer.get(iv); + byte[] encrypted = new byte[buffer.remaining()]; + buffer.get(encrypted); + + SecretKeySpec keySpec = new SecretKeySpec(JWT_KEY, "AES"); + + Cipher cipher = Cipher.getInstance(ALGORITHM); + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH, iv); + cipher.init(Cipher.DECRYPT_MODE, keySpec, spec); + + return new String(cipher.doFinal(encrypted), StandardCharsets.UTF_8); + } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java index 6ea9d88a48..6c65334d4f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java @@ -18,7 +18,6 @@ package org.apache.streampark.console.system.authentication; import org.apache.streampark.common.util.SystemPropertyUtils; -import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import org.apache.streampark.console.system.entity.AccessToken; import org.apache.streampark.console.system.entity.User; @@ -90,6 +89,12 @@ protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authent throw new AuthenticationException("the authorization token is invalid"); } + // Query user information by username + User user = userService.getByUsername(username); + if (user == null || !user.getUserId().equals(userId)) { + throw new AuthenticationException("the authorization token verification failed."); + } + switch (authType) { case SIGN: Long timestamp = JWTUtil.getTimestamp(credential); @@ -102,7 +107,7 @@ protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authent // Check whether the token belongs to the api and whether the permission is valid AccessToken accessToken = accessTokenService.getByUserId(userId); try { - String encryptToken = EncryptUtils.encrypt(credential); + String encryptToken = JWTUtil.encrypt(credential); if (accessToken == null || !accessToken.getToken().equals(encryptToken)) { throw new AuthenticationException("the openapi authorization token is invalid"); } @@ -112,7 +117,7 @@ protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authent if (AccessToken.STATUS_DISABLE.equals(accessToken.getStatus())) { throw new AuthenticationException( - "the openapi authorization token is disabled, please contact the administrator"); + "The OpenAPI authorization token is disabled. Please contact the administrator."); } if (User.STATUS_LOCK.equals(accessToken.getUserStatus())) { @@ -125,12 +130,6 @@ protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authent break; } - // Query user information by username - User user = userService.getByUsername(username); - if (user == null || !JWTUtil.verify(credential, username, user.getPassword())) { - throw new AuthenticationException("the authorization token verification failed."); - } - return new SimpleAuthenticationInfo(credential, credential, "streampark_shiro_realm"); } } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java index 6563b94e36..692afc0698 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.base.util; -import org.apache.streampark.common.constants.Constants; +import org.apache.streampark.console.system.authentication.JWTUtil; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -27,8 +27,8 @@ class EncryptUtilsTest { @Test void testEncrypt() throws Exception { String value = "apache streampark"; - String encrypt = EncryptUtils.encrypt(value, Constants.STREAM_PARK); - String decrypt = EncryptUtils.decrypt(encrypt, Constants.STREAM_PARK); + String encrypt = JWTUtil.encrypt(value); + String decrypt = JWTUtil.decrypt(encrypt); Assertions.assertEquals(value, decrypt); } } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java index c5de0d6a2f..ad802924d4 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java @@ -20,7 +20,6 @@ import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; -import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.system.authentication.JWTToken; import org.apache.streampark.console.system.authentication.JWTUtil; import org.apache.streampark.console.system.entity.AccessToken; @@ -51,14 +50,14 @@ void testCrudToken() throws Exception { // verify AccessToken accessToken = (AccessToken) restResponse.get("data"); LOG.info(accessToken.getToken()); - JWTToken jwtToken = new JWTToken(EncryptUtils.decrypt(accessToken.getToken())); + JWTToken jwtToken = new JWTToken(JWTUtil.decrypt(accessToken.getToken())); LOG.info(jwtToken.getToken()); String username = JWTUtil.getUserName(jwtToken.getToken()); Assertions.assertNotNull(username); Assertions.assertEquals("admin", username); User user = userService.getByUsername(username); Assertions.assertNotNull(user); - Assertions.assertTrue(JWTUtil.verify(jwtToken.getToken(), username, user.getPassword())); + Assertions.assertTrue(JWTUtil.verify(jwtToken.getToken())); // list AccessToken mockToken1 = new AccessToken(); diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java index ff3ea0bb12..66c67d92b3 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java @@ -19,7 +19,6 @@ import org.apache.streampark.common.util.DateUtils; import org.apache.streampark.console.SpringUnitTestBase; -import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import org.apache.streampark.console.system.entity.User; @@ -47,7 +46,7 @@ void testExpireTime() throws Exception { AuthenticationType.SIGN, DateUtils.getTime(ttl, DateUtils.fullFormat(), TimeZone.getDefault())); assert token != null; - Date expiresAt = JWT.decode(EncryptUtils.decrypt(token)).getExpiresAt(); + Date expiresAt = JWT.decode(JWTUtil.decrypt(token)).getExpiresAt(); String decodeExpireTime = DateUtils.format(expiresAt, DateUtils.fullFormat(), TimeZone.getDefault()); Assertions.assertEquals(ttl, decodeExpireTime);