diff --git a/hosts/models.py b/hosts/models.py index 96f163bf..2e3ac715 100644 --- a/hosts/models.py +++ b/hosts/models.py @@ -178,27 +178,28 @@ def process_update(self, package, highest_package): def find_updates(self): kernels_q = Q(name__name='kernel') | \ - Q(name__name='kernel-devel') | \ - Q(name__name='kernel-preempt') | \ - Q(name__name='kernel-preempt-devel') | \ - Q(name__name='kernel-rt') | \ - Q(name__name='kernel-rt-devel') | \ - Q(name__name='kernel-debug') | \ - Q(name__name='kernel-debug-devel') | \ - Q(name__name='kernel-default') | \ - Q(name__name='kernel-default-devel') | \ - Q(name__name='kernel-headers') | \ - Q(name__name='kernel-core') | \ - Q(name__name='kernel-modules') | \ - Q(name__name='virtualbox-kmp-default') | \ - Q(name__name='virtualbox-kmp-preempt') | \ - Q(name__name='kernel-uek') | \ - Q(name__name='kernel-uek-devel') | \ - Q(name__name='kernel-uek-debug') | \ - Q(name__name='kernel-uek-debug-devel') | \ - Q(name__name='kernel-uek-container') | \ - Q(name__name='kernel-uek-container-debug') | \ - Q(name__name='kernel-uek-doc') + Q(name__name__startswith='kernel-') | \ + Q(name__name__startswith='virtualbox-kmp-') | \ + Q(name__name__startswith='linux-image-') | \ + Q(name__name__startswith='linux-headers-') | \ + Q(name__name__regex=r'^linux-modules-\d') | \ + Q(name__name__regex=r'^linux-modules-extra-\d') | \ + Q(name__name__startswith='linux-tools-') | \ + Q(name__name__startswith='linux-cloud-tools-') | \ + Q(name__name__startswith='linux-kbuild-') | \ + Q(name__name__startswith='linux-support-') | \ + Q(name__name='linux') | \ + Q(name__name='linux-lts') | \ + Q(name__name='linux-zen') | \ + Q(name__name='linux-hardened') | \ + Q(name__name='linux-rt') | \ + Q(name__name='linux-rt-lts') | \ + Q(name__name='linux-headers') | \ + Q(name__name='linux-lts-headers') | \ + Q(name__name='linux-zen-headers') | \ + Q(name__name='linux-hardened-headers') | \ + Q(name__name='linux-rt-headers') | \ + Q(name__name='linux-rt-lts-headers') repo_packages = self.get_host_repo_packages() host_packages = self.packages.exclude(kernels_q).distinct() kernel_packages = self.packages.filter(kernels_q) @@ -313,42 +314,292 @@ def find_osrelease_repo_updates(self, host_packages, repo_packages): return update_ids def check_if_reboot_required(self, host_highest): + """Check if a reboot is required (running kernel < installed highest). - ver, rel = self.kernel.split('-')[:2] + Uses labelCompare for RPM-style version tuples parsed from uname -r. + Only valid for RPM kernels — DEB and Arch use compare_version via + their respective find_*_kernel_updates methods. + """ + parts = self.kernel.split('-') + if len(parts) < 2: + return + ver, rel = parts[:2] + # strip arch suffix from uname -r release (e.g. '.x86_64', '.aarch64') + arch_suffix = '.' + self.arch.name + if rel.endswith(arch_suffix): + rel = rel[:-len(arch_suffix)] + # SUSE uname -r truncates the micro release (e.g. '160000.8' vs + # RPM release '160000.8.1'), so check for prefix match first + if host_highest.version == ver and \ + (host_highest.release == rel or + host_highest.release.startswith(rel + '.')): + self.reboot_required = False + return kernel_ver = ('', str(ver), str(rel)) host_highest_ver = ('', host_highest.version, host_highest.release) if labelCompare(kernel_ver, host_highest_ver) == -1: self.reboot_required = True else: self.reboot_required = False - self.save() + + def _get_deb_kernel_flavour(self, pkg_name): + """Extract the flavour suffix from a DEB kernel package name. + + e.g. 'linux-image-6.8.0-51-generic' → 'generic' + 'linux-image-6.8.0-51-lowlatency' → 'lowlatency' + 'linux-image-6.1.0-28-cloud-amd64' → 'cloud-amd64' + 'linux-modules-extra-6.8.0-51-generic' → 'generic' + Returns None if the flavour cannot be determined. + """ + for prefix in self._deb_kernel_prefixes: + if pkg_name.startswith(prefix): + # strip prefix, then split version from flavour + # e.g. '6.8.0-51-generic' or '6.1.0-28-cloud-amd64' + remainder = pkg_name[len(prefix):] + # version parts are numeric/dotted, flavour starts after + # e.g. '6.8.0-51-generic' → parts=['6.8.0', '51', 'generic'] + parts = remainder.split('-') + # find first non-numeric part (not starting with digit) + for i, part in enumerate(parts): + if part and not part[0].isdigit(): + return '-'.join(parts[i:]) + return None + return None + + def _get_running_kernel_flavour(self): + """Extract the flavour from the running kernel string. + + e.g. '6.8.0-51-generic' → 'generic' + '6.8.0-51-lowlatency' → 'lowlatency' + '6.1.0-28-cloud-amd64' → 'cloud-amd64' + Returns None for RPM-style kernels (no flavour suffix). + """ + parts = self.kernel.split('-') + if len(parts) >= 2: + # find first non-numeric part after the version + for i, part in enumerate(parts): + if i > 0 and part and not part[0].isdigit(): + return '-'.join(parts[i:]) + return None + + # longest prefixes first to avoid linux-modules- matching linux-modules-extra- + _deb_kernel_prefixes = [ + 'linux-image-unsigned-', + 'linux-modules-extra-', + 'linux-cloud-tools-', + 'linux-image-uc-', + 'linux-image-', + 'linux-headers-', + 'linux-modules-', + 'linux-support-', + 'linux-kbuild-', + 'linux-tools-', + ] def find_kernel_updates(self, kernel_packages, repo_packages): update_ids = [] + self.reboot_required = False + + deb_kernels = kernel_packages.filter(packagetype='D') + rpm_kernels = kernel_packages.filter(packagetype='R') + arch_kernels = kernel_packages.filter(packagetype='A') + + update_ids.extend(self._find_rpm_kernel_updates(rpm_kernels, repo_packages)) + update_ids.extend(self._find_deb_kernel_updates(deb_kernels, repo_packages)) + update_ids.extend(self._find_arch_kernel_updates(arch_kernels, repo_packages)) + + self.save(update_fields=['reboot_required']) + return update_ids + + def _find_rpm_kernel_updates(self, kernel_packages, repo_packages): + + update_ids = [] + + # parse running kernel version for comparison + parts = self.kernel.split('-') + if len(parts) < 2: + return update_ids + ver, rel = parts[:2] + # strip arch suffix from uname -r release (e.g. '.x86_64') + arch_suffix = '.' + self.arch.name + if rel.endswith(arch_suffix): + rel = rel[:-len(arch_suffix)] + + # deduplicate: only process each kernel package name once + processed_names = set() + for package in kernel_packages: - host_highest = package - repo_highest = package + if package.name_id in processed_names: + continue + processed_names.add(package.name_id) pu_q = Q(name=package.name) - potential_updates = repo_packages.filter(pu_q) - for pu in potential_updates: - if package.compare_version(pu) == -1 \ - and repo_highest.compare_version(pu) == -1: + + # find repo highest for this kernel name + repo_highest = None + for pu in repo_packages.filter(pu_q): + if repo_highest is None or repo_highest.compare_version(pu) == -1: repo_highest = pu - host_packages = self.packages.filter(pu_q) - for hp in host_packages: - if package.compare_version(hp) == -1 and \ - host_highest.compare_version(hp) == -1: + if repo_highest is None: + continue + + # find host highest installed for reboot check + host_highest = None + running_package = None + for hp in self.packages.filter(pu_q): + if host_highest is None or host_highest.compare_version(hp) == -1: host_highest = hp + # match installed package to running kernel + # SUSE uname -r truncates micro release ('160000.8' vs + # RPM '160000.8.1') so use prefix match with dot boundary + if hp.version == ver and \ + (hp.release == rel or + hp.release.startswith(rel + '.')): + running_package = hp + + # for the running kernel's flavour, compare running vs repo + # for other flavours, compare highest installed vs repo + if running_package: + base_package = running_package + else: + base_package = host_highest - if host_highest.compare_version(repo_highest) == -1: - uid = self.process_update(host_highest, repo_highest) + if base_package and base_package.compare_version(repo_highest) == -1: + uid = self.process_update(base_package, repo_highest) if uid is not None: update_ids.append(uid) - self.check_if_reboot_required(host_highest) + # reboot check only on primary kernel packages + if host_highest and package.name.name in ( + 'kernel', 'kernel-core', 'kernel-debug-core', + 'kernel-default', 'kernel-rt', 'kernel-azure', + 'kernel-kvmsmall', + 'kernel-uek', 'kernel-uki-virt', 'kernel-debug-uki-virt', + ): + self.check_if_reboot_required(host_highest) + + return update_ids + + def _find_arch_kernel_updates(self, kernel_packages, repo_packages): + + update_ids = [] + + for package in kernel_packages: + pu_q = Q(name=package.name) + + repo_highest = None + for rp in repo_packages.filter(pu_q): + if repo_highest is None or repo_highest.compare_version(rp) == -1: + repo_highest = rp + + if repo_highest is None: + continue + + if package.compare_version(repo_highest) == -1: + uid = self.process_update(package, repo_highest) + if uid is not None: + update_ids.append(uid) + + # reboot check for main kernel packages (not -headers) + # Arch uname -r format varies by flavour: + # linux: 6.12.8-arch1-1 (pkgver=6.12.8.arch1) + # linux-lts: 6.1.68-1-lts (pkgver=6.1.68) + # linux-zen: 6.12.8-zen1-1-zen (pkgver=6.12.8.zen1) + # The only reliable common part is the base version (first segment + # of uname -r) which maps to the numeric prefix of pkgver + if package.name.name in ( + 'linux', 'linux-lts', 'linux-zen', 'linux-hardened', + 'linux-rt', 'linux-rt-lts', + ): + running_base = self.kernel.split('-')[0] if self.kernel else '' + pkg_ver = package.version + # pkgver is either 'X.Y.Z' (lts) or 'X.Y.Z.flavourN' (others) + # check if the package version matches or extends the base + if pkg_ver != running_base and not pkg_ver.startswith(running_base + '.'): + self.reboot_required = True + + return update_ids + + def _find_deb_kernel_updates(self, kernel_packages, repo_packages): + + update_ids = [] + running_flavour = self._get_running_kernel_flavour() + + # find the linux-image package matching the running kernel + running_kernel_pkg = None + for package in kernel_packages: + pkg_name = package.name.name + if pkg_name.startswith('linux-image-') and pkg_name.endswith(self.kernel): + running_kernel_pkg = package + break + + processed_prefixes = set() + for package in kernel_packages: + pkg_name = package.name.name + flavour = self._get_deb_kernel_flavour(pkg_name) + + # if we know the running flavour, only process matching packages + # if we don't (unflavoured kernel), process all kernel packages + if running_flavour and flavour != running_flavour: + continue + + # determine the prefix (e.g. 'linux-image-') + prefix = None + for p in self._deb_kernel_prefixes: + if pkg_name.startswith(p): + prefix = p + break + if prefix is None or prefix in processed_prefixes: + continue + processed_prefixes.add(prefix) + + # build endswith filter for flavoured kernels + name_filter = Q( + name__name__startswith=prefix, + packagetype='D', + ) + if running_flavour: + name_filter &= Q(name__name__endswith=f'-{running_flavour}') + + # find repo highest for this prefix+flavour + repo_highest = None + for rp in repo_packages.filter(name_filter): + if repo_highest is None or repo_highest.compare_version(rp) == -1: + repo_highest = rp + + if repo_highest is None: + continue + + # find the installed package matching the running kernel for this prefix + base_package = None + expected_name = prefix + self.kernel + for hp in self.packages.filter(name_filter): + if hp.name.name == expected_name: + base_package = hp + break + + # fallback: if no running match, use the installed package we started with + if base_package is None: + base_package = package + + if base_package.compare_version(repo_highest) == -1: + uid = self.process_update(base_package, repo_highest) + if uid is not None: + update_ids.append(uid) + + # reboot check: see if a newer linux-image is installed but not running + # use compare_version (DEB semantics) instead of labelCompare + if running_kernel_pkg: + for package in kernel_packages: + if package.name.name.startswith('linux-image-'): + flavour = self._get_deb_kernel_flavour(package.name.name) + if running_flavour is None or flavour == running_flavour: + if running_kernel_pkg.compare_version(package) == -1: + self.reboot_required = True + break + return update_ids diff --git a/hosts/tests/test_models.py b/hosts/tests/test_models.py index 2bd54dcb..12c52ba0 100644 --- a/hosts/tests/test_models.py +++ b/hosts/tests/test_models.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with Patchman. If not, see +from django.db.models import Q from django.test import TestCase, override_settings from django.utils import timezone @@ -22,7 +23,7 @@ from hosts.models import Host, HostRepo from operatingsystems.models import OSRelease, OSVariant from packages.models import Package, PackageName, PackageUpdate -from repos.models import Repository +from repos.models import Mirror, MirrorPackage, Repository @override_settings( @@ -191,3 +192,508 @@ def test_in_memory_counts_stale_after_m2m_signal(self): # verify DB has the correct value db_host = Host.objects.get(pk=self.host.pk) self.assertEqual(db_host.bug_updates_count, 1) + + +@override_settings( + CELERY_TASK_ALWAYS_EAGER=True, + CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}} +) +class KernelUpdateTests(TestCase): + """Tests for kernel update detection logic.""" + + def setUp(self): + """Set up test data with RPM kernel packages.""" + self.m_arch = MachineArchitecture.objects.create(name='x86_64') + self.domain = Domain.objects.create(name='example.com') + self.os_release = OSRelease.objects.create( + name='Rocky Linux 9', codename='' + ) + self.os_variant = OSVariant.objects.create( + name='Rocky Linux 9.4', osrelease=self.os_release + ) + self.pkg_arch = PackageArchitecture.objects.create(name='x86_64') + self.kernel_name = PackageName.objects.create(name='kernel-core') + + # create kernel packages at three versions + self.kernel_362 = Package.objects.create( + name=self.kernel_name, arch=self.pkg_arch, epoch='0', + version='5.14.0', release='362.el9', packagetype='R' + ) + self.kernel_427 = Package.objects.create( + name=self.kernel_name, arch=self.pkg_arch, epoch='0', + version='5.14.0', release='427.el9', packagetype='R' + ) + self.kernel_503 = Package.objects.create( + name=self.kernel_name, arch=self.pkg_arch, epoch='0', + version='5.14.0', release='503.el9', packagetype='R' + ) + + # set up a repo with the latest kernel + self.repo = Repository.objects.create( + name='baseos', repotype='R', arch=self.m_arch, + ) + self.mirror = Mirror.objects.create( + repo=self.repo, url='http://repo.example.com/baseos', + ) + self.mirror.packages.add(self.kernel_503) + + def _create_host(self, running_kernel, installed_kernels): + """Helper to create a host with a running kernel and installed packages.""" + host = Host.objects.create( + hostname='rocky.example.com', + ipaddress='192.168.1.50', + osvariant=self.os_variant, + kernel=running_kernel, + arch=self.m_arch, + domain=self.domain, + lastreport=timezone.now(), + host_repos_only=False, + ) + host.packages.set(installed_kernels) + return host + + def test_latest_installed_not_running(self): + """Scenario 1: latest kernel installed but not running. + + Old kernels should not show as updates. + Running→latest should be an update + reboot required. + """ + host = self._create_host( + '5.14.0-362.el9', + [self.kernel_362, self.kernel_427, self.kernel_503], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter(name=self.kernel_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 1) + update = host.updates.first() + self.assertEqual(update.oldpackage, self.kernel_362) + self.assertEqual(update.newpackage, self.kernel_503) + self.assertTrue(host.reboot_required) + + def test_latest_installed_and_running(self): + """Scenario 2: latest kernel installed and running. + + No updates should be generated. + """ + host = self._create_host( + '5.14.0-503.el9', + [self.kernel_362, self.kernel_427, self.kernel_503], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter(name=self.kernel_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 0) + self.assertFalse(host.reboot_required) + + def test_latest_not_installed(self): + """Scenario 3: latest kernel not installed. + + Update from running to repo highest. + """ + host = self._create_host( + '5.14.0-427.el9', + [self.kernel_362, self.kernel_427], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter(name=self.kernel_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 1) + update = host.updates.first() + self.assertEqual(update.oldpackage, self.kernel_427) + self.assertEqual(update.newpackage, self.kernel_503) + + def test_running_newer_than_repo(self): + """Scenario 5: running kernel is newer than repo highest. + + No updates should be generated. + """ + kernel_600 = Package.objects.create( + name=self.kernel_name, arch=self.pkg_arch, epoch='0', + version='5.14.0', release='600.el9', packagetype='R' + ) + host = self._create_host( + '5.14.0-600.el9', + [self.kernel_503, kernel_600], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter(name=self.kernel_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 0) + + def test_old_kernels_not_generating_duplicate_updates(self): + """Old kernels should not each generate their own update.""" + host = self._create_host( + '5.14.0-427.el9', + [self.kernel_362, self.kernel_427], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter(name=self.kernel_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + # should be exactly 1 update, not 2 + self.assertEqual(host.updates.count(), 1) + + def test_rpm_uname_with_arch_suffix(self): + """RPM: uname -r with .x86_64 suffix should still match correctly.""" + host = self._create_host( + '5.14.0-503.el9.x86_64', + [self.kernel_362, self.kernel_427, self.kernel_503], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter(name=self.kernel_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 0) + host.refresh_from_db() + self.assertFalse(host.reboot_required) + + def test_suse_uname_truncated_release(self): + """SUSE: uname -r truncates micro release and appends flavour.""" + suse_name = PackageName.objects.create(name='kernel-default') + pkg_8 = Package.objects.create( + name=suse_name, arch=self.pkg_arch, epoch='0', + version='6.12.0', release='160000.8.1', packagetype='R' + ) + pkg_25 = Package.objects.create( + name=suse_name, arch=self.pkg_arch, epoch='0', + version='6.12.0', release='160000.25.1', packagetype='R' + ) + suse_repo = Repository.objects.create( + name='oss', repotype='R', arch=self.m_arch, + security=False, enabled=True, + ) + suse_mirror = Mirror.objects.create( + repo=suse_repo, url='http://download.opensuse.org/oss', + ) + MirrorPackage.objects.create(mirror=suse_mirror, package=pkg_8) + MirrorPackage.objects.create(mirror=suse_mirror, package=pkg_25) + + # running 160000.8, installed 160000.8 and 160000.25 + # uname shows '6.12.0-160000.8-default' (truncated, with flavour) + host = self._create_host( + '6.12.0-160000.8-default', + [pkg_8, pkg_25], + ) + host.repos.add(suse_repo) + repo_packages = Package.objects.filter(mirror=suse_mirror) + kernel_packages = host.packages.filter(name=suse_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + # running 8 < installed 25 → one update, reboot required + self.assertEqual(host.updates.count(), 1) + host.refresh_from_db() + self.assertTrue(host.reboot_required) + + def test_suse_uname_running_is_highest(self): + """SUSE: running kernel matches highest installed (no reboot).""" + suse_name = PackageName.objects.create(name='kernel-default') + pkg_25 = Package.objects.create( + name=suse_name, arch=self.pkg_arch, epoch='0', + version='6.12.0', release='160000.25.1', packagetype='R' + ) + suse_repo = Repository.objects.create( + name='oss', repotype='R', arch=self.m_arch, + security=False, enabled=True, + ) + suse_mirror = Mirror.objects.create( + repo=suse_repo, url='http://download.opensuse.org/oss', + ) + MirrorPackage.objects.create(mirror=suse_mirror, package=pkg_25) + + # running 160000.25 = installed highest 160000.25.1 (truncated match) + host = self._create_host( + '6.12.0-160000.25-default', + [pkg_25], + ) + host.repos.add(suse_repo) + repo_packages = Package.objects.filter(mirror=suse_mirror) + kernel_packages = host.packages.filter(name=suse_name) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 0) + host.refresh_from_db() + self.assertFalse(host.reboot_required) + + +@override_settings( + CELERY_TASK_ALWAYS_EAGER=True, + CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}} +) +class DebKernelUpdateTests(TestCase): + """Tests for DEB kernel update detection logic.""" + + def setUp(self): + """Set up test data with DEB kernel packages.""" + self.m_arch = MachineArchitecture.objects.create(name='x86_64') + self.domain = Domain.objects.create(name='example.com') + self.os_release = OSRelease.objects.create( + name='Ubuntu 24.04', codename='noble' + ) + self.os_variant = OSVariant.objects.create( + name='Ubuntu 24.04.1 LTS', osrelease=self.os_release + ) + self.pkg_arch = PackageArchitecture.objects.create(name='amd64') + + # DEB kernel package names — each version is a different package name + self.img_49_name = PackageName.objects.create(name='linux-image-6.8.0-49-generic') + self.img_51_name = PackageName.objects.create(name='linux-image-6.8.0-51-generic') + self.img_53_name = PackageName.objects.create(name='linux-image-6.8.0-53-generic') + + self.img_49 = Package.objects.create( + name=self.img_49_name, arch=self.pkg_arch, epoch='', + version='6.8.0-49.50', release='', packagetype='D' + ) + self.img_51 = Package.objects.create( + name=self.img_51_name, arch=self.pkg_arch, epoch='', + version='6.8.0-51.52', release='', packagetype='D' + ) + self.img_53 = Package.objects.create( + name=self.img_53_name, arch=self.pkg_arch, epoch='', + version='6.8.0-53.54', release='', packagetype='D' + ) + + # repo has the latest kernel + self.repo = Repository.objects.create( + name='noble-security', repotype='D', arch=self.m_arch, + ) + from repos.models import Mirror + self.mirror = Mirror.objects.create( + repo=self.repo, url='http://archive.ubuntu.com/ubuntu', + ) + self.mirror.packages.add(self.img_49, self.img_51, self.img_53) + + def _create_host(self, running_kernel, installed_kernels): + host = Host.objects.create( + hostname='ubuntu.example.com', + ipaddress='192.168.1.60', + osvariant=self.os_variant, + kernel=running_kernel, + arch=self.m_arch, + domain=self.domain, + lastreport=timezone.now(), + host_repos_only=False, + ) + host.packages.set(installed_kernels) + return host + + def test_deb_latest_installed_not_running(self): + """DEB: latest kernel installed but not running. + + Should show update from running→latest. Old kernels ignored. + """ + host = self._create_host( + '6.8.0-49-generic', + [self.img_49, self.img_51, self.img_53], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter( + name__name__startswith='linux-image-' + ) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 1) + update = host.updates.first() + self.assertEqual(update.oldpackage, self.img_49) + self.assertEqual(update.newpackage, self.img_53) + + def test_deb_latest_installed_and_running(self): + """DEB: latest kernel installed and running. No updates.""" + host = self._create_host( + '6.8.0-53-generic', + [self.img_49, self.img_51, self.img_53], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter( + name__name__startswith='linux-image-' + ) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 0) + + def test_deb_latest_not_installed(self): + """DEB: repo has newer kernel not yet installed.""" + host = self._create_host( + '6.8.0-51-generic', + [self.img_49, self.img_51], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter( + name__name__startswith='linux-image-' + ) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 1) + update = host.updates.first() + self.assertEqual(update.oldpackage, self.img_51) + self.assertEqual(update.newpackage, self.img_53) + + def test_deb_flavour_extraction(self): + """Test _get_deb_kernel_flavour helper.""" + host = self._create_host('6.8.0-51-generic', [self.img_51]) + self.assertEqual( + host._get_deb_kernel_flavour('linux-image-6.8.0-51-generic'), + 'generic' + ) + self.assertEqual( + host._get_deb_kernel_flavour('linux-modules-extra-6.8.0-51-lowlatency'), + 'lowlatency' + ) + self.assertEqual( + host._get_deb_kernel_flavour('linux-image-6.1.0-28-cloud-amd64'), + 'cloud-amd64' + ) + self.assertEqual( + host._get_deb_kernel_flavour('linux-image-unsigned-6.8.0-51-generic'), + 'generic' + ) + + def test_deb_running_kernel_flavour(self): + """Test _get_running_kernel_flavour helper.""" + host = self._create_host('6.8.0-51-generic', [self.img_51]) + self.assertEqual(host._get_running_kernel_flavour(), 'generic') + + host.kernel = '6.1.0-28-cloud-amd64' + self.assertEqual(host._get_running_kernel_flavour(), 'cloud-amd64') + + host.kernel = '5.14.0-503.el9' + self.assertIsNone(host._get_running_kernel_flavour()) + + +@override_settings( + CELERY_TASK_ALWAYS_EAGER=True, + CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}} +) +class ArchKernelUpdateTests(TestCase): + """Tests for Arch Linux kernel update detection and reboot check.""" + + def setUp(self): + self.m_arch = MachineArchitecture.objects.create(name='x86_64') + self.domain = Domain.objects.create(name='example.com') + self.os_release = OSRelease.objects.create( + name='Arch Linux', codename='arch' + ) + self.os_variant = OSVariant.objects.create( + name='Arch Linux', osrelease=self.os_release + ) + self.pkg_arch = PackageArchitecture.objects.create(name='x86_64') + + self.linux_name = PackageName.objects.create(name='linux') + self.linux_headers_name = PackageName.objects.create(name='linux-headers') + + self.linux_installed = Package.objects.create( + name=self.linux_name, arch=self.pkg_arch, epoch='', + version='6.12.8.arch1', release='1', packagetype='A' + ) + self.linux_headers_installed = Package.objects.create( + name=self.linux_headers_name, arch=self.pkg_arch, epoch='', + version='6.12.8.arch1', release='1', packagetype='A' + ) + self.linux_repo = Package.objects.create( + name=self.linux_name, arch=self.pkg_arch, epoch='', + version='6.12.10.arch1', release='1', packagetype='A' + ) + self.linux_headers_repo = Package.objects.create( + name=self.linux_headers_name, arch=self.pkg_arch, epoch='', + version='6.12.10.arch1', release='1', packagetype='A' + ) + + self.repo = Repository.objects.create( + name='core', repotype='A', arch=self.m_arch, + ) + from repos.models import Mirror + self.mirror = Mirror.objects.create( + repo=self.repo, url='https://mirror.archlinux.org/core', + ) + self.mirror.packages.add(self.linux_repo, self.linux_headers_repo) + + def _create_host(self, running_kernel, installed_pkgs): + host = Host.objects.create( + hostname='arch.example.com', + ipaddress='192.168.1.70', + osvariant=self.os_variant, + kernel=running_kernel, + arch=self.m_arch, + domain=self.domain, + lastreport=timezone.now(), + host_repos_only=False, + ) + host.packages.set(installed_pkgs) + return host + + def test_arch_update_available(self): + """Arch: installed kernel older than repo → update.""" + host = self._create_host( + '6.12.8-arch1-1', + [self.linux_installed, self.linux_headers_installed], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter( + Q(name=self.linux_name) | Q(name=self.linux_headers_name) + ) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 2) + + def test_arch_no_update(self): + """Arch: installed matches repo → no update.""" + host = self._create_host( + '6.12.10-arch1-1', + [self.linux_repo, self.linux_headers_repo], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter( + Q(name=self.linux_name) | Q(name=self.linux_headers_name) + ) + + host.find_kernel_updates(kernel_packages, repo_packages) + + self.assertEqual(host.updates.count(), 0) + + def test_arch_reboot_required(self): + """Arch: installed kernel newer than running → reboot required.""" + host = self._create_host( + '6.12.8-arch1-1', + [self.linux_repo, self.linux_headers_repo], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter( + Q(name=self.linux_name) | Q(name=self.linux_headers_name) + ) + + host.find_kernel_updates(kernel_packages, repo_packages) + + host.refresh_from_db() + self.assertTrue(host.reboot_required) + + def test_arch_no_reboot_when_current(self): + """Arch: running matches installed → no reboot.""" + host = self._create_host( + '6.12.10-arch1-1', + [self.linux_repo, self.linux_headers_repo], + ) + repo_packages = Package.objects.filter(mirror=self.mirror) + kernel_packages = host.packages.filter( + Q(name=self.linux_name) | Q(name=self.linux_headers_name) + ) + + host.find_kernel_updates(kernel_packages, repo_packages) + + host.refresh_from_db() + self.assertFalse(host.reboot_required)