diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml index 953b7e3710c..3fa4e41c8a0 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/pom.xml @@ -93,6 +93,10 @@ org.testcontainers testcontainers + + org.testcontainers + testcontainers-minio + org.testcontainers testcontainers-postgresql diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala index 102ba0f9696..2becfab6cdf 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala @@ -125,27 +125,33 @@ class FileBasedMetadata(fs: ObjectStore, meta: Metadata, directory: URI) while (!lockAcquired && retries < maxAttempts) { try { // try to create lock file with overwrite=false for atomicity - WithClose(fs.create(lockFilePath)) { - case None => - // lock file exists, check if it's stale - retries += 1 - if (isLockStale(lockFilePath)) { - // remove stale lock and retry - try { - fs.delete(lockFilePath) - } catch { - case NonFatal(_) => // ignore, will retry - } - } else { - // wait and retry - Thread.sleep(LockRetryDelay.toMillis.get) + try { + fs.create(lockFilePath).foreach { out => + try { + // write lock info for debugging - hostname + timestamp + val lockInfo = s"${java.net.InetAddress.getLocalHost.getHostName}:${System.currentTimeMillis()}" + out.write(lockInfo.getBytes(StandardCharsets.UTF_8)) + } finally { + out.close() } - case Some(out) => - // write lock info for debugging - hostname + timestamp - val lockInfo = s"${java.net.InetAddress.getLocalHost.getHostName}:${System.currentTimeMillis()}" - out.write(lockInfo.getBytes(StandardCharsets.UTF_8)) + lockAcquired = true + } + } catch { + case NonFatal(e) => logger.debug("Error writing lock file, may already exist?", e) + } + + if (!lockAcquired) { + // lock file exists, check if it's stale + retries += 1 + fs.modified(lockFilePath).foreach { modified => + val age = System.currentTimeMillis() - modified + if (age > LockTimeout.toMillis.get) { + fs.delete(lockFilePath) + } + } + // wait and retry + Thread.sleep(LockRetryDelay.toMillis.get) } - lockAcquired = true } catch { case NonFatal(e) => throw new RuntimeException(s"Failed to acquire lock at $lockFilePath", e) } @@ -178,20 +184,6 @@ class FileBasedMetadata(fs: ObjectStore, meta: Metadata, directory: URI) } } } - - /** - * Check if a lock file is stale (older than lock timeout) - */ - private def isLockStale(lockPath: URI): Boolean = { - try { - fs.modified(lockPath).forall { modified => - val age = System.currentTimeMillis() - modified - age > LockTimeout.toMillis.get - } - } catch { - case NonFatal(_) => true // if we can't read it, consider it stale - } - } } object FileBasedMetadata { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/fs/S3ObjectStoreTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/fs/S3ObjectStoreTest.scala new file mode 100644 index 00000000000..61255672276 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/fs/S3ObjectStoreTest.scala @@ -0,0 +1,73 @@ +/*********************************************************************** + * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * https://www.apache.org/licenses/LICENSE-2.0 + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.core.fs + +import org.apache.commons.io.IOUtils +import org.locationtech.geomesa.fs.storage.core.FileSystemContext +import org.locationtech.geomesa.utils.io.WithClose +import org.slf4j.LoggerFactory +import org.specs2.mutable.SpecificationWithJUnit +import org.specs2.specification.BeforeAfterAll +import org.testcontainers.containers.MinIOContainer +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.utility.DockerImageName + +import java.net.URI +import java.nio.charset.StandardCharsets + +class S3ObjectStoreTest extends SpecificationWithJUnit with BeforeAfterAll { + + var minio: MinIOContainer = _ + + lazy val conf = Map( + "fs.s3.region" -> "us-east-1", + "fs.s3.endpoint" -> minio.getS3URL, + "fs.s3.access-key-id" -> minio.getUserName, + "fs.s3.secret-access-key" -> minio.getPassword, + "fs.s3.force-path-style" -> "true", + ) + lazy val context = FileSystemContext(new URI("s3://geomesa/fs/"), conf, None) + + override def beforeAll(): Unit = { + minio = + new MinIOContainer( + DockerImageName.parse("minio/minio").withTag(sys.props.getOrElse("minio.docker.tag", "RELEASE.2024-10-29T16-01-48Z"))) + minio.start() + minio.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("minio"))) + minio.execInContainer("mc", "alias", "set", "localhost", "http://localhost:9000", minio.getUserName, minio.getPassword) + minio.execInContainer("mc", "mb", "localhost/geomesa") + } + + override def afterAll(): Unit = { + if (minio != null) { + minio.close() + } + } + + "S3ObjectStore" should { + "prevent overwriting existing files in create" in { + WithClose(ObjectStore(context)) { fs => + val file = new URI("s3://geomesa/fs/tmp.txt") + val first = fs.create(file).orNull + first must not(beNull) + val second = fs.create(file).orNull + second must not(beNull) // object doesn't exist yet as first request is still open + first.write("foo".getBytes(StandardCharsets.UTF_8)) + second.write("bar".getBytes(StandardCharsets.UTF_8)) + second.close() must not(throwAn[Exception]) + first.close() must throwAn[Exception] + WithClose(fs.read(file)) { opt => + opt must beSome + IOUtils.toString(opt.get, StandardCharsets.UTF_8) mustEqual "bar" + } + fs.create(file) must beNone + } + } + } +}