ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • filter_blf.py
    tip 2026. 6. 16. 09:52

    배경

    '트레이스를 이용한 RBS(Remaining Bus Simulation)' 수정하기에서 asc 파일에서 지정한 아이디의 메시지들을 필터해서 새 asc 파일로 저장하는 파이썬 스크립트를 작성했다. (정확히는 claude(클로드)에게 시켜서 작성했다.)

    python-can 이라는 패키지로 blf 파일을 직접 다룰 수 있는 것을 알게되었다. blf 파일에서 지정한 아이디의 메시지들을 필터해서 새 blf 파일로 저장할 수 있다면, blf를 asc로 변환하는 수고와 시간을 덜 수 있다. 드라이브 안에 blf 파일과 asc 파일을 보관하느라 낭비되는 저장 공간도 절약할 수 있다. 

    filter_asc.py를 기본으로 filter_blf.py를 클로드에게 시켜서 작성했다.

     

    개요

    • FRD (Functional Requirement Document. 요구기능서)
    • 코드

     

    FRD

    • 클로드 채팅창에 프람프팅을 반복해서 코드를 개발할 수도 있지만, FRD를 작성한 후에, 클로드에게 FRD를 읽고 피드백을 요청하는 방식이 나와 잘 맞는다. 
    • 처음 FRD에는 제목, 배경, 목적 등만 작성하고, 클로드와 대화하면서 중간중간에 클로드에게 FRD 업데이트 요청한다. 아래는 그렇게해서 작성한 FRD이다. (tstory는 md (markdown)파일을 삽입할 수 없고 html 파일을 삽입할 수 있다. 클로드에게 요청하여 md 파일을 html로 변환하였다.)

    FRD (Functional Requirement Document) filter_blf.py

    1. 목적

    • filter_blf.py는 입력 BLF 파일에서 지정한 CAN ID를 기준으로 메시지를 필터링하고, 결과를 새로운 BLF 파일로 저장한다.
    • 기존 filter_asc.py와 동일한 사용자 경험을 유지하되, 중간에 BLF를 ASC로 변환하는 수작업을 제거한다.

    2. 배경

    • BLF는 CAN 버스 로그를 저장하는 바이너리 파일 포맷이다.
    • ASC는 CAN 버스 로그를 저장하는 텍스트 파일 포맷이다.
    • 현재 작업 흐름은 다음과 같다.
    • TSMaster로 BLF를 저장한다.
    • TSMaster 변환기로 BLF를 ASC로 변환한다.
    • filter_asc.py로 특정 ID만 통과시키거나 제외한다.
    • 이 작업 흐름은 변환 단계가 추가되어 번거롭고, 대용량 로그일수록 비효율적이다.
    • python-can 패키지는 BLF 읽기/쓰기 기능을 제공하므로, BLF를 직접 필터링하는 도구를 구현할 수 있다.

    3. 범위

    3.1 포함 범위

    • .blf 파일 1개를 입력받아 .blf 파일 1개를 생성한다.
    • 지정한 CAN ID 목록을 기준으로 pass 또는 stop 필터를 수행한다.
    • 입력 파일의 메시지 순서를 유지한다.
    • 입력 파일의 각 메시지 timestamp, channel, arbitration ID, DLC, data, 확장 ID 여부, remote/error frame 정보를 가능한 한 유지하여 출력한다.
    • 실행 중 시작/진행/완료 상태를 콘솔에 출력한다.

    3.2 제외 범위

    • 여러 입력 파일을 한 번에 처리하는 기능은 포함하지 않는다.
    • GUI는 포함하지 않는다.
    • DBC 해석, 신호 디코딩, 통계 출력은 포함하지 않는다.
    • BLF 내부의 모든 비-CAN 오브젝트를 완전하게 보존하는 것은 범위에서 제외한다.
    • 근거: python-can의 BLF reader는 CAN 메시지와 error frame 중심으로 동작하며, 기타 오브젝트는 무시될 수 있다.

    4. 용어

    • pass: 지정한 CAN ID 목록에 포함된 메시지만 출력에 포함한다.
    • stop: 지정한 CAN ID 목록에 포함된 메시지를 출력에서 제외한다.
    • CAN ID: 필터 대상 arbitration ID를 의미한다.

    5. 사용자 시나리오

    5.1 pass 시나리오

    • 사용자는 BLF 파일과 pass, 그리고 CAN ID 목록을 입력한다.
    • 프로그램은 입력 BLF에서 해당 ID의 메시지만 남긴 BLF를 생성한다.

    5.2 stop 시나리오

    • 사용자는 BLF 파일과 stop, 그리고 CAN ID 목록을 입력한다.
    • 프로그램은 입력 BLF에서 해당 ID의 메시지를 제거한 BLF를 생성한다.

    6. 명령행 인터페이스 요구사항

    6.1 실행 형식

    프로그램은 아래 형식으로 실행된다.

    
    python filter_blf.py <input_file> <filter_type> <filter_ids...>
    

    6.2 인자 정의

    • input_file
    • 필수
    • 입력 BLF 파일 경로
    • .blf 확장자를 가져야 한다.
    • filter_type
    • 필수
    • 허용값: pass, stop
    • filter_ids
    • 필수
    • 1개 이상의 CAN ID
    • 공백으로 구분하여 여러 개 입력할 수 있다.

    6.3 사용 예시

    
    python filter_blf.py 20240725_venue2024_07_26_09_17_29.blf pass 220 2B0 386
    python filter_blf.py 20240725_venue2024_07_26_09_17_29.blf pass 0x220 std:2B0 ext:18FF50E5
    python filter_blf.py 20240725_venue2024_07_26_09_17_29.blf stop 220 2B0 386
    

    7. 필터 규칙

    7.1 ID 비교 규칙

    • 사용자가 입력한 ID는 아래 세 가지 형식을 허용한다.
    • 220
    • 0x220
    • std:220, std:0x220, ext:220, ext:0x220
    • ID 값은 16진수 문자열로 해석한다.
    • 220은 10진수 220이 아니라 16진수 0x220으로 간주한다.
    • std:ext: 접두사는 대소문자를 구분하지 않는다.
    • 예: STD:220, Ext:220도 허용한다.
    • 접두사가 없으면 표준 ID로 간주한다.
    • 220std:220과 동일하다.
    • 0x220std:0x220과 동일하다.
    • 내부 비교 시에는 (arbitration_id, is_extended_id) 조합으로 비교한다.

    7.2 표준 ID와 확장 ID 판정 규칙

    • std: 접두사가 있으면 표준 ID 필터로 해석한다.
    • ext: 접두사가 있으면 확장 ID 필터로 해석한다.
    • 접두사가 없으면 표준 ID 필터로 해석한다.
    • 표준 ID와 확장 ID는 숫자값이 같아도 서로 다른 대상으로 취급한다.
    • 예: 220ext:220은 서로 다른 필터 조건이다.

    7.3 메시지 판정 규칙

    • 각 메시지의 arbitration ID와 확장 ID 여부를 기준으로 필터링한다.
    • pass일 경우:
    • 메시지의 (arbitration_id, is_extended_id)가 입력 ID 목록에 있으면 출력한다.
    • 없으면 출력하지 않는다.
    • stop일 경우:
    • 메시지의 (arbitration_id, is_extended_id)가 입력 ID 목록에 있으면 출력하지 않는다.
    • 없으면 출력한다.

    8. 입출력 요구사항

    8.1 입력 파일

    • 입력 파일은 존재해야 한다.
    • 입력 파일은 읽을 수 있어야 한다.
    • 입력 파일 확장자는 .blf여야 한다.

    8.2 출력 파일명

    • 출력 파일은 입력 파일과 동일한 디렉터리에 생성한다.
    • 출력 파일명 규칙은 기존 filter_asc.py와 동일하게 한다.
    
    <입력파일명_without_extension>_<filter_type>_filtered.blf
    
    • 예:
    • 입력: sample.blf
    • pass 출력: sample_pass_filtered.blf
    • stop 출력: sample_stop_filtered.blf

    8.3 출력 파일 덮어쓰기

    • 동일한 출력 파일명이 이미 존재하면 덮어쓴다.
    • 덮어쓰기 전 별도 확인 프롬프트는 제공하지 않는다.

    8.4 출력 내용

    • 출력에는 필터 조건을 통과한 CAN 메시지만 기록한다.
    • 메시지의 기록 순서는 입력 파일과 동일해야 한다.
    • 메시지 timestamp는 입력에서 읽은 값을 유지한다.
    • writer 종료 시 파일이 정상적으로 flush/close 되어야 한다.

    9. 오류 처리 요구사항

    9.1 입력 파일 오류

    • 입력 파일이 없으면 오류 메시지를 출력하고 종료한다.
    • 입력 파일 확장자가 .blf가 아니면 오류 메시지를 출력하고 종료한다.
    • 입력 파일을 열 수 없으면 오류 메시지를 출력하고 종료한다.

    9.2 인자 오류

    • filter_typepass 또는 stop이 아니면 argparse 수준에서 오류 처리한다.
    • filter_ids가 하나도 없으면 argparse 수준에서 오류 처리한다.
    • filter_ids 중 허용된 형식에 맞지 않는 값이 있으면 어떤 값이 잘못되었는지 포함한 오류 메시지를 출력하고 종료한다.
    • filter_ids 중 ID 부분이 16진수로 해석할 수 없는 값이 있으면 어떤 값이 잘못되었는지 포함한 오류 메시지를 출력하고 종료한다.
    • std: 또는 ext: 외의 접두사는 허용하지 않고 오류로 처리한다.
    • 표준 ID로 해석된 값이 0x7FF를 초과하면 오류로 처리한다.
    • 확장 ID로 해석된 값이 0x1FFFFFFF를 초과하면 오류로 처리한다.

    9.3 처리 중 오류

    • BLF 읽기 또는 쓰기 도중 예외가 발생하면 오류 메시지를 출력하고 비정상 종료한다.
    • 가능한 경우 부분 생성된 출력 파일은 남기지 않는 것을 목표로 한다.

    10. 비기능 요구사항

    10.1 성능

    • 대용량 BLF 파일도 처리할 수 있도록 스트리밍 방식으로 읽고 쓴다.
    • 전체 메시지를 메모리에 모두 적재한 뒤 처리하지 않는다.

    10.2 의존성

    • Python 3 환경에서 실행 가능해야 한다.
    • 외부 패키지로 python-can을 사용한다.
    • tqdm이 설치되어 있으면 바이트 단위 진행률 바를 함께 표시한다.
    • tqdm은 선택 의존성(optional)이며, 미설치 시에도 기본 진행률 출력으로 정상 동작해야 한다.

    10.3 사용성

    • 성공 시 생성된 출력 파일 경로를 표준 출력으로 안내한다.
    • 실패 시 사용자가 원인을 알 수 있는 수준의 오류 메시지를 표준 오류 또는 표준 출력으로 제공한다.
    • 처리 시작 시 입력/출력 파일과 모드를 출력한다.
    • 처리 중 진행률을 5% 단위로 출력한다.
    • 진행률은 BLF 리더의 실제 파일 읽기 위치를 기준으로 계산한다.
    • 출력 형식은 텍스트 프로그레스바와 퍼센트를 포함한다.
    • 처리 완료 시 진행률 100%를 출력하고, 처리 건수/기록 건수를 함께 안내한다.

    11. 구현 가이드

    • python-canLogReader 또는 BLFReader로 입력 파일을 순회한다.
    • python-canLogger 또는 BLFWriter로 출력 파일을 생성한다.
    • 메시지를 한 건씩 읽으면서 즉시 필터링 후 기록한다.
    • writer는 반드시 명시적으로 종료하여 파일이 손상되지 않도록 한다.
    • 진행률 계산은 메시지 추정 크기가 아닌 BLF 리더의 파일 포인터 위치를 사용한다.
    • tqdm이 설치되어 있으면 바이트 단위 진행률 바를 표시하고, 미설치 시 5% 단위 텍스트 진행률을 표시한다.

    12. 수용 기준

    아래 조건을 모두 만족하면 요구사항을 충족한 것으로 본다.

    1. .blf 파일을 입력받아 실행할 수 있다.
    2. pass 실행 시 지정한 ID만 포함된 .blf가 생성된다.
    3. stop 실행 시 지정한 ID가 제외된 .blf가 생성된다.
    4. 출력 파일명은 &lt;원본파일명&gt;_&lt;filter_type&gt;_filtered.blf 규칙을 따른다.
    5. 출력 BLF는 python-can 또는 TSMaster에서 다시 열 수 있다.
    6. 메시지 순서가 입력과 동일하다.
    7. 2200x220은 동일한 표준 ID로 처리된다.
    8. ext:220은 표준 220과 다른 대상으로 처리된다.
    9. 잘못된 입력 파일 또는 잘못된 ID 형식에 대해 오류 메시지를 출력한다.
    10. 실행 시작 시 시작 메시지가 출력된다.
    11. 실행 중 5% 단위 진행률 메시지가 출력된다.
    12. 실행 종료 시 진행률 100%와 처리/기록 건수가 출력된다.

    13. 후속 개선 후보

    • 출력 파일명 사용자 지정 옵션 추가
    • 여러 BLF 파일 일괄 처리
    • 필터링 결과 요약 통계 출력
    • md 파일은 첨부한다.

    FRD_filter_blf.md
    0.01MB

     

     

    코드

    • 코드는 아래와 같다. 
    import argparse
    import os
    import sys
    from typing import Any
    
    try:
        import can as can_module
    except ImportError as exc:  # pragma: no cover - runtime environment dependent
        can_module = None
        CAN_IMPORT_ERROR = exc
    else:
        CAN_IMPORT_ERROR = None
    
    try:
        from tqdm import tqdm
    except ImportError:
        tqdm = None
    
    
    def parse_filter_id(raw_token):
        """Parse one filter token into (arbitration_id, is_extended_id)."""
        token = raw_token.strip()
        if not token:
            raise ValueError("Empty filter ID is not allowed")
    
        is_extended_id = False
        value_text = token
    
        if ":" in token:
            prefix, value_text = token.split(":", 1)
            prefix_lower = prefix.lower()
            if prefix_lower == "std":
                is_extended_id = False
            elif prefix_lower == "ext":
                is_extended_id = True
            else:
                raise ValueError(
                    f"Invalid filter ID '{raw_token}': only 'std:' or 'ext:' prefixes are allowed"
                )
    
        if not value_text:
            raise ValueError(f"Invalid filter ID '{raw_token}': missing hexadecimal ID value")
    
        try:
            arbitration_id = int(value_text, 16)
        except ValueError as exc:
            raise ValueError(
                f"Invalid filter ID '{raw_token}': hexadecimal value expected"
            ) from exc
    
        if arbitration_id < 0:
            raise ValueError(f"Invalid filter ID '{raw_token}': negative values are not allowed")
    
        if is_extended_id:
            if arbitration_id > 0x1FFFFFFF:
                raise ValueError(
                    f"Invalid filter ID '{raw_token}': extended ID must be <= 0x1FFFFFFF"
                )
        else:
            if arbitration_id > 0x7FF:
                raise ValueError(
                    f"Invalid filter ID '{raw_token}': standard ID must be <= 0x7FF"
                )
    
        return arbitration_id, is_extended_id
    
    
    def validate_input_file(input_file):
        if not input_file.lower().endswith(".blf"):
            raise ValueError("Input file must have a .blf extension")
        if not os.path.exists(input_file):
            raise FileNotFoundError(f"Input file not found: {input_file}")
        if not os.path.isfile(input_file):
            raise ValueError(f"Input path is not a file: {input_file}")
    
    
    def build_output_file(input_file, filter_type):
        file_name, _ = os.path.splitext(input_file)
        return f"{file_name}_{filter_type}_filtered.blf"
    
    
    def should_write_message(message, filter_set, filter_type):
        key = (message.arbitration_id, bool(message.is_extended_id))
        matched = key in filter_set
        return matched if filter_type == "pass" else not matched
    
    
    def format_progress_bar(percent, width=30):
        filled = int(width * percent / 100)
        return f"[{'#' * filled}{'-' * (width - filled)}]"
    
    
    def get_reader_position(reader):
        file_obj = getattr(reader, "file", None)
        if file_obj is None:
            return None
        tell = getattr(file_obj, "tell", None)
        if tell is None:
            return None
        try:
            return int(tell())
        except Exception:
            return None
    
    
    def filter_blf_messages(can_api: Any, input_file, output_file, filter_set, filter_type, progress_step=5):
        processed_count = 0
        written_count = 0
        total_file_size = os.path.getsize(input_file)
        next_progress_percent = progress_step
        pbar = None
        last_position = 0
    
        try:
            with can_api.LogReader(input_file) as reader, can_api.Logger(output_file) as writer:
                # Prefer BLF header file_size when available.
                header_file_size = getattr(reader, "file_size", None)
                if isinstance(header_file_size, int) and header_file_size > 0:
                    total_file_size = header_file_size
    
                last_position = get_reader_position(reader) or 0
    
                if tqdm is not None:
                    pbar = tqdm(
                        total=total_file_size,
                        unit="B",
                        unit_scale=True,
                        desc="Filtering",
                        leave=True,
                    )
    
                for message in reader:
                    processed_count += 1
    
                    if should_write_message(message, filter_set, filter_type):
                        writer.on_message_received(message)
                        written_count += 1
    
                    current_position = get_reader_position(reader)
                    if current_position is not None and current_position > last_position:
                        delta = current_position - last_position
                        last_position = current_position
    
                        if pbar is not None:
                            pbar.update(delta)
    
                        if total_file_size > 0 and progress_step > 0:
                            percent = min(int((current_position * 100) / total_file_size), 99)
                            while next_progress_percent <= percent:
                                bar = format_progress_bar(next_progress_percent)
                                print(
                                    f"Progress {bar} {next_progress_percent:3d}% "
                                    f"(processed {processed_count:,}, wrote {written_count:,})",
                                    flush=True,
                                )
                                next_progress_percent += progress_step
        except Exception:
            # Best effort cleanup for partial output files.
            if os.path.exists(output_file):
                try:
                    os.remove(output_file)
                except OSError:
                    pass
            raise
        finally:
            if pbar is not None:
                # Force complete bar rendering at the end.
                if pbar.n < pbar.total:
                    pbar.update(pbar.total - pbar.n)
                pbar.close()
    
        print(
            f"Progress {format_progress_bar(100)} 100% "
            f"(processed {processed_count:,}, wrote {written_count:,})",
            flush=True,
        )
    
        return processed_count, written_count
    
    
    def main():
        parser = argparse.ArgumentParser(
            description="Filter CAN messages in a .blf file based on CAN IDs."
        )
        parser.add_argument("input_file", help="Input .blf file")
        parser.add_argument(
            "filter_type", choices=["pass", "stop"], help="Filter type: pass or stop"
        )
        parser.add_argument(
            "filter_ids",
            nargs="+",
            help="CAN IDs to filter (e.g., 220, 0x220, std:220, ext:18FF50E5)",
        )
    
        args = parser.parse_args()
    
        if CAN_IMPORT_ERROR is not None or can_module is None:
            print(
                "Error: python-can is required. Install it with 'pip install python-can'.",
                file=sys.stderr,
            )
            return 1
    
        try:
            validate_input_file(args.input_file)
        except (FileNotFoundError, ValueError) as exc:
            print(f"Error: {exc}", file=sys.stderr)
            return 1
    
        try:
            filter_set = {parse_filter_id(token) for token in args.filter_ids}
        except ValueError as exc:
            parser.error(str(exc))
    
        output_file = build_output_file(args.input_file, args.filter_type)
    
        try:
            print(
                f"Start filtering: input={args.input_file}, output={output_file}, mode={args.filter_type}",
                flush=True,
            )
            processed_count, written_count = filter_blf_messages(
                can_module,
                args.input_file,
                output_file,
                filter_set,
                args.filter_type,
            )
        except Exception as exc:
            print(f"Error while processing BLF file: {exc}", file=sys.stderr)
            return 1
    
        print(
            f"Done: processed {processed_count:,} messages, wrote {written_count:,} messages",
            flush=True,
        )
        print(f"Filtered messages saved to {output_file}")
        return 0
    
    
    if __name__ == "__main__":
        raise SystemExit(main())
    • py 파일 첨부한다.

    filter_blf.py
    0.01MB

     

    • 아래 명령으로 필터가 잘 되는 지 점검해보았다.
    python filter_blf.py 20240725_venue2024_07_26_09_17_29.blf pass 220 2B0 0x386 ext:18FF50E5

    • 점검에 사용한 blf 파일을 첨부한다.

    20240725_venue2024_07_26_09_17_29.blf
    9.14MB

    • pass 설정한 아이디의 메시지들만 blf에 남았다.

    pass 필터 후에 blf의 메시지들. 0x220, 0x2B0, 0x386만 남았다.

     

    • 이 파일을 온라인 재생하면, blf 파일을 이용한 RBS를 할 수 있다. 

     

    결론

    • TSMaster 온라인 재생에 필터 기능이 있어서 blf를 필터하고 이를 재생하는 RBS를 할 필요가 없기는 하지만, 데이터 유출 방지를 위해서 blf에 보여주고 싶은 메시지들만 담아서 배포해야 할 경우가 있다면, 위와 같이 간단히 작성한 파이썬 스크립트를 이용할 수 있다.   

     

    hsl's tsmaster 사용기 목차 :: hsl's tsmaster 사용기    

    트레이스를 이용한 RBS(Remaining Bus Simulation) :: hsl's tsmaster 사용기

     

    'tip' 카테고리의 다른 글

    CAN dbc를 CAN-FD dbc로 변환하면서 배운 것  (1) 2025.12.09
    dbc2xlsx  (3) 2025.08.29
    dbc 파일에서 바이트 오더와 스타트 비트 위치와 LSB  (1) 2025.08.27
    dbc 파일 인코딩 (encoding)  (3) 2025.08.26
    xlsx2dbc  (1) 2025.08.22