BLF_DLL/blf_wrapper.py

472 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Python wrapper for BLF library with guaranteed file closing
"""
import ctypes
import os
import atexit
from ctypes import c_void_p, c_int, c_uint8, c_uint16, c_uint32, c_uint64, c_char_p, Structure, POINTER
from typing import Optional
import weakref
# Константы
BL_FILE_SIGNATURE = 0x47474F4C # "LOGG"
BL_OBJ_SIGNATURE = 0x4A424F4C # "LOBJ"
BL_OBJ_TYPE_CAN_MESSAGE = 1
BL_OBJ_TYPE_LIN_MESSAGE = 11
BL_OBJ_TYPE_LIN_SND_ERROR = 15
BL_OBJ_TYPE_LOG_CONTAINER = 10
BL_OBJ_TYPE_ENV_DATA = 9
BL_OBJ_FLAG_TIME_ONE_NANS = 0x00000002
CAN_DIR_TX = 1
CAN_DIR_RX = 0
LIN_DIR_RX = 0
LIN_DIR_TX = 1
class SYSTEMTIME(Structure):
"""Структура времени"""
_fields_ = [
("year", c_uint16),
("month", c_uint16),
("dayOfWeek", c_uint16),
("day", c_uint16),
("hour", c_uint16),
("minute", c_uint16),
("second", c_uint16),
("milliseconds", c_uint16)
]
def __init__(self, year=0, month=0, dayOfWeek=0, day=0,
hour=0, minute=0, second=0, milliseconds=0):
super().__init__()
self.year = year
self.month = month
self.dayOfWeek = dayOfWeek
self.day = day
self.hour = hour
self.minute = minute
self.second = second
self.milliseconds = milliseconds
@classmethod
def from_datetime(cls, dt):
"""Создать из datetime объекта"""
return cls(
year=dt.year,
month=dt.month,
dayOfWeek=dt.weekday(),
day=dt.day,
hour=dt.hour,
minute=dt.minute,
second=dt.second,
milliseconds=dt.microsecond // 1000
)
@classmethod
def now(cls):
"""Создать с текущим временем"""
import datetime
return cls.from_datetime(datetime.datetime.now())
class BLFLibrary:
"""Singleton для загрузки и управления библиотекой"""
_instance = None
_lib = None
_loaded = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def load(self):
"""Загрузка библиотеки"""
if self._loaded:
return True
# Укажите полный путь к вашей DLL
dll_path = r".\conv.dll" # или другой путь
try:
self._lib = ctypes.CDLL(dll_path)
self._loaded = True
self._setup_functions()
return True
except OSError as e:
raise RuntimeError(f"Could not load BLF library from {dll_path}: {e}")
def _setup_functions(self):
"""Настройка типов функций"""
self._lib.blf_open.argtypes = [c_char_p, POINTER(SYSTEMTIME)]
self._lib.blf_open.restype = c_void_p
self._lib.blf_set_start_time.argtypes = [c_void_p, POINTER(SYSTEMTIME)]
self._lib.blf_set_start_time.restype = c_int
self._lib.blf_start_container.argtypes = [c_void_p, c_uint64]
self._lib.blf_start_container.restype = c_int
self._lib.blf_end_container.argtypes = [c_void_p]
self._lib.blf_end_container.restype = c_int
self._lib.blf_add_can_message.argtypes = [
c_void_p, c_uint16, c_uint32, c_uint8, c_uint8,
POINTER(c_uint8), c_uint64
]
self._lib.blf_add_can_message.restype = c_int
self._lib.blf_add_lin_message_obsolete.argtypes = [
c_void_p, c_uint16, c_uint8, c_uint8, POINTER(c_uint8),
c_uint8, c_uint64, c_uint16
]
self._lib.blf_add_lin_message_obsolete.restype = c_int
self._lib.blf_add_lin_send_error.argtypes = [
c_void_p, c_uint16, c_uint8, c_uint8, c_uint64
]
self._lib.blf_add_lin_send_error.restype = c_int
self._lib.blf_add_env_data.argtypes = [
c_void_p, c_char_p, POINTER(c_uint8), c_uint32, c_uint64
]
self._lib.blf_add_env_data.restype = c_int
self._lib.blf_close.argtypes = [c_void_p]
self._lib.blf_close.restype = c_int
@property
def lib(self):
"""Получить библиотеку"""
if not self._loaded:
self.load()
return self._lib
class BLFWriter:
"""Класс для записи BLF файлов с гарантированным закрытием"""
_all_instances = weakref.WeakSet() # Следим за всеми экземплярами
def __init__(self, filename: str, start_time: Optional[SYSTEMTIME] = None):
"""
Инициализация BLF writer
Args:
filename: имя файла для записи
start_time: время начала измерения (если None - текущее время)
"""
self.filename = filename
self._handle = None
self._is_open = False
self._lib_loader = BLFLibrary()
self._lib = self._lib_loader.lib
# Открываем файл
self._open(start_time)
# Регистрируем в глобальном списке
self.__class__._all_instances.add(self)
# Регистрируем финализатор
self._finalizer = weakref.finalize(self, self._cleanup, self._handle, self.filename)
@staticmethod
def _cleanup(handle, filename):
"""Статический метод для очистки ресурсов"""
if handle is not None and handle != 0:
try:
lib_loader = BLFLibrary()
lib_loader.lib.blf_close(handle)
print(f"DEBUG: Auto-closed file {filename} during cleanup")
except Exception as e:
print(f"WARNING: Failed to auto-close {filename}: {e}")
@classmethod
def close_all(cls):
"""Закрыть все открытые экземпляры (для аварийного завершения)"""
for instance in list(cls._all_instances):
try:
if instance._is_open:
instance.close()
except:
pass
def _open(self, start_time: Optional[SYSTEMTIME] = None):
"""Открытие файла"""
filename_bytes = self.filename.encode('utf-8')
if start_time is None:
self._handle = self._lib.blf_open(filename_bytes, None)
else:
self._handle = self._lib.blf_open(filename_bytes, ctypes.byref(start_time))
if self._handle is None or self._handle == 0:
raise RuntimeError(f"Failed to open file: {self.filename}")
self._is_open = True
def set_start_time(self, start_time: SYSTEMTIME) -> bool:
"""
Установить время начала измерения
Args:
start_time: время начала
Returns:
bool: True при успехе
"""
if not self._is_open:
raise RuntimeError("File is not open")
result = self._lib.blf_set_start_time(self._handle, ctypes.byref(start_time))
return result == 0
def start_container(self, timestamp_ns: int) -> bool:
"""
Начать контейнер
Args:
timestamp_ns: временная метка в наносекундах
Returns:
bool: True при успехе
"""
if not self._is_open:
raise RuntimeError("File is not open")
result = self._lib.blf_start_container(self._handle, timestamp_ns)
return result == 0
def end_container(self) -> bool:
"""
Закончить контейнер
Returns:
bool: True при успехе
"""
if not self._is_open:
raise RuntimeError("File is not open")
result = self._lib.blf_end_container(self._handle)
return result == 0
def add_can_message(self, channel: int, can_id: int, dlc: int, data: bytes,
flags: int = 0, timestamp_ns: int = 0) -> bool:
"""
Добавить CAN сообщение
Args:
channel: номер канала
can_id: идентификатор CAN
dlc: длина данных (0-8)
data: данные сообщения
flags: флаги (направление, RTR)
timestamp_ns: временная метка в наносекундах
Returns:
bool: True при успехе
"""
if not self._is_open:
raise RuntimeError("File is not open")
if dlc > 8:
raise ValueError("DLC cannot exceed 8")
data_array = (c_uint8 * 8)()
for i in range(min(dlc, len(data))):
data_array[i] = data[i]
result = self._lib.blf_add_can_message(
self._handle, channel, can_id, flags, dlc, data_array, timestamp_ns
)
return result == 0
def add_lin_message(self, channel: int, lin_id: int, dlc: int, data: bytes,
direction: int, timestamp_ns: int, checksum: int = 0) -> bool:
"""
Добавить LIN сообщение
Args:
channel: номер канала
lin_id: идентификатор LIN (6 бит)
dlc: длина данных
data: данные сообщения
direction: направление (LIN_DIR_RX или LIN_DIR_TX)
timestamp_ns: временная метка в наносекундах
checksum: контрольная сумма
Returns:
bool: True при успехе
"""
if not self._is_open:
raise RuntimeError("File is not open")
if dlc > 8:
raise ValueError("DLC cannot exceed 8")
data_array = (c_uint8 * 8)()
for i in range(min(dlc, len(data))):
data_array[i] = data[i]
result = self._lib.blf_add_lin_message_obsolete(
self._handle, channel, lin_id, dlc, data_array,
direction, timestamp_ns, checksum
)
return result == 0
def add_lin_send_error(self, channel: int, lin_id: int, dlc: int,
timestamp_ns: int) -> bool:
"""
Добавить ошибку отправки LIN
Args:
channel: номер канала
lin_id: идентификатор LIN
dlc: ожидаемая длина данных
timestamp_ns: временная метка в наносекундах
Returns:
bool: True при успехе
"""
if not self._is_open:
raise RuntimeError("File is not open")
result = self._lib.blf_add_lin_send_error(
self._handle, channel, lin_id, dlc, timestamp_ns
)
return result == 0
def add_env_data(self, name: str, data: bytes, timestamp_ns: int) -> bool:
"""
Добавить данные окружения
Args:
name: имя переменной
data: данные
timestamp_ns: временная метка в наносекундах
Returns:
bool: True при успехе
"""
if not self._is_open:
raise RuntimeError("File is not open")
# Создаем копию данных, чтобы они не были собраны сборщиком мусора
data_array = (c_uint8 * len(data))(*data)
result = self._lib.blf_add_env_data(
self._handle, name.encode('utf-8'), data_array, len(data), timestamp_ns
)
return result == 0
def flush(self) -> bool:
"""
Принудительно сбросить данные на диск
Note: В текущей реализации просто обновляет заголовок,
но не сбрасывает буферы stdio
"""
if not self._is_open:
raise RuntimeError("File is not open")
# В C библиотеке нет отдельной функции flush,
# но close/open не поддерживается, поэтому просто возвращаем True
return True
def close(self) -> bool:
"""
Закрыть файл и обновить заголовок
Returns:
bool: True при успехе
"""
if not self._is_open:
return True
result = False
if self._handle and self._handle != 0:
result = self._lib.blf_close(self._handle) == 0
if result:
print(f"DEBUG: Successfully closed {self.filename} and updated header")
else:
print(f"ERROR: Failed to close {self.filename} properly")
self._handle = None
self._is_open = False
# Отменяем финализатор, так как уже закрыли
if self._finalizer is not None:
self._finalizer.detach()
return result
def __enter__(self):
"""Контекстный менеджер - гарантирует закрытие"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Закрытие при выходе из контекста"""
self.close()
def __del__(self):
"""Деструктор - пытаемся закрыть файл если он еще открыт"""
if self._is_open:
# Не вызываем close напрямую, чтобы избежать проблем с порядком удаления
if self._handle and self._handle != 0:
try:
self._lib.blf_close(self._handle)
print(f"DEBUG: Auto-closed {self.filename} in destructor")
except:
pass
self._handle = None
self._is_open = False
def is_open(self) -> bool:
"""Проверить, открыт ли файл"""
return self._is_open
# Вспомогательные функции
def can_flags(direction: int, rtr: bool = False) -> int:
"""
Сформировать флаги для CAN сообщения
Args:
direction: направление (CAN_DIR_TX или CAN_DIR_RX)
rtr: флаг RTR (Remote Transmission Request)
Returns:
int: флаги
"""
return ((1 if rtr else 0) << 7) | (direction & 0x0F)
def timestamp_ms_to_ns(timestamp_ms: float) -> int:
"""
Преобразовать миллисекунды в наносекунды
Args:
timestamp_ms: время в миллисекундах
Returns:
int: время в наносекундах
"""
return int(timestamp_ms * 1_000_000)
def timestamp_sec_to_ns(timestamp_sec: float) -> int:
"""
Преобразовать секунды в наносекунды
Args:
timestamp_sec: время в секундах
Returns:
int: время в наносекундах
"""
return int(timestamp_sec * 1_000_000_000)
# Регистрируем глобальное закрытие при завершении программы
atexit.register(BLFWriter.close_all)