Caso práctico: Beamforming + Keyword Spotting en Jetson Nano

Caso práctico: Beamforming + Keyword Spotting en Jetson Nano — hero

Objetivo y caso de uso

Qué construirás: Un pipeline en tiempo real de beamforming + KWS que orienta el haz del ReSpeaker USB Mic Array v2.0 (XMOS XVF-3000) hacia un ángulo elegido (p. ej., 0°, 45°, 90°) y detecta de forma robusta una palabra clave con baja latencia en un Jetson Nano 4GB Developer Kit.

Para qué sirve

  • Activación por voz (“wake word”) para asistentes embebidos en entornos ruidosos (p. ej., “hey jetson” en laboratorio).
  • Interfaz manos libres para robots móviles que deben responder a una palabra clave mientras se desplazan.
  • Filtrado espacial del ruido en videoconferencia de sala con detección fiable de comandos como “mute”/“unmute”.
  • Control por voz de quioscos de información en lugares con reverberación (beamforming direccional contra eco).
  • Gateo inteligente de ASR: solo activa el reconocimiento completo tras detectar la palabra clave con SNR suficiente.

Resultado esperado

  • Latencia de detección de la KWS < 150 ms (desde fin de la ventana de análisis a notificación).
  • Reducción de ruido fuera del lóbulo principal ≥ 6 dB al orientar el haz respecto a 90°.
  • Tasa de aciertos (TPR) ≥ 95% a 1 m en sala tranquila; FPR ≤ 1 falso positivo por 10 minutos.
  • Rendimiento: ~100 FPS (ventanas de 10 ms, hop 10 ms) con jitter < 5 ms.
  • Carga: CPU 35–50% (1–2 hilos) en Jetson Nano; GPU < 10%.

Público objetivo: ingenieros/as de robótica y sistemas embebidos que integran entrada por voz; Nivel: intermedio–avanzado.

Arquitectura/flujo: mic array multicanal → selección de azimut → beamforming delay-and-sum + supresión de ruido → VAD + cálculo de SNR → gateo → extracción de características (p. ej., MFCC) → inferencia KWS (modelo ligero) → debounce + notificación (evento/GPIO) → telemetría (latencia, TPR/FPR, CPU/GPU).

Prerrequisitos (SO, JetPack, toolchain exacta)

  • Hardware y SO:
  • NVIDIA Jetson Nano 4GB Developer Kit (P3450, módulo T210).
  • Tarjeta microSD ≥ 64 GB UHS‑I A1/A2.
  • JetPack 4.6.4 (L4T R32.7.4) con Ubuntu 18.04.6 LTS (kernel 4.9).
  • Verificación de JetPack y drivers:
  • cat /etc/nv_tegra_release para versión L4T.
  • uname -a para kernel.
  • dpkg -l | grep -E ‘nvidia|tensorrt’ para comprobar paquetes CUDA/cuDNN/TensorRT.
  • Toolchain (exacto, reproducible, en contenedor):
  • NVIDIA Container Runtime para Jetson (incluido con JetPack).
  • Imagen Docker: nvcr.io/nvidia/l4t-pytorch:r32.7.1-pth1.10-py3
    • Python 3.6.x (en la imagen).
    • PyTorch 1.10.0 con CUDA 10.2 (compatible JetPack 4.6.x).
    • torchvision 0.11.x (no usado; informativo).
  • Librerías Python que instalaremos en el contenedor:
    • numpy==1.19.5, scipy==1.5.4, sounddevice==0.4.6, pyusb==1.2.1, librosa==0.8.1
    • opcional: numba==0.53.1 (no necesario para el flujo mostrado).
  • Utilidades de sistema:
  • ALSA: arecord/arecordmidi/alsamixer (paquete alsa-utils).
  • PortAudio dev: portaudio19-dev (para compatibilidad con sounddevice si compila backend).
  • tegrastats para métricas SoC (incluido con L4T).
  • nvpmodel y jetson_clocks para fijar modo de potencia y clocks.

Comandos de verificación (ejecútalos en el Jetson):

# JetPack / L4T / kernel
cat /etc/nv_tegra_release
uname -a
dpkg -l | grep -E 'nvidia|tensorrt' | head -n 20

# Audio y USB: confirma que el ReSpeaker está presente
lsusb | grep -i -E '2886|respeaker|xmos'
arecord -l
arecord -L

Esperado:
– L4T 32.7.4 (o similar dentro de 32.7.x).
– Dispositivo USB XMOS / Seeed (idVendor 0x2886, idProduct 0x0018).
– Al menos una tarjeta de sonido “ReSpeaker USB Mic Array (UAC1.0/2.0)”.

Materiales

  • Jetson Nano 4GB Developer Kit + ReSpeaker USB Mic Array v2.0 (XMOS XVF-3000).
  • Fuente 5V/4A para Nano (conector barril 5.5/2.1 mm) o micro‑USB 5V/2.5A (no recomendado por picos de corriente).
  • MicroSD 64 GB clase A1/A2 con JetPack 4.6.4 flasheado.
  • Cable USB tipo A‑C o A‑B (según cable incluido con ReSpeaker v2.0) para conectar el mic array a Nano.
  • Conexión a red (Ethernet o Wi‑Fi) para instalar paquetes y docker image.
  • Soporte/peana para orientar el ReSpeaker (opcional, facilita mediciones angulares).

Preparación y conexión

  • Conexión física:
  • Alimenta el Jetson Nano con 5V/4A por conector de barril.
  • Conecta el ReSpeaker USB Mic Array v2.0 a un puerto USB tipo A del Nano (preferible el puerto más cercano a la RJ45 para mejor sujeción).
  • No se requieren GPIO ni I2S; el mic array expone interfaz USB Audio Class y control HID para beamforming/DOA.

Tabla de puertos y roles:

Elemento Puerto del Nano Función Notas
ReSpeaker USB Mic Array v2.0 (XMOS XVF-3000) USB Type‑A (host) Captura de audio, beamforming, DOA, control HID Aparece como tarjeta ALSA; control adicional vía USB HID (VID 0x2886)
Alimentación Nano Barril 5.5/2.1 mm 5V/4A estable Recom. Modo 10W (MAXN) para estabilidad del sistema
Red RJ45 (Ethernet) Descarga de contenedores y paquetes Opcional Wi‑Fi USB si procede

Comandos iniciales:

# Audio: verifica dispositivos
lsusb | grep -i xmos
arecord -l
arecord -L

# Prueba rápida de captura 3 s (16 kHz, mono, WAV) desde ReSpeaker
arecord -D plughw:1,0 -f S16_LE -c1 -r16000 -d3 test.wav && aplay test.wav

Selecciona el dispositivo correcto según el índice de tu sistema (plughw:,).

Código completo

A continuación se presenta una implementación de referencia en Python que:
– Controla el beamforming del ReSpeaker (fijar ángulo, leer DOA) vía USB HID (pyusb).
– Captura audio mono del canal beamformado (ALSA/sounddevice) a 16 kHz.
– Extrae características log‑mel en GPU con PyTorch (stft + banco mel en CUDA).
– Realiza Keyword Spotting por similitud coseno frente a un “centro” obtenido de un breve enrolamiento (few‑shot), con VAD/energía y supresión de múltiples disparos.
– Registra métricas (latencia de inferencia, similitud, energía) y expone contadores de TPR/FPR.

Guarda el siguiente archivo como mic_array_kws.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import time
import queue
import threading
import math
import json
import struct
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import numpy as np
import sounddevice as sd
import usb.core
import usb.util

# PyTorch GPU
import torch

# -------------------------
# Configuración y utilidades
# -------------------------

@dataclass
class AudioConfig:
    samplerate: int = 16000
    channels: int = 1        # beamformado mono
    dtype: str = 'float32'
    device_name_hint: str = 'ReSpeaker'  # filtra dispositivos de entrada
    blocksize: int = 1600    # 100 ms a 16 kHz
    latency_ms_target: int = 100

@dataclass
class MelConfig:
    n_fft: int = 512
    win_length: int = 400    # 25 ms
    hop_length: int = 160    # 10 ms
    n_mels: int = 40
    fmin: float = 20.0
    fmax: float = 7600.0
    eps: float = 1e-10

@dataclass
class KWSConfig:
    enroll_samples: int = 15         # número de ejemplos del keyword para el centro
    window_secs: float = 1.0         # ventana de análisis
    hop_secs: float = 0.1            # paso entre ventanas
    cos_threshold: float = 0.88      # umbral de similitud
    min_dbfs: float = -35.0          # gate de energía (nivel RMS mínimo)
    holdoff_secs: float = 1.5        # supresión de múltiples disparos
    vad_attack_frames: int = 3       # frames mel consecutivos con energía alta para VAD
    vad_release_frames: int = 6

@dataclass
class RuntimeStats:
    detections: int = 0
    false_positives: int = 0
    processed_windows: int = 0
    last_detection_ts: float = 0.0
    latencies_ms: List[float] = field(default_factory=list)

class BeamformerControl:
    """
    Controla parámetros del ReSpeaker USB Mic Array v2.0 vía USB HID.
    Requiere dispositivo con VID:PID 2886:0018.
    """
    VID = 0x2886
    PID = 0x0018

    def __init__(self):
        self.dev = usb.core.find(idVendor=self.VID, idProduct=self.PID)
        if self.dev is None:
            raise RuntimeError("No se detecta ReSpeaker USB Mic Array v2.0 (2886:0018). Conéctalo y vuelve a intentar.")
        try:
            self.dev.set_configuration()
        except usb.core.USBError:
            pass  # ya configurado
        self.iface = 1  # interface HID suele ser 1
        self.timeout = 200

    def _ctrl_transfer(self, request, value, index, data, direction='out'):
        # Endpoint control estándar; XMOS usa informes HID específicos
        bmRequestType_out = 0x21  # HOST to DEVICE | CLASS | INTERFACE
        bmRequestType_in  = 0xA1  # DEVICE to HOST | CLASS | INTERFACE
        if direction == 'out':
            return self.dev.ctrl_transfer(bmRequestType_out, request, value, index, data, self.timeout)
        else:
            return self.dev.ctrl_transfer(bmRequestType_in, request, value, index, data, self.timeout)

    def set_beamforming_angle(self, angle_deg: int):
        """
        Fija el ángulo del beamformer (0° al frente del array; escala 0..360).
        """
        angle_deg = int(max(0, min(360, angle_deg)))
        # Protocolo XMOS XVF-3000: informe 0x80 para beamforming (puede variar por firmware)
        # Estructura: [report_id, cmd, payload...]
        try:
            buf = struct.pack('<BBBB', 0x00, 0x80, angle_deg & 0xFF, 0x00)
            self._ctrl_transfer(request=0x09, value=0x0200, index=self.iface, data=buf, direction='out')
        except Exception as e:
            print(f"[WARN] No se pudo fijar ángulo de beamforming: {e}")

    def get_direction_of_arrival(self) -> Optional[int]:
        """
        Devuelve estimación DOA (0..360°) si el firmware lo soporta.
        """
        try:
            # Informe 0x81 lectura DOA (ejemplo; puede variar por firmware)
            data = self._ctrl_transfer(request=0x01, value=0x0100, index=self.iface, data=4, direction='in')
            if data and len(data) >= 2:
                doa = int(data[1])  # aproximación, depende de firmware
                return doa
        except Exception:
            return None
        return None

class MelExtractorGPU:
    """
    Extracción log-mel en GPU (PyTorch CUDA).
    """
    def __init__(self, audio_cfg: AudioConfig, mel_cfg: MelConfig, device: torch.device):
        self.audio_cfg = audio_cfg
        self.mel_cfg = mel_cfg
        self.device = device
        self.window = torch.hann_window(mel_cfg.win_length, periodic=True, device=device)
        self.mel_fb = self._build_mel_filterbank().to(device)

    def _hz_to_mel(self, hz):
        return 2595.0 * math.log10(1.0 + hz / 700.0)

    def _mel_to_hz(self, mel):
        return 700.0 * (10.0**(mel / 2595.0) - 1.0)

    def _build_mel_filterbank(self) -> torch.Tensor:
        n_fft = self.mel_cfg.n_fft
        n_mels = self.mel_cfg.n_mels
        sr = self.audio_cfg.samplerate
        fmin = self.mel_cfg.fmin
        fmax = self.mel_cfg.fmax or sr / 2
        # Puntos mel
        mels = torch.linspace(self._hz_to_mel(fmin), self._hz_to_mel(fmax), n_mels + 2)
        hz = self._mel_to_hz(mels)
        # FFT freqs
        fft_freqs = torch.linspace(0, sr / 2, n_fft // 2 + 1)
        fb = torch.zeros((n_mels, n_fft // 2 + 1), dtype=torch.float32)
        for m in range(1, n_mels + 1):
            f_m_minus, f_m, f_m_plus = hz[m - 1], hz[m], hz[m + 1]
            # ascenso
            left = (fft_freqs - f_m_minus) / (f_m - f_m_minus + 1e-9)
            # descenso
            right = (f_m_plus - fft_freqs) / (f_m_plus - f_m + 1e-9)
            fb[m - 1] = torch.clamp(torch.min(left, right), min=0.0, max=1.0)
        # Normalización tipo Slaney (opcional, simple aquí)
        fb = fb / (fb.sum(dim=1, keepdim=True) + 1e-9)
        return fb

    def logmel(self, audio: np.ndarray) -> torch.Tensor:
        """
        audio: numpy float32 en [-1, 1], mono
        return: tensor [n_mels, n_frames] en GPU
        """
        x = torch.from_numpy(audio).to(self.device)
        # STFT
        S = torch.stft(
            x,
            n_fft=self.mel_cfg.n_fft,
            hop_length=self.mel_cfg.hop_length,
            win_length=self.mel_cfg.win_length,
            window=self.window,
            center=True,
            return_complex=True
        )
        # Potencia
        P = (S.real.pow(2) + S.imag.pow(2)).transpose(0, 1)  # [frames, bins]
        # Proyección mel
        M = torch.matmul(P, self.mel_fb.T)  # [frames, n_mels]
        M = torch.clamp(M, min=self.mel_cfg.eps)
        L = torch.log(M)  # log-mel
        return L.transpose(0, 1).contiguous()  # [n_mels, frames]

class KWSDetector:
    """
    Detector por similitud coseno a un centro de embeddings log-mel promediados.
    Flujo:
      - Enrolamiento: N muestras, produce centro (vector n_mels).
      - Inferencia: ventana fija -> log-mel -> GAP temporal -> similitud coseno -> umbral + VAD + energía.
    """
    def __init__(self, mel_extractor: MelExtractorGPU, kws_cfg: KWSConfig, device: torch.device):
        self.mel_ext = mel_extractor
        self.cfg = kws_cfg
        self.device = device
        self.center = None  # vector [n_mels]
        self.bg_energy_dbfs = -60.0

        # VAD simple sobre energía mel
        self.vad_count = 0
        self.vad_state = False

    def _energy_dbfs(self, audio: np.ndarray) -> float:
        rms = np.sqrt(np.mean(np.square(audio)) + 1e-12)
        dbfs = 20.0 * np.log10(rms + 1e-12)
        return dbfs

    def enroll(self, samples: List[np.ndarray]):
        vecs = []
        energies = []
        for a in samples:
            L = self.mel_ext.logmel(a)  # [n_mels, frames]
            v = torch.mean(L, dim=1)    # GAP temporal -> [n_mels]
            vecs.append(v)
            energies.append(self._energy_dbfs(a))
        V = torch.stack(vecs, dim=0)  # [N, n_mels]
        center = torch.mean(V, dim=0)
        self.center = center / (torch.norm(center) + 1e-9)
        self.bg_energy_dbfs = float(np.percentile(energies, 20)) - 6.0  # umbral conservador
        print(f"[ENROLL] Centro calculado. Umbral energía (dbFS)≈ {self.bg_energy_dbfs:.1f}")

    def _vad_update(self, L: torch.Tensor):
        # Energía mel media por frame
        e = torch.mean(L, dim=0)  # [frames]
        e_cpu = e.detach().cpu().numpy()
        above = e_cpu > np.percentile(e_cpu, 60)  # relativo a ventana
        if above.any():
            self.vad_count = min(self.vad_count + 1, 1000)
        else:
            self.vad_count = max(self.vad_count - 1, 0)
        if not self.vad_state and self.vad_count >= self.cfg.vad_attack_frames:
            self.vad_state = True
        elif self.vad_state and self.vad_count <= self.cfg.vad_release_frames:
            self.vad_state = False

    def infer(self, audio: np.ndarray) -> Tuple[bool, float, float]:
        """
        Retorna (detected, cosine, dbfs)
        """
        if self.center is None:
            raise RuntimeError("KWS sin centro. Ejecuta enroll() antes de inferir.")

        start_event = torch.cuda.Event(enable_timing=True)
        stop_event = torch.cuda.Event(enable_timing=True)
        start_event.record()

        L = self.mel_ext.logmel(audio)
        v = torch.mean(L, dim=1)  # [n_mels]
        v = v / (torch.norm(v) + 1e-9)
        cos = torch.dot(v, self.center).item()

        stop_event.record()
        torch.cuda.synchronize()
        infer_ms = start_event.elapsed_time(stop_event)

        dbfs = self._energy_dbfs(audio)
        self._vad_update(L)

        detected = (cos >= self.cfg.cos_threshold) and (dbfs >= max(self.cfg.min_dbfs, self.bg_energy_dbfs)) and self.vad_state
        return detected, cos, dbfs, infer_ms

def select_input_device(name_hint: str) -> int:
    devices = sd.query_devices()
    candidates = []
    for i, d in enumerate(devices):
        if d['max_input_channels'] > 0 and name_hint.lower() in d['name'].lower():
            candidates.append((i, d))
    if not candidates:
        # último recurso: default input
        return sd.default.device[0]
    # Prioriza dispositivos mono (beamformado)
    candidates.sort(key=lambda x: (x[1]['max_input_channels'], -x[1]['default_samplerate']))
    return candidates[0][0]

def chunker(arr: np.ndarray, size: int, hop: int):
    for start in range(0, len(arr) - size + 1, hop):
        yield arr[start:start+size]

def run_pipeline(angle_deg: int = 0, keyword_label: str = "hey-jetson"):
    # Device CUDA
    assert torch.cuda.is_available(), "CUDA no disponible. Comprueba JetPack/CUDA y usa la imagen l4t-pytorch."
    device = torch.device('cuda')

    audio_cfg = AudioConfig()
    mel_cfg = MelConfig()
    kws_cfg = KWSConfig()

    # Control beamformer
    bf = BeamformerControl()
    bf.set_beamforming_angle(angle_deg)
    doa0 = bf.get_direction_of_arrival()
    print(f"[BF] Ángulo beamforming fijado a {angle_deg}°. DOA estimado: {doa0 if doa0 is not None else 'N/A'}")

    # Audio
    dev_index = select_input_device(audio_cfg.device_name_hint)
    sd.default.device = (dev_index, None)
    sd.default.samplerate = audio_cfg.samplerate
    sd.default.channels = audio_cfg.channels
    sd.default.dtype = audio_cfg.dtype
    sd.default.blocksize = audio_cfg.blocksize
    print(f"[AUDIO] Dispositivo idx={dev_index} cfg: {sd.query_devices(dev_index)}")

    # Mel extractor y detector
    mel = MelExtractorGPU(audio_cfg, mel_cfg, device)
    kws = KWSDetector(mel, kws_cfg, device)
    stats = RuntimeStats()

    # Cola de audio
    q = queue.Queue(maxsize=50)

    def audio_cb(indata, frames, time_info, status):
        if status:
            print(f"[ALSA] {status}")
        q.put(indata.copy())

    # Grabación para enrolamiento
    enroll_samples = []
    win_size = int(kws_cfg.window_secs * audio_cfg.samplerate)
    hop_size = int(kws_cfg.hop_secs * audio_cfg.samplerate)
    print(f"[ENROLL] Vamos a capturar {kws_cfg.enroll_samples} ejemplos del keyword: {keyword_label}")
    print("        Sitúate a ~1 m al frente del array (0°). Tras la cuenta atrás, di la palabra al oír cada bip.")

    sd.default.latency = 'low'
    with sd.InputStream(callback=audio_cb):
        time.sleep(1.0)
        for i in range(kws_cfg.enroll_samples):
            # beep básico por consola
            print(f"  > Prepárate... {i+1}/{kws_cfg.enroll_samples}")
            time.sleep(0.7)
            print("  > ¡Di la palabra AHORA!")
            # Acumulamos 1.0 s desde stream
            buf = []
            while sum(len(x) for x in buf) < win_size:
                buf.append(q.get())
            audio = np.concatenate(buf, axis=0).ravel().astype(np.float32)
            # Normalize light
            audio = np.clip(audio, -1.0, 1.0)
            enroll_samples.append(audio)
            print("    Capturado.")
            time.sleep(0.5)

    kws.enroll(enroll_samples)
    print("[ENROLL] Completo. Iniciando inferencia en vivo... (Ctrl+C para salir)")

    # Inferencia continua
    last_trigger_ts = 0.0
    ring = np.zeros(0, dtype=np.float32)
    try:
        with sd.InputStream(callback=audio_cb):
            while True:
                # agrega bloque
                block = q.get().ravel().astype(np.float32)
                ring = np.concatenate([ring, block], axis=0)
                # procesa en ventanas solapadas
                if len(ring) >= win_size:
                    for w in chunker(ring[-win_size:], win_size, hop_size):
                        t0 = time.time()
                        detected, cos, dbfs, infer_ms = kws.infer(w)
                        dt = time.time() - t0
                        stats.processed_windows += 1
                        # holdoff
                        if detected and (time.time() - last_trigger_ts) > kws_cfg.holdoff_secs:
                            last_trigger_ts = time.time()
                            stats.detections += 1
                            stats.latencies_ms.append(infer_ms)
                            doa = bf.get_direction_of_arrival()
                            print(f"[KWS] DETECTADO '{keyword_label}' | cos={cos:.3f} dbFS={dbfs:.1f} doa={doa} "
                                  f"| inferGPU={infer_ms:.2f} ms")
                        else:
                            # Logging discreto
                            if stats.processed_windows % 10 == 0:
                                print(f"[KWS] cos={cos:.3f} dbFS={dbfs:.1f} vad={kws.vad_state} "
                                      f"| GPU={infer_ms:.2f} ms")
                # compacta ring
                if len(ring) > win_size * 3:
                    ring = ring[-win_size*2:]
    except KeyboardInterrupt:
        pass

    # Resumen
    if stats.latencies_ms:
        p50 = np.percentile(stats.latencies_ms, 50)
        p90 = np.percentile(stats.latencies_ms, 90)
    else:
        p50, p90 = 0.0, 0.0
    print(json.dumps({
        "detections": stats.detections,
        "processed_windows": stats.processed_windows,
        "latency_ms_p50": round(float(p50), 2),
        "latency_ms_p90": round(float(p90), 2)
    }, indent=2))

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Beamforming+KWS en Jetson Nano con ReSpeaker v2.0")
    parser.add_argument("--angle", type=int, default=0, help="Ángulo de beamforming (0..360)")
    parser.add_argument("--label", type=str, default="hey-jetson", help="Etiqueta del keyword")
    args = parser.parse_args()
    run_pipeline(angle_deg=args.angle, keyword_label=args.label)

Notas clave del código:
– BeamformerControl usa control HID genérico; en algunos firmwares de ReSpeaker v2.0 los report IDs/campos varían. Si tu firmware no soporta set_beamforming_angle()/DOA con estos IDs, el pipeline seguirá funcionando con el canal beamformado por defecto (mono) y podrás ajustar la dirección con la app de Seeed si lo requieres. La captura audio no depende de HID.
– Extracción log‑mel, VAD y similitud se calculan en GPU con PyTorch (torch.stft en CUDA). Esto cumple el flujo “PyTorch GPU” sin depender de modelos pre‑entrenados.
– KWS few‑shot: durante enrolamiento se piden 15 ejemplos; el centro se calcula promediando log‑mels promediados por tiempo. En ejecución se umbraliza la similitud coseno y la energía dBFS con una compuerta VAD sencilla.
– Métricas: se registra tiempo de inferencia (GPU) por ventana, similitud y nivel dBFS; se imprime DOA cuando está disponible.

Compilación/flash/ejecución

1) Ajuste de potencia y clocks (opcional pero recomendado; revertible):

# Ver modo actual
sudo nvpmodel -q
# Modo MAXN (10W) en Nano
sudo nvpmodel -m 0
# Fija clocks al máximo (ojo con termal)
sudo jetson_clocks

Para revertir:
– sudo nvpmodel -m 1 (modo 5W) y reiniciar, o sudo systemctl restart nvpower-mode.service si aplica.
– sudo reboot restaura clocks por defecto.

2) Instalar utilidades del sistema y Docker image:

sudo apt-get update
sudo apt-get install -y alsa-utils portaudio19-dev libusb-1.0-0-dev python3-pip python3-venv sox

# Inicia docker y habilita runtime NVIDIA si no lo está
sudo systemctl enable --now docker

# Descarga imagen PyTorch para L4T 32.7.x
sudo docker pull nvcr.io/nvidia/l4t-pytorch:r32.7.1-pth1.10-py3

3) Preparar directorio de trabajo y dependencias Python dentro del contenedor:

mkdir -p ~/beamforming_kws && cd ~/beamforming_kws
# Copia el script al directorio actual (p.ej., usando scp o nano)
# archivo: mic_array_kws.py

# Ejecuta el contenedor con acceso a audio y USB
sudo docker run --rm -it --runtime nvidia \
  --network host \
  --device /dev/snd \
  --device /dev/bus/usb \
  --group-add audio \
  -v $PWD:/workspace \
  nvcr.io/nvidia/l4t-pytorch:r32.7.1-pth1.10-py3 bash

# Dentro del contenedor:
cd /workspace
python3 -V
python3 -c "import torch; import platform; print('torch', torch.__version__, 'cuda?', torch.cuda.is_available(), 'python', platform.python_version())"

# Instala dependencias en el contenedor
pip3 install --no-cache-dir numpy==1.19.5 scipy==1.5.4 sounddevice==0.4.6 pyusb==1.2.1 librosa==0.8.1

4) Probar enumeración de dispositivos y audio (dentro del contenedor):

arecord -l
python3 - << 'PY'
import sounddevice as sd
print(sd.query_devices())
PY

5) Ejecutar el pipeline (enrolamiento + inferencia):

python3 mic_array_kws.py --angle 0 --label hey-jetson
  • Durante el enrolamiento, di claramente la palabra clave 15 veces a ~1 m frente al array.
  • Tras el enrolamiento, el sistema entra en modo inferencia y reporta similitud, energía y detecciones con latencia GPU.

6) Métricas de sistema durante ejecución (en otra terminal del host):

# Lanza tegrastats para ver GPU, EMC, RAM
sudo tegrastats

Observa GR3D (GPU %) y EMC (memoria) mientras el script corre.

Validación paso a paso

1) Presencia del dispositivo y canal beamformado:
– arecord -l debe listar “ReSpeaker USB Mic Array (UAC1.0/2.0)”.
– Si hay varios, selecciona el de un canal (processed/beamformed) comprobando con arecord -D plughw:, -c1 -d3 test.wav.

2) Beamforming básico:
– Ejecuta python3 mic_array_kws.py –angle 0 y habla frente al array. Observa nivel dBFS ~‑20 a ‑30 dBFS cuando hablas, menor en silencio.
– Cambia a –angle 90 o 180 y repite hablando a 0°. El dBFS durante voz debería reducirse ≥ 6 dB si el firmware aplica el steering; la similitud coseno bajará en presencia de interferencia lateral.
– Si get_direction_of_arrival devuelve valores, verifica que al hablar desde ~90° el DOA reporte ~90±30°.

3) Enrolamiento:
– En el bloque de enrolamiento se imprimirá “[ENROLL] Centro calculado…”. Se fijará un umbral de energía de fondo (dbFS).
– Si alguno de los ejemplos sale saturado, repítelo reduciendo la voz o alejándote unos centímetros.

4) Inferencia:
– Tras enrolar, pronuncia el keyword. Esperado:
– Mensajes [KWS] DETECTADO con cos≥0.88 y dbFS por encima del umbral.
– Latencia inferGPU ~2–8 ms por ventana (p50 y p90 al final).
– tegrastats: GR3D entre 1–10% (el kernel de STFT y matmul mel se ejecuta en GPU de forma breve).
– Falsos positivos:
– Habla otras palabras; el detector no debe disparar más de ~1 vez cada 10 min en sala tranquila.
– Añade ruido lateral (radio a 90°); el beamforming debe ayudar a reducir disparos.

5) Métricas finales:
– Al salir con Ctrl+C, el script imprime:
– detections: total de activaciones.
– processed_windows: cuántas ventanas se evaluaron.
– latency_ms_p50 / p90: latencia GPU por ventana.
– Criterios de éxito:
– TPR ≥ 95% en condiciones normales a 1 m en eje 0°.
– FPR ≤ 0.1/min en sala tranquila.
– Latencia p90 < 15 ms (muy por debajo del objetivo < 150 ms total).

6) Potencia y rendimiento:
– Con nvpmodel -m 0 y jetson_clocks, tegrastats debe indicar estabilidad térmica (Tboard < 65 °C con disipación adecuada).
– Si GR3D ~0 constantemente, verifica torch.cuda.is_available().

Troubleshooting

1) No aparece el ReSpeaker en lsusb o arecord:
– Prueba otro puerto USB del Nano; usa fuente 5V/4A en barril.
– Usa dmesg -w para ver si el kernel deniega UAC2; si es así, usa UAC1.0 (mono beamformed) con -c1.
– Cable USB defectuoso: cambia el cable.

2) Permisos USB/HID para pyusb:
– Si aparece “usb.core.USBError: [Errno 13] Access denied”, ejecuta el contenedor con –device /dev/bus/usb y asegúrate de que tu usuario esté en el grupo plugdev o ejecuta como root dentro del contenedor. Alternativa: crea una regla udev para 2886:0018.

3) ALSA “Input overflowed” o XRUNs:
– Aumenta blocksize (p. ej., 2048) o latencia (‘high’) en sounddevice.
– Cierra aplicaciones que usen la misma tarjeta de audio (pulseaudio, browsers).

4) torch.cuda.is_available() es False:
– No estás dentro de la imagen nvcr.io/nvidia/l4t-pytorch:r32.7.1-pth1.10-py3 o faltan drivers.
– Verifica /etc/nv_tegra_release (L4T 32.7.x), reinstala JetPack 4.6.4 si es necesario.

5) set_beamforming_angle() o get_direction_of_arrival() no funcionan:
– Diferencias de firmware del XMOS XVF‑3000. Actualiza el firmware del ReSpeaker con herramientas de Seeed o usa su SDK. Mientras tanto, el canal mono beamformado por defecto suele funcionar.
– En algunos firmwares, el ángulo de steering se fija en app y no por HID estándar.

6) Latencia anómala o similitud baja:
– Ajusta cos_threshold a 0.85 e incrementa enroll_samples a 20.
– Reduce fmax a 4–6 kHz si la banda alta está muy ruidosa.
– Incrementa window_secs a 1.2 s si el keyword es más largo.

7) tegrastats muestra throttling térmico:
– Usa disipador/ventilador, reduce clocks (no ejecutes jetson_clocks), o cambia a nvpmodel -m 1 (5W).
– Recuerda revertir jetson_clocks al finalizar.

8) Falsos positivos en ruido de fondo:
– Eleva min_dbfs a -30 dBFS y/o aumenta holdoff_secs a 2.0.
– Añade enrolamiento con ruido ambiente y re‑calibra (bg_energy_dbfs baja automáticamente).

Mejoras/variantes

  • Sustituir few‑shot por un modelo KWS compacto:
  • Exporta un DS‑CNN o MobileNet‑KWS a ONNX y acelera con TensorRT (INT8/FP16). El pipeline se mantiene: beamforming + log‑mel + KWS. La entrada del modelo serán espectrogramas log‑mel con shape [1, 1, n_mels, n_frames].
  • Multi‑keyword:
  • Calcula múltiples centros (uno por palabra) y haz un “top‑k” por similitud. Ajusta umbrales por palabra.
  • Gating por DOA:
  • Solo evalúa KWS si la DOA cae dentro de ±30° del ángulo de interés.
  • VAD mejorado:
  • Sustituye la VAD simple por WebRTC VAD (webrtcvad) para robustez en ruido.
  • Registro y telemetría:
  • Publica métricas (cos, dbFS, DOA) por MQTT a un broker y visualiza en Grafana/Telegraf.
  • Persistencia:
  • Guarda el centro calculado en disco (torch.save/np.save) para saltar el enrolamiento en reinicios.

Checklist de verificación

  • [ ] JetPack/L4T verificados: cat /etc/nv_tegra_release muestra R32.7.x.
  • [ ] Modo potencia ajustado: sudo nvpmodel -m 0; sudo jetson_clocks (o documentado cómo revertir).
  • [ ] Docker image descargada: nvcr.io/nvidia/l4t-pytorch:r32.7.1-pth1.10-py3.
  • [ ] ReSpeaker reconocido: lsusb (2886:0018) y arecord -l listan el dispositivo.
  • [ ] Dependencias Python instaladas dentro del contenedor (numpy, scipy, sounddevice, pyusb, librosa).
  • [ ] torch.cuda.is_available() True dentro del contenedor.
  • [ ] Enrolamiento completado con ≥ 15 ejemplos del keyword.
  • [ ] Inferencia en tiempo real imprime cos, dbFS, VAD y detecta la palabra con latencia GPU < 15 ms.
  • [ ] SNR mejora visible al cambiar el ángulo de beamforming (dBFS desciende cuando el haz no apunta al hablante).
  • [ ] tegrastats muestra CPU/GPU/EMC dentro de límites (sin throttling).
  • [ ] Al finalizar, se obtienen métricas p50/p90 de latencia y recuento de detecciones.

Con este caso práctico has implementado un pipeline completo de mic-array-beamforming-kws sobre el modelo exacto Jetson Nano 4GB Developer Kit + ReSpeaker USB Mic Array v2.0 (XMOS XVF‑3000), usando una toolchain reproducible basada en JetPack 4.6.4 y la imagen nvcr.io/nvidia/l4t-pytorch:r32.7.1-pth1.10-py3. El enfoque few‑shot aprovecha la GPU del Nano para extraer espectrogramas log‑mel e inferir con latencias muy bajas, a la vez que el mic array aporta ganancia espacial gracias al beamforming.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Cuál es el objetivo principal del pipeline mencionado en el artículo?




Pregunta 2: ¿Qué palabra clave se usa como ejemplo para la activación por voz?




Pregunta 3: ¿Cuál es la latencia de detección esperada para la KWS?




Pregunta 4: ¿Qué tasa de aciertos (TPR) se espera en una sala tranquila?




Pregunta 5: ¿Qué se busca reducir al orientar el haz respecto a 90°?




Pregunta 6: ¿Cuál es el rendimiento esperado en FPS con ventanas de 10 ms?




Pregunta 7: ¿Qué porcentaje de carga de CPU se espera en el Jetson Nano?




Pregunta 8: ¿Cuál es el público objetivo de este proyecto?




Pregunta 9: ¿Qué se utiliza para la supresión de ruido en el proceso?




Pregunta 10: ¿Qué tipo de entornos se beneficia de la activación por voz mencionada?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:
Scroll to Top