diff --git a/.clang-format b/.clang-format index 525f560..5846e19 100644 --- a/.clang-format +++ b/.clang-format @@ -28,7 +28,7 @@ SpacesInAngles: 'false' SpacesInContainerLiterals: 'false' SpacesInParentheses: 'false' SpacesInSquareBrackets: 'false' -Standard: c++17 +Standard: c++20 UseTab: Never SortIncludes: true ColumnLimit: 100 diff --git a/CMakeLists.txt b/CMakeLists.txt index 2616072..9c219cd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ -cmake_minimum_required(VERSION 3.18) +cmake_minimum_required(VERSION 3.20) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) @@ -20,7 +20,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) set(CMAKE_POSITION_INDEPENDENT_CODE ON) -add_library(spns STATIC +add_library(spns-hivemind SHARED spns/hivemind.cpp spns/pg.cpp spns/swarmpubkey.cpp @@ -35,6 +35,8 @@ find_package(PkgConfig REQUIRED) pkg_check_modules(SODIUM REQUIRED IMPORTED_TARGET libsodium>=1.0.18) pkg_check_modules(OXENC REQUIRED IMPORTED_TARGET liboxenc>=1.4.0) pkg_check_modules(OXENMQ REQUIRED IMPORTED_TARGET liboxenmq>=1.2.22) +pkg_check_modules(LIBQUIC REQUIRED IMPORTED_TARGET liboxenquic>=1.7) +pkg_check_modules(FMT REQUIRED IMPORTED_TARGET fmt>=10) pkg_check_modules(OXENLOGGING REQUIRED IMPORTED_TARGET liboxen-logging>=1.2.0) pkg_check_modules(NLOHMANN_JSON REQUIRED IMPORTED_TARGET nlohmann_json>=3.7.0) pkg_check_modules(SYSTEMD REQUIRED IMPORTED_TARGET libsystemd) @@ -52,26 +54,44 @@ endif() include(cmake/submodule_check.cmake) include(cmake/system_or_submodule.cmake) -system_or_submodule(LIBPQXX libpqxx libpqxx::pqxx libpqxx>=7.10.0 libpqxx) +system_or_submodule(LIBPQXX libpqxx libpqxx::pqxx libpqxx>=8 libpqxx) -target_link_libraries(spns PRIVATE +target_link_libraries(spns-hivemind PRIVATE PkgConfig::SODIUM PkgConfig::OXENC PkgConfig::OXENMQ - PkgConfig::OXENLOGGING + PkgConfig::LIBQUIC PkgConfig::NLOHMANN_JSON PkgConfig::SYSTEMD PUBLIC - libpqxx::pqxx) + libpqxx::pqxx + PkgConfig::OXENLOGGING + PkgConfig::FMT) -set_target_properties(spns PROPERTIES INTERPROCEDURAL_OPTIMIZATION ON) +set_target_properties(spns-hivemind PROPERTIES INTERPROCEDURAL_OPTIMIZATION ON) set(PYBIND11_FINDPYTHON ON CACHE INTERNAL "") add_subdirectory(pybind11) pybind11_add_module( - core + spns_hivemind spns/pybind.cpp) +target_link_libraries(spns_hivemind PUBLIC spns-hivemind) + +add_executable(spns spns/main.cpp) +target_link_libraries(spns PUBLIC spns-hivemind PRIVATE pybind11::embed) +set_target_properties(spns PROPERTIES + INSTALL_RPATH "$ORIGIN/../${CMAKE_INSTALL_LIBDIR}" + OUTPUT_NAME SPNS) + +include(GNUInstallDirs) +install(TARGETS spns spns-hivemind + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} +) -target_link_libraries(core PUBLIC spns) -set_target_properties(core PROPERTIES - LIBRARY_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR}/spns) +execute_process(COMMAND ${Python_EXECUTABLE} -m site --user-site + OUTPUT_VARIABLE PYTHON_USER_SITE + OUTPUT_STRIP_TRAILING_WHITESPACE) +install(TARGETS spns_hivemind DESTINATION ${PYTHON_USER_SITE}) +cmake_path(ABSOLUTE_PATH CMAKE_INSTALL_PREFIX NORMALIZE OUTPUT_VARIABLE install_path) +set_target_properties(spns_hivemind PROPERTIES INSTALL_RPATH "${install_path}/${CMAKE_INSTALL_LIBDIR}") diff --git a/libpqxx b/libpqxx index 1ca80b0..5899bd8 160000 --- a/libpqxx +++ b/libpqxx @@ -1 +1 @@ -Subproject commit 1ca80b0e638f6182426c5b11255069cae4fbd542 +Subproject commit 5899bd8236f2ac7d36e7a92d9d65a27635709fe8 diff --git a/spns.ini.example b/spns.ini.example index a7c68e5..b14d7a0 100644 --- a/spns.ini.example +++ b/spns.ini.example @@ -21,13 +21,18 @@ listen = ipc://./hivemind.sock # connecting to the listen_curve address. listen_curve_admin = 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef +# Optional address(es) on which we listen for public quic requests (i.e. subscribe or unsubscribe +# from clients). This is an alternative to submitting via onion request can be used instead of +# OMQ, e.g. over Session Router. `[keys]:quic` must be set when this option is enabled. +# +# The first address in this list that is either an IPv4 address, or the IPv6 any address (`[::]`) +# will be used for *outgoing* connections to SNs. +quic_listen = [::]:5522 + # OMQ address where we can make RPC requests to oxend. This can be a local oxend (e.g. # ipc:///path/to/oxend.sock), a plaintext address, or a curve-encrypted address with pubkey. oxend_rpc = tcp://localhost:22029 -# How often (in seconds) the main process re-checks existing subscriptions (for push renewals, expiries, etc.) -subs_interval = 10 - # How many SN connections we attempt to establish at once. Can be large to make a huge burst of # connections at startup, or lower to pace those connections a little more. max_connects = 1000 @@ -37,15 +42,6 @@ max_connects = 1000 # often last somewhat longer than this, but this is the minimum). filter_lifetime = 300 -# How many separate oxenmq instances to start to talk to network service nodes. Increasing this can -# be helpful if a single oxenmq instance (typically the proxy thread) starts bottlenecking under -# heavy load. If this is set to 1 or greater then this many extra servers are started and each -# connection to a remote service node is assigned in round-robin order across the instances, while -# the main local oxenmq instance will be used only for non-push requests (subscriptions, timers, -# communication with notifiers, local admin stats endpoints, etc.). If unset or set to 0 then just -# one oxenmq instance will be used for everything (both local and push traffic). -#omq_push_instances = 4 - # How long the main hivemind process will wait at startup before establishing connections to # the network's service nodes. This delay is designed to allow subordinate notification processes # to connect to the hivemind to ensure that notification services are ready after a restart before @@ -61,18 +57,21 @@ startup_wait = 8.0 [keys] -# This section lists the files containing keys needed by the PN server. Each file is the 32-bytes -# private key, either in binary or as a single 64-character hex string. -# -# You can generate these X25519 keys using: +# This section lists the files containing keys needed by the PN server. + + +# These two keys are X25519 keys used for internal hivemind communications and for decode onion +# requests. You can generate these X25519 keys using: # # ./make-x25519-key.py FILENAME # - hivemind = key_x25519 - onionreq = onionreq_x25519 +# This key is the Ed25519 used for the QUIC listener, if QUIC listening is enabled via the +# [hivemind] quic_listen option. You can create it using `session-router-config --key FILENAME` +# (from the session-router-bin package). +quic = key_ed25519 [notify-firebase] diff --git a/spns/bytes.hpp b/spns/bytes.hpp index d74635e..162c88d 100644 --- a/spns/bytes.hpp +++ b/spns/bytes.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -19,12 +20,14 @@ struct bytes : std::array { static constexpr size_t SIZE = N; using std::array::data; - std::basic_string_view view() const { return {data(), SIZE}; } - std::string_view sv() const { return {reinterpret_cast(data()), SIZE}; } - std::basic_string_view usv() const { - return {reinterpret_cast(data()), SIZE}; + constexpr std::span span() const { return {data(), SIZE}; } + template + std::span span() const { + return std::span{reinterpret_cast(data()), SIZE}; } + std::string_view sv() const { return {reinterpret_cast(data()), SIZE}; } + std::string hex() const { return oxenc::to_hex(this->begin(), this->end()); } // Implicit conversion to unsigned char* for easier passing into libsodium functions @@ -52,6 +55,7 @@ concept bytes_subtype = is_bytes; struct AccountID : bytes<33> {}; struct Ed25519PK : bytes<32> {}; +struct Ed25519Secret : bytes<64> {}; struct X25519PK : bytes<32> {}; struct X25519SK : bytes<32> {}; struct SubaccountTag : bytes<36> {}; @@ -78,16 +82,6 @@ struct Subaccount { } }; -template , int> = 0> -inline std::basic_string_view as_bsv(const T& v) { - return {reinterpret_cast(v.data()), T::SIZE}; -} - -template , int> = 0> -inline std::basic_string_view as_usv(const T& v) { - return {reinterpret_cast(v.data()), v.size()}; -} - // std::hash-implementing class that "hashes" by just reading the size_t-size bytes starting at the // 16th byte. template && (T::SIZE >= 32)>> diff --git a/spns/conf.py b/spns/conf.py new file mode 100644 index 0000000..70250d4 --- /dev/null +++ b/spns/conf.py @@ -0,0 +1,226 @@ +import configparser +import os +import re +import logging +import coloredlogs +from nacl.public import PrivateKey +from nacl.signing import SigningKey +import spns_hivemind + +logger = logging.getLogger("spns") + +# Set up colored logging; we come back to set the level once we know it +coloredlogs.install(milliseconds=True, isatty=True, logger=logger) + +# Keypairs; the "hivemind" and "quic" keys in here gets set in `config` for the main hivemind +# instance; "onionreq" is the main onionreq keypair; other keys can be set as well (e.g. for +# notifiers). +PRIVKEYS = {} +PUBKEYS = {} + +# We stash anything in a `[notify-xyz]` into `NOTIFY['xyz']` for notifiers to piggyback on the +# config. +NOTIFY = {} + + +truthy = ("y", "yes", "Y", "Yes", "true", "True", "on", "On", "1") +falsey = ("n", "no", "N", "No", "false", "False", "off", "Off", "0") +booly = truthy + falsey + + +def looks_true(val): + """Returns True if val is a common true value, False if a common false value, None if neither.""" + if val in truthy: + return True + if val in falsey: + return False + return None + + +def load_config(conf_ini: str | None = None) -> spns_hivemind.Config: + if conf_ini: + if not os.path.exists(conf_ini): + raise RuntimeError(f"Given config ini file {conf_ini} does not exist") + elif "SPNS_CONFIG" in os.environ: + conf_ini = os.environ["SPNS_CONFIG"] + if conf_ini and not os.path.exists(conf_ini): + raise RuntimeError( + f"SPNS_CONFIG={conf_ini} specified, but path does not exist!" + ) + + if not conf_ini: + conf_ini = "spns.ini" + if not os.path.exists(conf_ini): + raise RuntimeError( + "spns.ini does not exist; either create it or specify a path using the conf_ini= argument, " + "or the SPNS_CONFIG=... environment variable to specify an alternate config file" + ) + + logger.info(f"Loading config from {conf_ini}") + cp = configparser.ConfigParser() + cp.read(conf_ini) + + # Set log level up first so that it's here for the rest of the settings + if "log" in cp.sections() and "level" in cp["log"]: + coloredlogs.install(level=cp["log"]["level"], logger=logger) + + def path_exists(path): + return not path or os.path.exists(path) + + def val_or_none(v): + return v or None + + def days_to_seconds(v): + return float(v) * 86400.0 + + def days_to_seconds_or_none(v): + return days_to_seconds(v) if v else None + + def set_of_strs(v): + return {s for s in re.split("[,\\s]+", v) if s != ""} + + def bool_opt(name): + return (name, lambda x: x in booly, lambda x: x in truthy) + + # Map of: section => { param => ('config_property', test lambda, value lambda) } + # config_property is the string name of the config object property to set + # test lambda returns True/False for validation (if None/omitted, accept anything) + # value lambda extracts the value (if None/omitted use str value as-is) + setting_map = { + "db": {"url": ("pg_connect", lambda x: x.startswith("postgresql"))}, + # 'keys': ... special handling ... + "hivemind": { + "subs_interval": ("subs_interval", None, int), + "max_connects": ("max_pending_connects", None, int), + "filter_lifetime": ("filter_lifetime", None, int), + "startup_wait": ("notifier_wait", None, lambda x: round(1000 * float(x))), + "notifiers_expected": ( + "notifiers_expected", + None, + lambda x: set(z for z in (y.strip() for y in x.split(",")) if z), + ), + "listen": ("hivemind_sock", lambda x: re.search("^(?:tcp|ipc)://.", x)), + "listen_curve": ("hivemind_curve", lambda x: re.search("^tcp://.", x)), + "listen_curve_admin": ( + "hivemind_curve_admin", + lambda x: re.search(r"^(?:[a-fA-F0-9]{64}\s+)*[a-fA-F0-9]{64}\s*$", x), + lambda x: set(bytes.fromhex(y) for y in x.split() if y), + ), + "oxend_rpc": ( + "oxend_rpc", + lambda x: re.search("^(?:tcp|ipc|curve)://.", x), + ), + "quic_listen": ( + "quic_listen", + lambda x: re.search( + r"^(?:(?:\[[0-9a-fA-F:.]+\]|(?:\d+\.){3}\d+):\d+(?=\s|$)\s*)+$", x + ), + lambda x: [y for y in x.split() if y], + ), + }, + } + + config = spns_hivemind.Config() + + def parse_option(fields, s, opt): + if opt not in fields: + logger.warning(f"Ignoring unknown config setting [{s}].{opt} in {conf_ini}") + return + conf = fields[opt] + value = cp[s][opt] + + assert isinstance(conf, tuple) and 1 <= len(conf) <= 3 + nonlocal config + assert hasattr(config, conf[0]) + + if len(conf) >= 2 and conf[1]: + if not conf[1](value): + raise RuntimeError(f"Invalid value [{s}].{opt}={value} in {conf_ini}") + + if len(conf) >= 3 and conf[2]: + value = conf[2](value) + + logger.debug(f"Set config.{conf[0]} = {value}") + setattr(config, conf[0], value) + + for s in cp.sections(): + if s == "keys": + for opt in cp["keys"]: + rawlen = None + if opt in ("hivemind", "onionreq"): + rawlen = 32 + elif opt in ("quic"): + rawlen = 64 + else: + raise RuntimeError(f"Don't know key type for key '{opt}'") + + keybytes = None + filename = cp["keys"][opt] + with open(filename, "rb") as f: + keybytes = f.read() + if len(keybytes) >= 2 * rawlen: + # Assume hex-encoded + keyhex = keybytes.decode().strip() + if len(keyhex) != 2 * rawlen: + raise RuntimeError( + f"Could not read '{filename}' for option [keys]{opt}: invalid file size" + ) + if any(x not in "0123456789abcdefABCDEF" for x in keyhex): + raise RuntimeError( + f"Could not read '{filename}' for option [keys]{opt}: expected bytes or hex" + ) + + keybytes = bytes.fromhex(keyhex) + + elif len(keybytes) != rawlen: + raise RuntimeError( + f"Could not read '{filename}' for option [keys]{opt}: invalid file size" + ) + + if rawlen == 32: # X25519 privkey + PRIVKEYS[opt] = PrivateKey(keybytes) + PUBKEYS[opt] = PRIVKEYS[opt].public_key + else: + PRIVKEYS[opt] = SigningKey(keybytes[0:32]) + PUBKEYS[opt] = PRIVKEYS[opt].verify_key + + if PUBKEYS[opt].encode() != keybytes[32:]: + raise RuntimeError( + f"[keys]{opt} keypair invalid: seed (bytes 0-31) does not yield pubkey (bytes 32-63)" + ) + + logger.info( + f"Loaded {opt} keypair with pubkey {PUBKEYS[opt].encode().hex()}" + ) + elif s == "log": + for opt in cp["log"]: + if opt == "level": + spns_hivemind.logger.set_level(cp["log"][opt]) + elif opt.startswith("level-") and len(opt) > 6: + logger.warning( + f"{opt} = ... is deprecated; use a compound level=... instead" + ) + spns_hivemind.logger.set_level(f'{opt[6:]}={cp["log"][opt]}') + else: + logger.warning( + f"Ignoring unknown log item [log] {opt} in {conf_ini}" + ) + + elif s.startswith("notify-"): + for opt in cp[s]: + NOTIFY.setdefault(s[7:], {})[opt] = cp[s][opt] + + elif s in setting_map: + for opt in cp[s]: + parse_option(setting_map[s], s, opt) + + else: + logger.warning(f"Ignoring unknown section [{s}] in {conf_ini}") + + config.omq_privkey = PRIVKEYS["hivemind"].encode() + config.omq_pubkey = PUBKEYS["hivemind"].encode() + + if "quic" in PRIVKEYS: + config.quic_keys = PRIVKEYS["quic"].encode() + PUBKEYS["quic"].encode() + + return config diff --git a/spns/config.hpp b/spns/config.hpp index a72acc9..1525e32 100644 --- a/spns/config.hpp +++ b/spns/config.hpp @@ -4,8 +4,8 @@ #include #include +#include #include -#include #include #include "bytes.hpp" @@ -27,8 +27,15 @@ struct Config { std::unordered_set hivemind_curve_admin; // The main hivemind omq listening keypair. Must be set explicitly. - X25519PK pubkey; - X25519SK privkey; + X25519PK omq_pubkey; + X25519SK omq_privkey; + + // Listening address(es) for QUIC + std::vector quic_listen; + + // The Ed25519 keypair to use for QUIC connections. This is the 64-byte libsodium secret key + // value (i.e. the 32 byte seed followed by 32 byte pubkey). + std::optional quic_keys; std::chrono::seconds filter_lifetime = 10min; @@ -40,15 +47,6 @@ struct Config { // have a registered notifier for all of the services in this set. std::unordered_set notifiers_expected; - // How often we recheck for re-subscriptions for push renewals, expiries, etc. - std::chrono::seconds subs_interval = 30s; - - // Number of extra oxenmq instances to start up for push notifications. If 0 then no extra ones - // are started and just the main oxenmq instance is used for everything. The extra instances - // are used exlusively for push notifications; each connection to a new SN is round-robin - // assigned across the instances. - int omq_push_instances = 0; - // Maximum connections we will attempt to establish simultaneously (we can have more, we just // won't try to open more than this at once until some succeed or fail). You can set this to 0 // for a "dry run" mode where no connections at all will be made. diff --git a/spns/config.py b/spns/config.py index 4498e2b..a597ffd 100644 --- a/spns/config.py +++ b/spns/config.py @@ -1,199 +1,5 @@ -import configparser -import os -import re -import logging -import coloredlogs -from nacl.public import PrivateKey -from spns.core import Config, logger as core_logger -import oxenmq +# Same as spns.conf except that it loads a config on initialization (available as config). -logger = logging.getLogger("spns") +from .conf import logger, PRIVKEYS, PUBKEYS, NOTIFY, load_config -# Set up colored logging; we come back to set the level once we know it -coloredlogs.install(milliseconds=True, isatty=True, logger=logger) - -# Global config; we set values in here, then pass it to HiveMind during startup. -config = Config() - -# Keypairs; the "hivemind" key in here gets set in `config` for the main hivemind instance; -# "onionreq" is the main onionreq keypair; other keys can be set as well (e.g. for notifiers). -PRIVKEYS = {} -PUBKEYS = {} - -# We stash anything in a `[notify-xyz]` into `NOTIFY['xyz']` for notifiers to piggyback on the -# config. -NOTIFY = {} - -# Will be true if we're running as a uwsgi app, false otherwise; used where we need to do things -# only in one case or another (e.g. database initialization only via app mode). -RUNNING_AS_APP = False -try: - import uwsgi # noqa: F401 - - RUNNING_AS_APP = True -except ImportError: - pass - - -truthy = ("y", "yes", "Y", "Yes", "true", "True", "on", "On", "1") -falsey = ("n", "no", "N", "No", "false", "False", "off", "Off", "0") -booly = truthy + falsey - - -def looks_true(val): - """Returns True if val is a common true value, False if a common false value, None if neither.""" - if val in truthy: - return True - if val in falsey: - return False - return None - - -def load_config(): - if "SPNS_CONFIG" in os.environ: - conf_ini = os.environ["SPNS_CONFIG"] - if conf_ini and not os.path.exists(conf_ini): - raise RuntimeError(f"SPNS_CONFIG={conf_ini} specified, but path does not exist!") - else: - conf_ini = "spns.ini" - if not os.path.exists(conf_ini): - raise RuntimeError( - "spns.ini does not exist; either create it or use SPNS_CONFIG=... to specify an" - " alternate config file" - ) - - if not conf_ini: - return - - logger.info(f"Loading config from {conf_ini}") - cp = configparser.ConfigParser() - cp.read(conf_ini) - - # Set log level up first so that it's here for the rest of the settings - if "log" in cp.sections() and "level" in cp["log"]: - coloredlogs.install(level=cp["log"]["level"], logger=logger) - - def path_exists(path): - return not path or os.path.exists(path) - - def val_or_none(v): - return v or None - - def days_to_seconds(v): - return float(v) * 86400.0 - - def days_to_seconds_or_none(v): - return days_to_seconds(v) if v else None - - def set_of_strs(v): - return {s for s in re.split("[,\\s]+", v) if s != ""} - - def bool_opt(name): - return (name, lambda x: x in booly, lambda x: x in truthy) - - # Map of: section => { param => ('config_property', test lambda, value lambda) } - # global is the string name of the global variable to set - # test lambda returns True/False for validation (if None/omitted, accept anything) - # value lambda extracts the value (if None/omitted use str value as-is) - setting_map = { - "db": {"url": ("pg_connect", lambda x: x.startswith("postgresql"))}, - # 'keys': ... special handling ... - "hivemind": { - "subs_interval": ("subs_interval", None, int), - "max_connects": ("max_pending_connects", None, int), - "filter_lifetime": ("filter_lifetime", None, int), - "omq_push_instances": ("omq_push_instances", None, int), - "startup_wait": ("notifier_wait", None, lambda x: round(1000 * float(x))), - "notifiers_expected": ( - "notifiers_expected", - None, - lambda x: set(z for z in (y.strip() for y in x.split(",")) if z), - ), - "listen": ("hivemind_sock", lambda x: re.search("^(?:tcp|ipc)://.", x)), - "listen_curve": ("hivemind_curve", lambda x: re.search("^tcp://.", x)), - "listen_curve_admin": ( - "hivemind_curve_admin", - lambda x: re.search("^(?:[a-fA-F0-9]{64}\s+)*[a-fA-F0-9]{64}\s*$", x), - lambda x: set(bytes.fromhex(y) for y in x.split() if y), - ), - "oxend_rpc": ("oxend_rpc", lambda x: re.search("^(?:tcp|ipc|curve)://.", x)), - }, - } - - def parse_option(fields, s, opt): - if opt not in fields: - logger.warning(f"Ignoring unknown config setting [{s}].{opt} in {conf_ini}") - return - conf = fields[opt] - value = cp[s][opt] - - assert isinstance(conf, tuple) and 1 <= len(conf) <= 3 - global config - assert hasattr(config, conf[0]) - - if len(conf) >= 2 and conf[1]: - if not conf[1](value): - raise RuntimeError(f"Invalid value [{s}].{opt}={value} in {conf_ini}") - - if len(conf) >= 3 and conf[2]: - value = conf[2](value) - - logger.debug(f"Set config.{conf[0]} = {value}") - setattr(config, conf[0], value) - - for s in cp.sections(): - if s == "keys": - for opt in cp["keys"]: - filename = cp["keys"][opt] - with open(filename, "rb") as f: - keybytes = f.read() - if len(keybytes) == 32: - privkey = PrivateKey(keybytes) - else: - # Assume hex-encoded - keyhex = keybytes.decode().strip() - if len(keyhex) != 64: - raise RuntimeError( - f"Could not read '{filename}' for option [keys]{opt}: invalid file size" - ) - if any(x not in "0123456789abcdefABCDEF" for x in keyhex): - raise RuntimeError( - f"Could not read '{filename}' for option [keys]{opt}: expected bytes or hex" - ) - - privkey = PrivateKey(bytes.fromhex(keyhex)) - PRIVKEYS[opt] = privkey - PUBKEYS[opt] = privkey.public_key - - logger.info( - f"Loaded {opt} X25519 keypair with pubkey {PUBKEYS[opt].encode().hex()}" - ) - elif s == "log": - for opt in cp["log"]: - if opt == "level": - core_logger.set_level(cp["log"][opt]) - elif opt.startswith("level-") and len(opt) > 6: - core_logger.set_level(opt[6:], cp["log"][opt]) - else: - logger.warning(f"Ignoring unknown log item [log] {opt} in {conf_ini}") - - elif s.startswith("notify-"): - for opt in cp[s]: - NOTIFY.setdefault(s[7:], {})[opt] = cp[s][opt] - - elif s in setting_map: - for opt in cp[s]: - parse_option(setting_map[s], s, opt) - - else: - logger.warning(f"Ignoring unknown section [{s}] in {conf_ini}") - - config.privkey = PRIVKEYS["hivemind"].encode() - config.pubkey = PUBKEYS["hivemind"].encode() - - -try: - load_config() -except Exception as e: - logger.critical(f"Failed to load config: {e}") - raise +config = load_config() diff --git a/spns/hive/snode.cpp b/spns/hive/snode.cpp index 6c5a6f2..9609f99 100644 --- a/spns/hive/snode.cpp +++ b/spns/hive/snode.cpp @@ -1,18 +1,23 @@ #include "snode.hpp" +#include #include #include #include +#include #include #include -#include #include +#include +#include +#include #include #include #include "../bytes.hpp" #include "../hivemind.hpp" +#include "subscription.hpp" namespace spns::hive { @@ -24,9 +29,8 @@ using namespace std::literals; thread_local std::mt19937_64 rng{std::random_device{}()}; -SNode::SNode(HiveMind& hivemind, oxenmq::OxenMQ& omq, oxenmq::address addr, uint64_t swarm) : +SNode::SNode(HiveMind& hivemind, quic::RemoteAddress addr, uint64_t swarm) : hivemind_{hivemind}, - omq_{omq}, addr_{std::move(addr)}, swarm_{swarm} @@ -35,109 +39,117 @@ SNode::SNode(HiveMind& hivemind, oxenmq::OxenMQ& omq, oxenmq::address addr, uint } void SNode::connect() { - std::lock_guard lock{mutex_}; - - if (!conn_) { - if (hivemind_.allow_connect()) { - conn_ = omq_.connect_remote( - addr_, - [this](oxenmq::ConnectionID c) { on_connected(c); }, - [this](oxenmq::ConnectionID c, std::string_view err) { - on_connect_fail(c, err); - }, - oxenmq::AuthLevel::basic); - log::debug(cat, "Establishing connection to {}", addr_.full_address()); - } + assert(hivemind_.loop().inside()); + if (conn_) + return; + if (cooldown_until_) { + if (*cooldown_until_ > std::chrono::steady_clock::now()) + return; + cooldown_until_.reset(); } -} + if (!hivemind_.allow_connect()) + return; -void SNode::connect(oxenmq::address addr) { - bool reconnect; - { - std::lock_guard lock{mutex_}; - reconnect = addr != addr_; - } + conn_ = hivemind_.quic_out().connect( + addr_, + hivemind_.creds_out(), + [this](quic::Connection& c) { on_connected(c); }, + [this](quic::Connection& c, uint64_t ec) { on_disconnected(c, ec); }, + quic::opt::keep_alive{10s}); + stream_ = conn_->open_stream(); + stream_->register_handler("notify", [this](quic::message msg) { on_notify(std::move(msg)); }); + + log::debug( + cat, "Initiated connection to {} @ {}", oxenc::to_hex(addr_.view_remote_key()), addr_); +} - if (reconnect) { - log::debug( - cat, - "disconnecting; addr changing from {} to {}", - addr_.full_address(), - addr.full_address()); +void SNode::connect(quic::RemoteAddress addr) { + assert(hivemind_.loop().inside()); + if (addr != addr_) { + log::debug(cat, "disconnecting; addr changing from {} to {}", addr_, addr); disconnect(); - { - std::lock_guard lock{mutex_}; - addr_ = std::move(addr); - } + addr_ = std::move(addr); } connect(); } void SNode::disconnect() { - std::lock_guard lock{mutex_}; - - log::debug(cat, "disconnecting from {}", addr_.full_address()); + assert(hivemind_.loop().inside()); connected_ = false; if (conn_) { - omq_.disconnect(conn_); - conn_ = {}; + log::debug(cat, "disconnecting from {}", addr_); + stream_.reset(); + conn_->close_connection(); + conn_.reset(); } } -void SNode::on_connected(oxenmq::ConnectionID c) { +void SNode::on_connected(quic::Connection& c) { + assert(hivemind_.loop().inside()); bool no_conn = false; - { - std::lock_guard lock{mutex_}; - - log::debug(cat, "Connection established to {}", addr_.full_address()); - cooldown_fails_ = 0; - cooldown_until_.reset(); - - if (!conn_) { - // Our conn got replaced from under us, which probably means we are disconnecting, so do - // nothing. - no_conn = true; - } else { - // We either just connected or reconnected, so reset any re-subscription times (so that - // after a reconnection we force a re-subscription for everyone): - auto now = system_clock::now(); - for (auto& [id, next] : next_) - next = system_epoch; + log::debug(cat, "Connection established to {}", addr_); + cooldown_fails_ = 0; + cooldown_until_.reset(); - connected_ = true; - } + if (!conn_) { + // Our conn got replaced from under us, which probably means we are disconnecting, so do + // nothing. + no_conn = true; + } else { + // We either just connected or reconnected, so reset any re-subscription times (so that + // after a reconnection we force a re-subscription for everyone): + auto now = system_clock::now(); + for (auto& [id, next] : next_) + next = system_epoch; + + connected_ = true; } hivemind_.finished_connect(); if (!no_conn) - hivemind_.check_my_subs(*this, true); + hivemind_.check_my_subs(*this); } -void SNode::on_connect_fail(oxenmq::ConnectionID c, std::string_view reason) { - { - std::lock_guard lock{mutex_}; - - auto cooldown = cooldown_fails_ >= CONNECT_COOLDOWN.size() - ? CONNECT_COOLDOWN.back() - : CONNECT_COOLDOWN[cooldown_fails_]; - cooldown_until_ = steady_clock::now() + cooldown; - cooldown_fails_++; - - log::warning( - cat, - "Connection to {} failed: {} ({} consecutive failure(s); retrying in {}s)", - addr_.full_address(), - reason, - cooldown_fails_, - cooldown.count()); - - connected_ = false; - conn_ = {}; +void SNode::on_disconnected(quic::Connection& c, uint64_t ec) { + assert(hivemind_.loop().inside()); + bool is_failed_connect = !connected_.exchange(false); + + if (hivemind_.has_quic_out()) { + // If we don't have a quic object that means we're shutting down and this is the callback + // fired during shutdown, so don't do anything. + if (is_failed_connect) { + auto cooldown = cooldown_fails_ >= CONNECT_COOLDOWN.size() + ? CONNECT_COOLDOWN.back() + : CONNECT_COOLDOWN[cooldown_fails_]; + cooldown_until_ = steady_clock::now() + cooldown; + cooldown_fails_++; + + log::warning( + cat, + "Connection to {} failed (ec={}). {} consecutive failure(s); retrying in {}", + addr_, + ec, + cooldown_fails_, + cooldown); + } else { + log::warning( + cat, + "Disconnected from {} (ec={}); reconnecting in {}", + addr_, + ec, + RECONNECT_WAIT); + cooldown_until_ = steady_clock::now() + RECONNECT_WAIT; + cooldown_fails_ = 0; + } } - hivemind_.finished_connect(); + stream_.reset(); + conn_.reset(); + + if (hivemind_.has_quic_out() && is_failed_connect) + hivemind_.finished_connect(); } /// Adds a new account to be signed up for subscriptions, if it is not already subscribed. @@ -147,7 +159,10 @@ void SNode::on_connect_fail(oxenmq::ConnectionID c, std::string_view reason) { /// If `force_now` is True then the account is scheduled for subscription at the next update /// even if already exists. void SNode::add_account(const SwarmPubkey& account, bool force_now) { - std::lock_guard lock{mutex_}; + // This isn't *always* called from directly inside: we also call it in multithreaded batch jobs + // where we *have* blocked the loop, but do work in threads (and don't touch different SNodes + // from different worker threads). + // assert(hivemind_.loop().inside()); auto [it, inserted] = subs_.insert(account); if (inserted) @@ -166,7 +181,7 @@ void SNode::add_account(const SwarmPubkey& account, bool force_now) { } void SNode::reset_swarm(uint64_t new_swarm) { - std::lock_guard lock{mutex_}; + assert(hivemind_.loop().inside()); next_.clear(); subs_.clear(); @@ -174,7 +189,9 @@ void SNode::reset_swarm(uint64_t new_swarm) { } void SNode::remove_stale_swarm_members(const std::vector& swarm_ids) { - std::lock_guard lock{mutex_}; + // We *aren't* actually directly inside the loop when this is called: we are called from a + // multithreaded batch job where the loop is blocked pending completion of the jobs. + // assert(hivemind_.loop().inside()); for (auto& s : subs_) s.update_swarm(swarm_ids); @@ -187,42 +204,106 @@ void SNode::remove_stale_swarm_members(const std::vector& swarm_ids) { } void SNode::check_subs( - const std::unordered_map>& all_subs, - bool initial_subs, - bool fast) { + const std::unordered_map>& all_subs) { + assert(hivemind_.loop().inside()); + // log::trace(cat, "check subs"); if (!connected_) { - { - std::lock_guard lock{mutex_}; - - if (conn_) - return; // We're already trying to connect - - // If we failed recently we'll be in cooldown mode for a while, so might not connect - // right away yet. - if (cooldown_until_) { - if (*cooldown_until_ > steady_clock::now()) - return; - cooldown_until_.reset(); - } + if (conn_) + return; // We're already trying to connect + + // If we failed recently we'll be in cooldown mode for a while, so might not connect + // right away yet. + if (cooldown_until_) { + if (*cooldown_until_ > steady_clock::now()) + return; + cooldown_until_.reset(); } // We'll get called automatically as soon as the connection gets established, so just // make sure we are already connecting and don't do anything else for now. - return connect(); // NB: must not hold lock when calling this + connect(); + return; } - std::string req_body = "l"; // We'll add the "e" later auto now = system_clock::now(); + auto req = std::make_optional(); + + size_t subreq_count = 0, next_added = 0, req_count = 0; + + bool rate_limited = false; + + auto submit_req = [&req, &subreq_count, &rate_limited, this] { + stream_->command( + "monitor", + std::move(*req).str(), + /*timeout=*/60s, + [this, expected_count = subreq_count, rate_limited](quic::message response) { + if (!response) { + log::warning( + cat, + "Subscriptions request of {} subscriptions to {} failed: {}", + expected_count, + addr_, + response.timed_out ? "timeout"sv : response.body()); + return; + } + + try { + oxenc::bt_list_consumer results{response.body()}; + int good = 0, bad = 0; + while (!results.is_finished()) { + auto r = results.consume_dict_consumer(); + if (r.skip_until("error")) { + bad++; + log::warning( + cat, "Subscription failure: {}", r.consume_string_view()); + } else if (r.skip_until("success")) + good++; + } + if (bad || good != expected_count) + log::warning( + cat, + "Subscriptions request for {} subscriptions to {} returned {} " + "success, {} failures", + expected_count, + addr_, + good, + bad); + else + log::debug( + cat, + "Successful subscription request to {} for {} subscriptions", + addr_, + good); + + } catch (const std::exception& e) { + log::warning( + cat, + "Failed to parse 'monitor' response from {}: {}", + addr_, + e.what()); + } + + // If our previous iteration we stopped because we hit the limit (rather than + // because we had nothing more to send) then check again to queue more + // immediately: + if (rate_limited) + hivemind_.loop().call_soon([this] { hivemind_.check_my_subs(*this); }); + }); + // There's currently no exposed way to clear the internal string in a bt producer, so just + // reset it. In theory if this is allocation bottlenecked we could revisit this to reuse an + // external, fixed buffer, but for now just reset it. + req.emplace(); + subreq_count = 0; + }; - size_t next_added = 0, req_count = 0; + auto sig_min = now - hive::Subscription::SIGNATURE_EXPIRY + 10s; + auto sig_max = now + hive::Subscription::SIGNATURE_EARLY; - std::lock_guard lock{mutex_}; - while (req_body.size() < SUBS_REQUEST_LIMIT && !next_.empty()) { + while (!next_.empty()) { const auto& [maybe_acct, next] = next_.front(); if (next > now) break; - if (fast && next > system_epoch) - break; if (!maybe_acct) { next_.pop_front(); @@ -237,29 +318,11 @@ void SNode::check_subs( continue; } - std::vector buf; for (const auto& sub : subs->second) { + if (sub.sig_ts < sig_min || sub.sig_ts > sig_max) + continue; - // Size estimate; this can be over, but mustn't be under the actual size we'll need. - constexpr size_t base_size = 0 + 3 + - 12 // 1:t and i...e where ... is a 10-digit timestamp - + 3 + 67 // 1:s and 64:... - + 3 + 36 // 1:p and 33:... (also covers 1:P and 32:...) - + 3 + 2 // 1:n and the le of the l...e list - + 3 + 3 // 1:d and i1e (only if want_data) - + 3 + 67 + 3 + 39 // 1:S, 64:..., 1:T, 36:... (for subaccount auth) - ; - - // The biggest int expression we have is i-32768e; this is almost certainly overkill - // most of the time though, but no matter. - auto size = base_size + sub.namespaces.size() * 8; - - auto old_size = req_body.size(); - req_body.resize(old_size + size); - - char* start = req_body.data() + old_size; - - oxenc::bt_dict_producer dict{start, size}; + auto dict = req->append_dict(); // keys in ascii-sorted order! if (acct.session_ed) @@ -270,15 +333,13 @@ void SNode::check_subs( } if (sub.want_data) dict.append("d", 1); - dict.append_list("n").extend(sub.namespaces.begin(), sub.namespaces.end()); + dict.append_list("n").extend(sub.namespaces.cbegin(), sub.namespaces.cend()); if (!acct.session_ed) dict.append("p", acct.id.sv()); dict.append("s", sub.sig.sv()); - dict.append("t", sub.sig_ts); - - // Resize away any extra buffer space we didn't fill - req_body.resize(dict.end() - req_body.data()); + dict.append("t", sub.sig_ts.time_since_epoch().count()); + subreq_count++; req_count++; } @@ -288,15 +349,19 @@ void SNode::check_subs( next_.emplace_back(acct, now + delay); next_added++; next_.pop_front(); - } - if (req_body.size() == 1) // just the initial "l" - return; + if (subreq_count >= SUBS_REQUEST_LIMIT) { + rate_limited = true; + break; + } + } - req_body += 'e'; + if (subreq_count) + submit_req(); - // The randomness of our delay will mean the tail of the list isn't sorted, so re-sort from - // the lowest possible value we could have inserted (now + RESUBSCRIBE_MIN) to the end. + // The randomness of our delay to the next re-subscription means that the tail of the list won't + // be sorted, so re-sort from the lowest possible value we could have inserted (now + + // RESUBSCRIBE_MIN) to the end. // Everything we didn't touch should already be sorted: assert(std::is_sorted( @@ -318,26 +383,16 @@ void SNode::check_subs( return a.second < b.second; })); - auto on_reply = [this, right_away = initial_subs && req_body.size() >= SUBS_REQUEST_LIMIT]( - bool success, std::vector data) { - if (!success) { - // TODO: log something about failed request, but otherwise ignore it. We don't - // worry about the subscriptions that might lapse because we have full swarm - // redundancy so it really doesn't matter if a subscription with one or two of - // the swarm members times out. - } - if (right_away) { - // We're doing the initial subscriptions, and sent a size-limited request so we - // likely have more that we want to subscribe to ASAP: so we continue as soon as - // we get the reply back so that we're subscribing as quickly as possible - // without having more than one (large) subscription request in flight at a - // time. - hivemind_.check_my_subs(*this, true); - } - }; + log::log( + cat, + req_count ? log::Level::debug : log::Level::trace, + "Submitted (re-)subscriptions to {} accounts on {}", + req_count, + addr_); +} - omq_.request(conn_, "monitor.messages", std::move(on_reply), std::move(req_body)); - log::debug(cat, "(Re-)subscribing to {} accounts from {}", req_count, addr_.full_address()); +void SNode::on_notify(quic::message msg) { + hivemind_.on_message_notification(std::move(msg)); } } // namespace spns::hive diff --git a/spns/hive/snode.hpp b/spns/hive/snode.hpp index 96be865..bf12f12 100644 --- a/spns/hive/snode.hpp +++ b/spns/hive/snode.hpp @@ -1,7 +1,5 @@ #pragma once -#include - #include #include #include @@ -10,6 +8,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -25,24 +27,28 @@ class HiveMind; namespace spns::hive { +namespace quic = oxen::quic; + using namespace std::literals; -// Maximum size of simultaneous subscriptions in a single subscription request; if we overflow then -// any stragglers wait until the next request, delaying them by a few seconds. (This is not a rock -// hard limit: we estimate slightly and stop as soon as we exceed it, which means we can go over it -// a bit after appending the last record). -inline constexpr size_t SUBS_REQUEST_LIMIT = 5'000'000; +// Maximum number of simultaneous subscriptions in a single subscription request to a single SN; if +// we have more than this then we send this many and wait for the response before sending more. +inline constexpr size_t SUBS_REQUEST_LIMIT = 2000; // How long (in seconds) after a successful subscription before we re-subscribe; each subscription // gets a uniform random value between these two values (to spread out the renewal requests a bit). inline constexpr std::chrono::seconds RESUBSCRIBE_MIN = 45min; inline constexpr std::chrono::seconds RESUBSCRIBE_MAX = 55min; -// How long we wait (in seconds) after a connection failure to a snode storage server before +// How long we wait (in seconds) after a failed connection attempt to a snode storage server before // re-trying the connection; we use the first value after the first failure, the second one after // the second failure, and so on (if we run off the end we use the last value). inline constexpr std::array CONNECT_COOLDOWN = {10s, 30s, 60s, 120s}; +// How long are a disconnection from a SNode until we attempt to reconnect. If the reconnection +// fails then we enter the cooldown, above. +inline constexpr auto RECONNECT_WAIT = 1s; + template >> inline std::string_view as_sv(const T& data) { return {reinterpret_cast(data.data()), T::SIZE}; @@ -52,16 +58,13 @@ class SNode { // Class managing a connection to a single service node HiveMind& hivemind_; - oxenmq::OxenMQ& omq_; - oxenmq::ConnectionID conn_; - oxenmq::address addr_; + std::shared_ptr conn_; + quic::RemoteAddress addr_; + std::shared_ptr stream_; std::atomic connected_ = false; std::unordered_set subs_; uint64_t swarm_; - std::mutex mutex_; // Mutex for our local stuff; we must *never* do something with hivemind - // that requires a (HiveMind) lock while we hold this. - using system_clock = std::chrono::system_clock; using steady_clock = std::chrono::steady_clock; using system_time = system_clock::time_point; @@ -78,7 +81,7 @@ class SNode { public: const uint64_t& swarm{swarm_}; - SNode(HiveMind& hivemind, oxenmq::OxenMQ& omq, oxenmq::address addr, uint64_t swarm); + SNode(HiveMind& hivemind, quic::RemoteAddress addr, uint64_t swarm); ~SNode() { disconnect(); } @@ -87,7 +90,7 @@ class SNode { /// address. /// /// Does nothing if already connected to the given address. - void connect(oxenmq::address addr); + void connect(quic::RemoteAddress addr); /// Initiates a connection, if not already connected, to the current address. void connect(); @@ -96,10 +99,6 @@ class SNode { void disconnect(); - void on_connected(oxenmq::ConnectionID c); - - void on_connect_fail(oxenmq::ConnectionID c, std::string_view reason); - /// Adds a new account to be signed up for subscriptions, if it is not already subscribed. /// The new account's subscription will be submitted to the SS the next time check_subs() is /// called (either automatically or manually). @@ -123,18 +122,15 @@ class SNode { /// Check our subscriptions to resubscribe to any that need it. Takes a reference to hivemind's /// master list of all subscriptions (to be able to pull subscription details from). /// - /// If initial_subs is true then this is the initial request and we fire off a batch of - /// subscriptions and then another batch upon reply, etc. until there are no more subs to send; - /// otherwise we fire off just up to SUBS_LIMIT re-subscriptions. - /// - /// If `fast` is true then we only look for and process unix-epoch leading elements, which are - /// the ones we put on we a brand new subscription comes in. - /// /// This method is *only* called from HiveMind. - void check_subs( - const std::unordered_map>& subs, - bool initial_subs = false, - bool fast = false); + void check_subs(const std::unordered_map>& subs); + + private: + void on_connected(quic::Connection& c); + + void on_disconnected(quic::Connection& c, uint64_t errcode); + + void on_notify(quic::message msg); }; } // namespace spns::hive diff --git a/spns/hive/subscription.cpp b/spns/hive/subscription.cpp index e8024c3..2eb1df4 100644 --- a/spns/hive/subscription.cpp +++ b/spns/hive/subscription.cpp @@ -1,5 +1,6 @@ #include "subscription.hpp" +#include #include #include @@ -12,7 +13,6 @@ #include #include #include -#include #include #include @@ -20,21 +20,25 @@ namespace spns::hive { -template +template static void append_int(std::string& s, Int val) { char sig_ts_buf[20]; auto [end, ec] = std::to_chars(std::begin(sig_ts_buf), std::end(sig_ts_buf), val); s.append(sig_ts_buf, end - sig_ts_buf); } +static void append_int(std::string& s, std::chrono::sys_seconds val) { + append_int(s, val.time_since_epoch().count()); +} Subscription::Subscription( const SwarmPubkey& pubkey, std::optional subaccount_, std::vector namespaces_, bool want_data_, - int64_t sig_ts_, + std::chrono::sys_seconds sig_ts_, Signature sig_, - bool _skip_validation) : + bool _skip_validation, + std::chrono::sys_seconds now) : subaccount{std::move(subaccount_)}, namespaces{std::move(namespaces_)}, @@ -42,24 +46,22 @@ Subscription::Subscription( sig_ts{sig_ts_}, sig{std::move(sig_)} { - if (namespaces.empty()) + if (namespaces.size() == 0) throw std::invalid_argument{"Subscription: namespaces missing or empty"}; - for (size_t i = 0; i < namespaces.size() - 1; i++) { - if (namespaces[i] > namespaces[i + 1]) + for (size_t i = 1; i < namespaces.size(); i++) { + if (namespaces[i - 1] > namespaces[i]) throw std::invalid_argument{"Subscription: namespaces are not sorted numerically"}; - if (namespaces[i] == namespaces[i + 1]) + if (namespaces[i - 1] == namespaces[i]) throw std::invalid_argument{"Subscription: namespaces contains duplicates"}; } - if (!sig_ts) + if (sig_ts == std::chrono::sys_seconds{}) throw std::invalid_argument{"Subscription: signature timestamp is missing"}; - auto now = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - if (sig_ts <= now - 14 * 24 * 60 * 60) - throw std::invalid_argument{"Subscription: sig_ts timestamp is too old"}; - if (sig_ts >= now + 24 * 60 * 60) + if (now - sig_ts >= SIGNATURE_EXPIRY) + throw std::invalid_argument{ + "Subscription: sig_ts timestamp is too old ({} ago)"_format(now - sig_ts)}; + if (sig_ts - now >= SIGNATURE_EARLY) throw std::invalid_argument{"Subscription: sig_ts timestamp is too far in the future"}; if (!_skip_validation) { diff --git a/spns/hive/subscription.hpp b/spns/hive/subscription.hpp index a35bdb2..40e19af 100644 --- a/spns/hive/subscription.hpp +++ b/spns/hive/subscription.hpp @@ -1,15 +1,12 @@ #pragma once -#include -#include +#include #include #include -#include #include +#include #include #include -#include -#include #include "../bytes.hpp" #include "../swarmpubkey.hpp" @@ -40,12 +37,13 @@ class subscribe_error : public std::runtime_error { }; struct Subscription { - static constexpr std::chrono::seconds SIGNATURE_EXPIRY{14 * 24h}; + static constexpr auto SIGNATURE_EXPIRY = 14 * 24h; + static constexpr auto SIGNATURE_EARLY = 24h; std::optional subaccount; std::vector namespaces; bool want_data; - int64_t sig_ts; + std::chrono::sys_seconds sig_ts; Signature sig; Subscription( @@ -53,9 +51,11 @@ struct Subscription { std::optional subaccout_, std::vector namespaces_, bool want_data_, - int64_t sig_ts_, + std::chrono::sys_seconds sig_ts_, Signature sig_, - bool _skip_validation = false); + bool _skip_validation = false, + std::chrono::sys_seconds now = + std::chrono::floor(std::chrono::system_clock::now())); // Returns true if `this` and `other` represent the same subscription as far as upstream swarm // subscription is concerned. That is: same subaccount tag, same namespaces, and same want_data @@ -64,12 +64,16 @@ struct Subscription { bool is_same(const Subscription& other) const { return is_same(other.subaccount, other.namespaces, other.want_data); } - // Same as above, but takes the constituent parts. + + // Similar to the above, but takes a pqxx::array for comparison (to allow comparison before + // construction of the Subscription object). bool is_same( const std::optional& o_subaccount, const std::vector& o_namespaces, bool o_want_data) const { - return Subaccount::is_same(subaccount, o_subaccount) && namespaces == o_namespaces && + return Subaccount::is_same(subaccount, o_subaccount) && + namespaces.size() == o_namespaces.size() && + std::equal(namespaces.cbegin(), namespaces.cend(), o_namespaces.cbegin()) && want_data == o_want_data; } @@ -79,7 +83,7 @@ struct Subscription { // *only* valid for two Subscriptions referring to the same account! bool covers(const Subscription& other) const; - bool is_expired(int64_t now) const { return sig_ts < now - SIGNATURE_EXPIRY.count(); } + bool is_expired(std::chrono::sys_seconds now) const { return now - sig_ts >= SIGNATURE_EXPIRY; } bool is_newer(const Subscription& other) const { return sig_ts > other.sig_ts; } }; diff --git a/spns/hivemind.cpp b/spns/hivemind.cpp index 1d2efe6..c718713 100644 --- a/spns/hivemind.cpp +++ b/spns/hivemind.cpp @@ -1,22 +1,33 @@ #include "hivemind.hpp" #include +#include #include #include #include #include +#include +#include #include #include #include +#include #include #include +#include +#include +#include +#include #include #include #include +#include +#include #include "blake2b.hpp" #include "hive/signature.hpp" +#include "hive/subscription.hpp" namespace spns { @@ -29,43 +40,20 @@ std::atomic next_hivemind_id{1}; HiveMind::HiveMind(Config conf_in) : config{std::move(conf_in)}, pool_{config.pg_connect}, - omq_{std::string{config.pubkey.sv()}, std::string{config.privkey.sv()}, false, nullptr}, + omq_{std::string{config.omq_pubkey.sv()}, + std::string{config.omq_privkey.sv()}, + false, + nullptr}, object_id_{next_hivemind_id++} { - fiddle_rlimit_nofile(); + // fiddle_rlimit_nofile(); sd_notify(0, "STATUS=Initializing OxenMQ"); - while (omq_push_.size() < config.omq_push_instances) { - auto& o = omq_push_.emplace_back( - std::string{config.pubkey.sv()}, std::string{config.privkey.sv()}, false, nullptr); - o.MAX_SOCKETS = 50000; - o.MAX_MSG_SIZE = 10 * 1024 * 1024; - o.EPHEMERAL_ROUTING_ID = false; - // Since we're splitting the load, we reduce number of workers per push server to - // ceil(instances/N) + 1 (the +1 because the load is probably not perfectly evenly - // distributed). - o.set_general_threads( - 1 + (std::thread::hardware_concurrency() + config.omq_push_instances - 1) / - config.omq_push_instances); - } - omq_push_next_ = omq_push_.begin(); - - if (omq_push_.empty()) { - // the main omq_ is dealing with push conns and notifications so increase limits - omq_.MAX_SOCKETS = 50000; - omq_.MAX_MSG_SIZE = 10 * 1024 * 1024; - omq_.EPHEMERAL_ROUTING_ID = false; - - // We always need to ensure we have some batch threads available because for swarm updates - // we keep a lock held during the batching and need to ensure that there will always be some - // workers available, even if a couple workers lock waiting on that lock. - omq_.set_batch_threads(std::max(4, std::thread::hardware_concurrency() / 2)); - } else { - // When in multi-instance mode the main worker can get by with fewer threads - omq_.set_general_threads(std::max(4, std::thread::hardware_concurrency() / 4)); - omq_.set_batch_threads(std::max(4, std::thread::hardware_concurrency() / 4)); - } + // OMQ is only used for local traffic from the web worker (new/renewed subscriptions) and the + // individual notifiers, so we don't need a huge number of threads. + omq_.set_general_threads(std::max(4, std::thread::hardware_concurrency() / 8)); + omq_.set_batch_threads(std::max(4, std::thread::hardware_concurrency() / 2)); // We listen on a local socket for connections from other local services (web frontend, // notification services). @@ -97,34 +85,11 @@ HiveMind::HiveMind(Config conf_in) : log::info(cat, "Listening for incoming connections on {}", log_addr); } - // Keep a fairly large queue so that we can handle a sudden influx of notifications; if using - // multiple instances, use smaller individual queues but with a slightly higher overall queue. - int notify_queue_size = omq_push_.size() <= 1 ? 10000 : 5000; - // Invoked by our oxend to notify of a new block: - omq_.add_category("notify", oxenmq::AuthLevel::basic, /*reserved_threads=*/0, notify_queue_size) + omq_.add_category("notify", oxenmq::AuthLevel::basic) .add_command("block", ExcWrapper{*this, &HiveMind::on_new_block, "on_new_block"}); - if (omq_push_.empty()) - omq_.add_command( - "notify", - "message", - ExcWrapper{*this, &HiveMind::on_message_notification, "on_message_notification"}); - else - for (auto& push : omq_push_) - push.add_category( - "notify", - oxenmq::AuthLevel::basic, - /*reserved_threads=*/0, - notify_queue_size) - .add_command( - "message", - ExcWrapper{ - *this, - &HiveMind::on_message_notification, - "on_message_notification"}); - - omq_.add_category("push", oxenmq::AuthLevel::none) + omq_.add_category("push", oxenmq::AuthLevel::none, 0, 1000) // Adds/updates a subscription. This is called from the HTTP process to pass along an // incoming (re)subscription. The request must be json such as: @@ -168,11 +133,10 @@ HiveMind::HiveMind(Config conf_in) : // Note that the "message" strings are subject to change and should not be relied on // programmatically; instead rely on the "error" or "success" values. .add_request_command( - "subscribe", ExcWrapper{*this, &HiveMind::on_subscribe, "on_subscribe", true}) + "subscribe", ExcWrapper{*this, &HiveMind::on_subscribe, "on_subscribe"}) .add_request_command( - "unsubscribe", - ExcWrapper{*this, &HiveMind::on_unsubscribe, "on_unsubscribe", true}) + "unsubscribe", ExcWrapper{*this, &HiveMind::on_unsubscribe, "on_unsubscribe"}) // end of "push." commands ; @@ -248,22 +212,71 @@ HiveMind::HiveMind(Config conf_in) : // end of "admin." commands ; - notify_proc_thread_ = std::thread{[this] { process_notifications(); }}; + const auto out_alpn = quic::opt::outbound_alpn("oxenstorage"); + const auto in_alpn = quic::opt::inbound_alpn("spns"); + + if (!config.quic_listen.empty()) { + if (!config.quic_keys) + throw std::runtime_error{"Configuration error: quic_listen requires quic_keys"}; + creds_in_ = quic::GNUTLSCreds::make_from_ed_seckey(config.quic_keys->sv()); + creds_out_ = creds_in_; + + for (const auto& addr : config.quic_listen) { + // We only use one address for outbound connections, but it needs to be IPv4 capable + std::optional maybe_out_alpn; + if (!quic_out_ && + (addr.is_ipv4() || (addr.is_ipv6() && addr.is_any_addr() && addr.dual_stack))) + maybe_out_alpn = out_alpn; + + auto ep = quic::Endpoint::endpoint(loop_, addr, in_alpn, maybe_out_alpn); + + ep->listen( + creds_in_, + [this](quic::Connection& c, quic::Endpoint& e, std::optional) { + return e.loop.make_shared( + c, e, [this](quic::message m) { on_quic_request(m); }); + }); + + if (maybe_out_alpn) + quic_out_ = ep; + quic_in_.push_back(std::move(ep)); + } + } else { + // TODO: Once all nodes are running the SS version that comes with 11.6.0+ we can just get + // rid of creds_out_ and remove it from the `connect()` calls to do unauthenticated + // connections (which are allowed in SS 2.11.1+, which comes with 11.6.0+), but in earlier + // versions of quic that results in stateless reset-rejected connections. + // + // Thus for now, this randomly generated key: + std::array pk; + std::array sk; + crypto_sign_ed25519_keypair(pk.data(), sk.data()); + creds_out_ = quic::GNUTLSCreds::make_from_ed_seckey( + std::string_view{reinterpret_cast(sk.data()), sk.size()}); + } + if (!quic_out_) { + // Either we aren't listening, or aren't listening on any IPv4 capable address so add + // another non-listening endpoint that we can use for outbound connections. + quic_out_ = quic::Endpoint::endpoint(loop_, quic::Address{}, out_alpn); + } sd_notify(0, "STATUS=Cleaning database"); + log::info(cat, "Performing initial database clean-up"); db_cleanup(); sd_notify(0, "STATUS=Loading existing subscriptions"); - load_saved_subscriptions(); + try { + load_saved_subscriptions(); + } catch (const std::exception& e) { + log::critical(cat, "Failed to load saved subscriptions: {}", e.what()); + throw; + } - { - std::lock_guard lock{mutex_}; + notify_proc_thread_ = std::thread{[this] { process_notifications(); }}; + loop_.call_get([this] { sd_notify(0, "STATUS=Starting OxenMQ"); log::info(cat, "Starting OxenMQ"); omq_.start(); - for (auto& o : omq_push_) - o.start(); - log::info(cat, "Started OxenMQ"); sd_notify(0, "STATUS=Connecting to oxend"); @@ -308,21 +321,18 @@ HiveMind::HiveMind(Config conf_in) : log::info(cat, "Connected to oxend"); sd_notify(0, "READY=1\nSTATUS=Waiting for notifiers"); + }); - if (config.notifier_wait > 0s) { - // Wait for notification servers that start up before or alongside us to connect: - auto wait_until = steady_clock::now() + config.notifier_wait; - log::info( - cat, - "Waiting for notifiers to register (max {})", - wait_until - steady_clock::now()); - while (!notifier_startup_done(wait_until)) { - mutex_.unlock(); - std::this_thread::sleep_for(25ms); - mutex_.lock(); - } - log::info(cat, "Done waiting for notifiers; {} registered", services_.size()); - } + if (config.notifier_wait > 0s) { + // Wait for notification servers that start up before or alongside us to connect: + auto wait_until = steady_clock::now() + config.notifier_wait; + log::info( + cat, + "Waiting for notifiers to register (max {})", + wait_until - steady_clock::now()); + while (!loop_.call_get([this, wait_until] { return notifier_startup_done(wait_until); })) + std::this_thread::sleep_for(25ms); + log::info(cat, "Done waiting for notifiers; {} registered", services_.size()); } // Set our ready flag, and process any requests that accumulated while we were starting up. @@ -331,17 +341,14 @@ HiveMind::HiveMind(Config conf_in) : refresh_sns(); omq_.add_timer([this] { db_cleanup(); }, 30s); - // This is for operations that can be high latency, like re-subscriptions, clearing expiries, - // etc.: - omq_.add_timer([this] { subs_slow(); }, config.subs_interval); + + // This ticker handles re-subscriptions, clearing expiries, etc.; it's on a relatively slow + // timer because nothing in here is time critical: + subs_ticker_ = loop_.call_every(10s, [this] { check_subs(); }); // For updating systemd Status line omq_.add_timer([this] { log_stats(); }, 15s); - // This one is much more frequent: it handles any immediate subscription duties (e.g. to - // deal with a new subscriber we just added): - omq_.add_timer([this] { subs_fast(); }, 100ms); - log::info(cat, "Startup complete"); } @@ -350,6 +357,25 @@ HiveMind::~HiveMind() { notify_push_sock().send(zmq::message_t{"QUIT"sv}, zmq::send_flags::none); notify_proc_thread_.join(); } + loop_.call_get([this] { + // We destroy the Endpoints first, because during quic_opt_'s destruction it triggers + // disconnect callbacks which still have captured pointers into the SNode objects living in + // swarms_/sns_. We clear quic_in_ at the same time because, although it doesn't have the + // same callback issue, quic_out_ might be shared with one of the elements in quic_in_ and + // so could keep it alive if we don't clear it too and it is indeed shared. + // + // We transfer the quic_out_ shared ptr into a local variable here so that quic_out_ itself + // will be empty when the callbacks fire, which allow them to detect that it is during + // shutdown and alter their behaviour. (Shared ptr object destruction happens before the + // pointer is cleared, so without transferring it would still be set during disconnect + // callbacks). + auto q = std::move(quic_out_); + quic_in_.clear(); + q.reset(); + + swarms_.clear(); + sns_.clear(); + }); } zmq::socket_t& HiveMind::notify_push_sock() { @@ -425,26 +451,67 @@ void HiveMind::defer_request(oxenmq::Message&& m, ExcWrapper& callback) { // Must have flipped between the check and now, so don't actually defer it callback(m); } +void HiveMind::defer_request(quic::message&& m, ExcWrapper& callback) { + { + std::lock_guard lock{deferred_mutex_}; + if (!ready) { + deferred_.emplace_back(std::move(m), callback); + return; + } + } + // Must have flipped between the check and now, so don't actually defer it + callback(m); +} DeferredRequest::DeferredRequest(oxenmq::Message&& m, ExcWrapper& callback) : - message{m.oxenmq, std::move(m.conn), std::move(m.access), std::move(m.remote)}, + message{std::in_place_type, + m.oxenmq, + std::move(m.conn), + std::move(m.access), + std::move(m.remote)}, callback{callback} { + auto& msg = std::get(message); data.reserve(m.data.size()); for (const auto& d : m.data) - message.data.emplace_back(data.emplace_back(d)); + msg.data.emplace_back(data.emplace_back(d)); } +DeferredRequest::DeferredRequest(quic::message&& m, ExcWrapper& callback) : + message{std::move(m)}, callback{callback} {} + +static void json_error(oxenmq::Message& m, hive::SUBSCRIBE err, std::string_view msg) { + int code = static_cast(err); + log::debug(cat, "Replying with error code {}: {}", code, msg); + m.send_reply(nlohmann::json{{"error", code}, {"message", msg}}.dump()); +} +static void json_error(quic::message& m, hive::SUBSCRIBE err, std::string_view msg) { + int code = static_cast(err); + log::debug(cat, "Replying with error code {}: {}", code, msg); + m.respond(nlohmann::json{{"error", code}, {"message", msg}}.dump(), /*error=*/true); +} + void ExcWrapper::operator()(oxenmq::Message& m) { try { - (hivemind.*meth)(m); + (hivemind.*(std::get<0>(meth)))(m); } catch (const startup_request_defer&) { hivemind.defer_request(std::move(m), *this); } catch (const std::exception& e) { log::error(cat, "Exception in HiveMind::{}: {}", meth_name, e.what()); - if (is_json_request) { - m.send_reply(nlohmann::json{ - {"error", static_cast(hive::SUBSCRIBE::INTERNAL_ERROR)}, - {"message", "An internal error occurred while processing your request"}} - .dump()); - } + json_error( + m, + hive::SUBSCRIBE::INTERNAL_ERROR, + "An internal error occurred while processing your request"); + } +} +void ExcWrapper::operator()(quic::message& m) { + try { + (hivemind.*(std::get<1>(meth)))(m); + } catch (const startup_request_defer&) { + hivemind.defer_request(std::move(m), *this); + } catch (const std::exception& e) { + log::error(cat, "Exception in HiveMind::{}: {}", meth_name, e.what()); + json_error( + m, + hive::SUBSCRIBE::INTERNAL_ERROR, + "An internal error occurred while processing your request"); } } @@ -463,24 +530,17 @@ void HiveMind::on_reg_service(oxenmq::Message& m) { return; } - bool added = false, replaced = false; - { - std::lock_guard lock{mutex_}; - auto [it, ins] = services_.emplace(service, m.conn); + loop_.call([this, cid = m.conn, service = std::move(service)] { + bool added = false, replaced = false; + auto [it, ins] = services_.emplace(service, cid); if (ins) - added = true; - else if (m.conn != it->second) { - it->second = m.conn; - replaced = true; - } - } - - if (added) - log::info(cat, "'{}' notification service registered", service); - else if (replaced) - log::info(cat, "'{}' notification service reconnected/reregistered", service); - else - log::trace(cat, "'{}' notification service confirmed (already registered)", service); + log::info(cat, "'{}' notification service registered", service); + else if (cid != it->second) { + it->second = cid; + log::info(cat, "'{}' notification service reconnected/reregistered", service); + } else + log::trace(cat, "'{}' notification service confirmed (already registered)", service); + }); } static void set_stat( @@ -517,18 +577,10 @@ extern "C" inline void message_buffer_destroy(void*, void* hint) { delete static_cast(hint); } -void HiveMind::on_message_notification(oxenmq::Message& m) { - if (m.data.size() != 1) { - log::warning( - cat, - "Unexpected message notification: {}-part data, expected 1-part", - m.data.size()); - return; - } - - // Put the message into a new string, and then transfer ownership to the notification processer +void HiveMind::on_message_notification(quic::message m) { + // Put the message into a new object, then transfer ownership to the notification processer // by sending the pointer value: the processor then picks up the pointer and takes ownership. - auto* push = new std::string{m.data[0]}; + auto* push = new quic::message{std::move(m)}; uintptr_t ptr = reinterpret_cast(push); std::string cmd = "PUSH"; cmd += std::string_view{reinterpret_cast(&ptr), sizeof(ptr)}; @@ -538,7 +590,7 @@ void HiveMind::on_message_notification(oxenmq::Message& m) { void HiveMind::process_notifications() { notify_pull_.bind("inproc://notify_handler"); - std::vector process; + std::vector> process; zmq::message_t msg; constexpr size_t MAX_BATCH = 100; @@ -554,9 +606,7 @@ void HiveMind::process_notifications() { // Read back the pointer and take back ownership uintptr_t ptr; std::memcpy(&ptr, cmd.data() + 4, sizeof(uintptr_t)); - auto* str = reinterpret_cast(ptr); - process.push_back(std::move(*str)); - delete str; + process.emplace_back().reset(reinterpret_cast(ptr)); } else { log::error(cat, "Error: received unexpected internal proc command {}", cmd); } @@ -573,178 +623,186 @@ void HiveMind::process_notifications() { log::debug(cat, "Processing {} network message notifications", process.size()); - std::lock_guard lock{mutex_}; - - if (auto now = steady_clock::now(); now >= filter_rotate_time_) { - filter_rotate_ = std::move(filter_); - filter_.clear(); - filter_rotate_time_ = now + config.filter_lifetime; - } - - std::string buf; - size_t notify_count = 0; + loop_.call_get([&] { + if (auto now = steady_clock::now(); now >= filter_rotate_time_) { + filter_rotate_ = std::move(filter_); + filter_.clear(); + filter_rotate_time_ = now + config.filter_lifetime; + } - auto conn = pool_.get(); - pqxx::work tx{conn}; + std::string buf; + size_t notify_count = 0; - for (const auto& msg : process) { + auto conn = pool_.get(); + pqxx::work tx{conn}; - oxenc::bt_dict_consumer dict{msg}; + for (const auto& msg : process) { + try { + oxenc::bt_dict_consumer dict{msg->body()}; - // Parse oxen-storage-server notification: - if (!dict.skip_until("@")) { - log::warning(cat, "Unexpected notification: missing account (@)"); - continue; - } - auto account_str = dict.consume_string_view(); - AccountID account; - if (account_str.size() != account.SIZE) { - log::warning(cat, "Unexpected notification: wrong account size (@)"); - continue; - } - std::memcpy(account.data(), account_str.data(), account.size()); + // Parse oxen-storage-server notification: + if (!dict.skip_until("@")) { + log::warning(cat, "Unexpected notification: missing account (@)"); + continue; + } + auto account_str = dict.consume_string_view(); + AccountID account; + if (account_str.size() != account.SIZE) { + log::warning(cat, "Unexpected notification: wrong account size (@)"); + continue; + } + std::memcpy(account.data(), account_str.data(), account.size()); - if (!dict.skip_until("h")) { - log::warning(cat, "Unexpected notification: missing msg hash (h)"); - continue; - } - auto hash = dict.consume_string_view(); - if (bool too_small = hash.size() < MSG_HASH_MIN_SIZE; - too_small || hash.size() > MSG_HASH_MAX_SIZE) { - log::warning(cat, "Unexpected notification: msg hash too small"); - continue; - } + if (!dict.skip_until("h")) { + log::warning(cat, "Unexpected notification: missing msg hash (h)"); + continue; + } + auto hash = dict.consume_string_view(); + if (bool too_small = hash.size() < MSG_HASH_MIN_SIZE; + too_small || hash.size() > MSG_HASH_MAX_SIZE) { + log::warning(cat, "Unexpected notification: msg hash too small"); + continue; + } - if (!dict.skip_until("n")) { - log::warning(cat, "Unexpected notification: missing namespace (n)"); - continue; - } - auto ns = dict.consume_integer(); + if (!dict.skip_until("n")) { + log::warning(cat, "Unexpected notification: missing namespace (n)"); + continue; + } + auto ns = dict.consume_integer(); - if (!dict.skip_until("t")) { - log::warning(cat, "Unexpected notification: missing message timestamp (t)"); - continue; - } - auto timestamp_ms = dict.consume_integer(); + if (!dict.skip_until("t")) { + log::warning(cat, "Unexpected notification: missing message timestamp (t)"); + continue; + } + auto timestamp_ms = dict.consume_integer(); - if (!dict.skip_until("z")) { - log::warning(cat, "Unexpected notification: missing message expiry (z)"); - continue; - } - auto expiry_ms = dict.consume_integer(); + if (!dict.skip_until("z")) { + log::warning(cat, "Unexpected notification: missing message expiry (z)"); + continue; + } + auto expiry_ms = dict.consume_integer(); - std::optional maybe_data; - if (dict.skip_until("~")) - maybe_data = dict.consume_string_view(); + auto maybe_data = dict.maybe("~"); - log::trace( - cat, - "Got a notification for {}, msg hash {}, namespace {}, timestamp {}, exp {}, " - "data " - "{}B", - account.hex(), - hash, - ns, - timestamp_ms, - expiry_ms, - maybe_data ? fmt::to_string(maybe_data->size()) : "(N/A)"); - - // [(want_data, enc_key, service, svcid, svcdata), ...] - std::vector>> - notifies; - std::vector filter_vals; - - auto result = tx.exec( - R"( + log::trace( + cat, + "Got a notification for {}, msg hash {}, namespace {}, timestamp {}, " + "exp {}, data {}B", + account.hex(), + hash, + ns, + timestamp_ms, + expiry_ms, + maybe_data ? fmt::to_string(maybe_data->size()) : "(N/A)"); + + // [(want_data, enc_key, service, svcid, svcdata), ...] + std::vector>> + notifies; + std::vector filter_vals; + + auto result = tx.exec( + R"( SELECT want_data, enc_key, service, svcid, svcdata FROM subscriptions WHERE account = $1 AND EXISTS(SELECT 1 FROM sub_namespaces WHERE subscription = id AND namespace = $2))", - {account, ns}); - notifies.reserve(result.size()); - filter_vals.reserve(result.size()); - for (auto row : result) { - row.to(notifies.emplace_back()); - auto& [_wd, _ek, service, svcid, _sd] = notifies.back(); - filter_vals.push_back(blake2b(service, svcid, hash)); - } - - if (notifies.empty()) { - log::debug(cat, "No active notifications match, ignoring notification"); - continue; - } - - assert(filter_vals.size() == notifies.size()); - auto filter_it = filter_vals.begin(); - for (auto& [want_data, enc_key, service, svcid, svcdata] : notifies) { - auto& filt_hash = *filter_it++; - - if (filter_rotate_.count(filt_hash) || !filter_.insert(filt_hash).second) { - log::debug(cat, "Ignoring duplicate notification"); - continue; - } else { - log::trace(cat, "Not filtered: {}", filt_hash.hex()); - } - - oxenmq::ConnectionID conn; - if (auto it = services_.find(service); it != services_.end()) - conn = it->second; - else { - log::warning( - cat, - "Notification depends on unregistered service {}, ignoring", - service); - continue; - } - - // We overestimate a little here (e.g. allowing for 20 spaces for string - // lengths) because a few extra bytes of allocation doesn't really matter. - size_t size_needed = 2 + 35 + // 0: 32:service (or shorter) - 3 + 21 + svcid.size() + // 1:& N:svcid - 3 + 35 + // 1:^ 32:enckey - 3 + 21 + hash.size() + // 1:# N:hash - 3 + 36 + // 1:@ 33:account - 3 + 8 + // 1:n i-32768e - 3 + 15 + // 1:t i1695078498534e (timestamp) - 3 + 15 + // 1:t i1695078498534e (expiry) - (svcdata ? 3 + 21 + svcdata->size() : 0) + - (want_data && maybe_data ? 3 + 21 + maybe_data->size() : 0); - - if (buf.size() < size_needed) - buf.resize(size_needed); + {account, ns}); + notifies.reserve(result.size()); + filter_vals.reserve(result.size()); + for (auto row : result) { + row.to(notifies.emplace_back()); + auto& [_wd, _ek, service, svcid, _sd] = notifies.back(); + filter_vals.push_back(blake2b(service, svcid, hash)); + } - oxenc::bt_dict_producer dict{buf.data(), buf.data() + buf.size()}; + if (notifies.empty()) { + log::debug(cat, "No active notifications match, ignoring notification"); + continue; + } - try { - // NB: ascii sorted keys - dict.append("", service); - if (svcdata) - dict.append("!", as_sv(*svcdata)); - dict.append("#", hash); - dict.append("&", svcid); - dict.append("@", account.sv()); - dict.append("^", enc_key.sv()); - dict.append("n", ns); - dict.append("t", timestamp_ms); - dict.append("z", expiry_ms); - if (want_data && maybe_data) - dict.append("~", *maybe_data); + assert(filter_vals.size() == notifies.size()); + auto filter_it = filter_vals.begin(); + for (auto& [want_data, enc_key, service, svcid, svcdata] : notifies) { + auto& filt_hash = *filter_it++; + + if (filter_rotate_.count(filt_hash) || !filter_.insert(filt_hash).second) { + log::trace(cat, "Ignoring duplicate notification"); + continue; + } else { + log::trace(cat, "Not filtered: {}", filt_hash.hex()); + } + + oxenmq::ConnectionID conn; + if (auto it = services_.find(service); it != services_.end()) + conn = it->second; + else { + log::warning( + cat, + "Notification depends on unregistered service {}, ignoring", + service); + continue; + } + + // We overestimate a little here (e.g. allowing for 20 spaces for string + // lengths) because a few extra bytes of allocation doesn't really matter. + size_t size_needed = + 2 + 35 + // 0: 32:service (or shorter) + 3 + 21 + svcid.size() + // 1:& N:svcid + 3 + 35 + // 1:^ 32:enckey + 3 + 21 + hash.size() + // 1:# N:hash + 3 + 36 + // 1:@ 33:account + 3 + 8 + // 1:n i-32768e + 3 + 15 + // 1:t i1695078498534e (timestamp) + 3 + 15 + // 1:t i1695078498534e (expiry) + (svcdata ? 3 + 21 + svcdata->size() : 0) + + (want_data && maybe_data ? 3 + 21 + maybe_data->size() : 0); + + if (buf.size() < size_needed) + buf.resize(size_needed); + + oxenc::bt_dict_producer dict{buf.data(), buf.data() + buf.size()}; + + try { + // NB: ascii sorted keys + dict.append("", service); + if (svcdata) + dict.append("!", as_sv(*svcdata)); + dict.append("#", hash); + dict.append("&", svcid); + dict.append("@", account.sv()); + dict.append("^", enc_key.sv()); + dict.append("n", ns); + dict.append("t", timestamp_ms); + dict.append("z", expiry_ms); + if (want_data && maybe_data) + dict.append("~", *maybe_data); + } catch (const std::exception& e) { + log::critical( + cat, "failed to build notifier message: bad size estimation?"); + continue; + } + + log::debug(cat, "Sending push via {} notifier", service); + omq_.send(conn, "notifier.push", dict.view()); + notify_count++; + } } catch (const std::exception& e) { - log::critical(cat, "failed to build notifier message: bad size estimation?"); + log::warning(cat, "Failed to process incoming notification: {}", e.what()); continue; } - - log::debug(cat, "Sending push via {} notifier", service); - omq_.send(conn, "notifier.push", dict.view()); - notify_count++; } - } - increment_stat(tx, "", "notifications", notify_count); - increment_stat(tx, "", "pushes", process.size()); - tx.commit(); - pushes_processed_ += process.size(); + increment_stat(tx, "", "notifications", notify_count); + increment_stat(tx, "", "pushes", process.size()); + tx.commit(); + pushes_processed_ += process.size(); - process.clear(); + process.clear(); + }); } } @@ -803,7 +861,7 @@ void HiveMind::on_service_stats(oxenmq::Message& m) { } } -nlohmann::json HiveMind::get_stats_json() { +void HiveMind::get_stats_json(std::function when_ready) { auto result = nlohmann::json{}; { @@ -841,67 +899,69 @@ nlohmann::json HiveMind::get_stats_json() { tx.commit(); } - { - std::lock_guard lock{mutex_}; - size_t n_conns = 0; - for (auto& sn : sns_) - n_conns += sn.second->connected(); - - result["block_hash"] = last_block_.first; - result["block_height"] = last_block_.second; - result["swarms"] = swarms_.size(); - result["snodes"] = sns_.size(); - result["accounts_monitored"] = subscribers_.size(); - result["connections"] = n_conns; - result["pending_connections"] = pending_connects_.load(); - result["uptime"] = - std::chrono::duration(system_clock::now() - startup_time).count(); - } - return result; + loop_.call_soon( + [when_ready = std::move(when_ready), result = std::move(result), this]() mutable { + size_t n_conns = 0; + for (auto& sn : sns_) + n_conns += sn.second->connected(); + + result["block_hash"] = last_block_.first; + result["block_height"] = last_block_.second; + result["swarms"] = swarms_.size(); + result["snodes"] = sns_.size(); + result["accounts_monitored"] = subscribers_.size(); + result["connections"] = n_conns; + result["pending_connections"] = pending_connects_.load(); + result["uptime"] = + std::chrono::duration(system_clock::now() - startup_time).count(); + + when_ready(std::move(result)); + }); } void HiveMind::on_get_stats(oxenmq::Message& m) { - m.send_reply(get_stats_json().dump()); + get_stats_json([m = m.send_later()](nlohmann::json stats) { m(stats.dump()); }); } -void HiveMind::log_stats(std::string_view pre_cmd) { - auto s = get_stats_json(); - - std::list notifiers; - for (auto& [k, v] : s.items()) - if (starts_with(k, "last.")) - if (auto t = v.get(); t >= unix_timestamp(startup_time) && - t >= unix_timestamp(system_clock::now() - 1min)) - notifiers.push_back(k.substr(5)); - - int64_t total_notifies = 0; - for (auto& [service, data] : s["notifier"].items()) - if (auto it = data.find("notifies"); it != data.end()) - total_notifies += it->get(); - - auto stat_line = fmt::format( - "SN conns: {}/{} ({} pending); Height: {}; Accts/Subs: {}/{}; svcs: {}; notifies: {}; " - "pushes recv'd: {}", - s["connections"].get(), - s["snodes"].get(), - s["pending_connections"].get(), - s["block_height"].get(), - s["accounts_monitored"].get(), - s["subscriptions"]["total"].get(), - "{}"_format(fmt::join(notifiers, ", ")), - total_notifies, - pushes_processed_.load()); - - auto sd_out = pre_cmd.empty() ? "STATUS={}"_format(stat_line) - : "{}\nSTATUS={}"_format(pre_cmd, stat_line); - sd_notify(0, sd_out.c_str()); - - if (auto now = std::chrono::steady_clock::now(); now - last_stats_logged >= 4min + 55s) { - log::info(stats, "Status: {}", stat_line); - last_stats_logged = now; - } else { - log::debug(stats, "Status: {}", stat_line); - } +void HiveMind::log_stats(std::string pre_cmd) { + get_stats_json([this, pre_cmd = std::move(pre_cmd)](nlohmann::json s) { + std::list notifiers; + for (auto& [k, v] : s.items()) + if (starts_with(k, "last.")) + if (auto t = v.get(); t >= unix_timestamp(startup_time) && + t >= unix_timestamp(system_clock::now() - 1min)) + notifiers.push_back(k.substr(5)); + + int64_t total_notifies = 0; + for (auto& [service, data] : s["notifier"].items()) + if (auto it = data.find("notifies"); it != data.end()) + total_notifies += it->get(); + + auto stat_line = fmt::format( + "SN conns: {}/{} ({} pending); Height: {}; Accts/Subs: {}/{}; svcs: {}; notifies: " + "{}; " + "pushes recv'd: {}", + s["connections"].get(), + s["snodes"].get(), + s["pending_connections"].get(), + s["block_height"].get(), + s["accounts_monitored"].get(), + s["subscriptions"]["total"].get(), + "{}"_format(fmt::join(notifiers, ", ")), + total_notifies, + pushes_processed_.load()); + + auto sd_out = pre_cmd.empty() ? "STATUS={}"_format(stat_line) + : "{}\nSTATUS={}"_format(pre_cmd, stat_line); + sd_notify(0, sd_out.c_str()); + + if (auto now = std::chrono::steady_clock::now(); now - last_stats_logged >= 4min + 55s) { + log::info(stats, "Status: {}", stat_line); + last_stats_logged = now; + } else { + log::debug(stats, "Status: {}", stat_line); + } + }); } void HiveMind::on_drop_registrations(oxenmq::Message& m) { @@ -952,8 +1012,9 @@ void HiveMind::on_drop_registrations(oxenmq::Message& m) { deleted); } +template Reply> static void sub_json_set_one_response( - oxenmq::Message::DeferredSend&& m, + Reply& reply, nlohmann::json& response, size_t i, std::atomic& remaining, @@ -964,9 +1025,9 @@ static void sub_json_set_one_response( if (--remaining == 0) { // This is the last response set, so we have to send all the responses if (!multi) - m(response[0].dump()); + reply(response[0].dump()); else - m(response.dump()); + reply(response.dump()); } } @@ -976,7 +1037,7 @@ void HiveMind::on_notifier_validation( std::atomic& remaining, bool multi, bool success, - oxenmq::Message::DeferredSend replier, + const std::function& replier, std::string service, const SwarmPubkey& pubkey, std::shared_ptr sub, @@ -1039,8 +1100,6 @@ void HiveMind::on_notifier_validation( std::move(service_data), *enc_key, std::move(*sub)); - if (newsub) - have_new_subs_ = true; response[newsub ? "added" : "updated"] = true; message = newsub ? "Subscription successful" : "Resubscription successful"; @@ -1077,8 +1136,7 @@ void HiveMind::on_notifier_validation( if (!message.empty()) response["message"] = std::move(message); - sub_json_set_one_response( - std::move(replier), final_response, i, remaining, multi, std::move(response)); + sub_json_set_one_response(replier, final_response, i, remaining, multi, std::move(response)); } std::tuple, int64_t, Signature, std::string, nlohmann::json> @@ -1110,41 +1168,75 @@ HiveMind::sub_unsub_args(nlohmann::json& args) { } oxenmq::ConnectionID HiveMind::sub_unsub_service_conn(const std::string& service) { - { - std::lock_guard lock{mutex_}; + return loop_.call_get([&] { if (auto it = services_.find(service); it != services_.end()) return it->second; - } - throw hive::subscribe_error{ - hive::SUBSCRIBE::SERVICE_NOT_AVAILABLE, - service + " notification service not currently available"}; + throw hive::subscribe_error{ + hive::SUBSCRIBE::SERVICE_NOT_AVAILABLE, + service + " notification service not currently available"}; + }); } -static void json_error(oxenmq::Message& m, hive::SUBSCRIBE err, std::string_view msg) { - int code = static_cast(err); - log::debug(cat, "Replying with error code {}: {}", code, msg); - m.send_reply(nlohmann::json{{"error", code}, {"message", msg}}.dump()); -} +namespace { -void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { - ready_or_defer(); + std::string_view get_body(oxenmq::Message& m) { + return m.data.at(0); + } + std::string_view get_body(quic::message& m) { + return m.body(); + } - nlohmann::json args; - try { - args = nlohmann::json::parse(m.data.at(0)); - } catch (const nlohmann::json::exception&) { - log::debug(cat, "Subscription failed: bad json"); - return json_error(m, hive::SUBSCRIBE::BAD_INPUT, "Invalid JSON"); - } catch (const std::out_of_range&) { - log::debug(cat, "Subscription failed: no request data provided"); - return json_error(m, hive::SUBSCRIBE::BAD_INPUT, "Invalid request: missing request data"); + template + static std::optional parse_sub_unsub(M& m) { + std::optional args; + try { + args = nlohmann::json::parse(get_body(m)); + } catch (const nlohmann::json::exception&) { + log::debug(cat, "Subscription failed: bad json"); + json_error(m, hive::SUBSCRIBE::BAD_INPUT, "Invalid JSON"); + return std::nullopt; + } catch (const std::out_of_range&) { + log::debug(cat, "Subscription failed: no request data provided"); + json_error(m, hive::SUBSCRIBE::BAD_INPUT, "Invalid request: missing request data"); + return std::nullopt; + } + if (!(args->is_array() || args->is_object())) { + log::debug(cat, "Subscription failed: bad json -- expected object or array"); + json_error( + m, + hive::SUBSCRIBE::BAD_INPUT, + "Invalid JSON: expected object or array of objects"); + return std::nullopt; + } + + return args; } - if (!(args.is_array() || args.is_object())) { - log::debug(cat, "Subscription failed: bad json -- expected object or array"); - return json_error( - m, hive::SUBSCRIBE::BAD_INPUT, "Invalid JSON: expected object or array of objects"); + + struct missing_parameter : std::out_of_range { + missing_parameter(std::string_view key) : + std::out_of_range{"Missing required parameter '{}'"_format(key)} {} + }; + + nlohmann::json& at(nlohmann::json& obj, std::string_view key) { + try { + return obj.at(key); + } catch (...) { + throw missing_parameter{key}; + } } +} // namespace + +template +void HiveMind::on_sub_unsub_impl(Message& msg, bool subscribe) { + ready_or_defer(); + + nlohmann::json args; + if (auto a = parse_sub_unsub(msg)) + args = std::move(*a); + else + return; + const bool multi = args.is_array(); auto response = std::make_shared(); @@ -1158,9 +1250,27 @@ void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { args = std::move(single); } + if (args.empty()) { + json_error(msg, hive::SUBSCRIBE::BAD_INPUT, "Invalid request: {} list cannot be empty"); + return; + } + for (auto& e : args) response->push_back(nlohmann::json::object()); + std::function replier; + if constexpr (std::same_as) + replier = msg.send_later(); + else { + replier = [reqid = msg.rid(), + wstr = std::weak_ptr{msg.stream()}](std::string_view response) { + auto str = wstr.lock(); + if (!str) + return; + str->respond(reqid, response); + }; + } + for (size_t i = 0; i < args.size(); i++) { auto& e = args[i]; @@ -1174,8 +1284,8 @@ void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { oxenmq::OxenMQ::ReplyCallback reply_handler; if (subscribe) { - auto enc_key = from_hex_or_b64(e.at("enc_key").get()); - auto namespaces = e.at("namespaces").get>(); + auto enc_key = from_hex_or_b64(at(e, "enc_key").get()); + auto namespaces = at(e, "namespaces").get>(); reply_handler = [this, response, @@ -1186,21 +1296,21 @@ void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { sub = std::make_shared( // Throws on bad sig pubkey, std::move(subaccount), - e.at("namespaces").get>(), - e.at("data").get(), - e.at("sig_ts").get(), + at(e, "namespaces").get>(), + at(e, "data").get(), + std::chrono::sys_seconds{std::chrono::seconds{ + at(e, "sig_ts").get()}}, std::move(sig)), pubkey = pubkey, enc_key = std::move(enc_key), - replier = m.send_later()]( - bool success, std::vector data) mutable { + replier](bool success, std::vector data) mutable { on_notifier_validation( *response, i, *remaining, multi, success, - std::move(replier), + replier, std::move(service), std::move(pubkey), std::move(sub), @@ -1227,15 +1337,14 @@ void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { service = service, pubkey = pubkey, unsub = UnsubData{std::move(sig), std::move(subaccount), sig_ts}, - replier = m.send_later()]( - bool success, std::vector data) mutable { + replier](bool success, std::vector data) mutable { on_notifier_validation( *response, i, *remaining, multi, success, - std::move(replier), + replier, std::move(service), std::move(pubkey), nullptr, @@ -1252,9 +1361,9 @@ void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { service, service_info.dump()); - } catch (const std::out_of_range& e) { - log::debug(cat, "Sub failed: missing param {}", e.what()); - error = {hive::SUBSCRIBE::BAD_INPUT, "Missing required parameter"}; + } catch (const missing_parameter& e) { + log::debug(cat, "Request failed: {}", e.what()); + error = {hive::SUBSCRIBE::BAD_INPUT, e.what()}; } catch (const hive::subscribe_error& e) { error = {e.code, e.what()}; } catch (const std::exception& e) { @@ -1271,7 +1380,7 @@ void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { code, error->second); sub_json_set_one_response( - m.send_later(), + replier, *response, i, *remaining, @@ -1281,6 +1390,29 @@ void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe) { // Otherwise the reply is getting deferred and handled later in on_notifier_validation } } +template void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe); +template void HiveMind::on_sub_unsub_impl(quic::message& m, bool subscribe); + +void HiveMind::on_quic_request(quic::message& m) { + omq_.job([this, m = std::move(m)]() mutable { + if (m.endpoint() == "subscribe") + handle_quic_subscribe(m); + else if (m.endpoint() == "unsubscribe") + handle_quic_unsubscribe(m); + else if (m.endpoint() == "ping") + m.respond("pong"); + else + json_error(m, hive::SUBSCRIBE::BAD_INPUT, "No such endpoint '{}'"_format(m.endpoint())); + }); +} + +void HiveMind::quic_subscribe(quic::message& m) { + on_sub_unsub_impl(m, true); +} + +void HiveMind::quic_unsubscribe(quic::message& m) { + on_sub_unsub_impl(m, true); +} void HiveMind::on_subscribe(oxenmq::Message& m) { on_sub_unsub_impl(m, true); @@ -1304,14 +1436,13 @@ void HiveMind::refresh_sns() { oxend_, "rpc.get_service_nodes", [this](bool success, std::vector data) { - if (success) { - on_sns_response(std::move(data)); - } else { + if (success) + loop_.call_get([&] { on_sns_response(std::move(data)); }); + else log::warning( cat, "get_service_nodes request failed: {}", "{}"_format(fmt::join(data, " "))); - } }, _get_sns_params); } @@ -1351,8 +1482,6 @@ void HiveMind::on_sns_response(std::vector data) { return; } - std::unique_lock lock{mutex_}; - bool swarms_changed = false; auto new_hash = res.at("block_hash").get(); auto new_height = res.at("height").get(); @@ -1380,19 +1509,19 @@ void HiveMind::on_sns_response(std::vector data) { last_block_ = {std::move(new_hash), new_height}; } - std::unordered_map> sns; + std::unordered_map> sns; sns.reserve(sn_st.size()); for (const auto& s : sn_st) { - auto pkx = s.at("pubkey_x25519").get(); + auto pked = s.at("pubkey_ed25519").get(); auto ip = s.at("public_ip").get(); auto port = s.at("storage_lmq_port").get(); auto swarm = s.at("swarm_id").get(); - if (pkx.size() == 64 && !ip.empty() && ip != "0.0.0.0" && port > 0 && + if (pked.size() == 64 && !ip.empty() && ip != "0.0.0.0" && port > 0 && swarm != INVALID_SWARM_ID) sns.emplace( std::piecewise_construct, - std::forward_as_tuple(from_hex_or_b64(pkx)), + std::forward_as_tuple(from_hex_or_b64(pked)), std::forward_as_tuple(std::move(ip), port, swarm)); } @@ -1405,13 +1534,13 @@ void HiveMind::on_sns_response(std::vector data) { // disconnect from these (if any are connected). int dropped = 0; for (auto it = sns_.begin(); it != sns_.end();) { - const auto& [xpk, snode] = *it; - if (sns.count(xpk)) { + const auto& [pk, snode] = *it; + if (sns.count(pk)) { ++it; continue; } - log::debug(cat, "Disconnecting {}", xpk); + log::debug(cat, "Disconnecting {}", pk); swarms_[snode->swarm].erase(snode); snode->disconnect(); it = sns_.erase(it); @@ -1420,11 +1549,11 @@ void HiveMind::on_sns_response(std::vector data) { std::unordered_set> new_or_changed_sns; - for (const auto& [xpk, details] : sns) { + for (const auto& [pk, details] : sns) { const auto& [ip, port, swarm] = details; - oxenmq::address addr{"tcp://{}:{}"_format(ip, port), as_sv(xpk.view())}; + quic::RemoteAddress addr{pk.span(), ip, port}; - if (auto it = sns_.find(xpk); it != sns_.end()) { + if (auto it = sns_.find(pk); it != sns_.end()) { // We already know about this service node from the last update, but it might // have changed address or swarm, in which case we want to disconnect and then // store it as "new" so that we reconnect to it (if required) later. (We don't @@ -1442,16 +1571,9 @@ void HiveMind::on_sns_response(std::vector data) { // otherwise. snode->connect(std::move(addr)); } else { - // If we are using separate oxenmq instances for push handling then select the next - // one, round-robin style: - if (!omq_push_.empty() && omq_push_next_ == omq_push_.end()) - omq_push_next_ = omq_push_.begin(); - - auto& omq_instance = omq_push_.empty() ? omq_ : *omq_push_next_++; // New snode - auto snode = - std::make_shared(*this, omq_instance, std::move(addr), swarm); - sns_.emplace(xpk, snode); + auto snode = loop_.make_shared(*this, std::move(addr), swarm); + sns_.emplace(pk, snode); swarms_[swarm].insert(snode); new_or_changed_sns.insert(snode); } @@ -1470,7 +1592,7 @@ void HiveMind::on_sns_response(std::vector data) { // If we had a change to the network's swarms then we need to trigger a full recheck of // swarm membership, ejecting any pubkeys that moved while adding all pubkeys again to // be sure they are in each(possibly new) slot. - if (swarms_changed) { + if (swarms_changed || !new_or_changed_sns.empty()) { int sw_changes = 0; // Recalculate the swarm id of all subscribers: for (auto& [pk, v] : subscribers_) @@ -1482,6 +1604,10 @@ void HiveMind::on_sns_response(std::vector data) { batch.reserve(swarms_.size()); for (auto& [swid, snodes] : swarms_) { batch.add_job([this, swid = swid, s = &snodes] { + // This seems suspicious that we are modifying multiple SN objects from + // different threads, but we aren't because each SNode is only inside `swarms_` + // once, and so there shouldn't be any overlap across SNs while this batch is + // running. for (auto& sn : *s) sn->remove_stale_swarm_members(swarm_ids_); for (auto& [swarmpk, v] : subscribers_) @@ -1490,19 +1616,16 @@ void HiveMind::on_sns_response(std::vector data) { sn->add_account(swarmpk); }); } - // We release the lock *without* unlocking it below, then deal with finally unlocking - // it in the completion function when we finish at the end of the batch job. - batch.completion([this](auto&&) mutable { - std::unique_lock lock{mutex_, std::adopt_lock}; - check_subs(); - }); + std::promise done; + batch.completion([&done](auto&&) { done.set_value(); }); omq_.batch(std::move(batch)); - // Leak the lock: - lock.release(); - - } else if (!new_or_changed_sns.empty()) { + done.get_future().get(); + check_subs(); + } +#if 0 // TODO: this is much slower than the above full scan; investigate + else if (!new_or_changed_sns.empty()) { // Otherwise swarms stayed the same(which means no accounts changed swarms), but // snodes might have moved in / out of existing swarms, so re-add any subscribers to // swarm changers to ensure they have all the accounts that belong to them. @@ -1522,56 +1645,53 @@ void HiveMind::on_sns_response(std::vector data) { check_subs(); } +#endif } catch (const std::exception& e) { log::warning(cat, "An exception occured while processing the SN update: {}", e.what()); } } -// Re-checks all SN subscriptions; the mutex must be held externally. -void HiveMind::check_subs(bool fast) { - for (const auto& [xpk, snode] : sns_) { +void HiveMind::check_subs() { + assert(loop_.inside()); + + for (const auto& [pk, snode] : sns_) { try { - snode->check_subs(subscribers_, false, fast); + snode->check_subs(subscribers_); } catch (const std::exception& e) { - log::warning(cat, "Failed to check subs on {}: {}", xpk, e.what()); + log::warning(cat, "Failed to check subs on {}: {}", pk, e.what()); } } } -void HiveMind::check_my_subs(hive::SNode& snode, bool initial) { - std::lock_guard lock{mutex_}; - snode.check_subs(subscribers_, initial); -} +void HiveMind::make_conns() { + assert(loop_.inside()); -void HiveMind::subs_slow() { - // Ignore the confirm response from this; we can't really do anything with it, we just want - // to make sure we stay subscribed. - omq_.request(oxend_, "sub.block", nullptr); + for (const auto& [pk, snode] : sns_) { + if (pending_connects_ >= config.max_pending_connects) + return; - { - std::lock_guard lock{mutex_}; - check_subs(); + snode->connect(); } } -void HiveMind::subs_fast() { - if (have_new_subs_.exchange(false)) { - std::lock_guard lock{mutex_}; - check_subs(true); - } +void HiveMind::check_my_subs(hive::SNode& snode) { + snode.check_subs(subscribers_); } void HiveMind::finished_connect() { - bool try_more = pending_connects_ >= config.max_pending_connects; + assert(loop_.inside()); + + bool try_more = pending_connects_ < config.max_pending_connects; log::trace(cat, "finished connection; {}triggering more", try_more ? "" : "not "); --pending_connects_; - if (try_more) { - std::lock_guard lock{mutex_}; - check_subs(); - } + if (try_more) + make_conns(); } bool HiveMind::allow_connect() { + if (!quic_out_) + return false; + int count = ++pending_connects_; if (count > config.max_pending_connects) { --pending_connects_; @@ -1588,77 +1708,147 @@ bool HiveMind::allow_connect() { void HiveMind::load_saved_subscriptions() { - // mutex_ lock not needed: we are only ever called before oxenmq startup in the constructor - // (i.e. before there are any other threads to worry about). - - auto started = steady_clock::now(); - auto last_print = started; - auto conn = pool_.get(); pqxx::work txn{conn}; - auto [total] = txn.query1("SELECT COUNT(*) FROM subscriptions"); - log::info(cat, "Loading {} stored subscriptions from database", total); - - int64_t count = 0, unique = 0; - for (auto [acc, ed, sub_tag, sub_sig, sig, sigts, wd, ns_arr] : - txn - .stream, - std::optional, - std::optional, - Signature, - int64_t, - bool, - Int16ArrayLoader>(R"( + auto [total] = txn.query1("SELECT COUNT(*) FROM subscriptions"); + + int n = std::max(1, std::thread::hardware_concurrency() / 4); + + log::info(cat, "Loading {} stored subscriptions from database using {} threads", total, n); + + AccountID jagerman; + auto jagerman_id = "05fb466d312e1666ad1c84c4ee55b7e034151c0e366a313d95d11436a5f36e1e75"sv; + oxenc::from_hex(jagerman_id.begin(), jagerman_id.end(), jagerman.begin()); + + std::vector, // Not optional, but we need to default construct + std::optional, + Signature, + std::chrono::sys_seconds, + bool, + std::vector>>>> + threads; + + threads.resize(n); + for (auto& [t, rows] : threads) + // There is some small randomness in how many rows each threads get, so reserve 10% extra + // the perfectly even value. + rows.reserve(11 * total / (10 * n)); + + for (int i = 0; i < n; i++) { + threads[i].first = std::thread{[i, n, &rows = threads[i].second, &pool = pool_] { + auto conn = pool.get(); + pqxx::work txn{conn}; + + for (auto [acc, ed, sub_tag, sub_sig, sig_in, sigts_in, wd_in, ns_arr_in] : + txn + .stream, + std::optional, + std::optional, + Signature, + int64_t, + bool, + pqxx::array>(R"( SELECT account, session_ed25519, subaccount_tag, subaccount_sig, signature, signature_ts, want_data, ARRAY(SELECT namespace FROM sub_namespaces WHERE subscription = id ORDER BY namespace) -FROM subscriptions)")) { - auto [it, ins] = subscribers_.emplace( - std::piecewise_construct, - std::forward_as_tuple(std::move(acc), std::move(ed), /*_skip_validation=*/true), - std::forward_as_tuple()); - - std::optional subaccount; - if (sub_tag && sub_sig) { - subaccount.emplace(); - subaccount->tag = std::move(*sub_tag); - subaccount->sig = std::move(*sub_sig); - } +FROM subscriptions +WHERE id % {0} = {1})"_format(n, i))) { + + auto& [swpk, subaccount, sig, sigts, wd, ns_arr] = rows.emplace_back(); + swpk.emplace(acc, ed, /*skip_validation=*/true); + if (sub_tag && sub_sig) { + subaccount.emplace(); + subaccount->tag = std::move(*sub_tag); + subaccount->sig = std::move(*sub_sig); + } + sig = std::move(sig_in); + sigts = std::chrono::sys_seconds{std::chrono::seconds{sigts_in}}; + wd = wd_in; + ns_arr = std::vector{ns_arr_in.begin(), ns_arr_in.end()}; + } + }}; + } - // Weed out potential duplicates: if two+ devices are subscribed to the same - // account with all the same relevant subscription settings then we can just - // keep whichever one is newer. - bool dupe = false; - for (auto& existing : it->second) { - if (existing.is_same(subaccount, ns_arr.a, wd)) { - if (sigts > existing.sig_ts) { - existing.sig_ts = sigts; - existing.sig = std::move(sig); + int64_t th_count = 0; + for (auto& [t, rows] : threads) { + t.join(); + log::debug(cat, "Thead got {} rows", rows.size()); + th_count += rows.size(); + } + + log::info(cat, "Collected {} rows via threads", th_count); + + steady_clock::time_point now; + std::chrono::sys_seconds sys_now; + std::pair sigts_cutoff; + auto update_clocks = [&] { + now = steady_clock::now(); + sys_now = std::chrono::floor(system_clock::now()); + sigts_cutoff.first = sys_now - hive::Subscription::SIGNATURE_EXPIRY + 5s; + sigts_cutoff.second = sys_now + hive::Subscription::SIGNATURE_EARLY; + }; + update_clocks(); + int last_decile = 0; + + int64_t raw_count = 0, count = 0, unique = 0; + for (auto& [t, rows] : threads) { + for (auto& [swpk, subaccount, sig, sigts, wd, ns_arr] : rows) { + raw_count++; + + if (sigts <= sigts_cutoff.first || sigts >= sigts_cutoff.second) + // Don't try loading entries that are about to expire + continue; + + // DEBUG + // if (!std::equal(swpk->id.begin(), swpk->id.begin() + 3, jagerman.begin())) + // continue; + // log::critical(cat, "LOADED {}", swpk->id); + + auto& sub = subscribers_[*swpk]; + + // Weed out potential duplicates: if two+ devices are subscribed to the same + // account with all the same relevant subscription settings then we can just + // keep whichever one is newer. + bool dupe = false; + for (auto& existing : sub) { + if (existing.is_same(subaccount, ns_arr, wd)) { + if (sigts > existing.sig_ts) { + existing.sig_ts = sigts; + existing.sig = std::move(sig); + } + dupe = true; + break; } - dupe = true; - break; } - } - if (!dupe) { - unique++; - it->second.emplace_back( - it->first, - std::move(subaccount), - std::move(ns_arr.a), - wd, - sigts, - std::move(sig), - /*_skip_validation=*/true); - } + if (!dupe) { + unique++; + sub.emplace_back( + std::move(*swpk), + std::move(subaccount), + std::move(ns_arr), + wd, + sigts, + std::move(sig), + /*_skip_validation=*/true, + sys_now); + } - if (++count % 25000 == 0) { - auto now = steady_clock::now(); - auto elapsed = now - last_print; - if (elapsed >= 1s) { - log::info(cat, "... processed {}/{} subscriptions", count, total); - last_print = now; + if (++count % 1000 == 0 || raw_count == th_count) { + update_clocks(); + if (int decile = 10 * raw_count / th_count; decile != last_decile) { + log::info( + cat, + "... processed {}/{} ({}%) subscriptions", + raw_count, + total, + decile * 10); + last_decile = decile; + } } } } @@ -1683,7 +1873,7 @@ bool HiveMind::add_subscription( auto conn = pool_.get(); pqxx::work tx{conn}; - auto result = tx.query01( + auto result = tx.query01>( R"( SELECT id, @@ -1695,10 +1885,11 @@ WHERE {pubkey.id, service, service_id}); int64_t id; if (result) { - auto& [row_id, sig_ts, ns_arr] = *result; + auto& [row_id, sig_ts, ns_arr_in] = *result; + std::vector ns_arr{ns_arr_in.cbegin(), ns_arr_in.cend()}; id = row_id; - insert_ns = ns_arr.a != sub.namespaces; + insert_ns = ns_arr != sub.namespaces; log::trace(cat, "updating subscription for {}", pubkey.id.hex()); tx.exec( R"( @@ -1711,7 +1902,7 @@ WHERE id = $1 sub.subaccount ? std::optional{sub.subaccount->tag} : std::nullopt, sub.subaccount ? std::optional{sub.subaccount->sig} : std::nullopt, sub.sig, - sub.sig_ts, + sub.sig_ts.time_since_epoch().count(), sub.want_data, enc_key, service_data}) @@ -1733,7 +1924,7 @@ RETURNING id sub.subaccount ? std::optional{sub.subaccount->tag} : std::nullopt, sub.subaccount ? std::optional{sub.subaccount->sig} : std::nullopt, sub.sig, - sub.sig_ts, + sub.sig_ts.time_since_epoch().count(), sub.want_data, enc_key, service, @@ -1756,34 +1947,35 @@ RETURNING id tx.commit(); - std::lock_guard lock{mutex_}; - pubkey.update_swarm(swarm_ids_); + return loop_.call_get([&] { + pubkey.update_swarm(swarm_ids_); - auto& subscriptions = subscribers_[pubkey]; - bool found_existing = false; - for (auto& existing : subscriptions) { - if (existing.is_same(sub)) { - if (sub.is_newer(existing)) { - existing.sig = sub.sig; - existing.sig_ts = sub.sig_ts; + auto& subscriptions = subscribers_[pubkey]; + bool found_existing = false; + for (auto& existing : subscriptions) { + if (existing.is_same(sub)) { + if (sub.is_newer(existing)) { + existing.sig = sub.sig; + existing.sig_ts = sub.sig_ts; + } + found_existing = true; + break; } - found_existing = true; - break; } - } - if (!found_existing) - subscriptions.push_back(std::move(sub)); - - // If this is actually adding a new subscription (and not just renewing an - // existing one) then we need to force subscription (or resubscription) on all - // of the account's swarm members to get the subscription active ASAP. - // (Otherwise don't do anything because we already have an equivalent - // subscription in place). - if (new_sub) - for (auto& sn : swarms_[pubkey.swarm]) - sn->add_account(pubkey, /*force_now=*/true); - - return new_sub; + if (!found_existing) + subscriptions.push_back(std::move(sub)); + + // If this is actually adding a new subscription (and not just renewing an + // existing one) then we need to force subscription (or resubscription) on all + // of the account's swarm members to get the subscription active ASAP. + // (Otherwise don't do anything because we already have an equivalent + // subscription in place). + if (new_sub) + for (auto& sn : swarms_[pubkey.swarm]) + sn->add_account(pubkey, /*force_now=*/true); + + return new_sub; + }); } /// Removes a subscription for monitoring. Returns true if the given pubkey was diff --git a/spns/hivemind.hpp b/spns/hivemind.hpp index 1cea3f2..fa13589 100644 --- a/spns/hivemind.hpp +++ b/spns/hivemind.hpp @@ -23,6 +23,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -41,6 +44,8 @@ namespace spns { +namespace quic = oxen::quic; + // How long until we expire subscriptions (relative to the signature timestamp). This can be no // more than 14 days (because that's the subscription cutoff for storage server), but can also be // less. @@ -61,7 +66,7 @@ inline constexpr size_t MSG_DATA_MAX_SIZE = 76'800; // Storage server limit inline constexpr auto _get_sns_params = R"({ "active_only": true, "fields": { - "pubkey_x25519": true, + "pubkey_ed25519": true, "public_ip": true, "storage_lmq_port": true, "swarm_id": true, @@ -80,34 +85,35 @@ struct startup_request_defer {}; class ExcWrapper { private: HiveMind& hivemind; - void (HiveMind::*const meth)(oxenmq::Message&); + std::variant< + void (HiveMind::* const)(oxenmq::Message&), + void (HiveMind::* const)(quic::message&)> + meth; const std::string meth_name; - bool is_json_request; public: ExcWrapper( - HiveMind& hivemind, - void (HiveMind::*meth)(oxenmq::Message&), - std::string meth_name, - bool is_json_request = false) : - hivemind{hivemind}, - meth{meth}, - meth_name{std::move(meth_name)}, - is_json_request{is_json_request} {} + HiveMind& hivemind, void (HiveMind::*meth)(oxenmq::Message&), std::string meth_name) : + hivemind{hivemind}, meth{meth}, meth_name{std::move(meth_name)} {} + + ExcWrapper(HiveMind& hivemind, void (HiveMind::*meth)(quic::message&), std::string meth_name) : + hivemind{hivemind}, meth{meth}, meth_name{std::move(meth_name)} {} void operator()(oxenmq::Message& m); + void operator()(quic::message& m); }; // If requests arrive during startup we copy the request here to defer calling until after // startup completes. struct DeferredRequest { - oxenmq::Message message; + std::variant message; std::vector data; ExcWrapper& callback; DeferredRequest(oxenmq::Message&& m, ExcWrapper& callback); + DeferredRequest(quic::message&& m, ExcWrapper& callback); - void operator()() && { callback(message); } + void operator()() && { std::visit(callback, message); } }; class HiveMind { @@ -116,15 +122,22 @@ class HiveMind { const Config config; private: - std::mutex mutex_; - // OxenMQ server for internal communications, proxied subscriptions, etc. + // std::mutex mutex_; + // OxenMQ server for internal communications, proxied subscriptions, etc. oxenmq::OxenMQ omq_; - // OxenMQ *clients* that connect to service nodes, if the omq_push_instances setting is used. - std::list omq_push_; - decltype(omq_push_)::iterator omq_push_next_; + + // QUIC endpoint used to connect to service nodes (for receiving pushes) + quic::Loop loop_; + std::shared_ptr creds_out_, creds_in_; + std::list> quic_in_; + std::shared_ptr quic_out_; // Might be set to one of the elements of quic_in_ + // if there is a suitable one for outbound use. + + std::shared_ptr subs_ticker_; + PGConnPool pool_; - const int object_id_; // Thread-safe unique id for this HiveMind object + const int object_id_; // Thread-safe unique id for this HiveMind object zmq::context_t notification_ctx_{}; zmq::socket_t notify_pull_{notification_ctx_, zmq::socket_type::pull}; std::thread notify_proc_thread_; @@ -133,8 +146,8 @@ class HiveMind { zmq::socket_t& notify_push_sock(); std::atomic pushes_processed_{0}; - // xpk -> SNode - std::unordered_map> sns_; + // edpk -> SNode + std::unordered_map> sns_; // swarmid -> {SNode...} std::unordered_map>> swarms_; @@ -167,9 +180,6 @@ class HiveMind { // Will be set to true once we are ready to start taking requests std::atomic ready{false}; - // Set to true if we have new subs we need to deal with ASAP - std::atomic have_new_subs_{false}; - void ready_or_defer() { if (!ready) throw startup_request_defer{}; @@ -182,6 +192,7 @@ class HiveMind { std::mutex deferred_mutex_; std::list deferred_; void defer_request(oxenmq::Message&& m, ExcWrapper& callback); + void defer_request(quic::message&& m, ExcWrapper& callback); public: HiveMind(Config conf_); @@ -191,8 +202,6 @@ class HiveMind { private: void on_reg_service(oxenmq::Message& m); - void on_message_notification(oxenmq::Message& m); - void process_notifications(); /// Called from a notifier service periodically to report statistics. @@ -211,12 +220,12 @@ class HiveMind { /// integer and string values are permitted (+keys only allow integers). void on_service_stats(oxenmq::Message& m); - nlohmann::json get_stats_json(); + void get_stats_json(std::function when_ready); void on_get_stats(oxenmq::Message& m); std::chrono::steady_clock::time_point last_stats_logged = std::chrono::steady_clock::now() - 1h; - void log_stats(std::string_view pre_cmd = "WATCHDOG=1"sv); + void log_stats(std::string pre_cmd = "WATCHDOG=1"s); using UnsubData = std::tuple, int64_t>; void on_notifier_validation( @@ -225,7 +234,7 @@ class HiveMind { std::atomic& remaining, bool multi, bool success, - oxenmq::Message::DeferredSend replier, + const std::function& replier, std::string service, const SwarmPubkey& pubkey, std::shared_ptr sub, @@ -246,9 +255,16 @@ class HiveMind { oxenmq::ConnectionID sub_unsub_service_conn(const std::string& service); + void on_quic_request(quic::message& m); + void quic_subscribe(quic::message& m); + void quic_unsubscribe(quic::message& m); + ExcWrapper handle_quic_subscribe{*this, &HiveMind::quic_subscribe, "quic_subscribe"}; + ExcWrapper handle_quic_unsubscribe{*this, &HiveMind::quic_unsubscribe, "quic_unsubscribe"}; + void on_subscribe(oxenmq::Message& m); void on_unsubscribe(oxenmq::Message& m); - void on_sub_unsub_impl(oxenmq::Message& m, bool subscribe); + template + void on_sub_unsub_impl(Message& m, bool subscribe); void db_cleanup(); @@ -257,12 +273,11 @@ class HiveMind { void on_sns_response(std::vector data); - // Re-checks all SN subscriptions; the mutex must be held externally. `fast` is whether this is - // a quick, only-new-subs check or a regular check. - void check_subs(bool fast = false); + // Re-checks all SN subscriptions for and new subscriptions or needed resubscriptions; must be + // called on the loop_ thread. + void check_subs(); - void subs_slow(); - void subs_fast(); + void make_conns(); public: /// Called when initiating a connection: if this returns a evaluates-as-true object then the @@ -273,9 +288,22 @@ class HiveMind { bool allow_connect(); void finished_connect(); - // Called *without* the mutex to check the subs of a single snode; this is typically called from - // within hive::SNode after first connecting. - void check_my_subs(hive::SNode& snode, bool initial); + // Accesses the quic endpoint used to connect to SNodes + quic::Endpoint& quic_out() { return *quic_out_; } + + bool has_quic_out() { return (bool)quic_out_; } + + // Accesses the quic loop governing SNode connections (and various internals) + quic::Loop& loop() { return loop_; } + + // Returns the creds object for establishing outbound connections. TODO: This isn't needed once + // all nodes are running an updated libquic, but older libquic doesn't allow no-creds + // connections. + const std::shared_ptr& creds_out() { return creds_out_; } + + // Internal helper called by SNode with itself to dispatch a call back into itself with a + // reference to the HiveMind's master subscriber container. + void check_my_subs(hive::SNode& snode); void load_saved_subscriptions(); @@ -337,6 +365,12 @@ class HiveMind { std::string service_id, const Signature& sig, int64_t sig_ts); + + // Called from SNode when we get a message notification + void on_message_notification(quic::message m); }; +extern template void HiveMind::on_sub_unsub_impl(oxenmq::Message& m, bool subscribe); +extern template void HiveMind::on_sub_unsub_impl(quic::message& m, bool subscribe); + } // namespace spns diff --git a/spns/hivemind.py b/spns/hivemind.py deleted file mode 100644 index 1b51122..0000000 --- a/spns/hivemind.py +++ /dev/null @@ -1,44 +0,0 @@ -from .core import HiveMind, logger as core_logger -from . import config -from .config import logger -import time -import signal -import os - - -def run(): - """Runs a HiveMind instance indefinitely""" - - hivemind = None - - def stop(*args): - nonlocal hivemind - if hivemind: - logger.info("Shutting down hivemind") - hivemind.stop() - logger.info("Hivemind stopped") - hivemind = None - - def sig_die(signum, frame): - raise OSError(f"Caught signal {signal.Signals(signum).name}") - - try: - logger.info("Starting hivemind") - core_logger.start("stderr") - hivemind = HiveMind(config.config) - logger.info("Hivemind started") - - signal.signal(signal.SIGHUP, sig_die) - signal.signal(signal.SIGINT, sig_die) - - while True: - time.sleep(3600) - except Exception as e: - logger.critical(f"HiveMind died: {e}") - - if hivemind: - stop() - - -if __name__ == "__main__": - run() diff --git a/spns/main.cpp b/spns/main.cpp new file mode 100644 index 0000000..8c03ce7 --- /dev/null +++ b/spns/main.cpp @@ -0,0 +1,74 @@ +#include +#include +#include + +#include +#include + +#include "config.hpp" +#include "hivemind.hpp" + +using namespace std::literals; +namespace py = pybind11; + +auto cat = oxen::log::Cat("spns"); + +int usage(std::string_view argv0, std::string_view err = ""sv) { + if (!err.empty()) + oxen::log::error(cat, "Error: {}\n", err); + + oxen::log::error(cat, "Usage: {} /path/to/spns.ini\n", argv0); + return 1; +} + +int main(int argc, char** argv) { + + namespace log = oxen::log; + log::add_sink(oxen::log::Type::Print, "stderr"); + + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + sigaddset(&sigset, SIGTERM); + pthread_sigmask(SIG_BLOCK, &sigset, nullptr); + + if (argc != 2 || argv[1] == "-h"sv || argv[1] == "--help"sv) + return usage(argv[0]); + + std::string_view ini{argv[1]}; + + log::info(cat, "Loading config from {}", ini); + + spns::Config conf; + try { + py::scoped_interpreter interp{}; + try { + auto mconf = py::module_::import("spns.conf"); + conf = std::move(mconf.attr("load_config")(ini).cast()); + } catch (const pybind11::error_already_set& e) { + // Rewrap the exception because calling `e.what()` on this requires an active python + // interpreter, which we won't have when we leave the other `try {}`. + throw std::runtime_error{e.what()}; + } + } catch (const pybind11::error_already_set&) { + log::critical(cat, "Failed to load python interpreter"); + return 2; + } catch (const std::exception& e) { + log::critical( + cat, "Failed to load configuration file {}: {}", ini, e.what()); + return 1; + } + + log::info(cat, "Initializing hivemind"); + spns::HiveMind hivemind{std::move(conf)}; + log::info(cat, "Hivemind started"); + + // We don't need any signal handler: when a SIGINT/SIGTERM comes in we just catch it here, stop + // waiting, and thus roll off the end of the main() function, which destroys the HiveMind + // instance which shuts everything down. + int sig; + sigwait(&sigset, &sig); + log::warning(cat, "Received signal {}; shutting down", sig); + + return 0; +} diff --git a/spns/notifiers/apns.py b/spns/notifiers/apns.py index 5d8a1b8..139375c 100644 --- a/spns/notifiers/apns.py +++ b/spns/notifiers/apns.py @@ -45,7 +45,7 @@ from .. import config from ..config import logger -from ..core import SUBSCRIBE +from spns_hivemind import SUBSCRIBE from .util import encrypt_notify_payload, derive_notifier_key, warn_on_except, NotifyStats import oxenc diff --git a/spns/notifiers/dummy.py b/spns/notifiers/dummy.py index fa87456..bc7c12a 100644 --- a/spns/notifiers/dummy.py +++ b/spns/notifiers/dummy.py @@ -11,7 +11,7 @@ from .util import derive_notifier_key, warn_on_except from .. import config from ..config import logger -from ..core import SUBSCRIBE +from spns_hivemind import SUBSCRIBE from datetime import timedelta omq = None diff --git a/spns/notifiers/fcm.py b/spns/notifiers/fcm.py deleted file mode 100644 index f69e8f9..0000000 --- a/spns/notifiers/fcm.py +++ /dev/null @@ -1,124 +0,0 @@ -# Google Firebase push notification server, using aiofcm for higher performance than the stock -# bloated Google Firebase Python API. -# - -from aiofcm import FCM, Message, PRIORITY_HIGH -import asyncio - -from .. import config -from ..config import logger -from ..core import SUBSCRIBE -from .util import encrypt_payload, warn_on_except - -from oxenc import bt_serialize, bt_deserialize, to_base64 - -omq = None -hivemind = None -loop = None -fcm = None - -# Whenever we add/change fields we increment this so that a Session client could figure out what it -# is speaking to: -SPNS_FCM_VERSION = 1 - -# If our JSON payload hits 4000 bytes then Google will reject it, so we limit ourselves to this size -# *before* encryption + base64 encoding. If the source message exceeds this, we send an alternative -# "too big" response in the metadata instead of including the message. -MAX_MSG_SIZE = 2500 - - -@warn_on_except -def validate(msg: Message): - parts = msg.data() - if len(parts) != 2 or parts[0] != b"firebase": - logger.warning("Internal error: invalid input to notifier.validate") - msg.reply(str(SUBSCRIBE.ERROR.value), "Internal error") - return - - try: - data = json.loads(parts[1]) - - # We require just the device token, passed as `token`: - token = data["token"] - if not token: - raise ValueError(f"Invalid firebase device token") - msg.reply("0", token) - except KeyError as e: - msg.reply(str(SUBSCRIBE.BAD_INPUT.value), f"Error: missing required key {e}") - except Exception as e: - msg.reply(str(SUBSCRIBE.ERROR.value), str(e)) - - -def make_notifier(msg: Message): - async def fcm_notify(): - global fcm - max_retries = config.NOTIFY["firebase"].get("retries", 0) - retry_sleep = config.NOTIFY["firebase"].get("retry_interval", 10) - retries = max_retries - while True: - response = await fcm.send_message(msg) - if response.is_successful: - return - if retries > 0: - retries -= 1 - await asyncio.sleep(retry_sleep) - else: - logger.warning( - f"Failed to send notification: {response.status} ({response.description}); giving up after {max_retries} retries" - ) - - return fcm_notify - - -@warn_on_except -def push_notification(msg: Message): - data = bt_deserialize(msg[0]) - - enc_payload = encrypt_notify_payload(data, max_msg_size=MAX_MSG_SIZE) - - device_token = data["&"] # unique service id, as we returned from validate - - msg = Message( - device_token=device_token, data={"enc_payload": enc_payload}, priority=PRIORITY_HIGH - ) - - global loop - asyncio.run_coroutine_threadsafe(make_notifier(msg), loop) - - -def run(): - """Runs the asyncio event loop, forever.""" - - # These do not and *should* not match hivemind or any other notifier: that is, each notifier - # needs its own unique keypair. We do, however, want it to persist so that we can - # restart/reconnect and receive messages sent while we where restarting. - key = derive_notifier_key(__name__) - - global omq, hivemind, firebase - - omq = OxenMQ(pubkey=key.public_key.encode(), privkey=key.encode()) - - cat = omq.add_category("notifier", AuthLevel.basic) - cat.add_request_command("validate", validate) - cat.add_command("push", push_notification) - - omq.start() - - hivemind = omq.connect_remote( - Address(config.config.hivemind_sock), auth_level=AuthLevel.basic, ephemeral_routing_id=False - ) - - conf = config.NOTIFY["firebase"] - fcm = FCM() # FIXME? - - omq.send(hivemind, "admin.register_service", "firebase") - - try: - loop.run_forever() - finally: - loop.close() - - omq.disconnect(hivemind) - hivemind = None - omq = None - loop = None diff --git a/spns/notifiers/firebase.py b/spns/notifiers/firebase.py deleted file mode 100644 index 525acb6..0000000 --- a/spns/notifiers/firebase.py +++ /dev/null @@ -1,233 +0,0 @@ -# Google Firebase push notification server - -from .. import config -from ..config import logger -from ..core import SUBSCRIBE -from .util import ( - encrypt_notify_payload, - derive_notifier_key, - warn_on_except, - NotifyStats, -) - -import firebase_admin -from firebase_admin import messaging -from firebase_admin.exceptions import * - -import oxenc -from oxenmq import OxenMQ, Message, Address, AuthLevel - -import asyncio -import datetime -import time -import json -import signal -import systemd.daemon -from threading import Lock - -loop = None -omq = None -hivemind = None -firebase_app = None - -notify_queue = [] -queue_lock = Lock() -queue_timer = None - -# Whenever we add/change fields we increment this so that a Session client could figure out what it -# is speaking to: -SPNS_FIREBASE_VERSION = 1 - -# If our JSON payload hits 4000 bytes then Google will reject it, so we limit ourselves to this size -# *before* encryption + base64 encoding. If the source message exceeds this, we send an alternative -# "too big" response in the metadata instead of including the message. -MAX_MSG_SIZE = 2500 - -# Firebase max simultaneous notifications: -MAX_NOTIFIES = 500 - - -stats = NotifyStats() - - -@warn_on_except -def validate(msg: Message): - parts = msg.data() - if len(parts) != 2 or parts[0] != b"firebase": - logger.warning("Internal error: invalid input to notifier.validate") - msg.reply(str(SUBSCRIBE.ERROR.value), "Internal error") - return - - try: - data = json.loads(parts[1]) - - # We require just the device token, passed as `token`: - token = data["token"] - if not token: - raise ValueError(f"Invalid firebase device token") - msg.reply("0", token) - except KeyError as e: - msg.reply(str(SUBSCRIBE.BAD_INPUT.value), f"Error: missing required key {e}") - except Exception as e: - msg.reply(str(SUBSCRIBE.ERROR.value), str(e)) - - -@warn_on_except -def push_notification(msg: Message): - data = oxenc.bt_deserialize(msg.data()[0]) - - enc_payload = encrypt_notify_payload(data, max_msg_size=MAX_MSG_SIZE) - - # Send messages that are likely to illicit a device notification as high priority: specifically, - # has a body, and is in namespace 0 (DMs) or 11 (group "v2" messages). - # This doesn't catch them all, e.g. typing notifications, but definitely excludes things like - # data-too-big messages and config updates which definitely won't notify. - priority = "high" if b"~" in data and data[b"n"] in (0, 11) else "normal" - - # unique service id, as we returned from validate - device_token = data[b"&"].decode() - - msg = messaging.Message( - data={ - "enc_payload": oxenc.to_base64(enc_payload), - "spns": f"{SPNS_FIREBASE_VERSION}", - }, - token=device_token, - android=messaging.AndroidConfig(priority=priority), - ) - - global notify_queue, queue_lock - with queue_lock: - notify_queue.append(msg) - - -@warn_on_except -def send_pending(): - global notify_queue, queue_lock, firebase_app, loop, stats - with queue_lock: - queue, notify_queue = notify_queue, [] - - i = 0 - results = [] - while i < len(queue): - results.append( - asyncio.run_coroutine_threadsafe( - messaging.send_each_async( - messages=queue[i : i + MAX_NOTIFIES], app=firebase_app - ), - loop, - ) - ) - i += MAX_NOTIFIES - - results = [f.result() for f in results] - # FIXME: process/reschedule failures? - - with stats.lock: - stats.notifies += len(queue) - - -@warn_on_except -def ping(): - """Makes sure we are registered and reports updated stats to hivemind; called every few seconds""" - global omq, hivemind, stats - omq.send(hivemind, "admin.register_service", "firebase") - omq.send( - hivemind, "admin.service_stats", "firebase", oxenc.bt_serialize(stats.collect()) - ) - systemd.daemon.notify( - f"WATCHDOG=1\nSTATUS=Running; {stats.total_notifies} notifications, " - f"{stats.total_retries} retries, {stats.total_failures} failures" - ) - - -def start(): - """Starts up the firebase listener.""" - - # These do not and *should* not match hivemind or any other notifier: that is, each notifier - # needs its own unique keypair. We do, however, want it to persist so that we can - # restart/reconnect and receive messages sent while we where restarting. - key = derive_notifier_key("firebase") - - global omq, hivemind, firebase_app, queue_timer - - omq = OxenMQ(pubkey=key.public_key.encode(), privkey=key.encode()) - - cat = omq.add_category("notifier", AuthLevel.basic) - cat.add_request_command("validate", validate) - cat.add_command("push", push_notification) - - omq.add_timer(ping, datetime.timedelta(seconds=5)) - - conf = config.NOTIFY["firebase"] - queue_timer = omq.add_timer( - send_pending, - datetime.timedelta(seconds=float(conf["notify_interval"])), - thread=omq.add_tagged_thread("firebasenotify"), - ) - - omq.start() - - hivemind = omq.connect_remote( - Address(config.config.hivemind_sock), - auth_level=AuthLevel.basic, - ephemeral_routing_id=False, - ) - - firebase_app = firebase_admin.initialize_app( - firebase_admin.credentials.Certificate(conf["token_file"]) - ) - - omq.send(hivemind, "admin.register_service", "firebase") - - -def disconnect(flush_pending=True): - global omq, hivemind, queue_timer - omq.disconnect(hivemind) - omq.cancel_timer(queue_timer) - omq = None - hivemind = None - - # In case we have pending incoming notifications still to process - time.sleep(0.5) - - if flush_pending: - send_pending() - - -def run(startup_delay=4.0): - """Runs the firebase notifier, forever.""" - - global omq, loop - - if startup_delay > 0: - time.sleep(startup_delay) - - logger.info("Starting firebase notifier") - systemd.daemon.notify("STATUS=Initializing firebase notifier...") - try: - loop = asyncio.new_event_loop() - start() - except Exception as e: - logger.critical(f"Failed to start firebase notifier: {e}") - raise e - - logger.info("Firebase notifier started") - systemd.daemon.notify("READY=1\nSTATUS=Started") - - def sig_die(signum, frame): - loop.stop() - raise OSError(f"Caught signal {signal.Signals(signum).name}") - - try: - signal.signal(signal.SIGHUP, sig_die) - signal.signal(signal.SIGINT, sig_die) - - loop.run_forever() - - except Exception as e: - logger.error(f"firebase notifier died via exception: {e}") - - -if __name__ == "__main__": - run(startup_delay=0) diff --git a/spns/notifiers/huawei.py b/spns/notifiers/huawei.py index 1562c5d..3052aca 100644 --- a/spns/notifiers/huawei.py +++ b/spns/notifiers/huawei.py @@ -2,7 +2,7 @@ from .. import config from ..config import logger -from ..core import SUBSCRIBE +from spns_hivemind import SUBSCRIBE from .util import encrypt_notify_payload, derive_notifier_key, warn_on_except, NotifyStats from hms.src import push_admin diff --git a/spns/pg.cpp b/spns/pg.cpp index 86fcdf7..bc635d7 100644 --- a/spns/pg.cpp +++ b/spns/pg.cpp @@ -50,7 +50,7 @@ void PGConnPool::clear_idle_conns() { if (max_idle_time > 0s) { auto cutoff = steady_clock::now() - max_idle_time; - while (idle_conns_.front().second < cutoff) + while (!idle_conns_.empty() && idle_conns_.front().second < cutoff) idle_conns_.pop_front(); } } @@ -69,7 +69,9 @@ std::unique_ptr PGConnPool::make_conn() { log::debug(cat, "Creating pg connection"); std::lock_guard lock{mutex_}; count_++; - return std::make_unique(pg_connect_); + auto conn = std::make_unique(pg_connect_); + conn->set_client_encoding("UTF8"); + return conn; } PGConn::~PGConn() { @@ -78,27 +80,3 @@ PGConn::~PGConn() { } } // namespace spns - -namespace pqxx { - -spns::Int16ArrayLoader string_traits::from_string(std::string_view in) { - if (in.size() <= 2) - return {}; - auto* pos = in.data(); - assert(*pos == '{'); - pos++; - auto* back = &in.back(); - assert(*back == '}'); - spns::Int16ArrayLoader vals; - vals.a.reserve(std::count(pos, back, ',')); - while (pos < back) { - auto& ns = vals.a.emplace_back(); - auto [ptr, ec] = std::from_chars(pos, back, ns); - assert(ec == std::errc()); - assert(ptr == back || *ptr == ','); - pos = ptr + 1; - } - return vals; -} - -} // namespace pqxx diff --git a/spns/pg.hpp b/spns/pg.hpp index 9060472..bce0071 100644 --- a/spns/pg.hpp +++ b/spns/pg.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include "bytes.hpp" @@ -53,7 +54,7 @@ class PGConnPool { /// timer: connections get killed off only when retrieving or releasing a connection). 0 or /// negative mean there is no idle timeout. After changing this you may want to call /// `clear_idle_conns()` to apply the new setting to currently idle connections. - std::chrono::milliseconds max_idle_time = 10min; + std::chrono::milliseconds max_idle_time = 30s; /// Maximum number of idle connections we will keep alive. If 0 then we never keep any idle /// connections at all and each call to `get()` will have to reconnect. @@ -99,83 +100,4 @@ class PGConnPool { std::unique_ptr make_conn(); }; -// Helper for extracting namespaces from a pg array -struct Int16ArrayLoader { - std::vector a; -}; - } // namespace spns - -namespace pqxx { - -template <> -inline const std::string type_name{"spns::AccountID"}; -template <> -inline const std::string type_name{"spns::Ed25519PK"}; -template <> -inline const std::string type_name{"spns::SubaccountTag"}; -template <> -inline const std::string type_name{"spns::Signature"}; -template <> -inline const std::string type_name{"spns::EncKey"}; - -template -struct spns_byte_helper { - static constexpr size_t SIZE = T::SIZE; - static T from_string(std::string_view text) { - const auto size = internal::size_unesc_bin(text.size()); - if (size != SIZE) - throw conversion_error{ - "Invalid byte length (" + std::to_string(size) + ") for spns::bytes<" + - std::to_string(SIZE) + ">-derived object\n" -#ifndef NDEBUG - + std::string{text} -#endif - }; - T val; - internal::unesc_bin(text, val.data()); - return val; - } - - using BSV_traits = string_traits>; - - static zview to_buf(char* begin, char* end, const T& val) { - return BSV_traits::to_buf(begin, end, {val.data(), val.size()}); - } - static char* into_buf(char* begin, char* end, const T& val) { - return BSV_traits::into_buf(begin, end, {val.data(), val.size()}); - } - static std::size_t size_buffer(const T&) noexcept { return internal::size_esc_bin(SIZE); } -}; - -template <> -struct string_traits : spns_byte_helper {}; -template <> -struct string_traits : spns_byte_helper {}; -template <> -struct string_traits : spns_byte_helper {}; -template <> -struct string_traits : spns_byte_helper {}; -template <> -struct string_traits : spns_byte_helper {}; - -template <> -struct string_traits { - static spns::Int16ArrayLoader from_string(std::string_view in); -}; - -template <> -struct nullness : pqxx::no_null {}; -template <> -struct nullness : pqxx::no_null {}; -template <> -struct nullness : pqxx::no_null {}; -template <> -struct nullness : pqxx::no_null {}; -template <> -struct nullness : pqxx::no_null {}; - -template <> -struct nullness : pqxx::no_null {}; - -} // namespace pqxx diff --git a/spns/pybind.cpp b/spns/pybind.cpp index e7d7781..f54d441 100644 --- a/spns/pybind.cpp +++ b/spns/pybind.cpp @@ -1,8 +1,10 @@ #include +#include #include #include #include +#include #include "bytes.hpp" #include "config.hpp" @@ -41,9 +43,34 @@ struct type_caster>> { return py::bytes(reinterpret_cast(src.data()), src.size()); } }; + +template <> +struct type_caster { + PYBIND11_TYPE_CASTER(oxen::quic::Address, const_name("quic_address")); + + bool load(handle src, bool) { + if (py::isinstance(src)) { + value = oxen::quic::Address::parse(py::cast(src)); + if (value.is_ipv6() && value.is_any_addr()) + value.dual_stack = true; + return true; + } + return false; + } + + static handle cast( + const oxen::quic::Address& src, return_value_policy /* policy */, handle /* parent */) { + return py::str(fmt::format( + "{}{}{}:{}", + src.is_ipv6() ? "[" : "", + src.host(), + src.is_ipv6() ? "]" : "", + src.port())); + } +}; } // namespace pybind11::detail -PYBIND11_MODULE(core, m) { +PYBIND11_MODULE(spns_hivemind, m) { using namespace spns; @@ -78,13 +105,21 @@ PYBIND11_MODULE(core, m) { "set of X25519 pubkeys recognized as admin for incoming `hivemind_curve` " "connections") .def_readwrite( - "pubkey", - &Config::pubkey, + "omq_pubkey", + &Config::omq_pubkey, "X25519 server pubkey; must be set (the default value will not work)") .def_readwrite( - "privkey", - &Config::privkey, + "omq_privkey", + &Config::omq_privkey, "X25519 server privkey; must be set (the default value will not work)") + .def_readwrite( + "quic_keys", + &Config::quic_keys, + "Ed25519 combined secret key + pubkey value (64 bytes), used for QUIC.") + .def_readwrite( + "quic_listen", + &Config::quic_listen, + "Optional listen addresses for quic (requires that `quic_key` is set)") .def_property( "filter_lifetime", [](Config& self) { return self.filter_lifetime.count(); }, @@ -104,19 +139,6 @@ PYBIND11_MODULE(core, m) { "Set of notification services that we expect; if non-empty then we will stop " "the `notifier_wait` time early once we have registered notifiers for all the " "values set here.") - .def_property( - "subs_interval", - [](Config& self) { return self.subs_interval.count(); }, - [](Config& self, int64_t seconds) { self.subs_interval = 1s * seconds; }, - "how frequently, in seconds, between subscription rechecks (for push renewals, " - "expiries, etc.)") - .def_readwrite( - "omq_push_instances", - &Config::omq_push_instances, - "How many dedicated oxenmq instances to use for handle push notifications; if " - "1 or greater then this many separate oxenmq instances will be started to deal " - "with push requests; if 0 then the main oxenmq server will be used for " - "everything.") .def_readwrite( "max_pending_connects", &Config::max_pending_connects, @@ -126,51 +148,11 @@ PYBIND11_MODULE(core, m) { class Logger {}; py::class_{m, "logger"} - .def_static( - "start", - [](const std::string& out) { - oxen::log::clear_sinks(); - if (out == "stdout" || out == "-" || out == "") - oxen::log::add_sink(oxen::log::Type::Print, "stdout"); - else if (out == "stderr") - oxen::log::add_sink(oxen::log::Type::Print, "stderr"); - else - oxen::log::add_sink(oxen::log::Type::File, out); - }) - .def_static( - "set_level", - [](const std::string& level) { - oxen::log::reset_level(oxen::log::level_from_string(level)); - }, - "Sets/resets the log level of all spns.core log categories to the given " - "value.\n" - "Can be any of 'trace', 'debug', 'info', 'warn', 'error', 'critical', or " - "'none'.", - "level"_a) .def_static( "set_level", - [](const std::string& cat, const std::string& level) { - oxen::log::set_level(cat, oxen::log::level_from_string(level)); - }, - "Sets/resets the log level of a single spns.core log categories to the given " - "value.\n" - "Can be any of 'trace', 'debug', 'info', 'warning', 'error', 'critical', or " - "'none'.", - "category"_a, - "level"_a) - .def_static( - "get_level", - [](const std::string& cat) { oxen::log::get_level(cat); }, - "Gets the log level of the given spns.core log category") - .def_static( - "get_level", - [](const std::string& cat) { oxen::log::get_level(cat); }, - "Gets the log level of the given spns.core log category") - .def_static( - "get_level", - []() { oxen::log::get_level_default(); }, - "Gets the default log level of spns.core categories (those that have not been " - "changed via a category-specific `set_level`)") + [](std::string_view cat_levels) { oxen::log::apply_categories(cat_levels); }, + "Sets/resets the log level; can be a global level (e.g. 'info') or individual " + "'cat=debug' level values") // ; diff --git a/spns/register.py b/spns/register.py index cc1029b..c306f62 100644 --- a/spns/register.py +++ b/spns/register.py @@ -1,6 +1,6 @@ from . import web from .web import app -from .core import SUBSCRIBE +from spns_hivemind import SUBSCRIBE from flask import request, jsonify, Response diff --git a/spns/swarmpubkey.hpp b/spns/swarmpubkey.hpp index 1c1e834..dcb822b 100644 --- a/spns/swarmpubkey.hpp +++ b/spns/swarmpubkey.hpp @@ -18,7 +18,7 @@ struct SwarmPubkey { mutable uint64_t swarm; bool operator==(const SwarmPubkey& other) const { return id == other.id; } - bool operator!=(const SwarmPubkey& other) const { return !(*this == other); } + bool operator!=(const SwarmPubkey& other) const = default; SwarmPubkey(AccountID account_id, std::optional ed, bool _skip_validation = false); diff --git a/spns/utils.hpp b/spns/utils.hpp index 5e4acdf..127cb5e 100644 --- a/spns/utils.hpp +++ b/spns/utils.hpp @@ -1,10 +1,10 @@ #pragma once #include -#include #include #include +#include #include #include diff --git a/spns/web.py b/spns/web.py index 9e8ab64..927f1e8 100644 --- a/spns/web.py +++ b/spns/web.py @@ -3,11 +3,12 @@ import coloredlogs import uwsgi import oxenmq +from spns_hivemind import core_logger from uwsgidecorators import postfork app = flask.Flask(__name__) coloredlogs.install( - milliseconds=True, isatty=True, logger=app.logger, level=config.core_logger.get_level() + milliseconds=True, isatty=True, logger=app.logger, level=core_logger.get_level() ) # Monkey-patch app.get/post/etc. for Flask <2 compatibility; this has to be before the imports, diff --git a/systemd/spns-hivemind.service b/systemd/spns-hivemind.service index 55d347d..d7dc87f 100644 --- a/systemd/spns-hivemind.service +++ b/systemd/spns-hivemind.service @@ -11,8 +11,8 @@ WatchdogSec=1min WorkingDirectory=/home/push/session-push-notification-server LimitNOFILE=16384 Restart=always -RestartSec=5s -ExecStart=/usr/bin/python3 -mspns.hivemind +RestartSec=2s +ExecStart=/home/push/bin/SPNS spns.ini [Install] WantedBy=multi-user.target diff --git a/systemd/spns-notifier@.service b/systemd/spns-notifier@.service index 7d7a390..39a2b46 100644 --- a/systemd/spns-notifier@.service +++ b/systemd/spns-notifier@.service @@ -12,7 +12,7 @@ Type=notify WatchdogSec=1min WorkingDirectory=/home/push/session-push-notification-server Restart=always -RestartSec=5s +RestartSec=2s ExecStart=/usr/bin/python3 -mspns.notifiers.%i [Install]