Skip to content

Commit a535c3a

Browse files
committed
Merge branch 'master' into redis-namespace
2 parents c394a52 + 96eef0e commit a535c3a

26 files changed

Lines changed: 141 additions & 44 deletions

CHANGELOG.rst

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ Added
6969
working on StackStorm, improve our security posture, and improve CI reliability thanks in part
7070
to pants' use of PEX lockfiles. This is not a user-facing addition.
7171
#6118 #6141 #6133 #6120 #6181 #6183 #6200 #6237 #6229 #6240 #6241 #6244 #6251 #6253
72-
#6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278 #6283
72+
#6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278 #6282 #6283
7373
Contributed by @cognifloyd
7474
* Build of ST2 EL9 packages #6153
7575
Contributed by @amanda11
@@ -93,6 +93,12 @@ Added
9393
If you experience any issues when using this experimental feature, please file an issue. #6277
9494
Contributed by @cognifloyd
9595

96+
* Add new option `[messaging].prefix` to configure the prefix used in RabbitMQ exchanges and queues.
97+
The default is `st2` (resulting in exchange names like `st2.execution` and `st2.sensor`).
98+
This is primarily designed to support safely running tests in parallel where creating a vhost for
99+
each parallel test run would be a maintenance burden. #6282
100+
Contributed by @cognifloyd
101+
96102
3.8.1 - December 13, 2023
97103
-------------------------
98104
Fixed

conf/st2.conf.sample

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ connection_retries = 10
230230
connection_retry_wait = 10000
231231
# Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).
232232
login_method = None
233+
# Prefix for all exchange and queue names.
234+
prefix = st2
233235
# Use SSL / TLS to connect to the messaging server. Same as appending "?ssl=true" at the end of the connection URL string.
234236
ssl = False
235237
# ca_certs file contains a set of concatenated CA certificates, which are used to validate certificates passed from RabbitMQ.

conf/st2.dev.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ ssh_key_file = /home/vagrant/.ssh/stanley_rsa
103103

104104
[messaging]
105105
url = amqp://guest:guest@127.0.0.1:5672/
106+
prefix = st2dev
106107
# Uncomment to test SSL options
107108
#url = amqp://guest:guest@127.0.0.1:5671/
108109
#ssl = True

pants-plugins/uses_services/rabbitmq_rules.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from dataclasses import dataclass
1717
from textwrap import dedent
18+
from typing import Tuple
1819

1920
from pants.backend.python.goals.pytest_runner import (
2021
PytestPluginSetupRequest,
@@ -27,6 +28,8 @@
2728
VenvPexProcess,
2829
rules as pex_rules,
2930
)
31+
from pants.core.goals.test import TestExtraEnv
32+
from pants.engine.env_vars import EnvironmentVars
3033
from pants.engine.fs import CreateDigest, Digest, FileContent
3134
from pants.engine.rules import collect_rules, Get, MultiGet, rule
3235
from pants.engine.process import FallibleProcessResult, ProcessCacheScope
@@ -54,13 +57,17 @@ class UsesRabbitMQRequest:
5457
# These config opts for integration tests are in:
5558
# conf/st2.tests*.conf st2tests/st2tests/fixtures/conf/st2.tests*.conf
5659
# (changed by setting ST2_CONFIG_PATH env var inside the tests)
57-
# TODO: for unit tests: modify code to pull mq connect settings from env vars
58-
# TODO: for int tests: modify st2.tests*.conf on the fly to set the per-pantsd-slot vhost
59-
# and either add env vars for mq connect settings or modify conf files as well
60+
# These can also be updated via the ST2_MESSAGING_* env vars (which oslo_config reads).
61+
# Integration tests should pass these changes onto subprocesses via the same env vars.
6062

61-
# with our version of oslo.config (newer are slower) we can't directly override opts w/ environment variables.
63+
mq_urls: Tuple[str] = ("amqp://guest:guest@127.0.0.1:5672//",)
6264

63-
mq_urls: tuple[str] = ("amqp://guest:guest@127.0.0.1:5672//",)
65+
@classmethod
66+
def from_env(cls, env: EnvironmentVars) -> UsesRabbitMQRequest:
67+
default = cls()
68+
url = env.get("ST2_MESSAGING__URL", None)
69+
mq_urls = (url,) if url else default.mq_urls
70+
return UsesRabbitMQRequest(mq_urls=mq_urls)
6471

6572

6673
@dataclass(frozen=True)
@@ -83,9 +90,12 @@ def is_applicable(cls, target: Target) -> bool:
8390
)
8491
async def rabbitmq_is_running_for_pytest(
8592
request: PytestUsesRabbitMQRequest,
93+
test_extra_env: TestExtraEnv,
8694
) -> PytestPluginSetup:
8795
# this will raise an error if rabbitmq is not running
88-
_ = await Get(RabbitMQIsRunning, UsesRabbitMQRequest())
96+
_ = await Get(
97+
RabbitMQIsRunning, UsesRabbitMQRequest.from_env(env=test_extra_env.env)
98+
)
8999

90100
return PytestPluginSetup()
91101

@@ -167,6 +177,16 @@ async def rabbitmq_is_running(
167177
"""
168178
),
169179
service_start_cmd_generic="systemctl start rabbitmq-server",
180+
env_vars_hint=dedent(
181+
"""\
182+
You can also export the ST2_MESSAGING__URL env var to automatically use any
183+
RabbitMQ host, local or remote, while running unit and integration tests.
184+
If needed, you can also override the default exchange/queue name prefix
185+
by exporting ST2_MESSAGING__PREFIX. Note that tests always add a numeric
186+
suffix to the exchange/queue name prefix so that tests can safely run
187+
in parallel.
188+
"""
189+
),
170190
),
171191
)
172192

pants-plugins/uses_services/rabbitmq_rules_test.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,14 @@ def run_rabbitmq_is_running(
5151
"--backend-packages=uses_services",
5252
*(extra_args or ()),
5353
],
54-
env_inherit={"PATH", "PYENV_ROOT", "HOME"},
54+
env_inherit={
55+
"PATH",
56+
"PYENV_ROOT",
57+
"HOME",
58+
"ST2_MESSAGING__URL",
59+
"ST2_MESSAGING__PREFIX",
60+
"ST2TESTS_PARALLEL_SLOT",
61+
},
5562
)
5663
result = rule_runner.request(
5764
RabbitMQIsRunning,
@@ -62,7 +69,7 @@ def run_rabbitmq_is_running(
6269

6370
# Warning this requires that rabbitmq be running
6471
def test_rabbitmq_is_running(rule_runner: RuleRunner) -> None:
65-
request = UsesRabbitMQRequest()
72+
request = UsesRabbitMQRequest.from_env(env=rule_runner.environment)
6673
mock_platform = platform(os="TestMock")
6774

6875
# we are asserting that this does not raise an exception

pants.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ extra_env_vars = [
248248
"ST2_DATABASE__CONNECTION_TIMEOUT",
249249
"ST2_DATABASE__USERNAME",
250250
"ST2_DATABASE__PASSWORD",
251+
# Use these to override RabbitMQ connection details
252+
"ST2_MESSAGING__URL",
253+
"ST2_MESSAGING__PREFIX", # Tests will modify this to be "{prefix}{ST2TESTS_PARALLEL_SLOT}"
251254
# Use these to override Redis connection details
252255
"ST2TESTS_REDIS_HOST",
253256
"ST2TESTS_REDIS_PORT",

st2actions/tests/integration/test_actions_queue_consumer.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
import random
1818
import eventlet
1919

20-
from kombu import Exchange
21-
from kombu import Queue
2220
from unittest import TestCase
2321

2422
from st2common.transport.consumers import ActionsQueueConsumer
23+
from st2common.transport.kombu import Exchange, Queue
2524
from st2common.transport.publishers import PoolPublisher
2625
from st2common.transport import utils as transport_utils
2726
from st2common.models.db.liveaction import LiveActionDB
@@ -35,7 +34,7 @@ class ActionsQueueConsumerTestCase(TestCase):
3534

3635
def test_stop_consumption_on_shutdown(self):
3736
exchange = Exchange("st2.execution.test", type="topic")
38-
queue_name = "test-" + str(random.randint(1, 10000))
37+
queue_name = f"st2.test-{random.randint(1, 10000)}"
3938
queue = Queue(
4039
name=queue_name, exchange=exchange, routing_key="#", auto_delete=True
4140
)

st2common/benchmarks/micro/test_publisher_compression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
monkey_patch()
1818

19-
from kombu import Exchange
2019
from kombu.serialization import pickle
2120

2221
import os
@@ -27,6 +26,7 @@
2726

2827
from st2common.models.db.liveaction import LiveActionDB
2928
from st2common.transport import publishers
29+
from st2common.transport.kombu import Exchange
3030

3131
from common import FIXTURES_DIR
3232
from common import PYTEST_FIXTURE_FILE_PARAM_DECORATOR

st2common/st2common/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,11 @@ def register_opts(ignore_errors=False):
404404
help="Compression algorithm to use for compressing the payloads which are sent over "
405405
"the message bus. Defaults to no compression.",
406406
),
407+
cfg.StrOpt(
408+
"prefix",
409+
default="st2",
410+
help="Prefix for all exchange and queue names.",
411+
),
407412
]
408413

409414
do_register_opts(messaging_opts, "messaging", ignore_errors)

st2common/st2common/stream/listener.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,15 @@ def get_consumers(self, consumer, channel):
5858
raise NotImplementedError("get_consumers() is not implemented")
5959

6060
def processor(self, model=None):
61+
exchange_prefix = cfg.CONF.messaging.prefix
62+
6163
def process(body, message):
6264
meta = message.delivery_info
63-
event_name = "%s__%s" % (meta.get("exchange"), meta.get("routing_key"))
65+
event_prefix = meta.get("exchange", "")
66+
if exchange_prefix != "st2" and event_prefix.startswith(exchange_prefix):
67+
# use well-known event names over configurable exchange names
68+
event_prefix = event_prefix.replace(f"{exchange_prefix}.", "st2.", 1)
69+
event_name = f"{event_prefix}__{meta.get('routing_key')}"
6470

6571
try:
6672
if model:

0 commit comments

Comments
 (0)