diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 83320d137..0e407a558 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -118,12 +118,15 @@ jobs:
           - netcat netcurl
           - edit
           - open wopen
-          - store_add
+          - store_add parallel
+          - kubernetes_runtime
         runtime: [docker, kubernetes]
         exclude:
           # netcat / netcurl not supported for kubernetes.
           - test: netcat netcurl
             runtime: kubernetes
+          - test: kubernetes_runtime
+            runtime: docker
     steps:
       - name: Clear free space
         run: |
diff --git a/codalab/client/json_api_client.py b/codalab/client/json_api_client.py
index e58e11e2a..81d3811e4 100644
--- a/codalab/client/json_api_client.py
+++ b/codalab/client/json_api_client.py
@@ -490,7 +490,7 @@ def update(self, resource_type, data, params=None):
             )
         )
         # Return list iff original data was list
-        return result if isinstance(data, list) else result[0]
+        return result if isinstance(data, list) or result is None else result[0]

     @wrap_exception('Unable to delete {1}')
     def delete(self, resource_type, resource_ids, params=None):
diff --git a/codalab/common.py b/codalab/common.py
index 125ac4831..402fe979e 100644
--- a/codalab/common.py
+++ b/codalab/common.py
@@ -286,7 +286,7 @@ def _get_azure_sas_url(self, path, **kwargs):
             account_name=AZURE_BLOB_ACCOUNT_NAME,
             container_name=AZURE_BLOB_CONTAINER_NAME,
             account_key=AZURE_BLOB_ACCOUNT_KEY,
-            expiry=datetime.datetime.now() + datetime.timedelta(hours=1),
+            expiry=datetime.datetime.now() + datetime.timedelta(hours=10),
             blob_name=blob_name,
         )
         return f"{AZURE_BLOB_HTTP_ENDPOINT}/{AZURE_BLOB_CONTAINER_NAME}/{blob_name}?{sas_token}"
@@ -306,7 +306,7 @@ def _get_gcs_signed_url(self, path, **kwargs):
         blob = bucket.blob(blob_name)
         signed_url = blob.generate_signed_url(
             version="v4",
-            expiration=datetime.timedelta(hours=1),
+            expiration=datetime.timedelta(hours=10),
             method=kwargs.get("method", "GET"),  # HTTP method. eg, GET, PUT
             content_type=kwargs.get("request_content_type", None),
             response_disposition=kwargs.get("content_disposition", None),
diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py
new file mode 100644
index 000000000..6aff0fbd9
--- /dev/null
+++ b/codalab/lib/beam/MultiReaderFileStream.py
@@ -0,0 +1,81 @@
+from io import BytesIO
+from threading import Lock
+
+from codalab.worker.un_gzip_stream import BytesBuffer
+
+
+class MultiReaderFileStream(BytesIO):
+    """
+    FileStream that supports multiple readers.
+    """
+    NUM_READERS = 2
+    # Max memory usage <= MAX_BUF_SIZE + max(num_bytes passed to a single read)
+    MAX_BUF_SIZE = 1024 * 1024 * 1024  # 1 GiB
+
+    def __init__(self, fileobj):
+        self._bufs = [BytesBuffer() for _ in range(0, self.NUM_READERS)]
+        self._pos = [0 for _ in range(0, self.NUM_READERS)]
+        self._fileobj = fileobj
+        self._lock = Lock()  # lock to ensure one does not concurrently read self._fileobj / write to the buffers.
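+        # Length of the largest per-reader buffer; maintained by find_largest_buffer()
+        # and used by read() to throttle readers that get too far ahead.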
+        self._current_max_buf_length = 0
+
+        class FileStreamReader(BytesIO):
+            # 's' is used instead of 'self' so that the enclosing MultiReaderFileStream's
+            # 'self' stays accessible inside these methods.
+            def __init__(s, index):
+                s._index = index
+
+            def read(s, num_bytes=None):
+                return self.read(s._index, num_bytes)
+
+            def peek(s, num_bytes):
+                return self.peek(s._index, num_bytes)
+
+        self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)]
+
+    def _fill_buf_bytes(self, index: int, num_bytes=None):
+        with self._lock:
+            while num_bytes is None or len(self._bufs[index]) < num_bytes:
+                s = self._fileobj.read(num_bytes)
+                if not s:
+                    break
+
+                for i in range(0, self.NUM_READERS):
+                    self._bufs[i].write(s)
+                self.find_largest_buffer()
+
+    def find_largest_buffer(self):
+        self._current_max_buf_length = len(self._bufs[0])
+        for i in range(1, self.NUM_READERS):
+            self._current_max_buf_length = max(self._current_max_buf_length, len(self._bufs[i]))
+
+    def read(self, index: int, num_bytes=None):  # type: ignore
+        """Read the specified number of bytes from the associated file.
+        index: index that specifies which reader is reading.
+        """
+        # Busy-wait until this reader is the slowest one: while the largest buffer exceeds
+        # MAX_BUF_SIZE, only the reader with the largest backlog (the slowest reader) may read.
+        while self._current_max_buf_length > self.MAX_BUF_SIZE and len(self._bufs[index]) < self._current_max_buf_length:
+            pass
+
+        # If the current thread is the slowest reader, it continues reading.
+        # If num_bytes > len(self._bufs[index]) or num_bytes is None, the buffers will keep growing.
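+        # Worked example (hypothetical sizes): with MAX_BUF_SIZE = 10 and two readers, suppose
+        # reader 0 has drained its buffer (len == 0) while reader 1 still has 12 unread bytes,
+        # so _current_max_buf_length == 12. Reader 0 then spins above (12 > 10 and 0 < 12),
+        # while reader 1 (12 is not < 12) falls through and drains its backlog.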
+        # Max memory usage <= MAX_BUF_SIZE + max(num_bytes passed to a single read).
+        self._fill_buf_bytes(index, num_bytes)
+        assert self._current_max_buf_length <= 2 * self.MAX_BUF_SIZE
+        if num_bytes is None:
+            num_bytes = len(self._bufs[index])
+        s = self._bufs[index].read(num_bytes)
+        self.find_largest_buffer()
+        self._pos[index] += len(s)
+        return s
+
+    def peek(self, index: int, num_bytes):  # type: ignore
+        self._fill_buf_bytes(index, num_bytes)
+        s = self._bufs[index].peek(num_bytes)
+        return s
+
+    def close(self):
+        self._fileobj.close()
diff --git a/codalab/lib/beam/SQLiteIndexedTar.py b/codalab/lib/beam/SQLiteIndexedTar.py
new file mode 100644
index 000000000..42c231cad
--- /dev/null
+++ b/codalab/lib/beam/SQLiteIndexedTar.py
@@ -0,0 +1,1583 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# flake8: noqa
+# type: ignore
+
+import io
+import json
+import os
+import re
+import sqlite3
+import stat
+import sys
+import tarfile
+import tempfile
+import time
+import traceback
+from timeit import default_timer as timer
+from typing import Any, AnyStr, cast, Dict, IO, Iterable, List, Optional, Tuple, Union
+from dataclasses import dataclass
+
+try:
+    import indexed_bzip2
+except ImportError:
+    pass
+try:
+    import indexed_gzip
+except ImportError:
+    pass
+
+from ratarmountcore.version import __version__
+from ratarmountcore.MountSource import FileInfo, MountSource
+from ratarmountcore.ProgressBar import ProgressBar
+from ratarmountcore.StenciledFile import StenciledFile
+# from .compressions import supportedCompressions
+from ratarmountcore.utils import RatarmountError, IndexNotOpenError, InvalidIndexError, CompressionError, overrides
+
+import collections
+
+CompressionInfo = collections.namedtuple(
+    'CompressionInfo', ['suffixes', 'doubleSuffixes', 'moduleName', 'checkHeader', 'open']
+)
+
+supportedCompressions = {
+    'gz': CompressionInfo(
+        ['gz', 'gzip'],
+        ['taz', 'tgz'],
+        'indexed_gzip',
+        # lambda x: x.peek(2) == b'\x1F\x8B',
+        lambda x: True,
+        lambda x: indexed_gzip.IndexedGzipFile(fileobj=x),
+    )
+}
+
+
+@dataclass
+class SQLiteIndexedTarUserData:
+    # fmt: off
+    offset       : int
+    offsetheader : int
+    istar        : bool
+    issparse     : bool
+    # fmt: on
+
+
+class SQLiteIndexedTar(MountSource):
+    """
+    This class reads once through the whole TAR archive and stores TAR file offsets
+    for all contained files in an index to support fast seeking to a given file.
+    """
+
+    # Version 0.1.0:
+    #   - Initial version
+    # Version 0.2.0:
+    #   - Add sparse support and 'offsetheader' and 'issparse' columns to the SQLite database
+    #   - Add TAR file size metadata in order to quickly check whether the TAR changed
+    #   - Add 'offsetheader' to the primary key of the 'files' table so that files which were
+    #     updated in the TAR can still be accessed if necessary.
+    # Version 0.3.0:
+    #   - Add arguments influencing the created index to metadata (ignore-zeros, recursive, ...)
+    # Version 0.4.0:
+    #   - Added 'gzipindexes' table, which may contain multiple blobs in contrast to the 'gzipindex' table.
+ __version__ = '0.4.0' + + def __init__( + # fmt: off + self, + tarFileName : Optional[str] = None, + fileObject : Optional[IO[bytes]] = None, + writeIndex : bool = False, + clearIndexCache : bool = False, + indexFilePath : Optional[str] = None, + indexFolders : Optional[List[str]] = None, + recursive : bool = False, + gzipSeekPointSpacing : int = 4*1024*1024, + encoding : str = tarfile.ENCODING, + stripRecursiveTarExtension : bool = False, + ignoreZeros : bool = False, + verifyModificationTime : bool = False, + parallelization : int = 1, + printDebug : int = 0, + # pylint: disable=unused-argument + **kwargs + # fmt: on + ) -> None: + """ + tarFileName : Path to the TAR file to be opened. If not specified, a fileObject must be specified. + If only a fileObject is given, the created index can't be cached (efficiently). + fileObject : A io.IOBase derived object. If not specified, tarFileName will be opened. + If it is an instance of IndexedBzip2File, IndexedGzipFile, or IndexedZstdFile, then the offset + loading and storing from and to the SQLite database is managed automatically by this class. + writeIndex : If true, then the sidecar index file will be written to a suitable location. + Will be ignored if indexFilePath is ':memory:' or if only fileObject is specified + but not tarFileName. + clearIndexCache : If true, then check all possible index file locations for the given tarFileName/fileObject + combination and delete them. This also implicitly forces a recreation of the index. + indexFilePath : Path to the index file for this TAR archive. This takes precedence over the automatically + chosen locations. If it is ':memory:', then the SQLite database will be kept in memory + and not stored to the file system at any point. + indexFolders : Specify one or multiple paths for storing .index.sqlite files. Paths will be tested for + suitability in the given order. An empty path will be interpreted as the location in which + the TAR resides. This overrides the default index fallback folder in ~/.ratarmount. + recursive : If true, then TAR files inside this archive will be recursively analyzed and added to the SQLite + index. Currently, this recursion can only break the outermost compression layer. I.e., a .tar.bz2 + file inside a tar.bz2 file can not be mounted recursively. + gzipSeekPointSpacing : This controls the frequency of gzip decoder seek points, see indexed_gzip documentation. + Larger spacings lead to less memory usage but increase the constant seek overhead. + encoding : Will be forwarded to tarfile. Specifies how filenames inside the TAR are encoded. + ignoreZeros : Will be forwarded to tarfile. Specifies to not only skip zero blocks but also blocks with + invalid data. Setting this to true can lead to some problems but is required to correctly + read concatenated tars. + stripRecursiveTarExtension : If true and if recursive is also true, then a .tar inside the current + tar will be mounted at / instead of .tar/. + verifyModificationTime : If true, then the index will be recreated automatically if the TAR archive has a more + recent modification time than the index file. + kwargs : Unused. Only for compatibility with generic MountSource interface. 
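+
+        A minimal usage sketch (illustrative only; the archive name and member path are
+        placeholders):
+
+            with SQLiteIndexedTar("archive.tar.gz", writeIndex=True) as mountSource:
+                fileInfo = mountSource.getFileInfo("/some/member.txt")
+                if fileInfo:
+                    data = mountSource.read(fileInfo, size=fileInfo.size, offset=0)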
+ """ + + # stores which parent folders were last tried to add to database and therefore do exist + self.parentFolderCache: List[Tuple[str, str]] = [] + self.sqlConnection: Optional[sqlite3.Connection] = None + self.indexFilePath = None + + # fmt: off + self.mountRecursively = recursive + self.encoding = encoding + self.stripRecursiveTarExtension = stripRecursiveTarExtension + self.ignoreZeros = ignoreZeros + self.verifyModificationTime = verifyModificationTime + self.gzipSeekPointSpacing = gzipSeekPointSpacing + self.parallelization = parallelization + self.printDebug = printDebug + self.isFileObject = fileObject is not None + # fmt: on + + # Determine an archive file name to show for debug output + self.tarFileName: str + if fileObject: + self.tarFileName = tarFileName if tarFileName else '' + else: + if tarFileName: + self.tarFileName = os.path.abspath(tarFileName) + else: + raise ValueError("At least one of tarFileName and fileObject arguments should be set!") + # If no fileObject given, then self.tarFileName is the path to the archive to open. + if not fileObject: + fileObject = open(self.tarFileName, 'rb') + fileSize = None + if fileObject.seekable(): + # print("In seekable branch") + fileObject.seek(0, io.SEEK_END) + fileSize = fileObject.tell() + fileObject.seek(0) # Even if not interested in the file size, seeking to the start might be useful. + + # rawFileObject : Only set when opening a compressed file and only kept to keep the + # compressed file handle from being closed by the garbage collector. + # tarFileObject : File object to the uncompressed (or decompressed) TAR file to read actual data out of. + # compression : Stores what kind of compression the originally specified TAR file uses. + # isTar : Can be false for the degenerated case of only a bz2 or gz file not containing a TAR + self.tarFileObject, self.rawFileObject, self.compression, self.isTar = SQLiteIndexedTar._openCompressedFile( + fileObject, gzipSeekPointSpacing, encoding, self.parallelization, printDebug=self.printDebug, filename=self.tarFileName + ) + + if not self.isTar and not self.rawFileObject: + raise RatarmountError("File object (" + str(fileObject) + ") could not be opened as a TAR file!" + str(self.isTar) + str(self.rawFileObject)) + + if self.compression == 'xz': + try: + if len(self.tarFileObject.block_boundaries) <= 1 and (fileSize is None or fileSize > 1024 * 1024): + print(f"[Warning] The specified file '{self.tarFileName}'") + print("[Warning] is compressed using xz but only contains one xz block. This makes it ") + print("[Warning] impossible to use true seeking! 
Please (re)compress your TAR using pixz") + print("[Warning] (see https://github.com/vasi/pixz) in order for ratarmount to do be able ") + print("[Warning] to do fast seeking to requested files.") + print("[Warning] As it is, each file access will decompress the whole TAR from the beginning!") + print() + except Exception: + pass + + # will be used for storing indexes if current path is read-only + if self.isFileObject: + possibleIndexFilePaths = [] + indexPathAsName = None + else: + possibleIndexFilePaths = [self.tarFileName + ".index.sqlite"] + indexPathAsName = self.tarFileName.replace("/", "_") + ".index.sqlite" + + if isinstance(indexFolders, str): + indexFolders = [indexFolders] + + # A given index file name takes precedence and there should be no implicit fallback + if indexFilePath: + if indexFilePath == ':memory:': + possibleIndexFilePaths = [] + else: + possibleIndexFilePaths = [os.path.abspath(os.path.expanduser(indexFilePath))] + elif indexFolders: + # An empty path is to be interpreted as the default path right besides the TAR + if '' not in indexFolders: + possibleIndexFilePaths = [] + + if indexPathAsName: + for folder in indexFolders: + if folder: + indexPath = os.path.join(folder, indexPathAsName) + possibleIndexFilePaths.append(os.path.abspath(os.path.expanduser(indexPath))) + else: + writeIndex = False + elif self.isFileObject: + writeIndex = False + + if clearIndexCache: + for indexPath in possibleIndexFilePaths: + if os.path.isfile(indexPath): + os.remove(indexPath) + + # Try to find an already existing index + for indexPath in possibleIndexFilePaths: + if self._tryLoadIndex(indexPath): + self.indexFilePath = indexPath + break + if self.indexIsLoaded() and self.sqlConnection: + try: + indexVersion = self.sqlConnection.execute( + "SELECT major,minor FROM versions WHERE name == 'index';" + ).fetchone() + + if indexVersion and indexVersion > __version__: + print("[Warning] The loaded index was created with a newer version of ratarmount.") + print("[Warning] If there are any problems, please update ratarmount or recreate the index") + print("[Warning] with this ratarmount version using the --recreate-index option!") + except Exception: + pass + + self._loadOrStoreCompressionOffsets() + self._reloadIndexReadOnly() + return + + # Find a suitable (writable) location for the index database + if writeIndex and indexFilePath != ':memory:': + for indexPath in possibleIndexFilePaths: + if self._pathIsWritable(indexPath, printDebug=self.printDebug) and self._pathCanBeUsedForSqlite( + indexPath, printDebug=self.printDebug + ): + self.indexFilePath = indexPath + break + + if not self.indexFilePath: + raise InvalidIndexError( + "Could not find any existing index or writable location for an index in " + + str(possibleIndexFilePaths) + ) + + self._createIndex(self.tarFileObject) + self._loadOrStoreCompressionOffsets() # store + if self.sqlConnection: + self._storeMetadata(self.sqlConnection) + self._reloadIndexReadOnly() + + if self.printDebug >= 1 and self.indexFilePath and os.path.isfile(self.indexFilePath): + # The 0-time is legacy for the automated tests + # fmt: off + print("Writing out TAR index to", self.indexFilePath, "took 0s", + "and is sized", os.stat( self.indexFilePath ).st_size, "B") + # fmt: on + + def __enter__(self): + return self + + def __exit__(self, exception_type, exception_value, exception_traceback): + if self.sqlConnection: + self.sqlConnection.commit() + self.sqlConnection.close() + + if self.tarFileObject: + self.tarFileObject.close() + + if 
self.rawFileObject:
+            self.rawFileObject.close()
+
+    def _storeMetadata(self, connection: sqlite3.Connection) -> None:
+        self._storeVersionsMetadata(connection, printDebug=self.printDebug)
+
+        metadataTable = """
+            /* empty table whose sole existence specifies that we finished iterating the tar */
+            CREATE TABLE "metadata" (
+                "key"      VARCHAR(65535) NOT NULL, /* e.g. "tarsize" */
+                "value"    VARCHAR(65535) NOT NULL  /* e.g. size in bytes as integer */
+            );
+        """
+
+        connection.executescript(metadataTable)
+
+        # All of these require the generic "metadata" table.
+        if not self.isFileObject:
+            self._storeTarMetadata(connection, self.tarFileName, printDebug=self.printDebug)
+        self._storeArgumentsMetadata(connection)
+        connection.commit()
+
+    @staticmethod
+    def _storeVersionsMetadata(connection: sqlite3.Connection, printDebug: int = 0) -> None:
+        versionsTable = """
+            /* This table's sole existence specifies that we finished iterating the tar for older ratarmount versions */
+            CREATE TABLE "versions" (
+                "name"     VARCHAR(65535) NOT NULL, /* which component the version belongs to */
+                "version"  VARCHAR(65535) NOT NULL, /* free form version string */
+                /* Semantic Versioning 2.0.0 (semver.org) parts if they can be specified:
+                 *   MAJOR version when you make incompatible API changes,
+                 *   MINOR version when you add functionality in a backwards compatible manner, and
+                 *   PATCH version when you make backwards compatible bug fixes. */
+                "major"    INTEGER,
+                "minor"    INTEGER,
+                "patch"    INTEGER
+            );
+        """
+        try:
+            connection.executescript(versionsTable)
+        except Exception as exception:
+            if printDebug >= 2:
+                print(exception)
+            print("[Warning] There was an error when adding metadata information. Index loading might not work.")
+
+        try:
+
+            def makeVersionRow(
+                versionName: str, version: str
+            ) -> Tuple[str, str, Optional[str], Optional[str], Optional[str]]:
+                versionNumbers = [re.sub('[^0-9]', '', x) for x in version.split('.')]
+                return (
+                    versionName,
+                    version,
+                    versionNumbers[0] if len(versionNumbers) > 0 else None,
+                    versionNumbers[1] if len(versionNumbers) > 1 else None,
+                    versionNumbers[2] if len(versionNumbers) > 2 else None,
+                )
+
+            versions = [
+                makeVersionRow('ratarmount', __version__),
+                makeVersionRow('index', SQLiteIndexedTar.__version__),
+            ]
+
+            for _, cinfo in supportedCompressions.items():
+                if cinfo.moduleName in globals():
+                    module = globals()[cinfo.moduleName]
+                    # zipfile has no __version__ attribute and PEP 396 ensuring that was rejected 2021-04-14
+                    # in favor of 'version' from importlib.metadata which does not even work with zipfile.
+                    # Probably, because zipfile is a built-in module whose version would be the Python version.
+                    # https://www.python.org/dev/peps/pep-0396/
+                    # The "python-xz" project is imported as an "xz" module, which complicates things because
+                    # there is no generic way to get the "python-xz" name from the "xz" runtime module object
+                    # and importlib.metadata.version will require "python-xz" as argument.
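+                    # For illustration, makeVersionRow('indexed_gzip', '1.6.4') (version string
+                    # hypothetical) yields ('indexed_gzip', '1.6.4', '1', '6', '4').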
+ if hasattr(module, '__version__'): + versions += [makeVersionRow(cinfo.moduleName, module.__version__)] + + connection.executemany('INSERT OR REPLACE INTO "versions" VALUES (?,?,?,?,?)', versions) + except Exception as exception: + print("[Warning] There was an error when adding version information.") + if printDebug >= 3: + print(exception) + + @staticmethod + def _storeTarMetadata(connection: sqlite3.Connection, tarPath: AnyStr, printDebug: int = 0) -> None: + """Adds some consistency meta information to recognize the need to update the cached TAR index""" + try: + tarStats = os.stat(tarPath) + serializedTarStats = json.dumps( + {attr: getattr(tarStats, attr) for attr in dir(tarStats) if attr.startswith('st_')} + ) + connection.execute('INSERT INTO "metadata" VALUES (?,?)', ("tarstats", serializedTarStats)) + except Exception as exception: + print("[Warning] There was an error when adding file metadata.") + print("[Warning] Automatic detection of changed TAR files during index loading might not work.") + if printDebug >= 2: + print(exception) + if printDebug >= 3: + traceback.print_exc() + + def _storeArgumentsMetadata(self, connection: sqlite3.Connection) -> None: + argumentsToSave = [ + 'mountRecursively', + 'gzipSeekPointSpacing', + 'encoding', + 'stripRecursiveTarExtension', + 'ignoreZeros', + ] + + argumentsMetadata = json.dumps({argument: getattr(self, argument) for argument in argumentsToSave}) + + try: + connection.execute('INSERT INTO "metadata" VALUES (?,?)', ("arguments", argumentsMetadata)) + except Exception as exception: + if self.printDebug >= 2: + print(exception) + print("[Warning] There was an error when adding argument metadata.") + print("[Warning] Automatic detection of changed arguments files during index loading might not work.") + + @staticmethod + def _pathIsWritable(path: AnyStr, printDebug: int = 0) -> bool: + try: + folder = os.path.dirname(path) + if folder: + os.makedirs(folder, exist_ok=True) + + with open(path, 'wb') as file: + file.write(b'\0' * 1024 * 1024) + os.remove(path) + + return True + + except PermissionError: + if printDebug >= 2: + traceback.print_exc() + print("Could not create file:", path) + + except IOError: + if printDebug >= 2: + traceback.print_exc() + print("Could not create file:", path) + + return False + + @staticmethod + def _pathCanBeUsedForSqlite(path: AnyStr, printDebug: int = 0) -> bool: + fileExisted = os.path.isfile(path) + try: + folder = os.path.dirname(path) + if folder: + os.makedirs(folder, exist_ok=True) + + connection = SQLiteIndexedTar._openSqlDb(path) + connection.executescript('CREATE TABLE "files" ( "path" VARCHAR(65535) NOT NULL );') + connection.commit() + connection.close() + return True + except sqlite3.OperationalError: + if printDebug >= 2: + traceback.print_exc() + print("Could not create SQLite database at:", path) + finally: + if not fileExisted and os.path.isfile(path): + SQLiteIndexedTar._uncheckedRemove(path) + + return False + + @staticmethod + def _openSqlDb(path: AnyStr, **kwargs) -> sqlite3.Connection: + sqlConnection = sqlite3.connect(path, **kwargs) + sqlConnection.row_factory = sqlite3.Row + sqlConnection.executescript( + # Looking mode exclusive leads to a measurable speedup. E.g., find on 2k recursive files tar + # improves from ~1s to ~0.4s! 
+ # https://blog.devart.com/increasing-sqlite-performance.html + """ + PRAGMA LOCKING_MODE = EXCLUSIVE; + PRAGMA TEMP_STORE = MEMORY; + PRAGMA JOURNAL_MODE = OFF; + PRAGMA SYNCHRONOUS = OFF; + """ + ) + return sqlConnection + + @staticmethod + def _initializeSqlDb(indexFilePath: Optional[str], printDebug: int = 0) -> sqlite3.Connection: + if printDebug >= 1: + print("Creating new SQLite index database at", indexFilePath if indexFilePath else ':memory:') + + createTables = """ + CREATE TABLE "files" ( + "path" VARCHAR(65535) NOT NULL, /* path with leading and without trailing slash */ + "name" VARCHAR(65535) NOT NULL, + "offsetheader" INTEGER, /* seek offset from TAR file where the TAR metadata for this file resides */ + "offset" INTEGER, /* seek offset from TAR file where these file's contents resides */ + "size" INTEGER, + "mtime" INTEGER, + "mode" INTEGER, + "type" INTEGER, + "linkname" VARCHAR(65535), + "uid" INTEGER, + "gid" INTEGER, + /* True for valid TAR files. Internally used to determine where to mount recursive TAR files. */ + "istar" BOOL , + "issparse" BOOL , /* for sparse files the file size refers to the expanded size! */ + /* See SQL benchmarks for decision on the primary key. + * See also https://www.sqlite.org/optoverview.html + * (path,name) tuples might appear multiple times in a TAR if it got updated. + * In order to also be able to show older versions, we need to add + * the offsetheader column to the primary key. */ + PRIMARY KEY (path,name,offsetheader) + ); + /* "A table created using CREATE TABLE AS has no PRIMARY KEY and no constraints of any kind" + * Therefore, it will not be sorted and inserting will be faster! */ + CREATE TABLE "filestmp" AS SELECT * FROM "files" WHERE 0; + CREATE TABLE "parentfolders" ( + "path" VARCHAR(65535) NOT NULL, + "name" VARCHAR(65535) NOT NULL, + "offsetheader" INTEGER, + "offset" INTEGER, + PRIMARY KEY (path,name) + UNIQUE (path,name) + ); + """ + + sqlConnection = SQLiteIndexedTar._openSqlDb(indexFilePath if indexFilePath else ':memory:') + tables = sqlConnection.execute('SELECT name FROM sqlite_master WHERE type = "table";') + if {"files", "filestmp", "parentfolders"}.intersection({t[0] for t in tables}): + raise InvalidIndexError( + f"The index file {indexFilePath} already seems to contain a table. Please specify --recreate-index." + ) + sqlConnection.executescript(createTables) + return sqlConnection + + def _reloadIndexReadOnly(self): + if not self.indexFilePath or self.indexFilePath == ':memory:' or not self.sqlConnection: + return + + self.sqlConnection.close() + self.sqlConnection = SQLiteIndexedTar._openSqlDb(f"file:{self.indexFilePath}?mode=rw", uri=True) + + @staticmethod + def _tarInfoFullMode(tarInfo: tarfile.TarInfo) -> int: + """ + Returns the full mode for a TarInfo object. Note that TarInfo.mode only contains the permission bits + and not other bits like set for directory, symbolic links, and other special files. 
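+        For example (illustrative values), a regular file with permission bits 0o644
+        is returned as 0o100644, i.e. stat.S_IFREG | 0o644.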
+ """ + return ( + tarInfo.mode + # fmt: off + | ( stat.S_IFDIR if tarInfo.isdir () else 0 ) + | ( stat.S_IFREG if tarInfo.isfile() else 0 ) + | ( stat.S_IFLNK if tarInfo.issym () else 0 ) + | ( stat.S_IFCHR if tarInfo.ischr () else 0 ) + | ( stat.S_IFIFO if tarInfo.isfifo() else 0 ) + # fmt: on + ) + + def _updateProgressBar(self, progressBar, fileobj: Any) -> None: + try: + if hasattr(fileobj, 'tell_compressed') and self.compression == 'bz2': + # Note that because bz2 works on a bitstream the tell_compressed returns the offset in bits + progressBar.update(fileobj.tell_compressed() // 8) + elif hasattr(fileobj, 'tell_compressed'): + progressBar.update(fileobj.tell_compressed()) + elif hasattr(fileobj, 'fileobj'): + progressBar.update(fileobj.fileobj().tell()) + elif self.rawFileObject and hasattr(self.rawFileObject, 'tell'): + progressBar.update(self.rawFileObject.tell()) + else: + progressBar.update(fileobj.tell()) + except Exception: + pass + + def _createIndex( + self, + # fmt: off + fileObject : Any, + progressBar : Any = None, + pathPrefix : str = '', + streamOffset: int = 0 + # fmt: on + ) -> None: + if self.printDebug >= 1: + print("Creating offset dictionary for", self.tarFileName, "...") + t0 = timer() + + # 1. If no SQL connection was given (by recursive call), open a new database file + openedConnection = False + if not self.indexIsLoaded() or not self.sqlConnection: + openedConnection = True + self.sqlConnection = self._initializeSqlDb(self.indexFilePath, printDebug=self.printDebug) + + # 2. Open TAR file reader + loadedTarFile: Any = [] # Feign an empty TAR file if anything goes wrong + if self.isTar: # Jiani: If the file is end with '.tar.gz', will go into this branch + try: + # r: uses seeks to skip to the next file inside the TAR while r| doesn't do any seeks. + # r| might be slower but for compressed files we have to go over all the data once anyways. + # Note that with ignore_zeros = True, no invalid header issues or similar will be raised even for + # non TAR files!? + loadedTarFile = tarfile.open( + # fmt:off + fileobj = fileObject, + mode = 'r|', + ignore_zeros = self.ignoreZeros, + encoding = self.encoding, + # fmt:on + ) + + except tarfile.ReadError: + print("[info] Can Not open the file using tar file reader") + pass + + if progressBar is None: + try: + progressBar = ProgressBar(os.fstat(fileObject.fileno()).st_size) + except io.UnsupportedOperation: + pass + # 3. Iterate over files inside TAR and add them to the database + try: + filesToMountRecursively = [] + for tarInfo in loadedTarFile: + loadedTarFile.members = [] # Clear this in order to limit memory usage by tarfile + self._updateProgressBar(progressBar, fileObject) + + # Add a leading '/' as a convention where '/' represents the TAR root folder + # Partly, done because fusepy specifies paths in a mounted directory like this + # os.normpath does not delete duplicate '/' at beginning of string! + # tarInfo.name might be identical to "." or begin with "./", which is bad! + # os.path.normpath can remove suffixed folder/./ path specifications but it can't remove + # a leading dot. + # TODO: Would be a nice function / line of code to test because it is very finicky. + # And some cases are only triggered for recursive mounts, i.e., for non-empty pathPrefix. + fullPath = "/" + os.path.normpath(pathPrefix + "/" + tarInfo.name).lstrip('/') + + # TODO: As for the tarfile type SQLite expects int but it is generally bytes. 
+ # Most of them would be convertible to int like tarfile.SYMTYPE which is b'2', + # but others should throw errors, like GNUTYPE_SPARSE which is b'S'. + # When looking at the generated index, those values get silently converted to 0? + path, name = fullPath.rsplit("/", 1) + # fmt: off + fileInfo = ( + path , # 0 + name , # 1 + streamOffset + tarInfo.offset , # 2 + streamOffset + tarInfo.offset_data, # 3 + tarInfo.size , # 4 + tarInfo.mtime , # 5 + self._tarInfoFullMode(tarInfo) , # 6 + tarInfo.type , # 7 + tarInfo.linkname , # 8 + tarInfo.uid , # 9 + tarInfo.gid , # 10 + False , # 11 (isTar) + tarInfo.issparse() , # 12 + ) + # fmt: on + + if self.mountRecursively and tarInfo.isfile() and tarInfo.name.lower().endswith('.tar'): + filesToMountRecursively.append(fileInfo) + else: + self._setFileInfo(fileInfo) + except tarfile.ReadError as e: + if 'unexpected end of data' in str(e): + print( + "[Warning] The TAR file is incomplete. Ratarmount will work but some files might be cut off. " + "If the TAR file size changes, ratarmount will recreate the index during the next mounting." + ) + + # 4. Open contained TARs for recursive mounting + oldPos = fileObject.tell() + oldPrintName = self.tarFileName + for fileInfo in filesToMountRecursively: + # Strip file extension for mount point if so configured + modifiedName = fileInfo[1] + tarExtension = '.tar' + if ( + self.stripRecursiveTarExtension + and len(tarExtension) > 0 + and modifiedName.lower().endswith(tarExtension.lower()) + ): + modifiedName = modifiedName[: -len(tarExtension)] + + # Temporarily change tarFileName for the info output of the recursive call + self.tarFileName = os.path.join(fileInfo[0], fileInfo[1]) + + # StenciledFile's tell returns the offset inside the file chunk instead of the global one, + # so we have to always communicate the offset of this chunk to the recursive call no matter + # whether tarfile has streaming access or seeking access! + globalOffset = fileInfo[3] + size = fileInfo[4] + tarFileObject = StenciledFile(fileObject, [(globalOffset, size)]) + + isTar = False + try: + # Do not use os.path.join here because the leading / might be missing. + # This should instead be seen as the reverse operation of the rsplit further above. + self._createIndex(tarFileObject, progressBar, "/".join([fileInfo[0], modifiedName]), globalOffset) + isTar = True + except tarfile.ReadError: + pass + finally: + del tarFileObject + + if isTar: + modifiedFileInfo = list(fileInfo) + + # if the TAR file contents could be read, we need to adjust the actual + # TAR file's metadata to be a directory instead of a file + mode = modifiedFileInfo[6] + mode = ( + (mode & 0o777) + | stat.S_IFDIR + | (stat.S_IXUSR if mode & stat.S_IRUSR != 0 else 0) + | (stat.S_IXGRP if mode & stat.S_IRGRP != 0 else 0) + | (stat.S_IXOTH if mode & stat.S_IROTH != 0 else 0) + ) + + modifiedFileInfo[0] = fileInfo[0] + modifiedFileInfo[1] = modifiedName + modifiedFileInfo[6] = mode + modifiedFileInfo[11] = isTar + + self._setFileInfo(tuple(modifiedFileInfo)) + else: + self._setFileInfo(fileInfo) + + # fileObject.seek(oldPos) # Jiani: it's not seekable + self.tarFileName = oldPrintName + + # Everything below should not be done in a recursive call of createIndex + if streamOffset > 0: + t1 = timer() + if self.printDebug >= 1: + print(f"Creating offset dictionary for {self.tarFileName} took {t1 - t0:.2f}s") + return + + # If no file is in the TAR, then it most likely indicates a possibly compressed non TAR file. + # In that case add that itself to the file index. 
This won't work when called recursively,
+        # so check the stream offset.
+        fileCount = self.sqlConnection.execute('SELECT COUNT(*) FROM "files";').fetchone()[0]
+        if fileCount == 0:  # Jiani: for CodaLab, the bundle contains only a single file
+            # This branch is not used.
+            if self.printDebug >= 3:
+                print(f"Did not find any file in the given TAR: {self.tarFileName}. Assuming a compressed file.")
+
+            try:
+                tarInfo = os.fstat(fileObject.fileno())
+            except io.UnsupportedOperation:
+                # If fileObject doesn't have a fileno, we set tarInfo to None
+                # and set the relevant statistics (such as st_mtime) to sensible defaults.
+                tarInfo = None
+            fname = os.path.basename(self.tarFileName)
+            for suffix in ['.gz', '.bz2', '.bzip2', '.gzip', '.xz', '.zst', '.zstd']:
+                if fname.lower().endswith(suffix) and len(fname) > len(suffix):
+                    fname = fname[: -len(suffix)]
+                    break
+
+            # If the file object is actually an IndexedBzip2File or such, we can't directly use the file size
+            # from os.stat and instead have to gather it from seek. Unfortunately, indexed_gzip does not support
+            # io.SEEK_END even though it could as it has the index ...
+
+            # Jiani: This branch will only be used when uploading a single file (not a directory).
+            # Jiani: We cannot read through the whole file here because reading through large files is buggy.
+            fileObject.build_full_index()
+
+            # Jiani: build_full_index() advances the underlying stream, so tell() afterwards
+            # yields the decompressed file size.
+            fileSize = fileObject.tell()
+
+            # fmt: off
+            fileInfo = (
+                ""                                    ,  # 0 path
+                fname                                 ,  # 1
+                None                                  ,  # 2 header offset
+                0                                     ,  # 3 data offset
+                fileSize                              ,  # 4
+                tarInfo.st_mtime if tarInfo else 0    ,  # 5
+                tarInfo.st_mode if tarInfo else 0o777 ,  # 6
+                None                                  ,  # 7 TAR file type. Currently unused. Overlaps with mode
+                None                                  ,  # 8 linkname
+                tarInfo.st_uid if tarInfo else 0      ,  # 9
+                tarInfo.st_gid if tarInfo else 0      ,  # 10
+                False                                 ,  # 11 isTar
+                False                                 ,  # 12 isSparse, don't care if it is actually sparse or not because it is not in TAR
+            )
+            # fmt: on
+            self._setFileInfo(fileInfo)
+
+        # All the code below is for database finalizing which should not be done in a recursive call of createIndex!
+        if not openedConnection:
+            return
+
+        # 5. Resort by (path,name).
This one-time resort is faster than resorting on each INSERT (cache spill) + if self.printDebug >= 2: + print("Resorting files by path ...") + + try: + queriedLibSqliteVersion = sqlite3.connect(":memory:").execute("select sqlite_version();").fetchone() + libSqliteVersion = tuple(int(x) for x in queriedLibSqliteVersion[0].split('.')) + except Exception: + libSqliteVersion = (0, 0, 0) + + searchByTuple = """(path,name) NOT IN ( SELECT path,name""" + searchByConcat = """path || "/" || name NOT IN ( SELECT path || "/" || name""" + + cleanupDatabase = f""" + INSERT OR REPLACE INTO "files" SELECT * FROM "filestmp" ORDER BY "path","name",rowid; + DROP TABLE "filestmp"; + INSERT OR IGNORE INTO "files" + /* path name offsetheader offset size mtime mode type linkname uid gid istar issparse */ + SELECT path,name,offsetheader,offset,0,0,{int(0o555 | stat.S_IFDIR)},{int(tarfile.DIRTYPE)},"",0,0,0,0 + FROM "parentfolders" + WHERE {searchByTuple if libSqliteVersion >= (3,22,0) else searchByConcat} + FROM "files" WHERE mode & (1 << 14) != 0 + ) + ORDER BY "path","name"; + DROP TABLE "parentfolders"; + PRAGMA optimize; + """ + self.sqlConnection.executescript(cleanupDatabase) + + self.sqlConnection.commit() + + t1 = timer() + if self.printDebug >= 1: + print(f"Creating offset dictionary for {self.tarFileName} took {t1 - t0:.2f}s") + + @staticmethod + def _rowToFileInfo(row: Dict[str, Any]) -> FileInfo: + userData = SQLiteIndexedTarUserData( + # fmt: off + offset = row['offset'], + offsetheader = row['offsetheader'] if 'offsetheader' in row.keys() else 0, + istar = row['istar'], + issparse = row['issparse'] if 'issparse' in row.keys() else False, + # fmt: on + ) + + fileInfo = FileInfo( + # fmt: off + size = row['size'], + mtime = row['mtime'], + mode = row['mode'], + linkname = row['linkname'], + uid = row['uid'], + gid = row['gid'], + userdata = [userData], + # fmt: on + ) + + return fileInfo + + @overrides(MountSource) + def getFileInfo(self, path: str, fileVersion: int = 0) -> Optional[FileInfo]: + fileInfo = self._getFileInfo(path, fileVersion=fileVersion) + + if fileInfo is None: + return None + + assert isinstance(fileInfo, FileInfo) + return fileInfo + + def _getFileInfo( + self, + # fmt: off + fullPath : str, + listDir : bool = False, + listVersions : bool = False, + fileVersion : int = 0 + # fmt: on + ) -> Optional[Union[FileInfo, Dict[str, FileInfo]]]: + """ + This is the heart of this class' public interface! + + path : full path to file where '/' denotes TAR's root, e.g., '/', or '/foo' + listDir : if True, return a dictionary for the given directory path: { fileName : FileInfo, ... } + if False, return simple FileInfo to given path (directory or file) + fileVersion : If the TAR contains the same file path multiple times, by default only the last one is shown. + But with this argument other versions can be queried. Version 1 is the oldest one. + Version 0 translates to the most recent one for compatibility with tar --occurrence=. + Version -1 translates to the second most recent, and so on. + For listDir=True, the file version makes no sense and is ignored! + So, even if a folder was overwritten by a file, which is already not well supported by tar, + then listDir for that path will still list all contents of the overwritten folder or folders, + no matter the specified version. The file system layer has to take care that a directory + listing is not even requeted in the first place if it is not a directory. 
+ FUSE already does this by calling getattr for all parent folders in the specified path first. + + If path does not exist, always return None + + If listVersions is true, then return metadata for all versions of a file possibly appearing more than once + in the TAR as a directory dictionary. listDir will then be ignored! + """ + # TODO cache last listDir as most often a stat over all entries will soon follow + + if not isinstance(fileVersion, int): + raise TypeError("The specified file version must be an integer!") + if not self.sqlConnection: + raise IndexNotOpenError("This method can not be called without an opened index database!") + + # also strips trailing '/' except for a single '/' and leading '/' + fullPath = '/' + os.path.normpath(fullPath).lstrip('/') + + if listVersions: + path, name = fullPath.rsplit('/', 1) + rows = self.sqlConnection.execute( + 'SELECT * FROM "files" WHERE "path" == (?) AND "name" == (?) ORDER BY "offsetheader" ASC', (path, name) + ) + result = {str(version + 1): self._rowToFileInfo(row) for version, row in enumerate(rows)} + return result + + if listDir: + # For listing directory entries the file version can't be applied meaningfully at this abstraction layer. + # E.g., should it affect the file version of the directory to list, or should it work on the listed files + # instead and if so how exactly if there aren't the same versions for all files available, ...? + # Or, are folders assumed to be overwritten by a new folder entry in a TAR or should they be union mounted? + # If they should be union mounted, like is the case now, then the folder version only makes sense for + # its attributes. + rows = self.sqlConnection.execute('SELECT * FROM "files" WHERE "path" == (?)', (fullPath.rstrip('/'),)) + directory = {} + gotResults = False + for row in rows: + gotResults = True + if row['name']: + directory[row['name']] = self._rowToFileInfo(row) + return directory if gotResults else None + + path, name = fullPath.rsplit('/', 1) + row = self.sqlConnection.execute( + f""" + SELECT * FROM "files" + WHERE "path" == (?) AND "name" == (?) + ORDER BY "offsetheader" {'DESC' if fileVersion is None or fileVersion <= 0 else 'ASC'} + LIMIT 1 OFFSET (?); + """, + (path, name, 0 if fileVersion is None else fileVersion - 1 if fileVersion > 0 else -fileVersion), + ).fetchone() + return self._rowToFileInfo(row) if row else None + + def _getFileInfoRow( + self, + # fmt: off + fullPath : str, + listDir : bool = False, + listVersions : bool = False, + fileVersion : int = 0 + # fmt: on + ) -> Optional[Union[FileInfo, Dict[str, FileInfo]]]: + """ + This file returns a fileInfo as database rows. + """ + # TODO cache last listDir as most often a stat over all entries will soon follow + + if not isinstance(fileVersion, int): + raise TypeError("The specified file version must be an integer!") + if not self.sqlConnection: + raise IndexNotOpenError("This method can not be called without an opened index database!") + + # also strips trailing '/' except for a single '/' and leading '/' + fullPath = '/' + os.path.normpath(fullPath).lstrip('/') + + if listVersions: + path, name = fullPath.rsplit('/', 1) + rows = self.sqlConnection.execute( + 'SELECT * FROM "files" WHERE "path" == (?) AND "name" == (?) ORDER BY "offsetheader" ASC', (path, name) + ) + result = {str(version + 1): self._rowToFileInfo(row) for version, row in enumerate(rows)} + return result + + if listDir: + # For listing directory entries the file version can't be applied meaningfully at this abstraction layer. 
+ # E.g., should it affect the file version of the directory to list, or should it work on the listed files + # instead and if so how exactly if there aren't the same versions for all files available, ...? + # Or, are folders assumed to be overwritten by a new folder entry in a TAR or should they be union mounted? + # If they should be union mounted, like is the case now, then the folder version only makes sense for + # its attributes. + rows = self.sqlConnection.execute('SELECT * FROM "files" WHERE "path" == (?)', (fullPath.rstrip('/'),)) + directory = {} + gotResults = False + for row in rows: + gotResults = True + if row['name']: + directory[row['name']] = self._rowToFileInfo(row) + return directory if gotResults else None + + path, name = fullPath.rsplit('/', 1) + row = self.sqlConnection.execute( + f""" + SELECT * FROM "files" + WHERE "path" == (?) AND "name" == (?) + ORDER BY "offsetheader" {'DESC' if fileVersion is None or fileVersion <= 0 else 'ASC'} + LIMIT 1 OFFSET (?); + """, + (path, name, 0 if fileVersion is None else fileVersion - 1 if fileVersion > 0 else -fileVersion), + ).fetchone() + return row + + def isDir(self, path: str) -> bool: + """Return true if path exists and is a folder.""" + return self.listDir(path) is not None + + @overrides(MountSource) + def listDir(self, path: str) -> Optional[Iterable[str]]: + """ + Usability wrapper for getFileInfo(listDir=True) with FileInfo stripped if you are sure you don't need it. + """ + result = self._getFileInfo(path, listDir=True) + if isinstance(result, dict): + return result.keys() + return None + + @overrides(MountSource) + def fileVersions(self, path: str) -> int: + """ + Usability wrapper for getFileInfo(listVersions=True) with FileInfo stripped if you are sure you don't need it. + """ + fileVersions = self._getFileInfo(path, listVersions=True) + return len(fileVersions) if isinstance(fileVersions, dict) else 0 + + @overrides(MountSource) + def open(self, fileInfo: FileInfo) -> IO[bytes]: + assert fileInfo.userdata + tarFileInfo = fileInfo.userdata[-1] + assert isinstance(tarFileInfo, SQLiteIndexedTarUserData) + + # This is not strictly necessary but it saves two file object layers and therefore might be more performant. + # Furthermore, non-sparse files should be the much more likely case anyway. + if not tarFileInfo.issparse: + return cast(IO[bytes], StenciledFile(self.tarFileObject, [(tarFileInfo.offset, fileInfo.size)])) + + # The TAR file format is very simple. It's just a concatenation of TAR blocks. There is not even a + # global header, only the TAR block headers. That's why we can simply cut out the TAR block for + # the sparse file using StenciledFile and then use tarfile on it to expand the sparse file correctly. + tarBlockSize = tarFileInfo.offset - tarFileInfo.offsetheader + fileInfo.size + + tarSubFile = StenciledFile(self.tarFileObject, [(tarFileInfo.offsetheader, tarBlockSize)]) + # TODO It might be better to somehow call close on tarFile but the question is where and how. + # It would have to be appended to the __exit__ method of fileObject like if being decorated. + # For now this seems to work either because fileObject does not require tarFile to exist + # or because tarFile is simply not closed correctly here, I'm not sure. + # Sparse files are kinda edge-cases anyway, so it isn't high priority as long as the tests work. 
+        tarFile = tarfile.open(fileobj=cast(IO[bytes], tarSubFile), mode='r:', encoding=self.encoding)
+        fileObject = tarFile.extractfile(next(iter(tarFile)))
+        if not fileObject:
+            raise CompressionError("tarfile.extractfile returned nothing!")
+
+        return fileObject
+
+    @overrides(MountSource)
+    def read(self, fileInfo: FileInfo, size: int, offset: int) -> bytes:
+        assert fileInfo.userdata
+        tarFileInfo = fileInfo.userdata[-1]
+        assert isinstance(tarFileInfo, SQLiteIndexedTarUserData)
+
+        if tarFileInfo.issparse:
+            with self.open(fileInfo) as file:
+                file.seek(offset, os.SEEK_SET)
+                return file.read(size)
+
+        # For non-sparse files, we can simply seek to the offset and read from it.
+        self.tarFileObject.seek(tarFileInfo.offset + offset, os.SEEK_SET)
+        return self.tarFileObject.read(size)
+
+    def _tryAddParentFolders(self, path: str, offsetheader: int, offset: int) -> None:
+        # Add parent folders if they do not exist.
+        # E.g.: path = '/a/b/c' -> paths = [('', 'a'), ('/a', 'b'), ('/a/b', 'c')]
+        # Without the parentFolderCache, the additional INSERT statements increase the creation time
+        # from 8.5s to 12s, so almost 50% slowdown for the 8MiB test TAR!
+        pathParts = path.split("/")
+        paths = [
+            p
+            # fmt: off
+            for p in (
+                ( "/".join( pathParts[:i] ), pathParts[i] )
+                for i in range( 1, len( pathParts ) )
+            )
+            # fmt: on
+            if p not in self.parentFolderCache
+        ]
+        if not paths:
+            return
+
+        self.parentFolderCache += paths
+        # Assuming files in the TAR are sorted by hierarchy, the maximum parent folder cache size
+        # gives the maximum cacheable file nesting depth. High numbers lead to higher memory usage and lookup times.
+        if len(self.parentFolderCache) > 16:
+            self.parentFolderCache = self.parentFolderCache[-8:]
+
+        if not self.sqlConnection:
+            raise IndexNotOpenError("This method can not be called without an opened index database!")
+
+        # TODO This method is still not perfect but I do not know how to perfect it without losing significant
+        #      performance. Currently, adding implicit folders will fail when a file is overwritten implicitly with
+        #      a folder and then overwritten by a file and then again overwritten by a folder. Because the parent
+        #      folder was already added implicitly the first time, the second time will be skipped.
+        #      To solve this, I would have to add all parent folders for all files, which might easily explode
+        #      the temporary database and the indexing performance by the folder depth.
+        #      Also, I do not want to add versions for a parent folder for each implicitly added parent folder for
+        #      each file, so I would have to sort out those in a post-processing step. E.g., sort by offsetheader
+        #      and then clean out successive implicitly added folders as long as there is no file of the same name
+        #      in between.
+        #      The unmentioned alternative would be to look up paths with LIKE but that is just madness because it
+        #      will have a worse complexity of O(N) instead of O(log(N)).
+        self.sqlConnection.executemany(
+            'INSERT OR IGNORE INTO "parentfolders" VALUES (?,?,?,?)',
+            [(p[0], p[1], offsetheader, offset) for p in paths],
+        )
+
+    def _setFileInfo(self, row: tuple) -> None:
+        if not self.sqlConnection:
+            raise IndexNotOpenError("This method can not be called without an opened index database!")
+
+        try:
+            self.sqlConnection.execute('INSERT OR REPLACE INTO "files" VALUES (' + ','.join('?' * len(row)) + ');', row)
+        except UnicodeEncodeError:
+            print("[Warning] Problem caused by file name encoding when trying to insert this row:", row)
+            print("[Warning] The file name will now be stored with the bad character being escaped")
+            print("[Warning] instead of being correctly interpreted.")
+            print("[Warning] Please specify a suitable file name encoding using, e.g., --encoding iso-8859-1!")
+            print("[Warning] A list of possible encodings can be found here:")
+            print("[Warning] https://docs.python.org/3/library/codecs.html#standard-encodings")
+
+            checkedRow = []
+            for x in list(row):  # check strings
+                if isinstance(x, str):
+                    try:
+                        x.encode()
+                        checkedRow += [x]
+                    except UnicodeEncodeError:
+                        # fmt: off
+                        checkedRow += [
+                            x.encode( self.encoding, 'surrogateescape' )
+                             .decode( self.encoding, 'backslashreplace' )
+                        ]
+                        # fmt: on
+                else:
+                    checkedRow += [x]
+
+            self.sqlConnection.execute(
+                'INSERT OR REPLACE INTO "files" VALUES (' + ','.join('?' * len(row)) + ');', tuple(checkedRow)
+            )
+            print("[Warning] The escaped inserted row is now:", checkedRow)
+            print()
+
+        self._tryAddParentFolders(row[0], row[2], row[3])
+
+    def indexIsLoaded(self) -> bool:
+        """Returns true if the SQLite database has been opened for reading and a "files" table exists."""
+        if not self.sqlConnection:
+            return False
+
+        try:
+            self.sqlConnection.execute('SELECT * FROM "files" WHERE 0 == 1;')
+        except sqlite3.OperationalError:
+            self.sqlConnection = None
+            return False
+
+        return True
+
+    def loadIndex(self, indexFilePath: AnyStr) -> None:
+        """Loads the given index SQLite database and checks it for validity."""
+        if self.indexIsLoaded():
+            return
+
+        t0 = time.time()
+        self.sqlConnection = self._openSqlDb(indexFilePath)
+        tables = [x[0] for x in self.sqlConnection.execute('SELECT name FROM sqlite_master WHERE type="table"')]
+        versions = None
+        try:
+            rows = self.sqlConnection.execute('SELECT * FROM versions;')
+            versions = {}
+            for row in rows:
+                versions[row[0]] = (row[2], row[3], row[4])
+        except sqlite3.OperationalError:
+            pass
+
+        try:
+            # Check indexes created with bugged bz2 decoder (bug existed when I did not store versions yet)
+            if 'bzip2blocks' in tables and 'versions' not in tables:
+                raise InvalidIndexError(
+                    "The indexes created with version 0.3.0 through 0.3.3 for bzip2 compressed archives "
+                    "are very likely to be wrong because of a bzip2 decoder bug.\n"
+                    "Please delete the index or call ratarmount with the --recreate-index option!"
+                )
+
+            # Check for empty or incomplete indexes. Pretty safe to rebuild the index for these as they
+            # are so invalid, no one should miss them. So, recreate the index by default for these cases.
+ if 'files' not in tables: + raise InvalidIndexError("SQLite index is empty") + + if 'filestmp' in tables or 'parentfolders' in tables: + raise InvalidIndexError("SQLite index is incomplete") + + # Check for pre-sparse support indexes + if ( + 'versions' not in tables + or 'index' not in versions + or len(versions['index']) < 2 + or versions['index'][1] < 2 + ): + print("[Warning] The found outdated index does not contain any sparse file information.") + print("[Warning] The index will also miss data about multiple versions of a file.") + print("[Warning] Please recreate the index if you have problems with those.") + + if 'metadata' in tables: + metadata = dict(self.sqlConnection.execute('SELECT * FROM metadata;')) + + if 'tarstats' in metadata: + values = json.loads(metadata['tarstats']) + tarStats = os.stat(self.tarFileName) + + # fmt: off + if ( + hasattr( tarStats, "st_size" ) + and 'st_size' in values + and tarStats.st_size != values['st_size'] + ): + raise InvalidIndexError( "TAR file for this SQLite index has changed size from", + values['st_size'], "to", tarStats.st_size) + # fmt: on + + if ( + self.verifyModificationTime + and hasattr(tarStats, "st_mtime") + and 'st_mtime' in values + and tarStats.st_mtime != values['st_mtime'] + ): + raise InvalidIndexError( + "The modification date for the TAR file", + values['st_mtime'], + "to this SQLite index has changed (" + str(tarStats.st_mtime) + ")", + ) + + # Check arguments used to create the found index. These are only warnings and not forcing a rebuild + # by default. + # TODO: Add --force options? + if 'arguments' in metadata: + indexArgs = json.loads(metadata['arguments']) + argumentsToCheck = [ + 'mountRecursively', + 'gzipSeekPointSpacing', + 'encoding', + 'stripRecursiveTarExtension', + 'ignoreZeros', + ] + differingArgs = [] + for arg in argumentsToCheck: + if arg in indexArgs and hasattr(self, arg) and indexArgs[arg] != getattr(self, arg): + differingArgs.append((arg, indexArgs[arg], getattr(self, arg))) + if differingArgs: + print("[Warning] The arguments used for creating the found index differ from the arguments ") + print("[Warning] given for mounting the archive now. 
In order to apply these changes, ") + print("[Warning] recreate the index using the --recreate-index option!") + for arg, oldState, newState in differingArgs: + print(f"[Warning] {arg}: index: {oldState}, current: {newState}") + + except Exception as e: + # indexIsLoaded checks self.sqlConnection, so close it before returning because it was found to be faulty + try: + self.sqlConnection.close() + except sqlite3.Error: + pass + self.sqlConnection = None + + raise e + + if self.printDebug >= 1: + # Legacy output for automated tests + print(f"Loading offset dictionary from {str(indexFilePath)} took {time.time() - t0:.2f}s") + + def _tryLoadIndex(self, indexFilePath: AnyStr) -> bool: + """calls loadIndex if index is not loaded already and provides extensive error handling""" + + if self.indexIsLoaded(): + return True + + if not os.path.isfile(indexFilePath): + return False + + try: + self.loadIndex(indexFilePath) + except Exception as exception: + if self.printDebug >= 3: + traceback.print_exc() + + print("[Warning] Could not load file:", indexFilePath) + print("[Info] Exception:", exception) + print("[Info] Some likely reasons for not being able to load the index file:") + print("[Info] - The index file has incorrect read permissions") + print("[Info] - The index file is incomplete because ratarmount was killed during index creation") + print("[Info] - The index file was detected to contain errors because of known bugs of older versions") + print("[Info] - The index file got corrupted because of:") + print("[Info] - The program exited while it was still writing the index because of:") + print("[Info] - the user sent SIGINT to force the program to quit") + print("[Info] - an internal error occured while writing the index") + print("[Info] - the disk filled up while writing the index") + print("[Info] - Rare lowlevel corruptions caused by hardware failure") + + print("[Info] This might force a time-costly index recreation, so if it happens often") + print(" and mounting is slow, try to find out why loading fails repeatedly,") + print(" e.g., by opening an issue on the public github page.") + + try: + os.remove(indexFilePath) + except OSError: + print("[Warning] Failed to remove corrupted old cached index file:", indexFilePath) + + if self.printDebug >= 3 and self.indexIsLoaded(): + print("Loaded index", indexFilePath) + + return self.indexIsLoaded() + + @staticmethod + def _detectCompression(fileobj: IO[bytes], printDebug: int = 0) -> Optional[str]: + if not isinstance(fileobj, io.IOBase): + return None + + oldOffset = fileobj.tell() + for compressionId, compression in supportedCompressions.items(): + # The header check is a necessary condition not a sufficient condition. + # Especially for gzip, which only has 2 magic bytes, false positives might happen. + # Therefore, only use the magic bytes based check if the module could not be found + # in order to still be able to print pinpoint error messages. + matches = compression.checkHeader(fileobj) + # fileobj.seek(oldOffset) + if not matches: + continue + + if compression.moduleName not in sys.modules and matches: + return compressionId + + try: + compressedFileobj = compression.open(fileobj) + # Reading 1B from a single-frame zst file might require decompressing it fully in order + # to get uncompressed file size! Avoid that. The magic bytes should suffice mostly. + # TODO: Make indexed_zstd not require the uncompressed size for the read call. 
+                # if compressionId != 'zst':
+                #     compressedFileobj.read(1)
+                # compressedFileobj.close()
+                # fileobj.seek(oldOffset)
+                return compressionId
+            except Exception as e:
+                if printDebug >= 2:
+                    print(f"[Warning] A given file with magic bytes for {compressionId} could not be opened because:")
+                    print(e)
+            # fileobj.seek(oldOffset)
+
+        return None
+
+    @staticmethod
+    def _detectTar(fileobj: IO[bytes], encoding: str, printDebug: int = 0) -> bool:
+        if not isinstance(fileobj, io.IOBase):
+            return False
+
+        oldOffset = fileobj.tell()
+        isTar = False
+        try:
+            with tarfile.open(fileobj=fileobj, mode='r|', encoding=encoding):
+                isTar = True
+        except (tarfile.ReadError, tarfile.CompressionError) as e:
+            if printDebug >= 3:
+                print(e)
+                print("[Info] File object", fileobj, "is not a TAR.")
+
+        # fileobj.seek(oldOffset)
+        return isTar
+
+    @staticmethod
+    def _openCompressedFile(
+        fileobj: IO[bytes], gzipSeekPointSpacing: int, encoding: str, parallelization: int, printDebug: int = 0, filename=None,
+    ) -> Any:
+        """
+        Opens a file, possibly undoing the compression.
+        Returns (tar_file_obj, raw_file_obj, compression, isTar).
+        raw_file_obj will be None if compression is None.
+        """
+        compression = SQLiteIndexedTar._detectCompression(fileobj, printDebug=printDebug)
+
+        if compression not in supportedCompressions:
+            return fileobj, None, compression, SQLiteIndexedTar._detectTar(fileobj, encoding, printDebug=printDebug)
+
+        cinfo = supportedCompressions[compression]
+        if cinfo.moduleName not in sys.modules:
+            raise CompressionError(
+                f"Can't open a {compression} compressed file '{fileobj.name}' without {cinfo.moduleName} module!"
+            )
+
+        if compression == 'gz':
+            # drop_handles keeps a file handle open, as is required to call tell() during decoding
+            tar_file = indexed_gzip.IndexedGzipFile(fileobj=fileobj, drop_handles=False, spacing=gzipSeekPointSpacing)
+        elif compression == 'bz2':
+            tar_file = indexed_bzip2.open(fileobj, parallelization=parallelization)
+        else:
+            tar_file = cinfo.open(fileobj)
+
+        # is_tar = SQLiteIndexedTar._detectTar(tar_file, encoding, printDebug=printDebug)
+        # Infer TAR-ness from the file name instead of probing the stream, since the stream cannot be rewound here.
+        is_tar = filename.endswith(".tar.gz")
+        # return tar_file, fileobj, compression, SQLiteIndexedTar._detectTar(tar_file, encoding, printDebug=printDebug)
+        return tar_file, fileobj, compression, is_tar
+
+    @staticmethod
+    def _uncheckedRemove(path: Optional[AnyStr]):
+        """
+        Often cleanup is good manners, but it would be obnoxious if ratarmount crashed on unnecessary cleanup.
+        """
+        if not path or not os.path.exists(path):
+            return
+
+        try:
+            os.remove(path)
+        except Exception:
+            print("[Warning] Could not remove:", path)
+
+    def _loadOrStoreCompressionOffsets(self):
+        if not self.indexFilePath or self.indexFilePath == ':memory:':
+            if self.printDebug >= 2:
+                print("[Info] Will skip storing compression seek data because the database is in memory.")
+                print("[Info] If the database is in memory, then this data will not be read anyway.")
+            return
+
+        # This should be called after the TAR file index is complete (loaded or created).
+        # If the TAR file index was created, then tarfile has iterated over the whole file once
+        # and therefore completed the implicit compression offset creation.
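The method below persists decompression seek points so that a reopened archive can seek without decompressing from the start. A rough standalone illustration of its bz2/zstd branch, assuming the same table layout as the diff (pairs of block offset in the compressed stream and data offset in the decompressed stream), with everything else simplified:

```
import sqlite3

def store_block_offsets(db, table_name, offsets):
    # offsets maps compressed block offsets to decompressed data offsets.
    db.execute(f"DROP TABLE IF EXISTS {table_name}")
    db.execute(f"CREATE TABLE {table_name} ( blockoffset INTEGER PRIMARY KEY, dataoffset INTEGER )")
    db.executemany(f"INSERT INTO {table_name} VALUES (?,?)", offsets.items())
    db.commit()

def load_block_offsets(db, table_name):
    return dict(db.execute(f"SELECT blockoffset,dataoffset FROM {table_name};"))

db = sqlite3.connect(':memory:')
store_block_offsets(db, 'bzip2blocks', {0: 0, 4096: 900000})
assert load_block_offsets(db, 'bzip2blocks') == {0: 0, 4096: 900000}
```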
+        if not self.sqlConnection:
+            raise IndexNotOpenError("This method can not be called without an opened index database!")
+        db = self.sqlConnection
+        fileObject = self.tarFileObject
+
+        if (
+            hasattr(fileObject, 'set_block_offsets')
+            and hasattr(fileObject, 'block_offsets')
+            and self.compression in ['bz2', 'zst']
+        ):
+            if self.compression == 'bz2':
+                table_name = 'bzip2blocks'
+            elif self.compression == 'zst':
+                table_name = 'zstdblocks'
+
+            try:
+                offsets = dict(db.execute(f"SELECT blockoffset,dataoffset FROM {table_name};"))
+                fileObject.set_block_offsets(offsets)
+            except Exception:
+                if self.printDebug >= 2:
+                    print(f"[Info] Could not load {self.compression} block offset data. Will create it from scratch.")
+
+                tables = [x[0] for x in db.execute('SELECT name FROM sqlite_master WHERE type="table";')]
+                if table_name in tables:
+                    db.execute(f"DROP TABLE {table_name}")
+                db.execute(f"CREATE TABLE {table_name} ( blockoffset INTEGER PRIMARY KEY, dataoffset INTEGER )")
+                db.executemany(f"INSERT INTO {table_name} VALUES (?,?)", fileObject.block_offsets().items())
+                db.commit()
+            return
+
+        if (
+            # fmt: off
+            hasattr( fileObject, 'import_index' )
+            and hasattr( fileObject, 'export_index' )
+            and self.compression == 'gz'
+            # fmt: on
+        ):
+            tables = [x[0] for x in db.execute('SELECT name FROM sqlite_master WHERE type="table"')]
+
+            # The indexed_gzip index only has a file-based API, so we need to write all the index data from the SQL
+            # database out into a temporary file. For that, let's first try to use the same location as the SQLite
+            # database because it should have sufficient write permissions and free disk space.
+            gzindex = None
+            for tmpDir in [os.path.dirname(self.indexFilePath), None]:
+                if 'gzipindex' not in tables and 'gzipindexes' not in tables:
+                    break
+
+                # Try to export data from the SQLite database. Note that no error checking against the existence of
+                # the gzipindex table is done because the exported data itself might also be wrong and we can't check
+                # against this. Therefore, collate all error checking by catching exceptions.
+
+                try:
+                    gzindex = tempfile.mkstemp(dir=tmpDir)[1]
+                    with open(gzindex, 'wb') as file:
+                        if 'gzipindexes' in tables:
+                            # Try to read index files containing very large gzip indexes
+                            rows = db.execute('SELECT data FROM gzipindexes ORDER BY ROWID')
+                            for row in rows:
+                                file.write(row[0])
+                        elif 'gzipindex' in tables:
+                            # Try to read legacy index files with exactly one blob.
+                            # This is how old ratarmount versions read it. I.e., if there were simply more than one
+                            # blob in the same table, then it would ignore all but the first(?!) and I am not sure
+                            # what would happen in that case.
+                            # So, use a differently named table if there are multiple blobs.
+                            file.write(db.execute('SELECT data FROM gzipindex').fetchone()[0])
+                    break
+                except Exception:
+                    self._uncheckedRemove(gzindex)
+                    gzindex = None
+
+            if gzindex:
+                try:
+                    fileObject.import_index(filename=gzindex)
+                    return
+                except Exception:
+                    pass
+                finally:
+                    self._uncheckedRemove(gzindex)
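The load path above and the store path below form a round trip between indexed_gzip's file-based index API and SQLite blobs. A condensed sketch of that round trip, assuming the index is small enough for a single blob; `export_index`/`import_index` are real indexed_gzip calls, while the table handling is simplified relative to the diff:

```
import sqlite3
import tempfile

import indexed_gzip

def save_gzip_index(igz, db):
    # indexed_gzip only offers a file-based API, so stage the index on disk first.
    with tempfile.NamedTemporaryFile() as tmp:
        igz.export_index(filename=tmp.name)
        db.execute('CREATE TABLE IF NOT EXISTS gzipindex ( data BLOB )')
        db.execute('INSERT INTO gzipindex VALUES (?)', (tmp.read(),))
        db.commit()

def load_gzip_index(igz, db):
    row = db.execute('SELECT data FROM gzipindex').fetchone()
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(row[0])
        tmp.flush()
        igz.import_index(filename=tmp.name)
```

The diff additionally splits large indexes across multiple blobs because SQLite caps a single blob at 1 GB.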
+            # Store the offsets into a temporary file and then into the SQLite database
+            if self.printDebug >= 2:
+                print("[Info] Could not load GZip block offset data. Will create it from scratch.")
+
+            # Transparently force the index to be built if it was not already. build_full_index was buggy for me,
+            # and seeking from the end is not supported, so we would have to read through the whole data in a loop.
+            # Jiani: The build_full_index() call was moved to _createIndex() and is only made when uploading a
+            # single file, because we cannot read through the file a second time here to build the full index.
+            # while fileObject.read(1024 * 1024):
+            #     pass
+            # fileObject.build_full_index()
+
+            # The created index can unfortunately be pretty large and tmp might actually run out of memory!
+            # Therefore, try different paths, starting with the location where the index resides.
+            gzindex = None
+            for tmpDir in [os.path.dirname(self.indexFilePath), None]:
+                gzindex = tempfile.mkstemp(dir=tmpDir)[1]
+                try:
+                    fileObject.export_index(filename=gzindex)
+                    break
+                except indexed_gzip.ZranError:
+                    self._uncheckedRemove(gzindex)
+                    gzindex = None
+
+            if not gzindex or not os.path.isfile(gzindex):
+                print("[Warning] The GZip index required for seeking could not be stored in a temporary file!")
+                print("[Info] This might happen when you are out of space in your temporary directory and at")
+                print("[Info] the index file location. The gzipindex takes roughly 32kiB per 4MiB of")
+                print("[Info] uncompressed(!) bytes (0.8% of the uncompressed data) by default.")
+                raise RuntimeError("Could not initialize the GZip seek cache.")
+            if self.printDebug >= 2:
+                print("Exported GZip index size:", os.stat(gzindex).st_size)
+
+            # Clean up unreadable older data.
+            if 'gzipindex' in tables:
+                db.execute('DROP TABLE gzipindex')
+            if 'gzipindexes' in tables:
+                db.execute('DROP TABLE gzipindexes')
+
+            # The maximum blob size configured by SQLite is exactly 1 GB, see https://www.sqlite.org/limits.html
+            # Therefore, this should be smaller. Another argument for making it smaller is that this blob size
+            # will be held fully in memory temporarily.
+            # But making it too small would result in too many non-backwards-compatible indexes being created.
+            maxBlobSize = 256 * 1024 * 1024  # 256 MiB
+
+            # Store contents of the temporary file into the SQLite database
+            if os.stat(gzindex).st_size > maxBlobSize:
+                db.execute('CREATE TABLE gzipindexes ( data BLOB )')
+                with open(gzindex, 'rb') as file:
+                    while True:
+                        data = file.read(maxBlobSize)
+                        if not data:
+                            break
+
+                        # I'm pretty sure that the rowid can be used to query the rows in insertion order:
+                        # https://www.sqlite.org/autoinc.html
+                        # > The usual algorithm is to give the newly created row a ROWID that is one larger than the
+                        #   largest ROWID in the table prior to the insert.
+                        # The "usual" makes me worry a bit, but I think it is in reference to the AUTOINCREMENT feature.
+                        db.execute('INSERT INTO gzipindexes VALUES (?)', (data,))
+            else:
+                db.execute('CREATE TABLE gzipindex ( data BLOB )')
+                with open(gzindex, 'rb') as file:
+                    db.execute('INSERT INTO gzipindex VALUES (?)', (file.read(),))
+
+            db.commit()
+            os.remove(gzindex)
+            return
+
+        # Note that for xz seeking, loading and storing block indexes is unnecessary because it has an index included!
+        if self.compression in [None, 'xz']:
+            return
+
+        assert False, (
+            f"Could not load or store block offsets for {self.compression} "
+            "probably because adding support was forgotten!"
+ ) diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py index 2c29f7795..9f77073bc 100644 --- a/codalab/lib/upload_manager.py +++ b/codalab/lib/upload_manager.py @@ -5,9 +5,11 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems from typing import Any, Dict, Union, Tuple, IO, cast -from ratarmountcore import SQLiteIndexedTar +from codalab.lib.beam.SQLiteIndexedTar import SQLiteIndexedTar # type: ignore +from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream from contextlib import closing from codalab.worker.upload_util import upload_with_chunked_encoding +from threading import Thread from codalab.common import ( StorageURLScheme, @@ -15,9 +17,8 @@ StorageType, urlopen_with_retry, parse_linked_bundle_url, - httpopen_with_retry, ) -from codalab.worker.file_util import tar_gzip_directory, GzipStream +from codalab.worker.file_util import tar_gzip_directory, GzipStream, update_file_size from codalab.worker.bundle_state import State from codalab.lib import file_util, path_util, zip_util from codalab.objects.bundle import Bundle @@ -95,7 +96,6 @@ def upload_to_bundle_store(self, bundle: Bundle, source: Source, git: bool, unpa Given arguments are the same as UploadManager.upload_to_bundle_store(). Used when uploading from rest server.""" try: - # bundle_path = self._bundle_store.get_bundle_location(bundle.uuid) is_url, is_fileobj, filename = self._interpret_source(source) if is_url: assert isinstance(source, str) @@ -113,11 +113,21 @@ def upload_to_bundle_store(self, bundle: Bundle, source: Source, git: bool, unpa bundle_path = self._update_and_get_bundle_location( bundle, is_directory=source_ext in ARCHIVE_EXTS_DIR ) - self.write_fileobj(source_ext, source_fileobj, bundle_path, unpack_archive=True) + self.write_fileobj( + source_ext, + source_fileobj, + bundle_path, + unpack_archive=True, + bundle_uuid=bundle.uuid, + ) else: bundle_path = self._update_and_get_bundle_location(bundle, is_directory=False) self.write_fileobj( - source_ext, source_fileobj, bundle_path, unpack_archive=False + source_ext, + source_fileobj, + bundle_path, + unpack_archive=False, + bundle_uuid=bundle.uuid, ) except UsageError: @@ -236,55 +246,69 @@ def write_fileobj( else: output_fileobj = GzipStream(source_fileobj) + stream_file = MultiReaderFileStream(output_fileobj) + file_reader = stream_file.readers[0] + index_reader = stream_file.readers[1] + # Write archive file. if bundle_conn_str is not None: conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '') os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str try: - bytes_uploaded = 0 CHUNK_SIZE = 16 * 1024 - ITERATIONS_PER_DISK_CHECK = 2000 - iteration = 0 - with FileSystems.create( - bundle_path, compression_type=CompressionTypes.UNCOMPRESSED - ) as out: - while True: - to_send = output_fileobj.read(CHUNK_SIZE) - if not to_send: - break - out.write(to_send) - - # Update disk and check if client has gone over disk usage. - if self._client and iteration % ITERATIONS_PER_DISK_CHECK == 0: - self._client.update( - 'user/increment_disk_used', - {'disk_used_increment': len(to_send), 'bundle_uuid': bundle_uuid}, - ) - user_info = self._client.fetch('user') - if user_info['disk_used'] >= user_info['disk_quota']: - raise Exception( - 'Upload aborted. User disk quota exceeded. 
' - 'To apply for more quota, please visit the following link: ' - 'https://codalab-worksheets.readthedocs.io/en/latest/FAQ/' - '#how-do-i-request-more-disk-quota-or-time-quota' + + def upload_file_content(): + iteration = 0 + ITERATIONS_PER_DISK_CHECK = 2000 + bytes_uploaded = 0 + + with FileSystems.create( + bundle_path, compression_type=CompressionTypes.UNCOMPRESSED + ) as out: + while True: + iteration += 1 + to_send = file_reader.read(CHUNK_SIZE) + if not to_send: + break + out.write(to_send) + + # Update disk and check if client has gone over disk usage. + if self._client and iteration % ITERATIONS_PER_DISK_CHECK == 0: + self._client.update( + 'user/increment_disk_used', + {'disk_used_increment': len(to_send), 'bundle_uuid': bundle_uuid}, ) + user_info = self._client.fetch('user') + if user_info['disk_used'] >= user_info['disk_quota']: + raise Exception( + 'Upload aborted. User disk quota exceeded. ' + 'To apply for more quota, please visit the following link: ' + 'https://codalab-worksheets.readthedocs.io/en/latest/FAQ/' + '#how-do-i-request-more-disk-quota-or-time-quota' + ) - bytes_uploaded += len(to_send) - if progress_callback is not None: - should_resume = progress_callback(bytes_uploaded) - if not should_resume: - raise Exception('Upload aborted by client') - iteration += 1 - with FileSystems.open( - bundle_path, compression_type=CompressionTypes.UNCOMPRESSED - ) as ttf, tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file: + bytes_uploaded += len(to_send) + if progress_callback is not None: + should_resume = progress_callback(bytes_uploaded) + if not should_resume: + raise Exception('Upload aborted by client') + + # temporary file that used to store index file + tmp_index_file = tempfile.NamedTemporaryFile(suffix=".sqlite") + + def create_index(): + is_dir = parse_linked_bundle_url(bundle_path).is_archive_dir SQLiteIndexedTar( - fileObject=ttf, - tarFileName="contents", # If saving a single file as a .gz archive, this file can be accessed by the "/contents" entry in the index. + fileObject=index_reader, + tarFileName="contents.tar.gz" + if is_dir + else "contents.gz", # If saving a single file as a .gz archive, this file can be accessed by the "/contents" entry in the index. writeIndex=True, clearIndexCache=True, indexFilePath=tmp_index_file.name, ) + + def upload_index(): if bundle_conn_str is not None: os.environ['AZURE_STORAGE_CONNECTION_STRING'] = index_conn_str with FileSystems.create( @@ -296,11 +320,40 @@ def write_fileobj( if not to_send: break out_index_file.write(to_send) - bytes_uploaded += len(to_send) - if progress_callback is not None: - should_resume = progress_callback(bytes_uploaded) - if not should_resume: - raise Exception('Upload aborted by client') + + # call API to update the indexed file size + + if not parse_linked_bundle_url(bundle_path).is_archive_dir and hasattr( + output_fileobj, "tell" + ): + try: + file_size = ( + output_fileobj.input_file_tell() + if hasattr(output_fileobj, "input_file_tell") + else output_fileobj.tell() + ) + if self._client: + self._client.update( + 'bundles/%s/contents/filesize/' % bundle_uuid, + {'filesize': file_size}, + ) + else: # directly update on server side + update_file_size(bundle_path, file_size) + except Exception as e: + print( + f"Skip update this type of data. The bundle path is: {bundle_path}. 
Exception: {repr(e)}" + ) + + threads = [Thread(target=upload_file_content), Thread(target=create_index)] + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + upload_index() + except Exception as err: raise err finally: # restore the origin connection string @@ -314,7 +367,8 @@ class UploadManager(object): the associated bundle metadata in the database. """ - def __init__(self, bundle_model, bundle_store): + def __init__(self, bundle_model, bundle_store, json_api_client=None): + self._client = json_api_client self._bundle_model = bundle_model self._bundle_store = bundle_store @@ -585,35 +639,47 @@ def upload_GCS_blob_storage( else: output_fileobj = GzipStream(fileobj) - # Write archive file. - upload_with_chunked_encoding( - method='PUT', - base_url=bundle_conn_str, - headers={'Content-type': 'application/octet-stream'}, - fileobj=output_fileobj, - query_params={}, - progress_callback=progress_callback, - bundle_uuid=bundle_uuid, - json_api_client=json_api_client, - ) - # upload the index file - with httpopen_with_retry(bundle_read_str) as ttf, tempfile.NamedTemporaryFile( - suffix=".sqlite" - ) as tmp_index_file: - SQLiteIndexedTar( - fileObject=ttf, - tarFileName="contents", - writeIndex=True, - clearIndexCache=True, - indexFilePath=tmp_index_file.name, - ) + stream_file = MultiReaderFileStream(output_fileobj) + file_reader = stream_file.readers[0] + index_reader = stream_file.readers[1] + + def upload_file_content(): + # Write archive file. upload_with_chunked_encoding( method='PUT', - base_url=index_conn_str, + base_url=bundle_conn_str, headers={'Content-type': 'application/octet-stream'}, + fileobj=file_reader, query_params={}, - fileobj=open(tmp_index_file.name, "rb"), progress_callback=None, bundle_uuid=bundle_uuid, json_api_client=self._client, ) + + def create_upload_index(): + # upload the index file + with tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file: + SQLiteIndexedTar( + fileObject=index_reader, + tarFileName="contents", + writeIndex=True, + clearIndexCache=True, + indexFilePath=tmp_index_file.name, + ) + upload_with_chunked_encoding( + method='PUT', + base_url=index_conn_str, + headers={'Content-type': 'application/octet-stream'}, + query_params={}, + fileobj=open(tmp_index_file.name, "rb"), + progress_callback=None, + json_api_client=self._client, + ) + + threads = [Thread(target=upload_file_content), Thread(target=create_upload_index)] + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() diff --git a/codalab/model/bundle_model.py b/codalab/model/bundle_model.py index f391c44a9..9eadfb464 100644 --- a/codalab/model/bundle_model.py +++ b/codalab/model/bundle_model.py @@ -1154,7 +1154,7 @@ def enforce_disk_quota(self, bundle, bundle_location): disk_left = self.get_user_disk_quota_left(bundle.owner_id) if data_size > disk_left: raise UsageError( - "Can't save bundle, bundle size %s greater than user's disk quota left: %s" + "Can't save bundle, user disk quota exceeded. 
Bundle size %s greater than user's disk quota left: %s"
                 % (data_size, disk_left)
             )
 
diff --git a/codalab/rest/bundles.py b/codalab/rest/bundles.py
index 676675896..6f8c1e8a0 100644
--- a/codalab/rest/bundles.py
+++ b/codalab/rest/bundles.py
@@ -19,9 +19,11 @@
     precondition,
     UsageError,
     NotFoundError,
+    parse_linked_bundle_url,
 )
 from codalab.lib import canonicalize, spec_util, worksheet_util, bundle_util
 from codalab.lib.beam.filesystems import LOCAL_USING_AZURITE, get_azure_bypass_conn_str
+from codalab.worker.file_util import OpenIndexedArchiveFile, update_file_size
 from codalab.lib.server_util import (
     RequestSource,
     bottle_patch as patch,
@@ -772,6 +774,41 @@ def _fetch_bundle_contents_info(uuid, path=''):
     return {'data': info}
 
 
+@patch(
+    '/bundles/<uuid:re:%s>/contents/filesize/' % spec_util.UUID_STR, name='update_bundle_file_size'
+)
+def _update_bundle_file_size(uuid):
+    """
+    This endpoint is used to fix the file size field in the index.sqlite file.
+    It only allows the user to increase the file size of a single file.
+    """
+
+    bundle_path = local.bundle_store.get_bundle_location(uuid)
+    file_size = request.json['data'][0]['attributes']['filesize']
+    logging.info(f"File size is: {file_size} {bundle_path} {uuid}")
+
+    update_file_size(bundle_path, file_size)
+
+    if (
+        parse_linked_bundle_url(bundle_path).uses_beam
+        and not parse_linked_bundle_url(bundle_path).is_archive_dir
+    ):
+        # Check whether the info was saved to index.sqlite.
+        with OpenIndexedArchiveFile(bundle_path) as tf:
+            logging.info(
+                f"Modified file size in index.sqlite. New info is: {tf.getFileInfo('/contents')}"
+            )
+
+    bundles_dict = get_bundle_infos([uuid])
+
+    # Return bundles in original order.
+    # Need to check if the UUID is in the dict, since there is a chance that a bundle is deleted
+    # right after being created.
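As a usage illustration for the endpoint above, a hypothetical client-side call follows. The JSON:API envelope mirrors what the handler reads (`request.json['data'][0]['attributes']['filesize']`); the URL prefix, auth header, and `bundles` type name are assumptions, not taken from this diff:

```
import requests

def update_bundle_filesize(server, token, bundle_uuid, filesize):
    # PATCH the new (larger) file size for a single-file bundle.
    response = requests.patch(
        f"{server}/rest/bundles/{bundle_uuid}/contents/filesize/",
        json={"data": [{"type": "bundles", "attributes": {"filesize": filesize}}]},
        headers={"Authorization": f"Bearer {token}"},
    )
    response.raise_for_status()
    return response.json()
```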
+ bundles = [bundles_dict[uuid]] if uuid in bundles_dict.keys() else [] + logging.info(f"before return: {bundles}") + return BundleSchema(many=True).dump(bundles).data + + @put( '/bundles//netcat//' % spec_util.UUID_STR, name='netcat_bundle', diff --git a/codalab/server/__pycache__/rest_server.cpython-37.pyc.140656724843696 b/codalab/server/__pycache__/rest_server.cpython-37.pyc.140656724843696 new file mode 100644 index 000000000..e69de29bb diff --git a/codalab/server/bundle_manager.py b/codalab/server/bundle_manager.py index a830bbded..642530d9b 100644 --- a/codalab/server/bundle_manager.py +++ b/codalab/server/bundle_manager.py @@ -282,6 +282,7 @@ def _make_bundle(self, bundle): un_tar_directory(fileobj, dependency_path, 'gz') else: fileobj = self._download_manager.stream_file(target, gzipped=False) + with open(dependency_path, 'wb') as f: shutil.copyfileobj(fileobj, f) diff --git a/codalab/server/rest_server.py b/codalab/server/rest_server.py index 23f3556cc..e72693c92 100644 --- a/codalab/server/rest_server.py +++ b/codalab/server/rest_server.py @@ -59,7 +59,7 @@ environment=os.getenv('CODALAB_SENTRY_ENVIRONMENT'), integrations=[BottleIntegration()], traces_sample_rate=transaction_sample_rate, - _experiments={"profiles_sample_rate": profiles_sample_rate,}, + _experiments={"profiles_sample_rate": profiles_sample_rate,}, # type: ignore ) diff --git a/codalab/worker/download_util.py b/codalab/worker/download_util.py index e0897ccef..20d16d87d 100644 --- a/codalab/worker/download_util.py +++ b/codalab/worker/download_util.py @@ -269,6 +269,7 @@ def _get_info(path: str, depth: Union[int, float]) -> TargetInfo: perm=0o755, ), ) + if linked_bundle_path.archive_subpath: # Return the contents of a subpath within a directory. return _get_info(linked_bundle_path.archive_subpath, depth) diff --git a/codalab/worker/file_util.py b/codalab/worker/file_util.py index e01491c5d..71b50b04a 100644 --- a/codalab/worker/file_util.py +++ b/codalab/worker/file_util.py @@ -17,7 +17,10 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems import tempfile -from ratarmountcore import SQLiteIndexedTar, FileInfo + +# from ratarmountcore import SQLiteIndexedTar, FileInfo +from ratarmountcore import FileInfo +from codalab.lib.beam.SQLiteIndexedTar import SQLiteIndexedTar # type: ignore from typing import IO, cast NONE_PLACEHOLDER = '' @@ -285,9 +288,9 @@ def __enter__(self) -> IO[bytes]: raise IOError("Directories must be gzipped.") return GzipStream(TarSubdirStream(self.path)) else: - # Stream a single file from within the archive fs = TarFileStream(tf, finfo) return GzipStream(fs) if self.gzipped else fs + else: # Stream a directory or file from disk storage. 
         if os.path.isdir(self.path):
@@ -312,19 +315,48 @@ def __init__(self, fileobj: IO[bytes]):
         self.__input = fileobj
         self.__buffer = BytesBuffer()
         self.__gzip = gzip.GzipFile(None, mode='wb', fileobj=self.__buffer)
+        self.__size = 0
+        self.__input_read_size = 0
 
-    def read(self, num_bytes=None) -> bytes:
+    def _fill_buf_bytes(self, num_bytes=None):
         while num_bytes is None or len(self.__buffer) < num_bytes:
             s = self.__input.read(num_bytes)
+            self.__input_read_size += len(s)
             if not s:
                 self.__gzip.close()
                 break
-            self.__gzip.write(s)
-        return self.__buffer.read(num_bytes)
+            self.__gzip.write(s)  # gzip the incoming data
+
+    def read(self, num_bytes=None):
+        try:
+            self._fill_buf_bytes(num_bytes)
+            data = self.__buffer.read(num_bytes)
+            self.__size += len(data)
+            return data
+        except Exception as e:
+            logging.info("Error in GzipStream read(): %s", repr(e))
+            return None
 
     def close(self):
         self.__input.close()
 
+    def peek(self, num_bytes):
+        self._fill_buf_bytes(num_bytes)
+        return self.__buffer.peek(num_bytes)
+
+    def tell(self):
+        return self.__size
+
+    def fileobj(self):
+        return self.__input
+
+    def input_file_tell(self):
+        """Gives the position in the original, uncompressed input file."""
+        if hasattr(self.__input, "tell"):
+            return self.__input.tell()
+        else:
+            return self.__input_read_size
+
 
 def gzip_file(file_path: str) -> IO[bytes]:
     """
@@ -403,6 +435,7 @@ def get_file_size(file_path):
         with OpenFile(linked_bundle_path.bundle_path, 'rb') as fileobj:
             fileobj.seek(0, os.SEEK_END)
             return fileobj.tell()
+
     # If the archive file is a .tar.gz file on Azure, open the specified archive subpath within the archive.
     # If it is a .gz file on Azure, open the "/contents" entry, which represents the actual gzipped file.
     with OpenIndexedArchiveFile(linked_bundle_path.bundle_path) as tf:
@@ -423,6 +456,7 @@ def read_file_section(file_path, offset, length):
     Reads length bytes of the given file from the given offset. Return bytes.
     """
+
     if offset >= get_file_size(file_path):
         return b''
     with OpenFile(file_path, 'rb') as fileobj:
@@ -603,3 +637,37 @@ def sha256(file: str) -> str:
         for byte_block in iter(lambda: f.read(4096), b""):
             sha256_hash.update(byte_block)
     return sha256_hash.hexdigest()
+
+
+def update_file_size(bundle_path, file_size):
+    """
+    This function is used to update the file size in index.sqlite.
+    Should only be used to update a single file's size.
+ """ + if ( + parse_linked_bundle_url(bundle_path).uses_beam + and not parse_linked_bundle_url(bundle_path).is_archive_dir + ): + with OpenIndexedArchiveFile(bundle_path) as tf: + # tf is a SQLiteTar file, which is a copy of original index file + finfo = tf._getFileInfoRow('/contents') + finfo = dict(finfo) + finfo['size'] = file_size + new_info = tuple([value for _, value in finfo.items()]) + logging.info(finfo) # get the result of a fi + tf._setFileInfo(new_info) + tf.sqlConnection.commit() # need to mannually commit here + logging.info(f"tf.index_file_name: {tf.indexFilePath}") + + # Update the index file stored in blob storage + FileSystems.delete([parse_linked_bundle_url(bundle_path).index_path]) + with FileSystems.create( + parse_linked_bundle_url(bundle_path).index_path, + compression_type=CompressionTypes.UNCOMPRESSED, + ) as f, open(tf.indexFilePath, "rb") as tif: + while True: + CHUNK_SIZE = 16 * 1024 + to_send = tif.read(CHUNK_SIZE) + if not to_send: + break + f.write(to_send) diff --git a/codalab/worker/main.py b/codalab/worker/main.py index 62e8dff4f..ce76a3cc2 100644 --- a/codalab/worker/main.py +++ b/codalab/worker/main.py @@ -13,6 +13,7 @@ import sys import psutil import requests +import tempfile from codalab.common import SingularityError from codalab.common import BundleRuntime @@ -217,6 +218,11 @@ def parse_args(): type=str, help='Path to the SSL cert for the Kubernetes cluster. Only applicable if --bundle-runtime is set to kubernetes.', ) + parser.add_argument( + '--kubernetes-cert', + type=str, + help='Contents of the SSL cert for the Kubernetes cluster. Only applicable if --bundle-runtime is set to kubernetes.', + ) return parser.parse_args() @@ -316,11 +322,18 @@ def main(): docker_runtime = None elif args.bundle_runtime == BundleRuntime.KUBERNETES.value: image_manager = NoOpImageManager() + if args.kubernetes_cert_path == "/dev/null": + # Create temp file + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write(args.kubernetes_cert) + kubernetes_cert_path = f.name + else: + kubernetes_cert_path = args.kubernetes_cert_path bundle_runtime_class = KubernetesRuntime( args.work_dir, args.kubernetes_auth_token, args.kubernetes_cluster_host, - args.kubernetes_cert_path, + kubernetes_cert_path, ) docker_runtime = None else: diff --git a/codalab/worker/runtime/__init__.py b/codalab/worker/runtime/__init__.py index 7940e3a11..8a961c710 100644 --- a/codalab/worker/runtime/__init__.py +++ b/codalab/worker/runtime/__init__.py @@ -75,3 +75,6 @@ def kill(self, container_id: str): def remove(self, container_id: str): raise NotImplementedError + + def get_node_availability_stats(self) -> dict: + raise NotImplementedError diff --git a/codalab/worker/runtime/kubernetes_runtime.py b/codalab/worker/runtime/kubernetes_runtime.py index 14ce15a7e..d4d2457e7 100644 --- a/codalab/worker/runtime/kubernetes_runtime.py +++ b/codalab/worker/runtime/kubernetes_runtime.py @@ -12,6 +12,7 @@ from codalab.common import BundleRuntime from codalab.worker.runtime import Runtime +import os import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -232,3 +233,15 @@ def remove(self, pod_name: str): f'Exception when calling Kubernetes api->delete_namespaced_pod...: {e}' ) raise e + + def get_node_availability_stats(self) -> dict: + node_name = os.getenv("CODALAB_KUBERNETES_NODE_NAME") + node = self.k8_api.read_node(name=node_name) + allocatable = node.status.allocatable + + return { + 'cpus': int(allocatable.get('cpu')), + 'gpus': 
int(allocatable.get('nvidia.com/gpu') or '0'), + 'memory_bytes': int(utils.parse_quantity(allocatable.get('memory'))), + 'free_disk_bytes': int(utils.parse_quantity(allocatable.get('ephemeral-storage'))), + } diff --git a/codalab/worker/tar_file_stream.py b/codalab/worker/tar_file_stream.py index 03ee7c9b0..18079f570 100644 --- a/codalab/worker/tar_file_stream.py +++ b/codalab/worker/tar_file_stream.py @@ -33,7 +33,7 @@ def _read_from_tar(self, num_bytes): """ contents = self.tf.read( fileInfo=self.finfo, - size=self.finfo.size + size=self.finfo.size # can this param be None? If this is None, it will read more original file. if num_bytes is None else min(self.finfo.size - self.pos, num_bytes), offset=self.pos, diff --git a/codalab/worker/un_gzip_stream.py b/codalab/worker/un_gzip_stream.py index 4e8c55520..79c1ba234 100644 --- a/codalab/worker/un_gzip_stream.py +++ b/codalab/worker/un_gzip_stream.py @@ -69,6 +69,7 @@ class UnGzipStream(GenericUncompressStream): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.decoder = zlib.decompressobj(16 + zlib.MAX_WBITS) + self.seekable = lambda: False class UnBz2Stream(GenericUncompressStream): @@ -260,11 +261,20 @@ def read(self, size: Optional[int] = None): if size < 0: ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:] self.__buf.appendleft(remainder) + size += len(remainder) + assert size == 0 + ret = b''.join(ret_list) self.__size -= len(ret) self.__pos += len(ret) return ret + def peek(self, size: int): + b = bytearray() + for i in range(0, min(size, len(self.__buf))): + b.extend(self.__buf[i]) + return bytes(b)[:size] + def flush(self): pass diff --git a/codalab/worker/worker.py b/codalab/worker/worker.py index c44d22cdc..8ba3d935d 100644 --- a/codalab/worker/worker.py +++ b/codalab/worker/worker.py @@ -147,7 +147,8 @@ def __init__( docker_network_external=self.docker_network_external, docker_runtime=docker_runtime, upload_bundle_callback=self.upload_bundle_contents, - assign_cpu_and_gpu_sets_fn=self.assign_cpu_and_gpu_sets, + cpuset=self.cpuset, + gpuset=self.gpuset, shared_file_system=self.shared_file_system, shared_memory_size_gb=shared_memory_size_gb, bundle_runtime=bundle_runtime, @@ -500,6 +501,17 @@ def checkin(self): 'is_terminating': self.terminate or self.terminate_and_restage, 'preemptible': self.preemptible, } + if self.bundle_runtime.name == BundleRuntime.KUBERNETES.value: + stats = self.bundle_runtime.get_node_availability_stats() + request = dict( + request, + **{ + 'cpus': stats['cpus'], + 'gpus': stats['gpus'], + 'memory_bytes': stats['memory_bytes'], + 'free_disk_bytes': stats['free_disk_bytes'], + }, + ) try: response = self.bundle_service.checkin(self.id, request) logger.info('Connected! Successful check in!') @@ -590,44 +602,6 @@ def process_runs(self): if run_state.stage != RunStage.FINISHED } - def assign_cpu_and_gpu_sets(self, request_cpus, request_gpus): - """ - Propose a cpuset and gpuset to a bundle based on given requested resources. - Note: no side effects (this is important: we don't want to maintain more state than necessary) - - Arguments: - request_cpus: integer - request_gpus: integer - - Returns a 2-tuple: - cpuset: assigned cpuset (str indices). - gpuset: assigned gpuset (str indices). - - Throws an exception if unsuccessful. 
- """ - cpuset, gpuset = set(map(str, self.cpuset)), set(map(str, self.gpuset)) - - for run_state in self.runs.values(): - if run_state.stage == RunStage.RUNNING: - cpuset -= run_state.cpuset - gpuset -= run_state.gpuset - - if len(cpuset) < request_cpus: - raise Exception( - "Requested more CPUs (%d) than available (%d currently out of %d on the machine)" - % (request_cpus, len(cpuset), len(self.cpuset)) - ) - if len(gpuset) < request_gpus: - raise Exception( - "Requested more GPUs (%d) than available (%d currently out of %d on the machine)" - % (request_gpus, len(gpuset), len(self.gpuset)) - ) - - def propose_set(resource_set, request_count): - return set(str(el) for el in list(resource_set)[:request_count]) - - return propose_set(cpuset, request_cpus), propose_set(gpuset, request_gpus) - @property def all_runs(self): """ diff --git a/codalab/worker/worker_monitoring.py b/codalab/worker/worker_monitoring.py index bfb421bf4..dd98a8465 100644 --- a/codalab/worker/worker_monitoring.py +++ b/codalab/worker/worker_monitoring.py @@ -3,7 +3,7 @@ from typing import Dict, Optional import sentry_sdk -from sentry_sdk.profiler import start_profiling +from sentry_sdk.profiler import start_profiling # type: ignore from .worker_run_state import RunState @@ -17,7 +17,7 @@ dsn=os.getenv('CODALAB_SENTRY_INGEST_URL'), environment=os.getenv('CODALAB_SENTRY_ENVIRONMENT'), traces_sample_rate=transaction_sample_rate, - _experiments={"profiles_sample_rate": profiles_sample_rate,}, + _experiments={"profiles_sample_rate": profiles_sample_rate,}, # type: ignore ) diff --git a/codalab/worker/worker_run_state.py b/codalab/worker/worker_run_state.py index ba7799fea..bbe81761c 100644 --- a/codalab/worker/worker_run_state.py +++ b/codalab/worker/worker_run_state.py @@ -167,7 +167,6 @@ def __init__( docker_network_external, # Docker network to add internet connected bundles to docker_runtime, # Docker runtime to use for containers (nvidia or runc) upload_bundle_callback, # Function to call to upload bundle results to the server - assign_cpu_and_gpu_sets_fn, # Function to call to assign CPU and GPU resources to each run shared_file_system, # If True, bundle mount is shared with server shared_memory_size_gb, # Shared memory size for the run container (in GB) bundle_runtime, # Runtime used to run bundles (docker or kubernetes) @@ -195,7 +194,6 @@ def __init__( fields={'disk_utilization': 0, 'running': True, 'lock': None} ) self.upload_bundle_callback = upload_bundle_callback - self.assign_cpu_and_gpu_sets_fn = assign_cpu_and_gpu_sets_fn self.shared_file_system = shared_file_system self.shared_memory_size_gb = shared_memory_size_gb @@ -237,19 +235,6 @@ def mount_dependency(dependency, shared_file_system): ) return run_state._replace(stage=RunStage.CLEANING_UP) - # Check CPU and GPU availability - try: - cpuset, gpuset = self.assign_cpu_and_gpu_sets_fn( - run_state.resources.cpus, run_state.resources.gpus - ) - except Exception as e: - message = "Unexpectedly unable to assign enough resources to bundle {}: {}".format( - run_state.bundle.uuid, str(e) - ) - logger.error(message) - logger.error(traceback.format_exc()) - return run_state._replace(run_status=message) - dependencies_ready = True status_messages = [] dependency_keys_to_paths: Dict[DependencyKey, str] = dict() diff --git a/codalab/worker_manager/kubernetes_worker_manager.py b/codalab/worker_manager/kubernetes_worker_manager.py index 0c8899983..14f8afb84 100644 --- a/codalab/worker_manager/kubernetes_worker_manager.py +++ 
b/codalab/worker_manager/kubernetes_worker_manager.py @@ -47,6 +47,12 @@ def add_arguments_to_subparser(subparser: ArgumentParser) -> None: help='Path to the SSL cert for the Kubernetes cluster', required=True, ) + subparser.add_argument( + '--cert', + type=str, + help='Contents of the SSL cert for the Kubernetes cluster', + required=True, + ) subparser.add_argument( '--nfs-volume-name', type=str, help='Name of the persistent volume for the NFS server.', ) @@ -120,7 +126,8 @@ def start_worker_job(self) -> None: command.extend(['--bundle-runtime', self.bundle_runtime]) command.extend(['--kubernetes-cluster-host', self.cluster_host]) command.extend(['--kubernetes-auth-token', self.auth_token]) - command.extend(['--kubernetes-cert-path', self.cert_path]) + command.extend(['--kubernetes-cert-path', '/dev/null']) + command.extend(['--kubernetes-cert', open(self.cert_path).read()]) worker_image: str = 'codalab/worker:' + os.environ.get('CODALAB_VERSION', 'latest') @@ -138,7 +145,7 @@ def start_worker_job(self) -> None: config: Dict[str, Any] = { 'apiVersion': 'v1', 'kind': 'Pod', - 'metadata': {'name': worker_name}, + 'metadata': {'name': worker_name, 'labels': {'type': 'cl-worker'}}, 'spec': { 'containers': [ { @@ -148,6 +155,10 @@ def start_worker_job(self) -> None: 'env': [ {'name': 'CODALAB_USERNAME', 'value': self.codalab_username}, {'name': 'CODALAB_PASSWORD', 'value': self.codalab_password}, + { + 'name': 'CODALAB_KUBERNETES_NODE_NAME', + 'valueFrom': {'fieldRef': {'fieldPath': 'spec.nodeName'}}, + }, ], 'resources': {'limits': limits, 'requests': requests}, 'volumeMounts': [ @@ -158,8 +169,29 @@ def start_worker_job(self) -> None: ], } ], + # Only one worker pod should be scheduled per node. + 'affinity': { + 'podAntiAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': [ + { + 'podAffinityTerm': { + 'labelSelector': { + "matchExpressions": [ + { + "key": "type", + "operator": "In", + "values": ["cl-worker"], + } + ] + }, + 'topologyKey': 'topology.kubernetes.io/zone', + } + } + ] + } + }, 'volumes': [ - {'name': 'certpath', 'hostPath': {'path': self.cert_path}}, + # {'name': 'certpath', 'hostPath': {'path': self.cert_path}}, { "name": self.nfs_volume_name, # When attaching a volume over NFS, use a persistent volume claim diff --git a/codalab_service.py b/codalab_service.py index d878df835..451f69257 100755 --- a/codalab_service.py +++ b/codalab_service.py @@ -527,6 +527,11 @@ def has_callable_default(self): type=str, help='Path to the generated SSL cert for the Kubernetes worker manager', ), + CodalabArg( + name=f'worker_manager_{worker_manager_type}_kubernetes_cert', + type=str, + help='Contents of the generated SSL cert for the Kubernetes worker manager', + ), ] diff --git a/docker_config/compose_files/docker-compose.yml b/docker_config/compose_files/docker-compose.yml index 3b7142816..aaefafd89 100644 --- a/docker_config/compose_files/docker-compose.yml +++ b/docker_config/compose_files/docker-compose.yml @@ -62,10 +62,12 @@ x-codalab-env: &codalab-env - CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CLUSTER_HOST=${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CLUSTER_HOST} - CODALAB_WORKER_MANAGER_CPU_KUBERNETES_AUTH_TOKEN=${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_AUTH_TOKEN} - CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH=${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH} + - CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT=${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT} - CODALAB_WORKER_MANAGER_GPU_BUNDLE_RUNTIME=${CODALAB_WORKER_MANAGER_GPU_BUNDLE_RUNTIME} - 
CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CLUSTER_HOST=${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CLUSTER_HOST}
       - CODALAB_WORKER_MANAGER_GPU_KUBERNETES_AUTH_TOKEN=${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_AUTH_TOKEN}
       - CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT_PATH=${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT_PATH}
+      - CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT=${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT}
       - CODALAB_WORKER_MANAGER_AWS_REGION=${CODALAB_WORKER_MANAGER_AWS_REGION}
       - CODALAB_WORKER_MANAGER_AWS_BATCH_JOB_DEFINITION_NAME=${CODALAB_WORKER_MANAGER_AWS_BATCH_JOB_DEFINITION_NAME}
       - CODALAB_WORKER_MANAGER_CPU_AWS_BATCH_QUEUE=${CODALAB_WORKER_MANAGER_CPU_AWS_BATCH_QUEUE}
@@ -301,13 +303,14 @@ services:
       --cluster-host ${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CLUSTER_HOST}
       --auth-token ${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_AUTH_TOKEN}
       --cert-path ${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH}
+      --cert ${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT}
       --cpus ${CODALAB_WORKER_MANAGER_CPU_DEFAULT_CPUS}
       --memory-mb ${CODALAB_WORKER_MANAGER_CPU_DEFAULT_MEMORY_MB}
     <<: *codalab-base
     <<: *codalab-server
     volumes:
       - "${CODALAB_HOME}:${CODALAB_HOME}"
-      - ${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH}:${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH}:ro
+      - ${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH:-/dev/null}:${CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH:-/dev/null}:ro
     networks:
       - rest-server
 
@@ -334,6 +337,7 @@ services:
       --cluster-host ${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CLUSTER_HOST}
       --auth-token ${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_AUTH_TOKEN}
       --cert-path ${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT_PATH}
+      --cert ${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT}
       --cpus ${CODALAB_WORKER_MANAGER_GPU_DEFAULT_CPUS}
       --gpus ${CODALAB_WORKER_MANAGER_DEFAULT_GPUS}
       --memory-mb ${CODALAB_WORKER_MANAGER_GPU_DEFAULT_MEMORY_MB}
     <<: *codalab-base
     <<: *codalab-server
     volumes:
       - "${CODALAB_HOME}:${CODALAB_HOME}"
-      - ${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT_PATH}:${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT_PATH}:ro
+      - ${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT_PATH:-/dev/null}:${CODALAB_WORKER_MANAGER_GPU_KUBERNETES_CERT_PATH:-/dev/null}:ro
     networks:
       - rest-server
 
diff --git a/docs/REST-API-Reference.md b/docs/REST-API-Reference.md
index 1599b25a2..e2d8e23fa 100644
--- a/docs/REST-API-Reference.md
+++ b/docs/REST-API-Reference.md
@@ -610,6 +610,11 @@ Response format:
 }
 ```
 
+### `PATCH /bundles/<uuid>/contents/filesize/`
+
+Fixes the file size field in the index.sqlite file.
+Only allows the user to increase the file size of a single file.
+
 ### `PUT /bundles/<uuid>/netcat/<port>/`
 
 Send a raw bytestring into the specified port of the running bundle with uuid.
 
diff --git a/notes.md b/notes.md
new file mode 100644
index 000000000..8fb6d9d93
--- /dev/null
+++ b/notes.md
@@ -0,0 +1,115 @@
+```
+pip install -e .
+ +cl work https://worksheets.codalab.org:: + +# start up your local machine +codalab-service start -bd +codalab-service start -bds default worker2 + +# just start one instance (ex., if you only changed rest server code) +codalab-service start -bds rest-server + +# connect to your local machine +cl work http://localhost:: + + +docker ps + +``` + + + + + +todo + +codalab-service build -s worker && kind load docker-image codalab/worker:k8s_runtime --name codalab && codalab-service start -bds worker-manager-cpu && docker logs codalab_kubernetes-worker-manager-cpu_1 --follow + +## ws + +codalab-service start -bds ws-server && docker logs codalab_ws-server_1 --follow + +codalab-service start -bds rest-server && docker logs codalab_rest-server_1 --follow + +codalab-service start -bds rest-server init +docker exec -it codalab_rest-server_1 /bin/bash + +python3 scripts/create-root-user.py pwd + +``` +process: Fatal Python error: Segmentation fault +process: +process: Current thread 0x00007f95614a2740 (most recent call first): +process: File "/opt/conda/lib/python3.7/site-packages/MySQLdb/connections.py", line 164 in __init__ +process: File "/opt/conda/lib/python3.7/site-packages/MySQLdb/__init__.py", line 84 in Connect +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 493 in connect +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/engine/strategies.py", line 114 in connect +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 656 in __connect +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 440 in __init__ +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 309 in _create_connection +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 137 in _do_get +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 495 in checkout +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 778 in _checkout +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 364 in connect +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2338 in _wrap_pool_connect +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/engine/threadlocal.py", line 76 in _contextual_connect +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2088 in _optional_conn_ctx_manager +process: File "/opt/conda/lib/python3.7/contextlib.py", line 112 in __enter__ +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2096 in _run_visitor +process: File "/opt/conda/lib/python3.7/site-packages/sqlalchemy/sql/schema.py", line 4556 in create_all +process: File "/opt/codalab-worksheets/codalab/model/bundle_model.py", line 120 in create_tables +process: File "/opt/codalab-worksheets/codalab/model/bundle_model.py", line 92 in __init__ +process: File "/opt/codalab-worksheets/codalab/model/mysql_model.py", line 54 in __init__ +process: File "/opt/codalab-worksheets/codalab/lib/codalab_manager.py", line 359 in model +process: File "scripts/create-root-user.py", line 14 in +process: 139 +``` + + + + + +/home/ubuntu/environment/codalab/codalab-worksheets/venv/lib/python3.8/site-packages/ratarmountcore/SQLiteIndexedTar.py + +/home/ubuntu/environment/codalab/codalab-worksheets/venv/lib/python3.8/site-packages/ratarmountcore/compressions.py + + + +codalab-service start -s 
azurite +CODALAB_DEFAULT_BUNDLE_STORE_NAME=azure-store-default codalab-service start -s default azurite +sh ./tests/test-setup-default-store.sh +codalab-service start -bs rest-server && python test_runner.py make + +cl uedit codalab -d 8m +codalab-service start -bs rest-server && cl upload --store azure-store-default venv/lib/python3.8/site-packages/botocore + + +CODALAB_DEFAULT_BUNDLE_STORE_NAME=azure-store-default codalab-service start -bs rest-server && python test_runner.py make + +cl make $(cl upload -c "hello") + +cl make 0xe7a19c5b2f074b2c9582333f40febee8 --store test + +---- + +k8s test + +codalab-service build --pull +VERSION=kstats sh ./scripts/local-k8s/setup-ci.sh +codalab-service build -s worker && kind load docker-image "codalab/worker:kstats" --name codalab +export CODALAB_SERVER=http://nginx +export CODALAB_WORKER_MANAGER_CPU_BUNDLE_RUNTIME=kubernetes +export CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CLUSTER_HOST=https://codalab-control-plane:6443 +export CODALAB_WORKER_MANAGER_TYPE=kubernetes +export CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH=/dev/null +export CODALAB_WORKER_MANAGER_CPU_KUBERNETES_AUTH_TOKEN=/dev/null +export CODALAB_WORKER_MANAGER_CPU_DEFAULT_CPUS=1 +export CODALAB_WORKER_MANAGER_CPU_DEFAULT_MEMORY_MB=100 +export CODALAB_WORKER_MANAGER_MIN_CPU_WORKERS=0 +export CODALAB_WORKER_MANAGER_MAX_CPU_WORKERS=1 +codalab-service start --services worker-manager-cpu + +python3 test_runner.py resources + +kubectl exec --stdin --tty cl-worker-f628165974b1456ba73c2d1e6408ab7a -- /bin/bash \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 42c7cbdc2..22c38bcd3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,8 +16,8 @@ marshmallow-jsonapi==0.15.1 marshmallow==2.15.1 setuptools>=40.0.0 argcomplete==1.12.3 -indexed_gzip==1.6.3 -ratarmountcore==0.3.2 +indexed_gzip==1.7.0 +ratarmountcore==0.1.3 PyYAML==5.4 psutil==5.7.2 six==1.15.0 diff --git a/scripts/local-k8s/kind-config.yaml b/scripts/local-k8s/kind-config.yaml index e2750f20f..103a33492 100644 --- a/scripts/local-k8s/kind-config.yaml +++ b/scripts/local-k8s/kind-config.yaml @@ -6,4 +6,4 @@ networking: apiServerPort: 6443 nodes: - role: control-plane - image: kindest/node:v1.21.10@sha256:84709f09756ba4f863769bdcabe5edafc2ada72d3c8c44d6515fc581b66b029c \ No newline at end of file + image: kindest/node:v1.22.15@sha256:7d9708c4b0873f0fe2e171e2b1b7f45ae89482617778c1c875f1053d4cef2e41 \ No newline at end of file diff --git a/scripts/local-k8s/setup-ci.sh b/scripts/local-k8s/setup-ci.sh index 89f61a73c..9dc2fabd2 100644 --- a/scripts/local-k8s/setup-ci.sh +++ b/scripts/local-k8s/setup-ci.sh @@ -22,6 +22,7 @@ export CODALAB_WORKER_MANAGER_CPU_BUNDLE_RUNTIME=kubernetes export CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CLUSTER_HOST=https://codalab-control-plane:6443 export CODALAB_WORKER_MANAGER_TYPE=kubernetes export CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT_PATH=/dev/null +export CODALAB_WORKER_MANAGER_CPU_KUBERNETES_CERT=/dev/null export CODALAB_WORKER_MANAGER_CPU_KUBERNETES_AUTH_TOKEN=/dev/null export CODALAB_WORKER_MANAGER_CPU_DEFAULT_CPUS=1 export CODALAB_WORKER_MANAGER_CPU_DEFAULT_MEMORY_MB=100 diff --git a/tests/cli/files/done b/tests/cli/files/done new file mode 100644 index 000000000..e69de29bb diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 4bbfe4d07..212b747de 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -482,6 +482,7 @@ def __exit__(self, exc_type, exc_value, tb): # Clean up and restore original worksheet print("[*][*] CLEANING UP") + return 
switch_user('codalab') # root user _run_command([cl, 'work', self.original_worksheet]) @@ -873,7 +874,7 @@ def test_upload1(ctx): _run_command([cl, 'work', worksheet_uuid]) # expect to fail when we upload something more than 2k bytes check_contains( - 'User disk quota exceeded', + 'disk quota exceeded', _run_command( [cl, 'upload', '-w', worksheet_uuid, test_path('codalab.png')] + suffix, expected_exit_code=1, @@ -1230,6 +1231,38 @@ def test_upload_default_bundle_store(ctx): check_contains(bundle_store_name, _run_command([cl, "info", uuid])) +@TestModule.register('parallel') +def test_parallel(ctx): + """Ensures bundles can run in parallel.""" + uuid = _run_command([cl, 'run', 'sleep 60']) + wait_until_state(uuid, State.RUNNING) + uuid2 = _run_command([cl, 'run', 'sleep 60']) + wait_until_state(uuid2, State.RUNNING) + check_equals(get_info(uuid, "state"), State.RUNNING) + wait(uuid) + wait(uuid2) + + +@TestModule.register('kubernetes_runtime') +def test_kubernetes_runtime(ctx): + """Tests various guarantees of the kubernetes runtime. + Should only be called when a kubernetes worker manager with + the kubernetes runtime is run.""" + + # Ensure that only one worker is run per node. First, we launch a lot of bundles, + # then ensure they only ran on one worker. + uuids = [_run_command([cl, 'run', 'sleep 180', '--request-memory', '500m']) for _ in range(10)] + wait_until_state(uuids[0], State.RUNNING) + num_running_states = len([get_info(uuid, "state") == State.RUNNING for uuid in uuids]) + # Ensure that not all bundles are running (as they should be queued waiting for the worker to be free) + assert num_running_states < len(uuids) + for uuid in uuids: + wait(uuid) + # Ensure all bundles ran on the same worker. + remote = get_info(uuids[0], "remote") + assert all([get_info(uuid, "remote") == remote for uuid in uuids]) + + @TestModule.register('store_add') def test_store_add(ctx): """ @@ -2946,6 +2979,10 @@ def test_unicode(ctx): @TestModule.register('workers') def test_workers(ctx): + # Spin up a run in case a worker isn't already running, so it can be started by the worker manager. + uuid = _run_command([cl, 'run', 'echo']) + wait(uuid) + result = _run_command([cl, 'workers']) lines = result.split("\n") @@ -2979,6 +3016,45 @@ def test_workers(ctx): worker_info = lines[2].split() assert len(worker_info) >= 10 + # Make sure that when we run a worker that uses resources, the worker's available resources are decremented accordingly. 
+ cpus_original, gpus_original, free_memory_original, free_disk_original = worker_info[1:5] + cpus_used, cpus_total = (int(i) for i in cpus_original.split("/")) + gpus_used, gpus_total = (int(i) for i in gpus_original.split("/")) + free_memory_original = int(free_memory_original) + free_disk_original = int(free_disk_original) + uuid = _run_command( + [ + cl, + 'run', + 'sleep 100', + '--request-cpus', + str(cpus_total - cpus_used), + '--request-gpus', + str(gpus_total - gpus_used), + ], + request_memory=free_memory_original - 1024, + request_disk=free_disk_original - 1024, + ) + wait_until_state(uuid, State.RUNNING) + result = _run_command([cl, 'workers']) + lines = result.split("\n") + worker_info = lines[2].split() + cpus, gpus, free_memory, free_disk = worker_info[1:5] + check_equals(f'{cpus_total}/{cpus_total}', cpus) + check_equals(f'{gpus_total}/{gpus_total}', gpus) + check_equals('0', free_memory) + check_equals('0', free_disk) + + wait(uuid) + result = _run_command([cl, 'workers']) + lines = result.split("\n") + worker_info = lines[2].split() + cpus, gpus, free_memory, free_disk = worker_info[1:5] + check_equals(cpus_original, cpus) + check_equals(gpus_original, gpus) + check_equals(free_memory_original, free_memory) + check_equals(free_disk_original, free_disk) + @TestModule.register('sharing_workers') def test_sharing_workers(ctx): diff --git a/tests/unit/server/bundle_manager/make_bundles_test.py b/tests/unit/server/bundle_manager/make_bundles_test.py index c66caf731..5e4a23438 100644 --- a/tests/unit/server/bundle_manager/make_bundles_test.py +++ b/tests/unit/server/bundle_manager/make_bundles_test.py @@ -167,7 +167,6 @@ def test_blob_storage_dependency(self): unpack=True, use_azure_blob_beta=True, ) - self.make_bundles_and_wait() bundle = self.bundle_manager._model.get_bundle(bundle.uuid) diff --git a/tests/unit/server/upload_download_test.py b/tests/unit/server/upload_download_test.py index 7365a2483..e79d5f324 100644 --- a/tests/unit/server/upload_download_test.py +++ b/tests/unit/server/upload_download_test.py @@ -71,6 +71,7 @@ def test_not_found(self): def check_file_target_contents(self, target): """Checks to make sure that the specified file has the contents 'hello world'.""" + # This can not be checked, Since with self.download_manager.stream_file(target, gzipped=False) as f: self.assertEqual(f.read(), b"hello world")
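Stepping back, the upload_manager changes in this diff hinge on one pattern: a single non-seekable stream fanned out to two lock-stepped readers, with one thread uploading the archive while the other builds the SQLite index. Below is a self-contained sketch of that fan-out; unlike MultiReaderFileStream it applies no back-pressure cap on how far the readers may drift apart, and it uses plain bytearrays instead of the BytesBuffer machinery in the diff:

```
import io
from threading import Thread, Lock

class TwoReaderStream:
    """Fan one forward-only stream out to two independent readers."""
    NUM_READERS = 2

    def __init__(self, fileobj):
        self._fileobj = fileobj
        self._bufs = [bytearray() for _ in range(self.NUM_READERS)]
        self._eof = False
        self._lock = Lock()  # serialize reads of the shared source

    def read(self, index, num_bytes):
        with self._lock:
            # Pull from the shared source until this reader's buffer can
            # satisfy the request or the source is exhausted.
            while not self._eof and len(self._bufs[index]) < num_bytes:
                chunk = self._fileobj.read(num_bytes)
                if not chunk:
                    self._eof = True
                    break
                for buf in self._bufs:  # every reader sees every byte
                    buf.extend(chunk)
            data = bytes(self._bufs[index][:num_bytes])
            del self._bufs[index][:num_bytes]
            return data

def consume(stream, index, sink):
    # Stand-in for the upload / index-creation loops in write_fileobj.
    while True:
        chunk = stream.read(index, 16 * 1024)
        if not chunk:
            break
        sink.write(chunk)

source = io.BytesIO(b"x" * 100_000)
stream = TwoReaderStream(source)
uploaded, indexed = io.BytesIO(), io.BytesIO()
threads = [
    Thread(target=consume, args=(stream, 0, uploaded)),
    Thread(target=consume, args=(stream, 1, indexed)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert uploaded.getvalue() == indexed.getvalue() == b"x" * 100_000
```

The production version additionally busy-waits any reader that runs too far ahead of the slowest one, which bounds memory usage at roughly the configured maximum buffer size.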