4
0
hwt 2 долоо хоног өмнө
parent
commit
8de31378a0

+ 38 - 0
brain/PlannerNode2/Skill/person_approach_skill/config/person_approach.yaml

@@ -0,0 +1,38 @@
+person_approach_node:
+  ros__parameters:
+    # RTSP stream URL
+    # NOTE(review): the camera user name and password are embedded in this
+    # URL and committed with the file - consider supplying them at deploy time.
+    rtsp_url: "rtsp://admin:hhzx2025@192.168.0.36:554/Streaming/Channels/201"
+
+    # HBM model file
+    model_path: "/opt/hobot/model/s100/basic/yolov5x_672x672_nv12.hbm"
+
+    # COCO label file
+    label_file: "/app/res/labels/coco_classes.names"
+
+    # Path of pydev_demo, which provides the utils package
+    # (preprocess_utils/postprocess_utils/common_utils still live in /app/pydev_demo/utils)
+    pydev_demo_path: "/app/pydev_demo"
+
+    # BPU scheduling
+    priority: 0
+    bpu_cores: [0]
+
+    # YOLO post-processing thresholds
+    score_thres: 0.25
+    nms_thres: 0.45
+
+    # Person-approach decision
+    person_score_thres: 0.45
+    area_ratio_thres: 0.15
+    center_offset_thres: 0.25
+    stable_frames: 5
+    cooldown_sec: 8.0
+
+    # Sleep between loop iterations
+    loop_sleep_sec: 0.01
+
+    # Debug logging
+    debug: true
+
+    # Event topic
+    event_topic: "/skill/person_approach/event"

+ 25 - 0
brain/PlannerNode2/Skill/person_approach_skill/launch/person_approach_node.launch.py

@@ -0,0 +1,25 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import os
+
+from launch import LaunchDescription
+from launch_ros.actions import Node
+from ament_index_python.packages import get_package_share_directory
+
+
+def generate_launch_description():
+    pkg_share = get_package_share_directory("person_approach_skill")
+    config_file = os.path.join(pkg_share, "config", "person_approach.yaml")
+
+    person_approach_node = Node(
+        package="person_approach_skill",
+        executable="person_approach_node",
+        name="person_approach_node",
+        output="screen",
+        parameters=[config_file],
+    )
+
+    return LaunchDescription([
+        person_approach_node,
+    ])

+ 25 - 0
brain/PlannerNode2/Skill/person_approach_skill/package.xml

@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<package format="3">
+  <name>person_approach_skill</name>
+  <version>0.0.0</version>
+  <description>Greeting vision skill for AI Agent</description>
+  <maintainer email="sunrise@example.com">sunrise</maintainer>
+  <license>Apache-2.0</license>
+
+  <buildtool_depend>ament_python</buildtool_depend>
+
+  <depend>rclpy</depend>
+  <depend>std_msgs</depend>
+
+  <exec_depend>launch</exec_depend>
+  <exec_depend>launch_ros</exec_depend>
+
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>

+ 0 - 0
brain/PlannerNode2/Skill/person_approach_skill/person_approach_skill/__init__.py


+ 777 - 0
brain/PlannerNode2/Skill/person_approach_skill/person_approach_skill/person_approach_node.py

@@ -0,0 +1,777 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import os
+
+# Must be set before `import cv2` so that RTSP capture is forced onto TCP
+os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
+
+import cv2
+import sys
+import json
+import time
+import signal
+import numpy as np
+import hbm_runtime
+import rclpy
+
+from rclpy.node import Node
+from std_msgs.msg import String
+from typing import Optional, Dict, Tuple
+from hobot_vio import libsrcampy as srcampy
+
+
+# Strides handed to post_utils.decode_outputs together with ANCHORS.
+# NOTE(review): presumably one stride per detection head - confirm against decode_outputs.
+STRIDES = np.array([8, 16, 32], dtype=np.int32)
+
+# Nine anchor pairs reshaped to (3, 3, 2), i.e. three groups of three pairs.
+# NOTE(review): the pairs are assumed to be (width, height) in pixels - confirm.
+ANCHORS = np.array([
+    [10, 13], [16, 30], [33, 23],
+    [30, 61], [62, 45], [59, 119],
+    [116, 90], [156, 198], [373, 326]
+], dtype=np.float32).reshape(3, 3, 2)
+
+
+def ret_ok(ret):
+    if isinstance(ret, (list, tuple)):
+        return len(ret) > 0 and ret[0] == 0
+    return ret == 0
+
+
+def get_nalu_pos(byte_stream: bytes):
+    size = len(byte_stream)
+    nals = []
+    retnals = []
+
+    start_code = b"\x00\x00\x01"
+    pos = 0
+
+    while pos < size:
+        is4bytes = False
+        retpos = byte_stream.find(start_code, pos)
+
+        if retpos == -1:
+            break
+
+        if retpos > 0 and byte_stream[retpos - 1] == 0:
+            retpos -= 1
+            is4bytes = True
+
+        pos = retpos + (4 if is4bytes else 3)
+
+        if pos >= size:
+            break
+
+        val = int(byte_stream[pos])
+        fb = (val >> 7) & 0x1
+        nri = (val >> 5) & 0x3
+        nalu_type = val & 0x1F
+
+        nals.append((pos, is4bytes, fb, nri, nalu_type))
+
+    if not nals:
+        return []
+
+    for i in range(0, len(nals) - 1):
+        start = nals[i][0]
+        end = nals[i + 1][0] - (5 if nals[i + 1][1] else 4)
+        retnals.append((
+            start,
+            end,
+            nals[i][1],
+            nals[i][2],
+            nals[i][3],
+            nals[i][4],
+        ))
+
+    start = nals[-1][0]
+    end = size - 1
+    retnals.append((
+        start,
+        end,
+        nals[-1][1],
+        nals[-1][2],
+        nals[-1][3],
+        nals[-1][4],
+    ))
+
+    return retnals
+
+
+def get_h264_nalu_type(byte_stream: bytes):
+    nalu_types = []
+    nalu_pos = get_nalu_pos(byte_stream)
+
+    for _, _, _, _, _, nalu_type in nalu_pos:
+        nalu_types.append(nalu_type)
+
+    return nalu_types
+
+
+class YoloV5X:
+    def __init__(
+        self,
+        model_path: str,
+        score_thres: float,
+        nms_thres: float,
+        pre_utils,
+        post_utils,
+    ):
+        self.pre_utils = pre_utils
+        self.post_utils = post_utils
+
+        self.model = hbm_runtime.HB_HBMRuntime(model_path)
+
+        self.model_name = self.model.model_names[0]
+        self.input_names = self.model.input_names[self.model_name]
+        self.output_names = self.model.output_names[self.model_name]
+        self.input_shapes = self.model.input_shapes[self.model_name]
+        self.output_quants = self.model.output_quants[self.model_name]
+
+        self.input_H = self.input_shapes[self.input_names[0]][1]
+        self.input_W = self.input_shapes[self.input_names[0]][2]
+
+        self.score_thres = score_thres
+        self.nms_thres = nms_thres
+
+        self.resize_type = 1
+        self.classes_num = 80
+
+    def set_scheduling_params(
+        self,
+        priority: Optional[int] = None,
+        bpu_cores: Optional[list] = None,
+    ):
+        kwargs = {}
+
+        if priority is not None:
+            kwargs["priority"] = {self.model_name: priority}
+
+        if bpu_cores is not None:
+            kwargs["bpu_cores"] = {self.model_name: bpu_cores}
+
+        if kwargs:
+            self.model.set_scheduling_params(**kwargs)
+
+    def pre_process(
+        self,
+        img: np.ndarray,
+        width: int,
+        height: int,
+    ) -> Dict[str, Dict[str, np.ndarray]]:
+        y, uv = self.pre_utils.split_nv12_bytes(img, width, height)
+
+        y_resized, uv_resized = self.pre_utils.resize_nv12_yuv(
+            y,
+            uv,
+            self.input_W,
+            self.input_H,
+        )
+
+        y_input = y_resized[..., None][None, ...]
+        uv_input = uv_resized[None, ...]
+
+        return {
+            self.model_name: {
+                self.input_names[0]: y_input,
+                self.input_names[1]: uv_input,
+            }
+        }
+
+    def forward(
+        self,
+        input_tensor: Dict[str, Dict[str, np.ndarray]],
+    ) -> Dict[str, np.ndarray]:
+        outputs = self.model.run(input_tensor)
+        return outputs[self.model_name]
+
+    def post_process(
+        self,
+        outputs: Dict[str, np.ndarray],
+        img_w: int,
+        img_h: int,
+    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
+        fp32_outputs = self.post_utils.dequantize_outputs(
+            outputs,
+            self.output_quants,
+        )
+
+        pred = self.post_utils.decode_outputs(
+            self.output_names,
+            fp32_outputs,
+            STRIDES,
+            ANCHORS,
+            self.classes_num,
+        )
+
+        xyxy_boxes, score, cls = self.post_utils.filter_predictions(
+            pred,
+            self.score_thres,
+        )
+
+        keep = self.post_utils.NMS(
+            xyxy_boxes,
+            score,
+            cls,
+            self.nms_thres,
+        )
+
+        xyxy = self.post_utils.scale_coords_back(
+            xyxy_boxes[keep],
+            img_w,
+            img_h,
+            self.input_W,
+            self.input_H,
+            self.resize_type,
+        )
+
+        return xyxy, cls[keep], score[keep]
+
+
+class RtspH264Decoder:
+    def __init__(self, rtsp_url: str, logger=None):
+        self.rtsp_url = rtsp_url
+        self.logger = logger
+
+        self.cap = None
+        self.dec = None
+
+        self.width = 0
+        self.height = 0
+        self.fps = 0.0
+
+        self.dec_chn = 0
+        self.dec_type = 1
+
+        self.find_pps_sps = 0
+        self.skip_count = 0
+
+    def log_info(self, text):
+        if self.logger:
+            self.logger.info(text)
+        else:
+            print(text)
+
+    def log_warn(self, text):
+        if self.logger:
+            self.logger.warn(text)
+        else:
+            print("[WARN]", text)
+
+    def log_error(self, text):
+        if self.logger:
+            self.logger.error(text)
+        else:
+            print("[ERROR]", text)
+
+    def open(self, dec_chn=0, dec_type=1):
+        self.dec_chn = dec_chn
+        self.dec_type = dec_type
+
+        self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
+        self.cap.set(cv2.CAP_PROP_FORMAT, -1)
+
+        if not self.cap.isOpened():
+            self.log_error(f"[RTSP] 打开失败: {self.rtsp_url}")
+            return False
+
+        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
+
+        self.log_info(
+            f"[RTSP] open success, width={self.width}, height={self.height}, fps={self.fps}"
+        )
+
+        if self.width <= 0 or self.height <= 0:
+            self.log_error("[RTSP] width/height 无效")
+            self.close()
+            return False
+
+        self.dec = srcampy.Decoder()
+        ret = self.dec.decode(
+            "",
+            self.dec_chn,
+            self.dec_type,
+            self.width,
+            self.height,
+        )
+
+        self.log_info(f"[DEC] decode ret={ret}")
+
+        if not ret_ok(ret):
+            self.log_error("[DEC] 初始化失败")
+            self.close()
+            return False
+
+        self.find_pps_sps = 0
+        self.skip_count = 0
+
+        return True
+
+    def close(self):
+        try:
+            if self.dec is not None:
+                self.dec.close()
+        except Exception as e:
+            self.log_warn(f"[DEC] close 异常: {e}")
+
+        try:
+            if self.cap is not None:
+                self.cap.release()
+        except Exception as e:
+            self.log_warn(f"[RTSP] release 异常: {e}")
+
+        self.cap = None
+        self.dec = None
+
+    def reopen(self):
+        self.log_warn("[RTSP] 准备重连")
+        self.close()
+        time.sleep(0.5)
+        return self.open(self.dec_chn, self.dec_type)
+
+    def read_frame(self):
+        if self.cap is None or self.dec is None:
+            return None
+
+        ret, stream_frame = self.cap.read()
+
+        if not ret or stream_frame is None:
+            self.log_warn("[RTSP] read failed")
+            if not self.reopen():
+                time.sleep(1.0)
+            return None
+
+        stream_bytes = stream_frame.tobytes()
+
+        try:
+            nalu_types = get_h264_nalu_type(stream_bytes)
+        except Exception as e:
+            self.log_warn(f"[RTSP] parse nalu failed: {e}")
+            return None
+
+        if not nalu_types:
+            return None
+
+        if (nalu_types[0] in [1, 5]) and self.find_pps_sps == 0:
+            return None
+
+        self.find_pps_sps = 1
+
+        ret = self.dec.set_img(stream_bytes, self.dec_chn)
+        if ret != 0:
+            self.log_warn(f"[DEC] set_img failed ret={ret}")
+            return None
+
+        if self.skip_count < 8:
+            self.skip_count += 1
+            return None
+
+        frame = self.dec.get_img()
+        return frame
+
+
+class PersonApproachNode(Node):
+    def __init__(self):
+        super().__init__("person_approach_node")
+
+        self.declare_params()
+        self.load_params()
+
+        self.load_pydev_utils()
+
+        self.event_pub = self.create_publisher(
+            String,
+            self.event_topic,
+            10,
+        )
+
+        self.running = True
+        self.decoder = None
+        self.yolo = None
+        self.class_names = []
+
+        self.stable_count = 0
+        self.last_event_time = 0.0
+        self.frame_count = 0
+        self.fps_start_time = time.time()
+
+        self.get_logger().info("[人物靠近] 节点初始化完成")
+
+    def declare_params(self):
+        self.declare_parameter(
+            "rtsp_url",
+            "rtsp://admin:hhzx2025@192.168.0.36:554/Streaming/Channels/201",
+        )
+        self.declare_parameter(
+            "model_path",
+            "/opt/hobot/model/s100/basic/yolov5x_672x672_nv12.hbm",
+        )
+        self.declare_parameter(
+            "label_file",
+            "/app/res/labels/coco_classes.names",
+        )
+        self.declare_parameter(
+            "pydev_demo_path",
+            "/app/pydev_demo",
+        )
+
+        self.declare_parameter("priority", 0)
+        self.declare_parameter("bpu_cores", [0])
+
+        self.declare_parameter("score_thres", 0.25)
+        self.declare_parameter("nms_thres", 0.45)
+
+        self.declare_parameter("person_score_thres", 0.45)
+        self.declare_parameter("area_ratio_thres", 0.15)
+        self.declare_parameter("center_offset_thres", 0.25)
+        self.declare_parameter("stable_frames", 5)
+        self.declare_parameter("cooldown_sec", 8.0)
+        self.declare_parameter("loop_sleep_sec", 0.01)
+
+        self.declare_parameter("debug", True)
+        self.declare_parameter("event_topic", "/greeting/person_approach")
+
+    def load_params(self):
+        self.rtsp_url = self.get_parameter("rtsp_url").value
+        self.model_path = self.get_parameter("model_path").value
+        self.label_file = self.get_parameter("label_file").value
+        self.pydev_demo_path = self.get_parameter("pydev_demo_path").value
+
+        self.priority = int(self.get_parameter("priority").value)
+        self.bpu_cores = list(self.get_parameter("bpu_cores").value)
+
+        self.score_thres = float(self.get_parameter("score_thres").value)
+        self.nms_thres = float(self.get_parameter("nms_thres").value)
+
+        self.person_score_thres = float(
+            self.get_parameter("person_score_thres").value
+        )
+        self.area_ratio_thres = float(
+            self.get_parameter("area_ratio_thres").value
+        )
+        self.center_offset_thres = float(
+            self.get_parameter("center_offset_thres").value
+        )
+        self.stable_frames = int(
+            self.get_parameter("stable_frames").value
+        )
+        self.cooldown_sec = float(
+            self.get_parameter("cooldown_sec").value
+        )
+        self.loop_sleep_sec = float(
+            self.get_parameter("loop_sleep_sec").value
+        )
+
+        self.debug = bool(self.get_parameter("debug").value)
+        self.event_topic = self.get_parameter("event_topic").value
+
+    def load_pydev_utils(self):
+        """
+        Load the helper modules used by the YOLO demo.
+
+        The modules bundled as ``person_approach_skill.utils`` are tried
+        first. If that import fails, several candidate directories are put
+        at the front of ``sys.path`` and the ``skill_utils`` package is
+        imported instead. On success ``self.pre_utils``, ``self.post_utils``
+        and ``self.common`` are set; if both attempts fail the error is
+        logged and re-raised.
+        """
+        try:
+            from person_approach_skill.utils import preprocess_utils as pre_utils
+            from person_approach_skill.utils import postprocess_utils as post_utils
+            from person_approach_skill.utils import common_utils as common
+
+            self.pre_utils = pre_utils
+            self.post_utils = post_utils
+            self.common = common
+
+            self.get_logger().info("[人物靠近] 使用 person_approach_skill 内置 utils")
+            return
+
+        except Exception as e:
+            self.get_logger().warn(f"[人物靠近] 内置 utils 加载失败,尝试外部 utils: {e}")
+
+        # Directories searched for the fallback package, highest priority last
+        # (each one is inserted at index 0 of sys.path).
+        candidate_paths = [
+            self.pydev_demo_path,
+            "/app/pydev_demo",
+            "/app/pydev_demo/12_rtsp_yolov5x_display_sample/..",
+        ]
+
+        for path in candidate_paths:
+            path = os.path.abspath(path)
+            if path not in sys.path:
+                sys.path.insert(0, path)
+
+        try:
+            # NOTE(review): the package imported here is `skill_utils`, while
+            # the log line below and the config comments mention
+            # /app/pydev_demo/utils - confirm which package name is intended.
+            import skill_utils.preprocess_utils as pre_utils
+            import skill_utils.postprocess_utils as post_utils
+            import skill_utils.common_utils as common
+
+            self.pre_utils = pre_utils
+            self.post_utils = post_utils
+            self.common = common
+
+            self.get_logger().info("[人物靠近] 使用外部 /app/pydev_demo/utils")
+
+        except Exception as e:
+            self.get_logger().error(
+                f"[人物靠近] utils 加载失败: {e},请检查 pydev_demo_path={self.pydev_demo_path}"
+            )
+            raise
+
+    def init_detector(self):
+        if not os.path.exists(self.model_path):
+            self.get_logger().error(f"[人物靠近] 模型不存在: {self.model_path}")
+            return False
+
+        if not os.path.exists(self.label_file):
+            self.get_logger().error(f"[人物靠近] label 文件不存在: {self.label_file}")
+            return False
+
+        self.class_names = self.common.load_class_names(self.label_file)
+
+        self.yolo = YoloV5X(
+            model_path=self.model_path,
+            score_thres=self.score_thres,
+            nms_thres=self.nms_thres,
+            pre_utils=self.pre_utils,
+            post_utils=self.post_utils,
+        )
+
+        self.yolo.set_scheduling_params(
+            priority=self.priority,
+            bpu_cores=self.bpu_cores,
+        )
+
+        self.get_logger().info(
+            f"[人物靠近] 模型加载完成: {self.model_path}"
+        )
+
+        self.decoder = RtspH264Decoder(
+            rtsp_url=self.rtsp_url,
+            logger=self.get_logger(),
+        )
+
+        if not self.decoder.open(dec_chn=0, dec_type=1):
+            self.get_logger().error("[人物靠近] RTSP/Decoder 初始化失败")
+            return False
+
+        self.get_logger().info(
+            f"[人物靠近] RTSP 初始化完成: {self.decoder.width}x{self.decoder.height}"
+        )
+
+        return True
+
+    def get_person_class_id(self):
+        for idx, name in enumerate(self.class_names):
+            if name.strip().lower() == "person":
+                return idx
+        return 0
+
+    def detect_person_close(self, boxes, cls_ids, scores, width, height):
+        person_cls_id = self.get_person_class_id()
+
+        image_area = float(width * height)
+        img_cx = width / 2.0
+
+        best = None
+
+        for box, cls_id, score in zip(boxes, cls_ids, scores):
+            cls_id = int(cls_id)
+            score = float(score)
+
+            if cls_id != person_cls_id:
+                continue
+
+            if score < self.person_score_thres:
+                continue
+
+            x1, y1, x2, y2 = box.astype(float)
+
+            x1 = max(0.0, min(float(width - 1), x1))
+            y1 = max(0.0, min(float(height - 1), y1))
+            x2 = max(0.0, min(float(width - 1), x2))
+            y2 = max(0.0, min(float(height - 1), y2))
+
+            box_w = max(0.0, x2 - x1)
+            box_h = max(0.0, y2 - y1)
+
+            if box_w <= 0 or box_h <= 0:
+                continue
+
+            area_ratio = (box_w * box_h) / image_area
+            cx = (x1 + x2) / 2.0
+            center_offset = abs(cx - img_cx) / float(width)
+
+            is_close = (
+                area_ratio >= self.area_ratio_thres
+                and center_offset <= self.center_offset_thres
+            )
+
+            candidate = {
+                "is_close": is_close,
+                "score": score,
+                "bbox": [
+                    int(x1),
+                    int(y1),
+                    int(x2),
+                    int(y2),
+                ],
+                "area_ratio": float(area_ratio),
+                "center_offset": float(center_offset),
+                "class_id": int(cls_id),
+                "class_name": "person",
+            }
+
+            if best is None:
+                best = candidate
+            elif candidate["area_ratio"] > best["area_ratio"]:
+                best = candidate
+
+        return best
+
+    def publish_person_approach(self, result):
+        now = time.time()
+
+        if now - self.last_event_time < self.cooldown_sec:
+            return
+
+        self.last_event_time = now
+
+        event = {
+            "event": "person_approach",
+            "score": float(result["score"]),
+            "bbox": result["bbox"],
+            "area_ratio": float(result["area_ratio"]),
+            "center_offset": float(result["center_offset"]),
+            "timestamp": now,
+            "source": "person_approach_node",
+        }
+
+        self.event_pub.publish(
+            String(data=json.dumps(event, ensure_ascii=False))
+        )
+
+        self.get_logger().info(
+            f"[人物靠近] 发布人员靠近事件: {event}"
+        )
+
+    def process_once(self):
+        frame = self.decoder.read_frame()
+
+        if frame is None:
+            return
+
+        width = self.decoder.width
+        height = self.decoder.height
+
+        expected_len = width * height * 3 // 2
+        if len(frame) != expected_len:
+            self.get_logger().warn(
+                f"[人物靠近] NV12 长度不匹配 len={len(frame)}, expected={expected_len}"
+            )
+            return
+
+        img_np = np.frombuffer(frame, dtype=np.uint8)
+
+        input_tensor = self.yolo.pre_process(
+            img_np,
+            width,
+            height,
+        )
+
+        outputs = self.yolo.forward(input_tensor)
+
+        boxes, cls_ids, scores = self.yolo.post_process(
+            outputs,
+            width,
+            height,
+        )
+
+        result = self.detect_person_close(
+            boxes,
+            cls_ids,
+            scores,
+            width,
+            height,
+        )
+
+        if result and result["is_close"]:
+            self.stable_count += 1
+
+            if self.debug:
+                self.get_logger().info(
+                    "[人物靠近] person close candidate "
+                    f"stable={self.stable_count}/{self.stable_frames}, "
+                    f"score={result['score']:.2f}, "
+                    f"area={result['area_ratio']:.3f}, "
+                    f"center_offset={result['center_offset']:.3f}, "
+                    f"bbox={result['bbox']}"
+                )
+
+            if self.stable_count >= self.stable_frames:
+                self.publish_person_approach(result)
+                self.stable_count = 0
+        else:
+            self.stable_count = 0
+
+        self.frame_count += 1
+        now = time.time()
+
+        if now - self.fps_start_time > 3.0:
+            fps = self.frame_count / (now - self.fps_start_time)
+            self.get_logger().info(f"[人物靠近] FPS: {fps:.2f}")
+            self.frame_count = 0
+            self.fps_start_time = now
+
+    def run_loop(self):
+        self.get_logger().info("[人物靠近] 开始检测循环")
+
+        if not self.init_detector():
+            self.get_logger().error("[人物靠近] 初始化失败,节点退出")
+            return
+
+        while rclpy.ok() and self.running:
+            rclpy.spin_once(self, timeout_sec=0.0)
+
+            try:
+                self.process_once()
+            except Exception as e:
+                self.get_logger().error(f"[人物靠近] process_once 异常: {e}")
+                time.sleep(0.1)
+
+            if self.loop_sleep_sec > 0:
+                time.sleep(self.loop_sleep_sec)
+
+        self.close()
+
+    def close(self):
+        self.get_logger().info("[人物靠近] 正在释放资源")
+
+        try:
+            if self.decoder is not None:
+                self.decoder.close()
+        except Exception as e:
+            self.get_logger().warn(f"[人物靠近] decoder close 异常: {e}")
+
+        self.get_logger().info("[人物靠近] 节点退出")
+
+
+def main(args=None):
+    rclpy.init(args=args)
+
+    node = PersonApproachNode()
+
+    def _signal_handler(sig, frame):
+        node.get_logger().info("[人物靠近] 收到退出信号")
+        node.running = False
+
+    signal.signal(signal.SIGINT, _signal_handler)
+    signal.signal(signal.SIGTERM, _signal_handler)
+
+    try:
+        node.run_loop()
+    except KeyboardInterrupt:
+        node.get_logger().info("[人物靠近] KeyboardInterrupt")
+    finally:
+        node.close()
+        node.destroy_node()
+        rclpy.shutdown()
+
+
+if __name__ == "__main__":
+    main()

+ 0 - 0
brain/PlannerNode2/Skill/person_approach_skill/resource/person_approach_skill


+ 4 - 0
brain/PlannerNode2/Skill/person_approach_skill/setup.cfg

@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/person_approach_skill
+[install]
+install_scripts=$base/lib/person_approach_skill

+ 41 - 0
brain/PlannerNode2/Skill/person_approach_skill/setup.py

@@ -0,0 +1,41 @@
+from setuptools import setup, find_packages
+import os
+from glob import glob
+
+package_name = 'person_approach_skill'
+
+setup(
+    name=package_name,
+    version='0.0.0',
+    packages=find_packages(),
+    data_files=[
+        (
+            'share/ament_index/resource_index/packages',
+            ['resource/' + package_name]
+        ),
+        (
+            'share/' + package_name,
+            ['package.xml']
+        ),
+        (
+            os.path.join('share', package_name, 'config'),
+            glob('config/*.yaml')
+        ),
+        (
+            os.path.join('share', package_name, 'launch'),
+            glob('launch/*.py')
+        ),
+    ],
+    install_requires=['setuptools'],
+    zip_safe=True,
+    maintainer='sunrise',
+    maintainer_email='sunrise@example.com',
+    description='Greeting vision skill for AI Agent',
+    license='Apache-2.0',
+    tests_require=['pytest'],
+    entry_points={
+        'console_scripts': [
+            'person_approach_node = person_approach_skill.person_approach_node:main',
+        ],
+    },
+)

+ 7 - 0
brain/PlannerNode2/Skill/person_approach_skill/skill_utils/__init__.py

@@ -0,0 +1,7 @@
+# flake8: noqa: F401
+# flake8: noqa: F403
+
+from .preprocess_utils import *
+from .postprocess_utils import *
+from .draw_utils import *
+from .common_utils import *

+ 213 - 0
brain/PlannerNode2/Skill/person_approach_skill/skill_utils/common_utils.py

@@ -0,0 +1,213 @@
+# Copyright (c) 2025 D-Robotics Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# flake8: noqa: E501
+
+import cv2
+import numpy as np
+
+# List of predefined RGB color tuples used for bounding box visualization.
+# NOTE(review): OpenCV drawing calls interpret color tuples as BGR by default;
+# confirm the intended channel order of these values.
+rdk_colors = [
+    (56, 56, 255), (151, 157, 255), (31, 112, 255), (29, 178, 255),
+    (49, 210, 207), (10, 249, 72), (23, 204, 146), (134, 219, 61),
+    (52, 147, 26), (187, 212, 0), (168, 153, 44), (255, 194, 0),
+    (147, 69, 52), (255, 115, 100), (236, 24, 0), (255, 56, 132),
+    (133, 0, 82), (255, 56, 203), (200, 149, 255), (199, 55, 255)]
+
+
+def load_image(img_path: str) -> np.ndarray:
+    """
+    @brief Load an image from file path using OpenCV.
+    @param img_path Path to the image file.
+    @return Image as a NumPy ndarray in BGR format.
+    @throws FileNotFoundError if the image cannot be loaded.
+    """
+    img = cv2.imread(img_path)
+    if img is None:
+        raise FileNotFoundError(f"Image file '{img_path}' not found.")
+    return img
+
+
+def load_class_names(path: str) -> list:
+    """
+    @brief Load class names from a file.
+    @param path Path to the label file, each line contains a class name.
+    @return List of class name strings.
+    """
+    with open(path, 'r') as f:
+        # Strip whitespace and filter out empty lines
+        class_names = [line.strip() for line in f.readlines() if line.strip()]
+    return class_names
+
+
+def zscore_normalize_lastdim(x: np.ndarray) -> np.ndarray:
+    """
+    @brief Normalize input array along the last dimension.
+    @details This function performs standard score normalization (z-score).
+    @param x Input NumPy array of shape (..., channels).
+    @return Normalized array with mean 0 and variance 1 per vector.
+    """
+    mean = np.mean(x, axis=-1, keepdims=True)        # Compute mean per sample
+    var = np.var(x, axis=-1, keepdims=True)          # Compute variance per sample
+    return np.squeeze((x - mean) / np.sqrt(var + 1e-5))  # Z-score normalization
+
+
+def print_model_info(models: object) -> None:
+    """
+    @brief Print the input, output and scheduling metadata of all models.
+    @details Reads the metadata attributes of `models` (names, counts, shapes,
+             dtypes, quantization info, strides, descriptions and scheduling
+             parameters) and prints them section by section to stdout.
+    @param models Object exposing the attributes accessed below.
+    """
+
+    # 1. Model Name List
+    print("=== Model Name List ===")
+    model_names = models.model_names
+    print(model_names)
+
+    # 2. Total Number of Models
+    print("\n=== Model Count ===")
+    print(models.model_count)
+
+    # 3. Input Count per Model
+    print("\n=== Input Counts ===")
+    input_counts = models.input_counts
+    for model, count in input_counts.items():
+        print(f"{model}: {count}")
+
+    # 4. Input Names per Model
+    print("\n=== Input Names ===")
+    input_names = models.input_names
+    for model, inputs in input_names.items():
+        print(f"{model}:")
+        for name in inputs:
+            print(f"  - {name}")
+
+    # 5. Input Tensor Shapes
+    print("\n=== Input Tensor Shapes ===")
+    input_shapes = models.input_shapes
+    for model, inputs in input_shapes.items():
+        print(f"{model}:")
+        for name, shape in inputs.items():
+            print(f"  {name} -> shape: {shape}")
+
+    # 6. Input Tensor Data Types
+    print("\n=== Input Tensor Types ===")
+    input_types = models.input_dtypes
+    for model, inputs in input_types.items():
+        print(f"{model}:")
+        for name, dtype in inputs.items():
+            print(f"  {name} -> dtype: {dtype.name}")
+
+    # 7. Input Quantization Information
+    print("\n=== Input Quantization Info ===")
+    input_quanti_info = models.input_quants
+    for model, inputs in input_quanti_info.items():
+        print(f"{model}:")
+        for name, info in inputs.items():
+            print(f"  {name}:")
+            print(f"    quanti_type: {info.quant_type.name}")
+            print(f"    quantize_axis: {info.axis}")
+            print(f"    scale_data: {info.scale.tolist()}")
+            print(f"    zero_point_data: {info.zero_point.tolist()}")
+
+    # 8. Input Tensor Stride
+    print("\n=== Input Tensor Stride ===")
+    input_strides = models.input_strides
+    for model, inputs in input_strides.items():
+        print(f"{model}:")
+        for name, stride in inputs.items():
+            print(f"  {name} -> stride: {stride}")
+
+    # 9. Input Descriptions
+    input_descs = models.input_descs
+    for model, inputs in input_descs.items():
+        for name, desc in inputs.items():
+            print(f"[Input] {model}.{name} desc: {desc}")
+
+    print("\n================ OUTPUT TESTS ================\n")
+
+    # 1. Output Count per Model
+    print("=== Output Counts ===")
+    output_counts = models.output_counts
+    for model, count in output_counts.items():
+        print(f"{model}: {count}")
+
+    # 2. Output Names per Model
+    print("\n=== Output Names ===")
+    output_names = models.output_names
+    for model, outputs in output_names.items():
+        print(f"{model}:")
+        for name in outputs:
+            print(f"  - {name}")
+
+    # 3. Output Tensor Shapes
+    print("\n=== Output Tensor Shapes ===")
+    output_shapes = models.output_shapes
+    for model, outputs in output_shapes.items():
+        print(f"{model}:")
+        for name, shape in outputs.items():
+            print(f"  {name} -> shape: {shape}")
+
+    # 4. Output Tensor Data Types
+    print("\n=== Output Tensor Types ===")
+    output_types = models.output_dtypes
+    for model, outputs in output_types.items():
+        print(f"{model}:")
+        for name, dtype in outputs.items():
+            print(f"  {name} -> dtype: {dtype.name}")
+
+    # 5. Output Quantization Information
+    print("\n=== Output Quantization Info ===")
+    output_quanti = models.output_quants
+    for model, outputs in output_quanti.items():
+        print(f"{model}:")
+        for name, info in outputs.items():
+            print(f"  {name}:")
+            print(f"    quanti_type: {info.quant_type.name}")
+            print(f"    quantize_axis: {info.axis}")
+            print(f"    scale_data: {info.scale}")
+            print(f"    zero_point_data: {info.zero_point}")
+
+    # 6. Output Tensor Stride
+    print("\n=== Output Tensor Stride ===")
+    output_stride = models.output_strides
+    for model, outputs in output_stride.items():
+        print(f"{model}:")
+        for name, stride in outputs.items():
+            print(f"  {name} -> stride: {stride}")
+
+    # 7. Output Descriptions
+    output_descs = models.output_descs
+    for model, outputs in output_descs.items():
+        for name, desc in outputs.items():
+            print(f"[Output] {model}.{name} desc: {desc}")
+
+    # Get and Print Model Description Info
+    print("\nModel Description:")
+    model_desc = models.model_descs
+    for model_name, desc in model_desc.items():
+        print(f" - {model_name}: {desc}")
+
+    # Get and Print HBM Description Info
+    print("\nHBM Description:")
+    hbm_desc = models.hbm_descs
+    for file_name, desc in hbm_desc.items():
+        print(f" - {file_name}: {desc}")
+
+    # Get and Print Scheduling Params
+    print("\n=== Scheduling Parameters ===")
+    sched_params = models.sched_params
+    for model_name, sched in sched_params.items():
+        print(f"{model_name}:")
+        print(f"  priority    : {sched.priority}")
+        print(f"  customId    : {sched.customId}")
+        print(f"  bpu_cores   : {sched.bpu_cores}")
+        print(f"  deviceId    : {sched.deviceId}")

+ 207 - 0
brain/PlannerNode2/Skill/person_approach_skill/skill_utils/draw_utils.py

@@ -0,0 +1,207 @@
+# Copyright (c) 2025 D-Robotics Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# flake8: noqa: E501
+
+import cv2
+import numpy as np
+
+
+def draw_boxes(image: np.ndarray, boxes: np.ndarray, cls_ids: np.ndarray,
+               scores: np.ndarray, class_names: list, colors: list) -> np.ndarray:
+    """
+    @brief Draw bounding boxes with class names and scores on the image.
+    @param image Input image as a NumPy array.
+    @param boxes Bounding boxes as a NumPy array of shape (N, 4), format: [x1, y1, x2, y2].
+    @param cls_ids List or array of class indices corresponding to boxes.
+    @param scores List or array of confidence scores for each detection.
+    @param class_names List of class name strings.
+    @param colors List of RGB color tuples for each class.
+    @return Image with drawn boxes and labels.
+    """
+    for box, cls_id, score in zip(boxes, cls_ids, scores):
+        x1, y1, x2, y2 = map(int, box)
+        color = colors[cls_id % len(colors)]
+        label = f"{class_names[cls_id]} {score:.2f}"
+
+        # Draw bounding box
+        cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness=2)
+
+        # Draw class label and score
+        cv2.putText(image, label, (x1, max(y1 - 5, 0)),
+                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
+                    fontScale=0.5, color=color, thickness=1)
+
+    return image
+
+
+def draw_masks(image: np.ndarray, boxes: np.ndarray, masks: list,
+               cls_ids: list, colors: list, alpha: float = 0.3) -> None:
+    """
+    @brief Overlay semi-transparent instance masks on the image.
+    @param image Input image to draw on (modified in-place).
+    @param boxes Bounding boxes corresponding to masks, shape: (N, 4).
+    @param masks List of binary masks, each with shape matching box region.
+    @param cls_ids List of class indices for each instance.
+    @param colors List of RGB color tuples for each class.
+    @param alpha Transparency level for the masks (0: transparent, 1: opaque).
+    @return None
+    """
+    for class_id, box, mask in zip(cls_ids, boxes, masks):
+        x1, y1, x2, y2 = map(int, box)
+        if mask.size == 0 or x2 <= x1 or y2 <= y1:
+            continue
+
+        region = image[y1:y2, x1:x2]  # Crop region from image
+        mask_area = mask.astype(bool)  # Convert to boolean mask
+
+        if not np.any(mask_area):
+            continue
+
+        # Generate a solid color patch
+        color = colors[(class_id - 1) % len(colors)]
+        color_patch = np.empty(region.shape, dtype=np.uint8)
+        color_patch[:, :] = color
+
+        # Blend mask with image
+        region[mask_area] = (
+            (1 - alpha) * region[mask_area] + alpha * color_patch[mask_area]
+        ).astype(np.uint8)
+
+
+def draw_contours(img: np.ndarray, boxes: np.ndarray, masks: list,
+                  cls_ids: list, colors: list, thickness: int = 2) -> None:
+    """
+    @brief Draw contour outlines of instance masks on the image.
+    @param img Input image to draw on (modified in-place).
+    @param boxes Bounding boxes for each mask, shape: (N, 4).
+    @param masks List of binary masks for each instance.
+    @param cls_ids List of class indices for each instance.
+    @param colors List of RGB color tuples.
+    @param thickness Thickness of contour lines.
+    @return None
+    """
+    for class_id, box, mask in zip(cls_ids, boxes, masks):
+        x1, y1, x2, y2 = map(int, box)
+        if mask.size == 0:
+            continue
+
+        # Extract external contours from mask
+        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+        if not contours:
+            continue
+
+        # Merge all contours and shift to global coordinates
+        merged_points = np.vstack([c for c in contours])
+        merged_points[:, 0, 0] += x1
+        merged_points[:, 0, 1] += y1
+
+        # Draw the contour line on the image
+        cv2.polylines(img, [merged_points], isClosed=True,
+                      color=colors[(class_id - 1) % len(colors)],
+                      thickness=thickness)
+
+
+def rgb_to_disp_color(rgb_tuple: tuple) -> int:
+    """
+    @brief Convert RGB tuple to 32-bit ARGB display color format.
+    @details Format is ARGB: alpha in high 8 bits, followed by R, G, B.
+    @param rgb_tuple Tuple of (R, G, B) values.
+    @return 32-bit ARGB integer color value.
+    """
+    r, g, b = rgb_tuple
+    alpha = 255
+    return (alpha << 24) | (r << 16) | (g << 8) | b
+
+
+def draw_detections_on_disp(disp, boxes: np.ndarray, cls_ids: list,
+                            scores: list, class_names: list,
+                            colors: list, chn: int = 2) -> None:
+    """
+    @brief Draw detection boxes and labels on a hardware display.
+    @param disp Display device object with `set_graph_rect` and `set_graph_word` methods.
+    @param boxes Array of bounding boxes (N, 4).
+    @param cls_ids List of class indices.
+    @param scores List of detection confidence scores.
+    @param class_names List of class name strings.
+    @param colors List of RGB color tuples.
+    @param chn Display channel index.
+    @return None
+    """
+    # Clear canvas
+    disp.set_graph_rect(0, 0, 0, 0, 2, 1, 0, 3)
+    disp.set_graph_word(0, 0, "", chn, 1, 0, 16)
+
+    for box, cls_id, score in zip(boxes, cls_ids, scores):
+        x1, y1, x2, y2 = map(int, box)
+        label = f"{class_names[cls_id]} {score:.2f}"
+        color = rgb_to_disp_color(colors[cls_id % len(colors)])
+
+        # Draw bounding box on display
+        disp.set_graph_rect(x1, y1, x2, y2, 2, 0, color, 3)
+        # Draw class name and confidence
+        disp.set_graph_word(x1, max(y1 - 20, 0), label, chn, 0, color, 16)
+
+
+def draw_keypoints(image: np.ndarray, kpts_xy: np.ndarray,
+                   kpts_score: np.ndarray, kpt_conf_thresh: float = 0.5,
+                   radius_outer: int = 5, radius_inner: int = 2) -> None:
+    """
+    @brief Draw keypoints with confidence scores on an image.
+    @param image Input/output image in-place modification.
+    @param kpts_xy Keypoints coordinates, shape (N, K, 2).
+    @param kpts_score Keypoints confidence scores, shape (N, K, 1).
+    @param kpt_conf_thresh Confidence threshold to show keypoints.
+    @param radius_outer Outer circle radius.
+    @param radius_inner Inner circle radius.
+    @return None
+    """
+    # Convert threshold to logit space (same as sigmoid(score) > threshold)
+    kpt_conf_inverse = -np.log(1 / kpt_conf_thresh - 1)
+
+    for instance_xy, instance_score in zip(kpts_xy, kpts_score):
+        for j in range(instance_xy.shape[0]):
+            if instance_score[j, 0] < kpt_conf_inverse:
+                continue
+
+            x, y = int(instance_xy[j, 0]), int(instance_xy[j, 1])
+
+            # Draw outer and inner circles
+            cv2.circle(image, (x, y), radius_outer, (0, 0, 255), -1)
+            cv2.circle(image, (x, y), radius_inner, (0, 255, 255), -1)
+
+            # Draw index number twice for bold outline effect
+            cv2.putText(image, f"{j}", (x, y), cv2.FONT_HERSHEY_SIMPLEX,
+                        0.5, (0, 0, 255), 3, cv2.LINE_AA)
+            cv2.putText(image, f"{j}", (x, y), cv2.FONT_HERSHEY_SIMPLEX,
+                        0.5, (0, 255, 255), 1, cv2.LINE_AA)
+
+
+def draw_polygon_boxes(img: np.ndarray, bboxes: list,
+                       color: tuple = (128, 240, 128),
+                       thickness: int = 3) -> np.ndarray:
+    """
+    @brief Draw polygon-style bounding boxes on a copy of the image.
+    @param img Input image (BGR format).
+    @param bboxes List of polygon boxes, each is an ndarray of shape (N, 2).
+    @param color Polygon color (B, G, R).
+    @param thickness Line thickness.
+    @return Image with drawn polygons.
+    """
+    img_copy = img.copy()
+    for bbox in bboxes:
+        bbox = bbox.astype(int)
+        # Draw closed polygon on image
+        cv2.polylines(img_copy, [bbox], isClosed=True, color=color, thickness=thickness)
+    return img_copy

+ 576 - 0
brain/PlannerNode2/Skill/person_approach_skill/skill_utils/postprocess_utils.py

@@ -0,0 +1,576 @@
+# Copyright (c) 2025 D-Robotics Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# flake8: noqa: E501
+
+import cv2
+import numpy as np
+from hbm_runtime import QuantParams
+from scipy.special import softmax
+
+
+def sigmoid(x: np.ndarray) -> np.ndarray:
+    """
+    @brief Compute the sigmoid activation function.
+    @param x Input NumPy array.
+    @return NumPy array after applying sigmoid function element-wise.
+    """
+    return 1.0 / (1.0 + cv2.exp(-x))
+
+
+def recover_to_original_size(img: np.ndarray,
+                             orig_w: int,
+                             orig_h: int,
+                             resize_type: int = 1) -> np.ndarray:
+    """
+    @brief Restore resized image back to original size.
+    @details Supports direct resize or reverse letterbox removal.
+    @param img Input image of shape (H, W, C).
+    @param orig_w Original image width.
+    @param orig_h Original image height.
+    @param resize_type Resize type used before: 0 (direct) or 1 (letterbox).
+    @return Resized image of shape (orig_h, orig_w, C).
+    """
+    h, w = img.shape[:2]  # current size after preprocess
+
+    if resize_type == 0:
+        # Resize directly to original dimensions
+        img_resized = cv2.resize(img, (orig_w, orig_h),
+                                 interpolation=cv2.INTER_NEAREST)
+    elif resize_type == 1:
+        # Remove padding and resize back from letterbox
+        scale = min(h / orig_h, w / orig_w)
+        new_w, new_h = int(orig_w * scale), int(orig_h * scale)
+
+        pad_w = w - new_w
+        pad_h = h - new_h
+        left = pad_w // 2
+        top = pad_h // 2
+
+        # Crop out the letterbox padding
+        cropped = img[top:top + new_h, left:left + new_w]
+
+        # Resize cropped region to original size
+        img_resized = cv2.resize(cropped, (orig_w, orig_h),
+                                 interpolation=cv2.INTER_NEAREST)
+    else:
+        raise ValueError(f"Invalid resize_type: {resize_type}, must be 0 or 1")
+
+    return img_resized
+
+
+def print_topk_predictions(output: np.ndarray,
+                           idx2label: dict,
+                           topk: int = 5) -> None:
+    """
+    @brief Print top-k classification predictions.
+    @details Uses softmax to compute probability and selects top-k.
+    @param output Raw logits as NumPy array (shape: [num_classes]).
+    @param idx2label Dictionary mapping class indices to labels.
+    @param topk Number of top predictions to display.
+    @return None
+    """
+    # Softmax with stability adjustment
+    exp_logits = np.exp(output - np.max(output))
+    probabilities = exp_logits / np.sum(exp_logits)
+
+    # Top-k indices
+    topk_idx = np.argsort(probabilities)[-topk:][::-1]
+    topk_prob = probabilities[topk_idx]
+
+    print(f"Top-{topk} Predictions:")
+    for i in range(topk):
+        idx = topk_idx[i]
+        prob = topk_prob[i]
+        label = idx2label[idx] if idx2label and idx in idx2label else f"Class {idx}"
+        print(f"{label}: {prob:.4f}")
+
+
+def dequantize_tensor(q_tensor: np.ndarray, quant_info: QuantParams) -> np.ndarray:
+    """
+    @brief Dequantize a quantized tensor to floating-point values.
+    @details Supports both per-tensor and per-channel dequantization based on quant_info.
+    @param q_tensor Quantized tensor (e.g., int8 or uint8).
+    @param quant_info Quantization parameters (scale, zero_point, axis, type).
+    @return Dequantized tensor (float32).
+    """
+    if quant_info.quant_type != 1:  # 1 indicates linear scale quantization
+        return q_tensor
+
+    if quant_info.scale.ndim == 0 or q_tensor.ndim == 1 or quant_info.scale.size == 1:
+        # Per-tensor dequantization
+        return (q_tensor.astype(np.float32) - quant_info.zero_point.astype(np.float32)) * quant_info.scale
+    else:
+        # Per-channel dequantization
+        shape = [1] * q_tensor.ndim
+        shape[quant_info.axis] = -1
+        scale = quant_info.scale.reshape(shape)
+        zero_point = quant_info.zero_point.reshape(shape)
+        return (q_tensor.astype(np.float32) - zero_point.astype(np.float32)) * scale
+
+
+def dequantize_outputs(outputs: dict, quan_infos: dict) -> dict:
+    """
+    @brief Dequantize a dictionary of quantized model outputs.
+    @param outputs Dictionary of quantized output tensors.
+    @param quan_infos Dictionary of quantization parameters per output.
+    @return Dictionary of dequantized float32 outputs.
+    """
+    fp32_outputs = {}
+    for name, output in outputs.items():
+        quant_info = quan_infos[name]
+        fp32_outputs[name] = dequantize_tensor(output, quant_info)
+    return fp32_outputs
+
+
+def scale_coords_back(xyxy: np.ndarray,
+                      img_w: int,
+                      img_h: int,
+                      input_w: int,
+                      input_h: int,
+                      resize_type: int = 1) -> np.ndarray:
+    """
+    @brief Map coordinates from resized image back to original image scale.
+    @param xyxy Bounding boxes (N, 4) in resized image.
+    @param img_w Original image width.
+    @param img_h Original image height.
+    @param input_w Network input width.
+    @param input_h Network input height.
+    @param resize_type Resize strategy: 0 (resize), 1 (letterbox).
+    @return Bounding boxes rescaled to original image dimensions.
+    """
+    if resize_type == 0:
+        # Direct resize
+        scale_x = img_w / input_w
+        scale_y = img_h / input_h
+        xyxy[:, [0, 2]] *= scale_x
+        xyxy[:, [1, 3]] *= scale_y
+    elif resize_type == 1:
+        # Letterbox resize
+        scale = min(input_w / img_w, input_h / img_h)
+        pad_w = (input_w - img_w * scale) / 2
+        pad_h = (input_h - img_h * scale) / 2
+        xyxy[:, [0, 2]] = (xyxy[:, [0, 2]] - pad_w) / scale
+        xyxy[:, [1, 3]] = (xyxy[:, [1, 3]] - pad_h) / scale
+    else:
+        raise ValueError("resize_type must be 0 (resize) or 1 (letterbox)")
+
+    # Clamp coordinates within valid image bounds
+    xyxy[:, [0, 2]] = np.clip(xyxy[:, [0, 2]], 0, img_w)
+    xyxy[:, [1, 3]] = np.clip(xyxy[:, [1, 3]], 0, img_h)
+
+    return xyxy
+
+
+def NMS(xyxy: np.ndarray,
+        score: np.ndarray,
+        cls: np.ndarray,
+        iou_thresh: float = 0.45) -> list:
+    """
+    @brief Perform class-wise Non-Maximum Suppression (NMS).
+    @details Keeps boxes with highest scores and removes overlaps above IoU threshold.
+    @param xyxy Bounding boxes (N, 4).
+    @param score Confidence scores (N,).
+    @param cls Class IDs for each box (N,).
+    @param iou_thresh IoU threshold for suppression.
+    @return List of indices to keep.
+    """
+    keep = []
+    for c in np.unique(cls):
+        idx = np.where(cls == c)[0]
+        x1, y1, x2, y2 = xyxy[idx].T
+        area = (x2 - x1) * (y2 - y1)
+        order = score[idx].argsort()[::-1]  # Sort by descending score
+
+        while order.size > 0:
+            i = order[0]
+            keep.append(idx[i])
+            # Compute IoU with remaining boxes
+            xx1 = np.maximum(x1[i], x1[order[1:]])
+            yy1 = np.maximum(y1[i], y1[order[1:]])
+            xx2 = np.minimum(x2[i], x2[order[1:]])
+            yy2 = np.minimum(y2[i], y2[order[1:]])
+            inter = np.clip(xx2 - xx1, 0, None) * np.clip(yy2 - yy1, 0, None)
+            iou = inter / (area[i] + area[order[1:]] - inter + 1e-9)
+
+            # Keep boxes with IoU below threshold
+            order = order[1:][iou < iou_thresh]
+
+    return keep
+
+
+def xywh_to_xyxy(xywh: np.ndarray) -> np.ndarray:
+    """
+    @brief Convert bounding boxes from (x_center, y_center, w, h) to (x1, y1, x2, y2).
+    @param xywh (N, 4) array in [center_x, center_y, width, height] format.
+    @return (N, 4) array in [x1, y1, x2, y2] format.
+    """
+    x1y1 = xywh[:, :2] - xywh[:, 2:] / 2
+    x2y2 = xywh[:, :2] + xywh[:, 2:] / 2
+    return np.hstack([x1y1, x2y2])
+
+
+def filter_classification(cls_output: np.ndarray, conf_thres_raw: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
+    """
+    @brief Filter classification outputs using raw confidence threshold.
+    @param cls_output Classification logits of shape (N, C).
+    @param conf_thres_raw Threshold applied to max logit (before sigmoid).
+    @return Tuple of:
+        - scores: Sigmoid confidence scores of selected predictions
+        - ids: Class indices of selected predictions
+        - valid_indices: Original indices of selected predictions
+    """
+    cls_output = cls_output.reshape(-1, cls_output.shape[-1])
+    max_scores = np.max(cls_output, axis=1)
+    valid_indices = np.flatnonzero(max_scores >= conf_thres_raw)
+    ids = np.argmax(cls_output[valid_indices], axis=1)
+    # Apply sigmoid
+    scores = 1 / (1 + np.exp(-max_scores[valid_indices]))
+    return scores, ids, valid_indices
+
+
+def filter_mces(mces_output: np.ndarray, valid_indices: np.ndarray) -> np.ndarray:
+    """
+    @brief Extract MCES features from selected predictions.
+    @param mces_output.
+    @param valid_indices Indices of valid predictions.
+    @return Filtered MCES tensor of shape (K, D), K = len(valid_indices).
+    """
+    mces_output = mces_output.reshape(-1, mces_output.shape[-1])
+    mces = mces_output[valid_indices, :]
+    return mces
+
+
+def filter_predictions(pred: np.ndarray, score_thres: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
+    """
+    @brief Filter detection predictions by confidence threshold.
+    @param pred Tensor of shape (N, 5 + C): [x, y, w, h, obj_conf, class_probs...].
+    @param score_thres Threshold on (obj_conf * class_conf).
+    @return Tuple of:
+        - xyxy: Filtered bounding boxes (Nf, 4)
+        - score: Filtered scores (Nf,)
+        - cls: Class indices (Nf,)
+    """
+    xywh = pred[:, :4]
+
+    # Combine objectness and class scores
+    conf_all = pred[:, 4:5] * pred[:, 5:]
+    cls = conf_all.argmax(axis=1)
+    score = conf_all[np.arange(len(pred)), cls]
+    mask = score > score_thres
+    xyxy = xywh_to_xyxy(xywh[mask])
+    return xyxy, score[mask], cls[mask]
+
+
+def gen_anchor(grid_size: int) -> np.ndarray:
+    """
+    @brief Generate anchor center positions on a square grid.
+    @param grid_size Size of the square grid (e.g., 80 for 80x80).
+    @return (N, 2) array of anchor coordinates [x, y].
+    """
+    x = np.tile(np.linspace(0.5, grid_size - 0.5, grid_size), reps=grid_size)
+    y = np.repeat(np.linspace(0.5, grid_size - 0.5, grid_size), grid_size)
+    return np.stack([x, y], axis=1)
+
+
+def decode_boxes(boxes_output: np.ndarray,
+                 valid_indices: np.ndarray,
+                 grid_size: int,
+                 stride: int,
+                 weights_static: np.ndarray) -> np.ndarray:
+    """
+    @brief Decode bounding boxes from distributional predictions.
+    @param boxes_output Tensor of shape (N, 4 * 16).
+    @param valid_indices Indices of valid predictions.
+    @param grid_size Feature map grid size.
+    @param stride Downsampling factor.
+    @param weights_static Discrete location weights (e.g., 0~15).
+    @return Decoded bounding boxes in xyxy format (M, 4).
+    """
+    bboxes = boxes_output.reshape(-1, boxes_output.shape[-1])
+    bboxes_float32 = bboxes[valid_indices]
+    # Softmax over 16 bins per LTRB side and apply expectation
+    ltrb = np.sum(softmax(bboxes_float32.reshape(-1, 4, 16), axis=2) *
+                  weights_static, axis=2)
+    anchor = gen_anchor(grid_size)[valid_indices]
+    x1y1 = anchor - ltrb[:, 0:2]
+    x2y2 = anchor + ltrb[:, 2:4]
+    return np.hstack([x1y1, x2y2]) * stride
+
+
+def decode_masks(mces: np.ndarray,
+                 boxes: np.ndarray,
+                 protos: np.ndarray,
+                 input_w: int,
+                 input_h: int,
+                 mask_w: int,
+                 mask_h: int,
+                 mask_thresh: float = 0.5) -> list[np.ndarray]:
+    """
+    @brief Decode instance segmentation masks.
+    @param mces Mask coefficients for each detection (M, C).
+    @param boxes Bounding boxes (M, 4).
+    @param protos Mask prototype feature map (H, W, C).
+    @param input_w Width of the input image.
+    @param input_h Height of the input image.
+    @param mask_w Width of the mask proto.
+    @param mask_h Height of the mask proto.
+    @param mask_thresh Threshold to binarize masks.
+    @return List of (H, W) binary mask arrays.
+    """
+    masks = []
+    x_scale = mask_w / input_w
+    y_scale = mask_h / input_h
+
+    for i, (x1, y1, x2, y2) in enumerate(boxes):
+        # Crop proto features using scaled coordinates
+        x1_corp = int(x1 * x_scale)
+        y1_corp = int(y1 * y_scale)
+        x2_corp = int(x2 * x_scale)
+        y2_corp = int(y2 * y_scale)
+
+        proto_crop = protos[y1_corp:y2_corp, x1_corp:x2_corp, :]  # (H, W, C)
+        mc = mces[i]
+        # Linear combination and thresholding
+        mask = (np.sum(proto_crop * mc[np.newaxis, np.newaxis, :], axis=2)
+                > mask_thresh).astype(np.uint8)
+        masks.append(mask)
+
+    return masks
+
+
+def decode_kpts(kpts_output: np.ndarray,
+                valid_indices: np.ndarray,
+                grid_size: int,
+                stride: int,
+                anchor: np.ndarray = None) -> tuple[np.ndarray, np.ndarray]:
+    """
+    @brief Decode keypoint coordinates from model output.
+    @param kpts_output Keypoint tensor of shape (N, 17*3).
+    @param valid_indices Indices of valid predictions.
+    @param grid_size Size of feature map grid.
+    @param stride Downsampling factor (e.g., 8, 16, 32).
+    @param anchor Optional anchor points. If None, generated automatically.
+    @return Tuple:
+            - kpts_xy: (M, 17, 2) pixel coordinates of keypoints.
+            - kpts_score: (M, 17, 1) keypoint confidence scores.
+    """
+    kpts_output = kpts_output.reshape(-1, kpts_output.shape[-1])
+    kpts = kpts_output[valid_indices].reshape(-1, 17, 3)  # (M, 17, 3)
+
+    if anchor is None:
+        anchor = gen_anchor(grid_size)[valid_indices]  # (M, 2)
+
+    # Decode x, y using anchor and stride
+    kpts_xy = (kpts[:, :, :2] * 2.0 + (anchor[:, None, :] - 0.5)) * stride
+
+    # Extract score without activation (or apply sigmoid optionally)
+    kpts_score = kpts[:, :, 2:3]
+
+    return kpts_xy, kpts_score
+
+
+def decode_layer(feat: np.ndarray,
+                 stride: int,
+                 anchor: np.ndarray,
+                 classes_num: int = 80) -> np.ndarray:
+    """
+    @brief Decode a single feature layer from detection head.
+    @param feat Raw model output tensor of shape (1, na, h, w, c).
+    @param stride Stride of the feature layer.
+    @param anchor Anchor sizes for this layer (na, 2).
+    @param classes_num Number of output classes.
+    @return Decoded prediction array of shape (N, 5 + num_classes).
+    """
+    _, _, h, w, _ = feat.shape  #  h/w: feature map size
+
+    # Create coordinate grid of shape (1, 1, h, w, 2)
+    grid_y, grid_x = np.mgrid[0:h, 0:w]
+    grid = np.stack((grid_x, grid_y), axis=-1)[None, None]
+
+    # batch sigmoid
+    feat_sig = sigmoid(feat[..., :5 + classes_num])
+
+    # Decode center offsets (dx, dy) and size (dw, dh)
+    dxdy = feat_sig[..., :2]
+    dwdh = feat_sig[..., 2:4]
+    obj  = feat_sig[..., 4:5]
+    cls  = feat_sig[..., 5:]
+
+    # Compute center coordinates in original image scale
+    xy = (dxdy * 2. - 0.5 + grid) * stride
+
+    # Compute width/height from anchor sizes
+    wh = (dwdh * 2.) ** 2 * anchor[:, None, None, :]
+
+    # Construct final output tensor (xywh + obj + class scores)
+    out = np.empty((*xy.shape[:-1], 5 + classes_num), dtype=np.float32)
+    out[..., 0:2] = xy
+    out[..., 2:4] = wh
+    out[..., 4:5] = obj
+    out[..., 5:]  = cls
+
+    return out.reshape(-1, 5 + classes_num)
+
+
+def decode_outputs(output_names: list[str],
+                   fp32_outputs: dict[str, np.ndarray],
+                   strides: list[int],
+                   anchors: list[np.ndarray],
+                   classes_num: int = 80) -> np.ndarray:
+    """
+    @brief Decode all feature maps from model output.
+    @param output_names List of output tensor names.
+    @param fp32_outputs Dict of decoded tensors from model.
+    @param strides Stride values for each output head.
+    @param anchors Anchor arrays for each head.
+    @param classes_num Number of output classes.
+    @return Concatenated prediction tensor of shape (N, 5 + classes).
+    """
+    decoded = []
+    for i, key in enumerate(output_names):
+        out = fp32_outputs[key]
+        h, w = out.shape[1:3]
+        # Reshape and transpose to (1, na, h, w, c)
+        feat = out.reshape(1, h, w, 3, 5 + classes_num).transpose(0, 3, 1, 2, 4)
+        decoded.append(decode_layer(feat, strides[i], anchors[i], classes_num))
+    return np.concatenate(decoded, axis=0)
+
+
+def get_bounding_boxes(dilated_polys: list[np.ndarray], min_area: float) -> list[np.ndarray]:
+    """
+    @brief Extract minimum area bounding boxes from polygon contours.
+    @param dilated_polys List of polygon contours. Each element is a NumPy array of shape (N, 1, 2).
+    @param min_area Minimum area threshold to filter small boxes.
+    @return List of bounding boxes. Each is a NumPy array of shape (4, 2), type int.
+    """
+    boxes_list = []
+    for cnt in dilated_polys:
+        if cv2.contourArea(cnt) < min_area:
+            continue  # Skip small contours
+        rect = cv2.minAreaRect(cnt)
+        box = cv2.boxPoints(rect).astype(np.int_)
+        boxes_list.append(box)
+    return boxes_list
+
+
+def resize_masks_to_boxes(masks: list[np.ndarray],
+                          boxes: list[tuple[float, float, float, float]],
+                          img_w: int, img_h: int,
+                          interpolation: int = cv2.INTER_LANCZOS4,
+                          do_morph: bool = True) -> list[np.ndarray]:
+    """
+    @brief Resize binary masks to fit inside their corresponding bounding boxes.
+    @param masks List of binary mask arrays of shape (H, W), dtype=uint8.
+    @param boxes List of bounding boxes in (x1, y1, x2, y2) format.
+    @param img_w Width of the original image.
+    @param img_h Height of the original image.
+    @param interpolation OpenCV interpolation method used for resizing.
+    @param do_morph Whether to apply morphological open to smooth the mask.
+    @return List of resized binary masks cropped to box size.
+    """
+    resized_masks = []
+    for mask, (x1, y1, x2, y2) in zip(masks, boxes):
+        # Clamp coordinates to image bounds
+        x1, y1 = max(int(x1), 0), max(int(y1), 0)
+        x2, y2 = min(int(x2), img_w), min(int(y2), img_h)
+
+        target_w = max(x2 - x1, 1)
+        target_h = max(y2 - y1, 1)
+
+        resized = cv2.resize(mask, (target_w, target_h), interpolation=interpolation)
+
+        if do_morph:
+            # Apply morphological filtering
+            resized = cv2.morphologyEx(resized, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8))
+
+        resized_masks.append(resized)
+
+    return resized_masks
+
+
+def scale_keypoints_to_original_image(kpts_xy: np.ndarray,
+                                      kpts_score: np.ndarray,
+                                      boxes: list[tuple[float, float, float, float]],
+                                      img_w: int, img_h: int,
+                                      input_w: int, input_h: int,
+                                      resize_type: int = 1) -> tuple[np.ndarray, np.ndarray]:
+    """
+    @brief Scale keypoints back to original image coordinates.
+    @param kpts_xy Keypoint coordinates of shape (M, 17, 2), float32.
+    @param kpts_score Keypoint scores of shape (M, 17, 1), float32.
+    @param boxes List of bounding boxes, not used here.
+    @param img_w Width of the original image.
+    @param img_h Height of the original image.
+    @param input_w Width of model input.
+    @param input_h Height of model input.
+    @param resize_type 0 = direct resize, 1 = letterbox resize.
+    @return Tuple of (scaled keypoints, scores), both NumPy arrays.
+    """
+    scaled_kpts = kpts_xy.copy()
+
+    if resize_type == 0:
+        scale_x = img_w / input_w
+        scale_y = img_h / input_h
+        scaled_kpts[..., 0] *= scale_x
+        scaled_kpts[..., 1] *= scale_y
+
+    elif resize_type == 1:
+        scale = min(input_w / img_w, input_h / img_h)
+        pad_w = (input_w - img_w * scale) / 2
+        pad_h = (input_h - img_h * scale) / 2
+        scaled_kpts[..., 0] = (scaled_kpts[..., 0] - pad_w) / scale
+        scaled_kpts[..., 1] = (scaled_kpts[..., 1] - pad_h) / scale
+
+    else:
+        raise ValueError("resize_type must be 0 or 1")
+
+    # Clip to image bounds
+    scaled_kpts[..., 0] = np.clip(scaled_kpts[..., 0], 0, img_w)
+    scaled_kpts[..., 1] = np.clip(scaled_kpts[..., 1], 0, img_h)
+
+    return scaled_kpts, kpts_score
+
+
+def crop_and_rotate_image(img: np.ndarray, box: np.ndarray) -> np.ndarray:
+    """
+    @brief Crop and rotate a region from the image using a rotated bounding box.
+    @param img Input image array of shape (H, W, C), dtype=uint8.
+    @param box Bounding box as 4-point array of shape (4, 2).
+    @return Cropped and rotated region image as a NumPy array.
+    """
+    rect = cv2.minAreaRect(box)
+    box = cv2.boxPoints(rect).astype(np.intp)
+    width = int(rect[1][0])
+    height = int(rect[1][1])
+    angle = rect[2]
+
+    src_pts = box.astype("float32")
+    dst_pts = np.array([[0, height - 1],
+                        [0, 0],
+                        [width - 1, 0],
+                        [width - 1, height - 1]], dtype="float32")
+
+    # Apply perspective transform
+    M = cv2.getPerspectiveTransform(src_pts, dst_pts)
+    warped = cv2.warpPerspective(img, M, (width, height))
+
+    # Rotate if angle is large
+    if angle >= 45:
+        rotated = cv2.rotate(warped, cv2.ROTATE_90_CLOCKWISE)
+    else:
+        rotated = warped
+
+    print("width:", rotated.shape[1], "height:", rotated.shape[0])
+    return rotated

+ 166 - 0
brain/PlannerNode2/Skill/person_approach_skill/skill_utils/preprocess_utils.py

@@ -0,0 +1,166 @@
+# Copyright (c) 2025 D-Robotics Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# flake8: noqa: E501
+
+import cv2
+import numpy as np
+
+
+def bgr_to_nv12_planes(image: np.ndarray) -> tuple:
+    """
+    @brief Build NV12-style Y and interleaved-UV planes from a BGR image.
+    @param image BGR image; its first two dimensions are read as (H, W).
+    @return Tuple (y, uv):
+        - y: luma plane with shape (1, H, W, 1)
+        - uv: chroma plane with shape (1, H/2, W/2, 2), U then V on the last axis
+    """
+    rows, cols = image.shape[:2]
+    luma_len = rows * cols
+    chroma_len = luma_len // 4
+    half_shape = (rows // 2, cols // 2)
+
+    # OpenCV returns planar I420; flatten it so the three planes can be sliced out.
+    flat = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((luma_len * 3 // 2,))
+
+    # Layout of the flat buffer: Y samples, then the U plane, then the V plane.
+    u_start = luma_len
+    v_start = luma_len + chroma_len
+    luma = flat[:u_start].reshape((rows, cols))
+    u_plane = flat[u_start:v_start].reshape(half_shape)
+    v_plane = flat[v_start:].reshape(half_shape)
+
+    # Pair every U sample with its V sample on a new trailing axis.
+    chroma = np.stack((u_plane, v_plane), axis=-1)
+
+    # Prepend a batch axis to both planes; luma also gets a trailing channel axis.
+    return luma[np.newaxis, :, :, np.newaxis], chroma[np.newaxis, :, :, :]
+
+
+def resized_image(img: np.ndarray, input_W: int, input_H: int,
+                  resize_type: int = 1,
+                  interpolation=None) -> np.ndarray:
+    """
+    @brief Resize image with either direct resize or letterbox strategy.
+    @param img Input image (H, W, 3).
+    @param input_W Target width.
+    @param input_H Target height.
+    @param resize_type Resize method: 0 for direct resize, 1 for letterbox padding.
+    @param interpolation OpenCV interpolation flag. When None (default) the
+           historical behaviour is kept: cv2.INTER_NEAREST for direct resize and
+           cv2.INTER_LINEAR (cv2.resize's own default) for letterbox. An explicit
+           value is honoured by both strategies; previously the letterbox branch
+           silently ignored it.
+    @return Resized image with shape (input_H, input_W, 3).
+    @throws ValueError If resize_type is neither 0 nor 1.
+    """
+    img_h, img_w = img.shape[:2]
+
+    if resize_type == 0:  # Direct resize
+        interp = cv2.INTER_NEAREST if interpolation is None else interpolation
+        return cv2.resize(img, (input_W, input_H), interpolation=interp)
+
+    if resize_type != 1:
+        raise ValueError(f"Invalid resize_type: {resize_type}, must be 0 or 1")
+
+    # Letterbox resize (preserve aspect ratio)
+    interp = cv2.INTER_LINEAR if interpolation is None else interpolation
+    scale = min(input_H / img_h, input_W / img_w)
+    # int() truncation can reach 0 for extreme aspect ratios, which cv2.resize
+    # rejects; keep at least one pixel per side.
+    new_w = max(1, int(img_w * scale))
+    new_h = max(1, int(img_h * scale))
+    resized = cv2.resize(img, (new_w, new_h), interpolation=interp)
+
+    # Split the leftover space; the odd pixel, if any, goes to right/bottom.
+    pad_w = input_W - new_w
+    pad_h = input_H - new_h
+    left, right = pad_w // 2, pad_w - pad_w // 2
+    top, bottom = pad_h // 2, pad_h - pad_h // 2
+
+    # Pad image with gray (127,127,127)
+    return cv2.copyMakeBorder(resized, top, bottom, left, right,
+                              borderType=cv2.BORDER_CONSTANT,
+                              value=(127, 127, 127))
+
+
+def split_nv12_bytes(nv12_bytes: bytes, width: int, height: int) -> tuple:
+    """
+    @brief View a raw NV12 buffer as separate Y and UV planes.
+    @param nv12_bytes Buffer holding the Y samples followed by the interleaved UV samples.
+    @param width Width of the image.
+    @param height Height of the image.
+    @return Tuple (y, uv), where:
+        - y: shape (H, W), dtype uint8
+        - uv: shape (H/2, W), dtype uint8 (interleaved UV)
+    """
+    samples = np.frombuffer(nv12_bytes, dtype=np.uint8)
+
+    # The luma plane occupies width*height bytes; chroma takes half as many after it.
+    luma_end = width * height
+    chroma_end = luma_end + luma_end // 2
+
+    luma = samples[:luma_end].reshape((height, width))
+    chroma = samples[luma_end:chroma_end].reshape((height // 2, width))
+    return luma, chroma
+
+
+def letterbox_resize_gray(gray_img: np.ndarray, target_w: int, target_h: int) -> np.ndarray:
+    """
+    @brief Fit a single-channel image into a target size, keeping aspect ratio and padding the rest.
+    @param gray_img Input grayscale image of shape (H, W).
+    @param target_w Target width.
+    @param target_h Target height.
+    @return Resized and padded grayscale image of shape (target_h, target_w).
+    """
+    src_h, src_w = gray_img.shape
+
+    # Largest uniform scale at which the image still fits in the target.
+    ratio = min(target_w / src_w, target_h / src_h)
+    fit_w = int(src_w * ratio)
+    fit_h = int(src_h * ratio)
+    shrunk = cv2.resize(gray_img, (fit_w, fit_h))
+
+    # Distribute the remaining space; an odd leftover pixel lands on bottom/right.
+    extra_w = target_w - fit_w
+    extra_h = target_h - fit_h
+    pad_top = extra_h // 2
+    pad_left = extra_w // 2
+
+    # Border is filled with the constant 127 (gray).
+    return cv2.copyMakeBorder(shrunk, pad_top, extra_h - pad_top,
+                              pad_left, extra_w - pad_left,
+                              borderType=cv2.BORDER_CONSTANT, value=127)
+
+
+def resize_nv12_yuv(y: np.ndarray, uv: np.ndarray,
+                    target_h: int = 672, target_w: int = 672,
+                    keep_ratio: bool = True) -> tuple:
+    """
+    @brief Bring the Y and UV planes of an NV12 image to a target resolution.
+    @param y Y plane of shape (H, W).
+    @param uv Interleaved UV plane of shape (H/2, W).
+    @param target_h Target height.
+    @param target_w Target width.
+    @param keep_ratio When True each plane is letterboxed via letterbox_resize_gray;
+                      otherwise each plane is stretched with cv2.resize.
+    @return Tuple of resized:
+        - y_resized: shape (target_h, target_w)
+        - uv_resized: shape (target_h/2, target_w/2, 2)
+    """
+    # Pick the per-plane resize strategy once, then apply it to Y, U and V alike.
+    if keep_ratio:
+        def fit(plane, out_w, out_h):
+            return letterbox_resize_gray(plane, out_w, out_h)
+    else:
+        def fit(plane, out_w, out_h):
+            return cv2.resize(plane, (out_w, out_h))
+
+    half_w, half_h = target_w // 2, target_h // 2
+
+    y_resized = fit(y, target_w, target_h)
+
+    # Even columns of the interleaved plane hold U, odd columns hold V;
+    # each is resized on its own at half the target resolution.
+    u_resized = fit(uv[:, 0::2], half_w, half_h)
+    v_resized = fit(uv[:, 1::2], half_w, half_h)
+
+    # Recombine U and V on a trailing axis.
+    return y_resized, np.stack((u_resized, v_resized), axis=-1)

+ 114 - 12
brain/PlannerNode2/largemodel/largemodel/action_service.py

@@ -99,6 +99,10 @@ class CustomActionServer(Node):
         self.first_record = True  # 首次记录位置 / First record
         self.is_recording = False  # 录音状态 / Recording status
         self.IS_SAVING = False #是否正在保存图像
+        self.welcome_mode = False  # 迎宾模式标志 / Welcome mode flag
+        self.process_map = {
+            'person_approach': {'pid': None, 'sub': None, 'running': False}
+        }  # 进程管理字典 / Process management map
         self.joint6 = (
             140  # 默认机械臂六轴的初始角度 / Default angle of the six-axis arm
         )
@@ -497,10 +501,28 @@ class CustomActionServer(Node):
         """
 
         if msg.data:
-            if (
+            self.get_logger().info(f"wakeup_callback: welcome_mode={self.welcome_mode}, action_runing={self.action_runing}")
+            # 迎宾模式打断处理
+            if self.welcome_mode:
+                ##self.stop_event.set()  # 停止 TTS 播放
+                # 杀掉所有管理的进程
+                for process_name, process_info in self.process_map.items():
+                    if process_info['pid'] is not None:
+                        self.kill_process_tree(process_info['pid'])
+                        process_info['pid'] = None
+                    if process_info['sub'] is not None:
+                        self.destroy_subscription(process_info['sub'])
+                        process_info['sub'] = None
+                    process_info['running'] = False
+                self.welcome_mode = False
+                self.stop_event.clear()  # 清除停止事件,避免影响后续播放
+                self.get_logger().info("Welcome mode interrupted by wakeup")
+
+            elif (
                 pygame.mixer.music.get_busy()  # 如果音乐正在播放/If the music is playing
             ):
                 self.stop_event.set()  # 停止正在播放的音乐/Stop the music currently playing
+                self.stop_event.clear()  # 清除事件,避免影响后续播放
             if (
                 self.action_runing  # 如果当前有动作正在执行/If there is an action currently being
             ):
@@ -508,6 +530,65 @@ class CustomActionServer(Node):
                 self.stop()
         # self.check_all_process()
 
+    def welcome(self):
+        """
+        Start welcome mode.
+
+        Launches the person-approach detection node as a child process, records
+        its PID in ``self.process_map['person_approach']`` and subscribes to
+        ``/skill/person_approach/event`` with
+        ``person_approach_event_callback`` as the handler.
+
+        If welcome mode is already active only a warning is logged. If the
+        launch command cannot be started (``OSError``, e.g. ``ros2`` is not on
+        PATH) the error is logged and ``welcome_mode`` is reset to False, so the
+        flag does not stay stuck and a later call can retry.
+        """
+        if self.welcome_mode:
+            self.get_logger().warn("Welcome mode already running")
+            return
+
+        self.welcome_mode = True
+        entry = self.process_map['person_approach']
+
+        # Start the person_approach node
+        try:
+            process = subprocess.Popen(
+                ["ros2", "launch", "person_approach_skill", "person_approach_node.launch.py"]
+            )
+        except OSError as e:
+            # Nothing was started: roll the flag back instead of leaving
+            # welcome mode permanently "running" with no process behind it.
+            self.welcome_mode = False
+            self.get_logger().error(f"Failed to start person_approach node: {e}")
+            return
+
+        entry['pid'] = process.pid
+        entry['running'] = True
+        self.get_logger().info(f"Started person_approach node, PID: {process.pid}")
+
+        # Subscribe to the detection events published by that node
+        entry['sub'] = self.create_subscription(
+            String, "/skill/person_approach/event",
+            self.person_approach_event_callback, 10
+        )
+        self.get_logger().info("Subscribed to /skill/person_approach/event")
+
+    def person_approach_event_callback(self, msg):
+        """
+        Handle a message from the person-approach event topic.
+
+        Ignored unless welcome mode is active. ``msg.data`` is parsed as JSON;
+        when its ``event`` field equals ``person_approach`` the greeting is
+        synthesized to ``self.tts_out_path`` and played asynchronously.
+        Parse failures and any other exception are logged, not raised.
+        """
+        if not self.welcome_mode:
+            return
+
+        try:
+            event = json.loads(msg.data)
+            if event.get('event') != 'person_approach':
+                return
+
+            self.get_logger().info(f"Person approach detected: {event}")
+
+            # Request that any TTS playback in progress stops, then pause briefly.
+            self.stop_event.set()
+            time.sleep(0.1)
+
+            # Greeting text (may be changed later)
+            greeting = "欢迎光临"
+
+            # Synthesize the greeting, then play it without blocking
+            self.model_client.voice_synthesis(greeting, self.tts_out_path)
+            self.play_audio_async(self.tts_out_path)
+            self.get_logger().info(f"Playing welcome TTS: {greeting}")
+
+        except json.JSONDecodeError:
+            self.get_logger().error("Failed to parse person_approach event data")
+        except Exception as e:
+            self.get_logger().error(f"Error in person_approach_event_callback: {e}")
+
     def get_current_pose(self):
         """
         获取当前在全局地图坐标系下的位置 /Get the current position in the global map coordinate system
@@ -556,6 +637,7 @@ class CustomActionServer(Node):
         """
         从navpose_dict字典中获取目标点坐标.并导航到目标点
         """
+        self.get_logger().info(f"navigation called with point_name: {point_name}")
         self.navigation_finish_flag = False
         self.goal_handle = None
         self.result = None
@@ -1307,6 +1389,7 @@ class CustomActionServer(Node):
             else:
                 action_name, args_str = match.groups()
                 args = [arg.strip() for arg in args_str.split(",")] if args_str else []
+                self.get_logger().info(f"Executing action: {action_name} with args: {args}")
                 method = getattr(self, action_name)
                 method(*args)
 
@@ -1370,6 +1453,7 @@ class CustomActionServer(Node):
         """
         return
 
+    @staticmethod
     def kill_process_tree(pid):
         try:
             parent = psutil.Process(pid)
@@ -1408,16 +1492,24 @@ class CustomActionServer(Node):
         """
         同步方式播放音频函数The function for playing audio in synchronous mode
         """
+        self.get_logger().info(f"play_audio called: file={file_path}, is_recording={self.is_recording}")
         if self.is_recording:
+            self.get_logger().warn("play_audio: is_recording=True, skip")
             return
+        if pygame.mixer.music.get_busy():
+            pygame.mixer.music.stop()
+        self.stop_event.clear()
         pygame.mixer.music.load(file_path)
         pygame.mixer.music.play()
+        self.get_logger().info(f"play_audio: started playing {file_path}")
         while pygame.mixer.music.get_busy():
             if self.stop_event.is_set() or self.is_recording:
                 pygame.mixer.music.stop()
-                self.stop_event.clear()  # 清除事件
+                self.stop_event.clear()
+                self.get_logger().info("play_audio: stopped by event")
                 return
             pygame.time.Clock().tick(10)
+        self.get_logger().info("play_audio: playback finished")
         if feedback:
             self.action_status_pub("response_done")
 
@@ -1426,19 +1518,29 @@ class CustomActionServer(Node):
         异步方式播放音频函数The function for playing audio in asynchronous mode
         """
         if self.is_recording:
+            self.get_logger().warn("play_audio_async: is_recording=True, skip")
             return
 
         def target():
-            pygame.mixer.music.load(file_path)
-            pygame.mixer.music.play()
-            while pygame.mixer.music.get_busy():
-                if self.stop_event.is_set() or self.is_recording:
-                    pygame.mixer.music.stop()
-                    self.stop_event.clear()  # 清除事件
-                    return
-                pygame.time.Clock().tick(5)
-            if feedback:
-                self.action_status_pub("response_done")
+            try:
+                if pygame.mixer.music.get_busy():
+                    pygame.mixer.music.stop()  # 只在播放中才停止
+                pygame.mixer.music.load(file_path)
+                self.stop_event.clear()  # 清除停止事件,在播放前清除
+                pygame.mixer.music.play()
+                self.get_logger().info(f"play_audio_async: started playing {file_path}")
+                while pygame.mixer.music.get_busy():
+                    if self.stop_event.is_set() or self.is_recording:
+                        pygame.mixer.music.stop()
+                        self.stop_event.clear()
+                        self.get_logger().info("play_audio_async: stopped by event")
+                        return
+                    pygame.time.Clock().tick(5)
+                self.get_logger().info("play_audio_async: playback finished")
+                if feedback:
+                    self.action_status_pub("response_done")
+            except Exception as e:
+                self.get_logger().error(f"play_audio_async error: {e}")
 
         thread = threading.Thread(target=target)
         thread.daemon = True

+ 3 - 0
brain/PlannerNode2/largemodel/utils/promot.py

@@ -208,6 +208,9 @@ action_function_library='''
 - **获取当前视角图像**:`seewhat()`  
   - 说明:调用后机器人上传一张`640×480`像素的俯视图像,用于物体定位。  
 ## 其他函数   
+- **迎宾模式**:`welcome()`
+  - 说明:启动迎宾模式,机器人会检测人员靠近并播放欢迎语"欢迎光临"。
+
 - **结束当前任务周期**:`finish_dialogue()`  
   - 说明:清空上下文,结束任务(如用户指令“退下”“休息”)。  
 - **等待一段时间**:`wait(x)`