목적
- TSMaster로 받은 차량 통신 데이터 파일 (blf)를 분석하여 메시지 아이디 (m_id), 데이터 길이 (dlc, data length code), 전송 주기(d_ts. ts = timestamp, d_ts는 ts의 차이)를 구하는 미니프로그램을 만든다.
- 그동안 나는 blf 파일을 직접 처리할 몰랐다. 그래서 blf를 TSMaster의 Log Converter 기능을 이용하여 텍스트 형식의 asc 파일로 변환한 후, asc 파일을 읽어 데이터를 처리했다. 불편하다. 이 기회에 blf 파일을 직접 다루는 방법을 배운다.
blf 파일 읽기
- m_id, dlc, d_ts를 구하기 위해서는 blf를 읽기만 하면 된다.
- blf 파일을 읽을 수 있는 Python 모듈을 찾아보았다. python-can 이라는 모듈을 발견하였다. 아래 명령으로 설치한다.
pip install -U python-can
- blf 파일에서 메시지를 하나씩 읽어 사용하는 방법은 아래와 같다. (매우 쉽고 직관적이다.)
import can # python-can 모듈을 사용한다.
log = can.io.BLFReader(blf)
for msg in log:
ts = msg.timestamp # timestamp 읽기
m_id = msg.arbitration_id # 아이디 읽기
dlc = msg.dlc # dlc 읽기
데이터 처리
- 아래 코드와 같다. 코드의 주석으로 설명한다.
- streamlit으로 웹브라우저에서 동작하는 웹앱으로 만들었다. 웹앱 실행 명령은 아래와 같다.
streamlit run blf_analysis_st.py
## blf analysis
# - blf 파일을 읽어 아래 항목들을 추출한다.
# - m_id들은?
# - m_id별 전송 주기는?
# - m_id별 dlc는?
# - 진단 통신 m_id들은?
# import
import can # blf 파일을 읽기 위해 python-can을 사용한다.
import pandas as pd # 데이터 처리를 위해 pandas를 사용한다.
import plotly.express as px # 그래프 출력을 위해 plotly를 사용한다.
import streamlit as st # 웹앱으로 만들기 위해 streamlit을 사용한다.
# constant
k_tolerance_d_ts_upper = 0.1
k_tolerance_d_ts_lower = -0.1
k_n_id_per_line = 10
k_coi = ['m_id', 'dlc', 'dlc_error', 'd_ts', 'd_ts_mean', 'd_ts_max', 'd_ts_min', 'n_msg']
k_d_ts_max_to_plot = 1.2
# function
def get_m_id_dlc_d_ts(blf) -> dict:
'''
blf 파일에 포함된 m_id들을 추출한다.
m_id별로 dlc를 구한다.
m_id별로 d_ts를 구한다.
m_id별로 구한 dlc, d_ts를 딕셔너리 m에 저장하고, m을 반환한다.
원본 코드 출처:
https://stackoverflow.com/questions/77266326/how-can-i-open-a-blf-file-by-using-python-script
'''
m = {}
try:
log = can.io.BLFReader(blf)
for msg in log:
ts = msg.timestamp
m_id = msg.arbitration_id
dlc = msg.dlc
if m_id not in m:
# initialize m
m[m_id] = {}
m[m_id]['ts_prev'] = ts # 지난 번 처리한 m_id 메시지의 ts
m[m_id]['dlc'] = dlc # m_id 메시지의 dlc
m[m_id]['dlc_error'] = 0 # 메시지별로 dlc가 일정해야 한다. 그렇지 않으면 에러로 판정한다.
m[m_id]['d_ts_max'] = 0.0 # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 최대값
m[m_id]['d_ts_min'] = 1_000_000.0 # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 최소값
m[m_id]['d_ts_mean'] = 0.0 # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 평균값
m[m_id]['n_msg'] = 0 # blf 파일에 포함된 m_id 메시지의 개수
m[m_id]['ts_1st'] = ts # blf 파일에 포함된 첫 m_id 메시지의 ts d_ts_mean을 구할 때 사용한다.
else:
d_ts = ts - m[m_id]['ts_prev']
m[m_id]['d_ts_max'] = max(d_ts, m[m_id]['d_ts_max'])
m[m_id]['d_ts_min'] = min(d_ts, m[m_id]['d_ts_min'])
m[m_id]['ts_prev'] = ts
m[m_id]['n_msg'] += 1
if m[m_id]['dlc'] != dlc:
m[m_id]['dlc_error'] += 1
if log.object_count == 0:
print("The BLF file is empty.")
return m
except Exception as e:
print(f"Error reading BLF file: {e}")
return m
return m # get_m_id_dlc_d_ts()
def m_to_df(m) -> pd.DataFrame:
'''
convert m to dataframe
'''
df = pd.DataFrame(m).T
df.index.name = 'm_id_int'
df = df.reset_index()
# m_id는 일반적으로 hex로 표시한다.
df['m_id'] = df['m_id_int'].apply(lambda x: f'0x{x:03X}')
df['dlc'] = df['dlc'].astype(int)
df['dlc_error'] = df['dlc_error'].astype(int)
df['d_ts_mean'] = (df['ts_prev'] - df['ts_1st']) / df['n_msg']
df['d_ts_dev_upper'] = df['d_ts_max'] - df['d_ts_mean']
df['d_ts_dev_lower'] = df['d_ts_mean'] - df['d_ts_min']
df['d_ts'] = df['d_ts_mean'].round(2)
return df
def plot_d_ts_distribution_per_d_ts(df, d_ts_target):
'''
m_id < 0x700 (진단 메시지가 아닌닌) 메시지들만 대상으로 한다.
df에서 전송 주기(d_ts)가 d_ts_target인 메시지들만 그래프로 출력한다.
'''
filt = ((df['m_id_int'] < 0x700) & (df['d_ts'] == d_ts_target))
df_to_plot = df.loc[filt, :].sort_values('m_id')
# df_to_plot['m_id'] = df_to_plot['m_id'].astype(str)
fig = px.scatter(
df_to_plot,
x="m_id",
y="d_ts_mean",
error_y="d_ts_dev_upper",
error_y_minus="d_ts_dev_lower",
title=f'주기 = {d_ts_target:.3f}sec<br>메시지 수 = {len(df_to_plot)}'
)
fig.update_layout(xaxis_type='category')
return fig
def write_m_id_list(m_id_list, n_m_id_per_line) -> None:
'''
m_id를 한 줄에 n_m_id_per_line개씩 출력한다.
'''
s = ''
for i, m_id in enumerate(m_id_list, start=1):
s += f'{m_id} '
if i % n_m_id_per_line == 0:
st.write(s)
s = ''
st.write(s)
def main():
if blf is not None:
# read blf and extract m_id, dlc, d_ts related data
m = get_m_id_dlc_d_ts(blf=blf)
# convert dictionary m to dataframe df
df = m_to_df(m=m)
# display all m_id's
m_id_all = df['m_id'].to_list()
n_m_id_all = len(m_id_all)
st.divider()
st.header('전체 메시지')
st.write(f'메시지 수 = {n_m_id_all:,}')
write_m_id_list(m_id_all, 10)
# display diag m_id's
m_id_diag = df.loc[df['m_id_int'] >= 0x700, 'm_id'].to_list()
n_m_id_diag = len(m_id_diag)
st.divider()
st.header('진단 통신 메시지')
st.write(f'메시지 수 = {n_m_id_diag:,}')
write_m_id_list(m_id_diag, 10)
# plot m_id per d_ts_mean and dlc
st.divider()
st.header('주기별 메시지')
filt = (df['m_id_int'] < 0x700)
df_to_plot = df.loc[filt, :]
fig_m_id_per_d_ts_mean = px.scatter(
df_to_plot,
x='d_ts_mean',
# y='dlc',
y='m_id',
color='m_id',
# size='dlc',
title='주기별 메시지',
log_x=True,
)
fig_m_id_per_d_ts_mean.update_layout(
yaxis_type='category',
showlegend=False,
)
st.plotly_chart(fig_m_id_per_d_ts_mean)
# plot d_ts_mean per d_ts and m_id
st.divider()
st.header('메시지별 주기 변동폭폭')
# 주기가 k_d_ts_max_to_plot(1.2sec) 미만인 경우만 그래프로 출력한다.
d_ts_to_plot = sorted([d_ts for d_ts in df['d_ts'].unique() if d_ts < k_d_ts_max_to_plot])
for d_ts in d_ts_to_plot:
fig = plot_d_ts_distribution_per_d_ts(df, d_ts)
st.plotly_chart(fig, use_container_width=True)
# display df
st.divider()
st.header('메시지별, dlc, 주기 상세')
st.dataframe(
df[k_coi]
.sort_values('m_id')
.style
.format(
{
'd_ts':'{:,.3f}',
'd_ts_mean':'{:,.3f}',
'd_ts_max':'{:,.3f}',
'd_ts_min':'{:,.3f}',
'n_msg': '{:,.0f}',
}
), use_container_width = True,
)
# main()
## get user input
st.header('blf 분석')
# blf_path = Path(r"C:\data\tosun\projects\uds_vehicle_config\Logging\Bus\uds_vehicle_config_2025_01_19_10_34_14.blf")
blf = st.file_uploader('choose a blf file', type='.blf')
button_calc_clicked = st.button('calc')
## process data
if button_calc_clicked:
main()
웹앱 실행
- 웹앱을 실행하면 blf 파일을 업로드할 수 있는 gui가 있다. "Browse files" 버튼을 클릭하여 blf 파일을 선택하거나, blf 파일을 드래그&드롭한다.
- "calc" 버튼을 클릭하면 계산을 시작한다.
- 전체 메시지와 진단 통신 메시지를 표시한다. 진단 통신 메시지는 m_id가 0x700 이상인 메시지이다.
- 주기별 메시지 그래프를 출력한다. 주기가 1.2sec 미만인 메시지들만 표시된다.
- 메시지별 전송 주기의 변동폭을 그래프로 출력한다. 0x340 메시지의 경우 주기의 변동폭이 다른 메시지들에 비해 크다는 것을 확인할 수 있다.
- blf 에서 추출한 m_id, dlc, d_ts의 상세 결과를 볼 수 있다.
결론
- blf 통신 데이터 파일을 읽어서 통신 관련 기본 데이터인 m_id, dlc, 전송 주기를 추출하고 결과를 텍스트와 그래프로 출력하는 간단한 웹앱을 작성했다.
- python-can 모듈을 이용하여 asc로 변환하지 않고 blf를 직접 읽어서 데이터 처리를 해보았다. python-can의 사용법이 쉽고, 직관적이다. 프로그램 실행도 asc 파일을 읽는 것보다 빠른 것 같다.
참고
- Jupyter 노트북을 이용해서 위 계산 방법을 개발하였다.