TSMaster로 받은 차량 통신 데이터 파일 (blf)를 분석하여 메시지 아이디 (m_id), 데이터 길이 (dlc, data length code), 전송 주기(d_ts. ts = timestamp, d_ts는 ts의 차이)를 구하는 미니프로그램을 만든다.
그동안 나는 blf 파일을 직접 처리할 줄 몰랐다. 그래서 blf를 TSMaster의 Log Converter 기능을 이용하여 텍스트 형식의 asc 파일로 변환한 후, asc 파일을 읽어 데이터를 처리했다. 변환은 한 번만 하면 되는 일이지만 그래도 불편하다. 이 기회에 blf 파일을 직접 다루는 방법을 배운다.
blf 파일 읽기
m_id, dlc, d_ts를 구하기 위해서는 blf를 읽기만 하면 된다.
blf 파일을 읽을 수 있는 Python 모듈을 찾아보았다. python-can 이라는 모듈을 발견하였다. 아래 명령으로 설치한다.
pip install -U python-can
blf 파일에서 메시지를 하나씩 읽어 사용하는 방법은 아래와 같다. (매우 쉽고 직관적이다.)
import can # python-can 모듈을 사용한다.
log = can.io.BLFReader(blf)
for msg in log:
ts = msg.timestamp # timestamp 읽기
m_id = msg.arbitration_id # 아이디 읽기
dlc = msg.dlc # dlc 읽기
데이터 처리
아래 코드와 같다. 코드의 주석으로 설명한다.
streamlit 패키지를 이용하여 웹브라우저에서 동작하는 웹앱으로 만들었다. 웹앱 실행 명령은 아래와 같다.
streamlit run blf_analysis_st.py
## blf analysis
# - blf 파일을 읽어 아래 항목들을 추출한다.
# - m_id들은?
# - m_id별 전송 주기는?
# - m_id별 dlc는?
# - 진단 통신 m_id들은?
# import
import can # blf 파일을 읽기 위해 python-can을 사용한다.
import pandas as pd # 데이터 처리를 위해 pandas를 사용한다.
import plotly.express as px # 그래프 출력을 위해 plotly를 사용한다.
import streamlit as st # 웹앱으로 만들기 위해 streamlit을 사용한다.
# constant
k_tolerance_d_ts_upper = 0.1
k_tolerance_d_ts_lower = -0.1
k_n_id_per_line = 10 # 한 줄에 표시할 m_id 수
k_coi = ['m_id', 'dlc', 'dlc_error', 'd_ts', 'd_ts_mean', 'd_ts_max', 'd_ts_min', 'n_msg']
# coi: columns of interest
k_d_ts_max_to_plot = 1.2
# function
def get_m_id_dlc_d_ts(blf) -> dict:
'''
blf 파일에 포함된 m_id들을 추출한다.
m_id별로 dlc를 구한다.
m_id별로 d_ts를 구한다.
m_id별로 구한 dlc, d_ts를 딕셔너리 m에 저장하고, m을 반환한다.
원본 코드 출처:
https://stackoverflow.com/questions/77266326/how-can-i-open-a-blf-file-by-using-python-script
'''
m = {}
try:
log = can.io.BLFReader(blf)
for msg in log:
ts = msg.timestamp
m_id = msg.arbitration_id
dlc = msg.dlc
if m_id not in m:
# initialize m
m[m_id] = {}
m[m_id]['ts_prev'] = ts # 지난 번 처리한 m_id 메시지의 ts
m[m_id]['dlc'] = dlc # m_id 메시지의 dlc
m[m_id]['dlc_error'] = 0 # 메시지별로 dlc가 일정해야 한다. 그렇지 않으면 에러로 판정한다.
m[m_id]['d_ts_max'] = 0.0 # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 최대값
m[m_id]['d_ts_min'] = 1_000_000.0 # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 최소값
m[m_id]['d_ts_mean'] = 0.0 # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 평균값
m[m_id]['n_msg'] = 0 # blf 파일에 포함된 m_id 메시지의 개수
m[m_id]['ts_1st'] = ts # blf 파일에 포함된 첫 m_id 메시지의 ts d_ts_mean을 구할 때 사용한다.
else:
d_ts = ts - m[m_id]['ts_prev']
m[m_id]['d_ts_max'] = max(d_ts, m[m_id]['d_ts_max'])
m[m_id]['d_ts_min'] = min(d_ts, m[m_id]['d_ts_min'])
m[m_id]['ts_prev'] = ts
m[m_id]['n_msg'] += 1
if m[m_id]['dlc'] != dlc:
m[m_id]['dlc_error'] += 1
if log.object_count == 0:
print("The BLF file is empty.")
return m
except Exception as e:
print(f"Error reading BLF file: {e}")
return m
return m # get_m_id_dlc_d_ts()
def m_to_df(m) -> pd.DataFrame:
'''
convert m to dataframe
'''
df = pd.DataFrame(m).T
df.index.name = 'm_id_int'
df = df.reset_index()
# m_id는 일반적으로 hex로 표시한다.
df['m_id'] = df['m_id_int'].apply(lambda x: f'0x{x:03X}')
df['dlc'] = df['dlc'].astype(int)
df['dlc_error'] = df['dlc_error'].astype(int)
df['d_ts_mean'] = (df['ts_prev'] - df['ts_1st']) / df['n_msg']
df['d_ts_dev_upper'] = df['d_ts_max'] - df['d_ts_mean']
df['d_ts_dev_lower'] = df['d_ts_mean'] - df['d_ts_min']
df['d_ts'] = df['d_ts_mean'].round(2)
return df
def plot_d_ts_distribution_per_d_ts(df, d_ts_target):
'''
m_id < 0x700 (진단 메시지가 아닌) 메시지들만 대상으로 한다.
df에서 전송 주기(d_ts)가 d_ts_target인 메시지들만 그래프로 출력한다.
'''
filt = ((df['m_id_int'] < 0x700) & (df['d_ts'] == d_ts_target))
df_to_plot = df.loc[filt, :].sort_values('m_id')
# df_to_plot['m_id'] = df_to_plot['m_id'].astype(str)
fig = px.scatter(
df_to_plot,
x="m_id",
y="d_ts_mean",
error_y="d_ts_dev_upper",
error_y_minus="d_ts_dev_lower",
title=f'주기 = {d_ts_target:.3f}sec<br>메시지 수 = {len(df_to_plot)}'
)
fig.update_layout(xaxis_type='category')
return fig
def write_m_id_list(m_id_list, n_m_id_per_line) -> None:
'''
m_id를 한 줄에 n_m_id_per_line개씩 출력한다.
'''
s = ''
for i, m_id in enumerate(m_id_list, start=1):
s += f'{m_id} '
if i % n_m_id_per_line == 0:
st.write(s)
s = ''
st.write(s)
def main():
if blf is not None:
# read blf and extract m_id, dlc, d_ts related data
m = get_m_id_dlc_d_ts(blf=blf)
# convert dictionary m to dataframe df
df = m_to_df(m=m)
# display all m_id's
m_id_all = df['m_id'].to_list()
n_m_id_all = len(m_id_all)
st.divider()
st.header('전체 메시지')
st.write(f'메시지 수 = {n_m_id_all:,}')
write_m_id_list(m_id_all, 10)
# display diag m_id's
m_id_diag = df.loc[df['m_id_int'] >= 0x700, 'm_id'].to_list()
n_m_id_diag = len(m_id_diag)
st.divider()
st.header('진단 통신 메시지')
st.write(f'메시지 수 = {n_m_id_diag:,}')
write_m_id_list(m_id_diag, 10)
# plot m_id per d_ts_mean and dlc
st.divider()
st.header('주기별 메시지')
filt = (df['m_id_int'] < 0x700)
df_to_plot = df.loc[filt, :]
fig_m_id_per_d_ts_mean = px.scatter(
df_to_plot,
x='d_ts_mean',
# y='dlc',
y='m_id',
color='m_id',
# size='dlc',
title='주기별 메시지',
log_x=True,
)
fig_m_id_per_d_ts_mean.update_layout(
yaxis_type='category',
showlegend=False,
)
st.plotly_chart(fig_m_id_per_d_ts_mean)
# plot d_ts_mean per d_ts and m_id
st.divider()
st.header('메시지별 주기 변동폭폭')
# 주기가 k_d_ts_max_to_plot(1.2sec) 미만인 경우만 그래프로 출력한다.
d_ts_to_plot = sorted([d_ts for d_ts in df['d_ts'].unique() if d_ts < k_d_ts_max_to_plot])
for d_ts in d_ts_to_plot:
fig = plot_d_ts_distribution_per_d_ts(df, d_ts)
st.plotly_chart(fig, use_container_width=True)
# display df
st.divider()
st.header('메시지별, dlc, 주기 상세')
st.dataframe(
df[k_coi]
.sort_values('m_id')
.style
.format(
{
'd_ts':'{:,.3f}',
'd_ts_mean':'{:,.3f}',
'd_ts_max':'{:,.3f}',
'd_ts_min':'{:,.3f}',
'n_msg': '{:,.0f}',
}
), use_container_width = True,
)
# main()
## get user input
st.header('blf 분석')
# blf_path = Path(r"C:\data\tosun\projects\uds_vehicle_config\Logging\Bus\uds_vehicle_config_2025_01_19_10_34_14.blf")
blf = st.file_uploader('choose a blf file', type='.blf')
button_calc_clicked = st.button('calc')
## process data
if button_calc_clicked:
main()
웹앱을 실행하면 blf 파일을 업로드할 수 있는 gui가 있다. "Browse files" 버튼을 클릭하여 blf 파일을 선택하거나, blf 파일을 드래그&드롭한다.
"calc" 버튼을 클릭하면 계산을 시작한다.
Browse files 버튼으로 blf 파일을 선택한다. calc 버튼으로 계산을 시작한다.
전체 메시지와 진단 통신 메시지를 표시한다. 진단 통신 메시지는 m_id가 0x700 이상인 메시지이다.
blf에 포함된 메시지 아이디들과 진단 통신 메시지 아이디들을 표시한다.
주기별 메시지 그래프를 출력한다. 주기가 1.2sec 미만인 메시지들만 표시된다.
주기별로 메시지 아이디를 그래프로 출력한다.
메시지별 전송 주기의 변동폭을 그래프로 출력한다. 0x340 메시지의 경우 주기의 변동폭이 다른 메시지들에 비해 크다는 것을 확인할 수 있다.
메시지별로 전송 주기의 변동폭을 그래프로 출력한다.
blf 에서 추출한 m_id, dlc, d_ts의 상세 결과를 볼 수 있다.
blf에서 추출한 m_id, dlc, d_ts의 상세를 출력한다.
m_id별 사양서의 dlc와 차에서 측정한 dlc를 비교하여 오류 여부를 확인할 수 있다. dlc 오류의 가능성은 매우 낮다고 짐작한다.
m_id별 사양서의 주기와 측정한 주기 (d_ts)를 비교하여 오류 여부를 확인할 수 있다. 주기의 오차를 얼마나 허용하는가에 따라 오류 발견의 가능성이 달라진다. CAN 통신은 전송 우선 순위 결정이 m_id에 의존적이라 본질적으로 주기가 일정하지 않다. (jitter) 주기 변동이 문제를 일이킬 가능성은 낮다고 짐작한다.
오류 가능성이 (매우) 낮다고 짐작하는 것과 검증을 하지 않는다는 것은 서로 다른 차원의 문제이다. 특히 위와 같이 쉽게 검증할 수 있는 경우에 검증을 통과 하는 것은 해서는 안 될 일이라고 생각한다.
결론
blf 통신 데이터 파일을 읽어서 통신 관련 기본 데이터인 메시지 아이디, dlc, 전송 주기를 추출하고 결과를 텍스트와 그래프로 출력하는 간단한 웹앱을 작성했다.
python-can 모듈을 이용하여 asc로 변환하지 않고 blf를 직접 읽어서 데이터 처리를 해보았다. python-can은 사용법이 쉽고, 직관적이다. (프로그램 실행도 asc 파일을 읽는 것보다 빠른 느낌이다.)