""" Python wrapper for BLF library with guaranteed file closing """ import ctypes import os import atexit from ctypes import c_void_p, c_int, c_uint8, c_uint16, c_uint32, c_uint64, c_char_p, Structure, POINTER from typing import Optional import weakref # Константы BL_FILE_SIGNATURE = 0x47474F4C # "LOGG" BL_OBJ_SIGNATURE = 0x4A424F4C # "LOBJ" BL_OBJ_TYPE_CAN_MESSAGE = 1 BL_OBJ_TYPE_LIN_MESSAGE = 11 BL_OBJ_TYPE_LIN_SND_ERROR = 15 BL_OBJ_TYPE_LOG_CONTAINER = 10 BL_OBJ_TYPE_ENV_DATA = 9 BL_OBJ_FLAG_TIME_ONE_NANS = 0x00000002 CAN_DIR_TX = 1 CAN_DIR_RX = 0 LIN_DIR_RX = 0 LIN_DIR_TX = 1 class SYSTEMTIME(Structure): """Структура времени""" _fields_ = [ ("year", c_uint16), ("month", c_uint16), ("dayOfWeek", c_uint16), ("day", c_uint16), ("hour", c_uint16), ("minute", c_uint16), ("second", c_uint16), ("milliseconds", c_uint16) ] def __init__(self, year=0, month=0, dayOfWeek=0, day=0, hour=0, minute=0, second=0, milliseconds=0): super().__init__() self.year = year self.month = month self.dayOfWeek = dayOfWeek self.day = day self.hour = hour self.minute = minute self.second = second self.milliseconds = milliseconds @classmethod def from_datetime(cls, dt): """Создать из datetime объекта""" return cls( year=dt.year, month=dt.month, dayOfWeek=dt.weekday(), day=dt.day, hour=dt.hour, minute=dt.minute, second=dt.second, milliseconds=dt.microsecond // 1000 ) @classmethod def now(cls): """Создать с текущим временем""" import datetime return cls.from_datetime(datetime.datetime.now()) class BLFLibrary: """Singleton для загрузки и управления библиотекой""" _instance = None _lib = None _loaded = False def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def load(self): """Загрузка библиотеки""" if self._loaded: return True lib_paths = [ "./libconv.so", # Linux текущая директория "./libconv.dylib", # macOS текущая директория "./conv.dll", # Windows текущая директория os.path.expanduser("~/lib/libconv.so"), "/usr/local/lib/libconv.so", "/usr/lib/libconv.so", # Добавляем пути из LD_LIBRARY_PATH *[os.path.join(p, "libconv.so") for p in os.environ.get("LD_LIBRARY_PATH", "").split(":") if p] ] for path in lib_paths: try: self._lib = ctypes.CDLL(path) self._loaded = True self._setup_functions() return True except OSError: continue raise RuntimeError("Could not load BLF library. Please build it first.") def _setup_functions(self): """Настройка типов функций""" self._lib.blf_open.argtypes = [c_char_p, POINTER(SYSTEMTIME)] self._lib.blf_open.restype = c_void_p self._lib.blf_set_start_time.argtypes = [c_void_p, POINTER(SYSTEMTIME)] self._lib.blf_set_start_time.restype = c_int self._lib.blf_start_container.argtypes = [c_void_p, c_uint64] self._lib.blf_start_container.restype = c_int self._lib.blf_end_container.argtypes = [c_void_p] self._lib.blf_end_container.restype = c_int self._lib.blf_add_can_message.argtypes = [ c_void_p, c_uint16, c_uint32, c_uint8, c_uint8, POINTER(c_uint8), c_uint64 ] self._lib.blf_add_can_message.restype = c_int self._lib.blf_add_lin_message_obsolete.argtypes = [ c_void_p, c_uint16, c_uint8, c_uint8, POINTER(c_uint8), c_uint8, c_uint64, c_uint16 ] self._lib.blf_add_lin_message_obsolete.restype = c_int self._lib.blf_add_lin_send_error.argtypes = [ c_void_p, c_uint16, c_uint8, c_uint8, c_uint64 ] self._lib.blf_add_lin_send_error.restype = c_int self._lib.blf_add_env_data.argtypes = [ c_void_p, c_char_p, POINTER(c_uint8), c_uint32, c_uint64 ] self._lib.blf_add_env_data.restype = c_int self._lib.blf_close.argtypes = [c_void_p] self._lib.blf_close.restype = c_int @property def lib(self): """Получить библиотеку""" if not self._loaded: self.load() return self._lib class BLFWriter: """Класс для записи BLF файлов с гарантированным закрытием""" _all_instances = weakref.WeakSet() # Следим за всеми экземплярами def __init__(self, filename: str, start_time: Optional[SYSTEMTIME] = None): """ Инициализация BLF writer Args: filename: имя файла для записи start_time: время начала измерения (если None - текущее время) """ self.filename = filename self._handle = None self._is_open = False self._lib_loader = BLFLibrary() self._lib = self._lib_loader.lib # Открываем файл self._open(start_time) # Регистрируем в глобальном списке self.__class__._all_instances.add(self) # Регистрируем финализатор self._finalizer = weakref.finalize(self, self._cleanup, self._handle, self.filename) @staticmethod def _cleanup(handle, filename): """Статический метод для очистки ресурсов""" if handle is not None and handle != 0: try: lib_loader = BLFLibrary() lib_loader.lib.blf_close(handle) print(f"DEBUG: Auto-closed file {filename} during cleanup") except Exception as e: print(f"WARNING: Failed to auto-close {filename}: {e}") @classmethod def close_all(cls): """Закрыть все открытые экземпляры (для аварийного завершения)""" for instance in list(cls._all_instances): try: if instance._is_open: instance.close() except: pass def _open(self, start_time: Optional[SYSTEMTIME] = None): """Открытие файла""" filename_bytes = self.filename.encode('utf-8') if start_time is None: self._handle = self._lib.blf_open(filename_bytes, None) else: self._handle = self._lib.blf_open(filename_bytes, ctypes.byref(start_time)) if self._handle is None or self._handle == 0: raise RuntimeError(f"Failed to open file: {self.filename}") self._is_open = True def set_start_time(self, start_time: SYSTEMTIME) -> bool: """ Установить время начала измерения Args: start_time: время начала Returns: bool: True при успехе """ if not self._is_open: raise RuntimeError("File is not open") result = self._lib.blf_set_start_time(self._handle, ctypes.byref(start_time)) return result == 0 def start_container(self, timestamp_ns: int) -> bool: """ Начать контейнер Args: timestamp_ns: временная метка в наносекундах Returns: bool: True при успехе """ if not self._is_open: raise RuntimeError("File is not open") result = self._lib.blf_start_container(self._handle, timestamp_ns) return result == 0 def end_container(self) -> bool: """ Закончить контейнер Returns: bool: True при успехе """ if not self._is_open: raise RuntimeError("File is not open") result = self._lib.blf_end_container(self._handle) return result == 0 def add_can_message(self, channel: int, can_id: int, dlc: int, data: bytes, flags: int = 0, timestamp_ns: int = 0) -> bool: """ Добавить CAN сообщение Args: channel: номер канала can_id: идентификатор CAN dlc: длина данных (0-8) data: данные сообщения flags: флаги (направление, RTR) timestamp_ns: временная метка в наносекундах Returns: bool: True при успехе """ if not self._is_open: raise RuntimeError("File is not open") if dlc > 8: raise ValueError("DLC cannot exceed 8") data_array = (c_uint8 * 8)() for i in range(min(dlc, len(data))): data_array[i] = data[i] result = self._lib.blf_add_can_message( self._handle, channel, can_id, flags, dlc, data_array, timestamp_ns ) return result == 0 def add_lin_message(self, channel: int, lin_id: int, dlc: int, data: bytes, direction: int, timestamp_ns: int, checksum: int = 0) -> bool: """ Добавить LIN сообщение Args: channel: номер канала lin_id: идентификатор LIN (6 бит) dlc: длина данных data: данные сообщения direction: направление (LIN_DIR_RX или LIN_DIR_TX) timestamp_ns: временная метка в наносекундах checksum: контрольная сумма Returns: bool: True при успехе """ if not self._is_open: raise RuntimeError("File is not open") if dlc > 8: raise ValueError("DLC cannot exceed 8") data_array = (c_uint8 * 8)() for i in range(min(dlc, len(data))): data_array[i] = data[i] result = self._lib.blf_add_lin_message_obsolete( self._handle, channel, lin_id, dlc, data_array, direction, timestamp_ns, checksum ) return result == 0 def add_lin_send_error(self, channel: int, lin_id: int, dlc: int, timestamp_ns: int) -> bool: """ Добавить ошибку отправки LIN Args: channel: номер канала lin_id: идентификатор LIN dlc: ожидаемая длина данных timestamp_ns: временная метка в наносекундах Returns: bool: True при успехе """ if not self._is_open: raise RuntimeError("File is not open") result = self._lib.blf_add_lin_send_error( self._handle, channel, lin_id, dlc, timestamp_ns ) return result == 0 def add_env_data(self, name: str, data: bytes, timestamp_ns: int) -> bool: """ Добавить данные окружения Args: name: имя переменной data: данные timestamp_ns: временная метка в наносекундах Returns: bool: True при успехе """ if not self._is_open: raise RuntimeError("File is not open") # Создаем копию данных, чтобы они не были собраны сборщиком мусора data_array = (c_uint8 * len(data))(*data) result = self._lib.blf_add_env_data( self._handle, name.encode('utf-8'), data_array, len(data), timestamp_ns ) return result == 0 def flush(self) -> bool: """ Принудительно сбросить данные на диск Note: В текущей реализации просто обновляет заголовок, но не сбрасывает буферы stdio """ if not self._is_open: raise RuntimeError("File is not open") # В C библиотеке нет отдельной функции flush, # но close/open не поддерживается, поэтому просто возвращаем True return True def close(self) -> bool: """ Закрыть файл и обновить заголовок Returns: bool: True при успехе """ if not self._is_open: return True result = False if self._handle and self._handle != 0: result = self._lib.blf_close(self._handle) == 0 if result: print(f"DEBUG: Successfully closed {self.filename} and updated header") else: print(f"ERROR: Failed to close {self.filename} properly") self._handle = None self._is_open = False # Отменяем финализатор, так как уже закрыли if self._finalizer is not None: self._finalizer.detach() return result def __enter__(self): """Контекстный менеджер - гарантирует закрытие""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Закрытие при выходе из контекста""" self.close() def __del__(self): """Деструктор - пытаемся закрыть файл если он еще открыт""" if self._is_open: # Не вызываем close напрямую, чтобы избежать проблем с порядком удаления if self._handle and self._handle != 0: try: self._lib.blf_close(self._handle) print(f"DEBUG: Auto-closed {self.filename} in destructor") except: pass self._handle = None self._is_open = False def is_open(self) -> bool: """Проверить, открыт ли файл""" return self._is_open # Вспомогательные функции def can_flags(direction: int, rtr: bool = False) -> int: """ Сформировать флаги для CAN сообщения Args: direction: направление (CAN_DIR_TX или CAN_DIR_RX) rtr: флаг RTR (Remote Transmission Request) Returns: int: флаги """ return ((1 if rtr else 0) << 7) | (direction & 0x0F) def timestamp_ms_to_ns(timestamp_ms: float) -> int: """ Преобразовать миллисекунды в наносекунды Args: timestamp_ms: время в миллисекундах Returns: int: время в наносекундах """ return int(timestamp_ms * 1_000_000) def timestamp_sec_to_ns(timestamp_sec: float) -> int: """ Преобразовать секунды в наносекунды Args: timestamp_sec: время в секундах Returns: int: время в наносекундах """ return int(timestamp_sec * 1_000_000_000) # Регистрируем глобальное закрытие при завершении программы atexit.register(BLFWriter.close_all)