Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-minio</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-postgresql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,33 @@ class FileBasedMetadata(fs: ObjectStore, meta: Metadata, directory: URI)
while (!lockAcquired && retries < maxAttempts) {
try {
// try to create lock file with overwrite=false for atomicity
WithClose(fs.create(lockFilePath)) {
case None =>
// lock file exists, check if it's stale
retries += 1
if (isLockStale(lockFilePath)) {
// remove stale lock and retry
try {
fs.delete(lockFilePath)
} catch {
case NonFatal(_) => // ignore, will retry
}
} else {
// wait and retry
Thread.sleep(LockRetryDelay.toMillis.get)
try {
fs.create(lockFilePath).foreach { out =>
try {
// write lock info for debugging - hostname + timestamp
val lockInfo = s"${java.net.InetAddress.getLocalHost.getHostName}:${System.currentTimeMillis()}"
out.write(lockInfo.getBytes(StandardCharsets.UTF_8))
} finally {
out.close()
}
case Some(out) =>
// write lock info for debugging - hostname + timestamp
val lockInfo = s"${java.net.InetAddress.getLocalHost.getHostName}:${System.currentTimeMillis()}"
out.write(lockInfo.getBytes(StandardCharsets.UTF_8))
lockAcquired = true
}
} catch {
case NonFatal(e) => logger.debug("Error writing lock file, may already exist?", e)
}

if (!lockAcquired) {
// lock file exists, check if it's stale
retries += 1
fs.modified(lockFilePath).foreach { modified =>
val age = System.currentTimeMillis() - modified
if (age > LockTimeout.toMillis.get) {
fs.delete(lockFilePath)
}
}
// wait and retry
Thread.sleep(LockRetryDelay.toMillis.get)
}
lockAcquired = true
} catch {
case NonFatal(e) => throw new RuntimeException(s"Failed to acquire lock at $lockFilePath", e)
}
Expand Down Expand Up @@ -178,20 +184,6 @@ class FileBasedMetadata(fs: ObjectStore, meta: Metadata, directory: URI)
}
}
}

/**
* Check if a lock file is stale (older than lock timeout)
*/
private def isLockStale(lockPath: URI): Boolean = {
try {
fs.modified(lockPath).forall { modified =>
val age = System.currentTimeMillis() - modified
age > LockTimeout.toMillis.get
}
} catch {
case NonFatal(_) => true // if we can't read it, consider it stale
}
}
}

object FileBasedMetadata {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/***********************************************************************
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* https://www.apache.org/licenses/LICENSE-2.0
***********************************************************************/

package org.locationtech.geomesa.fs.storage.core.fs

import org.apache.commons.io.IOUtils
import org.locationtech.geomesa.fs.storage.core.FileSystemContext
import org.locationtech.geomesa.utils.io.WithClose
import org.slf4j.LoggerFactory
import org.specs2.mutable.SpecificationWithJUnit
import org.specs2.specification.BeforeAfterAll
import org.testcontainers.containers.MinIOContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.utility.DockerImageName

import java.net.URI
import java.nio.charset.StandardCharsets

class S3ObjectStoreTest extends SpecificationWithJUnit with BeforeAfterAll {

var minio: MinIOContainer = _

lazy val conf = Map(
"fs.s3.region" -> "us-east-1",
"fs.s3.endpoint" -> minio.getS3URL,
"fs.s3.access-key-id" -> minio.getUserName,
"fs.s3.secret-access-key" -> minio.getPassword,
"fs.s3.force-path-style" -> "true",
)
lazy val context = FileSystemContext(new URI("s3://geomesa/fs/"), conf, None)

override def beforeAll(): Unit = {
minio =
new MinIOContainer(
DockerImageName.parse("minio/minio").withTag(sys.props.getOrElse("minio.docker.tag", "RELEASE.2024-10-29T16-01-48Z")))
minio.start()
minio.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("minio")))
minio.execInContainer("mc", "alias", "set", "localhost", "http://localhost:9000", minio.getUserName, minio.getPassword)
minio.execInContainer("mc", "mb", "localhost/geomesa")
}

override def afterAll(): Unit = {
if (minio != null) {
minio.close()
}
}

"S3ObjectStore" should {
"prevent overwriting existing files in create" in {
WithClose(ObjectStore(context)) { fs =>
val file = new URI("s3://geomesa/fs/tmp.txt")
val first = fs.create(file).orNull
first must not(beNull)
val second = fs.create(file).orNull
second must not(beNull) // object doesn't exist yet as first request is still open
first.write("foo".getBytes(StandardCharsets.UTF_8))
second.write("bar".getBytes(StandardCharsets.UTF_8))
second.close() must not(throwAn[Exception])
first.close() must throwAn[Exception]
WithClose(fs.read(file)) { opt =>
opt must beSome
IOUtils.toString(opt.get, StandardCharsets.UTF_8) mustEqual "bar"
}
fs.create(file) must beNone
}
}
}
}
Loading