application

SecOC 구현

hsl7 2025. 2. 11. 19:38

개요

SecOC의 작동을 데모하기 위해 TSMaster 미니프로그램을 짠다. 데모의 구성은 아래 그림과 같다.

  • PC에서 TSMaster를 2개 연다. 한 쪽은 센더, 다른 쪽은 리시버 역할을 한다.
  • 센더는 메시지에 MAC을 부착하여 전송한다.
  • 리시버는 메시지의 MAC을 검사한다.
  • 필요한 경우 FV 값을 변경할 수 있다.

TSMaster를 2개 열어서 센더와 리시버로 SecOC를 데모한다.

 

 

PC에서 TSMaster를 2개 열어 사용하기

  • TSMaster가 이미 열려있는 상태에서 설명한다.
  • TSMaster 아이콘에서 마우스 우클릭을 하여 팝-업 메뉴를 띄운다. 메뉴에서 TSMaster를 선택한다. 두 번째 TSMaster가 시작된다. 중간에 TSMaster가 이미 실행 중이라는 안내가 나온다. 

TSMaster가 열려있는 상태에서 두 번째 열기 위해 TSMaster 아이콘에서 우클릭하여 TSMaster를 선택한다.

 

  • 양쪽 TSMaster 모두 메인 메뉴/ Hardware/ Channel Selection 버튼을 이용하여 채널 설정 화면을 연다. 하드웨어를 TS Virtual Device로 채널을 동일하게 하여 선택한다. 센더와 리시버를 같은 버스에 연결한 셈이다.

하드웨어를 TS Virtual Device로 설정한다.

센더에서 메시지에 MAC을 부착하여 전송하기

  • 전송할 메시지를 정의한다.
    • ECU_1이라고 부르겠다.
    • 아이디는 0x101이다.
    • 길이는 16 바이트이다.
    • 주기적으로( 1s 마다)  전송한다.
    • 구조는 아래 그림과 같다.
      • 13번 바이트에 FV의 하위 1 바이트를 잘라서 만든 FV_trunc가 있다.
      • 14번, 15번 바이트에 CMAC의 상위 2 바이트를 잘라서 만든 CMAC_trunc가 있다.

  • 미니프로그램에서 ECU_1 메시지를 정의한다.
k_m_id_ecu_1 = 0x101
k_m_dlc_ecu_1 = 0x0a	# dlc = 0x0a = 10
k_m_len_ecu_1 = 16		# 메시지 길이 = 16, dlc == 메시지 길이가 아니다.

# ecu_1 메시지를 정의한다.
msg_ecu_1 = RawCAN(
    k_m_id_ecu_1,  # id 
    k_m_dlc_ecu_1, # dlc 
    CH1,           # channel
    0,             # 모른다. 
    [i for i in range(k_m_len_ecu_1)], # 데이터. 리스트이다.  
    0x306,         # 설정. CAN-FD 메시지의 경우
)

 

  • ECU_1 메시지의 data를 난수로 채운다. 
    # 메시지 길이
    m_len = dlc_to_length(msg.dlc, msg.is_edl) # dlc 0x0a를 메시지 길이 16으로 변환한다. 

    # fv_trunc, cmac_trunc를 포함한 메시지 데이터를 난수로 채운다.
    msg.data = bytearray(random.randbytes(m_len))	# random 모듈의 randbytes()를 이용하여
    												# msg의 데이터를 채운다.
  • 추적을 위해서 보관하고 있는 FV (= tx_msgs[m_id]['fv'])를 복사해서 +1 하여 새로운 FV를 만든다. FV의 하위 1 바이트인 fv_trunc를 msg.data의 fv_trunc 자리에 넣는다.
    # fv를 증가한다.
    fv = tx_msgs[m_id]['fv']
    fv += 1
    # if fv_int > 0xFF FF FF FF FF FF: fv를 6 바이트로 했다. 
    if fv > k_fv_max:
        fv = 0

    # fv_trunc를 추출한다.
    fv_trunc_calc = bytearray.fromhex(f'{fv:0{k_len_fv * 2}x}')[-1]

    # msg.data에 fv_trunc를넣는다.
    pos_fv_trunc = m_len - k_pos_fv_trunc_from_eod
    msg.set_data(pos_fv_trunc, fv_trunc_calc)
  • 키, data, FV로 CMAC을 계산한다. CMAC의 상위 2 바이트인 CMAC_trunc를 msg.data의 cmac_truc 자리에 넣는다.
    # cmac을 구한다.
    cmac_obj = CMAC.new(k_key, ciphermod=AES)
    cmac_obj.update(data)
    
    # fv는 int형이다. fv를 항상 일정한 길이로 하여 계산에 사용하기 위해 bytearray로 형변환한다.
    fv_bytearray = bytearray.fromhex(f'{fv:0{k_len_fv * 2}x}')    
    cmac_obj.update(fv_bytearray)

    cmac = cmac_obj.digest()

    # cmac_trunc를 구한다.    
    cmac_trunc = cmac[:k_len_cmac_trunc]

    # msg.data에 cmac_trunc를 넣는다.
    pos_cmac_trunc = m_len - k_pos_cmac_trunc_from_eod
    msg.set_data(pos_cmac_trunc, cmac_trunc[0])
    msg.set_data(pos_cmac_trunc + 1, cmac_trunc[1])

 

  • ECU_1 메시지를 주기적으로 전송하기 위해 타이머, timer_1,를 정의한다.

ECU_1 메시지를 1,000msec 마다 전송하기 위해 timer_1을 정의한다.

 

  • timer_1이 트리거 할 때마다 실행할 함수, on_timer_timer_1(),를 정의한다.

  • on_timer_timer_1()에 위에서 설명한 코드들을 입력한다.

 

  • 메시지를 전송한다.
  • 추적을 위해서 보관하고 있는 tx_msgs[m_id]['fv']를 최신 FV로 업데이트한다.
    # msg를 전송한다.
    if com.transmit_can_async(msg):
        app.log_text('transmit failed', lvlInfo)

    # 다음 메시지 전송 때 사용하기 위해 fv를 저장한다.
    tx_msgs[m_id]['fv'] = fv

 

 

리시버에서 MAC을 검사하기 

  • 리시버는 수신 메시지를 아래의 순서로 처리한다.
def process_secoc_message(ACAN):
    '''
    fv가 맞는지 확인한다. 
    cmac이 맞는지 확인한다. 
    cmac이 맞으면, 트래킹하는 fv를 업데이트한다. 
    '''

    # fv가 맞는지 확인한다.
    fv_ok, fv_copy = check_fv(ACAN)
    if not fv_ok:  
        return
    
    # cmac이 맞는지 확인한다.
    cmac_ok, cmac = check_cmac(ACAN, fv_copy)
    if not cmac_ok:
        return

    # fv와 cmac이 맞으면, fv를 업데이트한다.
    m_id = ACAN.id
    rx_msgs[m_id]['fv'] = fv_copy
    app.log_text('ok', lvlOK)
  •  FV 검사은 아래와 같이 한다.  
def check_fv(ACAN):
    '''
    fv가 맞는지 확인한다.
        수신한 fv_trunc와 계산한 fv_trunc_calc가동일하면
        fv가 맞다고 간주한다.   
    ACAN에서 fv_trunc를 추출한다.
        fv_trunc
            ACAN의 data에 있다. 
            fv_trunc는 k_len_fv_trunc 바이트이다. 
            pos_fv는 data 내에 fv_trunc의 위치이다.
                    pos_fv = m_len - k_fv_pos_from_eod
                           m_len은 메시지 길이이다.
    트래킹 중인 fv에서 fv_trnc_calc를 추출한다.
        fv_trunc_calc는 fv의 마지막 바이트이다.
    fv_trunc와 fv_trunc_calc를 비교한다.
    '''

    m_id = ACAN.id
    m_len = dlc_to_length(ACAN.dlc, ACAN.is_edl)

    ## fv
    # 트랙깅하는 fv를 1만큼 증가한다.
    # cmac 확인까지 마친 후에 fv를 증가시켜야 한다. 
    # 확인 전까지 fv_copy를 이용한다.
    fv_copy = rx_msgs[m_id]['fv']  
    fv_copy += 1

    # fv_copy가 k_fv_max를 넘어가면 0으로 초기화한다.
    # if fv_copy > 0xFF FF FF FF FF FF:
    if fv_copy > k_fv_max:
        fv_int = 0

    # fv_trunc_calc를 구한다.
    # int를 k_len_fv 길이의 bytearray로 변환하고
    # 마지막 바이트를 취한다.
    # 1 바이트는 헥스로 2자리라서 2를 곱한다. 
    fv_trunc_calc = bytearray.fromhex(f'{fv_copy:0{k_len_fv * 2}x}')[-1]

    # fv_trunc를 구한다.
    # fv_trunc의 위치
    pos_fv_trunc = m_len - k_pos_fv_trunc_from_eod

    # fv_trunc
    fv_trunc = ACAN.data[pos_fv_trunc]

    # moment of truth
    if fv_trunc != fv_trunc_calc:
        app.log_text(f'fv error: {m_id=:-2x} {fv_trunc=} {fv_trunc_calc=:}', lvlOK)
        return False, fv_copy
    
    return True, fv_copy
  • CMAC 검사는 아래와 같이 한다.
def check_cmac(ACAN, fv_copy):
    '''
    cmac이 맞는지 확인한다.
          수신한 cmac_trunc와 계산한 cmac_trunc_calc가 동일하면
          cmac이 맞다고 간주한다.
    ACAN에서 cmac_trunc를 추출한다.
        cmac_trunc
            ACAN의 data에 있다. 
            길이는 k_len_cmac_trunc 바이트이다.
            시작 위치는 m_len-k_pos_cmac_truc_from이다.
    k_key, data, fv(fv_trunc가 아니다.)를 이용하여 cmac을 계산한다.
    cmac에서 cmac_trunc_calc를 추출한다.
        cmac_trunc_calc는 cmac[:k_len_cmac_truc]이다.
        상위에서 k_len_cmac_trunc 만큼이다.
    cmac_trunc와 cmac_trunc_calc를 비교한다.
    '''

    m_id = ACAN.id
    m_len = dlc_to_length(ACAN.dlc, ACAN.is_edl)
    data = bytearray(ACAN.data[:m_len - k_len_mac])
    fv_bytearray = bytearray.fromhex(f'{fv_copy:0{k_len_fv * 2}x}')

    # 수신 메시지에서 cmac_trunc를 추출한다.
    pos_cmac_trunc = m_len - k_pos_cmac_trunc_from_eod
    cmac_trunc = bytearray(ACAN.data[pos_cmac_trunc:pos_cmac_trunc + k_len_cmac_trunc])

    # 수신 메시지의 데이터로 cmac을 계산한다.
    cmac_obj = CMAC.new(k_key, ciphermod=AES)
    cmac_obj.update(data)
    cmac_obj.update(fv_bytearray)
    cmac = cmac_obj.digest()

    # 계산한 cmac에서 cmac_trunc_calc를 추출한다..    
    cmac_trunc_calc = cmac[:k_len_cmac_trunc]

    # moment of truth
    if cmac_trunc != cmac_trunc_calc:
        app.log_text(f'cmac error: {cmac_trunc.hex()=} {cmac_trunc_calc.hex()=}', lvlOK)
        return False, cmac
    
    return True, cmac

 

 

FV 값 변경하기

  • CAN 통신의 안정성과 신뢰성이 높다. 그렇다고 통신이 불안정한 때를 대비할 필요가 없다는 의미는 아니다. 통신은 환경의 영향을 받고 그 영향으로 불안정할 수도 있기 때문이다.
  • 센더와 리시버는 각각 FV를 추적한다. 둘 사이의 FV는 동기화 되어있어야 한다.
  • 통신 불안정으로 동기화가 깨진다면 복구하는 방법이 필요하다.  
  • 데모에서는 사용자가 ctrl+5 키를 입력하면, 센더가 FV 재설정 메시지를 리시버에 전송한다. 리시버는 메시지에 있는 fv_init 값으로 추적중인 FV 값을 재설정한다. 상세는 아래와 같다.
  • FV 재설정에 필요한 SecOC_Coord 메시지를 정의한다.
k_m_id_secoc_coord = 0x100
k_m_dlc_secoc_coord = 0x0a

# secoc_coord 메시지를 정의한다.
# secoc_coord 메시지는 SecOC 관련 통신 관리에 사용된다.
# 예) 추적 중인 fv값을 fv_init으로 재설정한다.
# request, m_id_target, fv_init
msg_secoc_coord = RawCAN(
    k_m_id_secoc_coord, 
    k_m_dlc_secoc_coord, 
    CH1, 
    0, 
    [0x01, 0x01, 0x01, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06], 
    0x306, # can-fd 메시지의 경우 이렇게 설정한다.
)

 

  • SecOC_Coord 메시지의 구조는 아래 그림과 같다.

request가 0x01(FV 재설정)인 경우의 이다.

  • request:
    • 1 byte.
    • 256 가지의 요청이 가능하다. 이렇게 많이 필요하지는 않을 것이다. 따라서 1 바이트로 충분하다.
    • 0x01: reset FV to fv_init로 정의한다.
  • m_id_target
    • 2 bytes
    • 대상 메시지를 지정한다.
    • m_id_target가 0xAA 0xAA이면 모든 메시지들을 대상으로 하느 것으로 한다.
  • data
    • secoc_coord 메시지는 16 바이트로 한다. request와 m_id_target를 제외하면 13 바이트가 남는다.
    • 데이터 길이는 request에 따라 다르다.
    • request가 0x01인 경우:
      • fv_init: 6 바이트

 

센더측

  • Ctrl+5를 입력하면, SecOC_Coord 메시지를 전송할 수 있도록 On_Shortcut 이벤트에 on_shortcut_ctrl_5() 함수를 추가한다.

  • on_shortcut_ctrl_5() 함수에 아래 코드를 입력한다.
    # request를 fv 재설정을 위한 0x01로 설정한다.
    msg_secoc_coord.set_data(0, 0x01)


    # m_id_target을 설정한다.
    # fv를 변경할 메시지의 아이디이다.
    m_id_target = k_m_id_ecu_1
    m_id_target_bytearray = m_id_target.to_bytes(2, byteorder='big')
    msg_secoc_coord.set_data(1, m_id_target_bytearray[0])
    msg_secoc_coord.set_data(2, m_id_target_bytearray[1])
    

    # fv_init을 설정한다.
    # 예제에서는 fv_init 값을 고정값으로 한다.
    
    
    # 전체 메시지의 fv를 변경할 때
    # m_id_target == 0xAAAA 
    
    if m_id_target == 0xAAAA:
        for tx_msg in tx_msgs:
            tx_msg['fv'] = k_fv_init 
    else:
        tx_msgs[m_id_target]['fv'] = k_fv_init


    # msg를 전송한다.
    if com.transmit_can_async(msg_secoc_coord):
        app.log_text('transmit failed', lvlInfo)
    else:
        fv = bytearray(msg_secoc_coord.data[3:9])
        app.log_text(f'reset {fv=}', lvlOK)

 

 

리시버측

  •  리시버는 SecOC_Coord 메시지를 아래와 같이 처리한다.
def process_secoc_coord_message(ACAN):
    '''
    coord message를 처리한다.

    coord message의 용도
    1. fv 재설정을 위한 fv_init을 전달한다.

    coord message의 구조
    request + m_id + data
    request: 
        1 byte. 256개의 명령이 가능하다. 
        이렇게 많은 명령이 필요하지는 않을 것이다. 
        따라서 1 바이트로 충분하다.
        0x01: transfer fv_init

    m_id_target:
        2 bytes.
        대상 메시지를 지정한다.
        m_id가 0xAA 0xAA이면 모든 메시지들을 대상으로 하느 것으로 하자. 

    data: 
        secoc_coord 메시지는 16 바이트로 한다.
        request와 m_id를 제외하면 13 바이트가 남는다.
        데이터 길이는 request에 따라 다르다.
        request 0x01: fv_init
            data[3:3 + k_len_fv]는 fv_init이다. 
            m_id 메시지의 fv를 fv_init으로 초기화한다. 
            즉, rx_msgs[m_id]['fv'] = fv_init
    '''

    # 여기에 secoc_coord 메시지의 
    # cmac_trunc를 확인하는 과정이 필요하다.


    # 대상 메시지의 m_id를 추출한다.
    m_id_target = bytearray(ACAN.data[1:3])
    request = ACAN.data[0]

    if request == 0x01: # fv를 업데이트 한다.
        fv_bytearray = bytearray(ACAN.data[3:9]) 
        app.log_text(f'fv update m_id=0x{m_id_target.hex()} fv=0x{fv_bytearray.hex()}', lvlOK)
        if m_id_target == bytearray([0xAA, 0xAA]): # 모든 메시지들을 대상으로 한다.
            for msg in rx_msgs.values():
                msg['fv'] = int.from_bytes(fv_bytearray, byteorder='big')
                app.log_text(f'fv updated {msg=}', lvlOK)
        else:
            m_id_target_int = int(m_id_target.hex(), 16)
            rx_msgs[m_id_target_int]['fv'] = int.from_bytes(fv_bytearray, byteorder='big') 
            app.log_text(f'fv updated', lvlOK)

 

 

코드

  • 센더측 코드

SecOC_master.py
0.01MB

 

  • 리시버측 코드

SecOC_ecu_1.py
0.01MB