diff --git a/.gitignore b/.gitignore index 7d5e31d5..80ab4c82 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ __pycache__ # allow a sample network in the repo !local-scripts/small.yaml +!local-scripts/kind.yaml diff --git a/Cargo.lock b/Cargo.lock index 06b10317..d8bf0295 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -87,9 +87,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.4" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "anstyle-parse" @@ -215,7 +215,7 @@ checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.1.1", "async-executor", - "async-io 2.3.0", + "async-io 2.3.1", "async-lock 3.3.0", "blocking", "futures-lite 2.2.0", @@ -244,9 +244,9 @@ dependencies = [ [[package]] name = "async-io" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb41eb19024a91746eba0773aa5e16036045bbf45733766661099e182ea6a744" +checksum = "8f97ab0c5b00a7cdbe5a371b9a782ee7be1316095885c8a4ea1daf490eb0ef65" dependencies = [ "async-lock 3.3.0", "cfg-if", @@ -254,8 +254,8 @@ dependencies = [ "futures-io", "futures-lite 2.2.0", "parking", - "polling 3.3.2", - "rustix 0.38.30", + "polling 3.4.0", + "rustix 0.38.31", "slab", "tracing", "windows-sys 0.52.0", @@ -347,31 +347,12 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "atoi" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" -dependencies = [ - "num-traits", -] - [[package]] name = "atomic-waker" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" -[[package]] -name = "atomic-write-file" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edcdbedc2236483ab103a53415653d6b4442ea6141baf1ffa85df29635e88436" -dependencies = [ - "nix", - "rand", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -508,9 +489,6 @@ name = "bitflags" version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" -dependencies = [ - "serde", -] [[package]] name = "bitvec" @@ -726,7 +704,7 @@ dependencies = [ [[package]] name = "ceramic-core" version = "0.9.0" -source = "git+https://github.com/3box/rust-ceramic?branch=main#bc3cceaa80cadf278233799598d13fe52c9131c3" +source = "git+https://github.com/3box/rust-ceramic?branch=main#dba11dadf30ac9cf6bb98223c92c9f7014f9eb8b" dependencies = [ "anyhow", "async-trait", @@ -745,7 +723,6 @@ dependencies = [ "serde_bytes", "serde_ipld_dagcbor", "serde_json", - "sqlx", "ssi", "unsigned-varint", ] @@ -753,7 +730,7 @@ dependencies = [ [[package]] name = "ceramic-core" version = "0.9.0" -source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#393b8761da4082c1f304fabe994d07d497dbd603" +source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#dba11dadf30ac9cf6bb98223c92c9f7014f9eb8b" dependencies = [ "anyhow", "async-trait", @@ -772,7 +749,6 @@ dependencies = [ "serde_bytes", "serde_ipld_dagcbor", "serde_json", - "sqlx", "ssi", "unsigned-varint", ] @@ -780,7 +756,7 @@ dependencies = [ [[package]] name = "ceramic-event" version = "0.9.0" -source = "git+https://github.com/3box/rust-ceramic?branch=main#bc3cceaa80cadf278233799598d13fe52c9131c3" +source = "git+https://github.com/3box/rust-ceramic?branch=main#dba11dadf30ac9cf6bb98223c92c9f7014f9eb8b" dependencies = [ "anyhow", "ceramic-core 0.9.0 (git+https://github.com/3box/rust-ceramic?branch=main)", @@ -812,9 +788,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.32" +version = "0.4.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" +checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1015,9 +991,9 @@ checksum = "05ca71f324d19e85a2e976be04b5ecbb193253794a75adfe2e5044c8bef03f6a" [[package]] name = "cookie" -version = "0.16.2" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" +checksum = "7efb37c3e1ccb1ff97164ad95ac1606e8ccd35b3fa0a7d99a304c7f4a428cc24" dependencies = [ "percent-encoding", "time", @@ -1026,12 +1002,12 @@ dependencies = [ [[package]] name = "cookie_store" -version = "0.16.2" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d606d0fba62e13cf04db20536c05cb7f13673c161cb47a47a82b9b9e7d3f1daa" +checksum = "387461abbc748185c3a6e1673d826918b450b87ff22639429c694619a83b6cf6" dependencies = [ "cookie", - "idna 0.2.3", + "idna 0.3.0", "log", "publicsuffix", "serde", @@ -1075,21 +1051,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32fast" version = "1.3.2" @@ -1108,15 +1069,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -1233,12 +1185,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "fc5d6b04b3fd0ba9926f945895de7d806260a2d7431ba82e7edaecb043c4c6b8" dependencies = [ - "darling_core 0.20.3", - "darling_macro 0.20.3", + "darling_core 0.20.5", + "darling_macro 0.20.5", ] [[package]] @@ -1271,9 +1223,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "04e48a959bcd5c761246f5d090ebc2fbf7b9cd527a492b07a67510c108f1e7e3" dependencies = [ "fnv", "ident_case", @@ -1307,11 +1259,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "1d1545d67a2149e1d93b7e5c7752dce5a7426eb5d1357ddcfd89336b94444f77" dependencies = [ - "darling_core 0.20.3", + "darling_core 0.20.5", "quote", "syn 2.0.48", ] @@ -1509,12 +1461,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86e3bdc80eee6e16b2b6b0f87fbc98c04bee3455e35174c0de1a125d0688c632" -[[package]] -name = "dotenvy" -version = "0.15.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" - [[package]] name = "downcast" version = "0.11.0" @@ -1589,9 +1535,6 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -dependencies = [ - "serde", -] [[package]] name = "elliptic-curve" @@ -1666,17 +1609,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "etcetera" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" -dependencies = [ - "cfg-if", - "home", - "windows-sys 0.48.0", -] - [[package]] name = "event-listener" version = "2.5.3" @@ -1766,15 +1698,9 @@ dependencies = [ [[package]] name = "fiat-crypto" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" - -[[package]] -name = "finl_unicode" -version = "1.2.0" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382" [[package]] name = "fixed-hash" @@ -1808,17 +1734,6 @@ dependencies = [ "spin 0.9.8", ] -[[package]] -name = "flume" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" -dependencies = [ - "futures-core", - "futures-sink", - "spin 0.9.8", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1903,17 +1818,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-intrusive" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" -dependencies = [ - "futures-core", - "lock_api", - "parking_lot", -] - [[package]] name = "futures-io" version = "0.3.30" @@ -2065,7 +1969,7 @@ dependencies = [ "chrono", "ctrlc", "downcast-rs", - "flume 0.10.14", + "flume", "futures", "gumdrop", "http", @@ -2158,7 +2062,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.1.0", + "indexmap 2.2.2", "slab", "tokio", "tokio-util", @@ -2205,15 +2109,6 @@ dependencies = [ "allocator-api2", ] -[[package]] -name = "hashlink" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" -dependencies = [ - "hashbrown 0.14.3", -] - [[package]] name = "hdrhistogram" version = "7.5.4" @@ -2232,15 +2127,12 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3" [[package]] name = "hex" @@ -2404,9 +2296,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.59" +version = "0.1.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -2431,17 +2323,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.3.0" @@ -2475,9 +2356,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -2565,7 +2446,7 @@ dependencies = [ [[package]] name = "iroh-car" version = "0.9.0" -source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#393b8761da4082c1f304fabe994d07d497dbd603" +source = "git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main#dba11dadf30ac9cf6bb98223c92c9f7014f9eb8b" dependencies = [ "cid 0.10.1", "futures", @@ -2594,15 +2475,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.10" @@ -2611,9 +2483,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -2792,9 +2664,9 @@ dependencies = [ [[package]] name = "jsonpath-rust" -version = "0.3.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06cc127b7c3d270be504572364f9569761a180b981919dd0d87693a7f5fb7829" +checksum = "96acbc6188d3bd83519d053efec756aa4419de62ec47be7f28dec297f7dc9eb0" dependencies = [ "pest", "pest_derive", @@ -2928,7 +2800,10 @@ dependencies = [ "ceramic-core 0.9.0 (git+https://github.com/ceramicnetwork/rust-ceramic.git?branch=main)", "ceramic-http-client", "clap", + "did-method-key", + "ed25519-dalek", "goose", + "hex", "iroh-car", "keramik-common", "libipld 0.16.0", @@ -2950,9 +2825,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.88.0" +version = "0.88.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f358a05ae5408b9e9a80c82326cc6e3141dd15b0d141e46d18873879dd4ec26" +checksum = "462fe330a0617b276ec864c2255810adcdf519ecb6844253c54074b2086a97bc" dependencies = [ "k8s-openapi", "kube-client", @@ -2963,9 +2838,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.88.0" +version = "0.88.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca11c8214a2197b0008948ee0b155fab030d16e2aa7cd1ae88e7236dda760a53" +checksum = "7fe0d65dd6f3adba29cfb84f19dfe55449c7f6c35425f9d8294bec40313e0b64" dependencies = [ "base64 0.21.7", "bytes", @@ -2998,9 +2873,9 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.88.0" +version = "0.88.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90821c3f37216c297b663a4e27abb354fcdf62082a68bf3123ab456fc21617f1" +checksum = "a6b42844e9172f631b8263ea9ce003b9251da13beb1401580937ad206dd82f4c" dependencies = [ "chrono", "form_urlencoded", @@ -3016,11 +2891,11 @@ dependencies = [ [[package]] name = "kube-derive" -version = "0.88.0" +version = "0.88.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f7c40cf24b5ce5c3146cb1f24aaacbf0d8b027023a4cc8c10b0f798ff65cc23" +checksum = "f5b5a111ee287bd237b8190b8c39543ea9fd22f79e9c32a36c24e08234bcda22" dependencies = [ - "darling 0.20.3", + "darling 0.20.5", "proc-macro2", "quote", "serde_json", @@ -3029,9 +2904,9 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.88.0" +version = "0.88.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea80f9546d04ecf630e101ce62344baab93c8fc9ded905b8e77a23a52e30f90e" +checksum = "2bc06275064c81056fbb28ea876b3fb339d970e8132282119359afca0835c0ea" dependencies = [ "ahash 0.8.7", "async-trait", @@ -3152,9 +3027,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libipld" @@ -3348,17 +3223,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "libsqlite3-sys" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" -dependencies = [ - "cc", - "pkg-config", - "vcpkg", -] - [[package]] name = "linked-hash-map" version = "0.5.6" @@ -3436,28 +3300,12 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matches" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" - [[package]] name = "matchit" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" -[[package]] -name = "md-5" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" -dependencies = [ - "cfg-if", - "digest 0.10.7", -] - [[package]] name = "memchr" version = "2.7.1" @@ -3509,9 +3357,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] @@ -3780,6 +3628,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-derive" version = "0.3.3" @@ -3921,7 +3775,7 @@ checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ "futures-core", "futures-sink", - "indexmap 2.1.0", + "indexmap 2.2.2", "js-sys", "once_cell", "pin-project-lite", @@ -4070,12 +3924,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "paste" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" - [[package]] name = "pct-str" version = "1.2.0" @@ -4127,9 +3975,9 @@ checksum = "b687ff7b5da449d39e418ad391e5e08da53ec334903ddbb921db208908fc372c" [[package]] name = "pest" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f200d8d83c44a45b21764d1916299752ca035d15ecd46faca3e9a2a2bf6ad06" +checksum = "219c0dcc30b6a27553f9cc242972b67f75b60eb0db71f0b5462f38b058c41546" dependencies = [ "memchr", "thiserror", @@ -4138,9 +3986,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcd6ab1236bbdb3a49027e920e693192ebfe8913f6d60e294de57463a493cfde" +checksum = "22e1288dbd7786462961e69bfd4df7848c1e37e8b74303dbdab82c3a9cdd2809" dependencies = [ "pest", "pest_generator", @@ -4148,9 +3996,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a31940305ffc96863a735bef7c7994a00b325a7138fdbc5bda0f1a0476d3275" +checksum = "1381c29a877c6d34b8c176e734f35d7f7f5b3adaefe940cb4d1bb7af94678e2e" dependencies = [ "pest", "pest_meta", @@ -4161,9 +4009,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7ff62f5259e53b78d1af898941cdcdccfae7385cf7d793a6e55de5d05bb4b7d" +checksum = "d0934d6907f148c22a3acbda520c7eed243ad7487a30f51f6ce52b58b7077a8a" dependencies = [ "once_cell", "pest", @@ -4172,18 +4020,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", @@ -4224,17 +4072,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "pkcs1" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" -dependencies = [ - "der 0.7.8", - "pkcs8 0.10.2", - "spki 0.7.3", -] - [[package]] name = "pkcs8" version = "0.8.0" @@ -4296,14 +4133,14 @@ dependencies = [ [[package]] name = "polling" -version = "3.3.2" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "545c980a3880efd47b2e262f6a4bb6daad6555cf3367aa9c4e52895f69537a41" +checksum = "30054e72317ab98eddd8561db0f6524df3367636884b7b21b703e4b280a84a14" dependencies = [ "cfg-if", "concurrent-queue", "pin-project-lite", - "rustix 0.38.30", + "rustix 0.38.31", "tracing", "windows-sys 0.52.0", ] @@ -4614,7 +4451,7 @@ checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.4", + "regex-automata 0.4.5", "regex-syntax 0.8.2", ] @@ -4629,9 +4466,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b7fa1134405e2ec9353fd416b17f8dacd46c473d7d3fd1cf202706a14eb792a" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", @@ -4652,9 +4489,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "async-compression", "base64 0.21.7", @@ -4684,6 +4521,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-native-tls", @@ -4756,7 +4594,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "pkcs1 0.3.3", + "pkcs1", "pkcs8 0.8.0", "rand_core", "smallvec", @@ -4764,26 +4602,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rsa" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" -dependencies = [ - "const-oid 0.9.6", - "digest 0.10.7", - "num-bigint-dig", - "num-integer", - "num-traits", - "pkcs1 0.7.5", - "pkcs8 0.10.2", - "rand_core", - "signature 2.2.0", - "spki 0.7.3", - "subtle", - "zeroize", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -4815,9 +4633,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.30" +version = "0.38.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ "bitflags 2.4.2", "errno", @@ -4999,9 +4817,9 @@ checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" [[package]] name = "serde" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" dependencies = [ "serde_derive", ] @@ -5046,9 +4864,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", @@ -5091,9 +4909,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.111" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", @@ -5157,7 +4975,7 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "881b6f881b17d13214e5d494c939ebab463d01264ce1811e9d4ac3a882e7695f" dependencies = [ - "darling 0.20.3", + "darling 0.20.5", "proc-macro2", "quote", "syn 2.0.48", @@ -5165,11 +4983,11 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.30" +version = "0.9.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" +checksum = "adf8a49373e98a4c5f0ceb5d05aa7c648d75f63774981ed95b7c7443bbd50c6e" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.2", "itoa", "ryu", "serde", @@ -5189,17 +5007,6 @@ dependencies = [ "opaque-debug 0.3.0", ] -[[package]] -name = "sha1" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", -] - [[package]] name = "sha1_smol" version = "1.0.0" @@ -5432,213 +5239,6 @@ dependencies = [ "der 0.7.8", ] -[[package]] -name = "sqlformat" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" -dependencies = [ - "itertools 0.12.0", - "nom", - "unicode_categories", -] - -[[package]] -name = "sqlx" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dba03c279da73694ef99763320dea58b51095dfe87d001b1d4b5fe78ba8763cf" -dependencies = [ - "sqlx-core", - "sqlx-macros", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", -] - -[[package]] -name = "sqlx-core" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d84b0a3c3739e220d94b3239fd69fb1f74bc36e16643423bd99de3b43c21bfbd" -dependencies = [ - "ahash 0.8.7", - "atoi", - "byteorder", - "bytes", - "crc", - "crossbeam-queue", - "dotenvy", - "either", - "event-listener 2.5.3", - "futures-channel", - "futures-core", - "futures-intrusive", - "futures-io", - "futures-util", - "hashlink", - "hex", - "indexmap 2.1.0", - "log", - "memchr", - "once_cell", - "paste", - "percent-encoding", - "serde", - "serde_json", - "sha2 0.10.8", - "smallvec", - "sqlformat", - "thiserror", - "tokio", - "tokio-stream", - "tracing", - "url", -] - -[[package]] -name = "sqlx-macros" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89961c00dc4d7dffb7aee214964b065072bff69e36ddb9e2c107541f75e4f2a5" -dependencies = [ - "proc-macro2", - "quote", - "sqlx-core", - "sqlx-macros-core", - "syn 1.0.109", -] - -[[package]] -name = "sqlx-macros-core" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0bd4519486723648186a08785143599760f7cc81c52334a55d6a83ea1e20841" -dependencies = [ - "atomic-write-file", - "dotenvy", - "either", - "heck", - "hex", - "once_cell", - "proc-macro2", - "quote", - "serde", - "serde_json", - "sha2 0.10.8", - "sqlx-core", - "sqlx-mysql", - "sqlx-sqlite", - "syn 1.0.109", - "tempfile", - "tokio", - "url", -] - -[[package]] -name = "sqlx-mysql" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e37195395df71fd068f6e2082247891bc11e3289624bbc776a0cdfa1ca7f1ea4" -dependencies = [ - "atoi", - "base64 0.21.7", - "bitflags 2.4.2", - "byteorder", - "bytes", - "crc", - "digest 0.10.7", - "dotenvy", - "either", - "futures-channel", - "futures-core", - "futures-io", - "futures-util", - "generic-array 0.14.7", - "hex", - "hkdf", - "hmac", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "percent-encoding", - "rand", - "rsa 0.9.6", - "serde", - "sha1", - "sha2 0.10.8", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror", - "tracing", - "whoami", -] - -[[package]] -name = "sqlx-postgres" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6ac0ac3b7ccd10cc96c7ab29791a7dd236bd94021f31eec7ba3d46a74aa1c24" -dependencies = [ - "atoi", - "base64 0.21.7", - "bitflags 2.4.2", - "byteorder", - "crc", - "dotenvy", - "etcetera", - "futures-channel", - "futures-core", - "futures-io", - "futures-util", - "hex", - "hkdf", - "hmac", - "home", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "rand", - "serde", - "serde_json", - "sha1", - "sha2 0.10.8", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror", - "tracing", - "whoami", -] - -[[package]] -name = "sqlx-sqlite" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "210976b7d948c7ba9fced8ca835b11cbb2d677c59c79de41ac0d397e14547490" -dependencies = [ - "atoi", - "flume 0.11.0", - "futures-channel", - "futures-core", - "futures-executor", - "futures-intrusive", - "futures-util", - "libsqlite3-sys", - "log", - "percent-encoding", - "serde", - "sqlx-core", - "tracing", - "url", - "urlencoding", -] - [[package]] name = "sshkeys" version = "0.3.2" @@ -5782,7 +5382,7 @@ dependencies = [ "num-traits", "p256", "rand", - "rsa 0.6.1", + "rsa", "serde", "simple_asn1", "ssi-crypto", @@ -5804,7 +5404,7 @@ dependencies = [ "k256 0.13.3", "p256", "rand", - "rsa 0.6.1", + "rsa", "serde", "serde_json", "sha2 0.10.8", @@ -5975,17 +5575,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "stringprep" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb41d74e231a107a1b4ee36bd1214b11285b77768d2e3824aedafa988fd36ee6" -dependencies = [ - "finl_unicode", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "strsim" version = "0.9.3" @@ -6102,14 +5691,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ "cfg-if", "fastrand 2.0.1", - "redox_syscall", - "rustix 0.38.30", + "rustix 0.38.31", "windows-sys 0.52.0", ] @@ -6181,12 +5769,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.31" +version = "0.3.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", "itoa", + "num-conv", "powerfmt", "serde", "time-core", @@ -6201,10 +5790,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ + "num-conv", "time-core", ] @@ -6234,9 +5824,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -6369,7 +5959,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.2", "toml_datetime", "winnow", ] @@ -6625,9 +6215,9 @@ dependencies = [ [[package]] name = "treediff" -version = "4.0.2" +version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52984d277bdf2a751072b5df30ec0377febdb02f7696d64c2d7d54630bac4303" +checksum = "4d127780145176e2b5d16611cc25a900150e86e9fd79d3bde6ff3a37359c9cb5" dependencies = [ "serde_json", ] @@ -6730,24 +6320,12 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-segmentation" -version = "1.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" - [[package]] name = "unicode-xid" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" -[[package]] -name = "unicode_categories" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" - [[package]] name = "unsafe-libyaml" version = "0.2.10" @@ -6809,9 +6387,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "value-bag" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cdbaf5e132e593e9fc1de6a15bbec912395b11fb9719e061cf64f804524c503" +checksum = "126e423afe2dd9ac52142e7e9d5ce4135d7e13776c529d27fd6bc49f19e3280b" [[package]] name = "vcpkg" @@ -6848,9 +6426,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -6858,9 +6436,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", @@ -6873,9 +6451,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ "cfg-if", "js-sys", @@ -6885,9 +6463,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -6895,9 +6473,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", @@ -6908,15 +6486,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "web-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" +checksum = "96565907687f7aceb35bc5fc03770a8a0471d82e479f25832f54a0e3f4b28446" dependencies = [ "js-sys", "wasm-bindgen", @@ -6934,15 +6512,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" - -[[package]] -name = "whoami" -version = "1.4.1" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "winapi" @@ -7118,9 +6690,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.34" +version = "0.5.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7cf47b659b318dccbd69cc4797a39ae128f533dce7902a1096044d1967b9c16" +checksum = "5389a154b01683d28c77f8f68f49dea75f0a4da32557a58f68ee51ebba472d29" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 7153f4e5..d60eeb64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ ceramic-core = { git = "https://github.com/ceramicnetwork/rust-ceramic.git", bra iroh-car = { git = "https://github.com/ceramicnetwork/rust-ceramic.git", branch = "main" } env_logger = "0.10.0" expect-patch = { path = "./expect-patch/" } +hex = "0.4.3" keramik-common = { path = "./common/", default-features = false } multiaddr = "0.17" multibase = "0.9.1" diff --git a/local-scripts/build-local.sh b/local-scripts/build-local.sh new file mode 100755 index 00000000..1f775d2b --- /dev/null +++ b/local-scripts/build-local.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e + +docker buildx build --load -t keramik/operator:dev --target operator . +docker buildx build --load -t keramik/runner:dev --target runner . + +kind load docker-image keramik/runner:dev +kind load docker-image keramik/operator:dev \ No newline at end of file diff --git a/local-scripts/kind.yaml b/local-scripts/kind.yaml new file mode 100644 index 00000000..d183e196 --- /dev/null +++ b/local-scripts/kind.yaml @@ -0,0 +1,4 @@ +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +featureGates: + MaxUnavailableStatefulSet: true \ No newline at end of file diff --git a/local-scripts/small.yaml b/local-scripts/small.yaml index 40021156..0e2bba64 100644 --- a/local-scripts/small.yaml +++ b/local-scripts/small.yaml @@ -4,14 +4,17 @@ metadata: name: small spec: replicas: 2 - bootstrap: - image: runner:dev - imagePullPolicy: IfNotPresent + monitoring: + namespaced: true + devMode: true + # bootstrap: + # image: keramik/runner:dev + # imagePullPolicy: IfNotPresent cas: ipfs: go: {} ceramic: - ipfs: - rust: {} + rust: { env: { CERAMIC_ONE_RECON: "true" } } env: - CERAMIC_PUBSUB_QPS_LIMIT: "1000" \ No newline at end of file + CERAMIC_PUBSUB_QPS_LIMIT: "1000" diff --git a/operator/Cargo.toml b/operator/Cargo.toml index 4f9ceefe..2b277662 100644 --- a/operator/Cargo.toml +++ b/operator/Cargo.toml @@ -45,7 +45,7 @@ anyhow = { workspace = true, optional = true } async-trait = { version = "0.1.68", optional = true } clap = { workspace = true, optional = true } futures = { version = "0.3", optional = true } -hex = { version = "0.4.3", optional = true } +hex = { workspace = true, optional = true } k8s-openapi = { version = "0.21", features = [ "v1_26", "schemars", diff --git a/operator/src/network/ceramic.rs b/operator/src/network/ceramic.rs index 43edb0b0..5f00352d 100644 --- a/operator/src/network/ceramic.rs +++ b/operator/src/network/ceramic.rs @@ -148,6 +148,7 @@ impl CeramicConfig { if NETWORK_DEV_MODE_RESOURCES.load(std::sync::atomic::Ordering::Relaxed) { Self { resource_limits: ResourceLimitsConfig::dev_default(), + postgres_resource_limits: ResourceLimitsConfig::dev_default(), ..Default::default() } } else { diff --git a/operator/src/simulation/controller.rs b/operator/src/simulation/controller.rs index 96e0bcf7..625f882b 100644 --- a/operator/src/simulation/controller.rs +++ b/operator/src/simulation/controller.rs @@ -603,8 +603,8 @@ mod tests { }, { "name": "SIMULATE_MANAGER", - @@ -80,8 +80,8 @@ - "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + @@ -89,8 +89,8 @@ + } } ], - "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest", @@ -627,8 +627,8 @@ mod tests { }, { "name": "SIMULATE_TARGET_PEER", - @@ -76,8 +76,8 @@ - "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + @@ -85,8 +85,8 @@ + } } ], - "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest", @@ -651,8 +651,8 @@ mod tests { }, { "name": "SIMULATE_TARGET_PEER", - @@ -76,8 +76,8 @@ - "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + @@ -85,8 +85,8 @@ + } } ], - "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest", @@ -683,10 +683,10 @@ mod tests { stub.manager_job.patch(expect![[r#" --- original +++ modified - @@ -78,6 +78,10 @@ - { - "name": "DID_PRIVATE_KEY", - "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + @@ -87,6 +87,10 @@ + "name": "ceramic-admin" + } + } + }, + { + "name": "SIMULATE_THROTTLE_REQUESTS", @@ -698,10 +698,10 @@ mod tests { stub.worker_jobs[0].patch(expect![[r#" --- original +++ modified - @@ -74,6 +74,10 @@ - { - "name": "DID_PRIVATE_KEY", - "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + @@ -83,6 +83,10 @@ + "name": "ceramic-admin" + } + } + }, + { + "name": "SIMULATE_THROTTLE_REQUESTS", @@ -713,10 +713,10 @@ mod tests { stub.worker_jobs[1].patch(expect![[r#" --- original +++ modified - @@ -74,6 +74,10 @@ - { - "name": "DID_PRIVATE_KEY", - "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + @@ -83,6 +83,10 @@ + "name": "ceramic-admin" + } + } + }, + { + "name": "SIMULATE_THROTTLE_REQUESTS", @@ -746,10 +746,10 @@ mod tests { stub.manager_job.patch(expect![[r#" --- original +++ modified - @@ -78,6 +78,10 @@ - { - "name": "DID_PRIVATE_KEY", - "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + @@ -87,6 +87,10 @@ + "name": "ceramic-admin" + } + } + }, + { + "name": "SIMULATE_TARGET_REQUESTS", diff --git a/operator/src/simulation/manager.rs b/operator/src/simulation/manager.rs index 1c993863..9c75ffb1 100644 --- a/operator/src/simulation/manager.rs +++ b/operator/src/simulation/manager.rs @@ -3,8 +3,8 @@ use std::collections::BTreeMap; use k8s_openapi::api::{ batch::v1::JobSpec, core::v1::{ - ConfigMapVolumeSource, Container, EnvVar, PodSpec, PodTemplateSpec, ServicePort, - ServiceSpec, Volume, VolumeMount, + ConfigMapVolumeSource, Container, EnvVar, EnvVarSource, PodSpec, PodTemplateSpec, + SecretKeySelector, ServicePort, ServiceSpec, Volume, VolumeMount, }, }; use kube::core::ObjectMeta; @@ -103,6 +103,18 @@ pub fn manager_job_spec(config: ManagerConfig) -> JobSpec { ), ..Default::default() }, + EnvVar { + name: "CERAMIC_ADMIN_PRIVATE_KEY".to_owned(), + value_from: Some(EnvVarSource { + secret_key_ref: Some(SecretKeySelector { + key: "private-key".to_owned(), + name: Some("ceramic-admin".to_owned()), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }, ]; if let Some(throttle_requests) = config.throttle_requests { env_vars.push(EnvVar { diff --git a/operator/src/simulation/testdata/default_stubs/manager_job b/operator/src/simulation/testdata/default_stubs/manager_job index 4bfdf40d..1658bc58 100644 --- a/operator/src/simulation/testdata/default_stubs/manager_job +++ b/operator/src/simulation/testdata/default_stubs/manager_job @@ -78,6 +78,15 @@ Request { { "name": "DID_PRIVATE_KEY", "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + }, + { + "name": "CERAMIC_ADMIN_PRIVATE_KEY", + "valueFrom": { + "secretKeyRef": { + "key": "private-key", + "name": "ceramic-admin" + } + } } ], "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest", diff --git a/operator/src/simulation/testdata/default_stubs/worker_job_0 b/operator/src/simulation/testdata/default_stubs/worker_job_0 index c360ee8e..7b29ab68 100644 --- a/operator/src/simulation/testdata/default_stubs/worker_job_0 +++ b/operator/src/simulation/testdata/default_stubs/worker_job_0 @@ -74,6 +74,15 @@ Request { { "name": "DID_PRIVATE_KEY", "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + }, + { + "name": "CERAMIC_ADMIN_PRIVATE_KEY", + "valueFrom": { + "secretKeyRef": { + "key": "private-key", + "name": "ceramic-admin" + } + } } ], "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest", diff --git a/operator/src/simulation/testdata/default_stubs/worker_job_1 b/operator/src/simulation/testdata/default_stubs/worker_job_1 index cee194f0..c154f0be 100644 --- a/operator/src/simulation/testdata/default_stubs/worker_job_1 +++ b/operator/src/simulation/testdata/default_stubs/worker_job_1 @@ -74,6 +74,15 @@ Request { { "name": "DID_PRIVATE_KEY", "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + }, + { + "name": "CERAMIC_ADMIN_PRIVATE_KEY", + "valueFrom": { + "secretKeyRef": { + "key": "private-key", + "name": "ceramic-admin" + } + } } ], "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest", diff --git a/operator/src/simulation/testdata/worker_job_2 b/operator/src/simulation/testdata/worker_job_2 index a0f101fe..4d6f06b9 100644 --- a/operator/src/simulation/testdata/worker_job_2 +++ b/operator/src/simulation/testdata/worker_job_2 @@ -74,6 +74,15 @@ Request { { "name": "DID_PRIVATE_KEY", "value": "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a" + }, + { + "name": "CERAMIC_ADMIN_PRIVATE_KEY", + "valueFrom": { + "secretKeyRef": { + "key": "private-key", + "name": "ceramic-admin" + } + } } ], "image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest", diff --git a/operator/src/simulation/worker.rs b/operator/src/simulation/worker.rs index 4db8ae84..a2cd4fc1 100644 --- a/operator/src/simulation/worker.rs +++ b/operator/src/simulation/worker.rs @@ -3,7 +3,8 @@ use std::collections::BTreeMap; use k8s_openapi::api::{ batch::v1::JobSpec, core::v1::{ - ConfigMapVolumeSource, Container, EnvVar, PodSpec, PodTemplateSpec, Volume, VolumeMount, + ConfigMapVolumeSource, Container, EnvVar, EnvVarSource, PodSpec, PodTemplateSpec, + SecretKeySelector, Volume, VolumeMount, }, }; @@ -80,6 +81,18 @@ pub fn worker_job_spec(config: WorkerConfig) -> JobSpec { ), ..Default::default() }, + EnvVar { + name: "CERAMIC_ADMIN_PRIVATE_KEY".to_owned(), + value_from: Some(EnvVarSource { + secret_key_ref: Some(SecretKeySelector { + key: "private-key".to_owned(), + name: Some("ceramic-admin".to_owned()), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }, ]; if let Some(throttle_requests) = config.throttle_requests { diff --git a/runner/Cargo.toml b/runner/Cargo.toml index 76ca152b..f89cd015 100644 --- a/runner/Cargo.toml +++ b/runner/Cargo.toml @@ -14,7 +14,9 @@ iroh-car.workspace = true ceramic-http-client = { git = "https://github.com/3box/ceramic-http-client-rs.git", branch = "main", default-features = false } #ceramic-http-client = { path = "../../ceramic-http-client-rs", default-features = false } clap.workspace = true +did-method-key = "0.2" goose = { version = "0.16", features = ["gaggle"] } +hex.workspace = true keramik-common = { workspace = true, features = ["telemetry", "tokio-console"] } libipld = "0.16.0" multihash.workspace = true @@ -22,7 +24,7 @@ opentelemetry.workspace = true rand = "0.8.5" redis = { version = "0.24", features = ["tokio-comp"] } reqwest.workspace = true -schemars = "0.8.12" +schemars.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true @@ -31,6 +33,8 @@ tracing.workspace = true tracing-subscriber.workspace = true multibase.workspace = true +ed25519-dalek = "2.1" + [dev-dependencies] test-log = "0.2" diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs index 5454235f..a036a48f 100644 --- a/runner/src/scenario/ceramic/mod.rs +++ b/runner/src/scenario/ceramic/mod.rs @@ -1,17 +1,20 @@ -pub mod model_reuse; +pub mod model_instance; mod models; pub mod new_streams; pub mod query; -pub mod recon_sync; pub mod simple; pub mod util; pub mod write_only; -use ceramic_http_client::api::StreamsResponseOrError; +use ceramic_core::ssi::did::{DIDMethod, DocumentBuilder, Source}; +use ceramic_core::ssi::jwk::{self, Base64urlUInt, Params, JWK}; use ceramic_http_client::ceramic_event::{DidDocument, JwkSigner}; use ceramic_http_client::CeramicHttpClient; use models::RandomModelInstance; +use serde::{Deserialize, Serialize}; + +use crate::simulate::Scenario; pub type CeramicClient = CeramicHttpClient; @@ -23,9 +26,147 @@ pub struct Credentials { impl Credentials { pub async fn from_env() -> Result { - let did = DidDocument::new(&std::env::var("DID_KEY").unwrap()); - let private_key = std::env::var("DID_PRIVATE_KEY").unwrap(); + let did = DidDocument::new(&std::env::var("DID_KEY").expect("DID_KEY is required")); + let private_key = std::env::var("DID_PRIVATE_KEY").expect("DID_PRIVATE_KEY is required"); let signer = JwkSigner::new(did.clone(), &private_key).await?; Ok(Self { signer, did }) } + + pub async fn admin_from_env() -> Result { + // TODO: move DID from private key to rust-ceramic + // There is a private function (ed25519_parse_private) in spruceid ssi that does this. + // it's possible I'm missing an easier way that would avoid bringing in the dependency on ed25519_dalek + let private_key = std::env::var("CERAMIC_ADMIN_PRIVATE_KEY") + .expect("CERAMIC_ADMIN_PRIVATE_KEY is required"); + + let data = hex::decode(&private_key)?; + + let key: ed25519_dalek::SigningKey = data[..].try_into()?; + let key = JWK::from(Params::OKP(jwk::OctetParams { + curve: "Ed25519".to_string(), + public_key: Base64urlUInt(ed25519_dalek::VerifyingKey::from(&key).as_bytes().to_vec()), + private_key: Some(Base64urlUInt(data.to_owned())), + })); + + let did = Self::generate_did_for_jwk(&key)?; + let signer = JwkSigner::new(did.clone(), &private_key).await?; + Ok(Self { signer, did }) + } + + pub async fn new_generate_did_key() -> Result { + let (pk, did) = Self::generate_did_and_pk()?; + let signer = JwkSigner::new(did.clone(), &pk).await?; + Ok(Self { signer, did }) + } + + fn generate_did_for_jwk(key: &JWK) -> anyhow::Result { + let did = did_method_key::DIDKey + .generate(&Source::Key(key)) + .ok_or_else(|| anyhow::anyhow!("Failed to generate DID"))?; + + let doc: DidDocument = DocumentBuilder::default() + .id(did) + .build() + .map_err(|e| anyhow::anyhow!("failed to build DID document: {}", e))?; + tracing::debug!("Generated DID: {:?}", doc); + Ok(doc) + } + + /// Returns (Private Key, DID Document) + fn generate_did_and_pk() -> anyhow::Result<(String, DidDocument)> { + let key = jwk::JWK::generate_ed25519()?; + let private_key = if let Params::OKP(params) = &key.params { + let pk = params + .private_key + .as_ref() + .ok_or_else(|| anyhow::anyhow!("No private key"))?; + hex::encode(pk.0.as_slice()) + } else { + anyhow::bail!("Invalid private key"); + }; + + let did = Self::generate_did_for_jwk(&key)?; + Ok((private_key, did)) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub enum DidType { + /// One DID for all users + Shared, + /// A unique DID key for each user + UserDidKey, + // Use CACAOs for each user + //UserCacao, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub enum ReuseType { + /// Create a new model or model instance document for each user + PerUser, + /// Create a new model for each node (worker) + PerNode, + /// Reuse the same model or model instance document for all users + Shared, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct CeramicScenarioParameters { + pub did_type: DidType, + /// Whether models should be shared or independent + pub model_reuse: ReuseType, + /// How many model instance documents to create in advance for each model. + pub model_instance_reuse: ReuseType, + pub number_of_documents: usize, +} + +impl From for CeramicScenarioParameters { + fn from(value: Scenario) -> Self { + // did_type: DidType::UserDidKey and model_instance_reuse: ReuseType::Shared is an invalid combination + // as we'll try to update documents owned by another controller and just log lots of errors + match value { + Scenario::CeramicSimple => Self { + did_type: DidType::Shared, + model_reuse: ReuseType::PerUser, + model_instance_reuse: ReuseType::PerUser, + number_of_documents: 1, + }, + Scenario::CeramicModelReuse => Self { + did_type: DidType::UserDidKey, + model_reuse: ReuseType::Shared, + model_instance_reuse: ReuseType::PerUser, + number_of_documents: 1, + }, + Scenario::CeramicWriteOnly => Self { + did_type: DidType::UserDidKey, + model_reuse: ReuseType::Shared, + model_instance_reuse: ReuseType::PerUser, + number_of_documents: 1, + }, + Scenario::CeramicNewStreams => Self { + did_type: DidType::UserDidKey, + model_reuse: ReuseType::PerUser, + model_instance_reuse: ReuseType::PerUser, + number_of_documents: 0, + }, + Scenario::CeramicNewStreamsBenchmark => Self { + did_type: DidType::UserDidKey, + model_reuse: ReuseType::Shared, + model_instance_reuse: ReuseType::PerUser, + number_of_documents: 0, + }, + Scenario::CeramicQuery => Self { + did_type: DidType::Shared, + model_reuse: ReuseType::PerUser, + model_instance_reuse: ReuseType::PerUser, + number_of_documents: 3, + }, + Scenario::IpfsRpc | Scenario::ReconEventSync | Scenario::ReconEventKeySync => { + panic!("Not supported for non ceramic scenarios") + } + } + } } diff --git a/runner/src/scenario/ceramic/model_instance.rs b/runner/src/scenario/ceramic/model_instance.rs new file mode 100644 index 00000000..0c64b0f8 --- /dev/null +++ b/runner/src/scenario/ceramic/model_instance.rs @@ -0,0 +1,718 @@ +use std::{str::FromStr, sync::Arc, time::Duration}; + +use ceramic_http_client::{ + api::{self, Pagination, StreamsResponse, StreamsResponseOrError}, + ceramic_event::StreamId, + CeramicHttpClient, FilterQuery, ModelAccountRelation, ModelDefinition, +}; +use goose::{metrics::GooseRequestMetric, prelude::*}; +use redis::AsyncCommands; +use tracing::{debug, info, warn}; + +use crate::{ + goose_try, + scenario::{ + ceramic::{ + models::{self, RandomModelInstance}, + CeramicClient, Credentials, + }, + get_redis_client, is_goose_global_leader, is_goose_lead_user, is_goose_lead_worker, + }, +}; + +use super::CeramicScenarioParameters; + +const SMALL_MODEL_ID_KEY: &str = "small_model_reuse_model_id"; +const LARGE_MODEL_ID_KEY: &str = "large_model_reuse_model_id"; +const SMALL_MID_ID_KEY: &str = "small_model_reuse_mid_id"; +const LARGE_MID_ID_KEY: &str = "large_model_reuse_mid_id"; + +pub(crate) async fn set_key_to_stream_id( + conn: &mut redis::aio::Connection, + key: &str, + stream_id: &StreamId, +) { + let _: () = conn.set(key, stream_id.to_string()).await.unwrap(); +} + +pub(crate) async fn loop_until_key_value_set( + conn: &mut redis::aio::Connection, + key: &str, +) -> StreamId { + loop { + if conn.exists(key).await.unwrap() { + let id: String = conn.get(key).await.unwrap(); + return StreamId::from_str(&id) + .map_err(|e| { + tracing::error!("invalid stream: {:?} ", e); + e + }) + .unwrap(); + } else { + tokio::time::sleep(Duration::from_millis(100)).await; + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CountResponse { + pub count: i32, +} + +#[derive(Clone, Debug)] +pub struct EnvBasedConfig { + /// The DID that can be used to create models and access the admin API + pub admin_cli: CeramicClient, + /// The DID that should be used to interact with ceramic (may be the same as admin DID) + pub user_cli: CeramicClient, + /// The redis client to use for sharing model info between users + pub redis_cli: redis::Client, + /// The scenario parameters + params: CeramicScenarioParameters, +} + +#[derive(Clone, Debug)] +pub struct GooseUserInfo { + pub global_leader: bool, + /// True if this user is the lead user on the worker + pub lead_user: bool, + /// True if this is the lead worker process + pub lead_worker: bool, +} + +#[derive(Clone, Debug)] +pub struct CeramicModelInstanceTestUser { + /// Config that needs to exist before starting the scenario + config: EnvBasedConfig, + /// True if this user is the global leader + pub user_info: GooseUserInfo, + /// The ID of the small model + pub small_model_id: StreamId, + /// The ID of the small model instance documents set up by the test + pub small_model_instance_ids: Vec, + /// The ID of the large model + pub large_model_id: StreamId, + /// The ID of the large model instance documents set up by the test + pub large_model_instance_ids: Vec, +} + +impl CeramicModelInstanceTestUser { + pub fn user_cli(&self) -> &CeramicClient { + &self.config.user_cli + } + + pub fn redis_cli(&self) -> &redis::Client { + &self.config.redis_cli + } + + /// Call this before starting the scenario to verify everything is configured appropriately + /// It could be in the `setup_scenario` function, but it's "nice" to crash the worker rather than + /// only error in the setup function, which is harder to notice right away. + pub async fn prep_scenario( + params: CeramicScenarioParameters, + ) -> anyhow::Result { + let redis_cli = get_redis_client().await?; + let creds = Credentials::admin_from_env().await?; + let admin_cli = CeramicHttpClient::new(creds.signer); + + let user_cli = match params.did_type { + super::DidType::Shared => { + let creds = Credentials::from_env().await?; + CeramicHttpClient::new(creds.signer) + } + super::DidType::UserDidKey => { + let creds = Credentials::new_generate_did_key().await?; + CeramicHttpClient::new(creds.signer) + } + }; + + Ok(EnvBasedConfig { + admin_cli, + user_cli, + redis_cli, + params, + }) + } + + pub async fn setup_mid_scenario( + user: &mut GooseUser, + config: EnvBasedConfig, + ) -> TransactionResult { + Self::setup_scenario(user, config) + .await + .map_err(|e| { + tracing::error!("failed to setup scenario: {}", e); + e + }) + .unwrap(); + Ok(()) + } + /// Builds the CeramicLoadTestUser and stores it as user session data + async fn setup_scenario(user: &mut GooseUser, config: EnvBasedConfig) -> anyhow::Result<()> { + let lead_user = is_goose_lead_user(); // we cache this as the implementation is not idempotent + let global_leader = is_goose_global_leader(lead_user); + debug!(params=?config.params, "setting up scenario"); + let (small_model_id, large_model_id) = match config.params.model_reuse { + super::ReuseType::PerUser => { + Self::generate_list_models(user, &config.admin_cli, &config.redis_cli, true, None) + .await? + } + super::ReuseType::Shared => { + let (small, large) = Self::generate_list_models( + user, + &config.admin_cli, + &config.redis_cli, + global_leader, + None, + ) + .await?; + // js ceramic subscribes to the meta model, so we'll get all models created synced to us. we just need to make sure they sync before starting + Self::ensure_model_exists(user, &config.user_cli, &small).await?; + Self::ensure_model_exists(user, &config.user_cli, &large).await?; + (small, large) + } + crate::scenario::ceramic::ReuseType::PerNode => { + // we need to adjust the redis key when we want to avoid overwriting the same key from different nodes. usually we want to share (empty), + // but in this case we want to only share for a single worker (all users) + Self::generate_list_models( + user, + &config.admin_cli, + &config.redis_cli, + lead_user, + Some(goose::get_worker_id().to_string()), + ) + .await? + } + }; + + if lead_user { + ModelInstanceRequests::index_model( + user, + &config.admin_cli, + &small_model_id, + "index_small", + ) + .await?; + ModelInstanceRequests::index_model( + user, + &config.admin_cli, + &large_model_id, + "index_large", + ) + .await?; + + Self::subscribe_to_model(user, &small_model_id).await?; + Self::subscribe_to_model(user, &large_model_id).await?; + } + + let (small_model_instance_ids, large_model_instance_ids) = match config + .params + .model_instance_reuse + { + super::ReuseType::PerUser => { + Self::generate_mids( + user, + &config.user_cli, + &config.redis_cli, + &small_model_id, + &large_model_id, + config.params.number_of_documents, + true, + None, + ) + .await? + } + super::ReuseType::PerNode => { + Self::generate_mids( + user, + &config.user_cli, + &config.redis_cli, + &small_model_id, + &large_model_id, + config.params.number_of_documents, + lead_user, + Some(goose::get_worker_id().to_string()), + ) + .await? + } + super::ReuseType::Shared => { + if config.params.number_of_documents != 1 { + warn!("Shared model instance reuse only supports 1 document per model currently. Only using the first document for each model."); + } + Self::generate_mids( + user, + &config.user_cli, + &config.redis_cli, + &small_model_id, + &large_model_id, + config.params.number_of_documents, + global_leader, + None, + ) + .await? + } + }; + + let resp = Self { + config, + user_info: GooseUserInfo { + lead_user, + global_leader, + lead_worker: is_goose_lead_worker(), + }, + small_model_id, + small_model_instance_ids, + large_model_id, + large_model_instance_ids, + }; + + user.set_session_data(Arc::new(resp)); + info!("scenario setup complete"); + Ok(()) + } + + /// This is awkward. But as we set ourself up, we should be able to guanratee that the user data is set. + /// We use an Arc as we want to allow borrowing this 'context' data, while taking a separate &mut GooseUser + /// request. It would be possible to do this, and match on which "type" of stream update we're making, but + /// it's nice to accept all the stream IDs etc as parameters to functions. + pub fn user_data(user: &GooseUser) -> &Arc { + user.get_session_data_unchecked() + } + + pub fn _random_model_for_user(&self, user: &GooseUser) -> &StreamId { + let idx = user.weighted_users_index % self.large_model_instance_ids.len(); + let val = self.large_model_instance_ids.get(idx); + // we should always have a value, since the index is always less than the length + val.unwrap() + } + + /// returns (small, large) model IDs + async fn generate_list_models( + user: &mut GooseUser, + admin_cli: &CeramicClient, + redis_cli: &redis::Client, + should_create: bool, + redis_postfix: Option, + ) -> Result<(StreamId, StreamId), TransactionError> { + let mut conn = redis_cli.get_async_connection().await.unwrap(); + let (small_key, large_key) = if let Some(pf) = redis_postfix { + ( + format!("{}_{}", SMALL_MODEL_ID_KEY, pf), + format!("{}_{}", LARGE_MODEL_ID_KEY, pf), + ) + } else { + ( + SMALL_MODEL_ID_KEY.to_string(), + LARGE_MODEL_ID_KEY.to_string(), + ) + }; + if should_create { + info!("generating model IDs"); + let small_model = ModelDefinition::new::( + "load_test_small_model", + ModelAccountRelation::List, + ) + .unwrap(); + let small_model_id = + ModelInstanceRequests::setup_model(user, admin_cli, small_model, "small").await?; + + let large_model = ModelDefinition::new::( + "load_test_large_model", + ModelAccountRelation::List, + ) + .unwrap(); + let large_model_id = + ModelInstanceRequests::setup_model(user, admin_cli, large_model, "large").await?; + + let _ = set_key_to_stream_id(&mut conn, &small_key, &small_model_id).await; + let _ = set_key_to_stream_id(&mut conn, &large_key, &large_model_id).await; + + Ok((small_model_id, large_model_id)) + } else { + info!("waiting for shared model IDs to be set in redis"); + let small = loop_until_key_value_set(&mut conn, &small_key).await; + let large = loop_until_key_value_set(&mut conn, &large_key).await; + Ok((small, large)) + } + } + + async fn subscribe_to_model(user: &mut GooseUser, model_id: &StreamId) -> TransactionResult { + let request_builder = user + .get_request_builder( + &GooseMethod::Post, + &format!("/ceramic/interests/model/{}", model_id), + )? + .timeout(Duration::from_secs(5)); + let req = GooseRequest::builder() + .set_request_builder(request_builder) + .expect_status_code(204) + .build(); + let _goose = user.request(req).await?; + Ok(()) + } + + async fn ensure_model_exists( + user: &mut GooseUser, + cli: &CeramicClient, + model_id: &StreamId, + ) -> anyhow::Result<()> { + let now = std::time::SystemTime::now(); + loop { + match ModelInstanceRequests::get_stream_int( + user, + cli, + model_id, + "ensure_model_exists", + false, + ) + .await + { + Ok((r, _)) => { + info!("got response: {:?}", r); + if r.resolve("ensure_model_exists").is_ok() { + info!( + "model {} exists after {} seconds", + model_id, + now.elapsed().unwrap().as_secs() + ); + break; + } + } + Err(e) => { + info!("failed to get model: {:?}", e); + } + } + if now.elapsed().unwrap() > Duration::from_secs(60) { + anyhow::bail!("timed out waiting for model to exist: {}", model_id); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn generate_mids( + user: &mut GooseUser, + cli: &CeramicClient, + redis_cli: &redis::Client, + small_model_id: &StreamId, + large_model_id: &StreamId, + number_of_documents: usize, + should_create: bool, + redis_postfix: Option, + ) -> Result<(Vec, Vec), TransactionError> { + if number_of_documents == 0 { + // not all scenarios care to prep documents in advance + return Ok((vec![], vec![])); + } + + let (small_key, large_key) = if let Some(pf) = redis_postfix { + ( + format!("{}_{}", SMALL_MID_ID_KEY, pf), + format!("{}_{}", LARGE_MID_ID_KEY, pf), + ) + } else { + (SMALL_MID_ID_KEY.to_string(), LARGE_MID_ID_KEY.to_string()) + }; + + let mut conn = redis_cli.get_async_connection().await.unwrap(); + if should_create { + info!("generating {} model instance IDs", number_of_documents); + + let mut small_model_instance_ids = vec![]; + let mut large_model_instance_ids = vec![]; + for i in 0..number_of_documents { + let small_model_instance_id = ModelInstanceRequests::create_model_instance( + user, + cli, + small_model_id, + "small", + &models::SmallModel::random(), + ) + .await?; + let large_model_instance_id = ModelInstanceRequests::create_model_instance( + user, + cli, + large_model_id, + "large", + // the following values are used to make assumptions about the values set in the model instance documents during the "query" test currently + &models::LargeModel::new( + format!("large_model_{}", i), + i.to_string(), + 10 + (i * 10) as i64, + ), + ) + .await?; + // we only support one shared ID for now, so just add the first one we create + if i == 0 { + let _ = + set_key_to_stream_id(&mut conn, &small_key, &small_model_instance_id).await; + let _ = + set_key_to_stream_id(&mut conn, &large_key, &large_model_instance_id).await; + } + small_model_instance_ids.push(small_model_instance_id); + large_model_instance_ids.push(large_model_instance_id); + } + + Ok((small_model_instance_ids, large_model_instance_ids)) + } else { + info!("waiting for shared model instance IDs"); + + let small = loop_until_key_value_set(&mut conn, &small_key).await; + let large = loop_until_key_value_set(&mut conn, &large_key).await; + Ok((vec![small], vec![large])) + } + } +} + +#[derive(Clone, Debug)] +pub struct ModelInstanceRequests {} + +impl ModelInstanceRequests { + pub async fn get_stream_tx( + user: &mut GooseUser, + cli: &CeramicClient, + stream_id: &StreamId, + tx_name: &str, + ) -> TransactionResult { + let _goose = Self::get_stream(user, cli, stream_id, tx_name).await?; + Ok(()) + } + + pub async fn get_and_replace_stream_tx( + user: &mut GooseUser, + cli: &CeramicClient, + model_id: &StreamId, + model_instance_id: &StreamId, + tx_name: &str, + data: &T, + ) -> TransactionResult + where + T: serde::Serialize, + { + let (resp, _) = Self::get_stream(user, cli, model_instance_id, tx_name).await?; + Self::replace_stream_tx(user, cli, model_id, &resp, tx_name, data).await?; + Ok(()) + } + + pub async fn replace_stream_tx( + user: &mut GooseUser, + cli: &CeramicClient, + model_id: &StreamId, + prev: &StreamsResponse, + tx_name: &str, + data: &T, + ) -> TransactionResult + where + T: serde::Serialize, + { + let update_req = cli + .create_replace_request(model_id, prev, data) + .await + .unwrap(); + let commits_url = user.build_url(cli.commits_endpoint())?; + + let req: reqwest::RequestBuilder = user.client.post(commits_url).json(&update_req); + let name = format!("Replace_{}", tx_name); + let req = GooseRequest::builder() + .name(name.as_str()) + .method(GooseMethod::Post) + .set_request_builder(req) + .expect_status_code(200) + .build(); + let mut goose = user.request(req).await?; + let resp: StreamsResponseOrError = goose.response?.json().await?; + goose_try!(user, &name, &mut goose.request, resp.resolve(tx_name))?; + Ok(()) + } + + pub async fn query_model( + user: &mut GooseUser, + cli: &CeramicClient, + model_id: &StreamId, + filter: Option, + ) -> Result<(api::QueryResponse, GooseRequestMetric), TransactionError> { + let req = cli + .create_query_request(model_id, filter, Pagination::default()) + .await + .unwrap(); + let goose = user + .request( + GooseRequest::builder() + .name("query_model") + .method(GooseMethod::Post) + .set_request_builder( + user.client + .post(user.build_url(cli.collection_endpoint())?) + .json(&req), + ) + .expect_status_code(200) + .build(), + ) + .await?; + let resp: api::QueryResponse = goose.response?.json().await?; + + Ok((resp, goose.request)) + } + + #[allow(dead_code)] + pub async fn query_model_count( + user: &mut GooseUser, + model_id: &StreamId, + ) -> Result<(CountResponse, GooseRequestMetric), TransactionError> { + let req = serde_json::json!({ + "model": model_id.to_string(), + }); + let goose = user + .request( + GooseRequest::builder() + .name("query_model_count") + .method(GooseMethod::Post) + .set_request_builder( + user.client + .post(user.build_url("/api/v0/collection/count")?) + .json(&req), + ) + .expect_status_code(200) + .build(), + ) + .await?; + let resp: CountResponse = goose.response?.json().await?; + + Ok((resp, goose.request)) + } + + pub async fn get_stream_int( + user: &mut GooseUser, + cli: &CeramicClient, + stream_id: &StreamId, + name: &str, + expect_success: bool, + ) -> Result<(StreamsResponseOrError, GooseRequestMetric), TransactionError> { + let streams_url = user.build_url(&format!("{}/{}", cli.streams_endpoint(), stream_id))?; + let get_stream_req = GooseRequest::builder() + .name(name) + .method(GooseMethod::Get) + .set_request_builder(user.client.get(streams_url)) + .expect_status_code(200) + .build(); + + // unfortunately goose logs an error if it's a 500, so we expect 500 to avoid some noised + let mut goose = user.request(get_stream_req).await?; + let resp: StreamsResponseOrError = goose.response?.json().await?; + if !expect_success { + user.set_success(&mut goose.request)?; + } + + Ok((resp, goose.request)) + } + + pub async fn get_stream( + user: &mut GooseUser, + cli: &CeramicClient, + stream_id: &StreamId, + tx_name: &str, + ) -> Result<(StreamsResponse, GooseRequestMetric), TransactionError> { + let name = format!("Get_{}", tx_name); + let (resp, mut goose) = Self::get_stream_int(user, cli, stream_id, &name, true).await?; + let resp = goose_try!(user, &name, &mut goose, { resp.resolve(tx_name) })?; + Ok((resp, goose)) + } + + pub async fn setup_model( + user: &mut GooseUser, + cli: &CeramicClient, + model: ModelDefinition, + tx_name: &str, + ) -> Result { + let url = user.build_url(cli.streams_endpoint())?; + let req = cli.create_model_request(&model).await.unwrap(); + let req = user.client.post(url).json(&req); + let name = format!("setup_model_{}", tx_name); + let req = GooseRequest::builder() + .name(name.as_str()) + .method(GooseMethod::Post) + .set_request_builder(req) + .expect_status_code(200) + .build(); + let mut goose = user.request(req).await?; + let resp: api::StreamsResponseOrError = goose.response?.json().await?; + let resp = goose_try!(user, &name, &mut goose.request, { resp.resolve(&name) })?; + Ok(resp.stream_id) + } + + pub async fn create_model_instance( + user: &mut GooseUser, + cli: &CeramicClient, + model: &StreamId, + tx_name: &str, + data: &T, + ) -> Result { + let url = user.build_url(cli.streams_endpoint())?; + let req = cli.create_list_instance_request(model, data).await.unwrap(); + let req = user.client.post(url).json(&req); + let name = format!("create_model_instance_{}", tx_name); + let req = GooseRequest::builder() + .name(name.as_str()) + .method(GooseMethod::Post) + .set_request_builder(req) + .expect_status_code(200) + .build(); + let mut goose = user.request(req).await?; + let resp: api::StreamsResponseOrError = goose.response?.json().await?; + let resp = goose_try!(user, &name, &mut goose.request, { resp.resolve(tx_name) })?; + Ok(resp.stream_id) + } + + pub async fn index_model( + user: &mut GooseUser, + cli: &CeramicClient, + model_id: &StreamId, + tx_name: &str, + ) -> Result<(), TransactionError> { + let name = format!("admin_code_{}", tx_name); + let result = user + .request( + GooseRequest::builder() + .name(name.as_str()) + .method(GooseMethod::Get) + .set_request_builder( + user.client.get(user.build_url(cli.admin_code_endpoint())?), + ) + .expect_status_code(200) + .build(), + ) + .await?; + let resp: api::AdminCodeResponse = result.response?.json().await?; + let name = format!("create_index_model_{}", tx_name); + let req = cli + .create_index_model_request(model_id, &resp.code) + .await + .unwrap(); + let mut goose = user + .request( + GooseRequest::builder() + .name(name.as_str()) + .method(GooseMethod::Post) + .set_request_builder( + user.client + .post(user.build_url(cli.index_endpoint())?) + .json(&req), + ) + .expect_status_code(200) + .build(), + ) + .await?; + let resp = goose.response?; + if resp.status().is_success() { + Ok(()) + } else { + user.set_failure( + &format!("index_model_{}", name), + &mut goose.request, + None, + Some(&format!("Failed to index model: {}", resp.text().await?)), + ) + } + } +} diff --git a/runner/src/scenario/ceramic/model_reuse.rs b/runner/src/scenario/ceramic/model_reuse.rs deleted file mode 100644 index c73c10bd..00000000 --- a/runner/src/scenario/ceramic/model_reuse.rs +++ /dev/null @@ -1,159 +0,0 @@ -use crate::goose_try; -use crate::scenario::ceramic::models::LargeModel; -use crate::scenario::ceramic::util::{goose_error, index_model, setup_model, setup_model_instance}; -use crate::scenario::ceramic::{CeramicClient, Credentials}; -use crate::scenario::get_redis_client; -use ceramic_http_client::api::StreamsResponseOrError; -use ceramic_http_client::ceramic_event::{JwkSigner, StreamId}; -use ceramic_http_client::{CeramicHttpClient, ModelAccountRelation, ModelDefinition}; -use goose::prelude::*; -use redis::AsyncCommands; -use std::str::FromStr; -use std::{sync::Arc, time::Duration}; -use tracing::instrument; - -#[derive(Clone)] -pub(crate) struct ModelReuseLoadTestUserData { - pub(crate) cli: CeramicHttpClient, - pub(crate) redis_cli: redis::Client, - pub(crate) model_id: StreamId, -} - -const MODEL_ID_KEY: &str = "model_reuse_model_id"; -const MODEL_INSTANCE_ID_KEY: &str = "model_reuse_model_instance_id"; - -pub async fn scenario() -> Result { - let creds = Credentials::from_env().await.map_err(goose_error)?; - let cli = CeramicHttpClient::new(creds.signer); - let redis_cli = get_redis_client().await?; - - let test_start = Transaction::new(Arc::new(move |user| { - Box::pin(setup(user, cli.clone(), redis_cli.clone())) - })) - .set_name("setup") - .set_on_start(); - - let create_instance_tx = transaction!(create_instance).set_name("create_instance"); - let get_instance_tx = transaction!(get_instance).set_name("get_instance"); - - Ok(scenario!("CeramicModelReuseScenario") - .register_transaction(test_start) - .register_transaction(create_instance_tx) - .register_transaction(get_instance_tx)) -} - -pub(crate) async fn set_model_id( - conn: &mut redis::aio::Connection, - model_id: &StreamId, - key: &str, -) { - let _: () = conn.set(key, model_id.to_string()).await.unwrap(); -} - -pub(crate) async fn get_model_id(conn: &mut redis::aio::Connection, key: &str) -> StreamId { - loop { - if conn.exists(key).await.unwrap() { - let id: String = conn.get(key).await.unwrap(); - return StreamId::from_str(&id) - .map_err(|e| { - tracing::error!("invalid stream: {:?} ", e); - e - }) - .unwrap(); - } else { - tokio::time::sleep(Duration::from_millis(100)).await; - } - } -} - -#[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)] -async fn setup( - user: &mut GooseUser, - cli: CeramicClient, - redis_cli: redis::Client, -) -> TransactionResult { - let mut conn = redis_cli.get_async_connection().await.unwrap(); - let model_id = if user.weighted_users_index == 0 { - let model_definition = ModelDefinition::new::( - "model_reuse_query_model", - ModelAccountRelation::List, - ) - .unwrap(); - let model_id = setup_model(user, &cli, model_definition).await?; - index_model(user, &cli, &model_id).await?; - - let _ = set_model_id(&mut conn, &model_id, MODEL_ID_KEY).await; - - model_id - } else { - get_model_id(&mut conn, MODEL_ID_KEY).await - }; - - let user_data = ModelReuseLoadTestUserData { - cli, - redis_cli, - model_id, - }; - - user.set_session_data(user_data); - - Ok(()) -} - -async fn create_instance(user: &mut GooseUser) -> TransactionResult { - let user_data: ModelReuseLoadTestUserData = { - let data: &ModelReuseLoadTestUserData = user.get_session_data_unchecked(); - data.clone() - }; - let cli = &user_data.cli; - let mut conn = user_data.redis_cli.get_async_connection().await.unwrap(); - - let id = setup_model_instance( - user, - cli, - &user_data.model_id, - &LargeModel { - creator: "keramik".to_string(), - name: "model-reuse-model-instance".to_string(), - description: "a".to_string(), - tpe: 10, - }, - ) - .await?; - - let _: () = conn - .rpush(MODEL_INSTANCE_ID_KEY, id.to_string()) - .await - .unwrap(); - - Ok(()) -} - -async fn get_model_instance_id(conn: &mut redis::aio::Connection) -> StreamId { - loop { - let len: usize = conn.llen(MODEL_INSTANCE_ID_KEY).await.unwrap(); - if len > 0 { - let id: String = conn.lpop(MODEL_INSTANCE_ID_KEY, None).await.unwrap(); - return StreamId::from_str(&id).unwrap(); - } else { - tokio::time::sleep(Duration::from_millis(100)).await; - } - } -} - -async fn get_instance(user: &mut GooseUser) -> TransactionResult { - let user_data: &ModelReuseLoadTestUserData = user.get_session_data_unchecked(); - let cli: &CeramicClient = &user_data.cli; - let mut redis_conn = user_data.redis_cli.get_async_connection().await.unwrap(); - let model_instance_id = get_model_instance_id(&mut redis_conn).await; - let url = user.build_url(&format!("{}/{}", cli.streams_endpoint(), model_instance_id,))?; - let mut goose = user.get(&url).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( - user, - "get", - &mut goose.request, - resp.resolve("get_instance") - )?; - Ok(()) -} diff --git a/runner/src/scenario/ceramic/models.rs b/runner/src/scenario/ceramic/models.rs index 52f28a0b..975be690 100644 --- a/runner/src/scenario/ceramic/models.rs +++ b/runner/src/scenario/ceramic/models.rs @@ -1,5 +1,8 @@ use ceramic_http_client::GetRootSchema; -use rand::prelude::*; +use rand::{ + distributions::{Alphanumeric, DistString}, + prelude::*, +}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -43,6 +46,31 @@ pub struct LargeModel { impl GetRootSchema for LargeModel {} +impl LargeModel { + pub fn new(name: String, description: String, tpe: i64) -> Self { + Self { + creator: "keramik".to_string(), + name, + description, + tpe, + } + } + + pub fn random_1kb() -> Self { + let mut rng = thread_rng(); + + Self { + creator: "keramik".to_string(), + name: format!( + "keramik-large-model-{}", + Alphanumeric.sample_string(&mut rand::thread_rng(), 40) + ), + description: Alphanumeric.sample_string(&mut rand::thread_rng(), 200), + tpe: rng.gen_range(0..100), + } + } +} + impl RandomModelInstance for LargeModel { fn random() -> Self { let mut rng = thread_rng(); diff --git a/runner/src/scenario/ceramic/new_streams.rs b/runner/src/scenario/ceramic/new_streams.rs index ac7c7e81..abfc9f62 100644 --- a/runner/src/scenario/ceramic/new_streams.rs +++ b/runner/src/scenario/ceramic/new_streams.rs @@ -1,22 +1,110 @@ -use crate::goose_try; -use ceramic_http_client::CeramicHttpClient; -use goose::prelude::*; use std::sync::Arc; -use crate::scenario::ceramic::util::goose_error; -use crate::scenario::ceramic::{ - models, - simple::{setup, LoadTestUserData}, - Credentials, RandomModelInstance, StreamsResponseOrError, +use goose::prelude::*; +use redis::AsyncCommands; +use tracing::info; + +use crate::scenario::{ + ceramic::{model_instance::CountResponse, models, RandomModelInstance}, + get_redis_client, }; -pub async fn scenario() -> Result { - let creds = Credentials::from_env().await.map_err(goose_error)?; - let cli = CeramicHttpClient::new(creds.signer); +use super::{ + model_instance::{CeramicModelInstanceTestUser, ModelInstanceRequests}, + CeramicScenarioParameters, +}; + +static REQWEST_CLIENT: std::sync::OnceLock = std::sync::OnceLock::new(); + +/// returns (worker_id, count) +pub async fn benchmark_scenario_metrics(worker_cnt: usize, nonce: u64) -> Vec<(usize, i32)> { + let redis_cli = get_redis_client().await.unwrap(); + let mut conn = redis_cli.get_async_connection().await.unwrap(); + let mut res = vec![]; + // the worker IDs is 1 indexed, so we need to add 1 to the range + for i in 1..worker_cnt + 1 { + let before = conn + .get::<_, Option>(new_cnt_key(true, i, nonce)) + .await + .unwrap(); + let after = conn + .get::<_, Option>(new_cnt_key(false, i, nonce)) + .await + .unwrap(); + let before = before.and_then(|s| s.parse::().ok()); + let after = after.and_then(|s| s.parse::().ok()); + match (before, after) { + (Some(before), Some(after)) => { + res.push((i, after - before)); + } + _ => { + tracing::warn!("missing entry for worker {}", i); + } + } + } + res +} + +fn new_cnt_key(before: bool, worker_id: usize, nonce: u64) -> String { + let modifier = if before { "start" } else { "end" }; + format!("new_mid_count_{}_{}_{}", worker_id, modifier, nonce) +} + +async fn store_metrics(user: &mut GooseUser, on_start: bool, nonce: u64) -> TransactionResult { + // there is a goose bug where the manager closes the flume channel used to throttle requests, and then our on stop + // requests all fail to execute. So instead of using the user to make the request, we use a reqwest client + // if the throttle value is not set, it doesn't apply, but it's easier to just use the reqwest client in all cases + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + if user_data.user_info.lead_user { + let url = user.build_url("/api/v0/collection/count")?; + let client = REQWEST_CLIENT.get_or_init(reqwest::Client::new); + let resp = client + .post(&url) + .json(&serde_json::json!({ + "model": &user_data.large_model_id.to_string(), + })) + .send() + .await? + .json::() + .await; + let cnt = match resp { + Ok(cnt) => cnt.count, + Err(e) => { + tracing::error!("failed to get model count: {}", e); + if on_start { + 0 + } else { + panic!("failed to get model count: {}", e); + } + } + }; + info!(data=?cnt, stream_id=%user_data.large_model_id, "got model count"); + + let mut conn = user_data.redis_cli().get_async_connection().await.unwrap(); + let _: () = conn + .set( + new_cnt_key(on_start, goose::get_worker_id(), nonce), + cnt.to_string(), + ) + .await + .unwrap(); + } + + Ok(()) +} + +pub async fn small_large_scenario( + params: CeramicScenarioParameters, +) -> Result { + let config = CeramicModelInstanceTestUser::prep_scenario(params) + .await + .unwrap(); - let setup_cli = cli; let test_start = Transaction::new(Arc::new(move |user| { - Box::pin(setup(user, setup_cli.clone())) + Box::pin(CeramicModelInstanceTestUser::setup_mid_scenario( + user, + config.clone(), + )) })) .set_name("setup") .set_on_start(); @@ -32,56 +120,89 @@ pub async fn scenario() -> Result { .register_transaction(instantiate_large_model)) } -async fn instantiate_small_model(user: &mut GooseUser) -> TransactionResult { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - let model = user_data.small_model_id.clone(); - let cli = &user_data.cli; - let req = cli - .create_list_instance_request(&model, &models::SmallModel::random()) +// the nonce is used to ensure that the metrics stored in redis for the run are unique +pub async fn benchmark_scenario( + params: CeramicScenarioParameters, + nonce: u64, +) -> Result { + let config = CeramicModelInstanceTestUser::prep_scenario(params) .await .unwrap(); - let req = GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder( - user.client - .post(user.build_url(cli.streams_endpoint())?) - .json(&req), - ) - .expect_status_code(200) - .build(); - let mut goose = user.request(req).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( + + let test_start = Transaction::new(Arc::new(move |user| { + Box::pin(CeramicModelInstanceTestUser::setup_mid_scenario( + user, + config.clone(), + )) + })) + .set_name("setup") + .set_on_start(); + + let before_metrics = Transaction::new(Arc::new(move |user| { + Box::pin(store_metrics(user, true, nonce)) + })) + .set_name("before_metrics") + .set_on_start(); + + let after_metrics = Transaction::new(Arc::new(move |user| { + Box::pin(store_metrics(user, false, nonce)) + })) + .set_name("after_metrics") + .set_on_stop(); + + let instantiate_large_model = + transaction!(instantiate_large_model_1kb).set_name("instantiate_1k_mid"); + + Ok(scenario!("CeramicNewStreamsBenchmark") + .register_transaction(test_start) + .register_transaction(before_metrics) + .register_transaction(instantiate_large_model) + .register_transaction(after_metrics)) +} + +async fn instantiate_small_model(user: &mut GooseUser) -> TransactionResult { + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + ModelInstanceRequests::create_model_instance( user, - "create_instance", - &mut goose.request, - resp.resolve("instantiate_small_model") - )?; + user_data.user_cli(), + &user_data.small_model_id, + "instantiate_small_model_instance", + &models::SmallModel::random(), + ) + .await?; Ok(()) } async fn instantiate_large_model(user: &mut GooseUser) -> TransactionResult { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - let model = user_data.large_model_id.clone(); - let cli = &user_data.cli; - let url = user.build_url(cli.streams_endpoint())?; - let req = cli - .create_list_instance_request(&model, &models::LargeModel::random()) - .await - .unwrap(); - let req = user.client.post(url).json(&req); - let req = GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder(req) - .expect_status_code(200) - .build(); - let mut goose = user.request(req).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + ModelInstanceRequests::create_model_instance( user, - "create_instance", - &mut goose.request, - resp.resolve("instantiate_large_model") - )?; + user_data.user_cli(), + &user_data.large_model_id, + "instantiate_large_model_instance", + &models::LargeModel::random(), + ) + .await?; + + Ok(()) +} + +async fn instantiate_large_model_1kb(user: &mut GooseUser) -> TransactionResult { + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + if user_data.user_info.lead_worker { + ModelInstanceRequests::create_model_instance( + user, + user_data.user_cli(), + &user_data.large_model_id, + "instantiate_large_model_instance", + &models::LargeModel::random_1kb(), + ) + .await?; + } else { + tracing::debug!( + "Not lead worker. Just sleeping instead of creating large model instance document" + ); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; // this doesn't block cpu + } Ok(()) } diff --git a/runner/src/scenario/ceramic/query.rs b/runner/src/scenario/ceramic/query.rs index 586b2d3a..0bea293d 100644 --- a/runner/src/scenario/ceramic/query.rs +++ b/runner/src/scenario/ceramic/query.rs @@ -1,32 +1,30 @@ use crate::goose_try; +use crate::scenario::ceramic::model_instance::ModelInstanceRequests; use crate::scenario::ceramic::models::LargeModel; -use crate::scenario::ceramic::util::{goose_error, index_model, setup_model, setup_model_instance}; -use crate::scenario::ceramic::{CeramicClient, Credentials}; -use ceramic_http_client::api::{Pagination, StreamsResponse, StreamsResponseOrError}; -use ceramic_http_client::ceramic_event::{JwkSigner, StreamId}; -use ceramic_http_client::{ - api, CeramicHttpClient, FilterQuery, ModelAccountRelation, ModelDefinition, OperationFilter, -}; +use ceramic_http_client::api::StreamsResponse; +use ceramic_http_client::ceramic_event::StreamId; +use ceramic_http_client::{FilterQuery, OperationFilter}; +use goose::metrics::GooseRequestMetric; use goose::prelude::*; use std::collections::HashMap; use std::sync::Arc; use tracing::instrument; +use super::model_instance::{CeramicModelInstanceTestUser, EnvBasedConfig}; +use super::CeramicScenarioParameters; + #[derive(Clone)] struct QueryLoadTestUserData { - cli: CeramicHttpClient, - model_id: StreamId, model_instance_id1: StreamId, model_instance_id2: StreamId, model_instance_id3: StreamId, + instance1_starting_int_value: i64, + instance2_starting_int_value: i64, + instance3_starting_int_value: i64, } -const INSTANCE1_STARTING_INT_VALUE: i64 = 10; -const INSTANCE2_STARTING_INT_VALUE: i64 = 20; -const INSTANCE3_STARTING_INT_VALUE: i64 = 30; - impl QueryLoadTestUserData { - fn model_id_for_user(&self, user: &GooseUser) -> &StreamId { + fn model_instance_id_for_user(&self, user: &GooseUser) -> &StreamId { match user.weighted_users_index % 3 { 0 => &self.model_instance_id1, 1 => &self.model_instance_id2, @@ -36,9 +34,9 @@ impl QueryLoadTestUserData { fn int_value_for_user(&self, user: &GooseUser) -> i64 { let start = match user.weighted_users_index % 3 { - 0 => INSTANCE1_STARTING_INT_VALUE, - 1 => INSTANCE2_STARTING_INT_VALUE, - _ => INSTANCE3_STARTING_INT_VALUE, + 0 => self.instance1_starting_int_value, + 1 => self.instance2_starting_int_value, + _ => self.instance3_starting_int_value, }; let increment = user.get_iterations() % 2 == 0; if increment { @@ -49,11 +47,11 @@ impl QueryLoadTestUserData { } } -pub async fn scenario() -> Result { - let creds = Credentials::from_env().await.map_err(goose_error)?; - let cli = CeramicHttpClient::new(creds.signer); - - let test_start = Transaction::new(Arc::new(move |user| Box::pin(setup(user, cli.clone())))) +pub async fn scenario(params: CeramicScenarioParameters) -> Result { + let config = CeramicModelInstanceTestUser::prep_scenario(params) + .await + .unwrap(); + let test_start = Transaction::new(Arc::new(move |user| Box::pin(setup(user, config.clone())))) .set_name("setup") .set_on_start(); @@ -71,56 +69,20 @@ pub async fn scenario() -> Result { } #[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)] -async fn setup(user: &mut GooseUser, cli: CeramicClient) -> TransactionResult { - let model_definition = - ModelDefinition::new::("load_test_query_model", ModelAccountRelation::List) - .unwrap(); - let model_id = setup_model(user, &cli, model_definition).await?; - index_model(user, &cli, &model_id).await?; - - let id1 = setup_model_instance( - user, - &cli, - &model_id, - &LargeModel { - creator: "keramik".to_string(), - name: "load-test-query-model-1".to_string(), - description: "a".to_string(), - tpe: INSTANCE1_STARTING_INT_VALUE, - }, - ) - .await?; - let id2 = setup_model_instance( - user, - &cli, - &model_id, - &LargeModel { - creator: "keramik".to_string(), - name: "load-test-query-model-2".to_string(), - description: "b".to_string(), - tpe: INSTANCE2_STARTING_INT_VALUE, - }, - ) - .await?; - let id3 = setup_model_instance( - user, - &cli, - &model_id, - &LargeModel { - creator: "keramik".to_string(), - name: "load-test-query-model-3".to_string(), - description: "c".to_string(), - tpe: INSTANCE3_STARTING_INT_VALUE, - }, - ) - .await?; - +async fn setup(user: &mut GooseUser, config: EnvBasedConfig) -> TransactionResult { + CeramicModelInstanceTestUser::setup_mid_scenario(user, config.clone()).await?; + let data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + // we copy the data here just to make it simpler to work with in the tests + // the CeramicModelInstanceTestUser probably has too much logic built in already.. we don't _always_ need + // large and small models, but it seems fine to setup thing for MIDs consistently.. and we make assumptions + // about the values set in the model instance documents for the large model let user_data = QueryLoadTestUserData { - cli, - model_id, - model_instance_id1: id1, - model_instance_id2: id2, - model_instance_id3: id3, + model_instance_id1: data.large_model_instance_ids.first().unwrap().clone(), + model_instance_id2: data.large_model_instance_ids.get(1).unwrap().clone(), + model_instance_id3: data.large_model_instance_ids.get(2).unwrap().clone(), + instance1_starting_int_value: 10, + instance2_starting_int_value: 20, + instance3_starting_int_value: 30, }; user.set_session_data(user_data); @@ -128,103 +90,83 @@ async fn setup(user: &mut GooseUser, cli: CeramicClient) -> TransactionResult { Ok(()) } +async fn query_large_mid_verify_edges( + user: &mut GooseUser, + filter: FilterQuery, +) -> Result<(GooseRequestMetric, LargeModel), TransactionError> { + let user_data: Arc = + CeramicModelInstanceTestUser::user_data(user).to_owned(); + let (resp, mut metrics) = ModelInstanceRequests::query_model( + user, + user_data.user_cli(), + &user_data.large_model_id, + Some(filter), + ) + .await?; + + if resp.edges.first().is_none() { + goose_try!(user, "query", &mut metrics, { + Err(anyhow::anyhow!("no edges returned")) + })?; + } + let resp: LargeModel = goose_try!(user, "query", &mut metrics, { + resp.edges + .into_iter() + .next() + .ok_or_else(|| anyhow::anyhow!("no edges returned")) + .and_then(|edge| serde_json::from_value(edge.node.content).map_err(anyhow::Error::from)) + })?; + + Ok((metrics, resp)) +} + async fn query_models_pre_update(user: &mut GooseUser) -> TransactionResult { let mut where_filter = HashMap::new(); where_filter.insert( "description".to_string(), - OperationFilter::EqualTo("a".into()), + OperationFilter::EqualTo("1".into()), ); let filter = FilterQuery::Where(where_filter); - let user_data: &QueryLoadTestUserData = user.get_session_data_unchecked(); - let req = user_data - .cli - .create_query_request(&user_data.model_id, Some(filter), Pagination::default()) - .await - .unwrap(); - let cli = &user_data.cli; - let mut goose = user - .request( - GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder( - user.client - .post(user.build_url(cli.collection_endpoint())?) - .json(&req), - ) - .expect_status_code(200) - .build(), - ) - .await?; - let resp: api::QueryResponse = goose.response?.json().await?; - if resp.edges.first().is_none() { - goose_try!(user, "query", &mut goose.request, { - Err(anyhow::anyhow!("no edges returned")) - })?; - } + query_large_mid_verify_edges(user, filter).await?; + Ok(()) } -async fn get_data_and_response<'a>( - user: &'a mut GooseUser, - req: GooseRequest<'a>, - new_value: i64, +async fn get_large_mid_verify_edges( + user: &mut GooseUser, + stream_id: &StreamId, + name: &str, ) -> Result<(StreamsResponse, LargeModel), TransactionError> { - let mut goose = user.request(req).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - let resp = goose_try!(user, "get", &mut goose.request, { - resp.resolve("update_large_model_get") - })?; - let mut data: LargeModel = goose_try!(user, "get", &mut goose.request, { + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + + let (resp, mut metrics) = + ModelInstanceRequests::get_stream(user, user_data.user_cli(), stream_id, name).await?; + let data: LargeModel = goose_try!(user, &format!("verify_edges_{}", name), &mut metrics, { resp.state .as_ref() .ok_or_else(|| anyhow::anyhow!("No content")) .and_then(|st| serde_json::from_value(st.content.clone()).map_err(anyhow::Error::from)) })?; - data.tpe = new_value; + Ok((resp, data)) } async fn update_models(user: &mut GooseUser) -> TransactionResult { - let user_data = { + let (new_value, model_id) = { let data: &QueryLoadTestUserData = user.get_session_data_unchecked(); - data.clone() + let new_value = data.int_value_for_user(user); + let model_id = data.model_instance_id_for_user(user).clone(); + (new_value, model_id) }; - let new_value = user_data.int_value_for_user(user); - let model_id = user_data.model_id_for_user(user); - let cli = &user_data.cli; - let streams_url = user.build_url(&format!("{}/{}", cli.streams_endpoint(), model_id))?; - let req = GooseRequest::builder() - .method(GooseMethod::Get) - .set_request_builder(user.client.get(streams_url)) - .expect_status_code(200) - .build(); - let (resp, data) = get_data_and_response(user, req, new_value).await?; - let req = cli - .create_replace_request(model_id, &resp, data) - .await - .unwrap(); + let name = "update_models"; + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + let cli = &user_data.user_cli(); - let mut goose = user - .request( - GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder( - user.client - .post(user.build_url(cli.commits_endpoint()).unwrap()) - .json(&req), - ) - .expect_status_code(200) - .build(), - ) - .await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( - user, - "update", - &mut goose.request, - resp.resolve("update_large_model") - )?; + let (resp, mut data) = get_large_mid_verify_edges(user, &model_id, name).await?; + data.tpe = new_value; + + ModelInstanceRequests::replace_stream_tx(user, cli, &model_id, &resp, name, &data).await?; Ok(()) } @@ -239,36 +181,10 @@ async fn query_models_post_update(user: &mut GooseUser) -> TransactionResult { OperationFilter::EqualTo(expected_value.into()), ); let filter = FilterQuery::Where(where_filter); + let (mut metrics, resp) = query_large_mid_verify_edges(user, filter).await?; - let req = user_data - .cli - .create_query_request(&user_data.model_id, Some(filter), Pagination::default()) - .await - .unwrap(); - let cli = &user_data.cli; - let mut goose = user - .request( - GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder( - user.client - .post(user.build_url(cli.collection_endpoint())?) - .json(&req), - ) - .expect_status_code(200) - .build(), - ) - .await?; - let resp: api::QueryResponse = goose.response?.json().await?; - let resp: LargeModel = goose_try!(user, "query", &mut goose.request, { - resp.edges - .into_iter() - .next() - .ok_or_else(|| anyhow::anyhow!("no edges returned")) - .and_then(|edge| serde_json::from_value(edge.node.content).map_err(anyhow::Error::from)) - })?; if resp.tpe != expected_value { - goose_try!(user, "query", &mut goose.request, { + goose_try!(user, "query", &mut metrics, { Err(anyhow::anyhow!("field not updated")) })?; } diff --git a/runner/src/scenario/ceramic/simple.rs b/runner/src/scenario/ceramic/simple.rs index f057485f..962a08cb 100644 --- a/runner/src/scenario/ceramic/simple.rs +++ b/runner/src/scenario/ceramic/simple.rs @@ -1,74 +1,30 @@ -use super::{models::RandomModelInstance, CeramicClient, Credentials}; - -use crate::goose_try; -use crate::scenario::ceramic::models; -use crate::scenario::ceramic::util::{goose_error, setup_model, setup_model_instance}; - -use ceramic_http_client::api::StreamsResponseOrError; -use ceramic_http_client::ceramic_event::StreamId; -use ceramic_http_client::CeramicHttpClient; -use ceramic_http_client::{ModelAccountRelation, ModelDefinition}; -use goose::prelude::*; use std::sync::Arc; -use tracing::instrument; -pub(crate) struct LoadTestUserData { - pub cli: CeramicClient, - pub small_model_id: StreamId, - pub small_model_instance_id: StreamId, - pub large_model_id: StreamId, - pub large_model_instance_id: StreamId, -} - -#[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)] -pub(crate) async fn setup(user: &mut GooseUser, cli: CeramicClient) -> TransactionResult { - let small_model = ModelDefinition::new::( - "load_test_small_model", - ModelAccountRelation::List, - ) - .unwrap(); - let small_model_id = setup_model(user, &cli, small_model).await?; - let small_model_instance_id = - setup_model_instance(user, &cli, &small_model_id, &models::SmallModel::random()).await?; - let large_model = ModelDefinition::new::( - "load_test_large_model", - ModelAccountRelation::List, - ) - .unwrap(); - let large_model_id = setup_model(user, &cli, large_model).await?; - let large_model_instance_id = - setup_model_instance(user, &cli, &large_model_id, &models::LargeModel::random()).await?; - - let user_data = LoadTestUserData { - cli, - small_model_id, - small_model_instance_id, - large_model_id, - large_model_instance_id, - }; - - user.set_session_data(user_data); - - Ok(()) -} +use crate::scenario::ceramic::{ + model_instance::CeramicModelInstanceTestUser, + models::{self, RandomModelInstance}, +}; +use goose::prelude::*; -pub async fn scenario() -> Result { - let creds = Credentials::from_env().await.map_err(goose_error)?; - let cli = CeramicHttpClient::new(creds.signer); +use super::{model_instance::ModelInstanceRequests, CeramicScenarioParameters}; - let setup_cli = cli; +// unique_dids: if true, each user will create a new DID otherwise will share one admin DID +pub async fn scenario(params: CeramicScenarioParameters) -> Result { + let config = CeramicModelInstanceTestUser::prep_scenario(params) + .await + .unwrap(); let test_start = Transaction::new(Arc::new(move |user| { - Box::pin(setup(user, setup_cli.clone())) + Box::pin(CeramicModelInstanceTestUser::setup_mid_scenario( + user, + config.clone(), + )) })) .set_name("setup") .set_on_start(); let update_small_model = transaction!(update_small_model).set_name("update_small_model"); - let get_small_model = transaction!(get_small_model).set_name("get_small_model"); - let update_large_model = transaction!(update_large_model).set_name("update_large_model"); - let get_large_model = transaction!(get_large_model).set_name("get_large_model"); Ok(scenario!("CeramicSimpleScenario") @@ -80,138 +36,51 @@ pub async fn scenario() -> Result { } pub(crate) async fn update_small_model(user: &mut GooseUser) -> TransactionResult { - let (model, url, req) = { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - let model = user_data.small_model_id.clone(); - let cli = &user_data.cli; - let streams_url = user.build_url(&format!( - "{}/{}", - cli.streams_endpoint(), - user_data.small_model_instance_id - ))?; - let req = GooseRequest::builder() - .method(GooseMethod::Get) - .set_request_builder(user.client.get(streams_url)) - .expect_status_code(200) - .build(); - let commits_url = user.build_url(cli.commits_endpoint())?; - (model, commits_url, req) - }; - let resp = user.request(req).await?; - let resp: StreamsResponseOrError = resp.response?.json().await?; - let resp = resp.resolve("update_small_model_get").unwrap(); - - let req = { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - user_data - .cli - .create_replace_request(&model, &resp, &models::SmallModel::random()) - .await - .unwrap() - }; - let req = user.client.post(url).json(&req); - let mut goose = user - .request( - GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder(req) - .expect_status_code(200) - .build(), - ) - .await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + let data = models::SmallModel::random(); + ModelInstanceRequests::get_and_replace_stream_tx( user, - "update", - &mut goose.request, - resp.resolve("update_small_model") - )?; - Ok(()) + user_data.user_cli(), + &user_data.small_model_id, + user_data.small_model_instance_ids.first().unwrap(), + "update_small_model_instance", + &data, + ) + .await } pub(crate) async fn get_small_model(user: &mut GooseUser) -> TransactionResult { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - let cli: &CeramicClient = &user_data.cli; - let url = user.build_url(&format!( - "{}/{}", - cli.streams_endpoint(), - user_data.small_model_instance_id - ))?; - let mut goose = user.get(&url).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + ModelInstanceRequests::get_stream_tx( user, - "get", - &mut goose.request, - resp.resolve("get_small_instance") - )?; - Ok(()) + user_data.user_cli(), + user_data.large_model_instance_ids.first().unwrap(), + "small_model_instance", + ) + .await } pub(crate) async fn update_large_model(user: &mut GooseUser) -> TransactionResult { - let (model, url, req) = { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - let model = user_data.large_model_id.clone(); - let cli = &user_data.cli; - let streams_url = user.build_url(&format!( - "{}/{}", - cli.streams_endpoint(), - user_data.large_model_instance_id - ))?; - let req = GooseRequest::builder() - .method(GooseMethod::Get) - .set_request_builder(user.client.get(streams_url)) - .expect_status_code(200) - .build(); - let commits_url = user.build_url(cli.commits_endpoint())?; - (model, commits_url, req) - }; - let mut goose = user.request(req).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - let resp = goose_try!(user, "update", &mut goose.request, { - resp.resolve("update_large_model_get") - })?; - - let req = { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - user_data - .cli - .create_replace_request(&model, &resp, &models::LargeModel::random()) - .await - .unwrap() - }; - let req = user.client.post(url).json(&req); - let req = GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder(req) - .expect_status_code(200) - .build(); - let mut goose = user.request(req).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + let data = models::LargeModel::random(); + ModelInstanceRequests::get_and_replace_stream_tx( user, - "update", - &mut goose.request, - resp.resolve("update_large_model") - )?; - Ok(()) + user_data.user_cli(), + &user_data.large_model_id, + user_data.large_model_instance_ids.first().unwrap(), + "update_large_model_instance", + &data, + ) + .await } pub(crate) async fn get_large_model(user: &mut GooseUser) -> TransactionResult { - let user_data: &LoadTestUserData = user.get_session_data_unchecked(); - let cli: &CeramicClient = &user_data.cli; - let url = user.build_url(&format!( - "{}/{}", - cli.streams_endpoint(), - user_data.large_model_instance_id - ))?; - let mut goose = user.get(&url).await?; - let resp: StreamsResponseOrError = goose.response?.json().await?; - goose_try!( + let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned(); + ModelInstanceRequests::get_stream_tx( user, - "get", - &mut goose.request, - resp.resolve("get_large_instance") - )?; - Ok(()) + user_data.user_cli(), + user_data.large_model_instance_ids.first().unwrap(), + "large_model_instance", + ) + .await } diff --git a/runner/src/scenario/ceramic/util.rs b/runner/src/scenario/ceramic/util.rs index de573b3e..6d69ad66 100644 --- a/runner/src/scenario/ceramic/util.rs +++ b/runner/src/scenario/ceramic/util.rs @@ -1,7 +1,3 @@ -use crate::scenario::ceramic::CeramicClient; -use ceramic_http_client::{api, ceramic_event::StreamId, ModelDefinition}; -use goose::goose::{GooseMethod, GooseRequest, GooseUser}; -use goose::prelude::TransactionError; use goose::GooseError; pub fn goose_error(err: anyhow::Error) -> GooseError { @@ -25,91 +21,3 @@ macro_rules! goose_try { } }; } - -pub async fn setup_model( - user: &mut GooseUser, - cli: &CeramicClient, - model: ModelDefinition, -) -> Result { - let url = user.build_url(cli.streams_endpoint())?; - let req = cli.create_model_request(&model).await.unwrap(); - let req = user.client.post(url).json(&req); - let req = GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder(req) - .expect_status_code(200) - .build(); - let mut goose = user.request(req).await?; - let resp: api::StreamsResponseOrError = goose.response?.json().await?; - let resp = goose_try!(user, "setup_model", &mut goose.request, { - resp.resolve("setup_model") - })?; - Ok(resp.stream_id) -} - -pub async fn setup_model_instance( - user: &mut GooseUser, - cli: &CeramicClient, - model: &StreamId, - data: &T, -) -> Result { - let url = user.build_url(cli.streams_endpoint())?; - let req = cli.create_list_instance_request(model, data).await.unwrap(); - let req = user.client.post(url).json(&req); - let req = GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder(req) - .expect_status_code(200) - .build(); - let mut goose = user.request(req).await?; - let resp: api::StreamsResponseOrError = goose.response?.json().await?; - let resp = goose_try!(user, "setup_model_instance", &mut goose.request, { - resp.resolve("setup_model_instance") - })?; - Ok(resp.stream_id) -} - -pub async fn index_model( - user: &mut GooseUser, - cli: &CeramicClient, - model_id: &StreamId, -) -> Result<(), TransactionError> { - let result = user - .request( - GooseRequest::builder() - .method(GooseMethod::Get) - .set_request_builder(user.client.get(user.build_url(cli.admin_code_endpoint())?)) - .expect_status_code(200) - .build(), - ) - .await?; - let resp: api::AdminCodeResponse = result.response?.json().await?; - let req = cli - .create_index_model_request(model_id, &resp.code) - .await - .unwrap(); - let mut goose = user - .request( - GooseRequest::builder() - .method(GooseMethod::Post) - .set_request_builder( - user.client - .post(user.build_url(cli.index_endpoint())?) - .json(&req), - ) - .expect_status_code(200) - .build(), - ) - .await?; - let resp = goose.response?; - if resp.status().is_success() { - Ok(()) - } else { - user.set_failure( - "index_model", - &mut goose.request, - None, - Some(&format!("Failed to index model: {}", resp.text().await?)), - ) - } -} diff --git a/runner/src/scenario/ceramic/write_only.rs b/runner/src/scenario/ceramic/write_only.rs index c9ee37dc..9fc0a61f 100644 --- a/runner/src/scenario/ceramic/write_only.rs +++ b/runner/src/scenario/ceramic/write_only.rs @@ -1,19 +1,19 @@ -use ceramic_http_client::CeramicHttpClient; use goose::prelude::*; use std::sync::Arc; -use crate::scenario::ceramic::simple::{setup, update_large_model, update_small_model}; -use crate::scenario::ceramic::util::goose_error; +use crate::scenario::ceramic::simple::{update_large_model, update_small_model}; -use super::Credentials; +use super::{model_instance::CeramicModelInstanceTestUser, CeramicScenarioParameters}; -pub async fn scenario() -> Result { - let creds = Credentials::from_env().await.map_err(goose_error)?; - let cli = CeramicHttpClient::new(creds.signer); - - let setup_cli = cli; +pub async fn scenario(params: CeramicScenarioParameters) -> Result { + let config = CeramicModelInstanceTestUser::prep_scenario(params) + .await + .unwrap(); let setup = Transaction::new(Arc::new(move |user| { - Box::pin(setup(user, setup_cli.clone())) + Box::pin(CeramicModelInstanceTestUser::setup_mid_scenario( + user, + config.clone(), + )) })) .set_name("setup") .set_on_start(); diff --git a/runner/src/scenario/mod.rs b/runner/src/scenario/mod.rs index fbeb1cd9..75e3c658 100644 --- a/runner/src/scenario/mod.rs +++ b/runner/src/scenario/mod.rs @@ -3,9 +3,34 @@ use goose::GooseError; pub mod ceramic; pub mod ipfs_block_fetch; +pub mod recon_sync; -pub async fn get_redis_client() -> Result { +static FIRST_USER: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true); + +pub(crate) async fn get_redis_client() -> Result { let redis_host = std::env::var("REDIS_CONNECTION_STRING").unwrap_or("redis://redis:6379".to_string()); redis::Client::open(redis_host).map_err(|e| goose_error(e.into())) } + +/// True if this is the 'leader' i.e. the worker process that should make requests for sharing with others +pub(crate) fn is_goose_lead_worker() -> bool { + goose::get_worker_id() == 1 +} + +/// True if this is the lead user on that worker i.e. if you want to do something once for a worker. +/// Only returns true once until being reset. You should cache the result and call `reset_goose_lead_user` +/// if you want to reacquire the lead user status. +pub(crate) fn is_goose_lead_user() -> bool { + FIRST_USER.swap(false, std::sync::atomic::Ordering::SeqCst) +} + +/// True if this is the lead worker process and the lead user on that worker i.e. if you want to do something once for the simulation +pub(crate) fn is_goose_global_leader(lead_user: bool) -> bool { + is_goose_lead_worker() && lead_user +} + +/// Reset the lead user flag so another process can act as the lead user in the future +pub(crate) fn reset_goose_lead_user() { + FIRST_USER.store(true, std::sync::atomic::Ordering::SeqCst); +} diff --git a/runner/src/scenario/ceramic/recon_sync.rs b/runner/src/scenario/recon_sync.rs similarity index 87% rename from runner/src/scenario/ceramic/recon_sync.rs rename to runner/src/scenario/recon_sync.rs index 740a717b..64700c2c 100644 --- a/runner/src/scenario/ceramic/recon_sync.rs +++ b/runner/src/scenario/recon_sync.rs @@ -1,5 +1,8 @@ -use crate::scenario::ceramic::model_reuse::{get_model_id, set_model_id}; -use crate::scenario::get_redis_client; +use crate::scenario::ceramic::model_instance::{loop_until_key_value_set, set_key_to_stream_id}; +use crate::scenario::{ + get_redis_client, is_goose_global_leader, is_goose_lead_user, is_goose_lead_worker, + reset_goose_lead_user, +}; use ceramic_core::{Cid, EventId}; use ceramic_http_client::ceramic_event::{StreamId, StreamIdType}; use goose::prelude::*; @@ -7,7 +10,7 @@ use libipld::cid; use multihash::{Code, MultihashDigest}; use rand::rngs::ThreadRng; use rand::Rng; -use std::sync::atomic::{AtomicBool, AtomicU64}; +use std::sync::atomic::AtomicU64; use std::{sync::Arc, time::Duration}; use tracing::{info, instrument}; @@ -17,21 +20,11 @@ pub(crate) const CREATE_EVENT_TX_NAME: &str = "create_new_event"; // it's a lot simpler to access request metrics (a map) than tx metrics (a vec) pub(crate) const CREATE_EVENT_REQ_NAME: &str = "POST create_new_event"; -static FIRST_USER: AtomicBool = AtomicBool::new(true); static NEW_EVENT_CNT: AtomicU64 = AtomicU64::new(0); static TOTAL_BYTES_GENERATED: AtomicU64 = AtomicU64::new(0); -fn should_request_events() -> bool { - goose::get_worker_id() == 1 -} - -/// we only want one user to create and subscribe to the model -fn is_first_user() -> bool { - FIRST_USER.swap(false, std::sync::atomic::Ordering::SeqCst) -} - #[derive(Clone)] -struct ReconLoadTestUserData { +struct ReconCeramicModelInstanceTestUser { model_id: StreamId, with_data: bool, } @@ -48,7 +41,7 @@ async fn init_scenario(with_data: bool) -> Result { } async fn log_results(_user: &mut GooseUser) -> TransactionResult { - if is_first_user() { + if is_goose_lead_user() { let cnt = NEW_EVENT_CNT.load(std::sync::atomic::Ordering::Relaxed); let bytes = TOTAL_BYTES_GENERATED.load(std::sync::atomic::Ordering::Relaxed); info!( @@ -78,12 +71,21 @@ pub async fn event_key_sync_scenario() -> Result { let test_start = init_scenario(false).await?; let create_new_event = transaction!(create_new_event).set_name(CREATE_EVENT_TX_NAME); + let reset_single_user = transaction!(reset_first_user) + .set_name("reset_first_user") + .set_on_start(); Ok(scenario!("ReconKeySync") .register_transaction(test_start) + .register_transaction(reset_single_user) .register_transaction(create_new_event)) } +async fn reset_first_user(_user: &mut GooseUser) -> TransactionResult { + reset_goose_lead_user(); + Ok(()) +} + /// One user on one node creates a model. /// One user on each node subscribes to the model via Recon #[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)] @@ -93,24 +95,24 @@ async fn setup( with_data: bool, ) -> TransactionResult { let mut conn = redis_cli.get_async_connection().await.unwrap(); - let first = is_first_user(); - let model_id = if should_request_events() && first { + let first = is_goose_global_leader(is_goose_lead_user()); + let model_id = if first { info!("creating model for event ID sync test"); // We only need a model ID we do not need it to be a real model. let model_id = StreamId { r#type: StreamIdType::Model, cid: random_cid(), }; - set_model_id(&mut conn, &model_id, MODEL_ID_KEY).await; + set_key_to_stream_id(&mut conn, MODEL_ID_KEY, &model_id).await; model_id } else { - get_model_id(&mut conn, MODEL_ID_KEY).await + loop_until_key_value_set(&mut conn, MODEL_ID_KEY).await }; tracing::debug!(%model_id, "syncing model"); let path = format!("/ceramic/interests/model/{}", model_id); - let user_data = ReconLoadTestUserData { + let user_data = ReconCeramicModelInstanceTestUser { model_id, with_data, }; @@ -125,23 +127,19 @@ async fn setup( .build(); let _goose = user.request(req).await?; - if first { - // reset it so we can use it again during shutdown - FIRST_USER.store(true, std::sync::atomic::Ordering::SeqCst); - } Ok(()) } /// Generate a random event that the nodes are interested in. Only one node should create but all /// users do it so that we can generate a lot of events. async fn create_new_event(user: &mut GooseUser) -> TransactionResult { - if !should_request_events() { + if !is_goose_lead_worker() { // No work is performed while awaiting on the sleep future to complete (from tokio::time::sleep docs) // it's not high resolution but we don't need it to be since we're already waiting half a second tokio::time::sleep(Duration::from_millis(500)).await; Ok(()) } else { - let user_data: &ReconLoadTestUserData = user + let user_data: &ReconCeramicModelInstanceTestUser = user .get_session_data() .expect("we are missing sync_event_id user data"); diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index adce7f20..8d30ff12 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -17,13 +17,16 @@ use reqwest::Url; use tracing::{error, info, warn}; use crate::{ - scenario::{ceramic, ipfs_block_fetch}, + scenario::{ + ceramic::{self, new_streams}, + ipfs_block_fetch, recon_sync, + }, utils::parse_peers_info, CommandResult, }; // FIXME: is it worth attaching metrics to the peer info? -const IPFS_SERVICE_METRICS_PORT: u32 = 9465; +const IPFS_SERVICE_METRICS_PORT: &str = "9465"; const EVENT_SYNC_METRIC_NAME: &str = "ceramic_store_key_insert_count_total"; /// Options to Simulate command @@ -92,16 +95,24 @@ pub struct Topology { pub enum Scenario { /// Queries the Id of the IPFS peers. IpfsRpc, - /// Simple Ceramic Scenario + /// Simple Ceramic Scenario that creates two models and MIDs and then and gets the streams + /// and then replaces the existing model instance document with a new one. It uses one DID for all users, + /// and creates a new model instance document for each user. CeramicSimple, - /// WriteOnly Ceramic Scenario + /// Same requests as the CeramicSimple scenario but changes the DID and MID ownership. Every user has their own + /// DID and creates a Model Instance Document, that they then replace throughout the scenario. + CeramicModelReuse, + /// WriteOnly Ceramic Scenario that creates two models and replaces the model instance documents. CeramicWriteOnly, - /// New Streams Ceramic Scenario + /// New Streams Ceramic Scenario. Only creates new model instance documents rather than updating anything. CeramicNewStreams, - /// Simple Query Scenario + /// Scenario that creates new model instance documents that are about 1kb in size and verifies that they sync + /// to the other nodes. This is a benchmark scenario for e2e testing, simliar to the recon event sync scenario, + /// but covering using js-ceramic rather than talking directly to the ipfs API. + CeramicNewStreamsBenchmark, + /// Simple Query Scenario. Creates multiple MIDs for a model and then queries the model instance documents, + /// updates the document, and then queries again to verify the update was persisted. CeramicQuery, - /// Scenario to reuse the same model id and query instances across workers - CeramicModelReuse, /// Nodes subscribe to same model. One node generates new events, recon syncs event keys and data to peers. ReconEventSync, /// Nodes subscribe to same model. One node generates new events, recon syncs event keys to peers. @@ -118,6 +129,7 @@ impl Scenario { Scenario::CeramicSimple => "ceramic_simple", Scenario::CeramicWriteOnly => "ceramic_write_only", Scenario::CeramicNewStreams => "ceramic_new_streams", + Scenario::CeramicNewStreamsBenchmark => "ceramic_new_streams_benchmark", Scenario::CeramicQuery => "ceramic_query", Scenario::CeramicModelReuse => "ceramic_model_reuse", Scenario::ReconEventSync => "recon_event_sync", @@ -133,6 +145,7 @@ impl Scenario { Self::CeramicSimple | Self::CeramicWriteOnly | Self::CeramicNewStreams + | Self::CeramicNewStreamsBenchmark | Self::CeramicQuery | Self::CeramicModelReuse => Ok(peer .ceramic_addr() @@ -251,13 +264,21 @@ impl ScenarioState { async fn build_goose_scenario(&mut self) -> Result { let scenario = match self.scenario { Scenario::IpfsRpc => ipfs_block_fetch::scenario(self.topo)?, - Scenario::CeramicSimple => ceramic::simple::scenario().await?, - Scenario::CeramicWriteOnly => ceramic::write_only::scenario().await?, - Scenario::CeramicNewStreams => ceramic::new_streams::scenario().await?, - Scenario::CeramicQuery => ceramic::query::scenario().await?, - Scenario::CeramicModelReuse => ceramic::model_reuse::scenario().await?, - Scenario::ReconEventSync => ceramic::recon_sync::event_sync_scenario().await?, - Scenario::ReconEventKeySync => ceramic::recon_sync::event_key_sync_scenario().await?, + Scenario::CeramicSimple => ceramic::simple::scenario(self.scenario.into()).await?, + Scenario::CeramicModelReuse => ceramic::simple::scenario(self.scenario.into()).await?, + Scenario::CeramicWriteOnly => { + ceramic::write_only::scenario(self.scenario.into()).await? + } + Scenario::CeramicNewStreams => { + ceramic::new_streams::small_large_scenario(self.scenario.into()).await? + } + Scenario::CeramicNewStreamsBenchmark => { + ceramic::new_streams::benchmark_scenario(self.scenario.into(), self.topo.nonce) + .await? + } + Scenario::CeramicQuery => ceramic::query::scenario(self.scenario.into()).await?, + Scenario::ReconEventSync => recon_sync::event_sync_scenario().await?, + Scenario::ReconEventKeySync => recon_sync::event_key_sync_scenario().await?, }; self.collect_before_metrics().await?; Ok(scenario) @@ -275,7 +296,7 @@ impl ScenarioState { async fn get_peers_counter_metric( &self, metric_name: &str, - metrics_port: u32, + metrics_path: &str, // may include the port and path ) -> Result>> { // This is naive and specific to our requirement of getting a prometheus counter. let mut results = Vec::with_capacity(self.peers.len()); @@ -286,7 +307,7 @@ impl ScenarioState { if let Some(addr) = peer.ceramic_addr() { let addr = addr.parse::()?; if let Some(host) = addr.host_str() { - let url = format!("http://{}:{}", host, metrics_port); + let url = format!("http://{}:{}", host, metrics_path); let metric = self .metrics_collector .collect_counter(url.parse()?, metric_name) @@ -311,6 +332,10 @@ impl ScenarioState { | Scenario::CeramicNewStreams | Scenario::CeramicQuery | Scenario::CeramicModelReuse => Ok(()), + Scenario::CeramicNewStreamsBenchmark => { + // we collect things in the scenario and use redit to propagate the metrics to the manager + Ok(()) + } Scenario::ReconEventSync | Scenario::ReconEventKeySync => { let peers = self .get_peers_counter_metric(EVENT_SYNC_METRIC_NAME, IPFS_SERVICE_METRICS_PORT) @@ -346,13 +371,44 @@ impl ScenarioState { | Scenario::CeramicNewStreams | Scenario::CeramicQuery | Scenario::CeramicModelReuse => (CommandResult::Success, None), + Scenario::CeramicNewStreamsBenchmark => { + let res = + new_streams::benchmark_scenario_metrics(self.peers.len(), self.topo.nonce) + .await; + if res.is_empty() { + return ( + CommandResult::Failure(anyhow!("No metrics collected")), + None, + ); + } + let target = self.target_request_rate.unwrap_or(300); + let mut errors = vec![]; + for (worker_id, count) in res { + let rps = count as f64 / metrics.duration as f64; + if rps < target as f64 { + let msg = format!( + "Worker {} did not meet the target request rate: {} < {} (total requests: {} over {})", + worker_id, rps, target, count, metrics.duration + ); + warn!(msg); + errors.push(msg); + } else { + info!("worker {} met threshold! {} > {}", worker_id, rps, target); + } + } + if errors.is_empty() { + (CommandResult::Success, None) + } else { + (CommandResult::Failure(anyhow!(errors.join("\n"))), None) + } + } Scenario::ReconEventSync | Scenario::ReconEventKeySync => { // It'd be easy to make work for other scenarios if they defined a rate and metric. However, the scenario we're // interested in is asymmetrical in what the workers do, and we're trying to look at what happens to other nodes, // which is not how most scenarios work. It also uses the IPFS metrics endpoint. We could parameterize or use a // trait, but we don't yet have a use case, and might need to use transactions, or multiple requests, or something // entirely different. Anyway, to avoid generalizing the exception we keep it simple. - let req_name = ceramic::recon_sync::CREATE_EVENT_REQ_NAME; + let req_name = recon_sync::CREATE_EVENT_REQ_NAME; let metric = match metrics .requests @@ -364,7 +420,7 @@ impl ScenarioState { Err(e) => return (e, None), }; - self.validate_scenario_success_int( + self.validate_recon_scenario_success_int( metrics.duration as u64, metric.success_count as u64, ) @@ -373,8 +429,8 @@ impl ScenarioState { } } - /// Removed from `validate_scenario_success` to make testing easier as it's hard to - async fn validate_scenario_success_int( + /// Removed from `validate_scenario_success` to make testing easier as constructing the GooseMetrics appropriately is difficult + async fn validate_recon_scenario_success_int( &self, run_time_seconds: u64, request_cnt: u64, @@ -388,7 +444,8 @@ impl ScenarioState { | Scenario::CeramicWriteOnly | Scenario::CeramicNewStreams | Scenario::CeramicQuery - | Scenario::CeramicModelReuse => (CommandResult::Success, None), + | Scenario::CeramicModelReuse + | Scenario::CeramicNewStreamsBenchmark => (CommandResult::Success, None), Scenario::ReconEventSync | Scenario::ReconEventKeySync => { let default_rate = 300; let metric_name = EVENT_SYNC_METRIC_NAME; @@ -921,7 +978,7 @@ mod test { state.collect_before_metrics().await.unwrap(); state - .validate_scenario_success_int(run_time, run_time * request_cnt) + .validate_recon_scenario_success_int(run_time, run_time * request_cnt) .await .0 }