diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml
index 953b7e3710c..3fa4e41c8a0 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml
@@ -93,6 +93,10 @@
org.testcontainers
testcontainers
+
+ org.testcontainers
+ testcontainers-minio
+
org.testcontainers
testcontainers-postgresql
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala
index 102ba0f9696..2becfab6cdf 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala
@@ -125,27 +125,33 @@ class FileBasedMetadata(fs: ObjectStore, meta: Metadata, directory: URI)
while (!lockAcquired && retries < maxAttempts) {
try {
// try to create lock file with overwrite=false for atomicity
- WithClose(fs.create(lockFilePath)) {
- case None =>
- // lock file exists, check if it's stale
- retries += 1
- if (isLockStale(lockFilePath)) {
- // remove stale lock and retry
- try {
- fs.delete(lockFilePath)
- } catch {
- case NonFatal(_) => // ignore, will retry
- }
- } else {
- // wait and retry
- Thread.sleep(LockRetryDelay.toMillis.get)
+ try {
+ fs.create(lockFilePath).foreach { out =>
+ try {
+ // write lock info for debugging - hostname + timestamp
+ val lockInfo = s"${java.net.InetAddress.getLocalHost.getHostName}:${System.currentTimeMillis()}"
+ out.write(lockInfo.getBytes(StandardCharsets.UTF_8))
+ } finally {
+ out.close()
}
- case Some(out) =>
- // write lock info for debugging - hostname + timestamp
- val lockInfo = s"${java.net.InetAddress.getLocalHost.getHostName}:${System.currentTimeMillis()}"
- out.write(lockInfo.getBytes(StandardCharsets.UTF_8))
+ lockAcquired = true
+ }
+ } catch {
+ case NonFatal(e) => logger.debug("Error writing lock file, may already exist?", e)
+ }
+
+ if (!lockAcquired) {
+ // lock file exists, check if it's stale
+ retries += 1
+ fs.modified(lockFilePath).foreach { modified =>
+ val age = System.currentTimeMillis() - modified
+ if (age > LockTimeout.toMillis.get) {
+ fs.delete(lockFilePath)
+ }
+ }
+ // wait and retry
+ Thread.sleep(LockRetryDelay.toMillis.get)
}
- lockAcquired = true
} catch {
case NonFatal(e) => throw new RuntimeException(s"Failed to acquire lock at $lockFilePath", e)
}
@@ -178,20 +184,6 @@ class FileBasedMetadata(fs: ObjectStore, meta: Metadata, directory: URI)
}
}
}
-
- /**
- * Check if a lock file is stale (older than lock timeout)
- */
- private def isLockStale(lockPath: URI): Boolean = {
- try {
- fs.modified(lockPath).forall { modified =>
- val age = System.currentTimeMillis() - modified
- age > LockTimeout.toMillis.get
- }
- } catch {
- case NonFatal(_) => true // if we can't read it, consider it stale
- }
- }
}
object FileBasedMetadata {
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/fs/S3ObjectStoreTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/fs/S3ObjectStoreTest.scala
new file mode 100644
index 00000000000..61255672276
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/fs/S3ObjectStoreTest.scala
@@ -0,0 +1,73 @@
+/***********************************************************************
+ * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Apache License, Version 2.0
+ * which accompanies this distribution and is available at
+ * https://www.apache.org/licenses/LICENSE-2.0
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.core.fs
+
+import org.apache.commons.io.IOUtils
+import org.locationtech.geomesa.fs.storage.core.FileSystemContext
+import org.locationtech.geomesa.utils.io.WithClose
+import org.slf4j.LoggerFactory
+import org.specs2.mutable.SpecificationWithJUnit
+import org.specs2.specification.BeforeAfterAll
+import org.testcontainers.containers.MinIOContainer
+import org.testcontainers.containers.output.Slf4jLogConsumer
+import org.testcontainers.utility.DockerImageName
+
+import java.net.URI
+import java.nio.charset.StandardCharsets
+
+class S3ObjectStoreTest extends SpecificationWithJUnit with BeforeAfterAll {
+
+ var minio: MinIOContainer = _
+
+ lazy val conf = Map(
+ "fs.s3.region" -> "us-east-1",
+ "fs.s3.endpoint" -> minio.getS3URL,
+ "fs.s3.access-key-id" -> minio.getUserName,
+ "fs.s3.secret-access-key" -> minio.getPassword,
+ "fs.s3.force-path-style" -> "true",
+ )
+ lazy val context = FileSystemContext(new URI("s3://geomesa/fs/"), conf, None)
+
+ override def beforeAll(): Unit = {
+ minio =
+ new MinIOContainer(
+ DockerImageName.parse("minio/minio").withTag(sys.props.getOrElse("minio.docker.tag", "RELEASE.2024-10-29T16-01-48Z")))
+ minio.start()
+ minio.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("minio")))
+ minio.execInContainer("mc", "alias", "set", "localhost", "http://localhost:9000", minio.getUserName, minio.getPassword)
+ minio.execInContainer("mc", "mb", "localhost/geomesa")
+ }
+
+ override def afterAll(): Unit = {
+ if (minio != null) {
+ minio.close()
+ }
+ }
+
+ "S3ObjectStore" should {
+ "prevent overwriting existing files in create" in {
+ WithClose(ObjectStore(context)) { fs =>
+ val file = new URI("s3://geomesa/fs/tmp.txt")
+ val first = fs.create(file).orNull
+ first must not(beNull)
+ val second = fs.create(file).orNull
+ second must not(beNull) // object doesn't exist yet as first request is still open
+ first.write("foo".getBytes(StandardCharsets.UTF_8))
+ second.write("bar".getBytes(StandardCharsets.UTF_8))
+ second.close() must not(throwAn[Exception])
+ first.close() must throwAn[Exception]
+ WithClose(fs.read(file)) { opt =>
+ opt must beSome
+ IOUtils.toString(opt.get, StandardCharsets.UTF_8) mustEqual "bar"
+ }
+ fs.create(file) must beNone
+ }
+ }
+ }
+}