mobis esc를 대상으로 DID(Data IDentifier)를 0x0000부터 0xFFFF까지 변경하며 readDID 요청을 하는 DID 스캐닝을 하였다.
DID 스캐닝을 하며 측정한 CAN 버스 트레이스(blf) 파일을 분석하여 긍정 응답을 받은 DID를 추출한다.
import os
from pathlib import Path
import can
import pandas as pd
ldf is not supported
xls is not supported
xlsx is not supported
# constant
# parse_resp()에서 사용되는 고정값들을 정의한다.
# 긍정 응답은 진단 요청에 0x40을 더한 값으로 시작한다.
k_offset_positive_resp = 0x40
# read DTC의 service id = 0x19
k_req_sid_readDtc = 0x19
k_resp_sid_readDtc = k_req_sid_readDtc + k_offset_positive_resp
# read Data by Data ID service id = 0x22
k_req_sid_readDID = 0x22
k_resp_sid_readDID = k_req_sid_readDID + k_offset_positive_resp
# tester present service id = 0x3E
k_req_sid_testerPresent = 0x3E
k_resp_sid_testerPresent = k_req_sid_testerPresent + k_offset_positive_resp
# 현대기술정보 홈페이지에서 복붙한 DTC와 DTC 설명이 있는xlsx 파일을 읽는다.
# xlsx 파일 경로
k_xlsx_dtc_table = Path('베뉴_VDC_DTC_현대기술정보.xlsx')
# DTC표를 Pandas 데이터프레임으로 읽는다.
# 이 데이터프레임은 get_dtc_description 함수에서 사용된된다.
k_df_dtc = pd.read_excel(k_xlsx_dtc_table, index_col='DTC')
# function
def convert_dtc(dtc_bytes):
'''
written by claude.ai
제어기가 응답한 결함 코드를 표준 코드로 변환한다.
표준 코드는 Pxxxxxx, Cxxxxxx, Bxxxxxx, Uxxxxxx 형식이다.
dtc_bytes: 3바이트의 결함 코드
'''
# DTC 타입 결정을 위한 딕셔너리
# dictionary to determine DTC type
dtc_type = {
0: 'P',
1: 'C',
2: 'B',
3: 'U',
}
# 첫 번째 바이트의 상위 2비트를 사용하여 DTC 타입 결정
# determine DTC type using the first byte's upper 2 bits
first_char = dtc_type[(dtc_bytes[0] >> 6) & 0x3]
# 첫 번째 바이트의 하위 6비트와 두 번째 바이트를 결합하여 중간 부분 생성
# combine the lower 6 bits of the first byte and the second byte to create the middle part
middle_part = ((dtc_bytes[0] & 0x3F) << 8) | dtc_bytes[1]
# 세 번째 바이트를 마지막 부분으로 사용
# use the third byte as the last part
last_part = dtc_bytes[2]
# DTC 문자열 생성
# create the DTC string
dtc_string = f"{first_char}{middle_part:04X}{last_part:02X}"
return dtc_string
def get_dtc_description(dtc):
'''
입력으로 받은 DTC에 대한 설명인 description을 반환한다.
'''
global k_df_dtc
try:
description = k_df_dtc.loc[dtc, 'Description']
except KeyError:
description = 'DTC를 찾을 수 없습니다.'
return description
def parse_diag_resp(diag_resp, m_id_hex) -> None:
'''
제어기의 진단 응답을 분석한다.
diag_resp: 진단 응답을 저장한 딕셔너리
# positive response example
diag_resp[m_id_hex] = {
'length': 6,
'counter': 1,
's_id': 80,
'assembled': [80, 3, 0, 45, 0, 200, 170],
'hex': ['50', '03', '00', '2D', '00', 'C8', 'AA']
}
# negative response example
diag_resp[m_id_hex] = {
'length': 3,
'counter': 2,
's_id': 127,
'assembled': [127, 34, 49, 170, 170, 170, 170],
'hex': ['7F', '22', '31', 'AA', 'AA', 'AA', 'AA']
}
'''
length_resp = diag_resp[m_id_hex]['length']
# readDTC의 응답을 처리한다.
if diag_resp[m_id_hex]['s_id'] == k_resp_sid_readDtc:
# 처음 3 바이트는 dtc 정보가 아니다.
# 4 바이트씩 DTC 정보가 있다.
n_dtc = (length_resp - 3) // 4
if n_dtc > 0:
print(f'{m_id_hex = } {length_resp = :4,} {n_dtc = :4,}')
for i_dtc in range(1, n_dtc + 1):
dtc = diag_resp[m_id_hex]['assembled'][3 + 4 * i_dtc: 3 + 4 * i_dtc + 3]
dtc_hex = []
for byte_dtc in dtc:
dtc_hex.append(f'{byte_dtc:02X}')
dtc_hex_joined = ''.join(dtc_hex)
# DTC가 아니라 fill byte인 경우
if dtc_hex_joined == 'aaaaaa':
continue
dtc_converted = convert_dtc(dtc)
dtc_description = get_dtc_description(dtc_converted)
dtc_state = diag_resp[m_id_hex]['assembled'][3 + 4 * i_dtc + 3]
print(f'{i_dtc = }')
print(f'dtc = {dtc_converted} (0x{dtc_hex_joined})')
print(f'{dtc_description}')
print(f'{dtc_state = :02X}')
print('---')
## readDataByIdentifier의 응답을 처리한다.
# DID(Data IDentifier)에 따라 readECUID, readSoftwareVersion 등 여러 가지가 있다.
if diag_resp[m_id_hex]['s_id'] == k_resp_sid_readDID:
# response를 chr(=ascii)로 읽는다.
resp_assembled_ascii = ''.join([chr(c) for c in diag_resp[m_id_hex]['assembled'][3:] if c != 0xaa])
# data_id에 따라 처리하기 위해 data_id를 읽는다.
data_id_high = diag_resp[m_id_hex]['assembled'][1]
data_id_low = diag_resp[m_id_hex]['assembled'][2]
# data_id == ECU ID: 내가 갖고 있는 모비스 ESC의 경우
if data_id_high == 0xF1 and data_id_low == 0x00: # data_id for ECU Id
print(f'{m_id_hex:4}')
print(f'{length_resp = :4,}')
print(f'ecu_id = {resp_assembled_ascii}')
print()
# data_id == Software Version: 내가 갖고 있는 모비스 ESC의 경우
elif data_id_high == 0x10 and data_id_low == 0x00:
print(f'{m_id_hex:4}')
print(f'{length_resp = :4,}')
print(f'sw_id = {resp_assembled_ascii}')
print()
else:
print(f'{m_id_hex:4} {k_resp_sid_readDID:02X} {data_id_high:02X} {data_id_low:02X}')
print(f'{length_resp = :4,}')
print(f'diag_resp[hex] = {diag_resp[m_id_hex]["hex"][3:]}')
print(f'diag_resp[ascii] = {resp_assembled_ascii}')
print()
## tester present의 응답을 처리한다.
if diag_resp[m_id_hex]['s_id'] == k_resp_sid_testerPresent:
print(f'{m_id_hex:4} tester present')
print(f'{length_resp = :4,}')
print(f'diag_resp[hex] = {diag_resp[m_id_hex]["hex"]}')
print()
return # end of parse_diag_resp()
def on_can_diag_resp(msg, verbose=False) -> None:
'''
Transport Protocol로 분할된 메시지들을 받아서 조립하려면
이전 함수 호출 시 데이터들을 기억해야 한다.
이를 위해 전역 변수를 사용한다.
'''
global diag_resp
# message id
m_id = msg.arbitration_id
# m_id를 diag_resp의 key로 사용하기 위해 int를 str로 변환한다.
# 반드시 이렇게 할 필요는 없다. m_id를 key로 사용해도 된다.
m_id_hex = hex(m_id)
if m_id_hex in diag_resp:
# m_id_hex가 diag_resp에 있으면 counter를 증가시킨다.
diag_resp[m_id_hex]['counter'] += 1
else:
# m_id_hex가 diag_resp에 없으면 초기화한다.
# length : 제어기가 알려준 응답의 길이 [바이트]
# counter : (응답이 길어 여러 메시지로 나뉘어 전송될 경우) 메시지 카운터
# s_id : 서비스 아이디
# assembled : (응답이 길어 여러 메시지로 나뉘어 전송될 경우) 응답 메시지들 전체를 조립한 메시지
# hex : assembled의 바이트 하나 하나를 hex로 표현
diag_resp[m_id_hex] = {}
diag_resp[m_id_hex]['length'] = 0
diag_resp[m_id_hex]['counter'] = 1
diag_resp[m_id_hex]['s_id'] = 0x00
diag_resp[m_id_hex]['assembled'] = []
diag_resp[m_id_hex]['hex'] = []
# TP로 전송된 분해된 response를 조립한다.
pci = msg.data[0] & 0xF0
if pci == 0x00: # single frame
diag_resp[m_id_hex]['length'] = msg.data[0] & 0x0F
# 같은 m_id의 새 응답 처리를 위해 초기화 한다.
diag_resp[m_id_hex]['assembled'] = []
diag_resp[m_id_hex]['assembled'].extend(msg.data[1:8])
elif pci == 0x10: # first frame
diag_resp[m_id_hex]['length'] = (msg.data[0] & 0x0F) * 256 + msg.data[1]
# 같은 m_id의 새 응답 처리를 위해 초기화 한다.
diag_resp[m_id_hex]['assembled'] = []
diag_resp[m_id_hex]['assembled'].extend(msg.data[2:8])
elif (pci == 0x20): # consecutive frame
diag_resp[m_id_hex]['assembled'].extend(msg.data[1:8])
elif (pci == 0x30): # flow control
# 분석 프로그램에서는 flow control 메시지를 처리하지 않는다.
pass
else:
print(f'Error: unknown PCI = {pci:02X}')
# TP로 전송된 메시지의 조립이 완료되면 출력한다.
if ((diag_resp[m_id_hex]['length'] != 0) and
(diag_resp[m_id_hex]['length'] <= len(diag_resp[m_id_hex]['assembled']))):
diag_resp[m_id_hex]['hex'] = []
for byte_response in diag_resp[m_id_hex]['assembled']:
# byte_response를 hex로 변환하여 diag_resp[m_id_hex]['hex']에 저장한다.
# 전체 응답 메시지를 hex로 보기 위한 용도이다.
diag_resp[m_id_hex]['hex'].append(f'{byte_response:02X}')
diag_resp[m_id_hex]['s_id'] = diag_resp[m_id_hex]['assembled'][0]
# 디버깅 중에 상세를 보고 싶으면 verbose=True로 호출한다.
if verbose:
print(f'{m_id_hex:4} {diag_resp[m_id_hex]["length"]:4,} {diag_resp[m_id_hex]["hex"]}')
parse_diag_resp(diag_resp, m_id_hex)
return # on_can_diag_resp()
def check_all_messages_of_blf(blf, verbose=False) -> None:
'''
blf 파일을 읽어서 진단 통신 관련된 부분만 출력한다.
'''
log = can.io.BLFReader(blf)
for msg in log:
# 수신 메시지이고 진단 메시지이면 출력한다.
# 진단 메시지의 arbitration_id는 0x700 ~ 0x7FF이다.
if (msg.is_rx) and ((msg.arbitration_id >= 0x700) and (msg.arbitration_id <= 0x7FF)):
on_can_diag_resp(msg, verbose=verbose)
return # extract_diag_resp_from_blf()
def get_data_id_positve_resp(blf, verbose=False):
'''
긍정 응답들만 모아서 출력한다.
'''
print(f'blf = {blf.name}')
# blf를 읽는다.
log = can.io.BLFReader(blf)
# 초기화
msg_prev = None
list_data_id_positve_resp = []
for msg in log:
# 수신한 진단 메시지만 처리한다.
# 송신한 메시지는 요청 메시지다.
if (msg.is_rx and ((0x700 <= msg.arbitration_id) and (msg.arbitration_id <= 0x7df))):
if msg.data[0] != 0x03 or msg.data[1] != 0x7f: # 긍정 응답이 아니면 처리하지 않는다.
if msg_prev.data[1] == 0x22: # read data by data identifier 만 처리한다.
# 요청 메시지와 응답 메시지 쌍을 저장한다.
list_data_id_positve_resp.append((msg_prev, msg))
if verbose:
print(msg_prev)
print(msg)
print()
# 현재 메시지를 이전 메시지로 저장한다.
# 응답 메시지의 이전 메시지가 요청 메시지이다.
msg_prev = msg
return list_data_id_positve_resp
k_dir_data = Path().absolute()/'data'/'mobis_esc'
blfs = list(k_dir_data.glob('*.blf'))
for i, blf in enumerate(blfs):
print(f'{i:2}: {blf.name:60} {blf.stat().st_size:12,} bytes')
10msec 주기로 readDID 요청을 전송하며 측정하였다. 0: mobis_esc_ecu_id_rev_eng_scan_data_id_010msec.blf 949,154 bytes
50msec 주기로 readDID 요청을 전송하며 측정하였다. 1: mobis_esc_ecu_id_rev_eng_scan_data_id_050msec.blf 1,009,580 bytes
100msec 주기로 readDID 요청을 전송하며 측정하였다. 2: mobis_esc_ecu_id_rev_eng_scan_data_id_100msec.blf 1,006,592 bytes
진단 통신 메시지들만 저장하기 때문에 측정 시간이 많이 차이 나는데도 불구하고 파일 크기가 거의 같다.
10msec 시험 데이터 처리
# scan data id 요청으로 스캐닝한 결과
i = 0
blf = blfs[i]
긍정 응답을 받은 DID만 출력한다.
list_data_id_positve_resp = get_data_id_positve_resp(blf)
print(f'# of data id for positive response = {len(list_data_id_positve_resp)}')
print()
print_list_data_id_positve_resp(list_data_id_positve_resp)
DID 스캐닝 방법의 경우, First Frame만 수신하고 응답의 나머지를 수신하지 못한 경우 부정 응답으로 처리한다.
readDID 요청을 했을 때, 모비스 ESC는 First Frame을 전송했지만 나머지 Consecutive Frame들을 마저 전송하지 못했다. 모비스 ESC는 readDID 요청 처리 중에 새 readDID 요청을 받으면, 앞 요청 처리를 중단하고 새 요청 처리를 하는 것 같다.
데이터를 플래시 메모리 혹은 EEPROM에서 읽어오는데 시간이 10msec 보다 더 필요한 건가?
요청 주기를 늘리면서 실험해봐야겠다.
다른 아이디어
요청 후 응답 대기
타임아웃 시간 내 응답이 있으면,
Single Frame이면, 곧바로 다음 DID로 요청한다. (부정 응답이건 긍정 응답이건 상관 없다.)
First Frame 이면,
타임아웃 시간까지 기다린다.
수신을 완료(len(assembled message) >= length 조건을 만족)하면, 곧바로 다음 DID로 요청한다.
시간 내 응답이 없으면, 다음 DID로 요청한다.
50msec 시험 데이터 처리
i = 1
blf = blfs[i]
긍정 응답을 받은 DID만 출력한다.
list_data_id_positve_resp = get_data_id_positve_resp(blf)
print(f'# of data id for positive response = {len(list_data_id_positve_resp)}')
print()
print_list_data_id_positve_resp(list_data_id_positve_resp)
요청 주기가 50msec인 경우에도 여전히 제어기는 First Frame만 전송하고 나머지 프레임들을 전송하지 못했다.
100msec 시험 데이터 처리
i = 2
blf = blfs[i]
긍정 응답을 받은 DID만 출력한다.
list_data_id_positve_resp = get_data_id_positve_resp(blf)
print(f'# of data id for positive response = {len(list_data_id_positve_resp)}')
print()
print_list_data_id_positve_resp(list_data_id_positve_resp)