application
SecOC 구현
hsl7
2025. 2. 11. 19:38
개요
SecOC의 작동을 데모하기 위해 TSMaster 미니프로그램을 짠다. 데모의 구성은 아래 그림과 같다.
- PC에서 TSMaster를 2개 연다. 한 쪽은 센더, 다른 쪽은 리시버 역할을 한다.
- 센더는 메시지에 MAC을 부착하여 전송한다.
- 리시버는 메시지의 MAC을 검사한다.
- 필요한 경우 FV 값을 변경할 수 있다.
PC에서 TSMaster를 2개 열어 사용하기
- TSMaster가 이미 열려있는 상태에서 설명한다.
- TSMaster 아이콘에서 마우스 우클릭을 하여 팝-업 메뉴를 띄운다. 메뉴에서 TSMaster를 선택한다. 두 번째 TSMaster가 시작된다. 중간에 TSMaster가 이미 실행 중이라는 안내가 나온다.
- 양쪽 TSMaster 모두 메인 메뉴/ Hardware/ Channel Selection 버튼을 이용하여 채널 설정 화면을 연다. 하드웨어를 TS Virtual Device로 채널을 동일하게 하여 선택한다. 센더와 리시버를 같은 버스에 연결한 셈이다.
센더에서 메시지에 MAC을 부착하여 전송하기
- 전송할 메시지를 정의한다.
- ECU_1이라고 부르겠다.
- 아이디는 0x101이다.
- 길이는 16 바이트이다.
- 주기적으로( 1s 마다) 전송한다.
- 구조는 아래 그림과 같다.
- 13번 바이트에 FV의 하위 1 바이트를 잘라서 만든 FV_trunc가 있다.
- 14번, 15번 바이트에 CMAC의 상위 2 바이트를 잘라서 만든 CMAC_trunc가 있다.
- 미니프로그램에서 ECU_1 메시지를 정의한다.
k_m_id_ecu_1 = 0x101
k_m_dlc_ecu_1 = 0x0a # dlc = 0x0a = 10
k_m_len_ecu_1 = 16 # 메시지 길이 = 16, dlc == 메시지 길이가 아니다.
# ecu_1 메시지를 정의한다.
msg_ecu_1 = RawCAN(
k_m_id_ecu_1, # id
k_m_dlc_ecu_1, # dlc
CH1, # channel
0, # 모른다.
[i for i in range(k_m_len_ecu_1)], # 데이터. 리스트이다.
0x306, # 설정. CAN-FD 메시지의 경우
)
- ECU_1 메시지의 data를 난수로 채운다.
# 메시지 길이
m_len = dlc_to_length(msg.dlc, msg.is_edl) # dlc 0x0a를 메시지 길이 16으로 변환한다.
# fv_trunc, cmac_trunc를 포함한 메시지 데이터를 난수로 채운다.
msg.data = bytearray(random.randbytes(m_len)) # random 모듈의 randbytes()를 이용하여
# msg의 데이터를 채운다.
- 추적을 위해서 보관하고 있는 FV (= tx_msgs[m_id]['fv'])를 복사해서 +1 하여 새로운 FV를 만든다. FV의 하위 1 바이트인 fv_trunc를 msg.data의 fv_trunc 자리에 넣는다.
# fv를 증가한다.
fv = tx_msgs[m_id]['fv']
fv += 1
# if fv_int > 0xFF FF FF FF FF FF: fv를 6 바이트로 했다.
if fv > k_fv_max:
fv = 0
# fv_trunc를 추출한다.
fv_trunc_calc = bytearray.fromhex(f'{fv:0{k_len_fv * 2}x}')[-1]
# msg.data에 fv_trunc를넣는다.
pos_fv_trunc = m_len - k_pos_fv_trunc_from_eod
msg.set_data(pos_fv_trunc, fv_trunc_calc)
- 키, data, FV로 CMAC을 계산한다. CMAC의 상위 2 바이트인 CMAC_trunc를 msg.data의 cmac_truc 자리에 넣는다.
# cmac을 구한다.
cmac_obj = CMAC.new(k_key, ciphermod=AES)
cmac_obj.update(data)
# fv는 int형이다. fv를 항상 일정한 길이로 하여 계산에 사용하기 위해 bytearray로 형변환한다.
fv_bytearray = bytearray.fromhex(f'{fv:0{k_len_fv * 2}x}')
cmac_obj.update(fv_bytearray)
cmac = cmac_obj.digest()
# cmac_trunc를 구한다.
cmac_trunc = cmac[:k_len_cmac_trunc]
# msg.data에 cmac_trunc를 넣는다.
pos_cmac_trunc = m_len - k_pos_cmac_trunc_from_eod
msg.set_data(pos_cmac_trunc, cmac_trunc[0])
msg.set_data(pos_cmac_trunc + 1, cmac_trunc[1])
- ECU_1 메시지를 주기적으로 전송하기 위해 타이머, timer_1,를 정의한다.
- timer_1이 트리거 할 때마다 실행할 함수, on_timer_timer_1(),를 정의한다.
- on_timer_timer_1()에 위에서 설명한 코드들을 입력한다.
- 메시지를 전송한다.
- 추적을 위해서 보관하고 있는 tx_msgs[m_id]['fv']를 최신 FV로 업데이트한다.
# msg를 전송한다.
if com.transmit_can_async(msg):
app.log_text('transmit failed', lvlInfo)
# 다음 메시지 전송 때 사용하기 위해 fv를 저장한다.
tx_msgs[m_id]['fv'] = fv
리시버에서 MAC을 검사하기
- 리시버는 수신 메시지를 아래의 순서로 처리한다.
def process_secoc_message(ACAN):
'''
fv가 맞는지 확인한다.
cmac이 맞는지 확인한다.
cmac이 맞으면, 트래킹하는 fv를 업데이트한다.
'''
# fv가 맞는지 확인한다.
fv_ok, fv_copy = check_fv(ACAN)
if not fv_ok:
return
# cmac이 맞는지 확인한다.
cmac_ok, cmac = check_cmac(ACAN, fv_copy)
if not cmac_ok:
return
# fv와 cmac이 맞으면, fv를 업데이트한다.
m_id = ACAN.id
rx_msgs[m_id]['fv'] = fv_copy
app.log_text('ok', lvlOK)
- FV 검사은 아래와 같이 한다.
def check_fv(ACAN):
'''
fv가 맞는지 확인한다.
수신한 fv_trunc와 계산한 fv_trunc_calc가동일하면
fv가 맞다고 간주한다.
ACAN에서 fv_trunc를 추출한다.
fv_trunc
ACAN의 data에 있다.
fv_trunc는 k_len_fv_trunc 바이트이다.
pos_fv는 data 내에 fv_trunc의 위치이다.
pos_fv = m_len - k_fv_pos_from_eod
m_len은 메시지 길이이다.
트래킹 중인 fv에서 fv_trnc_calc를 추출한다.
fv_trunc_calc는 fv의 마지막 바이트이다.
fv_trunc와 fv_trunc_calc를 비교한다.
'''
m_id = ACAN.id
m_len = dlc_to_length(ACAN.dlc, ACAN.is_edl)
## fv
# 트랙깅하는 fv를 1만큼 증가한다.
# cmac 확인까지 마친 후에 fv를 증가시켜야 한다.
# 확인 전까지 fv_copy를 이용한다.
fv_copy = rx_msgs[m_id]['fv']
fv_copy += 1
# fv_copy가 k_fv_max를 넘어가면 0으로 초기화한다.
# if fv_copy > 0xFF FF FF FF FF FF:
if fv_copy > k_fv_max:
fv_int = 0
# fv_trunc_calc를 구한다.
# int를 k_len_fv 길이의 bytearray로 변환하고
# 마지막 바이트를 취한다.
# 1 바이트는 헥스로 2자리라서 2를 곱한다.
fv_trunc_calc = bytearray.fromhex(f'{fv_copy:0{k_len_fv * 2}x}')[-1]
# fv_trunc를 구한다.
# fv_trunc의 위치
pos_fv_trunc = m_len - k_pos_fv_trunc_from_eod
# fv_trunc
fv_trunc = ACAN.data[pos_fv_trunc]
# moment of truth
if fv_trunc != fv_trunc_calc:
app.log_text(f'fv error: {m_id=:-2x} {fv_trunc=} {fv_trunc_calc=:}', lvlOK)
return False, fv_copy
return True, fv_copy
- CMAC 검사는 아래와 같이 한다.
def check_cmac(ACAN, fv_copy):
'''
cmac이 맞는지 확인한다.
수신한 cmac_trunc와 계산한 cmac_trunc_calc가 동일하면
cmac이 맞다고 간주한다.
ACAN에서 cmac_trunc를 추출한다.
cmac_trunc
ACAN의 data에 있다.
길이는 k_len_cmac_trunc 바이트이다.
시작 위치는 m_len-k_pos_cmac_truc_from이다.
k_key, data, fv(fv_trunc가 아니다.)를 이용하여 cmac을 계산한다.
cmac에서 cmac_trunc_calc를 추출한다.
cmac_trunc_calc는 cmac[:k_len_cmac_truc]이다.
상위에서 k_len_cmac_trunc 만큼이다.
cmac_trunc와 cmac_trunc_calc를 비교한다.
'''
m_id = ACAN.id
m_len = dlc_to_length(ACAN.dlc, ACAN.is_edl)
data = bytearray(ACAN.data[:m_len - k_len_mac])
fv_bytearray = bytearray.fromhex(f'{fv_copy:0{k_len_fv * 2}x}')
# 수신 메시지에서 cmac_trunc를 추출한다.
pos_cmac_trunc = m_len - k_pos_cmac_trunc_from_eod
cmac_trunc = bytearray(ACAN.data[pos_cmac_trunc:pos_cmac_trunc + k_len_cmac_trunc])
# 수신 메시지의 데이터로 cmac을 계산한다.
cmac_obj = CMAC.new(k_key, ciphermod=AES)
cmac_obj.update(data)
cmac_obj.update(fv_bytearray)
cmac = cmac_obj.digest()
# 계산한 cmac에서 cmac_trunc_calc를 추출한다..
cmac_trunc_calc = cmac[:k_len_cmac_trunc]
# moment of truth
if cmac_trunc != cmac_trunc_calc:
app.log_text(f'cmac error: {cmac_trunc.hex()=} {cmac_trunc_calc.hex()=}', lvlOK)
return False, cmac
return True, cmac
FV 값 변경하기
- CAN 통신의 안정성과 신뢰성이 높다. 그렇다고 통신이 불안정한 때를 대비할 필요가 없다는 의미는 아니다. 통신은 환경의 영향을 받고 그 영향으로 불안정할 수도 있기 때문이다.
- 센더와 리시버는 각각 FV를 추적한다. 둘 사이의 FV는 동기화 되어있어야 한다.
- 통신 불안정으로 동기화가 깨진다면 복구하는 방법이 필요하다.
- 데모에서는 사용자가 ctrl+5 키를 입력하면, 센더가 FV 재설정 메시지를 리시버에 전송한다. 리시버는 메시지에 있는 fv_init 값으로 추적중인 FV 값을 재설정한다. 상세는 아래와 같다.
- FV 재설정에 필요한 SecOC_Coord 메시지를 정의한다.
k_m_id_secoc_coord = 0x100
k_m_dlc_secoc_coord = 0x0a
# secoc_coord 메시지를 정의한다.
# secoc_coord 메시지는 SecOC 관련 통신 관리에 사용된다.
# 예) 추적 중인 fv값을 fv_init으로 재설정한다.
# request, m_id_target, fv_init
msg_secoc_coord = RawCAN(
k_m_id_secoc_coord,
k_m_dlc_secoc_coord,
CH1,
0,
[0x01, 0x01, 0x01, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
0x306, # can-fd 메시지의 경우 이렇게 설정한다.
)
- SecOC_Coord 메시지의 구조는 아래 그림과 같다.
- request:
- 1 byte.
- 256 가지의 요청이 가능하다. 이렇게 많이 필요하지는 않을 것이다. 따라서 1 바이트로 충분하다.
- 0x01: reset FV to fv_init로 정의한다.
- m_id_target
- 2 bytes
- 대상 메시지를 지정한다.
- m_id_target가 0xAA 0xAA이면 모든 메시지들을 대상으로 하느 것으로 한다.
- data
- secoc_coord 메시지는 16 바이트로 한다. request와 m_id_target를 제외하면 13 바이트가 남는다.
- 데이터 길이는 request에 따라 다르다.
- request가 0x01인 경우:
- fv_init: 6 바이트
센더측
- Ctrl+5를 입력하면, SecOC_Coord 메시지를 전송할 수 있도록 On_Shortcut 이벤트에 on_shortcut_ctrl_5() 함수를 추가한다.
- on_shortcut_ctrl_5() 함수에 아래 코드를 입력한다.
# request를 fv 재설정을 위한 0x01로 설정한다.
msg_secoc_coord.set_data(0, 0x01)
# m_id_target을 설정한다.
# fv를 변경할 메시지의 아이디이다.
m_id_target = k_m_id_ecu_1
m_id_target_bytearray = m_id_target.to_bytes(2, byteorder='big')
msg_secoc_coord.set_data(1, m_id_target_bytearray[0])
msg_secoc_coord.set_data(2, m_id_target_bytearray[1])
# fv_init을 설정한다.
# 예제에서는 fv_init 값을 고정값으로 한다.
# 전체 메시지의 fv를 변경할 때
# m_id_target == 0xAAAA
if m_id_target == 0xAAAA:
for tx_msg in tx_msgs:
tx_msg['fv'] = k_fv_init
else:
tx_msgs[m_id_target]['fv'] = k_fv_init
# msg를 전송한다.
if com.transmit_can_async(msg_secoc_coord):
app.log_text('transmit failed', lvlInfo)
else:
fv = bytearray(msg_secoc_coord.data[3:9])
app.log_text(f'reset {fv=}', lvlOK)
리시버측
- 리시버는 SecOC_Coord 메시지를 아래와 같이 처리한다.
def process_secoc_coord_message(ACAN):
'''
coord message를 처리한다.
coord message의 용도
1. fv 재설정을 위한 fv_init을 전달한다.
coord message의 구조
request + m_id + data
request:
1 byte. 256개의 명령이 가능하다.
이렇게 많은 명령이 필요하지는 않을 것이다.
따라서 1 바이트로 충분하다.
0x01: transfer fv_init
m_id_target:
2 bytes.
대상 메시지를 지정한다.
m_id가 0xAA 0xAA이면 모든 메시지들을 대상으로 하느 것으로 하자.
data:
secoc_coord 메시지는 16 바이트로 한다.
request와 m_id를 제외하면 13 바이트가 남는다.
데이터 길이는 request에 따라 다르다.
request 0x01: fv_init
data[3:3 + k_len_fv]는 fv_init이다.
m_id 메시지의 fv를 fv_init으로 초기화한다.
즉, rx_msgs[m_id]['fv'] = fv_init
'''
# 여기에 secoc_coord 메시지의
# cmac_trunc를 확인하는 과정이 필요하다.
# 대상 메시지의 m_id를 추출한다.
m_id_target = bytearray(ACAN.data[1:3])
request = ACAN.data[0]
if request == 0x01: # fv를 업데이트 한다.
fv_bytearray = bytearray(ACAN.data[3:9])
app.log_text(f'fv update m_id=0x{m_id_target.hex()} fv=0x{fv_bytearray.hex()}', lvlOK)
if m_id_target == bytearray([0xAA, 0xAA]): # 모든 메시지들을 대상으로 한다.
for msg in rx_msgs.values():
msg['fv'] = int.from_bytes(fv_bytearray, byteorder='big')
app.log_text(f'fv updated {msg=}', lvlOK)
else:
m_id_target_int = int(m_id_target.hex(), 16)
rx_msgs[m_id_target_int]['fv'] = int.from_bytes(fv_bytearray, byteorder='big')
app.log_text(f'fv updated', lvlOK)
코드
- 센더측 코드
- 리시버측 코드