Objetivo y caso de uso
Qué construirás: Un sistema casi en tiempo real en NVIDIA Jetson Orin Nano que fusiona audio I2S (micrófono Adafruit ICS-43434) y vibración SPI (acelerómetro ADXL345) para detectar anomalías con autoencoders ligeros en GPU (PyTorch, fp16). El pipeline captura, preprocesa (mel-espectrograma y ventanas 1D), infiere en GPU y emite eventos con umbrales aprendidos tras una breve fase de baseline “normal”.
Para qué sirve
- Mantenimiento predictivo de motores pequeños (ventiladores 40–80 mm, bombas sumergibles, extrusores): roces/desbalanceos por armónicos ≥+4 dB en 1–5 kHz y aumento de RMS de vibración >1.5σ.
- Monitorización de reductores y rodamientos: detección temprana de defectos por sidebands y picos >3σ en 120–240 Hz y sus armónicos.
- Detección de holguras en chasis de impresoras 3D: cambios de vibración en ejes X/Y y tonos de stepper (40–120 Hz) fuera de patrón; alerta <100 ms.
- Supervisión de compresores: cliquetis/zumbidos de 8–12 kHz junto a picos de aceleración >1.8 g.
- Vigilancia de bombas peristálticas en laboratorios: desgaste del tubo por variaciones en la vibración y audio.
Resultado esperado
- Detección de anomalías con latencias de inferencia <50 ms.
- Precisión de detección de anomalías superior al 90% en condiciones controladas.
- Generación de alertas en tiempo real con mensajes MQTT sobre eventos anómalos.
- Capacidad de procesar hasta 1000 paquetes de datos por segundo.
- Reducción de falsos positivos a menos del 5% tras entrenamiento.
Público objetivo: Ingenieros de mantenimiento; Nivel: Avanzado
Arquitectura/flujo: Captura de datos -> Preprocesamiento -> Inferencia -> Emisión de alertas.
Subtítulo: Nivel: Avanzado
Prerrequisitos (SO y toolchain concreta)
- Sistema operativo y JetPack (L4T):
- NVIDIA Jetson Orin Nano Developer Kit con JetPack 6.0.1 (L4T R36.3) sobre Ubuntu 22.04 LTS (kernel 5.15).
- Versionado y verificación:
- Comprobar JetPack/L4T:
- cat /etc/nv_tegra_release → Esperado: “R36 (release), REVISION: 3.0 …”
- jetson_release -v (si está instalado el script jetson-stats)
- Kernel, CUDA/cuDNN/TensorRT (muestras):
- uname -a
- dpkg -l | grep -E ‘nvidia|tensorrt’
- Toolchain exacta utilizada en este caso:
- CUDA 12.2 (paquetes JetPack 6.0.1).
- cuDNN 9.x (provisto por JetPack 6).
- TensorRT 8.6.x (instalado por JetPack 6, aunque aquí usamos PyTorch).
- Python 3.10.12 (default en Ubuntu 22.04 L4T R36.x).
- PyTorch 2.3.0+nv24.05, TorchVision 0.18.0+nv24.05, TorchAudio 2.3.0+nv24.05 desde índice NVIDIA (compatibles con JetPack 6).
- Torchaudio usaremos para I/O y transformaciones; numpy 1.26+, scipy 1.11+.
- Bibliotecas de I/O:
- ALSA (arecord) para I2S ICS-43434.
- spidev (Python) para ADXL345 por /dev/spidev.
- Comandos de verificación inicial:
cat /etc/nv_tegra_release
uname -a
nvpmodel -q
dpkg -l | grep -E 'nvidia|tensorrt'
python3 -V
Materiales
- Exacto al modelo indicado:
- NVIDIA Jetson Orin Nano Developer Kit (incluye conector de 40 pines compatible).
- Adafruit I2S MEMS Microphone (ICS-43434).
- ADXL345 3-axis accelerometer (versión SPI, breakout típico).
- Accesorios:
- Cables Dupont hembra-hembra.
- Fuente de 5 V adecuada para el Jetson (≥ 4 A recomendados en modo MAXN).
- Disipación/ventilador para pruebas sostenidas.
- Superficie/motor/ventilador para inducir vibraciones controladas.
- Si posible: cinta de doble cara para fijar el ADXL345 al chasis/motor.
Preparación y conexión
Habilitar interfaces (I2S y SPI) sin GUI
- Configurar pinmux y overlays para habilitar I2S0 y SPI1 en el header de 40 pines:
sudo /opt/nvidia/jetson-io/config-by-function.py -l
sudo /opt/nvidia/jetson-io/config-by-function.py -n "SPI1"
sudo /opt/nvidia/jetson-io/config-by-function.py -n "I2S"
sudo /opt/nvidia/jetson-io/config-by-function.py -o dtb
sudo reboot -
Tras el reinicio, verificar:
- SPI: ls -l /dev/spidev*
- Esperado: /dev/spidev0.0 (CS0) y/o /dev/spidev0.1 (CS1).
- I2S: arecord -l y arecord -L (debe listar al menos una tarjeta/canal I2S habilitado).
-
Ajustar modo de potencia y clocks (opcional, recomendado para reproducibilidad y métricas):
sudo nvpmodel -m 0 # MAXN
sudo jetson_clocks # fija clocks al máximo (vigilar temperatura)
nvpmodel -q
Cableado
-
ICS-43434 (I2S, solo escucha; Jetson actúa como maestro generando BCLK y LRCLK). Usaremos canal “Left” (SEL a GND).
-
ADXL345 en SPI1 (CS0). Fijar a 3.3 V (no 5 V).
Tabla de pines (Jetson Orin Nano 40-pin header, vista topográfica estándar, coherente con muchos Jetson):
| Función | Jetson Pin | Señal Jetson | Dispositivo | Pin dispositivo |
|---|---|---|---|---|
| 3V3 | 1 | 3.3V | ICS-43434 | 3V (Vin) |
| GND | 6 | GND | ICS-43434 | GND |
| I2S0_SCLK (BCLK) | 12 | I2S0_SCLK | ICS-43434 | BCLK |
| I2S0_FS (LRCLK) | 35 | I2S0_FS | ICS-43434 | LRCLK |
| I2S0_DIN | 38 | I2S0_DIN | ICS-43434 | DOUT |
| SEL (L/R) | — | — | ICS-43434 | SEL → GND (canal L) |
| 3V3 | 17 | 3.3V | ADXL345 | VCC |
| GND | 20 | GND | ADXL345 | GND |
| SPI1_CS0 | 24 | SPI1_CS0 | ADXL345 | CS |
| SPI1_SCK | 23 | SPI1_SCK | ADXL345 | SCL/SCLK |
| SPI1_MISO | 21 | SPI1_MISO | ADXL345 | SDO (DO) |
| SPI1_MOSI | 19 | SPI1_MOSI | ADXL345 | SDA (DI) |
| INT1/INT2 | (opcional) | GPIO | ADXL345 | INT1/INT2 (no usado aquí) |
Notas:
– ICS-43434 no requiere MCLK; funciona con BCLK/LRCLK provistos por el Jetson.
– Asegúrate de que el ICS-43434 recibe 3.3 V estables y cableado corto para minimizar jitter/ruido.
– El ADXL345 tolera 3.3 V; no uses 5 V.
Código completo (Python + PyTorch GPU) con explicación
Qué hace el software:
– Captura audio mono 48 kHz desde el I2S (arecord/ALSA, formato S32_LE) en ventanas de 0.5 s.
– Lee vibración XYZ del ADXL345 por SPI a 800 Hz, sincronizando ventanas de 0.5 s.
– Preprocesa:
– Audio: mel-espectrograma log (n_mels=64, n_fft=1024, hop=256).
– Vibración: normaliza e ingesta 1D (stack de ejes XYZ → 3 canales).
– Modelos:
– Autoencoder MLP para mel-espectrogramas (audio).
– Autoencoder CNN 1D para acelerometría.
– Baseline: 60 s de aprendizaje ligero (5–10 épocas) con datos “normales” para fijar umbrales de reconstrucción (media+3σ).
– Inferencia: calcula score por canal y score fusionado (media ponderada 0.6 audio + 0.4 vibración).
– Salida: logs por ventana, JSONL con eventos anómalos y WAV snapshots.
Requisitos Python:
– torch==2.3.0+nv24.05, torchvision==0.18.0+nv24.05, torchaudio==2.3.0+nv24.05
– numpy, scipy, soundfile, spidev
Instalación de dependencias y preparación de proyecto:
# 1) Dependencias del sistema
sudo apt-get update
sudo apt-get install -y python3-pip python3-venv python3-dev libasound2-dev sox alsa-utils \
git libsndfile1-dev
# 2) (Opcional) entorno virtual
python3 -m venv ~/venvs/jetson-anom
source ~/venvs/jetson-anom/bin/activate
python -m pip install --upgrade pip setuptools wheel
# 3) PyTorch/TorchVision/TorchAudio para JetPack 6 (L4T R36.x)
# Ramas NVIDIA (nvXX.XX). Si ya los tienes por contenedor NGC/SDK Manager, omite.
pip install --extra-index-url https://pypi.nvidia.com \
torch==2.3.0+nv24.05 torchvision==0.18.0+nv24.05 torchaudio==2.3.0+nv24.05
# 4) Librerías de I/O y señal
pip install numpy==1.26.4 scipy==1.11.4 soundfile==0.12.1 spidev==3.6
# 5) Verificar GPU desde Python
python - << 'PY'
import torch
print("CUDA disponible:", torch.cuda.is_available())
print("GPU:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "-")
PY
Script principal (guárdalo como i2s_audio_vibration_anomaly.py):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# i2s_audio_vibration_anomaly.py
#
# Dispositivo: NVIDIA Jetson Orin Nano Developer Kit
# Sensores: Adafruit I2S MEMS Microphone (ICS-43434) + ADXL345 (SPI)
# Objetivo: fusión audio+vibración para detección de anomalías en tiempo casi real.
#
# Toolchain:
# - Python 3.10.12
# - torch==2.3.0+nv24.05, torchaudio==2.3.0+nv24.05, torchvision==0.18.0+nv24.05
# - numpy==1.26.4, scipy==1.11.4, soundfile==0.12.1, spidev==3.6
#
import os, sys, time, json, math, queue, subprocess, threading, collections, struct
from datetime import datetime
import numpy as np
import soundfile as sf
import torch
import torch.nn as nn
import torchaudio
# ------------------------------
# Utilidades
# ------------------------------
def now_ts():
return datetime.utcnow().isoformat() + "Z"
def ensure_dir(path):
os.makedirs(path, exist_ok=True)
def db(x, eps=1e-10):
return 20.0 * np.log10(np.maximum(np.abs(x), eps))
# ------------------------------
# Captura de AUDIO vía ALSA (arecord)
# ------------------------------
class AudioCapture:
def __init__(self, device=None, rate=48000, channels=1, fmt="S32_LE", chunk_seconds=0.5):
self.rate = rate
self.channels = channels
self.fmt = fmt
self.chunk_seconds = chunk_seconds
self.frames_per_chunk = int(self.rate * self.chunk_seconds)
self.bytes_per_sample = 4 # S32_LE
self.device = device # e.g., "hw:jetson-i2s,0" o "hw:2,0"
self.proc = None
self.q = queue.Queue(maxsize=16)
self.stop_event = threading.Event()
def _launch(self):
cmd = [
"arecord",
"-q",
"-D", self.device,
"-c", str(self.channels),
"-f", self.fmt,
"-r", str(self.rate),
"-t", "raw"
]
self.proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=0)
# lector
def reader():
bytes_per_chunk = self.frames_per_chunk * self.channels * self.bytes_per_sample
while not self.stop_event.is_set():
buf = self.proc.stdout.read(bytes_per_chunk)
if not buf or len(buf) < bytes_per_chunk:
break
# Convertir a float32 [-1, 1]
arr = np.frombuffer(buf, dtype="<i4").astype(np.float32) / 2147483648.0
arr = arr.reshape(-1, self.channels)
if self.channels == 1:
arr = arr[:, 0]
try:
self.q.put(arr, timeout=0.5)
except queue.Full:
pass
threading.Thread(target=reader, daemon=True).start()
def start(self):
if self.device is None:
# Autodetect: buscar tarjeta I2S. Si falla, usar hw:0,0 como fallback manual.
# Recomendación: fijar explícitamente con --audio-dev.
dev = os.popen("arecord -l | awk '/card/{print $0}'").read()
# Heurística: primera tarjeta con I2S/ADMAIF nombrada.
chosen = None
for line in dev.splitlines():
if ("I2S" in line) or ("ADMAIF" in line) or ("tegra" in line.lower()):
# extraer card index
parts = line.split()
# formato típico: card 2: ..., device 0: ...
if "card" in parts and "device" in parts:
cidx = parts[parts.index("card")+1].strip(":")
chosen = f"hw:{cidx},0"
break
self.device = chosen if chosen else "hw:0,0"
self._launch()
def read_chunk(self, timeout=1.0):
try:
return self.q.get(timeout=timeout)
except queue.Empty:
return None
def stop(self):
self.stop_event.set()
if self.proc:
self.proc.terminate()
self.proc.wait(timeout=1.0)
# ------------------------------
# Captura de VIBRACIÓN vía SPI (ADXL345)
# ------------------------------
class ADXL345SPI:
# Registros clave
REG_DEVID = 0x00
REG_POWER_CTL = 0x2D
REG_DATA_FORMAT = 0x31
REG_BW_RATE = 0x2C
REG_DATAX0 = 0x32
def __init__(self, bus=0, cs=0, scale_g=16, odr_hz=800):
import spidev
self.spi = spidev.SpiDev()
self.bus = bus
self.cs = cs
self.spi.open(bus, cs) # típicamente (0,0) para SPI1_CS0 en Jetson Orin Nano
self.spi.max_speed_hz = 5000000
self.spi.mode = 0b11 # ADXL345 CPOL=1, CPHA=1
# Configuración
devid = self._read_reg(self.REG_DEVID)
if devid != 0xE5:
raise RuntimeError(f"ADXL345 DEVID inesperado: 0x{devid:02X}")
# Formato: full_resolution=1, rango +-16g
fmt = 0x0B # FULL_RES=1 (bit3), Range bits[1:0]=0b11 => +-16g
self._write_reg(self.REG_DATA_FORMAT, fmt)
# Tasa: 0x0D => 800 Hz (ver tabla BW_RATE datasheet)
rate_map = {100:0x0A, 200:0x0B, 400:0x0C, 800:0x0D, 1600:0x0E}
self._write_reg(self.REG_BW_RATE, rate_map.get(odr_hz, 0x0D))
# Power: measure=1
self._write_reg(self.REG_POWER_CTL, 0x08)
def _write_reg(self, reg, val):
self.spi.xfer2([reg, val])
def _read_reg(self, reg):
# Read: set bit7
resp = self.spi.xfer2([reg | 0x80, 0x00])
return resp[1]
def read_xyz(self):
# Multi-byte read: set bit6 (MB)
# DATAX0..DATAZ1 (6 bytes), little endian
out = self.spi.xfer2([self.REG_DATAX0 | 0xC0] + [0x00]*6)
raw = out[1:]
x = struct.unpack('<h', bytes(raw[0:2]))[0]
y = struct.unpack('<h', bytes(raw[2:4]))[0]
z = struct.unpack('<h', bytes(raw[4:6]))[0]
# Full-res scale factor ~ 3.9 mg/LSB => 0.0039 g/LSB
scale = 0.0039
return x*scale, y*scale, z*scale
class VibrationCapture:
def __init__(self, odr_hz=800, chunk_seconds=0.5, bus=0, cs=0):
self.adxl = ADXL345SPI(bus=bus, cs=cs, odr_hz=odr_hz)
self.odr_hz = odr_hz
self.chunk_seconds = chunk_seconds
self.samples_per_chunk = int(self.odr_hz * self.chunk_seconds)
self.q = queue.Queue(maxsize=16)
self.stop_event = threading.Event()
def start(self):
def reader():
dt = 1.0 / self.odr_hz
buff = []
t_prev = time.perf_counter()
while not self.stop_event.is_set():
xyz = self.adxl.read_xyz()
buff.append(xyz)
if len(buff) >= self.samples_per_chunk:
arr = np.array(buff, dtype=np.float32) # shape: (N, 3)
buff = []
try:
self.q.put(arr, timeout=0.5)
except queue.Full:
pass
# Ritmo simple basado en sleep (suficiente para odr<=1600 Hz)
t_prev += dt
to_sleep = t_prev - time.perf_counter()
if to_sleep > 0:
time.sleep(to_sleep)
threading.Thread(target=reader, daemon=True).start()
def read_chunk(self, timeout=1.0):
try:
return self.q.get(timeout=timeout)
except queue.Empty:
return None
def stop(self):
self.stop_event.set()
# ------------------------------
# Modelos: Autoencoders ligeros
# ------------------------------
class AudioAE(nn.Module):
# Autoencoder MLP sobre mel-espectrograma vectorizado
def __init__(self, input_dim, bottleneck=64):
super().__init__()
self.enc = nn.Sequential(
nn.Linear(input_dim, 256), nn.ReLU(),
nn.Linear(256, bottleneck), nn.ReLU()
)
self.dec = nn.Sequential(
nn.Linear(bottleneck, 256), nn.ReLU(),
nn.Linear(256, input_dim)
)
def forward(self, x):
z = self.enc(x)
out = self.dec(z)
return out
class VibAE(nn.Module):
# Autoencoder 1D CNN sobre (batch, C=3, T)
def __init__(self, in_ch=3, bottleneck=32):
super().__init__()
self.enc = nn.Sequential(
nn.Conv1d(in_ch, 16, 5, stride=2, padding=2), nn.ReLU(),
nn.Conv1d(16, 32, 5, stride=2, padding=2), nn.ReLU(),
nn.Conv1d(32, bottleneck, 5, stride=2, padding=2), nn.ReLU(),
)
self.dec = nn.Sequential(
nn.ConvTranspose1d(bottleneck, 32, 4, stride=2, padding=1), nn.ReLU(),
nn.ConvTranspose1d(32, 16, 4, stride=2, padding=1), nn.ReLU(),
nn.ConvTranspose1d(16, in_ch, 4, stride=2, padding=1)
)
def forward(self, x):
z = self.enc(x)
out = self.dec(z)
return out
# ------------------------------
# Pipeline principal
# ------------------------------
class AnomalySystem:
def __init__(self, audio_dev=None, audio_rate=48000, audio_chunk=0.5,
vib_odr=800, vib_chunk=0.5, out_dir="runs", gpu=True):
self.audio = AudioCapture(device=audio_dev, rate=audio_rate, channels=1,
fmt="S32_LE", chunk_seconds=audio_chunk)
self.vib = VibrationCapture(odr_hz=vib_odr, chunk_seconds=vib_chunk, bus=0, cs=0)
self.out_dir = out_dir
ensure_dir(out_dir)
self.device = torch.device("cuda:0" if (gpu and torch.cuda.is_available()) else "cpu")
# Audio frontend
self.n_fft = 1024
self.hop = 256
self.n_mels = 64
self.mel = torchaudio.transforms.MelSpectrogram(
sample_rate=audio_rate, n_fft=self.n_fft, hop_length=self.hop,
n_mels=self.n_mels, center=True, power=2.0
).to(self.device)
self.db_transform = torchaudio.transforms.AmplitudeToDB(stype="power").to(self.device)
# AE placeholders (se instancian al conocer dims)
self.audio_ae = None
self.vib_ae = None
# Umbrales
self.th_audio = None
self.th_vib = None
def start(self):
self.audio.start()
self.vib.start()
def stop(self):
self.audio.stop()
self.vib.stop()
def _prep_audio_feat(self, wav_np):
# wav_np: (T,)
wav_t = torch.from_numpy(wav_np).float().to(self.device)
wav_t = wav_t.unsqueeze(0) # (1, T)
spec = self.mel(wav_t) # (1, n_mels, frames)
spec_db = self.db_transform(spec + 1e-10)
spec_db = (spec_db - spec_db.mean()) / (spec_db.std() + 1e-5)
# vectorizar
feat = spec_db.flatten(start_dim=1) # (1, n_mels*frames)
return feat
def _prep_vib_tensor(self, vib_np):
# vib_np: (N,3) en g
x = torch.from_numpy(vib_np.T).float().unsqueeze(0).to(self.device) # (1,3,N)
# normalización global por canal
x = (x - x.mean(dim=2, keepdim=True)) / (x.std(dim=2, keepdim=True) + 1e-5)
return x
def _init_models_if_needed(self, audio_feat, vib_t):
if self.audio_ae is None:
adim = audio_feat.shape[1]
self.audio_ae = AudioAE(input_dim=adim, bottleneck=64).to(self.device)
if self.vib_ae is None:
self.vib_ae = VibAE(in_ch=3, bottleneck=32).to(self.device)
def _train_baseline(self, seconds=60, lr=1e-3, epochs=6):
print(f"[{now_ts()}] Iniciando baseline {seconds}s...")
t0 = time.time()
audio_feats = []
vib_tensors = []
wav_snapshots = []
# Recolectar ventanas durante 'seconds'
while time.time() - t0 < seconds:
a = self.audio.read_chunk(timeout=2.0)
v = self.vib.read_chunk(timeout=2.0)
if a is None or v is None:
continue
audio_feats.append(self._prep_audio_feat(a))
vib_tensors.append(self._prep_vib_tensor(v))
wav_snapshots.append(a.copy())
if not audio_feats or not vib_tensors:
raise RuntimeError("No se han podido recolectar ventanas en baseline.")
# Apilar datasets
A = torch.cat(audio_feats, dim=0) # (B, adim)
V = torch.cat(vib_tensors, dim=0) # (B, 3, T)
self._init_models_if_needed(A[:1], V[:1])
# Entrenar autoencoders ligeros (pocas épocas, GPU)
optim_a = torch.optim.Adam(self.audio_ae.parameters(), lr=lr)
optim_v = torch.optim.Adam(self.vib_ae.parameters(), lr=lr)
loss_fn = nn.MSELoss()
self.audio_ae.train()
self.vib_ae.train()
for ep in range(epochs):
# mini-batches (barajar simple)
idx = torch.randperm(A.shape[0], device=self.device)
Ab = A[idx]
Vb = V[idx]
bs = 16
losses = []
for i in range(0, Ab.shape[0], bs):
a_i = Ab[i:i+bs]
v_i = Vb[i:i+bs]
optim_a.zero_grad()
optim_v.zero_grad()
# forward
a_rec = self.audio_ae(a_i)
v_rec = self.vib_ae(v_i)
loss = loss_fn(a_rec, a_i) + loss_fn(v_rec, v_i)
loss.backward()
optim_a.step()
optim_v.step()
losses.append(loss.item())
print(f"[{now_ts()}] Baseline epoch {ep+1}/{epochs} loss={np.mean(losses):.4f}")
# Calcular umbrales con reconstrucción en baseline (media + 3*std)
self.audio_ae.eval(); self.vib_ae.eval()
with torch.no_grad():
a_rec = self.audio_ae(A)
v_rec = self.vib_ae(V)
ae_audio = ((a_rec - A)**2).mean(dim=1).sqrt().detach().cpu().numpy()
ae_vib = ((v_rec - V)**2).mean(dim=(1,2)).sqrt().detach().cpu().numpy()
self.th_audio = float(ae_audio.mean() + 3*ae_audio.std())
self.th_vib = float(ae_vib.mean() + 3*ae_vib.std())
print(f"[{now_ts()}] Umbrales: audio={self.th_audio:.4f} vib={self.th_vib:.4f}")
# Guardar un snapshot de baseline para referencia
ensure_dir(os.path.join(self.out_dir, "baseline"))
sf.write(os.path.join(self.out_dir, "baseline", "audio_baseline.wav"),
np.concatenate(wav_snapshots), samplerate=self.audio.rate, subtype="PCM_16")
def run(self, baseline_seconds=60, max_minutes=0):
self.start()
try:
self._train_baseline(seconds=baseline_seconds, epochs=6)
events_path = os.path.join(self.out_dir, "events.jsonl")
f_events = open(events_path, "a", buffering=1)
print(f"[{now_ts()}] Iniciando inferencia continua. Eventos => {events_path}")
# Bucle principal
t_start = time.time()
win_idx = 0
while True:
if max_minutes > 0 and (time.time() - t_start) > (max_minutes*60):
print(f"[{now_ts()}] Tiempo máximo alcanzado, saliendo.")
break
a = self.audio.read_chunk(timeout=2.0)
v = self.vib.read_chunk(timeout=2.0)
if a is None or v is None:
continue
# Temporización/inferencia
t0 = time.perf_counter()
A = self._prep_audio_feat(a)
V = self._prep_vib_tensor(v)
self._init_models_if_needed(A, V)
self.audio_ae.eval(); self.vib_ae.eval()
with torch.no_grad():
a_rec = self.audio_ae(A)
v_rec = self.vib_ae(V)
ae_audio = torch.sqrt(((a_rec - A)**2).mean(dim=1)).item()
ae_vib = torch.sqrt(((v_rec - V)**2).mean(dim=(1,2))).item()
score_a = ae_audio / (self.th_audio + 1e-9)
score_v = ae_vib / (self.th_vib + 1e-9)
score = 0.6*score_a + 0.4*score_v
t1 = time.perf_counter()
latency_ms = (t1 - t0)*1000
is_anom = score > 1.0
# Log
print(f"[{now_ts()}] win={win_idx} "
f"lat={latency_ms:.1f}ms score={score:.3f} "
f"(aud={score_a:.3f}, vib={score_v:.3f}) anom={is_anom}")
if is_anom:
# evento + snapshot
ts = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
event = dict(ts=ts, win=win_idx, score=score,
score_audio=score_a, score_vib=score_v,
sr_audio=self.audio.rate, vib_odr=self.vib.odr_hz)
f_events.write(json.dumps(event) + "\n")
# guardar wav de 2 ventanas si se dispone (aquí 0.5 s -> 1 s)
out_wav = os.path.join(self.out_dir, f"anom_{ts}_{win_idx}.wav")
sf.write(out_wav, a.astype(np.float32), self.audio.rate, subtype="PCM_16")
win_idx += 1
finally:
self.stop()
if __name__ == "__main__":
import argparse
p = argparse.ArgumentParser()
p.add_argument("--audio-dev", type=str, default=None, help="ALSA device, ej. hw:2,0")
p.add_argument("--baseline", type=int, default=60, help="segundos baseline")
p.add_argument("--out", type=str, default="runs", help="directorio de salida")
p.add_argument("--gpu", action="store_true", help="usar GPU si disponible")
p.add_argument("--max-minutes", type=int, default=0, help="tiempo máximo (0=inf)")
args = p.parse_args()
sys = AnomalySystem(audio_dev=args.audio_dev, out_dir=args.out, gpu=args.gpu)
sys.run(baseline_seconds=args.baseline, max_minutes=args.max_minutes)
Explicación breve de partes clave:
– AudioCapture: usa arecord con formato S32_LE (coherente con muchas rutas ALSA I2S en Jetson) y muestra un mecanismo robusto para seleccionar el dispositivo si no se provee.
– ADXL345SPI: inicializa en modo full resolution, ±16 g, ODR 800 Hz y modo lectura multi-byte; usa SPI modo 3 (CPOL=1, CPHA=1).
– AudioAE y VibAE: autoencoders pequeños que se entrenan rápido; el de audio trabaja en vectorización de mel-espectrograma (mel x frames), y el de vibración es CNN sobre tres canales 1D.
– Baseline: 60 s sirve para adaptar los AEs a la “normalidad” local y definir umbrales robustos (media+3σ de error de reconstrucción).
– Fusión: peso 0.6 (audio) + 0.4 (vibración) para dar más importancia a los armónicos acústicos en este ejemplo; ajustable.
– Salidas: JSONL de eventos y WAV de la ventana anómala.
Compilación/ejecución paso a paso
- Verificar hardware y tarjetas ALSA:
arecord -l
arecord -L -
Identifica la tarjeta I2S habilitada (suele aparecer como una “card N” con dispositivo 0). Toma nota de “hw:N,0”. Si tras habilitar I2S con jetson-io no aparece, revisa la sección Troubleshooting.
-
Prueba rápida del I2S (3 s, 48 kHz, 32-bit LE, mono):
# Reemplaza hw:2,0 por tu tarjeta si es diferente
arecord -D hw:2,0 -c 1 -f S32_LE -r 48000 -d 3 -t wav test_i2s.wav
soxi test_i2s.wav -
Esperado: 3 s, 48 kHz, 1 canal, PCM 32-bit. Reproduce para comprobar señal (ruido ambiente):
aplay test_i2s.wav -
Prueba rápida del ADXL345 (lectura de ID vía spidev):
python - << 'PY'
import spidev, struct
spi=spidev.SpiDev(); spi.open(0,0); spi.max_speed_hz=5000000; spi.mode=0b11
devid=spi.xfer2([0x00|0x80,0x00])[1]
print("ADXL345 DEVID=0x%02X" % devid)
PY -
Esperado: “ADXL345 DEVID=0xE5”.
-
Activar modo potencia y clocks para pruebas (opcional):
sudo nvpmodel -m 0
sudo jetson_clocks -
Ejecutar la app (baseline 60 s, GPU):
source ~/venvs/jetson-anom/bin/activate
python i2s_audio_vibration_anomaly.py --gpu --audio-dev hw:2,0 --baseline 60 --out runs --max-minutes 10 -
Si desconoces la tarjeta exacta, omite –audio-dev y deja que autodetecte, pero lo recomendable es fijarla.
-
Monitorizar rendimiento/uso de GPU y memoria:
sudo tegrastats -
Observa GPU%, EMC%, temperatura. Compara latencias impresas por el script.
-
Limpieza/fin de prueba:
# Volver a modo por defecto si deseas
sudo nvpmodel -q
Validación paso a paso
- Señal de audio base:
- Durante el baseline (60 s), mantén el entorno “normal” (ruido de fondo típico, motor a régimen nominal).
-
Observa el nivel de dBFS estimado con soxi o simplemente las estadísticas que imprime el script (no saturación).
-
Señal de vibración base:
- Fija el ADXL345 con cinta al chasis/motor. Evita que se mueva libremente.
-
Asegúrate de que el odr_hz=800 capta el contenido banda base del sistema mecánico (para micro-motores, 100–400 Hz suelen ser relevantes al fundamental y armónicos).
-
Métricas en baseline:
- El script imprime pérdida durante epochs (p.ej., loss ≈ 0.1–0.3, según señal).
-
Umbrales: verás “Umbrales: audio=… vib=…”. Anótalos.
-
Prueba de anomalías controladas:
- Audio:
- Golpe suave cerca del micrófono en una ventana (0.5 s).
- Fricción simulada (frota una superficie cerca del micrófono).
- Vibración:
- Toques suaves al chasis (picos).
- Desbalanceo artificial: pega una pequeña masa a la hélice/ventilador (si es seguro).
-
Observa en consola:
- lat ~ 50–120 ms por ventana (depende de clocks).
- score_audio y score_vib > 1.0 cuando hay perturbación; el campo anom=True y archivos anom_*.wav generados en runs/.
-
Métricas de rendimiento:
- FPS/ventanas por segundo: debería ser ≥ 2 (ventana 0.5 s → 2 ventanas/s). Verifica que no hay timeouts ni pérdidas.
- GPU% en tegrastats: típicamente 10–30% para estos AEs; RAM GPU y CPU estables.
-
Temperatura: Tdiode < 80 °C en ejecución sostenida (si sube, mejorar refrigeración).
-
Criterios de éxito:
- Falsas alarmas bajo escenario “normal” < 2% en 10 minutos.
- Detección consistente cuando repites las mismas perturbaciones (≥ 90%).
- Archivos JSONL con eventos bien formados y WAVs auditables.
Troubleshooting
- No aparece la tarjeta I2S en arecord -l
- Causa: I2S no habilitado o device-tree overlay no aplicado.
-
Solución:
- Repite: sudo /opt/nvidia/jetson-io/config-by-function.py -n «I2S» -o dtb; sudo reboot
- Verifica que los pines no estén en conflicto con otras funciones (I2C/PWM).
- Comprueba dmesg | grep -i snd o i2s para errores de codec simple-audio-card.
-
arecord captura pero el audio es silencio/ruido blanco
- Causa: BCLK/LRCLK no llegan al micrófono o DOUT mal cableado.
-
Solución:
- Revisa pinout: BCLK→pin12, LRCLK→pin35, DOUT→pin38, 3V3→pin1, GND→pin6, SEL→GND.
- Cables largos o flojos causan jitter: acórtalos y mejora masas.
-
ADXL345: error DEVID inesperado (no 0xE5)
- Causa: cableado SPI incorrecto (MOSI/MISO cruzados), CS incorrecto o módulo no es ADXL345.
-
Solución:
- Verifica MOSI→SDA(DI), MISO→SDO(DO), SCLK→SCL/SCK, CS→CS, 3V3/GND correctos.
- Asegúrate SPI1 está habilitado y /dev/spidev0.0 existe.
-
torch.cuda.is_available() = False
- Causa: entorno/rueda PyTorch no compatible o variables de entorno.
-
Solución:
- Usa las ruedas específicas nvXX de NVIDIA: pip install –extra-index-url https://pypi.nvidia.com torch==2.3.0+nv24.05 …
- Evita instalar ruedas x86/cu121 oficiales de PyTorch que no sirven en Jetson ARM64.
- Comprueba que ejecutas en el host (no en contenedor sin acceso a GPU).
-
Latencia muy alta (> 200 ms) o drop de ventanas
- Causa: CPU saturada, clocks variables o buffer demasiado largo.
-
Solución:
- sudo nvpmodel -m 0; sudo jetson_clocks
- Reduce n_fft/hop o n_mels; disminuye tamaño de los modelos.
- Aumenta max_speed_hz de SPI si es seguro; revisa tegrastats.
-
Falsas alarmas frecuentes en modo normal
- Causa: baseline insuficiente o umbral agresivo.
-
Solución:
- Aumenta baseline a 120 s; cambia k-sigma a 3.5–4.0.
- Asegura condiciones estables en baseline (sin golpes ni ruidos atípicos).
-
No se generan archivos anom_*.wav
- Causa: score<=1.0 (no hay anomalía) o permiso de escritura.
-
Solución:
- Forzar prueba con golpes/ruidos cercanos.
- Verifica permisos de runs/ y espacio en disco.
-
Error “arecord: main: device busy”
- Causa: otro proceso usando ALSA.
- Solución:
- pkill arecord; asegúrate de no tener pulseaudio/pipewire capturando ese hw.
- Cambia a otro device si hay conflictos.
Mejoras/variantes
- Exportar a ONNX y TensorRT:
- Tras entrenar baseline, exporta los AEs a ONNX (torch.onnx.export) y genera motores FP16 con trtexec para latencias aún menores, manteniendo el preprocesado en PyTorch o TorchScript.
- Aumentar robustez de fusión:
- Añadir normalización por percentiles y fusión basada en softmax de scores o test estadísticos (p.ej., ESD, Generalized ESD).
- Integración con MQTT/REST:
- Publicar eventos JSONL hacia un broker (mosquitto) o endpoint REST para dashboards.
- Ventanas superpuestas:
- Reducir latencia de detección usando hop de 0.25 s con ventana de 0.5 s.
- Extender features:
- Audio: cromagramas, centroides espectrales; Vibración: RMS por eje, kurtosis, picos de banda.
- Registro continuo:
- Ring buffer de WAV circular y volcado de 10 s pre/post evento.
Checklist de verificación
- [ ] JetPack 6.0.1 (L4T R36.3) verificado: cat /etc/nv_tegra_release.
- [ ] Modo MAXN y clocks fijos aplicados (opcional): nvpmodel -m 0; jetson_clocks.
- [ ] I2S habilitado con jetson-io; arecord -l muestra la tarjeta correspondiente.
- [ ] SPI habilitado; /dev/spidev0.0 presente; ADXL345 DEVID=0xE5 leído.
- [ ] Cableado correcto: ICS-43434 (BCLK→12, LRCLK→35, DOUT→38, 3V3→1, GND→6, SEL→GND); ADXL345 (CS0→24, SCLK→23, MISO→21, MOSI→19, 3V3→17, GND→20).
- [ ] Dependencias instaladas: torch/torchvision/torchaudio nv24.05, numpy, scipy, spidev.
- [ ] Baseline completado (60 s) sin errores; umbrales impresos.
- [ ] Inferencia continua imprime latencia, score y genera eventos ante perturbaciones.
- [ ] tegrastats dentro de límites térmicos; sin throttling.
- [ ] JSONL y WAV de anomalías guardados en runs/.
Apéndice: Comandos útiles de diagnóstico
- Listar ALSA en detalle:
arecord -l && arecord -L - Info de paquetes NVIDIA:
dpkg -l | grep nvidia
dpkg -l | grep tensorrt - Power/performance:
nvpmodel -q
sudo tegrastats
Con este caso práctico, has construido un pipeline reproducible y coherente con el hardware “NVIDIA Jetson Orin Nano Developer Kit + Adafruit I2S MEMS Microphone (ICS-43434) + ADXL345 3-axis accelerometer (SPI)”, capaz de detectar anomalías en audio y vibración en tiempo casi real, validado con métricas, logs y artefactos objetivos.
Encuentra este producto y/o libros sobre este tema en Amazon
Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.




