-
IDS - 다채널: 데이터 처리 코드 변경application 2025. 11. 26. 15:43
asc 파일에서 IDS의 칩입 판정 기준들을 계산한다.¶
- CAN 채널이 여러 개인 경우의 IDS를 만들어 본다.
- 이전에 작성한 IDS는 CAN 채널이 1개인 경우에 해당한다.
- (채널, 메시지 아이디) 조합을 기준으로 화이트 리스트가 필요하다.
- 침입 감지 기준값들도 (채널, 메시지 아이디)를 기준으로하는 데이터 구조로 변경이 필요하다.
- asc 파일을 입력하면 (채널, 메시지 아이디)별로
- 화이트 리스트를 작성하고,
- 각 메시지 아이디별로 dlc 기준값을 저장하고,
- 각 메시지 아이디별로 침입 감지를 위한 기준값을 계산하여
- 계산 결과를 pickle 파일로 저장한다.
- 수동으로 계산 값을 변경할 경우를 대비하여.
- 계산 결과를 xlsx 파일로 저장한다.
- xlsx를 pickle 파일로 변환하여 저장한다.
# import from pathlib import Path import pandas as pd import pprint from can import ASCReader import pickle분석 대상 asc 파일¶
- 대상 asc 파일을 변경할 경우, 아래 셀의 path_to_data_asc 변수의 값을 변경한다.
# 아래 asc 파일을 대상으로 한다. path_to_data_asc = Path('.').absolute()/'trace'/'kmu_ids_data.asc' if not path_to_data_asc.exists(): raise FileNotFoundError(f"{path_to_data_asc} 파일이 존재하지 않습니다.")def read_asc_to_df(asc): ''' .asc 파일을 읽어서 DataFrame으로 반환한다. asc 파일은 m_id 별로 분리된 파일이어야 한다. ''' # ASCReader로 읽은 메시지를 DataFrame으로 변환 messages = [] with ASCReader(str(asc)) as reader: for msg in reader: messages.append({ 'ts': msg.timestamp, 'ch': msg.channel, 'mid_hex': f"{msg.arbitration_id:X}", # 16진수 문자열로 변환 'dlc': msg.dlc, 'trx': 'Rx', # ASC 파일에서는 대부분 Rx, 필요시 msg 속성으로 확인 가능 'd': 'd', # ASC 파일 형식에서 'd'는 고정값 'is_ext': msg.is_extended_id, 'data': msg.data.hex().upper() # 데이터를 16진수 문자열로 }) df = pd.DataFrame(messages) return dfdf = read_asc_to_df(path_to_data_asc) pprint.pprint(df.head())ts ch mid_hex dlc trx d is_ext data 0 0.001555 0 2B0 5 Rx d False 00000007CB 1 0.002215 0 420 8 Rx d False 701E18C8B84B6A80 2 0.002453 0 130 8 Rx d False 1280678000000A1B 3 0.002691 0 421 8 Rx d False 000000FFE37F21DB 4 0.003248 0 220 8 Rx d False 09847F10000E10D3# 화이트 리스트 작성에 필요한 ch별 mid의 튜블 목록을 작성한다. # mid는 16진수 문자열이다. 정수로 변환한다. df['mid'] = df['mid_hex'].apply(lambda x: int(x, 16)) # ch, mid의 튜플 목록을 작성한다. ch_mids = df[['ch', 'mid']].drop_duplicates().to_records(index=False).tolist() # 출력 print(f'{len(ch_mids)} (ch, mid) pairs found.') pprint.pprint(ch_mids)275 (ch, mid) pairs found. [(0, 688), (0, 1056), (0, 304), ... 중간 생략 (2, 1057), (2, 1024), (2, 0)]def get_d_ts_intrusion_detection_threshold(d_ts_mean, d_ts_min): ''' d_ts_mean: 메시지의 평균 주기 (초) d_ts_min: 메시지의 최소 주기 (초) d_ts_mean에 따라 침입 감지 임계값을 계산한다. 반환값: d_ts_intrusion_detection_threshold (usec) ''' k_sec2usec = 1_000_000 # 초를 마이크로초로 변환하는 계수 if d_ts_mean < 0.011: d_ts_intrusion_detection_desensitizer = 0.003 elif d_ts_mean < 0.021: d_ts_intrusion_detection_desensitizer = 0.004 elif d_ts_mean < 0.051: d_ts_intrusion_detection_desensitizer = 0.005 elif d_ts_mean < 0.101: d_ts_intrusion_detection_desensitizer = 0.006 elif d_ts_mean < 0.201: d_ts_intrusion_detection_desensitizer = 0.007 elif d_ts_mean < 0.501: d_ts_intrusion_detection_desensitizer = 0.008 else: d_ts_intrusion_detection_desensitizer = 0.009 d_ts_intrusion_detection_threshold = int((d_ts_min - d_ts_intrusion_detection_desensitizer) * k_sec2usec) return d_ts_intrusion_detection_threshold# ch_mids의 원소들 별로 # - dlc를 구한다. dlc가 1개인 경우만 (ch, mid)를 키로하는 딕셔너리에 dlc를 밸루로 저장한다. # - d_ts (ts의 diff)의 min과 mean을 구한다. 이 값으로 부터 침입 감지 임계값(d_ts_min)을 구한다. x = {} for ch, mid in ch_mids: # ch, mid 조합에 해당하는 dlc 값을 구한다. df_temp = df[(df['ch'] == ch) & (df['mid'] == mid)] dlc_values = df_temp['dlc'].unique() if len(dlc_values) == 1: x[(ch, mid)] = {'dlc': int(dlc_values[0])} else: print(f'Warning: (ch={ch}, mid={mid}) combination has multiple dlc values: {dlc_values}') print('This (ch, mid) pair is not included in the ch_mid_dlc dictionary.') # ch, mid 조합에 해당하는 d_ts_min을 구한다. d_ts_min = float(df_temp['ts'].diff().dropna().min()) d_ts_mean = float(df_temp['ts'].diff().dropna().mean()) d_ts_threshold = get_d_ts_intrusion_detection_threshold(d_ts_mean, d_ts_min) if d_ts_threshold < 0: print(f'Warning:\n(ch={ch}, mid={mid}/{mid:X}) combination has negative d_ts_threshold: {d_ts_threshold} usec') print(f'{d_ts_mean = }') print(f'{d_ts_min = }') print('Setting d_ts_threshold to 0.\n') d_ts_threshold = 0 x[(ch, mid)]['d_ts_min'] = d_ts_threshold # 직전 메시지의 ts를 저장할 ts_last를 초기화한다. x[(ch, mid)]['ts_last'] = 0 pprint.pprint(x)Warning: (ch=5, mid=2024/7E8) combination has negative d_ts_threshold: -2715 usec d_ts_mean = 0.0002840000000000004 d_ts_min = 0.0002840000000000004 Setting d_ts_threshold to 0. {(0, 67): {'d_ts_min': 990653, 'dlc': 8, 'ts_last': 0}, (0, 68): {'d_ts_min': 989729, 'dlc': 8, 'ts_last': 0}, (0, 127): {'d_ts_min': 990602, 'dlc': 8, 'ts_last': 0}, ... 중간 생략 (4, 1491): {'d_ts_min': 988931, 'dlc': 8, 'ts_last': 0}, (4, 1530): {'d_ts_min': 990530, 'dlc': 8, 'ts_last': 0}, (5, 2024): {'d_ts_min': 0, 'dlc': 8, 'ts_last': 0}}- 0x7E8은 진단 통신 메시지의 아이디이다. 진단 통신 메시지는 전송 주기가 일정하지 않다. 진단 통신 메시지는 전송 주기 기준으로 침입 감지에서 제외한다. 이것은 TSMaster의 IDS 미니프로그램에 구현할 기능이다.
x를 pickle 파일로 저장한다.¶
# x를 pickle 파일로 저장한다. pickle.dump(x, open('x_kmu.pkl', 'wb')) print('x_kmu.pkl 파일로 저장되었습니다.')x_kmu.pkl 파일로 저장되었습니다.# 저장이 잘 되었는지, 로드가 잘 되는지 확인한다. x_loaded = pickle.load(open('x_kmu.pkl', 'rb')) pprint.pprint(x_loaded){(0, 67): {'d_ts_min': 990653, 'dlc': 8, 'ts_last': 0}, (0, 68): {'d_ts_min': 989729, 'dlc': 8, 'ts_last': 0}, (0, 127): {'d_ts_min': 990602, 'dlc': 8, 'ts_last': 0}, ... 중간 생략 (4, 1491): {'d_ts_min': 988931, 'dlc': 8, 'ts_last': 0}, (4, 1530): {'d_ts_min': 990530, 'dlc': 8, 'ts_last': 0}, (5, 2024): {'d_ts_min': 0, 'dlc': 8, 'ts_last': 0}}x == x_loadedTrue- 저장 전과 후의 x를 비교한다.
- 동일하다.
x를 xlsx 파일로 저장한다.¶
- x를 xlsx로 저장한다.
- xlsx를 수동으로 수정한다.
- xlsx를 pkl로 변환한다.
Step 1: x_kmu.pkl을 xlsx로 저장¶
# pkl 파일을 DataFrame으로 변환 x = pickle.load(open('x_kmu.pkl', 'rb')) # (ch, mid)를 키로 하는 딕셔너리를 DataFrame으로 변환 rows = [] for (ch, mid), values in x.items(): rows.append({ 'ch': ch, 'mid_hex': f"0x{mid:X}", # 16진수로 표시 'mid': mid, # 10진수도 함께 저장 'd_ts_min': values['d_ts_min'], 'dlc': values['dlc'], 'ts_last': values['ts_last'] }) df_x = pd.DataFrame(rows) df_x = df_x.sort_values(['ch', 'mid']).reset_index(drop=True) # xlsx 파일로 저장 xlsx_output = Path('.').absolute() / 'x_kmu_editable.xlsx' df_x.to_excel(xlsx_output, index=False) print(f"저장 완료: {xlsx_output}") print(df_x.head(10))Step 2: 편집된 xlsx를 다시 pkl로 변환¶
# xlsx 파일을 읽어서 다시 x 딕셔너리 구조로 변환 xlsx_input = Path('.').absolute() / 'x_kmu_editable.xlsx' df_edited = pd.read_excel(xlsx_input) # DataFrame을 다시 딕셔너리로 변환 x_edited = {} for _, row in df_edited.iterrows(): ch = int(row['ch']) mid = int(row['mid']) x_edited[(ch, mid)] = { 'd_ts_min': int(row['d_ts_min']), 'dlc': int(row['dlc']), 'ts_last': float(row['ts_last']) } # 편집된 데이터를 pkl 파일로 저장 pickle.dump(x_edited, open('x_kmu_edited.pkl', 'wb')) print(f"변환 완료: x_kmu_edited.pkl") # 검증: 로드해서 확인 x_verify = pickle.load(open('x_kmu_edited.pkl', 'rb')) pprint.pprint(x_verify)xlsx에서 읽은 x(= x_verify)와 x를 비교한다.¶
# 편집하지 않았다면 True x == x_verifyTrue# 변경하였다면, 변경 부분을 출력한다. for (ch, mid), values in x_verify.items(): if not x[(ch, mid)] == values: print(f"Mismatch found for (ch={ch}, mid={mid}): original={x[(ch, mid)]}, edited={values}")코드와 데이터
calc_multi_channel_ids_thresholds.ipynb0.06MBkmu_ids_data.zip7.17MBIDS - 다채널 :: hsl's tsmaster 사용기
'application' 카테고리의 다른 글
IDS - 다채널 (0) 2025.11.26 조향과 구동 조종을 위한 판넬을 만드는 법 (0) 2025.09.18 TIO 측정 데이터 분석 - 디지털 입출력 (0) 2025.09.02 TIO 측정 데이터 분석 - mat 파일을 데이터프레임으로 변환하기 (mdf2df) (1) 2025.09.01 UDS 기반 제어기 프로그래머 개발 - libTSCAN (5) 2025.08.11 - CAN 채널이 여러 개인 경우의 IDS를 만들어 본다.