ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • IDS - 다채널: TSMaster 미니프로그램 변경
    카테고리 없음 2025. 11. 26. 16:59

    시작하기 전에

    단채널 IDS를 수정하여 다채널을 대응하도록 한다. 구체적으로는, 메시지 아이디 (mid)만을 기준으로 참조하던 화이트 리스트, DLC (Data Length Code) 기준값, 전송 주기 기준값을 (채널, 메시지 아이디)의 쌍, 즉, (ch, mid)를 기준으로 참조할 수 있도록 코드를 수정한다.  

    수정한 결과는 아래와 같다. 

     

     

     

    미니프로그램 

    • 다채널 IDS 기능의 TSMaster 미니프로그램의 코드은 아래와 같다. 코드의 주석으로 설명을 하였다.
    # CODE BLOCK BEGIN Global_Definitions 
    from TSMaster import *
    # 침입이 없는 상태에서 CAN 버스를 측정하였다.
    # 측정한 데이터를 분석하여, (채널, 메시지 아이디)의 쌍들을 추출했다.
    # (채널, 메시지 아이디) 쌍별로 dlc와전송 주기의 최소값(d_ts_min)을 구했다.
    # 전송 주기의 최소값은 침입 민감 감지를 방지하기 위해, 전송 주기별로 보상을 했다.
    # 이상의 데이터를 미니프로그램에서 쉽게 불러올 수 있도록 파일에 저장했다. 
    
    
    # 파이썬에서 변수를 저장하고 불러오는데 자주 사용되는 pickle 모듈을 이용하였다.
    import pickle
    
    # 데이터 파일에는 아래 정보가 들어있다. 
    # white list ch_mid:
    #    ch_mid: ch는 채널, mid는 메시지 아이디 
    #    등록된 ch_mid가 아니면 침입으로 판정한다. 
    # dlc: data length code. dlc가 맞지 않으면 침입으로 판정한다.
    # d_ts_min: 전송 주기가 이 값보다 작으면 침입으로 판정한다. 
    #     d_ts: ts의 diff라는 의미다. = 메시지 전송 간격
    #     ts: timestamp
    # ts_last: 마지막 메시지의 ts이다.
    # dlc, d_ts_min, ts_last는 모두 ch_mid별로 있다.  
    # (ch, mid)를 키로하는 사전형 변수 x_kum를 정의하였다. 
    # 예)
    #     x[(ch, mid)][dlc] = 8           m_id 메시지의 dlc는 8 바이트이다.
    #     x[(ch, mid)][d_ts_min] = 0.08   m_id 메시지의 침입 판정을 위한 최소 전송 간격은 d_ts_min이다. 
    #     x[(ch, mid)][ts_last] = ts      m_id 메시지가 마지막으로 수신된 시간은 ts이다.
    
    # 데이터 파일이 저장되어 있는 경로를 쉽게 참조하기 위해서 pathlib 모듈을 이용한다.
    from pathlib import Path 
    
    #  변수 x를 pickle 파일에서 로드한다.   
    path_to_pickle = Path('.').absolute()
    path_to_pickle = Path(r'C:\data\tosun\projects\hsl_security\CySec_IDS_KMU\MiniProgram\PySrc')/'x_kmu.pkl'
    x = pickle.load(open(path_to_pickle, 'rb'))
    
    # 프로그램 사용과 관계된 파라미터이다. 
    # 혹시 설명이 궁금하다면 ...
    #     IDS 미니프로그램을 실행하고 온-라인 재생을 여러 번 하는 경우가 종종 생긴다.
    #     온-라인 재생이 종료되자마자 다음 온-라인 재생을 하기보다는 
    #     잠시 결과를 분석을 하고 온-라인 재생을 하는 경우가 대부분이다. 
    #     미니프로그램이 작동 중이라서 타이머는 계속 작동한다.  
    #     다음 재생의 첫 메시지 수신 시각과 지난 재생의 마지막 메시지 수신 시각 차이가 크다. 
    #     이런 경우 처리를 위해, 메시지 수신 간격이 아래 값 이상이면 ts_last를 초기화 한다.     
    k_usec_skip_detection = 5_000_000    # 5초 이상 메시지 전송이 중단되면 ts_last를 초기화
    
    # 침입 감지 건수를 저장할 변수를 선언한다.
    n_intrusion_detection = {
        'dlc': 0,          # dlc 기준 침입 감지 카운터
        'd_ts_min': 0,     # 전송 주기 기준 침입 감지 카운터
        'white_list': 0,   # ch_mid 기준 침입 감지 카운터
    }
    
    # 침입 감지 건수를 Numeric Display로 표시하기 위해서 시스템 변수를 선언하였다. 
    # 시스템 변수 이름을calc.n_id_dlc, calc.n_id_d_ts_min, calc_n_id_white_list로 하였다. 
    
    # 시스템 변수를 초기화한다.
    # 시스템 변수들을 추가할 경우에, 시스템 변수 초기화  코드 수정을 잊어도 문제가 
    # 되지 않도록 아래와 같은 방식으로 해봤다. :-)    
    for k, v in n_intrusion_detection.items():
        app.set_system_var_uint32(f'calc.n_id_{k}', v)           
    app.set_system_var_uint32('calc.n_msg', 0)
    app.set_system_var_uint32('calc.n_diag', 0)
    
    # 처리 메시지 수는  현황 통계를 보기 위한 목적이다.
    n_diag = 0
    n_msg = 0
    n_msg_since_last_display_update = 0
    k_n_msg_to_update_display = 1_000
    
    
    def detect_intrusion(ACAN: RawCAN) -> None:
        '''
        침입을 감지한다.
        '''
        
        global x
        global k_usec_skip_detection
        global n_intrusion_detection
        global n_diag
        global n_msg
        global n_msg_since_last_display_update
        global k_n_msg_to_update_display
        
    
        # 처리한 메시지 수를 추적한다. n_msg
        n_msg_since_last_display_update += 1
        if n_msg_since_last_display_update >= k_n_msg_to_update_display:
            n_msg_since_last_display_update = 0
            n_msg += k_n_msg_to_update_display
            app.set_system_var_uint32('calc.n_msg', n_msg)
            
        # 하드웨어가 찍은 메시지 수신 타임스탬프
        ts = ACAN.time_us
                
        # 화이트 리스트에 없는 메시지면 침입으로 감지한다.
        if (ACAN.idx_chn, ACAN.id) not in x:
            n_intrusion_detection['white_list'] += 1
            app.set_system_var_uint32('calc.n_id_white_list', n_intrusion_detection['white_list'])
                    
            app.log_text(f'{ACAN.idx_chn + 1}_{ACAN.id:x} not in white list', lvlOK)
            return
    
        
        # 메시지 길이가 맞지 않으면 (길거나 짧으면) 침입으로 감지한다.
        if ACAN.dlc != x[(ACAN.idx_chn, ACAN.id)]['dlc']:
            n_intrusion_detection['dlc'] += 1
            app.set_system_var_uint32('calc.n_id_dlc', n_intrusion_detection['dlc'])
    
            app.log_text(f'{ACAN.idx_chn + 1}_{ACAN.id:x} wrong dlc. s/b={x[(ACAN.idx_chn, ACAN.id)]["dlc"]} is={ACAN.dlc}', lvlOK)
            return
                        
    
        # d_ts를 계산한다.
        if x[(ACAN.idx_chn, ACAN.id)]['ts_last'] == 0:
            # 첫 메시지 수신: d_ts를기준값으로 한다. 
            # 침입 감지가 안 된다.
            d_ts = x[(ACAN.idx_chn, ACAN.id)]['d_ts_min']
        else:
            d_ts = ts - x[(ACAN.idx_chn, ACAN.id)]['ts_last']
        
        
        # ts_last를 업데이트한다.
        x[(ACAN.idx_chn, ACAN.id)]['ts_last'] = ts
        
        
        # 프로그램을 종료하지 않은 채로 한참 있다가 재생할 경우, 
        # 첫 메시지의 d_ts 계산이 부정확한 경우가 발생한다. (변수의 범위를 넘어서)   
        # 이런 오류 방지를 위해서, d_ts가 기준 이상이면 
        # x[ACAN.id]['ts_last']만 업데이트하고 침입 감지를 하지 않는다.
        if (d_ts > k_usec_skip_detection) or (d_ts < 0): # 5 sec를 넘으면
            # ts_last는 위에서 업데이트를 했다.
            return     
        
        
        # 진단 통신 메시지는 d_ts 기준 침입 감지에서  제외한다. 진단 통신 메시지의 mid는 0x700 - 0x7FF에 속한다.
        if 0x700 <= ACAN.id <= 0x7FF:
            n_diag += 1
            app.set_system_var_uint32('calc.n_diag', n_diag)
            app.log_text(f'{ACAN.idx_chn + 1}_{ACAN.id:x} is a diag message.', lvlOK)         
            return
    
            
        # d_ts가 기준보다 작으면, 즉, 메시지 간격이 너무 좁으면, 침입으로 감지한다. 
        if d_ts < x[(ACAN.idx_chn, ACAN.id)]['d_ts_min']:
            # 침입이 감지되어 카운터를 증가한다.
            n_intrusion_detection['d_ts_min'] += 1
    
            # 침입 카운터 시스템 변수를 업데이트한다.
            app.set_system_var_uint32('calc.n_id_d_ts_min', n_intrusion_detection['d_ts_min'])
    
            # 기준을 얼마나 초과하였는가?
            d_ts_over_by = x[(ACAN.idx_chn, ACAN.id)]['d_ts_min'] - d_ts
    
            app.log_text(f'{ACAN.idx_chn + 1}_{ACAN.id:x} d_ts too short d_ts={d_ts:,} d_ts_min={x[(ACAN.idx_chn, ACAN.id)]["d_ts_min"]:,} {d_ts_over_by=:,}', lvlOK)         
    
        return
    
    # CODE BLOCK END Global_Definitions 
    # CODE BLOCK BEGIN Instance 
    Instance = MpInstance('intrusion_detection')
    # CODE BLOCK END Instance 
    # CODE BLOCK BEGIN On_CAN_Tx any MCwtMSwtMQ__
    def on_can_tx_any(ACAN: RawCAN) -> None:
        detect_intrusion(ACAN)
    
    # CODE BLOCK END On_CAN_Tx any
    # CODE BLOCK BEGIN On_CAN_Rx any MCwtMSwtMQ__
    def on_can_rx_any(ACAN: RawCAN) -> None:
        # 온-라인 재생 시에 메시지에는 Tx 된다.
        # detect_intrusion(ACAN)
        pass
    
    # CODE BLOCK END On_CAN_Rx any

     

    • Virtual Device 상태에서 온라인 재생을 하면서 IDS 기능 검증을 하였다. 이 경우, IDS는 수신 메시지들이 아닌 송신 메시지들을 대상으로 하게 된다. 그래서 detect_intrusion() 함수를 on_can_tx_any() 이벤트에 연결하였다. 

    트레이스를 온라인 재생하면서 IDS를 작동하므로 on_can_rx가 아닌 on_can_tx 이벤트에 detect_intrusion() 함수를 연결하였다.

    • 실시간으로 침입 감지 상황을 표시하기 위해서 시스템 변수를 이용하였다. 시스템 변수는 아래 그림과 같이 정의하였다. 

    침입 감지 상황을 실시간으로 표시하기 위해 시스템 변수들을 정의하였다.

     

    • Numeric Display 창에서 시스템 변수 버튼(아래 그림에서 마우스 화살표 위치)을 클릭하면, 시스템 변수 창이 열린다. 시스템 변수 창에서 변수를 선택하여 Numeric Display 창에 추가한다. 프로젝트 실행 중에 침입 감지 현황이 실시간으로 업데이트된다.

    Numeric Display 창에 침입 감지 현황을 실시간으로 표시한다.

    • 미니프로그램을 먼저 실행하고, 트레이스를 온라인 재생하면서, 트랜스밋 창에서 임의의 메시지들을 전송하면, IDS가 침입을 감지하여, 침입 감지 현황을 Numeric Display 창에 표시한다. 동시에 System 창에 침입 감지 관련한 상세 정보를 출력한다.

     

    CySec_IDS_KMU.zip
    8.67MB

     

     

    목차 :: hsl's tsmaster 사용기    

    IDS - 다채널 :: hsl's tsmaster 사용기    

    IDS - 다채널: 데이터 처리 코드 변경 :: hsl's tsmaster 사용기