-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathcheck-netbox-machine-state.py
More file actions
executable file
·98 lines (81 loc) · 3.58 KB
/
check-netbox-machine-state.py
File metadata and controls
executable file
·98 lines (81 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/python3
# Copyright SUSE LLC
"""Verify qe-lsg NetBox machines with non-excluded statuses are not reachable via ping.
Exits 1 if any such machine responds to ping, indicating it is unexpectedly powered on.
By default machines with status active, unused, or staged are excluded from the check.
"""
from __future__ import annotations
import argparse
import logging
import operator
import os
import sys
from functools import reduce
from typing import TYPE_CHECKING
import pynetbox
import sh
if TYPE_CHECKING:
from pynetbox.models import dcim, ipam
ping_args = {"-c1", "-W1"}
_ping = sh.Command("ping")
def check_ping(destination: str | ipam.IpAddresses) -> bool:
"""Signal (True) that a machine is reachable when it should not be."""
destination = str(destination).split("/")[0]
try:
_ = _ping(destination, *ping_args)
except sh.ErrorReturnCode:
return False
log.warning("Ping to destination %s was successfull, this will fail the script!", destination)
return True
def check_machine(machine: dcim.Devices) -> bool:
"""Detect reachability across all known addresses of a machine to avoid false negatives."""
attributes_to_check = {"oob_ip", "primary_ip", "primary_ip4", "primary_ip6"}
machine_attributes = [getattr(machine, attr) for attr in attributes_to_check if getattr(machine, attr) is not None]
log.debug(
"Going to ping the attributes %s (values: %s) from machine %s", attributes_to_check, machine_attributes, machine
)
return any([check_ping(x) for x in machine_attributes] + [check_ping(machine.name + ".")])
def main(args: argparse.Namespace) -> int:
"""Fail if any non-excluded qe-lsg machine is still reachable, indicating it was not properly powered off."""
nb = pynetbox.api(args.netbox_url, token=args.netbox_token)
excluded_states = reduce(operator.add, args.exclude_status)
machine_filters = {
"tag": "qe-lsg",
"status__n": set(excluded_states),
}
our_machines = nb.dcim.devices.filter(**machine_filters)
return int(any(check_machine(machine) for machine in our_machines))
def loglevel_to_int(loglevel: str) -> int:
"""Allow LOGLEVEL env var to set the default verbosity as if the user had passed that many -v flags."""
loglevel_step = logging.CRITICAL - logging.ERROR
max_loglevel = logging.CRITICAL // loglevel_step
return max_loglevel - int(getattr(logging, loglevel.upper(), logging.ERROR) // loglevel_step)
log = logging.getLogger(sys.argv[0] if __name__ == "__main__" else __name__)
if __name__ == "__main__":
logging.basicConfig()
parser = argparse.ArgumentParser()
parser.add_argument(
"-v",
"--verbose",
help="Increase verbosity level, specify multiple times to increase verbosity",
action="count",
default=loglevel_to_int(os.environ.get("LOGLEVEL", "ERROR").upper()),
)
parser.add_argument(
"--netbox-url", help="Netbox instance to use", default=os.environ.get("NETBOX_URL", "https://netbox.suse.de")
)
parser.add_argument(
"--netbox-token", help="API token used to fetch entries from netbox", default=os.environ.get("NETBOX_TOKEN", "")
)
parser.add_argument("--exclude-status", action="append", nargs="+", default=[["active", "unused", "staged"]])
args = parser.parse_args()
verbose_to_log = {
0: logging.CRITICAL,
1: logging.ERROR,
2: logging.WARNING,
3: logging.INFO,
4: logging.DEBUG,
}
log.setLevel(logging.DEBUG if args.verbose > max(verbose_to_log) else verbose_to_log[args.verbose])
log.debug(args)
sys.exit(main(args))