diff --git a/system/hardware/tici/hardware.py b/system/hardware/tici/hardware.py index 120ca77107992e..8b913a43b9cd90 100644 --- a/system/hardware/tici/hardware.py +++ b/system/hardware/tici/hardware.py @@ -1,8 +1,8 @@ +import configparser import json import os import subprocess import time -from enum import IntEnum from functools import cached_property, lru_cache from pathlib import Path @@ -15,22 +15,8 @@ from openpilot.system.hardware.tici.pins import GPIO from openpilot.system.hardware.tici.amplifier import Amplifier -NM = 'org.freedesktop.NetworkManager' -NM_CON_ACT = NM + '.Connection.Active' -NM_DEV = NM + '.Device' -NM_DEV_WL = NM + '.Device.Wireless' -NM_AP = NM + '.AccessPoint' -DBUS_PROPS = 'org.freedesktop.DBus.Properties' - -class NMMetered(IntEnum): - NM_METERED_UNKNOWN = 0 - NM_METERED_YES = 1 - NM_METERED_NO = 2 - NM_METERED_GUESS_YES = 3 - NM_METERED_GUESS_NO = 4 - +NM_CONNECTIONS_DIRS = ("/run/NetworkManager/system-connections", "/data/etc/NetworkManager/system-connections") MODEM_STATE_PATH = "/dev/shm/modem" -TIMEOUT = 0.1 NetworkType = log.DeviceState.NetworkType NetworkStrength = log.DeviceState.NetworkStrength @@ -52,16 +38,16 @@ def get_device_type(): model = f.read().strip('\x00') return model.split('comma ')[-1] -class Tici(HardwareBase): - @cached_property - def bus(self): - import dbus - return dbus.SystemBus() +def wpa_cli(cmd): + out = subprocess.check_output(["wpa_cli", "-i", "wlan0", cmd], text=True, timeout=2) + return dict(l.split("=", 1) for l in out.splitlines() if "=" in l) - @cached_property - def nm(self): - return self.bus.get_object(NM, '/org/freedesktop/NetworkManager') +def get_default_route_iface(): + with open("/proc/net/route") as f: + routes = [(int(route[6]), route[0]) for line in f.readlines()[1:] if (route := line.split())[1] == "00000000" and int(route[3], 16) & 0x1] + return min(routes)[1] if routes else None +class Tici(HardwareBase): @cached_property def amplifier(self): if self.get_device_type() == "mici": @@ -114,13 +100,11 @@ def set_ir_power(self, percent: int): def get_network_type(self): ms = self.get_modem_state() try: - primary_connection = self.nm.Get(NM, 'PrimaryConnection', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) - primary_connection = self.bus.get_object(NM, primary_connection) - primary_type = primary_connection.Get(NM_CON_ACT, 'Type', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) - if primary_type == '802-3-ethernet': - return NetworkType.ethernet - elif primary_type == '802-11-wireless': + iface = get_default_route_iface() + if iface == "wlan0": return NetworkType.wifi + if iface in ("eth0", "usb0"): + return NetworkType.ethernet except Exception: pass @@ -136,10 +120,6 @@ def get_network_type(self): return NetworkType.cell2G return NetworkType.none - def get_wlan(self): - wlan_path = self.nm.GetDeviceByIpIface('wlan0', dbus_interface=NM, timeout=TIMEOUT) - return self.bus.get_object(NM, wlan_path) - def get_sim_info(self): ms = self.get_modem_state() sim_id = ms.get('iccid', '') @@ -189,13 +169,14 @@ def get_network_strength(self, network_type): try: if network_type == NetworkType.none: pass + elif network_type == NetworkType.ethernet: + network_strength = NetworkStrength.great elif network_type == NetworkType.wifi: - wlan = self.get_wlan() - active_ap_path = wlan.Get(NM_DEV_WL, 'ActiveAccessPoint', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) - if active_ap_path != "/": - active_ap = self.bus.get_object(NM, active_ap_path) - strength = int(active_ap.Get(NM_AP, 'Strength', dbus_interface=DBUS_PROPS, timeout=TIMEOUT)) - network_strength = self.parse_strength(strength) + rssi = wpa_cli("signal_poll").get("RSSI") + if rssi is not None: + dbm = int(rssi) + if -100 < dbm <= 0: + network_strength = self.parse_strength(120 + max(-90, min(-20, dbm))) else: # Cellular network_strength = self.parse_strength(self.get_modem_state().get('signal_quality', 0)) except Exception: @@ -208,17 +189,36 @@ def get_network_metered(self, network_type) -> bool: from openpilot.common.params import Params return Params().get_bool("GsmMetered") try: - primary_connection = self.nm.Get(NM, 'PrimaryConnection', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) - primary_connection = self.bus.get_object(NM, primary_connection) - primary_devices = primary_connection.Get(NM_CON_ACT, 'Devices', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) - - for dev in primary_devices: - dev_obj = self.bus.get_object(NM, str(dev)) - metered_prop = dev_obj.Get(NM_DEV, 'Metered', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) - - if network_type == NetworkType.wifi: - if metered_prop in [NMMetered.NM_METERED_YES, NMMetered.NM_METERED_GUESS_YES]: - return True + if network_type == NetworkType.wifi: + ssid = wpa_cli("status").get("ssid", "") + if ssid: + # wpa_cli escapes non-printable bytes as \xNN; NM keyfile stores ASCII SSIDs as a literal and others as a byte;byte; list + ssid_bytes = ssid.encode().decode('unicode_escape').encode('latin-1') + ssid_keyfile_list = ';'.join(str(b) for b in ssid_bytes) + ';' + + for fpath in (p for d in NM_CONNECTIONS_DIRS for p in Path(d).glob("*.nmconnection")): + raw = sudo_read(str(fpath)) + if not raw: + continue + cp = configparser.ConfigParser(interpolation=None) + try: + cp.read_string(raw) + keyfile_ssid = cp.get("wifi", "ssid", fallback="") + if keyfile_ssid != ssid and keyfile_ssid != ssid_keyfile_list: + continue + metered = cp.getint("connection", "metered", fallback=0) + except (configparser.Error, ValueError): + continue + if metered == 1: + return True + if metered == 2: + return False + break + + # TODO: remove when openpilot owns dhcp and can detect hotspots itself + out = subprocess.check_output(["nmcli", "-t", "-f", "GENERAL.METERED", "dev", "show", "wlan0"], text=True, timeout=2) + if out.strip().split(":", 1)[-1].startswith("yes"): + return True except Exception: pass