Curieux.JY
  • Post
  • Note
  • Jung Yeon Lee

On this page

  • 1 제어 루프의 근본적인 전환(Timer)
  • 2 실행기 백그라운드 처리의 안정화
  • 3 Device 일관성 문제의 해결
  • 4 추론 성능 최적화
  • 5 Observation Drop에 대한 Robustness
  • 6 Buffer Rolling의 안전한 구현
  • 7 디버깅을 위한 포지션 Gap 퍼블리셔
  • 8 신중한 초기화 과정
  • 9 효율적인 로깅 전략
  • 10 ROS2 API의 올바른 사용
  • 11 추가 최적화 가능성
  • 12 로직의 일관성 유지

👩‍💻ROS2 code 개선

ros2
python
code
기존의 ROS1 제어 코드에서 ROS2 제어 코드로 리펙토링
Published

October 31, 2025

ROS2 제어 코드 개선: 실시간성과 안정성을 위한 종합 가이드

ROS1 기반의 제어 코드를 ROS2로 전환하면서 가장 중요하게 생각한 점은 단순히 API를 바꾸는 것이 아니라, ROS2의 특성을 활용해 실시간성과 안정성을 근본적으로 향상시키는 것이었습니다. 이 글에서는 기존 코드가 가진 문제점들을 하나씩 짚어보고, 어떻게 개선했는지 그 과정과 이유를 상세히 설명하겠습니다.

1 제어 루프의 근본적인 전환(Timer)

기존 코드는 while True 루프 안에서 time.sleep()을 사용해 주기를 맞추는 방식이었습니다. 얼핏 보면 간단하고 직관적이지만, 실제로는 여러 문제를 안고 있었습니다. 운영체제의 스케줄링 정책, 파이썬의 GIL(Global Interpreter Lock), 깊은 함수 호출 스택 등이 복합적으로 작용하면서 제어 주기에 큰 지터(jitter)가 발생했습니다. 특히 ROS 콜백이나 구독자 처리와 동시에 실행될 때는 주기가 더욱 불안정해지는 문제가 있었습니다.

이를 해결하기 위해 ROS2의 타이머 메커니즘을 도입했습니다. self.allegro.create_timer(period, self._control_step)처럼 타이머를 등록하면, ROS2의 실행기(executor)가 지정된 주기마다 콜백 함수를 호출해줍니다. 이 방식의 장점은 제어 주기가 ROS 실행기에 의해 관리되기 때문에 지터가 크게 감소한다는 것입니다. 메인 스레드는 타이머만 처리하고, I/O 작업이나 다른 콜백은 실행기가 적절히 스케줄링해주기 때문에 자원 경합도 줄어듭니다.

2 실행기 백그라운드 처리의 안정화

초기 구현에서는 rclpy의 내부 API인 rclpy.executors._util이나 _thread를 사용했습니다. 하지만 이는 공식적으로 지원되지 않는 private API였고, ROS2 버전이 달라지면 AttributeError가 발생하는 심각한 문제가 있었습니다. 실제로 다른 환경에서 코드를 실행하려고 할 때 이 문제로 인해 코드가 동작하지 않는 경우가 빈번했습니다.

이를 해결하기 위해 표준 라이브러리인 threading.Thread와 ROS2의 SingleThreadedExecutor를 조합한 안정적인 방식으로 전환했습니다. 별도의 스레드에서 실행기를 돌리면서 ROS2 노드가 백그라운드에서 안전하게 동작하도록 구현했습니다. 이렇게 하면 타이머 콜백과 구독자 콜백이 서로 막히지 않고 원활하게 처리됩니다. 공식 API만 사용하기 때문에 버전 호환성 문제도 완전히 해결되었습니다.

3 Device 일관성 문제의 해결

딥러닝 모델을 사용할 때 가장 흔하게 마주치는 문제 중 하나가 텐서, 모델, 정규화 객체들이 서로 다른 디바이스(CUDA GPU 또는 CPU)에 있을 때 발생하는 오류입니다. “Expected all tensors to be on the same device”라는 런타임 에러가 발생하면 디버깅이 매우 어렵습니다.

이를 근본적으로 해결하기 위해 self.device를 단일 진실 공급원(single source of truth)으로 설정하고, 모델, running_mean_std, sa_mean_std는 물론이고 관측 버퍼인 obs_buf, 고유 수용 히스토리 버퍼인 proprio_hist_buf까지 모두 같은 디바이스에 생성하고 이동시켰습니다. ⚠️하드웨어로 명령을 보내야 할 때만 self.cur_target.detach().to('cpu').numpy()처럼 CPU로 변환합니다. 이렇게 하면 디바이스 간 이동을 최소화해 속도가 향상되고, 디바이스 불일치 에러도 완전히 사라집니다.

4 추론 성능 최적화

제어 루프에서는 모델의 순전파(feed forwarding)만 필요하고 역전파(backprop)는 전혀 필요하지 않습니다. 하지만 기본적으로 PyTorch는 그래디언트 트래킹을 수행하기 때문에 불필요한 메모리와 연산 비용이 발생합니다. 또한 코드 곳곳에서 .clone()을 남발하면 메모리 복사 오버헤드가 커집니다.

타이머 콜백 함수인 _control_step에 @torch.inference_mode() 데코레이터를 적용해 추론 모드임을 명시적으로 선언했습니다. 이렇게 하면 그래디언트 계산과 관련된 모든 오버헤드가 제거됩니다. 또한 코드를 전체적으로 검토해서 꼭 필요한 위치에서만 clone()을 사용하도록 수정했습니다. 이러한 최적화를 통해 추론 시간이 안정화되고 전체적인 제어 주기가 일정하게 유지됩니다.

5 Observation Drop에 대한 Robustness

실시간 제어 시스템에서 피할 수 없는 문제 중 하나가 센서 데이터의 지연이나 누락입니다. 타이머 주기와 /joint_states 토픽의 퍼블리시 주기가 완벽하게 일치하지 않으면, 어떤 제어 틱에서는 새로운 관측 데이터가 도착하지 않을 수 있습니다. 이때 블로킹 방식으로 데이터를 기다리면 제어 주기 자체가 깨져버립니다.

이를 해결하기 위해 논블로킹 방식인 wait=False로 데이터를 읽도록 변경했습니다. 새로운 샘플이 없으면 직전에 유효했던 관측값인 _last_obs_q를 재사용합니다. 동시에 드랍된 횟수를 _skipped 카운터로 기록해서 로깅합니다. 이렇게 하면 센서 데이터가 일시적으로 지연되더라도 제어 주기는 계속 유지되고, 시스템이 로버스트하게 동작합니다. 드랍 횟수를 모니터링함으로써 센서 통신에 문제가 있는지도 파악할 수 있습니다.

6 Buffer Rolling의 안전한 구현

시계열 데이터를 처리할 때 버퍼를 시프트하는 작업이 자주 필요합니다. 하지만 self.obs_buf[:, :64] = self.obs_buf[:, 32:]처럼 같은 텐서 내에서 겹치는 메모리 영역에 대입하면 PyTorch가 예외를 발생시킵니다. 이는 메모리 안정성을 위한 설계 결정인데, 올바르게 처리하지 않으면 런타임 에러로 이어집니다.

이를 해결하는 방법으로 두 가지를 제시했습니다. 첫 번째는 clone()을 사용하는 방법입니다. src = self.obs_buf[:, 32:96].clone()처럼 원본 데이터를 먼저 복사한 후 대입하면 안전합니다. 두 번째는 torch.roll()을 사용하는 방법입니다. rolled = torch.roll(self.obs_buf, shifts=-32, dims=1)로 전체 버퍼를 시프트한 후 self.obs_buf.copy_(rolled)로 복사하면 됩니다. 두 방법 모두 안전하고 명확하며, 예외 없이 동작합니다.

7 디버깅을 위한 포지션 Gap 퍼블리셔

로봇 제어에서 명령값과 (명령값 실행 후의)실제 위치 사이의 오차는 매우 중요한 지표입니다. 하지만 기존 코드에서는 이를 외부에서 확인할 방법이 없었습니다. 디버깅이나 성능 분석을 할 때 이 정보가 매우 유용합니다.

이를 위해 /position_gap 토픽을 새로 추가했습니다. sensor_msgs/JointState 메시지 타입을 사용해서 ROS 생태계의 표준 도구들과 호환되도록 했습니다. header.stamp에는 node.get_clock().now().to_msg()로 정확한 ROS 타임스탬프를 기록하고, name 필드에는 16개 조인트의 이름을, position 필드에는 명령값과 실측값의 차이를 담습니다. 이렇게 하면 rqt_plot으로 실시간 모니터링을 하거나, rosbag으로 기록해서 나중에 분석하는 것이 가능해집니다.

갭을 측정하는 타이밍도 중요합니다. 초기에는 “명령 전송 → 짧은 sleep → 갭 측정”을 고려했지만, 최종 구조에서는 타이머 콜백이 매 틱마다 최신 상태를 반영하기 때문에 별도의 sleep이 필요 없습니다. I/O 노드가 항상 최신 /joint_states를 기준으로 논블로킹 방식으로 갭을 계산하므로 제어 주기를 전혀 방해하지 않습니다.

8 신중한 초기화 과정

제어를 시작하기 전의 초기화 과정도 매우 중요합니다. 워밍업 단계에서는 제어 주파수의 4배에 해당하는 횟수만큼 초기 자세 명령을 보냅니다. 이는 모터 드라이버가 안정화되도록 하기 위함입니다.

첫 번째 관측은 특별하게 처리합니다. wait=True, timeout=5.0으로 블로킹 방식으로 한 번만 읽어서 유효한 데이터가 확실히 들어왔는지 확인합니다. 타임아웃을 설정해서 무한정 기다리는 것을 방지합니다. 이후부터는 모두 논블로킹 방식으로 전환합니다. 이렇게 하면 초기 버퍼 상태가 정상적으로 설정되고, 그 다음부터는 제어 주기가 깨지지 않습니다.

9 효율적인 로깅 전략

디버깅을 위해 로깅은 필수적이지만, 매 제어 틱마다 출력하면 성능이 크게 저하됩니다. 특히 고주파수로 동작하는 제어 루프에서는 로깅 자체가 병목이 될 수 있습니다.

이를 해결하기 위해 간헐적 로깅 전략을 사용했습니다. 예를 들어 5초에 한 번씩만 주기와 지터 정보를 출력합니다. skipped 카운터는 계속 업데이트하되, 출력은 주기적으로만 합니다. 이렇게 하면 필요한 정보는 얻으면서도 성능 저하를 최소화할 수 있습니다.

10 ROS2 API의 올바른 사용

ROS2를 사용할 때는 공식 API만 사용하는 것이 매우 중요합니다. 앞서 언급한 것처럼 내부 API를 사용하면 버전 호환성 문제가 발생할 수 있습니다. SingleThreadedExecutor로도 대부분의 경우 충분하지만, 만약 콜백 간의 경합이 심하다면 MultiThreadedExecutor를 고려할 수 있습니다.

프로그램을 종료할 때도 올바른 순서가 중요합니다. stop_allegro_io() 함수에서 executor.shutdown()을 먼저 호출해서 실행기를 정리하고, node.destroy_node()로 노드를 파괴한 다음, 마지막으로 rclpy.shutdown()을 호출합니다. 이 순서를 지키면 리소스가 깨끗하게 정리되고 예기치 않은 에러를 방지할 수 있습니다.

11 추가 최적화 가능성

더 나아가 성능을 극대화하고 싶다면 몇 가지 선택적인 최적화 기법을 적용할 수 있습니다. FP16이나 TF32를 사용하면 모델을 half precision으로 변환해서 추론 지연을 줄일 수 있습니다. 단, 이는 CUDA를 사용할 때만 가능하고 입력 데이터도 같은 타입으로 맞춰야 합니다.

입력 shape이 고정되어 있다면 CUDA Graph를 활용할 수 있습니다. 그래프를 한 번 캡처한 후 재사용하면 커널 런칭 오버헤드가 줄어들어 지터가 감소합니다. MLP 네트워크의 actor_units이나 priv_mlp_units 차원을 줄이는 것도 실시간성을 향상시키는 방법입니다.

타이머 주기 설정도 신중해야 합니다. 하드웨어와 드라이버의 지연을 고려해서 20Hz부터 시작하는 것이 안전합니다. 시스템이 안정적으로 동작하는 것을 확인한 후 점진적으로 주파수를 높이면 됩니다.

12 로직의 일관성 유지

이 모든 개선 과정에서 가장 중요하게 생각한 것은 ROS1과 ROS2 사이의 로직 일관성입니다. 관측 데이터를 받아서 정규화하고, 버퍼를 업데이트하고, 모델로 추론하고, 액션을 스케일링하고 클리핑해서 타깃을 갱신하고, 명령을 전송하고, 다음 관측을 받는 전체 파이프라인은 동일합니다.

차이점은 조인트 순서 변환과 토픽 인터페이스뿐입니다. ROS1과 ROS2는 조인트 순서가 다르기 때문에 이를 변환하는 로직이 필요하고, 토픽 이름이나 메시지 타입이 약간 다를 수 있습니다. 하지만 핵심 제어 로직은 완전히 동일하게 유지했습니다.


결론

이번 리팩토링의 핵심은 ROS2의 특성을 제대로 활용해서 실시간성과 안정성을 극대화하는 것이었습니다. 타이머 기반 제어 루프로 전환해서 정확한 제어 주기를 달성했고, 디바이스 일관성과 버퍼 관리를 개선해서 안정성을 높였습니다. 포지션 갭 퍼블리셔를 추가해서 분석 가능성을 향상시켰고, inference_mode와 최적화 기법을 적용해서 성능을 개선했습니다.

이러한 개선들은 개별적으로도 의미가 있지만, 함께 적용했을 때 시너지 효과가 큽니다. 정확한 제어 주기, 안정적인 동작, 효율적인 성능, 그리고 디버깅 가능성이 모두 향상되어 실제 로봇 제어 시스템에서 신뢰할 수 있는 코드가 되었습니다.

ROS2 제어 코드 개선: Before/After 핵심 코드 비교

1. 제어 루프: while+sleep → ROS2 Timer

❌ Before: 불안정한 while 루프

def deploy(self):
    self.allegro = start_allegro_io(side='right')
    hz = 20

    # ... 초기화 ...

    timestep = 0
    try:
        while True:
            loop_start = time.perf_counter()

            # 관측 정규화
            self.obs_buf = self.running_mean_std(self.obs_buf.clone())

            # 추론
            input_dict = {
                "obs": self.obs_buf,
                "proprio_hist": self.sa_mean_std(self.proprio_hist_buf.clone()),
            }
            action = torch.clamp(self.model.act_inference(input_dict), -1.0, 1.0)

            # 제어
            self.pre_physics_step(action)
            cmd = self.cur_target.detach().to('cpu').numpy()[0]
            ros1 = _action_hora2allegro(cmd)
            ros2 = _reorder_imrt2timr(ros1)
            self.allegro.command_joint_position(ros2)

            # 블로킹으로 관측 대기
            q_pos = self.allegro.poll_joint_position(wait=True, timeout=0.2)
            ros1_q = _reorder_timr2imrt(q_pos)
            hora_q = _obs_allegro2hora(ros1_q)
            obs_q = torch.from_numpy(hora_q.astype(np.float32)).to(self.device)

            self.post_physics_step(obs_q)

            time.sleep(0.03)  # 주기가 불안정!
            timestep += 1

            freq = 1.0 / (time.perf_counter() - loop_start)
            print(f"Hz={freq:.2f}")

문제점: - time.sleep(0.03)는 OS 스케줄링에 의존 → 지터 발생 - 블로킹 I/O(wait=True)로 주기 깨짐 - 매 틱마다 주파수 출력 → 성능 저하

✅ After: ROS2 Timer + 논블로킹

def deploy(self):
    self.allegro = start_allegro_io(side='right')

    # ... 초기화 ...

    # Timer 등록 (정확한 주기 보장)
    period = 1.0 / self.hz
    self.timer = self.allegro.create_timer(period, self._control_step)
    print(f"Deployment started (timer-based {self.hz:.1f} Hz).")

    # 메인 스레드는 시그널만 처리
    interrupted = False
    def _sigint(_sig, _frm):
        nonlocal interrupted
        interrupted = True
    signal.signal(signal.SIGINT, _sigint)

    try:
        while not interrupted:
            time.sleep(0.2)
    finally:
        if self.timer is not None:
            self.timer.cancel()
        self.allegro.go_safe()
        stop_allegro_io(self.allegro)

@torch.inference_mode()  # 그래디언트 트래킹 비활성화
def _control_step(self):
    t0 = time.perf_counter()

    # 1) 정규화
    obs_norm = self.running_mean_std(self.obs_buf)

    # 2) 추론
    input_dict = {
        "obs": obs_norm,
        "proprio_hist": self.sa_mean_std(self.proprio_hist_buf),
    }
    action = torch.clamp(self.model.act_inference(input_dict), -1.0, 1.0)

    # 3) 타깃 업데이트
    self._pre_physics_step(action)

    # 4) 명령 전송
    cmd = self.cur_target.detach().to("cpu").numpy()[0]
    ros1 = _action_hora2allegro(cmd)
    ros2 = _reorder_imrt2timr(ros1)
    self.allegro.command_joint_position(ros2)

    # 5) 논블로킹 관측 (드랍 시 last-good 사용)
    q_pos = self.allegro.poll_joint_position(wait=False, timeout=0.0)
    if q_pos is not None:
        ros1_q = _reorder_timr2imrt(q_pos)
        hora_q = _obs_allegro2hora(ros1_q)
        obs_q = torch.from_numpy(hora_q.astype(np.float32)).to(self.device)
        self._last_obs_q = obs_q
    else:
        obs_q = self._last_obs_q  # 재사용
        self._skipped += 1

    if obs_q is not None:
        self._post_physics_step(obs_q)

    # 6) 간헐적 로깅 (5초마다)
    if self._last_step_t is not None:
        dt = t0 - self._last_step_t
        if int(time.time()) % 5 == 0:
            hz_est = 1.0 / max(dt, 1e-6)
            print(f"[timer] {hz_est:.2f} Hz, skipped={self._skipped}")
    self._last_step_t = t0

개선 효과:

  • ✅ ROS2 실행기가 주기 관리 → 지터 감소
  • ✅ 논블로킹 I/O → 주기 유지
  • ✅ @torch.inference_mode() → 추론 오버헤드 제거
  • ✅ 간헐적 로깅 → 성능 저하 최소화

2. ROS2 실행기: 내부 API → 표준 threading

❌ Before: 불안정한 내부 API

class _Runner:
    def __init__(self, node: AllegroHandIO):
        self.node = node
        self.exec = SingleThreadedExecutor()
        self.exec.add_node(node)
        # ⚠️ 내부 API 사용 - 버전 호환성 문제!
        self.thread = threading.Thread(target=self.exec.spin, daemon=True)

    def start(self):
        self.thread.start()

    def stop(self):
        try:
            self.exec.shutdown()
        finally:
            self.node.destroy_node()

✅ After: 안전한 표준 API

import threading
from rclpy.executors import SingleThreadedExecutor

class _Runner:
    def __init__(self, node: 'AllegroHandIO'):
        self.node = node
        self.exec = SingleThreadedExecutor()
        self.exec.add_node(node)
        # ✅ 표준 threading 사용
        self.thread = threading.Thread(target=self.exec.spin, daemon=True)

    def start(self):
        self.thread.start()

    def stop(self):
        try:
            self.exec.shutdown()          # 1. executor 중지
        finally:
            try:
                self.node.destroy_node()  # 2. 노드 파괴
            finally:
                # 3. 스레드 종료 대기 (타임아웃 설정)
                self.thread.join(timeout=2.0)

개선 효과:

  • ✅ 공식 API만 사용 → 버전 호환성 보장
  • ✅ 올바른 종료 순서 → 리소스 누수 방지

3. 장치 일관성: 디바이스 불일치 해결

❌ Before: 산발적인 디바이스 할당

def __init__(self):
    self.device = "cuda"

    # 모델은 CUDA로
    self.model = ActorCritic(net_config).to(self.device).eval()
    self.running_mean_std = RunningMeanStd(obs_shape).to(self.device).eval()

    # ⚠️ 버퍼는 디바이스 명시 없음 - CPU에 생성됨!
    self.obs_buf = torch.zeros((1, 16 * 3 * 2), dtype=torch.float32)
    self.proprio_hist_buf = torch.zeros((1, 30, 16 * 2), dtype=torch.float32)

    # limits도 디바이스 명시
    self.allegro_dof_lower = torch.tensor([...], dtype=torch.float32, device=self.device)

    # ⚠️ targets는 디바이스 명시 없음
    self.prev_target = torch.zeros((1, 16), dtype=torch.float32)

✅ After: 모든 텐서를 같은 디바이스로

def __init__(self, hz: float = 20.0, device: str = "cuda"):
    torch.set_grad_enabled(False)
    self.hz = float(hz)
    self.device = device  # ✅ 단일 진실 공급원

    # 모델과 RMS
    self.model = ActorCritic(net_config).to(self.device).eval()
    self.running_mean_std = RunningMeanStd(obs_shape).to(self.device).eval()
    self.sa_mean_std = RunningMeanStd((30, 32)).to(self.device).eval()

    # ✅ 모든 버퍼를 명시적으로 같은 디바이스에 생성
    self.obs_buf = torch.zeros((1, 96), dtype=torch.float32, device=self.device)
    self.proprio_hist_buf = torch.zeros((1, 30, 32), dtype=torch.float32, device=self.device)

    # ✅ limits도 같은 디바이스
    self.allegro_dof_lower = torch.tensor([...], dtype=torch.float32, device=self.device)
    self.allegro_dof_upper = torch.tensor([...], dtype=torch.float32, device=self.device)

    # ✅ targets도 같은 디바이스
    self.prev_target = torch.zeros((1, 16), dtype=torch.float32, device=self.device)
    self.cur_target = torch.zeros((1, 16), dtype=torch.float32, device=self.device)

# ✅ 하드웨어 I/O 직전에만 CPU로 이동
def _control_step(self):
    # ... 추론 ...

    # CPU로 이동은 명령 전송 시에만
    cmd = self.cur_target.detach().to("cpu").numpy()[0]
    self.allegro.command_joint_position(ros2)

    # 관측 읽기 후 바로 디바이스로
    q_pos = self.allegro.poll_joint_position(wait=False)
    if q_pos is not None:
        # ... 변환 ...
        obs_q = torch.from_numpy(hora_q.astype(np.float32)).to(self.device)

개선 효과:

  • ✅ “Expected all tensors to be on the same device” 에러 제거
  • ✅ 불필요한 디바이스 이동 최소화 → 속도 향상

4. 버퍼 롤링: 겹침 오류 수정

❌ Before: 메모리 겹침 오류

def post_physics_step(self, obses):
    self.cur_obs_buf = self.unscale(obses, ...)[None]

    # ⚠️ 같은 텐서 내 겹치는 메모리 영역에 대입!
    # obs_buf[:, 32:96]의 일부가 [:, :64]와 겹침
    self.prev_obs_buf = self.obs_buf[:, 32:].clone()
    self.obs_buf[:, :64] = self.prev_obs_buf  # RuntimeError!
    self.obs_buf[:, 64:80] = self.cur_obs_buf.clone()
    self.obs_buf[:, 80:96] = self.cur_target.clone()

    # proprio history도 동일 문제
    cur = torch.cat([cur_norm_t, cur_tgt_t], dim=-1)
    prev = self.proprio_hist_buf[:, 1:30, :].clone()
    self.proprio_hist_buf[:] = torch.cat([prev, cur], dim=1)  # 비효율적

✅ After: 안전한 버퍼 시프트

방법 1: clone() 사용

def _post_physics_step(self, obses):
    # 1) 현재 관측 정규화
    cur_obs = self._unscale(
        obses.view(-1), self.allegro_dof_lower, self.allegro_dof_upper
    ).view(1, 16)

    # 2) obs_buf 롤링 - ✅ 소스를 먼저 clone()
    src64 = self.obs_buf[:, 32:96].clone()     # (1,64) 복사
    self.obs_buf[:, 0:64] = src64              # 안전한 대입
    self.obs_buf[:, 64:80] = cur_obs
    self.obs_buf[:, 80:96] = self.cur_target

    # 3) proprio_hist_buf 롤링 - ✅ 소스를 먼저 clone()
    src_hist = self.proprio_hist_buf[:, 1:, :].clone()  # (1,29,32)
    self.proprio_hist_buf[:, 0:-1, :] = src_hist
    self.proprio_hist_buf[:, -1, :16] = cur_obs
    self.proprio_hist_buf[:, -1, 16:32] = self.cur_target

방법 2: torch.roll 사용 (대안)

def _post_physics_step(self, obses):
    cur_obs = self._unscale(obses.view(-1), ...).view(1, 16)

    # ✅ roll을 사용한 시프트
    rolled = torch.roll(self.obs_buf, shifts=-32, dims=1)
    self.obs_buf[:, :-32].copy_(rolled[:, :-32])  # 앞 64칸
    self.obs_buf[:, 64:80] = cur_obs
    self.obs_buf[:, 80:96] = self.cur_target

개선 효과:

  • ✅ RuntimeError 완전 제거
  • ✅ 명확한 시프트 로직

5. 관측 드랍 대응: 블로킹 → 논블로킹 + last-good

❌ Before: 블로킹으로 주기 깨짐

# 매 틱마다 블로킹 대기
q_pos = self.allegro.poll_joint_position(wait=True, timeout=0.2)
if q_pos is None:
    # 데이터 없으면 에러...
    print("❌ failed to read")

ros1_q = _reorder_timr2imrt(q_pos)
# ... 처리 ...

✅ After: 논블로킹 + 재사용

def __init__(self, ...):
    # ...
    self._last_obs_q = None  # ✅ 마지막 유효 관측 저장
    self._skipped = 0        # ✅ 드랍 카운터

def _control_step(self):
    # ... 명령 전송 ...

    # ✅ 논블로킹으로 읽기
    q_pos = self.allegro.poll_joint_position(wait=False, timeout=0.0)

    if q_pos is not None:
        # 새 데이터 있으면 갱신
        ros1_q = _reorder_timr2imrt(q_pos)
        hora_q = _obs_allegro2hora(ros1_q)
        obs_q = torch.from_numpy(hora_q.astype(np.float32)).to(self.device)
        self._last_obs_q = obs_q  # ✅ 저장
    else:
        # 새 데이터 없으면 이전 것 재사용
        obs_q = self._last_obs_q
        self._skipped += 1  # ✅ 카운트

    if obs_q is not None:
        self._post_physics_step(obs_q)

    # 로깅 (5초마다)
    if int(time.time()) % 5 == 0:
        print(f"skipped={self._skipped}")

개선 효과:

  • ✅ 제어 주기 유지 (실시간성↑)
  • ✅ 센서 지연에 강건한 동작
  • ✅ 드랍 횟수 모니터링 가능

6. Position Gap 퍼블리셔: 타임스탬프 + 조인트 이름

❌ Before: Gap 측정에 짧은 sleep 삽입

def command_joint_position(self, positions: List[float]) -> bool:
    # 명령 퍼블리시
    msg = Float64MultiArray()
    msg.data = data
    self._cmd_pub.publish(msg)
    self._last_cmd = np.asarray(data, dtype=float)

    # ⚠️ 명령 직후 짧게 대기 (비효율적)
    if self._gap_after_cmd_delay > 0:
        time.sleep(self._gap_after_cmd_delay)  # 0.02초

    # gap 1회 퍼블리시
    self._publish_position_gap()
    return True

def _publish_position_gap(self):
    if self._last_cmd is None or self._last_js is None:
        return
    cur = self.poll_joint_position(wait=False)
    if cur is None or cur.size != 16:
        return

    cmd = np.asarray(self._last_cmd, dtype=float).reshape(16)
    gap = (cmd - cur).astype(float)

    # ⚠️ Header 정보 부족
    msg = JointState()
    msg.header.stamp = self.get_clock().now().to_msg()
    msg.header.frame_id = ""
    msg.name = list(self._desired_names)
    msg.position = gap.tolist()
    self._gap_pub.publish(msg)

✅ After: 타이머 콜백 내에서 최신 상태로

def command_joint_position(self, positions: List[float]) -> bool:
    msg = Float64MultiArray()
    msg.data = data
    self._cmd_pub.publish(msg)
    self._last_cmd = np.asarray(data, dtype=float)

    # ✅ sleep 없이 바로 gap 퍼블리시 (논블로킹)
    self._publish_position_gap()
    return True

def _publish_position_gap(self):
    if self._last_cmd is None or self._last_js is None:
        return

    # ✅ 논블로킹으로 현재 최신 상태 읽기
    cur = self.poll_joint_position(wait=False)
    if cur is None or cur.size != 16:
        return

    gap = (self._last_cmd - cur).astype(float)

    # ✅ 완전한 JointState 메시지
    js = JointState()
    js.header.stamp = self.get_clock().now().to_msg()  # ROS 타임스탬프
    js.header.frame_id = ""                            # 또는 "allegro"
    js.name = list(self._desired_names)                # 16개 조인트 이름
    js.position = gap.tolist()                         # gap 값
    # velocity/effort는 빈 리스트 (미사용)

    self._gap_pub.publish(js)

개선 효과:

  • ✅ sleep 제거 → 주기 안정성↑
  • ✅ 타임스탬프 + 조인트 이름 → rqt_plot, rosbag 호환
  • ✅ 타이머 콜백이 매 틱마다 최신 상태 반영

7. 초기화: 워밍업 + 첫 관측 블로킹

✅ 안전한 초기화 순서

def deploy(self):
    self.allegro = start_allegro_io(side='right')

    # 1) ✅ 워밍업 (하드웨어 안정화)
    warmup = int(self.hz * 4)
    for t in range(warmup):
        tprint(f"setup {t} / {warmup}")
        pose = _reorder_imrt2timr(np.array(self.init_pose, dtype=np.float64))
        self.allegro.command_joint_position(pose)
        time.sleep(1.0 / self.hz)

    # 2) ✅ 첫 관측만 블로킹 (타임아웃 설정)
    q_pos = self.allegro.poll_joint_position(wait=True, timeout=5.0)
    if q_pos is None:
        print("❌ failed to read joint state.")
        stop_allegro_io(self.allegro)
        return

    # 3) ✅ 버퍼 초기화
    ros1_q = _reorder_timr2imrt(q_pos)
    hora_q = _obs_allegro2hora(ros1_q)
    obs_q = torch.from_numpy(hora_q.astype(np.float32)).to(self.device)
    self._last_obs_q = obs_q

    cur_obs_buf = self._unscale(obs_q, self.allegro_dof_lower, self.allegro_dof_upper)[None]
    self.prev_target = obs_q[None]

    # obs_buf 3 타임스텝 채우기
    for i in range(3):
        self.obs_buf[:, i*32:i*32+16] = cur_obs_buf
        self.obs_buf[:, i*32+16:i*32+32] = self.prev_target

    # proprio_hist_buf 30 타임스텝 채우기
    self.proprio_hist_buf[:, :, :16] = cur_obs_buf
    self.proprio_hist_buf[:, :, 16:32] = self.prev_target

    # 4) ✅ 타이머 시작 (이후는 모두 논블로킹)
    period = 1.0 / self.hz
    self.timer = self.allegro.create_timer(period, self._control_step)

개선 효과:

  • ✅ 워밍업으로 모터 드라이버 안정화
  • ✅ 첫 관측 블로킹으로 유효한 초기 상태 확보
  • ✅ 이후는 논블로킹으로 주기 유지

13 요약: 핵심 개선 사항

항목 Before After 효과
제어 루프 while + sleep ROS2 Timer 지터↓, 주기 정확도↑
실행기 내부 API 표준 threading 버전 호환성↑
디바이스 산발적 할당 단일 소스 통일 에러 제거, 속도↑
버퍼 롤링 겹침 대입 clone() 사용 RuntimeError 제거
관측 읽기 블로킹 논블로킹 + 재사용 실시간성↑, 강건성↑
추론 일반 모드 @inference_mode() 오버헤드↓
로깅 매 틱 출력 5초마다 출력 성능 저하 최소화
Gap sleep 후 측정 논블로킹 측정 주기 안정성↑
Development Guide

정리한 13개 팁을 ROS2 실전 개발 관점에서 하나씩 깊게 풀어쓴 가이드(각 항목마다 “왜(배경) → 어떻게(패턴/코드) → 주의점(함정)” 순서로 짚어보기)

1) while+sleep 루프 → ROS2 Timer** 기반 제어 루프**

🤔 WHY

  • while True + time.sleep()는 OS 스케줄링, GIL, 콜백 경합 때문에 지터가 커짐. ROS 이벤트 루프와 분리되어 타이밍이 뒤틀리기 쉽다.

HOW

self.timer = self.create_timer(period_sec=1.0/hz, callback=self._control_step)
  • 타이머 콜백 안에서 “관측 → 추론 → 명령 → 버퍼업데이트”를 수행.
  • 메인 스레드는 아무것도 안 하고 살아만 있어도 됨(시그널 대기 정도).

CAUTION

  • 타이머 콜백 안에서 블로킹 작업(디스크 IO, 긴 슬립, 네트워크 대기 등) 금지.
  • 꼭 필요한 블로킹 초기화는 타이머 시작 전에 1회만.

2) rclpy 실행기(Executor) 백그라운드 스레드로 구동 (공식 API만)

🤔 WHY

  • 내부 API(rclpy.executors._util)는 버전별로 없어질 수 있어 에러 유발.

HOW

from rclpy.executors import SingleThreadedExecutor
import threading

self.exec = SingleThreadedExecutor()
self.exec.add_node(node)
self.thread = threading.Thread(target=self.exec.spin, daemon=True)
self.thread.start()

CAUTION

  • 종료 시: executor.shutdown() → node.destroy_node() → rclpy.shutdown() 순서.
  • 콜백 경합이 크면 MultiThreadedExecutor(max_workers=2~4) 고려.

3) 디바이스 일관성(cuda/cpu mismatch 방지)

🤔 WHY

  • 모델/버퍼/RMS가 서로 다른 디바이스면 즉시 런타임 에러.

HOW

self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model = ActorCritic(...).to(self.device).eval()
self.obs_buf = torch.zeros(..., device=self.device)
# 하드웨어 I/O 직전에만
cmd = self.cur_target.detach().to('cpu').numpy()

CAUTION

  • 중간에 .cpu() 했다가 다시 .to(device) 하는 왕복 금지(오버헤드+버그).
  • 체크포인트 로드 시 map_location=self.device 고정.

4) @torch.inference_mode() + 불필요한 clone 최소화****

🤔 WHY

  • 추론 시 그래디언트 불필요. .clone() 남발은 메모리/시간 낭비.

HOW

@torch.inference_mode()
def _control_step(self):
    obs_norm = self.running_mean_std(self.obs_buf)
    action = self.model.act_inference({...})

CAUTION

  • 학습 코드와 섞여 있으면 inference_mode() 범위를 추론 블록에만.

5) 관측 드랍 대비: last-good sample 재사용****

🤔 WHY

  • /joint_states가 매 틱 오지 않을 수 있음. 이때 블로킹 대기하면 주기 깨짐.

HOW

q_pos = self.allegro.poll_joint_position(wait=False)
if q_pos is not None:
    self._last_obs_q = obs_q_from(q_pos)
obs_q = self._last_obs_q  # 드랍 시 마지막 관측 사용

CAUTION

  • 드랍이 잦으면 로깅/모니터링으로 원인 추적(드라이버 주기, 버스 지연 등).
  • 재사용 샘플은 짧은 구간에서만 허용(장기 드랍은 안전 중지 고려).

6) 버퍼 롤링 시 overlap 쓰기 금지** (clone or roll)**

🤔 WHY

  • 같은 텐서 내 겹치는 메모리에 직접 대입하면 PyTorch가 예외 던짐.

어떻게(둘 중 하나)

  • clone 방식
src = self.obs_buf[:, 32:96].clone()
self.obs_buf[:, :64] = src
  • roll 방식
rolled = torch.roll(self.obs_buf, shifts=-32, dims=1)
self.obs_buf.copy_(rolled)

CAUTION

  • roll 후 덮어쓰는 구간(새 샘플 넣는 64:80, 80:96)은 명확히 채우기.

7) position_gap 퍼블리셔** 추가 (정확한 타임스탬프/조인트 이름)**

🤔 WHY

  • 명령-실측 오차를 rqt_plot/bag로 쉽게 시각화·분석 가능.

HOW

js = JointState()
js.header.stamp = self.get_clock().now().to_msg()
js.name = desired_joint_names  # 16개
js.position = (cmd - cur).tolist()
gap_pub.publish(js)

CAUTION

  • 이름 배열은 /joint_states.name과 일치하는 의미를 가져야 비교가 쉽다.
  • frame_id는 필요 시 지정(기본은 빈 문자열).

8) “명령 직후 gap 측정”의 타이밍

🤔 WHY

  • 명령 퍼블리시 직후 즉시 읽으면 아직 상태가 반영되지 않았을 수 있음.

HOW

  • 타이머 주기 기반으로 다음 틱에서 최신 /joint_states로 gap 계산(권장).
  • 반드시 “즉시” 필요하면 아주 짧은 sleep(수 ms)을 두되, 타이머 콜백에서는 가급적 지양.

CAUTION

  • 제어 주기가 최우선. gap을 위한 블로킹이 주기를 깨지 않게.

9) 초기화 루틴: 워밍업 + 첫 관측 블로킹 1회****

🤔 WHY

  • 드라이브 settling/초기 버퍼 정합성 확보.

HOW

for t in range(int(hz*4)):
    command_initial_pose()
    time.sleep(1/hz)

q_pos = poll_joint_position(wait=True, timeout=5.0)
init_obs = process(q_pos)
init_buffers_with(init_obs)

CAUTION

  • 블로킹은 초기 1회까지만. 이후는 논블로킹 운영.

10) 지터/드랍** 가벼운 로깅**

🤔 WHY

  • 실시간 제어에서 주기/드랍 상황은 디버깅 핵심 지표.

HOW

if int(time.time()) % 5 == 0:  # 5초 간격
    print(f"[timer] {hz_est:.2f} Hz, skipped={self._skipped}")

CAUTION

  • 로그 너무 잦으면 오히려 성능 저하. 파일 로깅은 버퍼링/비동기 권장.

11) ROS2 API 안전 사용****

🤔 WHY

  • 내부 API는 버전 변화에 취약. 유지보수 어려움.

HOW

  • 실행기: SingleThreadedExecutor 또는 MultiThreadedExecutor.
  • 스레드: 표준 threading.Thread(target=exec.spin, daemon=True).
  • 종료: executor.shutdown() → node.destroy_node() → rclpy.shutdown().

CAUTION

  • rclpy.ok() 확인 후 shutdown() 호출하면 중복 호출 안전.

12) 성능 튜닝(선택)

🤔 WHY

  • 더 높은 주기, 낮은 지터를 위해.

HOW

  • FP16: model.half() + 입력도 half로 변환(CUDA 필요).
  • CUDA Graph: 입력 shape 고정이면 캡처 후 재사용.
  • 네트워크 축소: actor_units/priv_mlp_units 차원 줄이기.
  • 실행기 멀티스레드: 콜백 경합이 크면 MultiThreadedExecutor(max_workers=2~4).

CAUTION

  • FP16은 수치 안정성 체크(특히 normalize/clip 부분).
  • 그래프는 shape 고정 전제.

13) ROS1/ROS2 로직 일관성** 유지**

🤔 WHY

  • 프레임워크만 달라졌을 뿐, 제어 파이프라인의 본질은 같음. 재현성과 디버깅이 쉬워짐.

어떻게(파이프라인 공통 패턴)

  1. 관측(o_t) 수집(순서 변환 포함)
  2. 정규화(unscale/normalize)
  3. 버퍼 업데이트(obs_buf roll, proprio_hist roll)
  4. 모델 추론 → action
  5. 스케일/클리핑 → target(q*) 갱신
  6. 명령 퍼블리시
  7. 다음 틱에서 o_{t+1} 반영

CAUTION

  • 두 시스템 간 조인트 순서/스케일 차이는 별도 유틸 함수로 관리(가독성↑, 버그↓).

14 실전 체크리스트

Copyright 2024, Jung Yeon Lee