ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 수신 --> 필터 --> 연산 --> 전송
    simulation 2025. 9. 2. 11:27

    시작하기 전에 

    • 제어기가 동작하기에 필요한 CAN 통신을 모사해 주는 것을 RBS (Rest/ Remaining Bus Simulation)라고 한다. RBS를 아래 포스트들에서 다뤘다.

    CAN 버스 시뮬레이션 (RBS: Remaining Bus Simulation) :: hsl's tsmaster 사용기(판넬 사용법 설명을 포함한다.)

    Symbol Mapping: CAN 신호로 계산한 결과를 시스템 변수에 넣기 :: hsl's tsmaster 사용기    

    Symbol Mapping: 시스템 변수를 CAN 신호로 전송하기 :: hsl's tsmaster 사용기    

    TSMaster API를 사용하여 CAN 메시지 송신하기 (RBS) :: hsl's tsmaster 사용기    

    Python 미니프로그램: 코드에서 CAN 메시지의 신호를 읽는 방법 :: hsl's tsmaster 사용기    

    미니프로그램에서 CAN 신호와 시스템 변수 다루기 :: hsl's tsmaster 사용기    

     

    • 제어기가 전송하는 메시지를 TSMaster가 받아서, 메시지의 데이터로 연산을 하고, 연산 결과 데이터를 제어기로 전송하는 RBS를 하는 방법을 설명해달라는 요청을 받았다.

    제어기가 전송한 메시지를 수신, 필터, 처리 후 전송하는 RBS를 설명한다.

    • 이 방법을 설명한다.

     

     

    개요

    • 데모 시나리오
    • 미니프로그램 작성
    • 데모 실행

     

     

    데모 시나리오

    • 개발 중인 제어기가 없어서 RBS 기능과 제어기 기능을 구현한다. 
    • 아래와 같이 2 채널 CAN-FD 장치의 채널 1과 채널 2를 직접 연결한다. 이렇게 하면 CAN 버스 1개가 있는 셈이다.

    CAN-FD 2차널 하드웨어의 채널 1과 2를 직접 연결한다.

     

     

     

    RBS 

    • TSMaster는 CAN 버스의 메시지를 수신한다. (수신)
    • 수신 메시지 아이디가 0x100에서 0x1FF 사이의 메시지를 대상으로 한다. (필터)
    • 수신 메시지 아이디가 짝수이면 각 데이터 바이트에 1을 더한다. 수신 메시지 아이디가 홀수이면 각 데이터 바이트에서 1을 뺀다. (연산)
    • 처리된 데이터를 수신 메시지 아이디 + 0x100을 아이디로 하는 메시지로 전송한다. (전송)
    • 채널 1을 사용하기로 한다.

    제어기

    • 주기적으로 CAN 메시지를 전송하도록 한다.
    • 메시지 아이디는 0xFC와 0x110 사이에서 랜덤이다. 0xFC, 0xFD, 0xFE, 0xFF는 처리하지 않는 것을 확인하기 위한 목적이다.
    • 메시지 길이는 8로 한다. 
    • 데이터를 첫 바이트를 랜덤으로 하고, 나머지 바이트는 앞 바이트 + 1로 한다. RBS 메시지를 보고 계산 결과를 쉽게 확인하기 위해서다.  
    • 채널 2를 사용하기로 한다.

    시험 편의

    • 시험의 편리를 위해서 ctrl-1 키를 누르면 제어기의 CAN 메시지 전송이 시작되고, ctrl-0 키를 누르면 제어기의 CAN 메시지 전송이 중단되도록 한다.

     

     

    미니프로그램 작성

    RBS

    • 메인 메뉴/ Design/ Python Mini Program을 클릭하여 Python Code Editor 창을 연다.

    • 메시지를 "수신할 때마다" 수신 데이터를 필터하고 처리하고 전송할 것이라서 On CAN Rx에 새 함수를 추가한다.     

    On CAN Rx 이벤트에서 마우스 우클릭하여 함수를 추가한다. CAN, CAN-FD 중에 필요에 따라 선택한다. 나는 CAN-FD를 선택하였다.
    어떤 메시지 아이디를 대상으로 할 것인지 선택할 수 있다. 아이디와 상관 없이 처리할 것이기 때문에 Id에서 * (Any Message)를 선택한다. Name은 any라고 했다.

    • 코드 창에 def on_canfd_rx_any(ACAN: RawCAN) --> None: 이라는 함수 프로파일이 생긴다. 

    • 코드 영역에 아래와 같이 코드를 입력한다. (코드 설명은 주석을 참조해 주십시오.) 
    def on_canfd_rx_any(ACAN: RawCAN) -> None:
    	## 수신
        # CAN 메시지가 수신되면 on_canfd_rx_any() 함수가 실행된다.
    
    
        ## 필터
    
        # 채널 1로 오는 메시지가 아니면 처리하지 않는다. 
        if (ACAN.idx_chn != CH1): # if you want to filter channel
            return
            
        # 아이디가 0x100 미만이면 처리하지 않는다.
        if ACAN.id < 0x100:
            return
        
        # 아이디가 0x199 초과면 처리하지 않는다.
        if ACAN.id > 0x1FF:
            return
            
        
        ## 연산
        
        # 전송할 메시지를 만든다.
        ACAN_new = RawCAN()
        ACAN_new = ACAN    
        
        # 전송할 메시지의 아이디는 수신 메시지 아이디 + 0x100으로 한다.
        ACAN_new.id = ACAN.id + 0x100
        
        # 수신 메시지 아이디가 짝수이면 
        if ACAN.id % 2 == 0:
            # 각 데이터 바이트에 1을 더한다.
            for idx in range(ACAN.dlc):
                
                # 데이터를 한 바이트씩 처리하는 경우,
                # 아래 변수 = 값 방법은 작동하지 않는다.
                # ACAN_new.data[idx] = (ACAN.data[idx] + idx) % 16
    
                # RawCAN에 마련된 메소드를 이용해야 한다.
                ACAN_new.set_data(idx, (ACAN.data[idx] + idx) % 16)
    
                # 디버깅 용도의 출력용도
                # app.log_text(f'{idx = }', lvlOK)
                # app.log_text(f'{ACAN_new.data[idx] = } {ACAN.data[idx] = }', lvlOK)
        # 수시 메시지 아이디가 홀수이면
        else:
            # 각 데이터 바이트에서 1을 뺀다
            for idx in range(ACAN.dlc):
                ACAN_new.set_data(idx, (ACAN.data[idx] - idx) % 16)
    
        ## 전송               
        if com.transmit_can_async(ACAN_new) != 0:  # 메시지를 전송한다.
            app.log_text(f"error: Tx {ACAN_new.id} {ACAN_new.data}", lvlError)
        
        return

     

    제어기

    • 메시지를 주기적으로 전송하도록 하기 위해서 timer를 만든다. 

    Timers에서 마우스 우클릭하여 Add Timer를 선택한다.
    메시지 전송을 위한 타이머라 Name을 tx라고 했다. Interval (ms)는 1000로 했다. 너무 빠르면 검산이 힘들다. timer_tx가 생선된다.

    • 1000msec의 타이머 timer_tx를 만들었으니, timer_tx가 1000msec를 칠 때마다 어떤 기능을 할 지를 정의해야 한다. On Timer 이벤트에 timer_tx를 연결하고, timer_tx가 실행할 함수를 정의한ㄷ.

    On Timer 이벤트에서 마무스 우클릭으로 Add On Timer를 선택한다.
    Timer Name에서 앞에서 정의해둔 timer_tx를 선택한다. Name에는 tx라고 입력했다.

     

    • 코드 창에 def on_timer_timer_tx() --> None: 이라는 함수 프로파일이 생긴다. 

    • 코드 영역에 아래와 같이 코드를 입력한다. (코드 설명은 주석을 참조해 주십시오.) 
    def on_timer_timer_tx() -> None:
    
        ## prepare tx message
        
        # CAN 메시지를 생성한다. 
        ACAN = RawCAN()
        
        # 채널 2를 사용한다. 
        ACAN.idx_chn = CH2
        
        # 메시지 아이디는 0xFC와 0x110 사이에서 임의로 정한다.
        ACAN.id = random.randint(0xFC, 0x110)
        
        # 메시지 길이는 8 바이트로 한다.
        ACAN.dlc = 8
        
        # 0 부터 7 사이의 임의의 시작값을 정한다.
        # 메지시 아이디가 짝수면 최대 + 8하게 된다. 
        # 7 + 8 = 15 = 0xF 이다. 검산이 쉽다. 
        i_start = random.randint(0, 7)
        
        # 시작값부터 데이터 자리가 증가하면서 1씩 증가하도록 한다.
        # ACAN.data를 통째로 변경할 때는 변수 = 값으로 해도 된다.  
        ACAN.data = [i + i_start for i in range(ACAN.dlc)]  
        
    
        ## transmit
        if com.transmit_can_async(ACAN) != 0:
            app.log_text(f"error: Tx {ACAN.id} {ACAN.data}", lvlError)
        
        return

     

    시험 편의

    • timer_tx를 시작시키면 1000msec 마다 CAN 메시지가 전송된다. timer_tx를 정지시키면 CAN 메시지 전송이 중단된다. 메시지 전송을 제어하면 실험하기에 편리하다. On Shortcut 이벤트를 이용하면 키보드 입력으로 메시지 전송을 제어할 수 있다.

    On Shortcut 이벤트에서 마우스 우클릭하여 Add On Shortcut을 선택하여 Shortcut을 추가한다.
    Shortcut 입력창을 선택하고 키보드에서 ctrl-1을 동시에 누르면 입력창에 Ctrl+1이 입력된다. Name을 ctrl_1이라고 했다.

    • 코드 창에 def on_shortcut_ctrl_1() --> None: 이라는 함수 프로파일이 생긴다. 

     

    • 코드 영역에 아래와 같이 코드를 입력한다. (코드 설명은 주석을 참조해 주십시오.) 
    def on_shortcut_ctrl_1() --> None:
    
        # timer_tx를 시작한다.
        timer_tx.Start()
        
        # timer_tx 시작을 시스템 메시지 창에 출력한다.
        app.log_text('timer_tx started', lvlOK)
    • 같은 방법으로 On Shortcut 이벤트에 ctrl_0 shortcut을 추가하고, ctrl-0를 입력하면 timer_tx를 정지하는 코드를 작성한다.
    def on_shortcut_ctrl_0() -> None:
    
        # timer_tx를 정지한다.
        timer_tx.Stop()
        
        # timer_tx 정지를 시스템 메시지 창에 출력한다.
        app.log_text('timer_tx stopped', lvlOK)

     

     

     

    데모 실행

    • 채널 1과 채널 2를 직결한 상태라 메시지 한 개가 전송되면 트레이스 창에 Tx 메시지와 Rx 메시지 2개가 표시된다. 복잡해서 트레이스 분석에 방해가 된다. 트레이스창에 채널 필터를 걸어서 복잡함을 줄인다. 

    트레이스창의 필터 버튼을 클릭한다.
    필터창의 바탕에서 마우스 우클릭하여 Add Channel Filter를 선택한다. 필터가 추가되면 Channel에 1을 입력한다. 창 아래 왼쪽에 Filter Enabled를 체크하여 필터를 적용한다. 그 아래 Pass/ Block 스위치에서 Pass를 선택한다. (이 경우 Block을 선택해도 큰 차이가 없겠지만) 설정이 완료되면 Apply 버튼을 클릭하여 창을 닫는다.

     

    Python Code Editor 창에서 Compile을 한다. 컴파일 버튼 바로 오른쪽 옆에 플레이 버튼을 클릭하여 프로그램을 실행한다.
    메인 메뉴/ Analysis/ Start 버튼을 클릭하여 모니터링을 시작한다.

    • 키보드에서 ctrl-1을 입력하면 타이머가 시작되면서 제어기의 메시지 전송이 시작된다. RBS가 메시지 아이디에 따라 응답한다.
    • 키보드에서 ctrl-0를 입력하면 타이머가 정지되면서 제어기의 메시지 전송이 정지된다. 

     

     

     

    결론

    • 미니프로그램을 통해서 수신 --> 필터 --> 처리 --> 전송하는 RBS 방법을 설명하였다. 
    • On CAN Rx 이벤트를 이용하는 방법을 설명하였다.
    • Timer를 정의하고 On Timer 이벤트를 이용하는 방법을 설명하였다.
    • On Shortcut 이벤트를 이용하여 키보드 입력에 따라 메시지 전송 시작/정지를 제어하는 방법을 설명하였다.

     

     

    참조

    • TSMaster 프로젝트

    rx_filter_proc_tx.zip
    1.27MB

     

     

     

     

    목차 :: hsl's tsmaster 사용기