472 lines
15 KiB
Python
472 lines
15 KiB
Python
"""
|
||
Python wrapper for BLF library with guaranteed file closing
|
||
"""
|
||
import ctypes
|
||
import os
|
||
import atexit
|
||
from ctypes import c_void_p, c_int, c_uint8, c_uint16, c_uint32, c_uint64, c_char_p, Structure, POINTER
|
||
from typing import Optional
|
||
import weakref
|
||
|
||
# Константы
|
||
BL_FILE_SIGNATURE = 0x47474F4C # "LOGG"
|
||
BL_OBJ_SIGNATURE = 0x4A424F4C # "LOBJ"
|
||
BL_OBJ_TYPE_CAN_MESSAGE = 1
|
||
BL_OBJ_TYPE_LIN_MESSAGE = 11
|
||
BL_OBJ_TYPE_LIN_SND_ERROR = 15
|
||
BL_OBJ_TYPE_LOG_CONTAINER = 10
|
||
BL_OBJ_TYPE_ENV_DATA = 9
|
||
BL_OBJ_FLAG_TIME_ONE_NANS = 0x00000002
|
||
|
||
CAN_DIR_TX = 1
|
||
CAN_DIR_RX = 0
|
||
LIN_DIR_RX = 0
|
||
LIN_DIR_TX = 1
|
||
|
||
|
||
class SYSTEMTIME(Structure):
|
||
"""Структура времени"""
|
||
_fields_ = [
|
||
("year", c_uint16),
|
||
("month", c_uint16),
|
||
("dayOfWeek", c_uint16),
|
||
("day", c_uint16),
|
||
("hour", c_uint16),
|
||
("minute", c_uint16),
|
||
("second", c_uint16),
|
||
("milliseconds", c_uint16)
|
||
]
|
||
|
||
def __init__(self, year=0, month=0, dayOfWeek=0, day=0,
|
||
hour=0, minute=0, second=0, milliseconds=0):
|
||
super().__init__()
|
||
self.year = year
|
||
self.month = month
|
||
self.dayOfWeek = dayOfWeek
|
||
self.day = day
|
||
self.hour = hour
|
||
self.minute = minute
|
||
self.second = second
|
||
self.milliseconds = milliseconds
|
||
|
||
@classmethod
|
||
def from_datetime(cls, dt):
|
||
"""Создать из datetime объекта"""
|
||
return cls(
|
||
year=dt.year,
|
||
month=dt.month,
|
||
dayOfWeek=dt.weekday(),
|
||
day=dt.day,
|
||
hour=dt.hour,
|
||
minute=dt.minute,
|
||
second=dt.second,
|
||
milliseconds=dt.microsecond // 1000
|
||
)
|
||
|
||
@classmethod
|
||
def now(cls):
|
||
"""Создать с текущим временем"""
|
||
import datetime
|
||
return cls.from_datetime(datetime.datetime.now())
|
||
|
||
|
||
class BLFLibrary:
|
||
"""Singleton для загрузки и управления библиотекой"""
|
||
_instance = None
|
||
_lib = None
|
||
_loaded = False
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
return cls._instance
|
||
|
||
def load(self):
|
||
"""Загрузка библиотеки"""
|
||
if self._loaded:
|
||
return True
|
||
|
||
# Укажите полный путь к вашей DLL
|
||
dll_path = r".\conv.dll" # или другой путь
|
||
|
||
try:
|
||
self._lib = ctypes.CDLL(dll_path)
|
||
self._loaded = True
|
||
self._setup_functions()
|
||
return True
|
||
except OSError as e:
|
||
raise RuntimeError(f"Could not load BLF library from {dll_path}: {e}")
|
||
|
||
def _setup_functions(self):
|
||
"""Настройка типов функций"""
|
||
self._lib.blf_open.argtypes = [c_char_p, POINTER(SYSTEMTIME)]
|
||
self._lib.blf_open.restype = c_void_p
|
||
|
||
self._lib.blf_set_start_time.argtypes = [c_void_p, POINTER(SYSTEMTIME)]
|
||
self._lib.blf_set_start_time.restype = c_int
|
||
|
||
self._lib.blf_start_container.argtypes = [c_void_p, c_uint64]
|
||
self._lib.blf_start_container.restype = c_int
|
||
|
||
self._lib.blf_end_container.argtypes = [c_void_p]
|
||
self._lib.blf_end_container.restype = c_int
|
||
|
||
self._lib.blf_add_can_message.argtypes = [
|
||
c_void_p, c_uint16, c_uint32, c_uint8, c_uint8,
|
||
POINTER(c_uint8), c_uint64
|
||
]
|
||
self._lib.blf_add_can_message.restype = c_int
|
||
|
||
self._lib.blf_add_lin_message_obsolete.argtypes = [
|
||
c_void_p, c_uint16, c_uint8, c_uint8, POINTER(c_uint8),
|
||
c_uint8, c_uint64, c_uint16
|
||
]
|
||
self._lib.blf_add_lin_message_obsolete.restype = c_int
|
||
|
||
self._lib.blf_add_lin_send_error.argtypes = [
|
||
c_void_p, c_uint16, c_uint8, c_uint8, c_uint64
|
||
]
|
||
self._lib.blf_add_lin_send_error.restype = c_int
|
||
|
||
self._lib.blf_add_env_data.argtypes = [
|
||
c_void_p, c_char_p, POINTER(c_uint8), c_uint32, c_uint64
|
||
]
|
||
self._lib.blf_add_env_data.restype = c_int
|
||
|
||
self._lib.blf_close.argtypes = [c_void_p]
|
||
self._lib.blf_close.restype = c_int
|
||
|
||
@property
|
||
def lib(self):
|
||
"""Получить библиотеку"""
|
||
if not self._loaded:
|
||
self.load()
|
||
return self._lib
|
||
|
||
|
||
class BLFWriter:
|
||
"""Класс для записи BLF файлов с гарантированным закрытием"""
|
||
|
||
_all_instances = weakref.WeakSet() # Следим за всеми экземплярами
|
||
|
||
def __init__(self, filename: str, start_time: Optional[SYSTEMTIME] = None):
|
||
"""
|
||
Инициализация BLF writer
|
||
|
||
Args:
|
||
filename: имя файла для записи
|
||
start_time: время начала измерения (если None - текущее время)
|
||
"""
|
||
self.filename = filename
|
||
self._handle = None
|
||
self._is_open = False
|
||
self._lib_loader = BLFLibrary()
|
||
self._lib = self._lib_loader.lib
|
||
|
||
# Открываем файл
|
||
self._open(start_time)
|
||
|
||
# Регистрируем в глобальном списке
|
||
self.__class__._all_instances.add(self)
|
||
|
||
# Регистрируем финализатор
|
||
self._finalizer = weakref.finalize(self, self._cleanup, self._handle, self.filename)
|
||
|
||
@staticmethod
|
||
def _cleanup(handle, filename):
|
||
"""Статический метод для очистки ресурсов"""
|
||
if handle is not None and handle != 0:
|
||
try:
|
||
lib_loader = BLFLibrary()
|
||
lib_loader.lib.blf_close(handle)
|
||
print(f"DEBUG: Auto-closed file {filename} during cleanup")
|
||
except Exception as e:
|
||
print(f"WARNING: Failed to auto-close {filename}: {e}")
|
||
|
||
@classmethod
|
||
def close_all(cls):
|
||
"""Закрыть все открытые экземпляры (для аварийного завершения)"""
|
||
for instance in list(cls._all_instances):
|
||
try:
|
||
if instance._is_open:
|
||
instance.close()
|
||
except:
|
||
pass
|
||
|
||
def _open(self, start_time: Optional[SYSTEMTIME] = None):
|
||
"""Открытие файла"""
|
||
filename_bytes = self.filename.encode('utf-8')
|
||
|
||
if start_time is None:
|
||
self._handle = self._lib.blf_open(filename_bytes, None)
|
||
else:
|
||
self._handle = self._lib.blf_open(filename_bytes, ctypes.byref(start_time))
|
||
|
||
if self._handle is None or self._handle == 0:
|
||
raise RuntimeError(f"Failed to open file: {self.filename}")
|
||
|
||
self._is_open = True
|
||
|
||
def set_start_time(self, start_time: SYSTEMTIME) -> bool:
|
||
"""
|
||
Установить время начала измерения
|
||
|
||
Args:
|
||
start_time: время начала
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
result = self._lib.blf_set_start_time(self._handle, ctypes.byref(start_time))
|
||
return result == 0
|
||
|
||
def start_container(self, timestamp_ns: int) -> bool:
|
||
"""
|
||
Начать контейнер
|
||
|
||
Args:
|
||
timestamp_ns: временная метка в наносекундах
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
result = self._lib.blf_start_container(self._handle, timestamp_ns)
|
||
return result == 0
|
||
|
||
def end_container(self) -> bool:
|
||
"""
|
||
Закончить контейнер
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
result = self._lib.blf_end_container(self._handle)
|
||
return result == 0
|
||
|
||
def add_can_message(self, channel: int, can_id: int, dlc: int, data: bytes,
|
||
flags: int = 0, timestamp_ns: int = 0) -> bool:
|
||
"""
|
||
Добавить CAN сообщение
|
||
|
||
Args:
|
||
channel: номер канала
|
||
can_id: идентификатор CAN
|
||
dlc: длина данных (0-8)
|
||
data: данные сообщения
|
||
flags: флаги (направление, RTR)
|
||
timestamp_ns: временная метка в наносекундах
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
if dlc > 8:
|
||
raise ValueError("DLC cannot exceed 8")
|
||
|
||
data_array = (c_uint8 * 8)()
|
||
for i in range(min(dlc, len(data))):
|
||
data_array[i] = data[i]
|
||
|
||
result = self._lib.blf_add_can_message(
|
||
self._handle, channel, can_id, flags, dlc, data_array, timestamp_ns
|
||
)
|
||
return result == 0
|
||
|
||
def add_lin_message(self, channel: int, lin_id: int, dlc: int, data: bytes,
|
||
direction: int, timestamp_ns: int, checksum: int = 0) -> bool:
|
||
"""
|
||
Добавить LIN сообщение
|
||
|
||
Args:
|
||
channel: номер канала
|
||
lin_id: идентификатор LIN (6 бит)
|
||
dlc: длина данных
|
||
data: данные сообщения
|
||
direction: направление (LIN_DIR_RX или LIN_DIR_TX)
|
||
timestamp_ns: временная метка в наносекундах
|
||
checksum: контрольная сумма
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
if dlc > 8:
|
||
raise ValueError("DLC cannot exceed 8")
|
||
|
||
data_array = (c_uint8 * 8)()
|
||
for i in range(min(dlc, len(data))):
|
||
data_array[i] = data[i]
|
||
|
||
result = self._lib.blf_add_lin_message_obsolete(
|
||
self._handle, channel, lin_id, dlc, data_array,
|
||
direction, timestamp_ns, checksum
|
||
)
|
||
return result == 0
|
||
|
||
def add_lin_send_error(self, channel: int, lin_id: int, dlc: int,
|
||
timestamp_ns: int) -> bool:
|
||
"""
|
||
Добавить ошибку отправки LIN
|
||
|
||
Args:
|
||
channel: номер канала
|
||
lin_id: идентификатор LIN
|
||
dlc: ожидаемая длина данных
|
||
timestamp_ns: временная метка в наносекундах
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
result = self._lib.blf_add_lin_send_error(
|
||
self._handle, channel, lin_id, dlc, timestamp_ns
|
||
)
|
||
return result == 0
|
||
|
||
def add_env_data(self, name: str, data: bytes, timestamp_ns: int) -> bool:
|
||
"""
|
||
Добавить данные окружения
|
||
|
||
Args:
|
||
name: имя переменной
|
||
data: данные
|
||
timestamp_ns: временная метка в наносекундах
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
# Создаем копию данных, чтобы они не были собраны сборщиком мусора
|
||
data_array = (c_uint8 * len(data))(*data)
|
||
|
||
result = self._lib.blf_add_env_data(
|
||
self._handle, name.encode('utf-8'), data_array, len(data), timestamp_ns
|
||
)
|
||
return result == 0
|
||
|
||
def flush(self) -> bool:
|
||
"""
|
||
Принудительно сбросить данные на диск
|
||
Note: В текущей реализации просто обновляет заголовок,
|
||
но не сбрасывает буферы stdio
|
||
"""
|
||
if not self._is_open:
|
||
raise RuntimeError("File is not open")
|
||
|
||
# В C библиотеке нет отдельной функции flush,
|
||
# но close/open не поддерживается, поэтому просто возвращаем True
|
||
return True
|
||
|
||
def close(self) -> bool:
|
||
"""
|
||
Закрыть файл и обновить заголовок
|
||
|
||
Returns:
|
||
bool: True при успехе
|
||
"""
|
||
if not self._is_open:
|
||
return True
|
||
|
||
result = False
|
||
if self._handle and self._handle != 0:
|
||
result = self._lib.blf_close(self._handle) == 0
|
||
if result:
|
||
print(f"DEBUG: Successfully closed {self.filename} and updated header")
|
||
else:
|
||
print(f"ERROR: Failed to close {self.filename} properly")
|
||
|
||
self._handle = None
|
||
self._is_open = False
|
||
|
||
# Отменяем финализатор, так как уже закрыли
|
||
if self._finalizer is not None:
|
||
self._finalizer.detach()
|
||
|
||
return result
|
||
|
||
def __enter__(self):
|
||
"""Контекстный менеджер - гарантирует закрытие"""
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
"""Закрытие при выходе из контекста"""
|
||
self.close()
|
||
|
||
def __del__(self):
|
||
"""Деструктор - пытаемся закрыть файл если он еще открыт"""
|
||
if self._is_open:
|
||
# Не вызываем close напрямую, чтобы избежать проблем с порядком удаления
|
||
if self._handle and self._handle != 0:
|
||
try:
|
||
self._lib.blf_close(self._handle)
|
||
print(f"DEBUG: Auto-closed {self.filename} in destructor")
|
||
except:
|
||
pass
|
||
self._handle = None
|
||
self._is_open = False
|
||
|
||
def is_open(self) -> bool:
|
||
"""Проверить, открыт ли файл"""
|
||
return self._is_open
|
||
|
||
|
||
# Вспомогательные функции
|
||
def can_flags(direction: int, rtr: bool = False) -> int:
|
||
"""
|
||
Сформировать флаги для CAN сообщения
|
||
|
||
Args:
|
||
direction: направление (CAN_DIR_TX или CAN_DIR_RX)
|
||
rtr: флаг RTR (Remote Transmission Request)
|
||
|
||
Returns:
|
||
int: флаги
|
||
"""
|
||
return ((1 if rtr else 0) << 7) | (direction & 0x0F)
|
||
|
||
|
||
def timestamp_ms_to_ns(timestamp_ms: float) -> int:
|
||
"""
|
||
Преобразовать миллисекунды в наносекунды
|
||
|
||
Args:
|
||
timestamp_ms: время в миллисекундах
|
||
|
||
Returns:
|
||
int: время в наносекундах
|
||
"""
|
||
return int(timestamp_ms * 1_000_000)
|
||
|
||
|
||
def timestamp_sec_to_ns(timestamp_sec: float) -> int:
|
||
"""
|
||
Преобразовать секунды в наносекунды
|
||
|
||
Args:
|
||
timestamp_sec: время в секундах
|
||
|
||
Returns:
|
||
int: время в наносекундах
|
||
"""
|
||
return int(timestamp_sec * 1_000_000_000)
|
||
|
||
|
||
# Регистрируем глобальное закрытие при завершении программы
|
||
atexit.register(BLFWriter.close_all) |