|
| 1 | +package io.ably.lib.test.helper |
| 2 | + |
| 3 | +import kotlinx.coroutines.Dispatchers |
| 4 | +import kotlinx.coroutines.delay |
| 5 | +import kotlinx.coroutines.sync.Mutex |
| 6 | +import kotlinx.coroutines.sync.withLock |
| 7 | +import kotlinx.coroutines.withContext |
| 8 | +import java.io.ByteArrayInputStream |
| 9 | +import java.net.URI |
| 10 | +import java.net.http.HttpClient |
| 11 | +import java.net.http.HttpRequest as JHttpRequest |
| 12 | +import java.net.http.HttpResponse as JHttpResponse |
| 13 | +import java.nio.channels.FileChannel |
| 14 | +import java.nio.file.Files |
| 15 | +import java.nio.file.Path |
| 16 | +import java.nio.file.StandardOpenOption.CREATE |
| 17 | +import java.nio.file.StandardOpenOption.TRUNCATE_EXISTING |
| 18 | +import java.nio.file.StandardOpenOption.WRITE |
| 19 | +import java.security.MessageDigest |
| 20 | +import java.util.zip.GZIPInputStream |
| 21 | + |
| 22 | +/** |
| 23 | + * Manages the lifecycle of the `uts-proxy` binary used for integration tests. |
| 24 | + * |
| 25 | + * Downloads the binary from GitHub releases on first use, caching it at |
| 26 | + * `~/.cache/uts-proxy/<version>/uts-proxy`. Safe for concurrent Gradle test workers — |
| 27 | + * a `FileLock` on `uts-proxy.lock` serialises the download across OS processes, while |
| 28 | + * a [Mutex] serialises it within the same JVM. |
| 29 | + * |
| 30 | + * Call [ensureProxy] in `@BeforeAll` / `setUpAll()` for every proxy integration test suite. |
| 31 | + */ |
| 32 | +object ProxyManager { |
| 33 | + |
| 34 | + private const val PROXY_VERSION = "v0.2.0" |
| 35 | + private const val VERSION_BARE = "0.2.0" |
| 36 | + const val CONTROL_PORT = 9100 |
| 37 | + private const val SANDBOX_HOST = "sandbox.realtime.ably-nonprod.net" |
| 38 | + private const val GITHUB_BASE = |
| 39 | + "https://github.com/ably/uts-proxy/releases/download/$PROXY_VERSION" |
| 40 | + |
| 41 | + val sandboxRealtimeHost: String = SANDBOX_HOST |
| 42 | + val sandboxRestHost: String = SANDBOX_HOST |
| 43 | + |
| 44 | + private val CHECKSUMS = mapOf( |
| 45 | + "uts-proxy_${VERSION_BARE}_darwin_amd64.tar.gz" to |
| 46 | + "4abc4bd0682b61d53889c3ad3b240b44cf942878ed9fb04e8912a48070d2666d", |
| 47 | + "uts-proxy_${VERSION_BARE}_darwin_arm64.tar.gz" to |
| 48 | + "2b95cdb5659988f54ad3d413c713f94f944e3b0014011aba2e339b9537c59b2f", |
| 49 | + "uts-proxy_${VERSION_BARE}_linux_amd64.tar.gz" to |
| 50 | + "aa6d536101ebc3bfa6870ca4cfb75be1947360dc5c1c77d7a8536baa1fee7caa", |
| 51 | + "uts-proxy_${VERSION_BARE}_linux_arm64.tar.gz" to |
| 52 | + "c8f9363ae579508004727175a098bd0b73518ee3f08cf9071b0c372f8199767a", |
| 53 | + ) |
| 54 | + |
| 55 | + private val os: String by lazy { |
| 56 | + val name = System.getProperty("os.name").lowercase() |
| 57 | + when { |
| 58 | + name.contains("mac") -> "darwin" |
| 59 | + name.contains("linux") -> "linux" |
| 60 | + else -> error("Unsupported OS for uts-proxy: ${System.getProperty("os.name")}") |
| 61 | + } |
| 62 | + } |
| 63 | + |
| 64 | + private val arch: String by lazy { |
| 65 | + when (System.getProperty("os.arch").lowercase()) { |
| 66 | + "amd64", "x86_64" -> "amd64" |
| 67 | + "aarch64", "arm64" -> "arm64" |
| 68 | + else -> error("Unsupported arch for uts-proxy: ${System.getProperty("os.arch")}") |
| 69 | + } |
| 70 | + } |
| 71 | + |
| 72 | + private val archiveName: String get() = "uts-proxy_${VERSION_BARE}_${os}_${arch}.tar.gz" |
| 73 | + |
| 74 | + private val cacheDir: Path |
| 75 | + get() = Path.of(System.getProperty("user.home"), ".cache", "uts-proxy", PROXY_VERSION) |
| 76 | + |
| 77 | + private val binaryPath: Path get() = cacheDir.resolve("uts-proxy") |
| 78 | + |
| 79 | + @Volatile private var proxyProcess: Process? = null |
| 80 | + private val mutex = Mutex() |
| 81 | + private val httpClient: HttpClient = HttpClient.newHttpClient() |
| 82 | + |
| 83 | + /** |
| 84 | + * Ensures the `uts-proxy` process is running on [CONTROL_PORT]. |
| 85 | + * |
| 86 | + * If the proxy is already healthy (e.g. started by a previous test class in the same run), |
| 87 | + * this is a no-op. Otherwise it downloads + verifies the binary and starts the process. |
| 88 | + * |
| 89 | + * @param timeoutMs Maximum real-time milliseconds to wait for the process to become healthy. |
| 90 | + */ |
| 91 | + suspend fun ensureProxy(timeoutMs: Int = 15_000): Unit = mutex.withLock { |
| 92 | + if (isHealthy()) return |
| 93 | + ensureBinary() |
| 94 | + proxyProcess = withContext(Dispatchers.IO) { |
| 95 | + ProcessBuilder(binaryPath.toString(), "--port", "$CONTROL_PORT") |
| 96 | + .redirectErrorStream(true) |
| 97 | + .redirectOutput(ProcessBuilder.Redirect.DISCARD) |
| 98 | + .start() |
| 99 | + } |
| 100 | + waitForHealth(timeoutMs.toLong()) |
| 101 | + } |
| 102 | + |
| 103 | + /** |
| 104 | + * No-op retained for Dart API compatibility. |
| 105 | + * The proxy process is shared for the lifetime of the test suite and exits with the JVM. |
| 106 | + */ |
| 107 | + fun stopProxy() = Unit |
| 108 | + |
| 109 | + // ── Internal ────────────────────────────────────────────────────────────── |
| 110 | + |
| 111 | + internal suspend fun isHealthy(): Boolean = runCatching { |
| 112 | + withContext(Dispatchers.IO) { |
| 113 | + val req = JHttpRequest.newBuilder() |
| 114 | + .uri(URI.create("http://localhost:$CONTROL_PORT/health")) |
| 115 | + .GET().build() |
| 116 | + httpClient.send(req, JHttpResponse.BodyHandlers.ofString()).statusCode() == 200 |
| 117 | + } |
| 118 | + }.getOrDefault(false) |
| 119 | + |
| 120 | + private suspend fun waitForHealth(timeoutMs: Long) { |
| 121 | + val deadline = System.currentTimeMillis() + timeoutMs |
| 122 | + while (System.currentTimeMillis() < deadline) { |
| 123 | + if (isHealthy()) return |
| 124 | + delay(200) |
| 125 | + } |
| 126 | + proxyProcess?.destroyForcibly() |
| 127 | + proxyProcess = null |
| 128 | + error("uts-proxy did not become healthy within ${timeoutMs}ms") |
| 129 | + } |
| 130 | + |
| 131 | + /** Ensures the binary is present in the cache, downloading and extracting if needed. */ |
| 132 | + private suspend fun ensureBinary() = withContext(Dispatchers.IO) { |
| 133 | + Files.createDirectories(cacheDir) |
| 134 | + // FileLock serialises across multiple Gradle test worker JVMs. |
| 135 | + val lockFile = cacheDir.resolve("uts-proxy.lock") |
| 136 | + FileChannel.open(lockFile, CREATE, WRITE).use { channel -> |
| 137 | + channel.lock().use { |
| 138 | + val file = binaryPath.toFile() |
| 139 | + if (file.exists() && sha256Hex(file.readBytes()) == CHECKSUMS[archiveName]) { |
| 140 | + return@withContext // already cached and valid |
| 141 | + } |
| 142 | + val archiveBytes = downloadArchive() |
| 143 | + verifyChecksum(archiveBytes) |
| 144 | + val binary = extractFromTarGz(archiveBytes) |
| 145 | + Files.write(binaryPath, binary, CREATE, TRUNCATE_EXISTING) |
| 146 | + binaryPath.toFile().setExecutable(true) |
| 147 | + } |
| 148 | + } |
| 149 | + } |
| 150 | + |
| 151 | + private fun downloadArchive(): ByteArray { |
| 152 | + System.err.println("Downloading uts-proxy $PROXY_VERSION ($archiveName)…") |
| 153 | + val req = JHttpRequest.newBuilder() |
| 154 | + .uri(URI.create("$GITHUB_BASE/$archiveName")) |
| 155 | + .GET().build() |
| 156 | + val resp = httpClient.send(req, JHttpResponse.BodyHandlers.ofByteArray()) |
| 157 | + check(resp.statusCode() == 200) { |
| 158 | + "Failed to download uts-proxy from $GITHUB_BASE/$archiveName: HTTP ${resp.statusCode()}" |
| 159 | + } |
| 160 | + return resp.body() |
| 161 | + } |
| 162 | + |
| 163 | + private fun verifyChecksum(bytes: ByteArray) { |
| 164 | + val expected = CHECKSUMS[archiveName] |
| 165 | + ?: error("No checksum for $archiveName — unsupported platform/arch") |
| 166 | + val actual = sha256Hex(bytes) |
| 167 | + check(actual == expected) { |
| 168 | + "Checksum mismatch for $archiveName: expected $expected, got $actual" |
| 169 | + } |
| 170 | + } |
| 171 | + |
| 172 | + private fun sha256Hex(bytes: ByteArray): String = |
| 173 | + MessageDigest.getInstance("SHA-256") |
| 174 | + .digest(bytes) |
| 175 | + .joinToString("") { "%02x".format(it) } |
| 176 | + |
| 177 | + /** |
| 178 | + * Extracts the `uts-proxy` binary from a `.tar.gz` archive using only JDK stdlib. |
| 179 | + * |
| 180 | + * TAR format: sequential 512-byte header blocks each followed by file-data blocks |
| 181 | + * (padded to a multiple of 512). We parse only the fields we need: |
| 182 | + * - offset 0–99 : filename (null-terminated) |
| 183 | + * - offset 124–135: file size in octal ASCII |
| 184 | + * - offset 156 : entry type ('0'/NUL = regular file, '5' = directory, …) |
| 185 | + */ |
| 186 | + private fun extractFromTarGz(archiveBytes: ByteArray): ByteArray { |
| 187 | + GZIPInputStream(ByteArrayInputStream(archiveBytes)).use { gzip -> |
| 188 | + val headerBuf = ByteArray(512) |
| 189 | + while (true) { |
| 190 | + // Read one header block (exactly 512 bytes) |
| 191 | + var totalRead = 0 |
| 192 | + while (totalRead < 512) { |
| 193 | + val n = gzip.read(headerBuf, totalRead, 512 - totalRead) |
| 194 | + if (n < 0) break |
| 195 | + totalRead += n |
| 196 | + } |
| 197 | + // End-of-archive: two consecutive zero-filled 512-byte blocks |
| 198 | + if (totalRead < 512 || headerBuf.all { it == 0.toByte() }) break |
| 199 | + |
| 200 | + // Filename (null-terminated, strip leading ./ or /) |
| 201 | + val nameEnd = (0 until 100).firstOrNull { headerBuf[it] == 0.toByte() } ?: 100 |
| 202 | + val name = String(headerBuf, 0, nameEnd).trimStart('.', '/') |
| 203 | + |
| 204 | + // File size (octal ASCII at offset 124, 12 bytes) |
| 205 | + val sizeStr = String(headerBuf, 124, 12).trimEnd(' |