Caso práctico: Anomalías audio+vibración en Jetson Orin Nano

Caso práctico: Anomalías audio+vibración en Jetson Orin Nano — hero

Objetivo y caso de uso

Qué construirás: Un sistema casi en tiempo real en NVIDIA Jetson Orin Nano que fusiona audio I2S (micrófono Adafruit ICS-43434) y vibración SPI (acelerómetro ADXL345) para detectar anomalías con autoencoders ligeros en GPU (PyTorch, fp16). El pipeline captura, preprocesa (mel-espectrograma y ventanas 1D), infiere en GPU y emite eventos con umbrales aprendidos tras una breve fase de baseline “normal”.

Para qué sirve

  • Mantenimiento predictivo de motores pequeños (ventiladores 40–80 mm, bombas sumergibles, extrusores): roces/desbalanceos por armónicos ≥+4 dB en 1–5 kHz y aumento de RMS de vibración >1.5σ.
  • Monitorización de reductores y rodamientos: detección temprana de defectos por sidebands y picos >3σ en 120–240 Hz y sus armónicos.
  • Detección de holguras en chasis de impresoras 3D: cambios de vibración en ejes X/Y y tonos de stepper (40–120 Hz) fuera de patrón; alerta <100 ms.
  • Supervisión de compresores: cliquetis/zumbidos de 8–12 kHz junto a picos de aceleración >1.8 g.
  • Vigilancia de bombas peristálticas en laboratorios: desgaste del tubo por variaciones en la vibración y audio.

Resultado esperado

  • Detección de anomalías con latencias de inferencia <50 ms.
  • Precisión de detección de anomalías superior al 90% en condiciones controladas.
  • Generación de alertas en tiempo real con mensajes MQTT sobre eventos anómalos.
  • Capacidad de procesar hasta 1000 paquetes de datos por segundo.
  • Reducción de falsos positivos a menos del 5% tras entrenamiento.

Público objetivo: Ingenieros de mantenimiento; Nivel: Avanzado

Arquitectura/flujo: Captura de datos -> Preprocesamiento -> Inferencia -> Emisión de alertas.

Subtítulo: Nivel: Avanzado

Prerrequisitos (SO y toolchain concreta)

  • Sistema operativo y JetPack (L4T):
  • NVIDIA Jetson Orin Nano Developer Kit con JetPack 6.0.1 (L4T R36.3) sobre Ubuntu 22.04 LTS (kernel 5.15).
  • Versionado y verificación:
  • Comprobar JetPack/L4T:
  • cat /etc/nv_tegra_release → Esperado: “R36 (release), REVISION: 3.0 …”
  • jetson_release -v (si está instalado el script jetson-stats)
  • Kernel, CUDA/cuDNN/TensorRT (muestras):
  • uname -a
  • dpkg -l | grep -E ‘nvidia|tensorrt’
  • Toolchain exacta utilizada en este caso:
  • CUDA 12.2 (paquetes JetPack 6.0.1).
  • cuDNN 9.x (provisto por JetPack 6).
  • TensorRT 8.6.x (instalado por JetPack 6, aunque aquí usamos PyTorch).
  • Python 3.10.12 (default en Ubuntu 22.04 L4T R36.x).
  • PyTorch 2.3.0+nv24.05, TorchVision 0.18.0+nv24.05, TorchAudio 2.3.0+nv24.05 desde índice NVIDIA (compatibles con JetPack 6).
  • Torchaudio usaremos para I/O y transformaciones; numpy 1.26+, scipy 1.11+.
  • Bibliotecas de I/O:
  • ALSA (arecord) para I2S ICS-43434.
  • spidev (Python) para ADXL345 por /dev/spidev.
  • Comandos de verificación inicial:
    cat /etc/nv_tegra_release
    uname -a
    nvpmodel -q
    dpkg -l | grep -E 'nvidia|tensorrt'
    python3 -V

Materiales

  • Exacto al modelo indicado:
  • NVIDIA Jetson Orin Nano Developer Kit (incluye conector de 40 pines compatible).
  • Adafruit I2S MEMS Microphone (ICS-43434).
  • ADXL345 3-axis accelerometer (versión SPI, breakout típico).
  • Accesorios:
  • Cables Dupont hembra-hembra.
  • Fuente de 5 V adecuada para el Jetson (≥ 4 A recomendados en modo MAXN).
  • Disipación/ventilador para pruebas sostenidas.
  • Superficie/motor/ventilador para inducir vibraciones controladas.
  • Si posible: cinta de doble cara para fijar el ADXL345 al chasis/motor.

Preparación y conexión

Habilitar interfaces (I2S y SPI) sin GUI

  1. Configurar pinmux y overlays para habilitar I2S0 y SPI1 en el header de 40 pines:
    sudo /opt/nvidia/jetson-io/config-by-function.py -l
    sudo /opt/nvidia/jetson-io/config-by-function.py -n "SPI1"
    sudo /opt/nvidia/jetson-io/config-by-function.py -n "I2S"
    sudo /opt/nvidia/jetson-io/config-by-function.py -o dtb
    sudo reboot
  2. Tras el reinicio, verificar:

    • SPI: ls -l /dev/spidev*
    • Esperado: /dev/spidev0.0 (CS0) y/o /dev/spidev0.1 (CS1).
    • I2S: arecord -l y arecord -L (debe listar al menos una tarjeta/canal I2S habilitado).
  3. Ajustar modo de potencia y clocks (opcional, recomendado para reproducibilidad y métricas):
    sudo nvpmodel -m 0 # MAXN
    sudo jetson_clocks # fija clocks al máximo (vigilar temperatura)
    nvpmodel -q

Cableado

  • ICS-43434 (I2S, solo escucha; Jetson actúa como maestro generando BCLK y LRCLK). Usaremos canal “Left” (SEL a GND).

  • ADXL345 en SPI1 (CS0). Fijar a 3.3 V (no 5 V).

Tabla de pines (Jetson Orin Nano 40-pin header, vista topográfica estándar, coherente con muchos Jetson):

Función Jetson Pin Señal Jetson Dispositivo Pin dispositivo
3V3 1 3.3V ICS-43434 3V (Vin)
GND 6 GND ICS-43434 GND
I2S0_SCLK (BCLK) 12 I2S0_SCLK ICS-43434 BCLK
I2S0_FS (LRCLK) 35 I2S0_FS ICS-43434 LRCLK
I2S0_DIN 38 I2S0_DIN ICS-43434 DOUT
SEL (L/R) ICS-43434 SEL → GND (canal L)
3V3 17 3.3V ADXL345 VCC
GND 20 GND ADXL345 GND
SPI1_CS0 24 SPI1_CS0 ADXL345 CS
SPI1_SCK 23 SPI1_SCK ADXL345 SCL/SCLK
SPI1_MISO 21 SPI1_MISO ADXL345 SDO (DO)
SPI1_MOSI 19 SPI1_MOSI ADXL345 SDA (DI)
INT1/INT2 (opcional) GPIO ADXL345 INT1/INT2 (no usado aquí)

Notas:
– ICS-43434 no requiere MCLK; funciona con BCLK/LRCLK provistos por el Jetson.
– Asegúrate de que el ICS-43434 recibe 3.3 V estables y cableado corto para minimizar jitter/ruido.
– El ADXL345 tolera 3.3 V; no uses 5 V.

Código completo (Python + PyTorch GPU) con explicación

Qué hace el software:
– Captura audio mono 48 kHz desde el I2S (arecord/ALSA, formato S32_LE) en ventanas de 0.5 s.
– Lee vibración XYZ del ADXL345 por SPI a 800 Hz, sincronizando ventanas de 0.5 s.
– Preprocesa:
– Audio: mel-espectrograma log (n_mels=64, n_fft=1024, hop=256).
– Vibración: normaliza e ingesta 1D (stack de ejes XYZ → 3 canales).
– Modelos:
– Autoencoder MLP para mel-espectrogramas (audio).
– Autoencoder CNN 1D para acelerometría.
– Baseline: 60 s de aprendizaje ligero (5–10 épocas) con datos “normales” para fijar umbrales de reconstrucción (media+3σ).
– Inferencia: calcula score por canal y score fusionado (media ponderada 0.6 audio + 0.4 vibración).
– Salida: logs por ventana, JSONL con eventos anómalos y WAV snapshots.

Requisitos Python:
– torch==2.3.0+nv24.05, torchvision==0.18.0+nv24.05, torchaudio==2.3.0+nv24.05
– numpy, scipy, soundfile, spidev

Instalación de dependencias y preparación de proyecto:

# 1) Dependencias del sistema
sudo apt-get update
sudo apt-get install -y python3-pip python3-venv python3-dev libasound2-dev sox alsa-utils \
                        git libsndfile1-dev

# 2) (Opcional) entorno virtual
python3 -m venv ~/venvs/jetson-anom
source ~/venvs/jetson-anom/bin/activate
python -m pip install --upgrade pip setuptools wheel

# 3) PyTorch/TorchVision/TorchAudio para JetPack 6 (L4T R36.x)
#    Ramas NVIDIA (nvXX.XX). Si ya los tienes por contenedor NGC/SDK Manager, omite.
pip install --extra-index-url https://pypi.nvidia.com \
    torch==2.3.0+nv24.05 torchvision==0.18.0+nv24.05 torchaudio==2.3.0+nv24.05

# 4) Librerías de I/O y señal
pip install numpy==1.26.4 scipy==1.11.4 soundfile==0.12.1 spidev==3.6

# 5) Verificar GPU desde Python
python - << 'PY'
import torch
print("CUDA disponible:", torch.cuda.is_available())
print("GPU:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "-")
PY

Script principal (guárdalo como i2s_audio_vibration_anomaly.py):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# i2s_audio_vibration_anomaly.py
#
# Dispositivo: NVIDIA Jetson Orin Nano Developer Kit
# Sensores: Adafruit I2S MEMS Microphone (ICS-43434) + ADXL345 (SPI)
# Objetivo: fusión audio+vibración para detección de anomalías en tiempo casi real.
#
# Toolchain:
#  - Python 3.10.12
#  - torch==2.3.0+nv24.05, torchaudio==2.3.0+nv24.05, torchvision==0.18.0+nv24.05
#  - numpy==1.26.4, scipy==1.11.4, soundfile==0.12.1, spidev==3.6
#
import os, sys, time, json, math, queue, subprocess, threading, collections, struct
from datetime import datetime
import numpy as np
import soundfile as sf

import torch
import torch.nn as nn
import torchaudio

# ------------------------------
# Utilidades
# ------------------------------
def now_ts():
    return datetime.utcnow().isoformat() + "Z"

def ensure_dir(path):
    os.makedirs(path, exist_ok=True)

def db(x, eps=1e-10):
    return 20.0 * np.log10(np.maximum(np.abs(x), eps))

# ------------------------------
# Captura de AUDIO vía ALSA (arecord)
# ------------------------------
class AudioCapture:
    def __init__(self, device=None, rate=48000, channels=1, fmt="S32_LE", chunk_seconds=0.5):
        self.rate = rate
        self.channels = channels
        self.fmt = fmt
        self.chunk_seconds = chunk_seconds
        self.frames_per_chunk = int(self.rate * self.chunk_seconds)
        self.bytes_per_sample = 4  # S32_LE
        self.device = device  # e.g., "hw:jetson-i2s,0" o "hw:2,0"
        self.proc = None
        self.q = queue.Queue(maxsize=16)
        self.stop_event = threading.Event()

    def _launch(self):
        cmd = [
            "arecord",
            "-q",
            "-D", self.device,
            "-c", str(self.channels),
            "-f", self.fmt,
            "-r", str(self.rate),
            "-t", "raw"
        ]
        self.proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=0)
        # lector
        def reader():
            bytes_per_chunk = self.frames_per_chunk * self.channels * self.bytes_per_sample
            while not self.stop_event.is_set():
                buf = self.proc.stdout.read(bytes_per_chunk)
                if not buf or len(buf) < bytes_per_chunk:
                    break
                # Convertir a float32 [-1, 1]
                arr = np.frombuffer(buf, dtype="<i4").astype(np.float32) / 2147483648.0
                arr = arr.reshape(-1, self.channels)
                if self.channels == 1:
                    arr = arr[:, 0]
                try:
                    self.q.put(arr, timeout=0.5)
                except queue.Full:
                    pass
        threading.Thread(target=reader, daemon=True).start()

    def start(self):
        if self.device is None:
            # Autodetect: buscar tarjeta I2S. Si falla, usar hw:0,0 como fallback manual.
            # Recomendación: fijar explícitamente con --audio-dev.
            dev = os.popen("arecord -l | awk '/card/{print $0}'").read()
            # Heurística: primera tarjeta con I2S/ADMAIF nombrada.
            chosen = None
            for line in dev.splitlines():
                if ("I2S" in line) or ("ADMAIF" in line) or ("tegra" in line.lower()):
                    # extraer card index
                    parts = line.split()
                    # formato típico: card 2: ..., device 0: ...
                    if "card" in parts and "device" in parts:
                        cidx = parts[parts.index("card")+1].strip(":")
                        chosen = f"hw:{cidx},0"
                        break
            self.device = chosen if chosen else "hw:0,0"
        self._launch()

    def read_chunk(self, timeout=1.0):
        try:
            return self.q.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop(self):
        self.stop_event.set()
        if self.proc:
            self.proc.terminate()
            self.proc.wait(timeout=1.0)

# ------------------------------
# Captura de VIBRACIÓN vía SPI (ADXL345)
# ------------------------------
class ADXL345SPI:
    # Registros clave
    REG_DEVID = 0x00
    REG_POWER_CTL = 0x2D
    REG_DATA_FORMAT = 0x31
    REG_BW_RATE = 0x2C
    REG_DATAX0 = 0x32

    def __init__(self, bus=0, cs=0, scale_g=16, odr_hz=800):
        import spidev
        self.spi = spidev.SpiDev()
        self.bus = bus
        self.cs = cs
        self.spi.open(bus, cs)  # típicamente (0,0) para SPI1_CS0 en Jetson Orin Nano
        self.spi.max_speed_hz = 5000000
        self.spi.mode = 0b11  # ADXL345 CPOL=1, CPHA=1
        # Configuración
        devid = self._read_reg(self.REG_DEVID)
        if devid != 0xE5:
            raise RuntimeError(f"ADXL345 DEVID inesperado: 0x{devid:02X}")
        # Formato: full_resolution=1, rango +-16g
        fmt = 0x0B  # FULL_RES=1 (bit3), Range bits[1:0]=0b11 => +-16g
        self._write_reg(self.REG_DATA_FORMAT, fmt)
        # Tasa: 0x0D => 800 Hz (ver tabla BW_RATE datasheet)
        rate_map = {100:0x0A, 200:0x0B, 400:0x0C, 800:0x0D, 1600:0x0E}
        self._write_reg(self.REG_BW_RATE, rate_map.get(odr_hz, 0x0D))
        # Power: measure=1
        self._write_reg(self.REG_POWER_CTL, 0x08)

    def _write_reg(self, reg, val):
        self.spi.xfer2([reg, val])

    def _read_reg(self, reg):
        # Read: set bit7
        resp = self.spi.xfer2([reg | 0x80, 0x00])
        return resp[1]

    def read_xyz(self):
        # Multi-byte read: set bit6 (MB)
        # DATAX0..DATAZ1 (6 bytes), little endian
        out = self.spi.xfer2([self.REG_DATAX0 | 0xC0] + [0x00]*6)
        raw = out[1:]
        x = struct.unpack('<h', bytes(raw[0:2]))[0]
        y = struct.unpack('<h', bytes(raw[2:4]))[0]
        z = struct.unpack('<h', bytes(raw[4:6]))[0]
        # Full-res scale factor ~ 3.9 mg/LSB => 0.0039 g/LSB
        scale = 0.0039
        return x*scale, y*scale, z*scale

class VibrationCapture:
    def __init__(self, odr_hz=800, chunk_seconds=0.5, bus=0, cs=0):
        self.adxl = ADXL345SPI(bus=bus, cs=cs, odr_hz=odr_hz)
        self.odr_hz = odr_hz
        self.chunk_seconds = chunk_seconds
        self.samples_per_chunk = int(self.odr_hz * self.chunk_seconds)
        self.q = queue.Queue(maxsize=16)
        self.stop_event = threading.Event()

    def start(self):
        def reader():
            dt = 1.0 / self.odr_hz
            buff = []
            t_prev = time.perf_counter()
            while not self.stop_event.is_set():
                xyz = self.adxl.read_xyz()
                buff.append(xyz)
                if len(buff) >= self.samples_per_chunk:
                    arr = np.array(buff, dtype=np.float32)  # shape: (N, 3)
                    buff = []
                    try:
                        self.q.put(arr, timeout=0.5)
                    except queue.Full:
                        pass
                # Ritmo simple basado en sleep (suficiente para odr<=1600 Hz)
                t_prev += dt
                to_sleep = t_prev - time.perf_counter()
                if to_sleep > 0:
                    time.sleep(to_sleep)
        threading.Thread(target=reader, daemon=True).start()

    def read_chunk(self, timeout=1.0):
        try:
            return self.q.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop(self):
        self.stop_event.set()

# ------------------------------
# Modelos: Autoencoders ligeros
# ------------------------------
class AudioAE(nn.Module):
    # Autoencoder MLP sobre mel-espectrograma vectorizado
    def __init__(self, input_dim, bottleneck=64):
        super().__init__()
        self.enc = nn.Sequential(
            nn.Linear(input_dim, 256), nn.ReLU(),
            nn.Linear(256, bottleneck), nn.ReLU()
        )
        self.dec = nn.Sequential(
            nn.Linear(bottleneck, 256), nn.ReLU(),
            nn.Linear(256, input_dim)
        )

    def forward(self, x):
        z = self.enc(x)
        out = self.dec(z)
        return out

class VibAE(nn.Module):
    # Autoencoder 1D CNN sobre (batch, C=3, T)
    def __init__(self, in_ch=3, bottleneck=32):
        super().__init__()
        self.enc = nn.Sequential(
            nn.Conv1d(in_ch, 16, 5, stride=2, padding=2), nn.ReLU(),
            nn.Conv1d(16, 32, 5, stride=2, padding=2), nn.ReLU(),
            nn.Conv1d(32, bottleneck, 5, stride=2, padding=2), nn.ReLU(),
        )
        self.dec = nn.Sequential(
            nn.ConvTranspose1d(bottleneck, 32, 4, stride=2, padding=1), nn.ReLU(),
            nn.ConvTranspose1d(32, 16, 4, stride=2, padding=1), nn.ReLU(),
            nn.ConvTranspose1d(16, in_ch, 4, stride=2, padding=1)
        )

    def forward(self, x):
        z = self.enc(x)
        out = self.dec(z)
        return out

# ------------------------------
# Pipeline principal
# ------------------------------
class AnomalySystem:
    def __init__(self, audio_dev=None, audio_rate=48000, audio_chunk=0.5,
                 vib_odr=800, vib_chunk=0.5, out_dir="runs", gpu=True):
        self.audio = AudioCapture(device=audio_dev, rate=audio_rate, channels=1,
                                  fmt="S32_LE", chunk_seconds=audio_chunk)
        self.vib = VibrationCapture(odr_hz=vib_odr, chunk_seconds=vib_chunk, bus=0, cs=0)
        self.out_dir = out_dir
        ensure_dir(out_dir)
        self.device = torch.device("cuda:0" if (gpu and torch.cuda.is_available()) else "cpu")
        # Audio frontend
        self.n_fft = 1024
        self.hop = 256
        self.n_mels = 64
        self.mel = torchaudio.transforms.MelSpectrogram(
            sample_rate=audio_rate, n_fft=self.n_fft, hop_length=self.hop,
            n_mels=self.n_mels, center=True, power=2.0
        ).to(self.device)
        self.db_transform = torchaudio.transforms.AmplitudeToDB(stype="power").to(self.device)
        # AE placeholders (se instancian al conocer dims)
        self.audio_ae = None
        self.vib_ae = None
        # Umbrales
        self.th_audio = None
        self.th_vib = None

    def start(self):
        self.audio.start()
        self.vib.start()

    def stop(self):
        self.audio.stop()
        self.vib.stop()

    def _prep_audio_feat(self, wav_np):
        # wav_np: (T,)
        wav_t = torch.from_numpy(wav_np).float().to(self.device)
        wav_t = wav_t.unsqueeze(0)  # (1, T)
        spec = self.mel(wav_t)      # (1, n_mels, frames)
        spec_db = self.db_transform(spec + 1e-10)
        spec_db = (spec_db - spec_db.mean()) / (spec_db.std() + 1e-5)
        # vectorizar
        feat = spec_db.flatten(start_dim=1)  # (1, n_mels*frames)
        return feat

    def _prep_vib_tensor(self, vib_np):
        # vib_np: (N,3) en g
        x = torch.from_numpy(vib_np.T).float().unsqueeze(0).to(self.device)  # (1,3,N)
        # normalización global por canal
        x = (x - x.mean(dim=2, keepdim=True)) / (x.std(dim=2, keepdim=True) + 1e-5)
        return x

    def _init_models_if_needed(self, audio_feat, vib_t):
        if self.audio_ae is None:
            adim = audio_feat.shape[1]
            self.audio_ae = AudioAE(input_dim=adim, bottleneck=64).to(self.device)
        if self.vib_ae is None:
            self.vib_ae = VibAE(in_ch=3, bottleneck=32).to(self.device)

    def _train_baseline(self, seconds=60, lr=1e-3, epochs=6):
        print(f"[{now_ts()}] Iniciando baseline {seconds}s...")
        t0 = time.time()
        audio_feats = []
        vib_tensors = []
        wav_snapshots = []

        # Recolectar ventanas durante 'seconds'
        while time.time() - t0 < seconds:
            a = self.audio.read_chunk(timeout=2.0)
            v = self.vib.read_chunk(timeout=2.0)
            if a is None or v is None:
                continue
            audio_feats.append(self._prep_audio_feat(a))
            vib_tensors.append(self._prep_vib_tensor(v))
            wav_snapshots.append(a.copy())

        if not audio_feats or not vib_tensors:
            raise RuntimeError("No se han podido recolectar ventanas en baseline.")

        # Apilar datasets
        A = torch.cat(audio_feats, dim=0)  # (B, adim)
        V = torch.cat(vib_tensors, dim=0)  # (B, 3, T)

        self._init_models_if_needed(A[:1], V[:1])

        # Entrenar autoencoders ligeros (pocas épocas, GPU)
        optim_a = torch.optim.Adam(self.audio_ae.parameters(), lr=lr)
        optim_v = torch.optim.Adam(self.vib_ae.parameters(), lr=lr)
        loss_fn = nn.MSELoss()

        self.audio_ae.train()
        self.vib_ae.train()

        for ep in range(epochs):
            # mini-batches (barajar simple)
            idx = torch.randperm(A.shape[0], device=self.device)
            Ab = A[idx]
            Vb = V[idx]
            bs = 16
            losses = []
            for i in range(0, Ab.shape[0], bs):
                a_i = Ab[i:i+bs]
                v_i = Vb[i:i+bs]
                optim_a.zero_grad()
                optim_v.zero_grad()
                # forward
                a_rec = self.audio_ae(a_i)
                v_rec = self.vib_ae(v_i)
                loss = loss_fn(a_rec, a_i) + loss_fn(v_rec, v_i)
                loss.backward()
                optim_a.step()
                optim_v.step()
                losses.append(loss.item())
            print(f"[{now_ts()}] Baseline epoch {ep+1}/{epochs} loss={np.mean(losses):.4f}")

        # Calcular umbrales con reconstrucción en baseline (media + 3*std)
        self.audio_ae.eval(); self.vib_ae.eval()
        with torch.no_grad():
            a_rec = self.audio_ae(A)
            v_rec = self.vib_ae(V)
            ae_audio = ((a_rec - A)**2).mean(dim=1).sqrt().detach().cpu().numpy()
            ae_vib = ((v_rec - V)**2).mean(dim=(1,2)).sqrt().detach().cpu().numpy()
        self.th_audio = float(ae_audio.mean() + 3*ae_audio.std())
        self.th_vib = float(ae_vib.mean() + 3*ae_vib.std())
        print(f"[{now_ts()}] Umbrales: audio={self.th_audio:.4f} vib={self.th_vib:.4f}")
        # Guardar un snapshot de baseline para referencia
        ensure_dir(os.path.join(self.out_dir, "baseline"))
        sf.write(os.path.join(self.out_dir, "baseline", "audio_baseline.wav"),
                 np.concatenate(wav_snapshots), samplerate=self.audio.rate, subtype="PCM_16")

    def run(self, baseline_seconds=60, max_minutes=0):
        self.start()
        try:
            self._train_baseline(seconds=baseline_seconds, epochs=6)
            events_path = os.path.join(self.out_dir, "events.jsonl")
            f_events = open(events_path, "a", buffering=1)
            print(f"[{now_ts()}] Iniciando inferencia continua. Eventos => {events_path}")

            # Bucle principal
            t_start = time.time()
            win_idx = 0
            while True:
                if max_minutes > 0 and (time.time() - t_start) > (max_minutes*60):
                    print(f"[{now_ts()}] Tiempo máximo alcanzado, saliendo.")
                    break

                a = self.audio.read_chunk(timeout=2.0)
                v = self.vib.read_chunk(timeout=2.0)
                if a is None or v is None:
                    continue

                # Temporización/inferencia
                t0 = time.perf_counter()
                A = self._prep_audio_feat(a)
                V = self._prep_vib_tensor(v)
                self._init_models_if_needed(A, V)

                self.audio_ae.eval(); self.vib_ae.eval()
                with torch.no_grad():
                    a_rec = self.audio_ae(A)
                    v_rec = self.vib_ae(V)
                    ae_audio = torch.sqrt(((a_rec - A)**2).mean(dim=1)).item()
                    ae_vib = torch.sqrt(((v_rec - V)**2).mean(dim=(1,2))).item()

                score_a = ae_audio / (self.th_audio + 1e-9)
                score_v = ae_vib / (self.th_vib + 1e-9)
                score = 0.6*score_a + 0.4*score_v
                t1 = time.perf_counter()
                latency_ms = (t1 - t0)*1000

                is_anom = score > 1.0
                # Log
                print(f"[{now_ts()}] win={win_idx} "
                      f"lat={latency_ms:.1f}ms score={score:.3f} "
                      f"(aud={score_a:.3f}, vib={score_v:.3f}) anom={is_anom}")

                if is_anom:
                    # evento + snapshot
                    ts = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
                    event = dict(ts=ts, win=win_idx, score=score,
                                 score_audio=score_a, score_vib=score_v,
                                 sr_audio=self.audio.rate, vib_odr=self.vib.odr_hz)
                    f_events.write(json.dumps(event) + "\n")
                    # guardar wav de 2 ventanas si se dispone (aquí 0.5 s -> 1 s)
                    out_wav = os.path.join(self.out_dir, f"anom_{ts}_{win_idx}.wav")
                    sf.write(out_wav, a.astype(np.float32), self.audio.rate, subtype="PCM_16")

                win_idx += 1

        finally:
            self.stop()

if __name__ == "__main__":
    import argparse
    p = argparse.ArgumentParser()
    p.add_argument("--audio-dev", type=str, default=None, help="ALSA device, ej. hw:2,0")
    p.add_argument("--baseline", type=int, default=60, help="segundos baseline")
    p.add_argument("--out", type=str, default="runs", help="directorio de salida")
    p.add_argument("--gpu", action="store_true", help="usar GPU si disponible")
    p.add_argument("--max-minutes", type=int, default=0, help="tiempo máximo (0=inf)")
    args = p.parse_args()

    sys = AnomalySystem(audio_dev=args.audio_dev, out_dir=args.out, gpu=args.gpu)
    sys.run(baseline_seconds=args.baseline, max_minutes=args.max_minutes)

Explicación breve de partes clave:
– AudioCapture: usa arecord con formato S32_LE (coherente con muchas rutas ALSA I2S en Jetson) y muestra un mecanismo robusto para seleccionar el dispositivo si no se provee.
– ADXL345SPI: inicializa en modo full resolution, ±16 g, ODR 800 Hz y modo lectura multi-byte; usa SPI modo 3 (CPOL=1, CPHA=1).
– AudioAE y VibAE: autoencoders pequeños que se entrenan rápido; el de audio trabaja en vectorización de mel-espectrograma (mel x frames), y el de vibración es CNN sobre tres canales 1D.
– Baseline: 60 s sirve para adaptar los AEs a la “normalidad” local y definir umbrales robustos (media+3σ de error de reconstrucción).
– Fusión: peso 0.6 (audio) + 0.4 (vibración) para dar más importancia a los armónicos acústicos en este ejemplo; ajustable.
– Salidas: JSONL de eventos y WAV de la ventana anómala.

Compilación/ejecución paso a paso

  1. Verificar hardware y tarjetas ALSA:
    arecord -l
    arecord -L
  2. Identifica la tarjeta I2S habilitada (suele aparecer como una “card N” con dispositivo 0). Toma nota de “hw:N,0”. Si tras habilitar I2S con jetson-io no aparece, revisa la sección Troubleshooting.

  3. Prueba rápida del I2S (3 s, 48 kHz, 32-bit LE, mono):
    # Reemplaza hw:2,0 por tu tarjeta si es diferente
    arecord -D hw:2,0 -c 1 -f S32_LE -r 48000 -d 3 -t wav test_i2s.wav
    soxi test_i2s.wav

  4. Esperado: 3 s, 48 kHz, 1 canal, PCM 32-bit. Reproduce para comprobar señal (ruido ambiente):
    aplay test_i2s.wav

  5. Prueba rápida del ADXL345 (lectura de ID vía spidev):
    python - << 'PY'
    import spidev, struct
    spi=spidev.SpiDev(); spi.open(0,0); spi.max_speed_hz=5000000; spi.mode=0b11
    devid=spi.xfer2([0x00|0x80,0x00])[1]
    print("ADXL345 DEVID=0x%02X" % devid)
    PY

  6. Esperado: “ADXL345 DEVID=0xE5”.

  7. Activar modo potencia y clocks para pruebas (opcional):
    sudo nvpmodel -m 0
    sudo jetson_clocks

  8. Ejecutar la app (baseline 60 s, GPU):
    source ~/venvs/jetson-anom/bin/activate
    python i2s_audio_vibration_anomaly.py --gpu --audio-dev hw:2,0 --baseline 60 --out runs --max-minutes 10

  9. Si desconoces la tarjeta exacta, omite –audio-dev y deja que autodetecte, pero lo recomendable es fijarla.

  10. Monitorizar rendimiento/uso de GPU y memoria:
    sudo tegrastats

  11. Observa GPU%, EMC%, temperatura. Compara latencias impresas por el script.

  12. Limpieza/fin de prueba:
    # Volver a modo por defecto si deseas
    sudo nvpmodel -q

Validación paso a paso

  1. Señal de audio base:
  2. Durante el baseline (60 s), mantén el entorno “normal” (ruido de fondo típico, motor a régimen nominal).
  3. Observa el nivel de dBFS estimado con soxi o simplemente las estadísticas que imprime el script (no saturación).

  4. Señal de vibración base:

  5. Fija el ADXL345 con cinta al chasis/motor. Evita que se mueva libremente.
  6. Asegúrate de que el odr_hz=800 capta el contenido banda base del sistema mecánico (para micro-motores, 100–400 Hz suelen ser relevantes al fundamental y armónicos).

  7. Métricas en baseline:

  8. El script imprime pérdida durante epochs (p.ej., loss ≈ 0.1–0.3, según señal).
  9. Umbrales: verás “Umbrales: audio=… vib=…”. Anótalos.

  10. Prueba de anomalías controladas:

  11. Audio:
    • Golpe suave cerca del micrófono en una ventana (0.5 s).
    • Fricción simulada (frota una superficie cerca del micrófono).
  12. Vibración:
    • Toques suaves al chasis (picos).
    • Desbalanceo artificial: pega una pequeña masa a la hélice/ventilador (si es seguro).
  13. Observa en consola:

    • lat ~ 50–120 ms por ventana (depende de clocks).
    • score_audio y score_vib > 1.0 cuando hay perturbación; el campo anom=True y archivos anom_*.wav generados en runs/.
  14. Métricas de rendimiento:

  15. FPS/ventanas por segundo: debería ser ≥ 2 (ventana 0.5 s → 2 ventanas/s). Verifica que no hay timeouts ni pérdidas.
  16. GPU% en tegrastats: típicamente 10–30% para estos AEs; RAM GPU y CPU estables.
  17. Temperatura: Tdiode < 80 °C en ejecución sostenida (si sube, mejorar refrigeración).

  18. Criterios de éxito:

  19. Falsas alarmas bajo escenario “normal” < 2% en 10 minutos.
  20. Detección consistente cuando repites las mismas perturbaciones (≥ 90%).
  21. Archivos JSONL con eventos bien formados y WAVs auditables.

Troubleshooting

  1. No aparece la tarjeta I2S en arecord -l
  2. Causa: I2S no habilitado o device-tree overlay no aplicado.
  3. Solución:

    • Repite: sudo /opt/nvidia/jetson-io/config-by-function.py -n «I2S» -o dtb; sudo reboot
    • Verifica que los pines no estén en conflicto con otras funciones (I2C/PWM).
    • Comprueba dmesg | grep -i snd o i2s para errores de codec simple-audio-card.
  4. arecord captura pero el audio es silencio/ruido blanco

  5. Causa: BCLK/LRCLK no llegan al micrófono o DOUT mal cableado.
  6. Solución:

    • Revisa pinout: BCLK→pin12, LRCLK→pin35, DOUT→pin38, 3V3→pin1, GND→pin6, SEL→GND.
    • Cables largos o flojos causan jitter: acórtalos y mejora masas.
  7. ADXL345: error DEVID inesperado (no 0xE5)

  8. Causa: cableado SPI incorrecto (MOSI/MISO cruzados), CS incorrecto o módulo no es ADXL345.
  9. Solución:

    • Verifica MOSI→SDA(DI), MISO→SDO(DO), SCLK→SCL/SCK, CS→CS, 3V3/GND correctos.
    • Asegúrate SPI1 está habilitado y /dev/spidev0.0 existe.
  10. torch.cuda.is_available() = False

  11. Causa: entorno/rueda PyTorch no compatible o variables de entorno.
  12. Solución:

    • Usa las ruedas específicas nvXX de NVIDIA: pip install –extra-index-url https://pypi.nvidia.com torch==2.3.0+nv24.05 …
    • Evita instalar ruedas x86/cu121 oficiales de PyTorch que no sirven en Jetson ARM64.
    • Comprueba que ejecutas en el host (no en contenedor sin acceso a GPU).
  13. Latencia muy alta (> 200 ms) o drop de ventanas

  14. Causa: CPU saturada, clocks variables o buffer demasiado largo.
  15. Solución:

    • sudo nvpmodel -m 0; sudo jetson_clocks
    • Reduce n_fft/hop o n_mels; disminuye tamaño de los modelos.
    • Aumenta max_speed_hz de SPI si es seguro; revisa tegrastats.
  16. Falsas alarmas frecuentes en modo normal

  17. Causa: baseline insuficiente o umbral agresivo.
  18. Solución:

    • Aumenta baseline a 120 s; cambia k-sigma a 3.5–4.0.
    • Asegura condiciones estables en baseline (sin golpes ni ruidos atípicos).
  19. No se generan archivos anom_*.wav

  20. Causa: score<=1.0 (no hay anomalía) o permiso de escritura.
  21. Solución:

    • Forzar prueba con golpes/ruidos cercanos.
    • Verifica permisos de runs/ y espacio en disco.
  22. Error “arecord: main: device busy”

  23. Causa: otro proceso usando ALSA.
  24. Solución:
    • pkill arecord; asegúrate de no tener pulseaudio/pipewire capturando ese hw.
    • Cambia a otro device si hay conflictos.

Mejoras/variantes

  • Exportar a ONNX y TensorRT:
  • Tras entrenar baseline, exporta los AEs a ONNX (torch.onnx.export) y genera motores FP16 con trtexec para latencias aún menores, manteniendo el preprocesado en PyTorch o TorchScript.
  • Aumentar robustez de fusión:
  • Añadir normalización por percentiles y fusión basada en softmax de scores o test estadísticos (p.ej., ESD, Generalized ESD).
  • Integración con MQTT/REST:
  • Publicar eventos JSONL hacia un broker (mosquitto) o endpoint REST para dashboards.
  • Ventanas superpuestas:
  • Reducir latencia de detección usando hop de 0.25 s con ventana de 0.5 s.
  • Extender features:
  • Audio: cromagramas, centroides espectrales; Vibración: RMS por eje, kurtosis, picos de banda.
  • Registro continuo:
  • Ring buffer de WAV circular y volcado de 10 s pre/post evento.

Checklist de verificación

  • [ ] JetPack 6.0.1 (L4T R36.3) verificado: cat /etc/nv_tegra_release.
  • [ ] Modo MAXN y clocks fijos aplicados (opcional): nvpmodel -m 0; jetson_clocks.
  • [ ] I2S habilitado con jetson-io; arecord -l muestra la tarjeta correspondiente.
  • [ ] SPI habilitado; /dev/spidev0.0 presente; ADXL345 DEVID=0xE5 leído.
  • [ ] Cableado correcto: ICS-43434 (BCLK→12, LRCLK→35, DOUT→38, 3V3→1, GND→6, SEL→GND); ADXL345 (CS0→24, SCLK→23, MISO→21, MOSI→19, 3V3→17, GND→20).
  • [ ] Dependencias instaladas: torch/torchvision/torchaudio nv24.05, numpy, scipy, spidev.
  • [ ] Baseline completado (60 s) sin errores; umbrales impresos.
  • [ ] Inferencia continua imprime latencia, score y genera eventos ante perturbaciones.
  • [ ] tegrastats dentro de límites térmicos; sin throttling.
  • [ ] JSONL y WAV de anomalías guardados en runs/.

Apéndice: Comandos útiles de diagnóstico

  • Listar ALSA en detalle:
    arecord -l && arecord -L
  • Info de paquetes NVIDIA:
    dpkg -l | grep nvidia
    dpkg -l | grep tensorrt
  • Power/performance:
    nvpmodel -q
    sudo tegrastats

Con este caso práctico, has construido un pipeline reproducible y coherente con el hardware “NVIDIA Jetson Orin Nano Developer Kit + Adafruit I2S MEMS Microphone (ICS-43434) + ADXL345 3-axis accelerometer (SPI)”, capaz de detectar anomalías en audio y vibración en tiempo casi real, validado con métricas, logs y artefactos objetivos.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Cuál es el propósito principal del sistema descrito?




Pregunta 2: ¿Qué tipo de micrófono se utiliza en el sistema?




Pregunta 3: ¿Qué técnica se utiliza para la inferencia en GPU?




Pregunta 4: ¿Cuál es la latencia extremo a extremo esperada por evento?




Pregunta 5: ¿Qué frecuencia de vibración se utiliza para la detección de defectos en reductores?




Pregunta 6: ¿Qué significa TPR en el contexto del sistema?




Pregunta 7: ¿Cuánto tiempo se necesita para fijar los umbrales de 'normalidad'?




Pregunta 8: ¿Qué se monitorea en los compresores según el artículo?




Pregunta 9: ¿Qué porcentaje de uso de GPU se espera en el sistema?




Pregunta 10: ¿Cuál es la frecuencia de los tonos de stepper que se monitorean en impresoras 3D?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:
Scroll al inicio