Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object FileUtils {
if (input == null) {
throw new RuntimeException("The inputStream can not be null")
}
input.autoClose(in => {
input.using(in => {
val b = new Array[Byte](4)
in.read(b, 0, b.length)
bytesToHexString(b)
Expand Down Expand Up @@ -174,7 +174,7 @@ object FileUtils {

@throws[IOException]
def readInputStream(in: InputStream, array: Array[Byte]): Unit = {
in.autoClose(is => {
in.using(is => {
var toRead = array.length
var ret = 0
var off = 0
Expand All @@ -196,7 +196,7 @@ object FileUtils {
val array = new Array[Byte](len.toInt)
Files
.newInputStream(file.toPath)
.autoClose(is => {
.using(is => {
readInputStream(is, array)
new String(array, StandardCharsets.UTF_8)
})
Expand All @@ -215,7 +215,7 @@ object FileUtils {
@throws[IOException]
def readEndOfFile(file: File, maxSize: Long): Array[Byte] = {
var readSize = maxSize
new RandomAccessFile(file, "r").autoClose(raFile => {
new RandomAccessFile(file, "r").using(raFile => {
if (raFile.length > maxSize) {
raFile.seek(raFile.length - maxSize)
} else if (raFile.length < maxSize) {
Expand Down Expand Up @@ -249,7 +249,7 @@ object FileUtils {
throw new IllegalArgumentException(
s"The startOffset $startOffset is great than the file length ${file.length}")
}
new RandomAccessFile(file, "r").autoClose(raFile => {
new RandomAccessFile(file, "r").using(raFile => {
val readSize = Math.min(maxSize, file.length - startOffset)
raFile.seek(startOffset)
val fileContent = new Array[Byte](readSize.toInt)
Expand All @@ -275,7 +275,7 @@ object FileUtils {
if (file.exists && file.isFile) {
Files
.lines(Paths.get(path))
.autoClose(stream =>
.using(stream =>
stream
.skip(offset)
.limit(limit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package org.apache.streampark.common.util

import org.apache.streampark.common.util.Utils.close

import java.lang.{Boolean => JavaBool, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}

import scala.collection.convert.{DecorateAsJava, DecorateAsScala, ToJavaImplicits, ToScalaImplicits}
import scala.language.implicitConversions
import scala.util.Try

object Implicits extends ToScalaImplicits with ToJavaImplicits with DecorateAsJava with DecorateAsScala {

Expand Down Expand Up @@ -55,17 +54,25 @@ object Implicits extends ToScalaImplicits with ToJavaImplicits with DecorateAsJa
type JavaShort = java.lang.Short

implicit class AutoCloseImplicits[T <: AutoCloseable](autoCloseable: T) {

implicit def autoClose[R](func: T => R)(implicit excFunc: Throwable => R = null): R = {
implicit def using[R](func: T => R)(implicit excFunc: Throwable => R = null): R = {
var exception: Option[Throwable] = Option.empty
try {
func(autoCloseable)
} catch {
case e: Throwable if excFunc != null => excFunc(e)
case e: Throwable =>
exception = Some(e)
if (excFunc != null) {
excFunc(e)
} else {
throw e
}
} finally {
close(autoCloseable)
Try(autoCloseable.close()).recover { case e =>
exception.foreach(originalEx => e.addSuppressed(originalEx))
throw e
}
}
}

}

implicit class StringImplicits(v: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.streampark.common.util

import org.apache.streampark.common.util.Implicits._
import org.apache.streampark.common.util.Implicits.AutoCloseImplicits

import org.apache.commons.lang3.StringUtils

Expand Down Expand Up @@ -75,7 +74,7 @@ object Utils extends Logger {
def getJarManifest(jarFile: File): jar.Manifest = {
requireCheckJarFile(jarFile.toURL)
new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile)))
.autoClose(_.getManifest)
.using(_.getManifest)
}

def getJarManClass(jarFile: File): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.scalatest.funsuite.AnyFunSuite

class ImplicitsTest extends AnyFunSuite {
test(
"AutoCloseImplicits.autoClose should close the resource after execution and handle exceptions") {
"AutoCloseImplicits.using should close the resource after execution and handle exceptions") {
class MockResource extends AutoCloseable {
var closed = false
def close(): Unit = closed = true
Expand All @@ -35,7 +35,7 @@ class ImplicitsTest extends AnyFunSuite {

val mockResource = new MockResource
assertThrows[RuntimeException] {
mockResource.autoClose(operation)
mockResource.using(operation)
}
assert(mockResource.closed)
}
Expand Down
18 changes: 0 additions & 18 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,6 @@
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-connector-plugin</artifactId>
<version>2.2.0-SNAPSHOT</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
Expand Down Expand Up @@ -612,12 +600,6 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</dependency>
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-connector-plugin</artifactId>
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/plugins</outputDirectory>
</dependency>
</artifactItems>
</configuration>
<executions>
Expand Down

This file was deleted.

This file was deleted.

Loading
Loading