#!/usr/bin/env python3
"""Machine registration client.

Gathers information about the local machine (GPUs, CPU, RAM, firmware,
network and serial console) and exposes four sub-commands: ``register``,
``check``, ``cache`` and ``setup`` (see the argument parser at the bottom of
the file).
"""

from functools import cached_property
from collections import namedtuple
from typing import Optional
from valve_gfx_ci.gfxinfo import find_gpus, cache_db

import multiprocessing
import argparse
import platform
import requests
import serial
import socket
import struct
import fcntl
import glob
import math
import time
import sys
import re
import os


# MAC and IPv4 addresses of a network interface
NetworkConf = namedtuple("NetworkConf", ['mac', 'ipv4'])


def next_power_of_2(x):
    """Return 2 raised to the ceiling of log2(x), or 1 when x is 0."""
    return 1 if x == 0 else 2**math.ceil(math.log2(x))


def readfile(root, filename):
    """Return the stripped content of the file `filename` found under `root`."""
    with open(os.path.join(root, filename)) as f:
        return f.read().strip()


class MachineInfo:
    """Collects the GPUs, tags, network addresses and serial console of the local machine."""

    def __init__(self):
        """Enumerate the GPUs (without updating the databases) and print a summary."""
        self.gpus = find_gpus(allow_db_updates=False)

        if self.gpus:
            print(f"Found {len(self.gpus)} GPU(s):")
            for gpu in self.gpus:
                print(f" * {gpu}")
            print()
        else:
            print("No GPU found")

        # NOTE: The join is done outside of the f-string because re-using the same quote
        # type inside a replacement field is only supported from Python 3.12 onwards
        tag_list = ", ".join(sorted(self.machine_tags))
        print(f"Machine tags: {tag_list}\n")

    @property
    def gpu(self):
        """The first GPU found, or None if the machine has no GPU."""
        if len(self.gpus) > 0:
            return self.gpus[0]
        return None

    @property
    def machine_base_name(self) -> str:
        """Lower-cased base name of the machine.

        Built from the GPUs' base names when GPUs are present, then from the first
        device tree compatible string, and finally from the CPU architecture.
        """
        if self.gpus:
            return "+".join(sorted([g.base_name for g in self.gpus])).lower()

        # Default to using the board name, if available through the device tree
        try:
            with open("/proc/device-tree/compatible") as f:
                compatible = f.read().split('\0')[0].split(',')
                if len(compatible) == 2:
                    vendor, model = compatible
                    return f"{vendor}-{model}".lower()
        except Exception as e:
            print(f"Failed to get the compatible: {e}")

        return f"unk-{platform.machine()}"

    @property
    def cpu_tags(self) -> set[str]:
        """Tags describing the CPU architecture and the number of cores."""
        def get_cpu_count():
            # Count the unique (package, core) pairs exposed by sysfs
            cpus = set()
            for cpu_topology_path in glob.iglob("/sys/devices/system/cpu/cpu*/topology/"):
                package_id = int(readfile(cpu_topology_path, 'physical_package_id'))
                core_id = int(readfile(cpu_topology_path, 'core_id'))
                cpus.add((package_id, core_id))
            return max(1, len(cpus))

        tags = set()

        cpu_count = get_cpu_count()
        tags.add(f"cpu:arch:{platform.machine()}")

        # This value may change depending on the kernel (Linux vs Windows)
        tags.add(f"cpu:cores:{cpu_count}")

        if cpu_count >= 4:
            tags.add("cpu:cores:4+")
        if cpu_count >= 16:
            tags.add("cpu:cores:16+")

        return tags

    @property
    def ram_tags(self) -> set[str]:
        """Tags describing the amount of RAM, rounded up to a power of 2 in GiB."""
        def ram_size():
            # Return the value of the MemTotal field of /proc/meminfo, in kB
            with open("/proc/meminfo", "rt") as f:
                for line in f:
                    if m := re.match(r'MemTotal:[ \t]+(\d+) kB', line):
                        return int(m.groups()[0])

            # Fail explicitly rather than with a TypeError in the caller's division
            raise RuntimeError("Could not find the MemTotal field in /proc/meminfo")

        tags = set()

        mem_gib = next_power_of_2(ram_size() / 1024 / 1024)
        tags.add(f"mem:size:{mem_gib}GiB")

        if mem_gib >= 4:
            tags.add("mem:size:4+GiB")
        if mem_gib >= 16:
            tags.add("mem:size:16+GiB")
        if mem_gib >= 64:
            tags.add("mem:size:64+GiB")

        return tags

    @property
    def firmware_tags(self) -> set[str]:
        """Tags describing the firmware type and the size of the BAR0 of every GPU."""
        tags = set()

        # Check if this is an EFI firmware
        if os.path.exists("/sys/firmware/efi"):
            tags.add("firmware:efi")
        else:
            tags.add("firmware:non-efi")

        # Check if the machine has resizeable bar enabled
        for gpu in self.gpus:
            try:
                gpu_path = gpu.pci_device.sysfs_path()
            except Exception:
                # No sysfs path could be obtained for this GPU: nothing to check
                continue

            # NOTE: This tag is disabled because it requires Linux 6.1 and some
            # Mesa jobs require an older kernel and thus incorrectly assume that
            # the BAR is fixed... failing the tags comparison test
            # if os.path.exists(f"{gpu_path}/resource0_resize"):
            #     tags.add("firmware:gpu:bar0:resizeable")
            # else:
            #     tags.add("firmware:gpu:bar0:fixedsized")

            try:
                bar0_mib = int(os.path.getsize(f"{gpu_path}/resource0") / 1024 / 1024)
                tags.add(f"firmware:gpu:bar0:{bar0_mib}MiB")
            except Exception as e:
                print(f"Can't check the size of BAR0: {e}")

        # TODO: Add DMI decoding to get the BIOS vendor, version, and release date
        # See https://wiki.osdev.org/System_Management_BIOS for more details

        return tags

    @property
    def gpu_tags(self) -> set[str]:
        """Union of the tags of all the GPUs."""
        tags = set()
        for gpu in self.gpus:
            tags = tags | gpu.tags
        return tags

    @cached_property
    def machine_tags(self) -> set[str]:
        """Union of the GPU, CPU, RAM, and firmware tags (computed once, then cached)."""
        return set().union(self.gpu_tags, self.cpu_tags, self.ram_tags, self.firmware_tags)

    @property
    def default_network_interface(self) -> Optional[str]:
        """Name of the interface holding the default route, or None if there is none."""
        with open("/proc/net/route", "rt") as f:
            for line in f:
                # The first two columns are the interface name and the destination (in hex)
                if m := re.match(r'^(?P<nif>\w+)[ \t]+(?P<destination>[A-F0-9]+)', line):
                    fields = m.groupdict()
                    if fields['destination'] == '00000000':
                        return fields['nif']

        return None

    @classmethod
    def __iface_query_param(cls, iface, param) -> bytes:
        """Run the ioctl `param` on the interface `iface` and return the raw result.

        Raises ValueError if the ioctl fails.
        """
        # Implementation from:
        # https://code.activestate.com/recipes/439094-get-the-ip-address-associated-with-a-network-inter
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            try:
                return fcntl.ioctl(s.fileno(), param, struct.pack('256s', iface.encode('utf8')))
            except OSError:
                # Iface doesn't exist, or no IP assigned
                raise ValueError(f"The interface {iface} has no IP assigned") from None

    @property
    def default_gateway_nif_addrs(self) -> NetworkConf:
        """MAC and IPv4 addresses of the default network interface.

        Raises ValueError if there is no default interface, or if it can't be queried.
        """
        def get_addr_ipv4(nif):
            return socket.inet_ntop(socket.AF_INET, self.__iface_query_param(nif, 0x8915)[20:24])  # SIOCGIFADDR

        def get_mac_addr(nif):
            mac_bytes = self.__iface_query_param(nif, 0x8927)[18:24]  # SIOCGIFHWADDR
            return ":".join([f'{b:02x}' for b in mac_bytes])

        if nif := self.default_network_interface:
            ipv4 = get_addr_ipv4(nif)
            mac = get_mac_addr(nif)

            # NOTE: If IPv6 were to be needed in the future, it could be read from procfs:
            # $ cat /proc/net/if_inet6
            # 00000000000000000000000000000001 01 80 10 80       lo
            # fe80000000000000fec90893172808ea 03 40 20 80   enp4s0

            return NetworkConf(mac, ipv4)

        raise ValueError("Your computer does not seem connected to a network")

    @cached_property
    def kernel_cmdline(self) -> str:
        """Content of /proc/cmdline (read once, then cached)."""
        with open("/proc/cmdline") as f:
            return f.read()

    @property
    def machine_id(self) -> str:
        """Unique identifier of the machine."""
        # Use the android bootloaders' serial number as a machine ID if present
        if m := re.search(r'\bandroidboot.serialno=(\S+)\b', self.kernel_cmdline):
            return m.group(1)

        # Default to the mac address of the NIC which acts as the default gateway of the machine
        return self.default_gateway_nif_addrs.mac

    def send_through_local_tty_device(self, msg, tty_device=None):
        """Write `msg` to `tty_device`, defaulting to the discovered local tty device.

        Nothing is sent if no tty device is available.
        """
        if tty_device is None:
            tty_device = self.local_tty_device

        if tty_device is not None:
            with serial.Serial(tty_device, baudrate=115200, timeout=1) as ser:
                ser.write(msg.encode())

    @classmethod
    def list_all_possible_serial_ports(cls) -> set[str]:
        """Return the device nodes matching the serial drivers listed in /proc/tty/drivers."""
        # Until https://github.com/pyserial/pyserial/pull/709 lands, open code the method
        devices = set()
        with open('/proc/tty/drivers') as drivers_file:
            drivers = drivers_file.readlines()
            for driver in drivers:
                items = driver.strip().split()
                # The second field is used as the device node prefix, the fifth as the driver type
                if len(items) >= 5 and items[4] == 'serial':
                    devices.update(glob.glob(items[1]+'*'))

        return devices

    @cached_property
    def local_tty_device(self) -> Optional[str]:
        """Path of the first serial port answering to a SALAD ping, or None.

        Every candidate port is probed in its own process. When a port answers,
        the machine ID is sent through it. The result is computed once, then cached.
        """
        def ping_serial_port(port):
            # Runs in a child process: exits with 0 if SALAD answered on `port`, 42 otherwise
            try:
                ser = serial.Serial(port, baudrate=115200, timeout=1)
            except serial.serialutil.SerialException as e:
                print(f"Failed to open the tty {port}: {e}")
                sys.exit(42)

            # Make sure we start from a clean slate
            ser.reset_input_buffer()

            # Try pinging SALAD up to 3 times to work around early bytes being lost on some serial adapters
            for _ in range(3):
                # Send a ping, and wait for the pong
                ser.write(b"\nSALAD.ping\n")
                line = ser.readline()
                print(f"{port}: Received {line}")
                if line == b"SALAD.pong\n":
                    sys.exit(0)

            sys.exit(42)

        # Get all available ports
        ports = self.list_all_possible_serial_ports()
        if len(ports) == 0:
            print("WARNING: No serial ports found!")
            return None

        # Probe all the available ports in parallel
        pending_processes = {}
        for port in ports:
            p = multiprocessing.Process(target=ping_serial_port, args=(port,))
            p.start()
            pending_processes[p] = port

        # Find out which one is connected
        first_port_found = None
        while first_port_found is None and len(pending_processes) > 0:
            # Poll at a short interval until one of the processes exits
            time.sleep(0.01)

            # Check the state of all the pending processes
            for p in list(pending_processes.keys()):
                if p.exitcode is not None:
                    # Remove the process from the pending list
                    port = pending_processes.pop(p)
                    if p.exitcode == 0:
                        first_port_found = port
                        break

        # Kill all the processes we created, then wait for them to die
        for p in pending_processes:
            p.terminate()
        for p in pending_processes:
            p.join()

        # Complete the association on the other side
        if first_port_found is not None:
            print("Found a tty device at", first_port_found)
            self.send_through_local_tty_device(f"SALAD.machine_id={self.machine_id}\n", tty_device=first_port_found)
        else:
            print(f"WARNING: None of the following TTYs are connected to SALAD: {ports}!")

        return first_port_found

    def to_machine_registration_request(self, ignore_local_tty_device=False):
        """Return the registration parameters of the machine as a dictionary.

        The local tty device is only discovered and included when
        `ignore_local_tty_device` is False.
        """
        addrs = self.default_gateway_nif_addrs

        ret = {
            "base_name": self.machine_base_name,
            "tags": list(self.machine_tags),
            "mac_address": addrs.mac,
            "ip_address": addrs.ipv4,
        }

        # NOTE: Since the executor does not like to receive parameters it doesn't know about,
        # only include the 'id' parameter if absolutely necessary. Old executors will keep
        # on working as expected, and new ones will use the mac address as an id by default.
        # The only time we need to set the machine id is when it isn't the same as the mac
        # address, which will only be supported by newer executors... but only newer executors
        # can boot DUTs that are not identified by mac addresses :D
        if self.machine_id != addrs.mac:
            ret['id'] = self.machine_id

        if not ignore_local_tty_device:
            # Get the name of the local tty device (strip /dev/)
            tty_dev_name = self.local_tty_device
            if tty_dev_name is not None and tty_dev_name.startswith("/dev/"):
                tty_dev_name = tty_dev_name[5:]

            ret["local_tty_device"] = tty_dev_name

        return ret


def serial_console_works() -> bool:
    """Return True if a SALAD ping sent on stdout gets a pong on stdin within 1 second."""
    def check_serial_console():
        # Runs in a child process: exits with 0 if the pong was received, 42 otherwise
        import termios

        # stdin is closed by multiprocessing, re-open it!
        sys.stdin = os.fdopen(0)

        # Remove any input we might have received thus far
        termios.tcflush(sys.stdin, termios.TCIFLUSH)

        # Send the ping
        sys.stdout.write("\nSALAD.ping\n")
        sys.stdout.flush()

        # Wait for the pong!
        is_answer_pong = re.match(r"^SALAD.pong\r?\n$", sys.stdin.readline())
        sys.exit(0 if is_answer_pong else 42)

    # Start a process that will try to print and read
    p = multiprocessing.Process(target=check_serial_console)
    p.start()
    p.join(1)

    if p.exitcode == 0:
        return True
    elif p.exitcode is None:
        # Still waiting for an answer after the timeout: kill the process, and reap it
        p.terminate()
        p.join()

    return False


def action_register(args):
    """Register the machine in MaRS, updating the existing entry if the creation is refused.

    Exits with 0 if the registration completed, 1 otherwise.
    """
    info = MachineInfo()
    params = info.to_machine_registration_request(ignore_local_tty_device=args.no_tty)

    r = requests.post(f"http://{args.mars_host}/api/v1/dut/", json=params)
    if r.status_code == 400:
        # NOTE: Use the machine id when available, otherwise default to using the mac address as this is the only way
        # older versions of the executor could identify DUTs
        mid = params.get('id') or params.get('mac_address')

        r = requests.patch(f"http://{args.mars_host}/api/v1/dut/{mid}/", json=params)

    status = "complete" if r.status_code == 200 else "failed"
    print(f"MaRS: Registration {status}\n")

    info.send_through_local_tty_device(f"MaRS: Registration {status}\n")

    sys.exit(0 if r.status_code == 200 else 1)


def action_check(args):
    """Compare the local configuration with the one stored in MaRS.

    Exits with 0 if no differences were found, 1 otherwise.
    """
    info = MachineInfo()

    # Get the expected configuration
    r = requests.get(f"http://{args.mars_host}/api/v1/dut/{info.machine_id}/")
    r.raise_for_status()
    expected_conf = r.json()

    # Generate the configuration
    local_config = info.to_machine_registration_request(ignore_local_tty_device=True)

    has_differences = False
    for key, value in local_config.items():
        expected_value = expected_conf.get(key)

        # Lists are compared without taking the order of their elements into account
        if (type(expected_value) != type(value) or
                (type(value) is list and set(expected_value) != set(value)) or
                (type(value) is not list and expected_value != value)):

            # NOTE: older versions of the executor assumed that the mac address of the DUT was always the machine id...
            # and thus did not need to have a separate 'id' field. If we are in the situation where the 'id' field is
            # missing from the expected values but the local value for this field is the mac address in the expected
            # values, then ignore the difference :)
            if key == 'id' and 'id' not in expected_conf and local_config.get('id') == expected_conf.get('mac_address'):
                continue

            has_differences = True
            print(f"Mismatch for '{key}': {value} vs the expected {expected_value}")

    # Check that the serial console is working
    if not args.no_tty:
        if serial_console_works():
            print(f"SALAD.machine_id={info.machine_id}")
        else:
            has_differences = True
            print("The configured console is not connected to SALAD")

    if has_differences:
        print("FATAL ERROR: The local machine doesn't match its expected state from MaRS")
    else:
        print("Machine registration: SUCCESS - No differences found!")

    sys.exit(0 if not has_differences else 1)


def action_cache(args):
    """Download and cache the GPU device databases."""
    cache_db()
    print("Downloaded the latest GPU device databases")


def action_setup(args):
    """Check that the machine has all the wanted tags, and unbind the GPUs that were not requested.

    Exits with 0 on success, and 1 if a wanted tag is missing or was not matched by a GPU.
    """
    info = MachineInfo()

    wanted_tags = [t for t in args.tags.split(",") if t]

    if len(wanted_tags) > 0:
        if not set(wanted_tags).issubset(info.machine_tags):
            missing_tags = ",".join(sorted(set(wanted_tags) - info.machine_tags))
            print(f"ERROR: The following tags were wanted, but are missing: {missing_tags}\n")
            sys.exit(1)
        else:
            print("All the wanted tags have been found\n")
    else:
        print("Ignoring the wanted tags check: no tags requested\n")

    # Removing the GPUs that are not referenced in the list of tags
    if len(info.gpus) > 0:
        # Remove all non-gpu tags. The set is computed once, as every property re-reads its data from the system
        non_gpu_tags = info.cpu_tags | info.ram_tags | info.firmware_tags
        wanted_tags = [t for t in wanted_tags if t not in non_gpu_tags]

        requested = ", ".join(wanted_tags)
        print(f"Disabling all the GPUs that are not explicitly requested ({requested}):")
        for gpu in info.gpus:
            # Remove the first instance of every tag provided by the GPU
            has_matched_a_tag = False
            for tag in gpu.tags:
                try:
                    wanted_tags.remove(tag)
                    has_matched_a_tag = True
                except ValueError:
                    # nothing to do
                    pass

            # Tell the user what is going on
            verdict = "keep" if has_matched_a_tag else "disable"
            print(f" * {gpu}: {verdict}")

            # Unbind/disable GPUs that were not asked by the user
            if not has_matched_a_tag:
                # NOTE: Let's do the unbinding in a separate process, so that
                # if it crashes the driver it would not take down our process

                # Flush before forking so that the child does not inherit (and re-emit) buffered output
                sys.stdout.flush()
                if os.fork() == 0:
                    # Child process: unbind, then exit right away so that it never
                    # falls through into the rest of the loop/function
                    exit_code = 0
                    try:
                        gpu.unbind()
                    except Exception as e:
                        print(f"Failed to unbind {gpu}: {e}")
                        exit_code = 1
                    finally:
                        sys.stdout.flush()
                        os._exit(exit_code)

        if len(wanted_tags) > 0:
            unmatched = ",".join(sorted(wanted_tags))
            print(f"ERROR: The following wanted tags have not been matched by GPUs: {unmatched}")
            sys.exit(1)
        else:
            print()

    sys.exit(0)


parser = argparse.ArgumentParser()
parser.add_argument('-m', '--mars_host', dest='mars_host', default="ci-gateway",
                    help='URL to the machine registration service MaRS')
parser.add_argument('--no-tty', dest="no_tty", action="store_true",
                    help="Do not discover/check the existence of a serial connection to SALAD")

subparsers = parser.add_subparsers()

register_parser = subparsers.add_parser('register',
                                        help='Register the current machine to the machine registration service MaRS')
register_parser.set_defaults(func=action_register)

check_parser = subparsers.add_parser('check',
                                     help="Check that the machine's configuration matches the one found in MaRS")
check_parser.set_defaults(func=action_check)

cache_parser = subparsers.add_parser('cache', help="Download and cache the latest gfxinfo databases")
cache_parser.set_defaults(func=action_cache)

setup_parser = subparsers.add_parser('setup',
                                     help="Ensure the machine exposes all the resources specified, and no more")
setup_parser.add_argument("-t", "--tags", default="",
                          help=("A coma-separated list of tags that the machine should match to succeed. "
                                "GPUs not referenced by these tags will be unbound, repeat tags if needed."))
setup_parser.set_defaults(func=action_setup)

# Parse the cmdline and execute the related action
args = parser.parse_args()
if func := getattr(args, "func", None):
    func(args)
else:
    parser.print_help()