ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 실시간으로 FFT 하기 - 미니프로그램과 ai
    design 2024. 10. 25. 01:54

    시작하기 전에

    앞에서 설명했던 내용들을 다시 짚어 본다. 

    ESC (a.k.a. VDC) 제어기가 20msec 마다 전송하는 WHL_SPD11 메시지에 바퀴 속도 신호들이 있다. 이들 중 WHL_SPD_RR (Rear Right), WHL_SPD_RL (Rear Left)을 이용하여 실시간으로 요-레이트(yaw_rate_ws)를 계산하여 그래픽스 창에 표시하는 방법을 설명했다. CAN 신호들로 실시간 연산하기 (yaw_rate_ws)

    yaw_rate_ws가 매끄럽지 못하여 실시간으로 필터링한 신호 (yaw_rate_ws_filt)를 그래픽스 창에 표시하는 방법을 설명하였다. 칼만 필터를 이용했다. 칼만 필터의 코드는 claude.ai가 짰다.

    FFT (Fast Fourier Transform)를 해본다. yaw_rate_ws를 FFT 하면, 이 신호를 구성하는 사인(sine)파들의 진동수와 진폭을 계산하고 표시한다. 계산 방법은 구글링으로 찾은 예제 코드를 이용하였다. 

    나는 TSMaster에 FFT 결과를 그래픽으로 표시할 수 있는 방법이 없다는 것을 FFT 계산에 성공한 후에 알았다. 그래서 claude.ai에 가능한 방법을 문의하였다. Claude.ai가 준 코드로 FFT 결과를 그래픽 애니메이션으로 으로 표시할 수 있었다. (결과가 궁금하신 분은 비디오를 확인하십시오.)

    TSMaster에서 CAN 신호를 실시간 FFT하고, 계산 결과를 matplotlib으로 표시하기 - YouTube

     

    개요

    • 최근 (아래 그림의 FFT 계산 구간) 의 yaw_rate_ws 데이터 를 대상으로 FFT를 한다. 

    데이터가 계산 쌓이므로 가장 최근 몇 초간의 데이터를 대상으로 FFT 계산을 한다.

    • FFT 대상 데이터를 저장할 버퍼가 필요하다. 새로 수신된 바퀴 속도를 이용해서 새로 계산한 yaw_rate_ws를 버퍼에 추가하고, 버퍼의 가장 오래된 데이터를 삭제한다. 버퍼가 차면 기존 데이터를 덮어쓰는 링 버퍼(ring buffer) 방식을 적용하기로 한다. claude.ai에게 요청하여 만든 링 버퍼 코드 사용하였다. 
    • 버퍼의 데이터를 대상으로 FFT를 한다. FFT 계산은 구글링으로 찾은 예제 코드를 사용하였다. FFT 계산 결과로 frequencies와 amplitudes를 얻는다.
    • 수평축을 frequency로 수직축을 amplitude로 하여 그래프로 표시한다. 
      • frequencies와 amplitudes는 각각 배열(array)이다. TSMaster에는 (배열, 배열) 쌍을 그래프로 표시하는 위젯이 없다. 
      • claude.ai에게 방법을 문의하여 아래 기능을 하는 코드를 만들었다.
        • FFT 계산 결과를 그래프로 표시하는 프로그램 (visualizer) (FFT를 주기적으로 반복 수행하니까 결과를 애니메이션으로 출력해야 한다.)
        • TSMaster의 미니프로그램에 추가할 소켓 통신 코드.
          • TSMaster가 비주얼라이저에게 FFT 계산 결과를 전송해야 한다. 소켓 통신을 이용한다.
          • 나는 소켓 통신이 뭔지도 몰랐다. claude.ai가 제시한 방법들 중에서 소켓 통신을 골랐다. 프로젝트를 마친 지금 소켓 통신에 대한 어렴풋한 이해가 생겼지만, 남들에게 설명할 만큼 알지 못한다.
    • FFT 계산을 얼마나 자주해야 하는가? 우선 5초 마다 한 번 하기로 한다. 

     

    링 버퍼 (ring buffer)

    • claude.ai에게 아래와 같이 문의하였다.
      • 프람프드: 파이썬으로 원소 수가 k_n_element_max개인 numpy "롤링" 버퍼를 만들고 싶습니다. 어떤 방법이 최적인가요?
      • 나는 "링 버퍼"라는 용어를 몰라 롤링 버퍼라고 부를 정도로 프로그래밍 분야에 전문 지식이 없다.
    • 아래 코드를 받았다. 
      • claude.ai는 RingBuffer라고 하지 않고 RollingBuffer라고 클래스 명을 정하는 친절함을 보였다.^^
    import numpy as np
    
    class RollingBuffer:
        def __init__(self, max_size):
            """
            max_size: 버퍼의 최대 크기
            """
            self.max_size = max_size
            self.buffer = np.zeros(max_size)
            self.current_size = 0
            self.position = 0
        
        def append(self, value):
            """
            새로운 값을 버퍼에 추가
            """
            self.buffer[self.position] = value
            self.position = (self.position + 1) % self.max_size
            self.current_size = min(self.current_size + 1, self.max_size)
        
        def extend(self, values):
            """
            여러 값을 한번에 버퍼에 추가
            """
            values = np.asarray(values)
            if len(values) >= self.max_size:
                self.buffer = values[-self.max_size:]
                self.position = 0
                self.current_size = self.max_size
            else:
                remaining = self.max_size - self.position
                if len(values) <= remaining:
                    self.buffer[self.position:self.position + len(values)] = values
                    self.position = (self.position + len(values)) % self.max_size
                else:
                    self.buffer[self.position:] = values[:remaining]
                    self.buffer[:len(values) - remaining] = values[remaining:]
                    self.position = len(values) - remaining
                self.current_size = min(self.current_size + len(values), self.max_size)
        
        def get_buffer(self):
            """
            현재 버퍼의 모든 유효한 값을 시간 순서대로 반환
            """
            if self.current_size < self.max_size:
                return self.buffer[:self.current_size]
            else:
                return np.concatenate((
                    self.buffer[self.position:],
                    self.buffer[:self.position]
                ))
        
        def is_full(self):
            """
            버퍼가 가득 찼는지 확인
            """
            return self.current_size == self.max_size
    
        def clear(self):
            """
            버퍼를 초기화
            """
            self.buffer = np.zeros(self.max_size)
            self.current_size = 0
            self.position = 0
    • 앞에서 설명한 yaw_rate_ws 계산 방법 (CAN 신호들로 실시간 연산하기 - 미니프로그램으로 yaw_rate_ws 계산 )의 코드에 추가하였다.
      • 링 버퍼는 프로그램 동작 중 계속 있어야 하는 변수다. 그리고 여러 함수들에서 사용할 수 있다. 그래서 글로벌 변수로 선언한다. 
      • 미니프로그램에서 글로벌 변수로 선언은 "Global Definition"에서 한다. Global Definition을 클릭하고 코드 창에 위 코드를 복붙한다. RingBuffer 클래스가 선언된다. (나중에 Rolling Buffer는 정확한 용어가 아닌 것을 알고 내가 RingBuffer로 클래스 이름을 바꿨다.) 
      • ring_buffer_yaw_rate_ws = RingBuffer(버퍼 크기) 라는 한 줄의 코드로 객체를 만든다. 이 객체가 실제 링 버퍼로 사용된다. 
      • 버퍼 크기를 먼저 정해야 한다. WHL_SPD11 메시지의 전송 주기는 0.02sec이다. FFT 계산 구간을 4초로 하였다. (처음에 5초로 하였다가 4초로 변경하였다. 이유를 나중에 설명한다.) 필요 버퍼 크기는 1 / 0.02 * 4 = 200이다.

     

    Global Definition을 선택하고 코드 창에 링 버퍼 코드를 복붙하여 링 버퍼를 글로벌 변수로 선언하였다.
    claude.ai가 만든 코드는 링 버퍼의 클래스다. 프로그램에서 사용할 객체 (ring_buffer_yaw_rate_ws)를 만든다. 먼저 버퍼 크기를 계산한다.

    • WHL_SPD11 메시지를 수신할 때마다, yaw_rate_ws를 계산한다. 계산 직후에 ring_buffer에 yaw_rate_ws를 추가한다. 그렇게 하기 위해 On_CAN_Rx에서 WHL_SPD11 메시지를 선택한다. 

    링 버퍼에 yaw_rate_ws 신호를 추가한다.

     

    FFT 계산하기

    • "Python FFT"으로 구글링하여 scipy 모듈의 fftpack.fft() 함수와 예제 코드를 찾았다. 예제 코드를 미니프로그램에 적용한다. 
    • 수신한 WHL_SPD11 메시지 수가 기준값 (k_n_message_to_calc_fft)에 다다르면 FFT 계산을 한다.
      • k_n_message_to_calc_fft를 변경하여 FFT 계산 주기를 변경할 수 있다.
      • 수신한 WHL_SPD11 메시지 수를 카운트 해야한다. i_message_since_last_calc_fft로 카운트한다.
      • 원래 계획은 미니프로그램의 timer를 이용하여 주기적으로 FFT 계산을 하도록 하는 것이었다. 미니프로그램의 timer 사용법을 설명하기 위한 의도다.
      • 그런데 TSMaster의 Bus Replay 기능은 오프라인 상태에서 실제 시간보다 훨씬 빠르게 재생된다. 타이머는 실제 시간에 따라 동작한다. 그래서 실제 300초 길이의 트레이스의 오프라인 재생 시간은 5초도 채 안 된다. timer 이벤트가 작동하여 FFT 계산이 실행되기 전에 재생이 완료되는 문제가 있다는 것을 알았다. timer 대신 메시지를 카운트하는 방식으로 변경하였다. 

    FFT 계산을 한다. k_n_message_to_calc_fft 변수를 변경하여 계산 주기를 조절할 수 있다. 버퍼가 차기 전에는 계산하지 않도록하였다.

    • 미니프로그램을 실행하고, 메인 메뉴/ Analysis/ Bus Replay를 하여 결과를 확인한다. 문제없이 잘 수행된다.

    시스템 메시지 창에 FFT 계산 결과인 frequencies와 amplitudes가 잘 표시된다.

     

    FFT 계산 결과를 그래프로 표시하기

    • 계산 결과를 수치로 보니 직관적이지 않다. 천천히 꼼꼼히 봐야해서 불편했다. 그래서 그래프로 표시하기로 했다. 그래프로 표시하려고 보니 TSMaster에 마땅한 위젯이 없다는 것을 처음 알았다. 
    • Claude.ai에게 문의하니 아래와 같이 방법과 코드  (FFT Spectrum Visualizer, TSMaster FFT Data Sender) 를 제시하였다.

    Claudi.ai에게 방법을 물었다.

     

    FFT  Spectrum Visualizer

    • 프로그램의 기능은 위 설명대로이다. 
    • 나는 소켓 통신이 뭔지도 몰랐고, 프로젝트를 마친 지금도 어렴풋이 이해하지만 남들에게 설명하지는 못한다. 
    • 나는 프로젝트 이전에도 그리고 지금도 json은 "이름: 값" 형식의 데이터 저장 포맷이라는 정도만 안다. 
    • 나는 matplotlib을 잠깐 사용했었다. 파이썬으로 그래프를 그리려면 사람은 누구나 처음에 이 모듈을 만져볼 것이다. 나는 plotly라는 모듈을 주로 사용한다. matplotlib에 애니메이션 기능이 있다는 것은 새로운 발견이다.
    • 이런 정도의 지식과 경험으로도 소켓 통신으로 FFT 데이터를 받아서 matplotlib으로 데이터를 그래프로, 그것도 애니메이션 그래프로, 출력하는데 별 어려움이 없었다. 
    import socket
    import json
    import numpy as np
    import matplotlib.pyplot as plt
    from matplotlib.animation import FuncAnimation
    
    class FFTVisualizer:
        def __init__(self, port=5000):
            # Socket 설정
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.bind(('localhost', port))
            self.server_socket.listen(1)
            
            # 그래프 설정
            self.fig, self.ax = plt.subplots()
            self.line, = self.ax.plot([], [])
            self.ax.set_title('FFT Spectrum')
            self.ax.set_xlabel('Frequency (Hz)')
            self.ax.set_ylabel('Amplitude')
            
            # 데이터 버퍼
            self.frequencies = None
            self.amplitudes = None
            
            # 연결 수락
            print(f"Waiting for connection on port {port}...")
            self.client_socket, _ = self.server_socket.accept()
            print("Connected!")
    
        def update_plot(self, frame):
            try:
                # 데이터 수신
                data = self.client_socket.recv(4096).decode()
                if data:
                    # JSON 형식의 데이터 파싱
                    fft_data = json.loads(data)
                    self.frequencies = np.array(fft_data['frequencies'])
                    self.amplitudes = np.array(fft_data['amplitudes'])
                    
                    # 그래프 업데이트
                    self.line.set_data(self.frequencies, self.amplitudes)
                    self.ax.relim()
                    self.ax.autoscale_view()
                    
            except Exception as e:
                print(f"Error: {e}")
                
            return self.line,
    
        def run(self):
            ani = FuncAnimation(self.fig, self.update_plot, interval=100, blit=True)
            plt.show()
    
        def cleanup(self):
            self.client_socket.close()
            self.server_socket.close()
    
    if __name__ == "__main__":
        visualizer = FFTVisualizer()
        try:
            visualizer.run()
        finally:
            visualizer.cleanup()

     

    • 위 코드에서 수정한 부분이 있다. 아래 부분을 

          아래와 같이 변경하였다. 

    • 이유는  간혹 아래와 같은 에러가 발생해서였다. 

    claude.ai가 작성한 원래 코드에서는 통신 중에 간혹 에러가 발생한다.

    • "Error: Extra data: line 1 column 9 (char 8)"를 복사하여 검색했다.  Python Socket Receive Large Amount of Data - Stack Overflow 에서 변경 후 코드를 찾아서 사용했다.  
    • 에러 발생 빈도가 확연히 줄었지만 완전히 사라지지 않았다. 
    • 데이터가 많아서 그런가 생각하여, FFT 계산 구간을 5초에서 4초로 변경하고, FFT 계산 주기를 늘렸다. 에러가 발생하지 않는다. 계산 구간을 4초로 변경하고 FFT 계산 주기를 늘리는 변경은 FFT Spectrum Visualizer가 아닌 미니프로그램에서 했다.
    • 이 코드를 DOS 창에서 아래 명령어로 실행한다. 내 경우 파일 이름은 "01_fft_spectrum_visualizer.py" 이다. 
    python 파일_이름.py
    • 비주얼라이저를 먼저 실행하고 TSMaster에서 미니프로그램을 실행해야 한다. 비주얼라이저가 소켓 통신을 대기하고 있다가, 미니프로그램이 소켓 통신을 시작할 때 연결이 되기 때문이다. 
      • 코드에서 쉽게 유추할 수 있었다. 비주얼라이저 코드에 아래 부분들이 있다.
            self.server_socket.listen(1)
            # 연결 수락
            print(f"Waiting for connection on port {port}...")
            self.client_socket, _ = self.server_socket.accept()
            print("Connected!")
    • 비주얼라이저를 실행하면 그래프가 표시될 빈 창이 열린다.
      • 창을 미리 열어두는 것이 보기에  편리해서 코드를 한 줄 (self.fig.show()) 추가했다.
            # 그래프 설정
            self.fig, self.ax = plt.subplots()
            self.line, = self.ax.plot([], [])
            self.ax.set_title('FFT Spectrum')
            self.ax.set_xlabel('Frequency (Hz)')
            self.ax.set_ylabel('Amplitude')
            self.fig.show() # 창을 미리 열어두기 위해 추가한 코드

    그래프가 표시될 빈 창이 열린다.

     

    TSMaster FFT Data Sender

    • Claude.ai가 준 FFTDataSender 코드는 아래와 같다.
    import socket
    import json
    
    class FFTDataSender:
        def __init__(self, port=5000):
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client_socket.connect(('localhost', port))
    
        def send_fft_data(self, frequencies, amplitudes):
            data = {
                'frequencies': frequencies.tolist(),
                'amplitudes': amplitudes.tolist()
            }
            self.client_socket.send(json.dumps(data).encode())
    
        def cleanup(self):
            self.client_socket.close()
    
    # 티마 미니프로그램에서 FFT 계산 후 사용 예시
    fft_sender = FFTDataSender()
    try:
        # FFT 계산 후...
        fft_sender.send_fft_data(frequencies, amplitudes)
    finally:
        fft_sender.cleanup()
    • 코드를 보면 FFTDataSender 객체(fft_sender)가 생성될 때 소켓 통신이 만들어 진다. 이렇게 만든 소켓 통신이 미니프로그램 작동 중에 계속 유지되도록 하기 위해 객체를 글로벌 변수로 만든다. 그렇게 하기 위해 Global Definition에 위 코드를 클래스 선언 부분을 복붙한다.
    • fft_sender = FFTDataSender()로 객체를 생성한다. (= 소켓 통신을 시작한다.)

    fft_sender를 Global Definition에서 글로벌 변수로 생성한다.

    • FFT 계산 후에 계산 결과를 소켓 통신으로 FFT Spectrum Visualizer로 전송한다.

    • 미니프로그램과 비주얼라이저는 기본 기능은 잘 작동하나 사용하기에 불편함이 있다. 작동 순서가 틀리면 동작하지 않는 문제가 있다. 비주얼라이저 창이 잘 안 닫히는 문제도 있다. 나의 목적은 방법과 개념 설명이라, 목적에는 충분한 수준이라고 생각한다.
    • Claude.ai가 속도와 안전성이 개선된 코드를 만들어 주었다. 아직 해보지 못했다. 시간이 되면 해보고, 가치가 있다면 블로그로 작성하겠다.

     

    결론

    • TSMaster로 CAN 버스에서 받은 신호를 실시간으로 연산하고, 연산 결과를 실시간으로 FFT 하는 방법을 설명하였다.
    • 실시간으로 추가되는 데이터를 대상으로 FFT를 하기 위해 링 버퍼가 필요하다. AI에게 요청하여 만든 코드로 빠르게 링 버퍼를 구현하였다. 
    • FFT는 파이썬의 scipy 모듈을 사용하였다. FFT 연산만 따지면 2줄의 코드로 계산된다. 
    • FFT 계산 결과를 그래프로 표시할 위젯이 TSMaster에 없어서 (이런 위젯 개발 검토를 요청했다.) FFT Spectrum Visualizer라는 파이썬 코드를 ai에게 요청하여 만들었다. 비주얼라이저는 소켓 통신으로 TSMaster의 미니프로그램과 송신하도록 되어있다. 전송 데이터 양과 관련된 에러가 있어서고, 구글링으로 찾은 개선된 코드로 문제를 쉽게 해결하였다.  
    • 비주얼라이저와 FFT 계산 결과를 전송하는 기능이 미니프로그램에 필요하다. ai가 작성한 코드를 미니프로그램에 추가하여 기능을 구현하였다.
    • FFT, 링 버퍼, 소켓 통신는 내가 거의 모르거나 개념만 아는 주제들이다. 이런 주제가 포함된 프로젝트인데 ai 덕택에 잘 마칠 수 있었다. 

     

    참조

     

    미니프로그램 전체 코드

    yaw_rate_ws_fft.py
    0.01MB

     

    비주얼라이저 전체 코드

    01_fft_spectrum_visualizer.py
    0.00MB