Skip to content

Commit 78880f2

Browse files
mattp-dwoz
authored andcommitted
PillarCache: reimplement using salt.cache
took the liberty of making it a proper subclass in the process. this now uses the salt.cache infrastructure such that it can be driven by the base cache driver or a different one if so desired. functionality should be equivalent, including using the base bank=pillar key=minion_id for merged pillar, such that minion_data_cache can take advantage of the same cache. because we are updating the cache at the source, we no longer need to update the cache in master/masterapi.
1 parent 34b6563 commit 78880f2

16 files changed

Lines changed: 388 additions & 413 deletions

File tree

changelog/68030.changed.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
PillarCache: reimplement using salt.cache
2+
fix minion data cache organization/move pillar and grains to dedicated cache banks
3+
salt.cache: allow cache.store() to set expires per key

changelog/68030.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
salt.key: check_minion_cache performance optimization

salt/cache/__init__.py

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import salt.config
1313
import salt.loader
1414
import salt.syspaths
15+
from salt.utils.decorators import cached_property
1516

1617
log = logging.getLogger(__name__)
1718

@@ -59,30 +60,28 @@ class Cache:
5960

6061
def __init__(self, opts, cachedir=None, **kwargs):
6162
self.opts = opts
62-
if cachedir is None:
63-
self.cachedir = opts.get("cachedir", salt.syspaths.CACHE_DIR)
63+
64+
if kwargs.get("driver"):
65+
self.driver = kwargs["driver"]
6466
else:
65-
self.cachedir = cachedir
66-
self.driver = kwargs.get(
67-
"driver", opts.get("cache", salt.config.DEFAULT_MASTER_OPTS["cache"])
67+
self.driver = opts.get("cache", salt.config.DEFAULT_MASTER_OPTS["cache"])
68+
69+
self.cachedir = kwargs["cachedir"] = cachedir or opts.get(
70+
"cachedir", salt.syspaths.CACHE_DIR
6871
)
6972
self._modules = None
7073
self._kwargs = kwargs
71-
self._kwargs["cachedir"] = self.cachedir
72-
73-
def __lazy_init(self):
74-
self._modules = salt.loader.cache(self.opts)
75-
fun = f"{self.driver}.init_kwargs"
76-
if fun in self.modules:
77-
self._kwargs = self.modules[fun](self._kwargs)
78-
else:
79-
self._kwargs = {}
8074

81-
@property
75+
@cached_property
8276
def modules(self):
83-
if self._modules is None:
84-
self.__lazy_init()
85-
return self._modules
77+
return salt.loader.cache(self.opts)
78+
79+
@cached_property
80+
def kwargs(self):
81+
try:
82+
return self.modules[f"{self.driver}.init_kwargs"](self._kwargs)
83+
except KeyError:
84+
return {}
8685

8786
def cache(self, bank, key, fun, loop_fun=None, **kwargs):
8887
"""
@@ -148,7 +147,7 @@ def store(self, bank, key, data, expires=None):
148147
"""
149148
fun = f"{self.driver}.store"
150149
try:
151-
return self.modules[fun](bank, key, data, expires=expires, **self._kwargs)
150+
return self.modules[fun](bank, key, data, expires=expires, **self.kwargs)
152151
except TypeError:
153152
# if the backing store doesnt natively support expiry, we handle it as a fallback
154153
if expires:
@@ -157,10 +156,10 @@ def store(self, bank, key, data, expires=None):
157156
)
158157
expires_at = int(expires_at.timestamp())
159158
return self.modules[fun](
160-
bank, key, {"data": data, "_expires": expires_at}, **self._kwargs
159+
bank, key, {"data": data, "_expires": expires_at}, **self.kwargs
161160
)
162161
else:
163-
return self.modules[fun](bank, key, data, **self._kwargs)
162+
return self.modules[fun](bank, key, data, **self.kwargs)
164163

165164
def fetch(self, bank, key):
166165
"""
@@ -184,7 +183,7 @@ def fetch(self, bank, key):
184183
in the cache backend (auth, permissions, etc).
185184
"""
186185
fun = f"{self.driver}.fetch"
187-
ret = self.modules[fun](bank, key, **self._kwargs)
186+
ret = self.modules[fun](bank, key, **self.kwargs)
188187

189188
# handle fallback if necessary
190189
if isinstance(ret, dict) and set(ret.keys()) == {"data", "_expires"}:
@@ -218,7 +217,7 @@ def updated(self, bank, key):
218217
in the cache backend (auth, permissions, etc).
219218
"""
220219
fun = f"{self.driver}.updated"
221-
return self.modules[fun](bank, key, **self._kwargs)
220+
return self.modules[fun](bank, key, **self.kwargs)
222221

223222
def flush(self, bank, key=None):
224223
"""
@@ -239,7 +238,7 @@ def flush(self, bank, key=None):
239238
in the cache backend (auth, permissions, etc).
240239
"""
241240
fun = f"{self.driver}.flush"
242-
return self.modules[fun](bank, key=key, **self._kwargs)
241+
return self.modules[fun](bank, key=key, **self.kwargs)
243242

244243
def list(self, bank):
245244
"""
@@ -258,7 +257,7 @@ def list(self, bank):
258257
in the cache backend (auth, permissions, etc).
259258
"""
260259
fun = f"{self.driver}.list"
261-
return self.modules[fun](bank, **self._kwargs)
260+
return self.modules[fun](bank, **self.kwargs)
262261

263262
def contains(self, bank, key=None):
264263
"""
@@ -283,7 +282,7 @@ def contains(self, bank, key=None):
283282
in the cache backend (auth, permissions, etc).
284283
"""
285284
fun = f"{self.driver}.contains"
286-
return self.modules[fun](bank, key, **self._kwargs)
285+
return self.modules[fun](bank, key, **self.kwargs)
287286

288287

289288
class MemCache(Cache):
@@ -336,10 +335,15 @@ def fetch(self, bank, key):
336335
if self.debug:
337336
self.call += 1
338337
now = time.time()
338+
expires = None
339339
record = self.storage.pop((bank, key), None)
340340
# Have a cached value for the key
341341
if record is not None:
342-
(created_at, expires, data) = record
342+
if len(record) == 2:
343+
(created_at, data) = record
344+
elif len(record) == 3:
345+
(created_at, expires, data) = record
346+
343347
if (created_at + (expires or self.expire)) >= now:
344348
if self.debug:
345349
self.hit += 1
@@ -352,7 +356,7 @@ def fetch(self, bank, key):
352356
# update atime and return
353357
record[0] = now
354358
self.storage[(bank, key)] = record
355-
return record[1]
359+
return data
356360

357361
# Have no value for the key or value is expired
358362
data = super().fetch(bank, key)
@@ -361,12 +365,12 @@ def fetch(self, bank, key):
361365
MemCache.__cleanup(self.expire)
362366
if len(self.storage) >= self.max:
363367
self.storage.popitem(last=False)
364-
self.storage[(bank, key)] = [now, data]
368+
self.storage[(bank, key)] = [now, self.expire, data]
365369
return data
366370

367371
def store(self, bank, key, data, expires=None):
368372
self.storage.pop((bank, key), None)
369-
super().store(bank, key, data, expires)
373+
super().store(bank, key, data, expires=expires)
370374
if len(self.storage) >= self.max:
371375
if self.cleanup:
372376
MemCache.__cleanup(self.expire)

salt/cache/localfs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def fetch(bank, key, cachedir):
7676
inkey = False
7777
key_file = salt.utils.path.join(cachedir, os.path.normpath(bank), f"{key}.p")
7878
if not os.path.isfile(key_file):
79+
log.debug('Cache file "%s" does not exist', key_file)
7980
# The bank includes the full filename, and the key is inside the file
8081
key_file = salt.utils.path.join(cachedir, os.path.normpath(bank) + ".p")
8182
inkey = True

salt/config/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,8 @@ def _gather_buffer_space():
10351035
"request_server_aes_session": int,
10361036
# Minimum authentication protocol version to accept from minions
10371037
"minimum_auth_version": int,
1038+
# optional cache driver for pillar cache
1039+
"pillar.cache_driver": (type(None), str),
10381040
}
10391041
)
10401042

@@ -1350,6 +1352,7 @@ def _gather_buffer_space():
13501352
"encryption_algorithm": "OAEP-SHA1",
13511353
"signing_algorithm": "PKCS1v15-SHA1",
13521354
"keys.cache_driver": "localfs_key",
1355+
"pillar.cache_driver": None,
13531356
}
13541357
)
13551358

@@ -1715,6 +1718,7 @@ def _gather_buffer_space():
17151718
"request_server_aes_session": 0,
17161719
"request_server_ttl": 0,
17171720
"minimum_auth_version": 3,
1721+
"pillar.cache_driver": None,
17181722
}
17191723
)
17201724

salt/daemons/masterapi.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,6 @@ def _pillar(self, load):
789789
)
790790
data = pillar.compile_pillar()
791791
if self.opts.get("minion_data_cache", False):
792-
self.cache.store("pillar", load["id"], data)
793792
self.cache.store("grains", load["id"], load["grains"])
794793
if self.opts.get("minion_data_cache_events") is True:
795794
self.event.fire_event(

salt/master.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -724,16 +724,6 @@ def _pre_flight(self):
724724
if not self.opts["fileserver_backend"]:
725725
errors.append("No fileserver backends are configured")
726726

727-
# Check to see if we need to create a pillar cache dir
728-
if self.opts["pillar_cache"] and not os.path.isdir(
729-
os.path.join(self.opts["cachedir"], "pillar_cache")
730-
):
731-
try:
732-
with salt.utils.files.set_umask(0o077):
733-
os.mkdir(os.path.join(self.opts["cachedir"], "pillar_cache"))
734-
except OSError:
735-
pass
736-
737727
if self.opts.get("git_pillar_verify_config", True):
738728
try:
739729
git_pillars = [
@@ -1766,7 +1756,6 @@ def _pillar(self, load):
17661756
self.fs_.update_opts()
17671757
if self.opts.get("minion_data_cache", False):
17681758
self.masterapi.cache.store("grains", load["id"], load["grains"])
1769-
self.masterapi.cache.store("pillar", load["id"], data)
17701759

17711760
if self.opts.get("minion_data_cache_events") is True:
17721761
self.event.fire_event(

0 commit comments

Comments
 (0)