# The script:
# 1. Extract all the images used by the Kubeflow Working Groups
#    - The reported image lists are saved in respective files under the ../image_lists directory
# 2. Scan the reported images using Trivy for security vulnerabilities
#    - Scanned reports are saved in JSON format inside the ../image_lists/security_scan_reports/ folder for each Working Group
# 3. The script also generates a summary of the security scan reports with severity counts for each Working Group with images
#    - The severity counts with images are saved as JSON files inside the ../image_lists/severity_counts_with_images_for_WG folder
# 4. Generate a summary of the security scan reports
#    - The summary is saved in JSON format inside the ../image_lists/summary_of_severity_counts_for_WG folder
# 5. Before running this file you have to:
#    1. Install kustomize
#       - sudo apt install snapd
#       - sudo snap install kustomize
#    2. Install trivy
#       - sudo apt install snapd
#       - sudo snap install trivy
#    3. Install Python
#    4. Install prettytable
#       - pip install prettytable
# The script must be executed from the hack folder as it uses relative paths

import os
import subprocess
import re
import argparse
import json
import glob

from prettytable import PrettyTable

# Dictionary mapping Kubeflow workgroups to directories containing kustomization files
wg_dirs = {
    "automl": "../apps/katib/upstream/installs",
    "pipelines": "../apps/pipeline/upstream/env ../apps/kfp-tekton/upstream/env",
    "training": "../apps/training-operator/upstream/overlays",
    "manifests": "../common/cert-manager/cert-manager/base ../common/cert-manager/kubeflow-issuer/base ../common/istio-1-24/istio-crds/base ../common/istio-1-24/istio-namespace/base ../common/istio-1-24/istio-install/overlays/oauth2-proxy ../common/oauth2-proxy/overlays/m2m-self-signed ../common/dex/overlays/oauth2-proxy ../common/knative/knative-serving/overlays/gateways ../common/knative/knative-eventing/base ../common/istio-1-24/cluster-local-gateway/base ../common/kubeflow-namespace/base ../common/kubeflow-roles/base ../common/istio-1-24/kubeflow-istio-resources/base",
    "workbenches": "../apps/pvcviewer-controller/upstream/base ../apps/admission-webhook/upstream/overlays ../apps/centraldashboard/overlays ../apps/jupyter/jupyter-web-app/upstream/overlays ../apps/volumes-web-app/upstream/overlays ../apps/tensorboard/tensorboards-web-app/upstream/overlays ../apps/profiles/upstream/overlays ../apps/jupyter/notebook-controller/upstream/overlays ../apps/tensorboard/tensorboard-controller/upstream/overlays",
    "serving": "../apps/kserve ../apps/kserve/models-web-app/overlays/kubeflow",
    "model-registry": "../apps/model-registry/upstream",
}

DIRECTORY = "../image_lists"
os.makedirs(DIRECTORY, exist_ok=True)

SCAN_REPORTS_DIR = os.path.join(DIRECTORY, "security_scan_reports")
ALL_SEVERITY_COUNTS = os.path.join(DIRECTORY, "severity_counts_with_images_for_WG")
SUMMARY_OF_SEVERITY_COUNTS = os.path.join(
    DIRECTORY, "summary_of_severity_counts_for_WG"
)
os.makedirs(SCAN_REPORTS_DIR, exist_ok=True)
os.makedirs(ALL_SEVERITY_COUNTS, exist_ok=True)
os.makedirs(SUMMARY_OF_SEVERITY_COUNTS, exist_ok=True)


def log(*args, **kwargs):
    # Custom log function that prints messages with flush=True by default.
    kwargs.setdefault("flush", True)
    print(*args, **kwargs)


def save_images(wg, images, version):
    # Saves a list of container images to a text file named after the workgroup and version.
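    # Illustrative example (derived from the filename pattern below): calling
    # save_images("automl", images, "latest") writes ../image_lists/kf_latest_automl_images.txt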
output_file = f"../image_lists/kf_{version}_{wg}_images.txt" with open(output_file, "w") as f: f.write("\n".join(images)) log(f"File {output_file} successfully created") def validate_semantic_version(version): # Validates a semantic version string (e.g., "0.1.2" or "latest"). regex = r"^[0-9]+\.[0-9]+\.[0-9]+$" if re.match(regex, version) or version == "latest": return version else: raise ValueError(f"Invalid semantic version: '{version}'") def extract_images(version): version = validate_semantic_version(version) log(f"Running the script using Kubeflow version: {version}") all_images = set() # Collect all unique images across workgroups for wg, dirs in wg_dirs.items(): wg_images = set() # Collect unique images for this workgroup for dir_path in dirs.split(): for root, _, files in os.walk(dir_path): for file in files: if file in [ "kustomization.yaml", "kustomization.yml", "Kustomization", ]: full_path = os.path.join(root, file) try: # Execute `kustomize build` to render the kustomization file result = subprocess.run( ["kustomize", "build", root], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) except subprocess.CalledProcessError as e: log( f'ERROR:\t Failed "kustomize build" command for directory: {root}. See error above' ) continue # Use regex to find lines with 'image: :' or 'image: ' # and '- image: :' but avoid environment variables kustomize_images = re.findall( r"^\s*-?\s*image:\s*([^$\s:]+(?:\:[^\s]+)?)$", result.stdout, re.MULTILINE, ) wg_images.update(kustomize_images) # Ensure uniqueness within workgroup images uniq_wg_images = sorted(wg_images) all_images.update(uniq_wg_images) save_images(wg, uniq_wg_images, version) # Ensure uniqueness across all workgroups uniq_images = sorted(all_images) save_images("all", uniq_images, version) parser = argparse.ArgumentParser( description="Extract images from Kubeflow kustomizations." ) # Define a positional argument 'version' with optional occurrence and default value 'latest'. 
# You can run this file with either python3 or python.
parser.add_argument(
    "version",
    nargs="?",
    type=str,
    default="latest",
    help="Kubeflow version to use (defaults to latest).",
)
args = parser.parse_args()
extract_images(args.version)

log("Started scanning images")

# Get the list of text files, excluding the aggregate all-images file
files = [
    f
    for f in glob.glob(os.path.join(DIRECTORY, "*.txt"))
    if not f.endswith(f"kf_{args.version}_all_images.txt")
]

# Loop through each text file in the specified directory
for file in files:
    log(f"Scanning images in {file}")
    file_base_name = os.path.basename(file).replace(".txt", "")

    # Directory to save reports for this specific file
    file_reports_dir = os.path.join(SCAN_REPORTS_DIR, file_base_name)
    os.makedirs(file_reports_dir, exist_ok=True)

    # Directory to save severity counts
    severity_count = os.path.join(file_reports_dir, "severity_counts")
    os.makedirs(severity_count, exist_ok=True)

    with open(file, "r") as f:
        lines = f.readlines()

    for line in lines:
        line = line.strip()
        image_name = line.split(":")[0]
        image_tag = line.split(":")[1] if ":" in line else ""
        image_name_scan = image_name.split("/")[-1]
        if image_tag:
            image_name_scan = f"{image_name_scan}_{image_tag}"
        scan_output_file = os.path.join(
            file_reports_dir, f"{image_name_scan}_scan.json"
        )

        log(f"Scanning {line}")
        try:
            result = subprocess.run(
                [
                    "trivy",
                    "image",
                    "--format",
                    "json",
                    "--output",
                    scan_output_file,
                    line,
                ],
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            with open(scan_output_file, "r") as json_file:
                scan_data = json.load(json_file)

            if not scan_data.get("Results"):
                log(f"No vulnerabilities found in {image_name}:{image_tag}")
            else:
                vulnerabilities_list = [
                    result["Vulnerabilities"]
                    for result in scan_data["Results"]
                    if "Vulnerabilities" in result and result["Vulnerabilities"]
                ]
                if not vulnerabilities_list:
                    log(
                        f"The vulnerability detection may be insufficient because security updates are not provided for {image_name}:{image_tag}\n"
                    )
                else:
                    severity_counts = {"LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0}
                    for vulnerabilities in vulnerabilities_list:
                        for vulnerability in vulnerabilities:
                            severity = vulnerability.get("Severity", "UNKNOWN")
                            if severity == "UNKNOWN":
                                continue
                            elif severity in severity_counts:
                                severity_counts[severity] += 1

                    report = {"image": line, "severity_counts": severity_counts}

                    image_table = PrettyTable()
                    image_table.field_names = ["Critical", "High", "Medium", "Low"]
                    image_table.add_row(
                        [
                            severity_counts["CRITICAL"],
                            severity_counts["HIGH"],
                            severity_counts["MEDIUM"],
                            severity_counts["LOW"],
                        ]
                    )
                    log(f"{image_table}\n")

                    severity_report_file = os.path.join(
                        severity_count, f"{image_name_scan}_severity_report.json"
                    )
                    with open(severity_report_file, "w") as report_file:
                        json.dump(report, report_file, indent=4)
        except subprocess.CalledProcessError as e:
            log(f"Error scanning {image_name}:{image_tag}")
            log(e.stderr)

    # Combine all the JSON files into a single file with severity counts for all images
    json_files = glob.glob(os.path.join(severity_count, "*.json"))
    output_file = os.path.join(ALL_SEVERITY_COUNTS, f"{file_base_name}.json")
    if not json_files:
        log(f"No JSON files found in '{severity_count}'. Skipping combination.")
    else:
        combined_data = []
        for json_file in json_files:
            with open(json_file, "r") as jf:
                combined_data.append(json.load(jf))
        with open(output_file, "w") as of:
            json.dump({"data": combined_data}, of, indent=4)
        log(f"JSON files successfully combined into '{output_file}'")

# File to save the summary of the severity counts for WGs in JSON format.
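# Illustrative shape of the summary assembled below (the counts are made-up examples;
# the keys mirror merged_data as built in this section):
#   {
#       "Automl": {"images": 12, "LOW": 30, "MEDIUM": 45, "HIGH": 8, "CRITICAL": 1},
#       ...
#       "total": {"images": 80, "LOW": 150, "MEDIUM": 220, "HIGH": 40, "CRITICAL": 5}
#   }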
summary_file = os.path.join(
    SUMMARY_OF_SEVERITY_COUNTS, "severity_summary_in_json_format.json"
)

# Initialize counters
unique_images = {}  # unique set of images across all WGs (dict keyed by image)
total_images = 0
total_low = 0
total_medium = 0
total_high = 0
total_critical = 0

# Initialize a dictionary to hold the final JSON data
merged_data = {}

# Loop through each JSON file in the ALL_SEVERITY_COUNTS directory
for file_path in glob.glob(os.path.join(ALL_SEVERITY_COUNTS, "*.json")):
    # Split the filename on underscores
    filename_parts = os.path.basename(file_path).split("_")
    # Check if there are at least 4 parts (e.g., kf, <version>, <wg>, images.json)
    if len(filename_parts) >= 4:
        # Extract the workgroup name (third part)
        filename = filename_parts[2]
        filename = filename.capitalize()
    else:
        log(f"Skipping invalid filename format: {file_path}")
        continue

    with open(file_path, "r") as f:
        data = json.load(f)["data"]

    # Initialize counts for this file
    image_count = len(data)
    low = sum(entry["severity_counts"]["LOW"] for entry in data)
    medium = sum(entry["severity_counts"]["MEDIUM"] for entry in data)
    high = sum(entry["severity_counts"]["HIGH"] for entry in data)
    critical = sum(entry["severity_counts"]["CRITICAL"] for entry in data)

    # Update unique_images for the total counts later
    for d in data:
        unique_images[d["image"]] = d

    # Create the output for this file
    file_data = {
        "images": image_count,
        "LOW": low,
        "MEDIUM": medium,
        "HIGH": high,
        "CRITICAL": critical,
    }

    # Update merged_data with the workgroup name as key
    merged_data[filename] = file_data

# Update the total counts
unique_images = unique_images.values()  # keep only the deduplicated report entries
total_images += len(unique_images)
total_low += sum(entry["severity_counts"]["LOW"] for entry in unique_images)
total_medium += sum(entry["severity_counts"]["MEDIUM"] for entry in unique_images)
total_high += sum(entry["severity_counts"]["HIGH"] for entry in unique_images)
total_critical += sum(entry["severity_counts"]["CRITICAL"] for entry in unique_images)

# Add total counts to merged_data
merged_data["total"] = {
    "images": total_images,
    "LOW": total_low,
    "MEDIUM": total_medium,
    "HIGH": total_high,
    "CRITICAL": total_critical,
}

log("Summary in JSON format:")
log(json.dumps(merged_data, indent=4))

# Write the final output to a file
with open(summary_file, "w") as summary_f:
    json.dump(merged_data, summary_f, indent=4)

log(f"Summary written to {summary_file} in JSON format")

# Load the JSON content from the file
with open(summary_file, "r") as file:
    data = json.load(file)

# Define a mapping for working group names
groupnames = {
    "Automl": "AutoML",
    "Pipelines": "Pipelines",
    "Workbenches": "Workbenches(Notebooks)",
    "Serving": "Kserve",
    "Manifests": "Manifests",
    "Training": "Training",
    "Model-registry": "Model Registry",
    "total": "All Images",
}

# Create the PrettyTable
table = PrettyTable()
table.field_names = [
    "Working Group",
    "Images",
    "Critical CVE",
    "High CVE",
    "Medium CVE",
    "Low CVE",
]

# Populate the table with data
for group_name in groupnames:
    if group_name in data:  # Check if group_name exists in data
        value = data[group_name]
        table.add_row(
            [
                groupnames[group_name],
                value["images"],
                value["CRITICAL"],
                value["HIGH"],
                value["MEDIUM"],
                value["LOW"],
            ]
        )

# Log the table
log(table)

# Write the table output to a file in the specified folder
output_file = os.path.join(
    SUMMARY_OF_SEVERITY_COUNTS, "summary_of_severity_counts_for_WGs_in_table.txt"
)
with open(output_file, "w") as f:
    f.write(str(table))

log("Output saved to:", output_file)
log("Severity counts with images for each WG are saved in", ALL_SEVERITY_COUNTS)
log("Scanned JSON reports on images are saved in", SCAN_REPORTS_DIR)
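# For reference, the layout produced under ../image_lists looks like this
# (names are illustrative and follow the patterns used above):
#   kf_<version>_<wg>_images.txt
#   security_scan_reports/kf_<version>_<wg>_images/<image>_<tag>_scan.json
#   security_scan_reports/kf_<version>_<wg>_images/severity_counts/<image>_<tag>_severity_report.json
#   severity_counts_with_images_for_WG/kf_<version>_<wg>_images.json
#   summary_of_severity_counts_for_WG/severity_summary_in_json_format.json
#   summary_of_severity_counts_for_WG/summary_of_severity_counts_for_WGs_in_table.txt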