Skip to content

Commit e0bfad8

Browse files
committed
sleepytime for minio
1 parent 1f5ca76 commit e0bfad8

11 files changed

Lines changed: 306 additions & 77 deletions

File tree

.github/dependabot.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
version: 2
2+
3+
updates:
4+
- package-ecosystem: gradle
5+
directory: "/"
6+
registries:
7+
- github-packages
8+
schedule:
9+
interval: weekly
10+
commit-message:
11+
open-pull-request-limit: 20
12+
ignore:
13+
dependency-name: org.gusdb:fgputil*
14+
15+
registries:
16+
github-packages:
17+
type: maven-repository
18+
url: https://maven.pkg.github.com/veupathdb/maven-packages
19+
username: ${{ secrets.PACKAGES_USER }}
20+
password: ${{ secrets.PACKAGES_KEY }}

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
plugins {
2-
kotlin("jvm") version "2.0.20"
2+
alias(libs.plugins.kotlin)
33
}
44

55
allprojects {

gradle/libs.versions.toml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[plugins]
2+
kotlin = { id = "org.jetbrains.kotlin.jvm", version = "2.2.21" }
3+
dokka = { id = "org.jetbrains.dokka", version = "2.1.0" }
4+
5+
[libraries]
6+
logging = { module = "org.slf4j:slf4j-api", version = "2.0.17" }
7+
8+
jackson = { module = "org.veupathdb.lib:jackson-singleton-json", version = "4.0.5" }
9+
10+
db-pool = { module = "com.zaxxer:HikariCP", version = "5.1.0" }
11+
db-driver = { module = "org.postgresql:postgresql", version = "42.7.3" }
12+
13+
s3-api = { module = "org.veupathdb.lib.s3:s34k", version = "0.11.0" }
14+
s3-dirs = { module = "org.veupathdb.lib.s3:workspaces-java", version = "5.1.0" }
15+
16+
queue = { module = "org.veupathdb.lib:rabbit-job-queue", version = "2.0.1" }
17+
18+
metrics-prometheus-api = { module = "io.prometheus:simpleclient", version = "0.16.0" }
19+
metrics-prometheus-client = { module = "io.prometheus:simpleclient_common", version = "0.16.0" }
20+
21+
util-hashid = { module = "org.veupathdb.lib:hash-id", version = "1.1.0" }
22+
util-cache = { module = "com.github.ben-manes.caffeine:caffeine", version = "3.1.8" }
23+
24+
25+
[bundles]
26+
database = ["db-pool", "db-driver"]
27+
metrics = ["metrics-prometheus-api", "metrics-prometheus-client"]
28+
minio = ["s3-api", "s3-dirs"]

lib/build.gradle.kts

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,27 @@
1+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
2+
13
plugins {
2-
kotlin("jvm")
3-
id("org.jetbrains.dokka") version "1.9.20"
4+
alias(libs.plugins.kotlin)
5+
alias(libs.plugins.dokka)
46
`maven-publish`
57
}
68

79
group = "org.veupathdb.lib"
8-
version = "1.8.5"
10+
version = "1.8.6"
911

1012

1113
dependencies {
12-
// Logging
13-
api("org.slf4j:slf4j-api:1.7.36")
14-
15-
// Jackson
16-
implementation("org.veupathdb.lib:jackson-singleton:3.2.1")
17-
18-
// DB
19-
implementation("com.zaxxer:HikariCP:5.1.0")
20-
implementation("org.postgresql:postgresql:42.7.3")
14+
api(libs.logging)
15+
api(libs.util.hashid)
16+
api(libs.bundles.minio)
2117

22-
// S3
23-
api("org.veupathdb.lib.s3:s34k:0.11.0")
24-
api("org.veupathdb.lib.s3:workspaces-java:5.1.0")
18+
implementation(libs.bundles.database)
19+
implementation(libs.bundles.metrics)
2520

26-
// Rabbit
27-
implementation("org.veupathdb.lib:rabbit-job-queue:2.0.1")
21+
implementation(libs.jackson)
22+
implementation(libs.queue)
23+
implementation(libs.util.cache)
2824

29-
// Metrics
30-
implementation("io.prometheus:simpleclient:0.16.0")
31-
implementation("io.prometheus:simpleclient_common:0.16.0")
32-
33-
// Misc & Utils
34-
api("org.veupathdb.lib:hash-id:1.1.0")
35-
implementation("com.github.ben-manes.caffeine:caffeine:3.1.8") // Used for self-expiring cache.
36-
37-
// Testing
3825
testImplementation(kotlin("test"))
3926
}
4027

@@ -108,3 +95,8 @@ publishing {
10895
}
10996
}
11097
}
98+
99+
val compileKotlin: KotlinCompile by tasks
100+
compileKotlin.compilerOptions {
101+
freeCompilerArgs.set(listOf("-Xcontext-parameters"))
102+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.veupathdb.lib.compute.platform.intern.minio
2+
3+
import org.slf4j.LoggerFactory
4+
import org.veupathdb.lib.compute.platform.intern.minio.MinIOHax.sleepCountDuration
5+
6+
@Suppress("NOTHING_TO_INLINE")
7+
internal sealed class AbstractMinIOHack {
8+
companion object {
9+
private const val MaxDeleteAttempts = 5
10+
}
11+
12+
protected val log = LoggerFactory.getLogger(javaClass)!!
13+
14+
protected inline fun <T> withRetries(noinline msg: () -> String, noinline fn: () -> T): T =
15+
context(log) { MinIOHax.withRetries(msg, fn) }
16+
17+
protected inline fun <T> withRetries(count: Int, noinline msg: () -> String, noinline fn: () -> T): T =
18+
context(log) { MinIOHax.withRetries(count, msg, fn) }
19+
20+
protected inline fun sleep() =
21+
context(log) { MinIOHax.sleep() }
22+
23+
protected inline fun delete(
24+
path: String,
25+
noinline deleteFn: () -> Unit,
26+
noinline statFn: () -> Boolean,
27+
) = context(log) { MinIOHax.delete(path, deleteFn, statFn) }
28+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.veupathdb.lib.compute.platform.intern.minio
2+
3+
import org.veupathdb.lib.s3.workspaces.java.SubS3Workspace
4+
5+
internal class FuglyMinIOSubWorkspace(delegate: SubS3Workspace, override val parent: FuglyMinIOWorkspace)
6+
: FuglyMinIOWorkspace(delegate)
7+
, SubS3Workspace
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.veupathdb.lib.compute.platform.intern.minio
2+
3+
import org.veupathdb.lib.hash_id.HashID
4+
import org.veupathdb.lib.s3.workspaces.java.S3Workspace
5+
import java.io.File
6+
7+
internal open class FuglyMinIOWorkspace(private val delegate: S3Workspace)
8+
: AbstractMinIOHack()
9+
, S3Workspace by delegate
10+
{
11+
override fun exists() =
12+
withRetries({ "test for workspace $path" }, delegate::exists)
13+
14+
override fun open(path: String) =
15+
withRetries({ "open object ${objectPath(path)}" }) { delegate.open(path) }
16+
17+
override fun get(path: String) =
18+
withRetries({ "get object ${objectPath(path)}" }) { delegate[path] }
19+
?.let(::FuglyMinIOWorkspaceFile)
20+
21+
override fun copy(from: String, to: File) =
22+
withRetries({ "copy object ${objectPath(path)} to file ${to.path}" }) { delegate.copy(from, to) }
23+
24+
override fun files() =
25+
withRetries({ "list files in workspace $path" }, delegate::files)
26+
.map(::FuglyMinIOWorkspaceFile)
27+
28+
override fun contains(path: String) =
29+
withRetries({ "stat object ${objectPath(path)}" }) { path in delegate }
30+
31+
override fun delete() =
32+
delete(path, delegate::delete, ::exists)
33+
34+
override fun hasSubWorkspace(id: HashID) =
35+
withRetries({ "test for sub-workspace ${objectPath(id.string)}" }) { delegate.hasSubWorkspace(id) }
36+
37+
override fun openSubWorkspace(id: HashID) =
38+
withRetries({ "open sub-workspace ${objectPath(id.string)}" }) { delegate.openSubWorkspace(id) }
39+
?.let { FuglyMinIOSubWorkspace(it, this@FuglyMinIOWorkspace) }
40+
41+
override fun createSubWorkspace(id: HashID) =
42+
FuglyMinIOSubWorkspace(
43+
withRetries({ "create sub-workspace ${objectPath(id.string)}" }) { delegate.createSubWorkspace(id) },
44+
this@FuglyMinIOWorkspace,
45+
)
46+
47+
@Suppress("NOTHING_TO_INLINE")
48+
private inline fun objectPath(name: String) = "$path/$name"
49+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.veupathdb.lib.compute.platform.intern.minio
2+
3+
import org.veupathdb.lib.hash_id.HashID
4+
import org.veupathdb.lib.s3.workspaces.java.S3WorkspaceFactory
5+
6+
/**
7+
* Now with more hacks!!
8+
*/
9+
internal class FuglyMinIOWorkspaceFactory(private val delegate: S3WorkspaceFactory): AbstractMinIOHack() {
10+
11+
fun get(jobID: HashID) =
12+
withRetries({ "open workspace for job $jobID" }) { delegate.get(jobID) }
13+
?.let(::FuglyMinIOWorkspace)
14+
15+
fun create(jobID: HashID) =
16+
FuglyMinIOWorkspace(withRetries({ "create workspace for job $jobID" }) { delegate.create(jobID) })
17+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.veupathdb.lib.compute.platform.intern.minio
2+
3+
import org.veupathdb.lib.s3.workspaces.java.WorkspaceFile
4+
import java.io.File
5+
6+
internal class FuglyMinIOWorkspaceFile(private val delegate: WorkspaceFile): AbstractMinIOHack(),
7+
WorkspaceFile by delegate {
8+
override fun size() =
9+
withRetries({ "stat object $absolutePath" }, delegate::size)
10+
11+
override fun open() =
12+
withRetries({ "open object $absolutePath" }, delegate::open)
13+
14+
override fun download(localFile: File) =
15+
withRetries({ "download object $absolutePath to file ${localFile.path}" }) { delegate.download(localFile) }
16+
17+
override fun delete() =
18+
delete(absolutePath, delegate::delete, ::stat)
19+
20+
// workaround to check for object existence. If the object doesn't exist, the
21+
// underlying method throws an NPE when attempting to access the file size
22+
// property.
23+
private fun stat() =
24+
try {
25+
size()
26+
true
27+
} catch (_: NullPointerException) {
28+
false
29+
}
30+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.veupathdb.lib.compute.platform.intern.minio
2+
3+
import org.slf4j.Logger
4+
import org.veupathdb.lib.s3.s34k.errors.S34KError
5+
import org.veupathdb.lib.s3.s34k.objects.S3Object
6+
import kotlin.time.Duration.Companion.milliseconds
7+
8+
@Suppress("NOTHING_TO_INLINE")
9+
internal object MinIOHax {
10+
var MaxDeleteAttempts = 5
11+
12+
var SleepMillis = 1_000L
13+
14+
var RetryCount = 3
15+
16+
fun sleepCountDuration(sleepCount: Int) = (sleepCount * SleepMillis).milliseconds
17+
18+
context(log: Logger)
19+
fun <T> withRetries(actionMsg: () -> String, fn: () -> T) =
20+
withRetries(RetryCount, actionMsg, fn)
21+
22+
context(log: Logger)
23+
fun <T> withRetries(retries: Int, actionMsg: () -> String, fn: () -> T): T {
24+
var lastError: S34KError? = null
25+
26+
for (i in 1..retries) {
27+
try {
28+
return fn()
29+
} catch (e: S34KError) {
30+
log.warn("failed {} time(s) to {}", i, actionMsg())
31+
32+
if (lastError != null)
33+
e.addSuppressed(lastError)
34+
35+
lastError = e
36+
37+
sleep()
38+
}
39+
}
40+
41+
log.error("failed {} time(s) to {}", RetryCount, actionMsg())
42+
throw lastError!!
43+
}
44+
45+
context(log: Logger)
46+
fun sleep() {
47+
log.debug("sleeping for {}", { SleepMillis.milliseconds })
48+
Thread.sleep(SleepMillis)
49+
}
50+
51+
context(log: Logger)
52+
fun delete(obj: S3Object) = delete(obj.path, obj::delete, obj::exists)
53+
54+
context(log: Logger)
55+
fun delete(path: String, deleteFn: () -> Unit, statFn: () -> Boolean) {
56+
// Tell minio to delete the object
57+
deleteFn()
58+
59+
// Sleep for a bit to let minio ponder the delete request
60+
sleep()
61+
62+
// If minio is being particularly dim, give it some more time to work it out
63+
for (i in 1..MaxDeleteAttempts) {
64+
if (!statFn())
65+
return
66+
67+
log.debug("waiting for object {} deletion: {}", path, sleepCountDuration(i))
68+
sleep()
69+
}
70+
71+
log.error("waited {} for object {} deletion, but MinIO reports that it still exists", sleepCountDuration(MaxDeleteAttempts), path)
72+
throw RuntimeException("object deletion timeout for MinIO object $path")
73+
}
74+
}

0 commit comments

Comments
 (0)