Skip to content

Commit ffd788b

Browse files
authored
Merge pull request #6985 from Flowminder/separate-cache-cleanup
Separate cache cleanup
2 parents bb9acc2 + d206c09 commit ffd788b

13 files changed

Lines changed: 238 additions & 133 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
99
### Added
1010

1111
### Changed
12+
- Flowmachine's cache cleanup service now runs in its own Docker container, separate from the main server. [#5911](https://github.com/Flowminder/FlowKit/issues/5911)
13+
> [!WARNING]
14+
> You will need to update your FlowKit deployment to include an additional container that uses the flowmachine image and has its command set to `"cache-cleanup"`.
1215
1316
### Fixed
1417

Makefile

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,14 @@ DOCKER_COMPOSE_SYNTHETICDATA_FILE ?= docker-compose-syntheticdata.yml
2525
DOCKER_COMPOSE_FILE_BUILD ?= docker-compose-build.yml
2626
DOCKER_COMPOSE_TESTDATA_FILE_BUILD ?= docker-compose-testdata-build.yml
2727
DOCKER_COMPOSE_SYNTHETICDATA_FILE_BUILD ?= docker-compose-syntheticdata-build.yml
28-
DOCKER_SERVICES ?= flowdb flowapi flowmachine flowauth flowmachine_query_locker flowetl flowetl_db worked_examples
28+
DOCKER_SERVICES ?= flowdb flowapi flowmachine flowauth flowmachine_query_locker flowmachine_cache_cleanup flowetl flowetl_db worked_examples
2929
DOCKER_SERVICES_TO_START = $(patsubst flowdb%,flowdb,$(DOCKER_SERVICES))
30-
services := flowmachine flowmachine_query_locker flowapi flowauth flowdb worked_examples flowdb_testdata flowdb_synthetic_data flowetl flowetl_db
30+
services := flowmachine flowmachine_query_locker flowmachine_cache_cleanup flowapi flowauth flowdb worked_examples flowdb_testdata flowdb_synthetic_data flowetl flowetl_db
3131
space :=
3232
space +=
3333
DOCKER_COMPOSE := docker compose -f $(DOCKER_COMPOSE_FILE)
3434
FLOWDB_SERVICE := $(filter flowdb%, $(DOCKER_SERVICES))
3535

36-
# Add autoflow if specified
37-
NUM_AUTOFLOW=$(words $(filter autoflow%, $(DOCKER_SERVICES)))
38-
ifeq ($(NUM_AUTOFLOW),1)
39-
DOCKER_COMPOSE += -f $(DOCKER_COMPOSE_AUTOFLOW_FILE)
40-
endif
4136

4237
# Check that at most one flowdb service is present in DOCKER_SERVICES
4338
NUM_SPECIFIED_FLOWDB_SERVICES=$(words $(filter flowdb%, $(DOCKER_SERVICES)))

docker-compose.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,30 @@ services:
9999
- db
100100
- redis
101101

102+
flowmachine_cache_cleanup:
103+
container_name: flowmachine_cache_cleanup
104+
image: flowminder/flowmachine:${CONTAINER_TAG:-latest}
105+
depends_on:
106+
- flowmachine_query_locker
107+
command:
108+
- "cache-cleanup"
109+
environment:
110+
- FLOWMACHINE_PORT=5555
111+
- FLOWMACHINE_LOG_LEVEL=${FLOWMACHINE_LOG_LEVEL:?Must set FLOWMACHINE_LOG_LEVEL env var}
112+
- FLOWMACHINE_SERVER_DEBUG_MODE=${FLOWMACHINE_SERVER_DEBUG_MODE:?Must set FLOWMACHINE_SERVER_DEBUG_MODE env var}
113+
- FLOWMACHINE_SERVER_DISABLE_DEPENDENCY_CACHING=${FLOWMACHINE_SERVER_DISABLE_DEPENDENCY_CACHING:?Must set FLOWMACHINE_SERVER_DISABLE_DEPENDENCY_CACHING env var}
114+
- FLOWDB_PORT=5432
115+
- FLOWDB_HOST=flowdb
116+
- FLOWMACHINE_FLOWDB_USER=${FLOWMACHINE_FLOWDB_USER:?Must set FLOWMACHINE_FLOWDB_USER env var}
117+
- FLOWMACHINE_FLOWDB_PASSWORD=${FLOWMACHINE_FLOWDB_PASSWORD:?Must set FLOWMACHINE_FLOWDB_PASSWORD env var}
118+
- REDIS_HOST=flowmachine_query_locker
119+
- REDIS_PORT=6379
120+
- REDIS_PASSWORD=${REDIS_PASSWORD:?Must set REDIS_PASSWORD env var}
121+
restart: always
122+
networks:
123+
- db
124+
- redis
125+
102126
worked_examples:
103127
container_name: worked_examples
104128
image: flowminder/flowkit-examples:${CONTAINER_TAG:-latest}

docs/source/administrator/deployment.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ We recommend running FlowETL using the celery scheduler, in which case you will
118118
Generating Fernet keys
119119

120120
A convenient way to generate Fernet keys is to use the python [cryptography](https://cryptography.io/) package. After installing, you can generate a new key by running `python -c "from cryptography.fernet import Fernet;print(Fernet.generate_key().decode())"`.
121-
121+
=
122122

123123
See also the [airflow documentation](https://airflow.apache.org/docs/stable/) for other configuration options which you can provide as environment variables.
124124

flowmachine.Dockerfile

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ FROM python:3.12-bullseye@sha256:8a0a0b029be52d01c57e91037658a47fc46e37abd299454
77
ARG SOURCE_VERSION=0+unknown
88
ENV SOURCE_VERSION=${SOURCE_VERSION}
99
ENV SOURCE_TREE=FlowKit-${SOURCE_VERSION}
10+
ENV TINI_VERSION="v0.19.0"
11+
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
12+
RUN chmod +x /tini
1013
WORKDIR /${SOURCE_TREE}/flowmachine
1114
COPY ./flowmachine/Pipfile* ./
1215
RUN apt-get update && \
@@ -21,8 +24,9 @@ RUN apt-get update && \
2124
pipenv run pip install --no-deps --no-cache-dir . && \
2225
apt-get -y remove git && \
2326
apt purge -y --auto-remove && \
24-
rm -rf /var/lib/apt/lists/*
25-
CMD ["pipenv", "run", "flowmachine"]
27+
rm -rf /var/lib/apt/lists/*
28+
ENTRYPOINT ["/tini", "--", "pipenv", "run"]
29+
CMD ["flowmachine"]
2630
# FlowDB has a default role named flowmachine for use with the flowmachine server
2731
# when starting the container with a different user, that user must be in the flowmachine
2832
# role

flowmachine/flowmachine/core/cache.py

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -880,69 +880,3 @@ def cache_table_exists(connection: "Connection", query_id: str) -> bool:
880880
return True
881881
except ValueError:
882882
return False
883-
884-
885-
async def watch_and_shrink_cache(
886-
*,
887-
flowdb_connection: "Connection",
888-
pool: Executor,
889-
sleep_time: int = 86400,
890-
timeout: Optional[int] = 600,
891-
loop: bool = True,
892-
size_threshold: int = None,
893-
dry_run: bool = False,
894-
protected_period: Optional[int] = None,
895-
) -> None:
896-
"""
897-
Background task to periodically trigger a shrink of the cache.
898-
899-
Parameters
900-
----------
901-
flowdb_connection : Connection
902-
Flowdb connection to check dates on
903-
pool : Executor
904-
Executor to run the date check with
905-
sleep_time : int, default 86400
906-
Number of seconds to sleep for between checks
907-
timeout : int or None, default 600
908-
Seconds to wait for a cache shrink to complete before cancelling it
909-
loop : bool, default True
910-
Set to false to return after the first check
911-
size_threshold : int, default None
912-
Optionally override the maximum cache size set in flowdb.
913-
dry_run : bool, default False
914-
Set to true to just report the objects that would be removed and not remove them
915-
protected_period : int, default None
916-
Optionally specify a number of seconds within which cache entries are excluded. If None,
917-
the value stored in cache.cache_config will be used.Set to a negative number to ignore cache protection
918-
completely.
919-
920-
Returns
921-
-------
922-
None
923-
924-
"""
925-
shrink_func = partial(
926-
shrink_below_size,
927-
connection=flowdb_connection,
928-
size_threshold=size_threshold,
929-
dry_run=dry_run,
930-
protected_period=protected_period,
931-
)
932-
while True:
933-
logger.debug("Checking if cache should be shrunk.")
934-
935-
try: # Set the shrink function running with a copy of the current execution context (db conn etc) in background thread
936-
await asyncio.wait_for(
937-
asyncio.get_running_loop().run_in_executor(
938-
pool, copy_context().run, shrink_func
939-
),
940-
timeout=timeout,
941-
)
942-
except (TimeoutError, asyncio.exceptions.TimeoutError):
943-
logger.error(
944-
f"Failed to complete cache shrink within {timeout}s. Trying again in {sleep_time}s."
945-
)
946-
if not loop:
947-
break
948-
await asyncio.sleep(sleep_time)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# This Source Code Form is subject to the terms of the Mozilla Public
2+
# License, v. 2.0. If a copy of the MPL was not distributed with this
3+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
from contextvars import copy_context
5+
6+
import asyncio
7+
from concurrent.futures import Executor
8+
9+
import structlog
10+
from functools import partial
11+
from typing import Optional
12+
13+
import flowmachine
14+
from flowmachine.core import Connection
15+
from flowmachine.core.cache import shrink_below_size
16+
from flowmachine.core.context import get_db, get_executor
17+
from flowmachine.core.server.server_config import get_server_config
18+
19+
logger = structlog.get_logger("flowmachine.debug", submodule=__name__)
20+
21+
22+
async def watch_and_shrink_cache(
    *,
    flowdb_connection: "Connection",
    pool: Executor,
    sleep_time: int = 86400,
    timeout: Optional[int] = 600,
    loop: bool = True,
    size_threshold: Optional[int] = None,
    dry_run: bool = False,
    protected_period: Optional[int] = None,
) -> None:
    """
    Background task to periodically trigger a shrink of the cache.

    Each iteration runs ``shrink_below_size`` in ``pool`` (inside a copy of the
    current context), waits up to ``timeout`` seconds for it to finish, and then
    sleeps for ``sleep_time`` seconds before trying again.

    Parameters
    ----------
    flowdb_connection : Connection
        Flowdb connection passed to ``shrink_below_size``
    pool : Executor
        Executor to run the cache shrink in
    sleep_time : int, default 86400
        Number of seconds to sleep for between shrink attempts
    timeout : int or None, default 600
        Seconds to wait for a cache shrink to complete before giving up on it.
        If None, wait until the shrink completes.
    loop : bool, default True
        Set to false to return after the first shrink attempt
    size_threshold : int or None, default None
        Optionally override the maximum cache size set in flowdb.
    dry_run : bool, default False
        Set to true to just report the objects that would be removed and not remove them
    protected_period : int or None, default None
        Optionally specify a number of seconds within which cache entries are excluded. If None,
        the value stored in cache.cache_config will be used. Set to a negative number to ignore
        cache protection completely.

    Returns
    -------
    None

    Notes
    -----
    Only a timeout is handled here (it is logged and the loop carries on); any other
    exception raised by the shrink propagates to the caller.
    """
    # Bind the shrink arguments once; the same callable is reused on every iteration.
    shrink_func = partial(
        shrink_below_size,
        connection=flowdb_connection,
        size_threshold=size_threshold,
        dry_run=dry_run,
        protected_period=protected_period,
    )
    while True:
        logger.debug("Checking if cache should be shrunk.")

        try:
            # Set the shrink function running with a copy of the current execution
            # context (db conn etc) in a background thread.
            # NOTE(review): on timeout the awaited future is cancelled, but a shrink that
            # has already started in the executor is presumably not interrupted - confirm.
            await asyncio.wait_for(
                asyncio.get_running_loop().run_in_executor(
                    pool, copy_context().run, shrink_func
                ),
                timeout=timeout,
            )
        except (TimeoutError, asyncio.exceptions.TimeoutError):
            logger.error(
                f"Failed to complete cache shrink within {timeout}s. Trying again in {sleep_time}s."
            )
        if not loop:
            break
        await asyncio.sleep(sleep_time)
86+
87+
88+
def main():
    """
    Entry point for the standalone cache cleanup service.

    Reads the server config, connects to flowdb, and then runs
    ``watch_and_shrink_cache`` under ``asyncio.run`` using the configured
    cache pruning frequency and timeout.
    """
    # Read config options from environment variables
    config = get_server_config()
    # Connect to flowdb
    flowmachine.connect()

    if config.debug_mode:
        logger.info("Enabling asyncio's debugging mode.")

    # Always report the pruning settings at startup, not only in debug mode.
    logger.info(
        "Starting cache cleanup service.",
        sleep_time=config.cache_pruning_frequency,
        timeout=config.cache_pruning_timeout,
    )
    # Run loop which periodically checks if the cache can/should be resized
    asyncio.run(
        watch_and_shrink_cache(
            flowdb_connection=get_db(),
            pool=get_executor(),
            sleep_time=config.cache_pruning_frequency,
            timeout=config.cache_pruning_timeout,
        ),
        debug=config.debug_mode,
    )
114+
115+
116+
if __name__ == "__main__":
117+
main()

flowmachine/flowmachine/core/server/server.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55
import asyncio
6-
from concurrent.futures import Executor
76
from json import JSONDecodeError
87
import traceback
98

@@ -20,9 +19,7 @@
2019
from zmq.asyncio import Context
2120

2221
import flowmachine
23-
from flowmachine.core import Query, Connection
24-
from flowmachine.core.cache import watch_and_shrink_cache
25-
from flowmachine.core.context import get_db, get_executor, action_request_context
22+
from flowmachine.core.context import action_request_context
2623
from flowmachine.utils import convert_dict_keys_to_strings
2724
from .exceptions import FlowmachineServerError
2825
from .zmq_helpers import ZMQReply
@@ -232,14 +229,6 @@ async def recv(*, config: "FlowmachineServerConfig") -> NoReturn:
232229
main_loop = asyncio.get_event_loop()
233230
main_loop.add_signal_handler(signal.SIGTERM, partial(shutdown, socket=socket))
234231

235-
main_loop.create_task(
236-
watch_and_shrink_cache(
237-
flowdb_connection=get_db(),
238-
pool=get_executor(),
239-
sleep_time=config.cache_pruning_frequency,
240-
timeout=config.cache_pruning_timeout,
241-
)
242-
)
243232
try:
244233
while True:
245234
await receive_next_zmq_message_and_send_back_reply(

flowmachine/setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ def read(filename, parent=None):
5858
version=versioneer.get_version(),
5959
cmdclass=versioneer.get_cmdclass(),
6060
entry_points={
61-
"console_scripts": ["flowmachine = flowmachine.core.server.server:main"]
61+
"console_scripts": [
62+
"flowmachine = flowmachine.core.server.server:main",
63+
"cache-cleanup = flowmachine.core.server.cache_cleanup:main",
64+
]
6265
},
6366
description="Digestion program for Call Detail Record (CDR) data.",
6467
long_description=readme,

flowmachine/tests/test_cache_utils.py

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
write_query_to_cache,
3636
get_cache_protected_period,
3737
set_cache_protected_period,
38-
watch_and_shrink_cache,
3938
)
4039
from flowmachine.core.context import get_db, get_redis, get_executor
4140
from flowmachine.core.query_state import QueryState, QueryStateMachine
@@ -608,44 +607,3 @@ def test_cache_ddl_op_error(dummy_redis):
608607
ddl_ops_func=Mock(side_effect=TestException),
609608
)
610609
assert qsm.current_query_state == QueryState.ERRORED
611-
612-
613-
@pytest.mark.asyncio
614-
async def test_cache_watch_does_shrink(flowmachine_connect):
615-
"""
616-
Test that the cache watcher will shrink cache tables.
617-
"""
618-
dl = daily_location("2016-01-01").store().result()
619-
assert dl.is_stored
620-
assert get_size_of_cache(get_db()) > 0
621-
await watch_and_shrink_cache(
622-
flowdb_connection=get_db(),
623-
pool=get_executor(),
624-
sleep_time=0,
625-
loop=False,
626-
protected_period=-1,
627-
size_threshold=1,
628-
)
629-
assert not dl.is_stored
630-
assert get_size_of_cache(get_db()) == 0
631-
632-
633-
@pytest.mark.asyncio
634-
async def test_cache_watch_does_timeout(flowmachine_connect, json_log):
635-
"""
636-
Test that the cache watcher will timeout and log that it has.
637-
"""
638-
await watch_and_shrink_cache(
639-
flowdb_connection=get_db(),
640-
pool=get_executor(),
641-
sleep_time=0,
642-
loop=False,
643-
protected_period=-1,
644-
size_threshold=1,
645-
timeout=0,
646-
)
647-
log_lines = [x for x in json_log().err if x["level"] == "error"]
648-
assert (
649-
log_lines[0]["event"]
650-
== "Failed to complete cache shrink within 0s. Trying again in 0s."
651-
)

0 commit comments

Comments
 (0)