ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • blf 파일에서 m_id, dlc, d_ts 추출하기
    tip 2025. 1. 19. 19:17

    목적

    • TSMaster로 받은 차량 통신 데이터 파일 (blf)를 분석하여 메시지 아이디 (m_id), 데이터 길이 (dlc, data length code), 전송 주기(d_ts. ts = timestamp, d_ts는 ts의 차이)를 구하는 미니프로그램을 만든다.
    • 그동안 나는 blf 파일을 직접 처리할 몰랐다. 그래서 blf를 TSMaster의 Log Converter 기능을 이용하여 텍스트 형식의 asc 파일로 변환한 후, asc 파일을 읽어 데이터를 처리했다. 불편하다. 이 기회에 blf 파일을 직접 다루는 방법을 배운다.

     

    blf 파일 읽기

    • m_id, dlc, d_ts를 구하기 위해서는 blf를 읽기만 하면 된다.
    • blf 파일을 읽을 수 있는 Python 모듈을 찾아보았다. python-can 이라는 모듈을 발견하였다. 아래 명령으로 설치한다.
    pip install -U python-can
    • blf 파일에서 메시지를 하나씩 읽어 사용하는 방법은 아래와 같다. (매우 쉽고 직관적이다.)
    import can    # python-can 모듈을 사용한다.
    
    log = can.io.BLFReader(blf)
    for msg in log:
        ts = msg.timestamp          # timestamp 읽기 
        m_id = msg.arbitration_id   # 아이디 읽기
        dlc = msg.dlc               # dlc 읽기

     

     

    데이터 처리

    • 아래 코드와 같다. 코드의 주석으로 설명한다.
    • streamlit으로 웹브라우저에서 동작하는 웹앱으로 만들었다. 웹앱 실행 명령은 아래와 같다.
    streamlit run blf_analysis_st.py
    ## blf analysis
    # - blf 파일을 읽어 아래 항목들을 추출한다.
    #   - m_id들은?
    #   - m_id별 전송 주기는?
    #   - m_id별 dlc는?
    #   - 진단 통신 m_id들은?
    
    
    
    # import
    import can                  # blf 파일을 읽기 위해 python-can을 사용한다. 
    import pandas as pd         # 데이터 처리를 위해 pandas를 사용한다.
    import plotly.express as px # 그래프 출력을 위해 plotly를 사용한다.
    import streamlit as st      # 웹앱으로 만들기 위해 streamlit을 사용한다. 
    
    
    
    # constant
    k_tolerance_d_ts_upper = 0.1
    k_tolerance_d_ts_lower = -0.1
    k_n_id_per_line = 10
    k_coi = ['m_id', 'dlc', 'dlc_error', 'd_ts', 'd_ts_mean', 'd_ts_max', 'd_ts_min', 'n_msg']
    k_d_ts_max_to_plot = 1.2
    
    
    # function
    def get_m_id_dlc_d_ts(blf) -> dict:
        '''
        blf 파일에 포함된 m_id들을 추출한다.
        m_id별로 dlc를 구한다.
        m_id별로 d_ts를 구한다.
        m_id별로 구한 dlc, d_ts를 딕셔너리 m에 저장하고, m을 반환한다.  
    
        원본 코드 출처:
        https://stackoverflow.com/questions/77266326/how-can-i-open-a-blf-file-by-using-python-script
        
        '''
    
        m = {}
    
        try:
            log = can.io.BLFReader(blf)
            for msg in log:
                ts = msg.timestamp
                m_id = msg.arbitration_id
                dlc = msg.dlc
    
                if m_id not in m:
                    # initialize m
                    m[m_id] = {}
                    m[m_id]['ts_prev'] = ts             # 지난 번 처리한 m_id 메시지의 ts
                    m[m_id]['dlc'] = dlc                # m_id 메시지의 dlc
                    m[m_id]['dlc_error'] = 0            # 메시지별로 dlc가 일정해야 한다. 그렇지 않으면 에러로 판정한다.
                    m[m_id]['d_ts_max'] = 0.0           # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 최대값
                    m[m_id]['d_ts_min'] = 1_000_000.0   # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 최소값
                    m[m_id]['d_ts_mean'] = 0.0          # 지난 m_id 메시지 ts와 이번 m_id 메시지 ts의 차이의 평균값
                    m[m_id]['n_msg'] = 0                # blf 파일에 포함된 m_id 메시지의 개수
                    m[m_id]['ts_1st'] = ts              # blf 파일에 포함된 첫 m_id 메시지의 ts d_ts_mean을 구할 때 사용한다.
                else:
                    d_ts = ts - m[m_id]['ts_prev']      
                    m[m_id]['d_ts_max'] = max(d_ts, m[m_id]['d_ts_max'])
                    m[m_id]['d_ts_min'] = min(d_ts, m[m_id]['d_ts_min'])
                    m[m_id]['ts_prev'] = ts
                    m[m_id]['n_msg'] += 1
                    if m[m_id]['dlc'] != dlc:
                        m[m_id]['dlc_error'] += 1
    
            if log.object_count == 0:
                print("The BLF file is empty.")
                return m
    
        except Exception as e:
            print(f"Error reading BLF file: {e}")
            return m
    
        return m    # get_m_id_dlc_d_ts()
    
    
    def m_to_df(m) -> pd.DataFrame:
        '''
        convert m to dataframe
        '''
     
        df = pd.DataFrame(m).T
        df.index.name = 'm_id_int'
        df = df.reset_index()
    
        # m_id는 일반적으로 hex로 표시한다. 
        df['m_id'] = df['m_id_int'].apply(lambda x: f'0x{x:03X}')
    
        df['dlc'] = df['dlc'].astype(int)
        df['dlc_error'] = df['dlc_error'].astype(int)
    
        df['d_ts_mean'] = (df['ts_prev'] - df['ts_1st']) / df['n_msg']
    
        df['d_ts_dev_upper'] = df['d_ts_max'] - df['d_ts_mean']
        df['d_ts_dev_lower'] = df['d_ts_mean'] - df['d_ts_min']
    
        df['d_ts'] = df['d_ts_mean'].round(2)
    
        return df
    
    
    def plot_d_ts_distribution_per_d_ts(df, d_ts_target):
        '''
        m_id < 0x700 (진단 메시지가 아닌닌) 메시지들만 대상으로 한다.
        df에서 전송 주기(d_ts)가 d_ts_target인 메시지들만 그래프로 출력한다.
        '''
    
        filt = ((df['m_id_int'] < 0x700) & (df['d_ts'] == d_ts_target))
        df_to_plot = df.loc[filt, :].sort_values('m_id')
        # df_to_plot['m_id'] = df_to_plot['m_id'].astype(str)
        
        fig = px.scatter(
            df_to_plot, 
            x="m_id", 
            y="d_ts_mean",
            error_y="d_ts_dev_upper", 
            error_y_minus="d_ts_dev_lower",
            title=f'주기 = {d_ts_target:.3f}sec<br>메시지 수 = {len(df_to_plot)}'
        )
        fig.update_layout(xaxis_type='category')
    
        return fig
    
    
    def write_m_id_list(m_id_list, n_m_id_per_line) -> None:
        '''
        m_id를 한 줄에 n_m_id_per_line개씩 출력한다.
        '''
        s = ''
        for i, m_id in enumerate(m_id_list, start=1):
            s += f'{m_id}   '
            if i % n_m_id_per_line == 0:
                st.write(s)
                s = ''
        st.write(s)
    
    
    def main():
    
        if blf is not None:
            
            # read blf and extract m_id, dlc, d_ts related data
            m = get_m_id_dlc_d_ts(blf=blf)
    
            # convert dictionary m to dataframe df
            df = m_to_df(m=m)
    
            # display all m_id's
            m_id_all = df['m_id'].to_list()
            n_m_id_all = len(m_id_all)
    
            st.divider()
            st.header('전체 메시지')
            st.write(f'메시지 수 = {n_m_id_all:,}')
            write_m_id_list(m_id_all, 10)
    
            # display diag m_id's
            m_id_diag = df.loc[df['m_id_int'] >= 0x700, 'm_id'].to_list()
            n_m_id_diag = len(m_id_diag)
    
            st.divider()
            st.header('진단 통신 메시지')
            st.write(f'메시지 수 = {n_m_id_diag:,}')
            write_m_id_list(m_id_diag, 10)
    
            # plot m_id per d_ts_mean and dlc
            st.divider()
            st.header('주기별 메시지')
            filt = (df['m_id_int'] < 0x700)
            df_to_plot = df.loc[filt, :]
            fig_m_id_per_d_ts_mean = px.scatter(
                df_to_plot,
                x='d_ts_mean',
                # y='dlc',
                y='m_id',
                color='m_id',
                # size='dlc',
                title='주기별 메시지',
                log_x=True,
            )
            fig_m_id_per_d_ts_mean.update_layout(
                yaxis_type='category',
                showlegend=False,
            )
            st.plotly_chart(fig_m_id_per_d_ts_mean)
    
            # plot d_ts_mean per d_ts and m_id
            st.divider()
            st.header('메시지별 주기 변동폭폭')
            
            # 주기가 k_d_ts_max_to_plot(1.2sec) 미만인 경우만 그래프로 출력한다.
            d_ts_to_plot = sorted([d_ts for d_ts in df['d_ts'].unique() if d_ts < k_d_ts_max_to_plot])
            for d_ts in d_ts_to_plot: 
                fig = plot_d_ts_distribution_per_d_ts(df, d_ts)
                st.plotly_chart(fig, use_container_width=True)
    
            # display df
            st.divider()
            st.header('메시지별, dlc, 주기 상세')
            st.dataframe(
                df[k_coi]
                .sort_values('m_id')
                .style
                .format(
                    {
                        'd_ts':'{:,.3f}', 
                        'd_ts_mean':'{:,.3f}', 
                        'd_ts_max':'{:,.3f}', 
                        'd_ts_min':'{:,.3f}',
                        'n_msg': '{:,.0f}',
                    }
                ), use_container_width = True,
            )
    
    
    # main()
    
    ## get user input
    st.header('blf 분석')
    # blf_path = Path(r"C:\data\tosun\projects\uds_vehicle_config\Logging\Bus\uds_vehicle_config_2025_01_19_10_34_14.blf")
    blf = st.file_uploader('choose a blf file', type='.blf')
    button_calc_clicked = st.button('calc')
    
    ## process data
    if button_calc_clicked:
        main()

     

    blf_analysis_st.py
    0.01MB

     

     

    웹앱 실행

    • 웹앱을 실행하면 blf 파일을 업로드할 수 있는 gui가 있다. "Browse files" 버튼을 클릭하여 blf 파일을 선택하거나, blf 파일을 드래그&드롭한다.
    • "calc" 버튼을 클릭하면 계산을 시작한다.

    Browse files 버튼으로 blf 파일을 선택한다. calc 버튼으로 계산을 시작한다.

    •  전체 메시지와 진단 통신 메시지를 표시한다. 진단 통신 메시지는 m_id가 0x700 이상인 메시지이다.

    blf에 포함된 메시지 아이디들과 진단 통신 메시지 아이디들을 표시한다.

     

    • 주기별 메시지 그래프를 출력한다. 주기가 1.2sec 미만인 메시지들만 표시된다. 

    주기별로 메시지 아이디를 그래프로 출력한다.

     

    • 메시지별 전송 주기의 변동폭을 그래프로 출력한다. 0x340 메시지의 경우 주기의 변동폭이 다른 메시지들에 비해 크다는 것을 확인할 수 있다.

    메시지별로 전송 주기의 변동폭을 그래프로 출력한다.

    • blf 에서 추출한 m_id, dlc, d_ts의 상세 결과를 볼 수 있다. 

    blf에서 추출한 m_id, dlc, d_ts의 상세를 출력한다.

     

     

    결론

    • blf 통신 데이터 파일을 읽어서 통신 관련 기본 데이터인 m_id, dlc, 전송 주기를 추출하고 결과를 텍스트와 그래프로 출력하는 간단한 웹앱을 작성했다.   
    • python-can 모듈을 이용하여 asc로 변환하지 않고 blf를 직접 읽어서 데이터 처리를 해보았다. python-can의 사용법이 쉽고, 직관적이다. 프로그램 실행도 asc 파일을 읽는 것보다 빠른 것 같다.

     

     

    참고

    • Jupyter 노트북을 이용해서 위 계산 방법을 개발하였다. 

    blf_analysis.ipynb
    0.18MB