위 네 파일들이 내가 진단 요청과 응답 메시지들의 m_id 쌍을 찾기 위해서 베뉴에서 측정한 데이터 파일들이다.
진단 통신 응답을 해석한다.
Read ECU ID, Read DTC 요청의 응답을 해석한다.
defconvert_dtc(dtc_bytes):'''
written by claude.ai
제어기가 응답한 결함 코드를 표준 코드로 변환한다.
표준 코드는 Pxxxxxx, Cxxxxxx, Bxxxxxx, Uxxxxxx 형식이다.
dtc_bytes: 3바이트의 결함 코드
'''# DTC 타입 결정을 위한 딕셔너리# dictionary to determine DTC type
dtc_type = {
0: 'P',
1: 'C',
2: 'B',
3: 'U',
}
# 첫 번째 바이트의 상위 2비트를 사용하여 DTC 타입 결정# determine DTC type using the first byte's upper 2 bits
first_char = dtc_type[(dtc_bytes[0] >> 6) & 0x3]
# 첫 번째 바이트의 하위 6비트와 두 번째 바이트를 결합하여 중간 부분 생성# combine the lower 6 bits of the first byte and the second byte to create the middle part
middle_part = ((dtc_bytes[0] & 0x3F) << 8) | dtc_bytes[1]
# 세 번째 바이트를 마지막 부분으로 사용# use the third byte as the last part
last_part = dtc_bytes[2]
# DTC 문자열 생성# create the DTC string
dtc_string = f"{first_char}{middle_part:04X}{last_part:02X}"return dtc_string
# 현대기술정보 홈페이지에서 복붙한 DTC와 DTC 설명이 있는xlsx 파일을 읽는다.# xlsx 파일 경로xlsx_dtc_table = Path('베뉴_VDC_DTC_현대기술정보.xlsx')
# DTC표를 Pandas 데이터프레임으로 읽는다. # 이 데이터프레임은 get_dtc_description 함수에서 사용된된다.
df_dtc = pd.read_excel(xlsx_dtc_table, index_col='DTC')
df_dtc.sample(3)
Description
DTC
P0105
VDC 모듈(Vehicle Dynamic Control Unit)는 비정상적인 클러...
C1702
전원 인가 후VDC 모듈은 베리언트 코드를 확인한다. 이때 부적절한 베리언트 코드가...
P0017
VDC 모듈은 조향각센서의 비정상적인 신호를 수신한 경우 이 고장코드를 나타낸다.
defget_dtc_description(dtc):'''
입력으로 받은 DTC에 대한 설명인 description을 반환한다.
'''global df_dtc
try:
description = df_dtc.loc[dtc, 'Description']
except KeyError:
description = 'DTC를 찾을 수 없습니다.'return description
# parse_resp()에서 사용되는 고정값들을 정의한다.# 긍정 응답은 진단 요청에 0x40을 더한 값으로 시작한다.k_offset_positive_resp = 0x40
# read DTC의 service id = 0x19k_req_sid_readDtc = 0x19
k_resp_sid_readDtc = k_req_sid_readDtc + k_offset_positive_resp
# read ECU Id service id = 0x22k_req_sid_readEcuId = 0x22
k_resp_sid_readEcuId = k_req_sid_readEcuId + k_offset_positive_resp
defparse_resp(m_id_hex) -> None:'''
제어기의 진단 응답을 분석한다.
'''global diag_resp
# read DTCif diag_resp[m_id_hex]['s_id'] == k_resp_sid_readDtc:
# 처음 3 바이트는 dtc 정보가 아니다.# The first 3 bytes are not DTC information.# 4 바이트씩 DTC 정보가 있다.# DTC information is in 4 bytes each.
n_dtc = (diag_resp[m_id_hex]['length'] - 3) // 4if n_dtc > 0:
print(f'{m_id_hex = }')
for i_dtc inrange(1, n_dtc + 1):
dtc = diag_resp[m_id_hex]['assembled'][3 + 4 * i_dtc: 3 + 4 * i_dtc + 3]
dtc_hex = []
for byte_dtc in dtc:
dtc_hex.append(f'{byte_dtc:02X}')
dtc_hex_joined = ''.join(dtc_hex)
# DTC가 아니라 fill byte인 경우if dtc_hex_joined == 'aaaaaa':
continue
dtc_converted = convert_dtc(dtc)
dtc_description = get_dtc_description(dtc_converted)
dtc_state = diag_resp[m_id_hex]['assembled'][3 + 4 * i_dtc + 3]
print(f'{i_dtc = }')
print(f'dtc = {dtc_converted} (0x{dtc_hex_joined})')
print(f'{dtc_description}')
print(f'{dtc_state = :02X}')
print('---')
## read data by identifier# DID(Data ID)에 따라 read ECU ID, read Software Version 등 여러 가지가 있다. if diag_resp[m_id_hex]['s_id'] == k_resp_sid_readEcuId:
# data_id에 따라 처리한다.
data_id_high = diag_resp[m_id_hex]['assembled'][1]
data_id_low = diag_resp[m_id_hex]['assembled'][2]
# data_id 별로 response 처리 방법이 다르다.# data_id == ECU Id: if data_id_high == 0xF1and data_id_low == 0x00: # data_id for ECU Id
ecu_id = ''.join([chr(c) for c in diag_resp[m_id_hex]['assembled'][3:] if c != 0xaa])
print(f'{m_id_hex:4}{ecu_id = }')
# data_id == SW Version: elif data_id_high == 0x10and data_id_low == 0x00:
sw_id = ''.join([chr(c) for c in diag_resp[m_id_hex]['assembled'][3:] if c != 0xaa])
print(f'{m_id_hex:4}{sw_id = }')
defon_can_diag_resp(msg, verbose=False) -> None:'''
Transport Protocol로 분할된 메시지들을 받아서 조립하려면
이전 함수 호출 시 데이터들을 기억해야 한다.
이를 위해 전역 변수를 사용한다.
'''global diag_resp
# response 메시지를 카운트한다.# message id
m_id = msg.arbitration_id
# diag_resp의 key로 사용하기 위해 int를 str로 변환한다.
m_id_hex = hex(m_id)
if m_id_hex in diag_resp:
diag_resp[m_id_hex]['counter'] += 1else:
# length_response : 제어기가 알려준 응답의 길리 [바이트] # counter_response : (응답이 길어 여러 메시지로 나뉘어 전송될 경우) 메시지 카운터# serive_id : 서비스 아이디# response_assembled : (응답이 길어 여러 메시지로 나뉘어 전송될 경우) 응답 메시지들 전체를 조립한 메시지# response_hex : response_assembled의 바이트 하나 하나를 hex로 표현
diag_resp[m_id_hex] = {}
diag_resp[m_id_hex]['length'] = 0
diag_resp[m_id_hex]['counter'] = 1
diag_resp[m_id_hex]['s_id'] = 0x00
diag_resp[m_id_hex]['assembled'] = []
diag_resp[m_id_hex]['hex'] = []
# 응답의 데이터 8 바이트를 hex로 표시한다.# 8은 클래식 CAN의 최대 메시지 길이이고, # 모비스 VDC는 CAN-FD로 통신한다.# 진단 통신response 메시지의 길이는 64 바이트이지만,# 처음 8 바이트 외에는 0x00으로 채우고 사용하지 않는다.
data_hex = []
for i inrange(msg.dlc):
data_hex.append(f'{msg.data[i]:02X}')
# if verbose:# print(f'{m_id_hex:4} counter = {diag_resp[m_id_hex]["counter"]:2}: {data_hex}')# TP로 전송된 분해된 response를 조립한다.
pci = msg.data[0] & 0xF0if pci == 0x00: # single frame# 같은 m_id의 다음 응답 처리를 위해 초기화 한다.# counter는 초기화하지 않는다. 응답 횟수를 추적하기 위해서
diag_resp[m_id_hex]['length'] = 0# diag_resp[m_id_hex]['counter'] = 1
diag_resp[m_id_hex]['s_id'] = 0x00
diag_resp[m_id_hex]['assembled'] = []
diag_resp[m_id_hex]['hex'] = []
diag_resp[m_id_hex]['length'] = msg.data[0] & 0x0F
diag_resp[m_id_hex]['assembled'].extend(msg.data[1:8])
elif pci == 0x10: # first frame # 같은 m_id의 다음 응답 처리를 위해 초기화 한다.# counter는 초기화하지 않는다. 응답 횟수를 추적하기 위해서
diag_resp[m_id_hex]['length'] = 0# diag_resp[m_id_hex]['counter'] = 1
diag_resp[m_id_hex]['s_id'] = 0x00
diag_resp[m_id_hex]['assembled'] = []
diag_resp[m_id_hex]['hex'] = []
diag_resp[m_id_hex]['length'] = (msg.data[0] & 0x0F) * 256 + msg.data[1]
diag_resp[m_id_hex]['assembled'].extend(msg.data[2:8])
elif (pci == 0x20): # consecutive frame
diag_resp[m_id_hex]['assembled'].extend(msg.data[1:8])
else: # pci == 0x30: flow control pass# do nothing# blf replay로 데이터를 처리하는 경우 flow control 메시지를 # Rx 할 수도 있다. # 제어기와 온-라인으로 연결한 상태에서는 여기로 올 수 없다.# TP로 전송된 메시지의 조립이 완료되면 출력한다. if ((diag_resp[m_id_hex]['length'] != 0) and
(diag_resp[m_id_hex]['length'] <= len(diag_resp[m_id_hex]['assembled']))):
for byte_response in diag_resp[m_id_hex]['assembled']:
# byte_response를 hex로 변환하여 diag_resp[m_id_hex]['hex']에 저장한다.# 전체 응답 메시지를 hex로 보기 위한 용도이다.
diag_resp[m_id_hex]['hex'].append(f'{byte_response:02X}')
if verbose:
print(f'{m_id_hex:4}{diag_resp[m_id_hex]["hex"]}')
diag_resp[m_id_hex]['s_id'] = diag_resp[m_id_hex]['assembled'][0]
parse_resp(m_id_hex)
return# on_can_diag_resp()
defextract_diag_resp_from_blf(blf, verbose=False) -> None:'''
blf 파일을 읽어서 진단 통신 관련된 부분만 출력한다.
'''global diag_resp
log = can.io.BLFReader(blf)
for msg in log:
if (msg.is_rx) and ((msg.arbitration_id >= 0x700) and (msg.arbitration_id <= 0x7FF)):
on_can_diag_resp(msg, verbose=verbose)
return# extract_diag_resp_from_blf()
Read ECU ID로 스캐닝한 응답 해석
# Read DTC 요청으로 스캐닝한 결과
i = 7
blf = blfs[i]
diag_resp = {}
print(f'{blf.name = }')
extract_diag_resp_from_blf(blf, verbose=False)