#!/cm/local/apps/python3/bin/python
#
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
#
"""Inspect and patch the Kubernetes health check scripts of a kube cluster.

Two actions are offered on the command line:

* ``status`` prints, per control plane node, the MD5 of the two health check
  scripts on the node and in the node's software image.
* ``patch`` overwrites a script with the content embedded in this file when
  its MD5 equals the known old hash, or unconditionally with ``--force``.
"""

import argparse
import hashlib
import logging
import os
import shlex
import sys
import typing
import shutil
import tempfile
import pprint

import exec_helpers
import pythoncm.cluster
import pythoncm.entity
from tabulate import tabulate

LOGGER = logging.getLogger("cm-healthchecks-manage")
LOGGER.setLevel(logging.DEBUG)

# Console handler
console_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
LOGGER.addHandler(console_handler)

# File handler
log_file = "/var/log/cm-healthchecks-manage.log"
try:
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(formatter)
    LOGGER.addHandler(file_handler)
except PermissionError:
    LOGGER.warning(f"Cannot write to {log_file} due to permission error. Logging to console only.")
except IOError as e:
    LOGGER.warning(f"Cannot write to {log_file}: {e}. Logging to console only.")

# Paths for health check scripts in the software images
# NOTE(review): the *_MD5 values are the hashes that trigger a patch; the
# *_MD5_NEW values are presumably the MD5 of the *_CONTENT strings below --
# re-verify with calculate_md5() whenever the embedded text is touched.
CERT_EXPIRATION_SCRIPT = "/cm/local/apps/cmd/scripts/healthchecks/kubernetescertsexpiration"
CERT_EXPIRATION_SCRIPT_MD5 = "2ad91afe408396fbe41443c84bae3e65"
CERT_EXPIRATION_SCRIPT_MD5_NEW = "96b77ffcf2ca08d6a07fd38c79d4e7ef"
CORE_SCRIPT = "/cm/local/apps/cmd/scripts/healthchecks/kubernetescore.py"
CORE_SCRIPT_MD5 = "ac430d1886f2077846983bdb280a11fb"
CORE_SCRIPT_MD5_NEW = '94490aa4b3e76d3542f12b1253a7f410'


def hash_with_summary(hash: str, script_idx: int) -> str:
    """Return *hash* followed by a short verdict in parentheses.

    Args:
        hash: MD5 hex digest, or ``'-'`` when no digest is available
            (returned unchanged).
        script_idx: ``1`` for the certificate expiration script, ``2`` for
            the core script. Any other value yields an empty verdict.

    Returns:
        ``"<hash> (needs patch!)"``, ``"<hash> (ok)"`` or
        ``"<hash> (unknown)"``.
    """
    if hash == '-':
        return hash

    # script index -> (hash that needs patching, hash of the patched script)
    known_hashes = {
        1: (CERT_EXPIRATION_SCRIPT_MD5, CERT_EXPIRATION_SCRIPT_MD5_NEW),
        2: (CORE_SCRIPT_MD5, CORE_SCRIPT_MD5_NEW),
    }

    verdict = ""
    if script_idx in known_hashes:
        old_hash, new_hash = known_hashes[script_idx]
        if hash == old_hash:
            verdict = "needs patch!"
        elif hash == new_hash:
            verdict = "ok"
        else:
            verdict = "unknown"
    return f"{hash} ({verdict})"


CERT_EXPIRATION_SCRIPT_CONTENT = '''\
#!/cm/local/apps/python3/bin/python
#
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
#

"""Check if all expected agents and services are up and running for active nodes"""

import argparse
import datetime
import json
import os
import sys

sys.path.append(os.path.dirname(os.path.abspath(__file__)))  # We have absolute imports

import kubernetescore


def parse_kubeadm_output(json_output, threshold_days):
    now = datetime.datetime.now(datetime.timezone.utc)
    expiring_soon = []

    try:
        data = json.loads(json_output)
        for cert_type in ["certificates", "certificateAuthorities"]:
            for cert in data.get(cert_type, []):
                if not cert['expirationDate']:
                    continue
                expiration = datetime.datetime.strptime(cert["expirationDate"], "%Y-%m-%dT%H:%M:%SZ")
                expiration = expiration.replace(tzinfo=datetime.timezone.utc)
                difference = (expiration - now)
                days_until_expiration = difference.days

                if days_until_expiration <= threshold_days:
                    expiring_soon.append(
                        {
                            "name": cert["name"],
                            "expires": expiration.strftime("%b %d, %Y %H:%M UTC"),
                            "days_left": days_until_expiration,
                        }
                    )
    except json.decoder.JSONDecodeError:
        name_start = 0
        expiration_start = None
        expiration_end = None
        for line in json_output.splitlines():
            if line.startswith("CERTIFICATE"):
                expiration_start = line.find("EXPIRES")
                expiration_end = line.find("RESIDUAL TIME")
                continue
            name = line[name_start:expiration_start].strip()
            # Feb 03, 2026 13:37 UTC
            expiration_string = ""
            if len(line) >= expiration_end:
                expiration_string = line[expiration_start:expiration_end].strip()
            if not expiration_string:
                continue
            expiration = datetime.datetime.strptime(expiration_string, "%b %d, %Y %H:%M %Z")
            expiration = expiration.replace(tzinfo=datetime.timezone.utc)
            days_until_expiration = (expiration - now).days

            if days_until_expiration <= threshold_days:
                expiring_soon.append(
                    {
                        "name": name,
                        "expires": expiration.strftime("%b %d, %Y %H:%M UTC"),
                        "days_left": days_until_expiration,
                    }
                )

    return expiring_soon


def main():
    parser = argparse.ArgumentParser(description="Check for soon-to-expire Kubernetes certificates")
    parser.add_argument("--config", default="", help="Path to kubeadm config file")
    parser.add_argument(
        "--threshold",
        type=int,
        default=30,
        help="Threshold in days for expiration warning (default: 30)",
    )
    args = parser.parse_args()

    json_output = kubernetescore.run_kubeadm_command(args.config)
    expiring_certs = parse_kubeadm_output(json_output, args.threshold)

    if expiring_certs:
        kubernetescore.debug(f"Certificates expiring within {args.threshold} days:")
        for cert in expiring_certs:
            kubernetescore.debug(f"{cert['name']}: Expires on {cert['expires']} ({cert['days_left']} days remaining)")
        kubernetescore.fail_and_exit(
            f"Are Kubernetes certificates valid for at least the next {args.threshold} days?"
        )
    else:
        kubernetescore.pass_and_exit(f"No certificates are expiring within {args.threshold} days.")


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        kubernetescore.fail_and_exit(f"{type(e).__name__}: {e!s}")
    kubernetescore.pass_and_exit()
'''

CORE_SCRIPT_CONTENT = '''\
#!/cm/local/apps/python3/bin/python
#
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
#

import base64
import os
import pathlib
import subprocess
import sys
import typing
import warnings

import requests
import yaml

"""
Script for core Kubernetes HealthCheck Scripts
"""

DEBUG = os.environ.get("CMD_DEBUG", "0") == "1"
info_fd = int(os.environ.get("CMD_INFO_FD", 3))


def pass_and_exit(msg: str = ""):
    if DEBUG and msg:
        print(f"DEBUG: {msg}")
    do_print("PASS", msg)
    sys.exit(0)


def debug(msg: typing.Union[str, bytes, int]) -> None:
    if DEBUG == 0:
        return
    print(f"DEBUG: {msg!r}")


def do_print(status: str, message: str) -> bool:
    try:
        if status != "":
            print(status)
        if message != "":
            os.write(3, f"{message}".encode("utf-8"))
    except OSError:
        return False
    return True


def fail_and_exit(msg: str = "") -> None:
    if DEBUG and msg:
        print("DEBUG: %s" % msg)
    do_print("FAIL", msg)
    sys.exit(0)


KUBERNETES_KUBEADM_CONFIG = ""
KUBERNETES_CA_CERT = ""
KUBERNETES_API_SERVER_ENDPOINT = ""
KUBERNETES_KUBELET_ENDPOINT = ""
KUBERNETES_ADMIN_CERT = ""
KUBERNETES_ADMIN_CERT_KEY = ""
KUBERNETES_ADMIN_KUBECONFIG = ""
KUBERNETES_CLUSTER_NAME = ""


def get_env_var(name: str, descr: str) -> str:
    env = os.environ.get(name)
    if env is None:
        fail_and_exit(f"kube {descr} not configured")
    return env


def get_admin_certificates():
    global KUBERNETES_ADMIN_CERT
    global KUBERNETES_ADMIN_CERT_KEY

    KUBERNETES_ADMIN_CERT = f"/root/.kube/admin-{KUBERNETES_CLUSTER_NAME}.pem"
    KUBERNETES_ADMIN_CERT_KEY = f"/root/.kube/admin-{KUBERNETES_CLUSTER_NAME}.key"

    try:
        with open(KUBERNETES_ADMIN_KUBECONFIG) as config_file:
            config = yaml.safe_load(config_file)
            for user in config["users"]:
                if user["name"] == "kubernetes-admin":
                    with open(KUBERNETES_ADMIN_CERT, "w") as cert_file:
                        cert = base64.b64decode(user["user"]["client-certificate-data"])
                        cert_file.write(cert.decode())
                    with open(KUBERNETES_ADMIN_CERT_KEY, "w") as key_file:
                        key = base64.b64decode(user["user"]["client-key-data"])
                        key_file.write(key.decode())
                    break
    except Exception as e:
        fail_and_exit(str(e))


def get_kubeadm_environment_variables():
    global KUBERNETES_KUBEADM_CONFIG
    KUBERNETES_KUBEADM_CONFIG = get_env_var("CMD_KUBERNETES_KUBEADM_CONFIG", "kubeadm config file path")


def get_apiserver_environment_variables():
    global KUBERNETES_CA_CERT
    global KUBERNETES_API_SERVER_ENDPOINT
    global KUBERNETES_ADMIN_KUBECONFIG
    global KUBERNETES_CLUSTER_NAME

    KUBERNETES_CLUSTER_NAME = get_env_var("CMD_KUBERNETES_CLUSTER_NAME", "kubernetes cluster name")
    KUBERNETES_ADMIN_KUBECONFIG = get_env_var("CMD_KUBERNETES_ADMIN_KUBECONFIG", "configuration file")
    KUBERNETES_CA_CERT = get_env_var("CMD_KUBERNETES_CACERT", "ca certificate")
    KUBERNETES_API_SERVER_ENDPOINT = get_env_var("CMD_KUBERNETES_APISERVER_ENDPOINT", "apiserver endpoint")
    get_admin_certificates()


def get_kubelet_environment_variables():
    global KUBERNETES_CA_CERT
    global KUBERNETES_KUBELET_ENDPOINT

    KUBERNETES_CA_CERT = get_env_var("CMD_KUBERNETES_CACERT", "ca certificate")
    KUBERNETES_KUBELET_ENDPOINT = get_env_var("CMD_KUBERNETES_KUBELET_ENDPOINT", "kubelet endpoint")


def run_apiserver(
    url: str,
    process_data_function: typing.Callable[[bytes], typing.Any],
    enable_debug=0,
):
    get_apiserver_environment_variables()
    return _run(
        KUBERNETES_API_SERVER_ENDPOINT,
        KUBERNETES_ADMIN_CERT,
        KUBERNETES_ADMIN_CERT_KEY,
        url,
        process_data_function,
        enable_debug,
    )


def run_kubelet(
    url: str,
    process_data_function: typing.Callable[[bytes], typing.Any],
    enable_debug=0,
):
    get_kubelet_environment_variables()
    return _run(
        KUBERNETES_KUBELET_ENDPOINT,
        KUBERNETES_ADMIN_CERT,
        KUBERNETES_ADMIN_CERT_KEY,
        url,
        process_data_function,
        enable_debug,
    )


def run_kubeadm_command(config: str) -> str:
    get_kubeadm_environment_variables()
    get_apiserver_environment_variables()
    global KUBERNETES_KUBEADM_CONFIG
    global KUBERNETES_ADMIN_KUBECONFIG

    if not config:
        config = KUBERNETES_KUBEADM_CONFIG

    # on demand retrieval of KUBERNETES_KUBEADM_CONFIG
    if not os.path.exists(config):
        cmd = ["kubectl", "--kubeconfig", KUBERNETES_ADMIN_KUBECONFIG, "get", "configmap", "-n", "kube-system", "kubeadm-config", "-o", "jsonpath={.data.ClusterConfiguration}"]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 1:
            fail_and_exit(f"Failed to retrieve {config}")
        pathlib.Path(config).write_text(result.stdout, encoding="utf-8")

    get_kubeadm_environment_variables()
    cmd = ["kubeadm", "--config", config or KUBERNETES_KUBEADM_CONFIG, "certs", "check-expiration", "-o", "json"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 1:
        # No -o json argument
        result = subprocess.run(cmd[:-2], capture_output=True, text=True)
        if result.returncode == 1:
            fail_and_exit(result.stderr)
    return result.stdout


def _run(
    endpoint: str,
    cert: str,
    key: str,
    url: str,
    process_data_function: typing.Callable[[bytes], typing.Any],
    enable_debug: int,
) -> None:
    global DEBUG
    DEBUG = enable_debug

    # hack to fix Bright HA: pass without checking if not in active master
    # disable the check if explicitly asked for
    active_env = os.environ.get("CMD_KUBERNETES_ACTIVE")
    if active_env == "0":
        pass_and_exit("not active master")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        _process_request(endpoint, cert, key, url, process_data_function)


def _process_request(
    endpoint,
    cert,
    key,
    url,
    process_data_function: typing.Callable[[bytes], typing.Any],
) -> None:
    debug(f"ca = {KUBERNETES_CA_CERT}")
    response = requests.get(f"{endpoint}{url}", cert=(cert, key), verify=KUBERNETES_CA_CERT)
    debug(response.status_code)
    process_data_function(response.content)
'''


# TODO: currently copied from cm-kubeadm-manage
def get_master_data(
    cluster: pythoncm.cluster.Cluster, kc: pythoncm.entity.KubeCluster
) -> tuple[set[pythoncm.entity.Node], pythoncm.entity.ConfigurationOverlay]:
    """Return the control plane nodes of *kc* and the overlay that defines them.

    Exits the process with status 1 when no overlay carries a control plane
    kubelet role for *kc*.

    Returns:
        A tuple ``(master_nodes, master_overlay)`` where ``master_overlay`` is
        the highest-priority overlay with a control plane kubelet role.
    """
    master_overlay = None
    candidate_hostnames: set[str] = set()

    # This takes all nodes that seem to be tied to a control plane role via overlays
    # Note that customers can freely create overlays with different priorities
    # We will later check on all the nodes if they are in fact kubelets with controlPlane set to True
    for overlay in cluster.get_by_type(pythoncm.entity.ConfigurationOverlay):
        for role in overlay.roles:
            if role.name.lower() == "kubelet" and role.kubeCluster.name == kc.name:
                LOGGER.info(f"Overlay {overlay.name} has role {role.name} for kube cluster {role.kubeCluster.name}")
                if role.controlPlane:
                    candidate_hostnames.update(n.hostname for n in overlay.all_nodes)
                    if master_overlay is None or overlay.priority > master_overlay.priority:
                        master_overlay = overlay

    if master_overlay is None:
        LOGGER.error("No master overlay found")
        sys.exit(1)

    # Now we fetch the role for each node (get_role takes into consideration priorities of overlays etc.)
    # First convert hostnames to entity, then filter
    master_nodes: set[pythoncm.entity.Node] = set()
    for hostname in candidate_hostnames:
        node = typing.cast(pythoncm.entity.Node, cluster.get_by_name(hostname, pythoncm.entity.Node))
        if not node:
            # The lookup result is only cast, not checked; skip instead of
            # crashing on node.get_role below.
            LOGGER.warning(f"Node {hostname} could not be resolved, skipping")
            continue
        kubelet_role = node.get_role(pythoncm.entity.KubeletRole)
        if kubelet_role and kubelet_role.kubeCluster.name == kc.name and kubelet_role.controlPlane:
            master_nodes.add(node)

    return master_nodes, master_overlay


def execute(ssh_client, command) -> exec_helpers.exec_result.ExecResult:
    """Execute a command via the provided SSH client."""
    LOGGER.debug(f"Executing: {command}")
    return ssh_client.execute(command)


def check_cluster(cluster: pythoncm.cluster.Cluster, kube_cluster: str) -> pythoncm.entity.KubeCluster:
    """Verify that the specified Kubernetes cluster exists.

    Exits the process with status 1 when the lookup by name returns nothing.
    """
    kc: pythoncm.entity.KubeCluster = cluster.get_by_name(kube_cluster, pythoncm.entity.KubeCluster)
    if not kc:
        LOGGER.error(f"Provided kube cluster {kube_cluster} not found")
        sys.exit(1)
    return kc


def calculate_md5(content: str) -> str:
    """Calculate MD5 hash of a string."""
    return hashlib.md5(content.encode()).hexdigest()


def get_file_md5(ssh_client, file_path: str) -> typing.Optional[str]:
    """Get MD5 hash of a file.

    Runs ``md5sum`` through *ssh_client* (any object with an ``execute``
    method) and returns the digest, or ``None`` when the command fails or
    prints nothing.
    """
    # Quote the path so spaces or shell metacharacters cannot alter the command.
    result = execute(ssh_client, f"md5sum {shlex.quote(file_path)}")
    if not result.ok:
        LOGGER.warning(f"Failed to get MD5 of {file_path}: {result.stderr_str}")
        return None

    md5_parts = result.stdout_str.split()
    return md5_parts[0] if md5_parts else None


def read_file_content(ssh_client, file_path: str) -> typing.Optional[str]:
    """Read content of a file.

    Returns the output of ``cat`` run through *ssh_client*, or ``None`` when
    the command fails.
    """
    result = execute(ssh_client, f"cat {shlex.quote(file_path)}")
    if not result.ok:
        LOGGER.warning(f"Failed to read {file_path}: {result.stderr_str}")
        return None
    return result.stdout_str


def write_file_content(ssh_client, file_path: str, content: str, mode: int = 0o700) -> bool:
    """Write content to a file.

    Args:
        ssh_client: Client used to upload the file and set its mode; when
            ``None`` the file is written on the local filesystem instead.
        file_path: Destination path.
        content: Text to write.
        mode: Permission bits applied to the destination after writing.

    Returns:
        ``True`` when the file was written and its mode set, ``False``
        otherwise (the failure is logged).
    """
    # The content is staged in a temporary file which the context manager
    # closes and deletes on every exit path.
    with tempfile.NamedTemporaryFile(mode='w', encoding="utf-8", delete=True) as tmp_file:
        tmp_file.write(content)
        tmp_file.flush()

        if ssh_client is None:
            try:
                shutil.copy(tmp_file.name, file_path)
            except OSError as e:
                LOGGER.error(f"Failed to copy {file_path}: {e}")
                return False
            try:
                os.chmod(file_path, mode)
            except OSError as e:
                LOGGER.error(f"Failed to set permissions on {file_path}: {e}")
                return False
            return True

        try:
            ssh_client.upload(tmp_file.name, file_path)
        except Exception as e:  # exception types of the SSH client are not visible here
            LOGGER.error(f"Failed to upload {file_path}: {e}")
            return False

        mode_str = format(mode, "o")  # 0o700 -> '700'
        result = execute(ssh_client, f"chmod {mode_str} {shlex.quote(file_path)}")
        if not result.ok:
            LOGGER.warning(f"Failed to set permissions on {file_path}: {result.stderr_str}")
            return False
        return True


def get_healthcheck_scripts_info(
    cluster: pythoncm.cluster.Cluster, kc: pythoncm.entity.KubeCluster
) -> dict[str, dict[str, dict[str, dict[str, typing.Optional[str]]]]]:
    """
    Collect information about health check scripts on nodes and in software images.

    Returns a dictionary with structure:
    {
        node_hostname: {
            'node': {
                'cert_script': {'path': path, 'md5': md5},
                'core_script': {'path': path, 'md5': md5}
            },
            'image': {
                'cert_script': {'path': path, 'md5': md5},
                'core_script': {'path': path, 'md5': md5}
            }
        }
    }

    A 'path' or 'md5' value is None when the node has no software image or
    the digest could not be obtained.
    """
    master_nodes, _ = get_master_data(cluster, kc)
    LOGGER.info(f"Master nodes: {', '.join(m.hostname for m in master_nodes)}")

    # Map each node to its software image path (nodes without one are left out)
    node_software_images: dict[str, str] = {}
    for node in master_nodes:
        try:
            node_software_images[node.hostname] = node.software_image.path
        except (KeyError, AttributeError):
            LOGGER.debug(f"Could not find software image for node: {node.hostname}")

    LOGGER.info(f"Found {len(set(node_software_images.values()))} unique software images")

    result = {}
    # Software images are hashed locally; the executor is released on exit.
    with exec_helpers.Subprocess() as local_exec:
        for node in master_nodes:
            node_info = {
                'node': {
                    'cert_script': {'path': CERT_EXPIRATION_SCRIPT, 'md5': None},
                    'core_script': {'path': CORE_SCRIPT, 'md5': None}
                },
                'image': {
                    'cert_script': {'path': None, 'md5': None},
                    'core_script': {'path': None, 'md5': None}
                }
            }

            # Get node script MD5s
            with exec_helpers.SSHClient(host=node.hostname) as ssh:
                node_info['node']['cert_script']['md5'] = get_file_md5(ssh, CERT_EXPIRATION_SCRIPT)
                node_info['node']['core_script']['md5'] = get_file_md5(ssh, CORE_SCRIPT)

            sw_image: typing.Optional[str] = node_software_images.get(node.hostname)
            if sw_image:
                # The script paths are absolute; strip the leading slash so
                # they are joined below the image root.
                cert_script_path = os.path.join(sw_image, CERT_EXPIRATION_SCRIPT.lstrip('/'))
                core_script_path = os.path.join(sw_image, CORE_SCRIPT.lstrip('/'))

                node_info['image']['cert_script']['path'] = cert_script_path
                node_info['image']['core_script']['path'] = core_script_path
                node_info['image']['cert_script']['md5'] = get_file_md5(local_exec, cert_script_path)
                node_info['image']['core_script']['md5'] = get_file_md5(local_exec, core_script_path)

            result[node.hostname] = node_info

    return result


def status(cluster: pythoncm.cluster.Cluster, kc: pythoncm.entity.KubeCluster):
    """Display status information about health check scripts.

    Prints a table with one row per node and script and returns the collected
    script information (see get_healthcheck_scripts_info).
    """
    LOGGER.info(f"Checking health checks status for cluster {kc.name}")

    script_info = get_healthcheck_scripts_info(cluster, kc)

    # Prepare data for tabulate
    table_data = []
    headers = ["Node", "Script", "Node MD5", "Software Image MD5"]

    for node, info in script_info.items():
        # Add cert expiration script info
        node_cert_md5 = info['node']['cert_script']['md5'] or "-"
        image_cert_md5 = info['image']['cert_script']['md5'] or "-"
        table_data.append([
            node,
            os.path.basename(CERT_EXPIRATION_SCRIPT),
            hash_with_summary(node_cert_md5, 1),
            hash_with_summary(image_cert_md5, 1)
        ])

        # Add core script info (node column left blank: same node as the row above)
        node_core_md5 = info['node']['core_script']['md5'] or "-"
        image_core_md5 = info['image']['core_script']['md5'] or "-"
        table_data.append([
            "",
            os.path.basename(CORE_SCRIPT),
            hash_with_summary(node_core_md5, 2),
            hash_with_summary(image_core_md5, 2)
        ])

    # Display results using tabulate
    print("\nHealth Check Scripts Status:")
    print("==========================")
    print(tabulate(table_data, headers=headers, tablefmt="grid"))

    return script_info


def patch(cluster: pythoncm.cluster.Cluster, kc: pythoncm.entity.KubeCluster, force: bool = False):
    """
    Update health check scripts on both software images and nodes.

    If the MD5 hash matches the expected hash, replace the file with new content.
    With force=True the file is replaced regardless of its current hash.
    Each software image is processed once even when several nodes share it.
    """
    LOGGER.info(f"Patching health check scripts for cluster {kc.name}")

    script_info = get_healthcheck_scripts_info(cluster, kc)
    LOGGER.debug("Collected script info:\n%s", pprint.pformat(script_info))

    # Track which scripts were updated
    updated_files = []
    # Image script paths already handled for an earlier node
    seen: set[str] = set()

    for node, info in script_info.items():
        LOGGER.info(f"Processing node: {node}")

        with exec_helpers.SSHClient(host=node) as ssh:
            # Process certificate expiration script
            node_cert_md5 = info['node']['cert_script']['md5']
            if node_cert_md5 == CERT_EXPIRATION_SCRIPT_MD5 or force:
                LOGGER.info(f"Updating {CERT_EXPIRATION_SCRIPT} on node {node} (forced = {force!r})")
                if write_file_content(ssh, CERT_EXPIRATION_SCRIPT, CERT_EXPIRATION_SCRIPT_CONTENT):
                    updated_files.append(f"Node {node}: {CERT_EXPIRATION_SCRIPT}")
            else:
                LOGGER.info(f"Skipping {CERT_EXPIRATION_SCRIPT} on node {node} (MD5 mismatch: {node_cert_md5} != {CERT_EXPIRATION_SCRIPT_MD5})")

            # Process core script
            node_core_md5 = info['node']['core_script']['md5']
            if node_core_md5 == CORE_SCRIPT_MD5 or force:
                LOGGER.info(f"Updating {CORE_SCRIPT} on node {node} (forced = {force!r})")
                if write_file_content(ssh, CORE_SCRIPT, CORE_SCRIPT_CONTENT):
                    updated_files.append(f"Node {node}: {CORE_SCRIPT}")
            else:
                LOGGER.info(f"Skipping {CORE_SCRIPT} on node {node} (MD5 mismatch: {node_core_md5} != {CORE_SCRIPT_MD5})")

        # Process software image scripts if they exist
        image_cert_path = info['image']['cert_script']['path']
        if image_cert_path is None:
            continue
        if image_cert_path in seen:
            LOGGER.info(f"Skipping {image_cert_path} (already processed)")
            continue
        seen.add(image_cert_path)

        image_cert_md5 = info['image']['cert_script']['md5']
        if image_cert_md5 == CERT_EXPIRATION_SCRIPT_MD5 or force:
            LOGGER.info(f"Updating {image_cert_path} (software image of {node}, ..., force={force!r})")
            if write_file_content(None, image_cert_path, CERT_EXPIRATION_SCRIPT_CONTENT, mode=0o700):
                updated_files.append(f"Software image: {image_cert_path}")
        else:
            LOGGER.info(f"Skipping {image_cert_path} (MD5 mismatch: {image_cert_md5} != {CERT_EXPIRATION_SCRIPT_MD5})")

        image_core_path = info['image']['core_script']['path']
        image_core_md5 = info['image']['core_script']['md5']
        if image_core_path and (image_core_md5 == CORE_SCRIPT_MD5 or force):
            LOGGER.info(f"Updating {image_core_path} (software image of {node}, ..., force={force!r})")
            # Same mode as the copy written to the node above (0o700), so the
            # image and the nodes do not end up with different permissions.
            if write_file_content(None, image_core_path, CORE_SCRIPT_CONTENT, mode=0o700):
                updated_files.append(f"Software image: {image_core_path}")
        else:
            LOGGER.info(f"Skipping {image_core_path} (MD5 mismatch: {image_core_md5} != {CORE_SCRIPT_MD5})")

    if updated_files:
        LOGGER.info("Successfully updated the following files:")
        for file in updated_files:
            LOGGER.info(f"- {file}")
    else:
        LOGGER.info("No files needed to be updated")


def main():
    """Parse the command line and run the requested action (status or patch)."""
    parser = argparse.ArgumentParser(description="Manage Kubernetes health check scripts")
    parser.add_argument("--kube-cluster", type=str, required=True, help="Kubernetes cluster name (required)")

    subparsers = parser.add_subparsers(dest="action", help="Action to perform", required=True)
    subparsers.add_parser("status", help="Show health check scripts status")
    patch_parser = subparsers.add_parser("patch", help="Update health check scripts")
    patch_parser.add_argument("--force", action="store_true", help="Force update scripts regardless of MD5 check")

    args = parser.parse_args()

    LOGGER.info(f"##### CLI invoked: {sys.argv} #####")
    cluster = pythoncm.cluster.Cluster()

    kube_cluster = args.kube_cluster
    kc = check_cluster(cluster, kube_cluster)

    if args.action == "status":
        status(cluster, kc)
    elif args.action == "patch":
        patch(cluster, kc, force=args.force)


if __name__ == "__main__":
    main()