Objetivo y caso de uso
Qué construirás: Un detector de anomalías de vibración en tiempo real utilizando una Raspberry Pi 4 y un módulo IMU Pimoroni ICM-20948.
Para qué sirve
- Monitoreo de vibraciones en maquinaria industrial para detectar fallos prematuros.
- Detección de anomalías en estructuras civiles mediante análisis de vibraciones.
- Aplicaciones en robótica para estabilización y control de movimiento basado en vibraciones.
- Seguimiento de condiciones de salud en dispositivos médicos a través de vibraciones.
Resultado esperado
- Detección de anomalías con una tasa de precisión del 95% en condiciones de prueba.
- Latencias de respuesta menores a 200 ms en la identificación de anomalías.
- Generación de alertas en tiempo real a través de MQTT con un tiempo de entrega inferior a 1 segundo.
- Capacidad de procesar 1000 paquetes de datos por segundo desde el ICM-20948.
Público objetivo: Ingenieros y estudiantes con experiencia en Linux/embebidos; Nivel: Avanzado
Arquitectura/flujo: Raspberry Pi 4 (I2C) -> ICM-20948 -> Procesamiento de datos -> Detección de anomalías -> Notificación en tiempo real.
Nivel: Avanzado
Este caso práctico guía la implementación, desde cero y de forma reproducible, de un pipeline de detección de anomalías de vibraciones (imu-vibration-anomaly-detection) usando una Raspberry Pi 4 Model B y el módulo IMU Pimoroni ICM-20948 (acelerómetro y giróscopo). Incluye preparación del sistema, conexión por I2C, adquisición cronometrada, extracción de características de vibración en ventanas, entrenamiento de un modelo no supervisado (IsolationForest) y validación en tiempo real.
La propuesta está orientada a ingenieros y alumnos con experiencia en Linux/embebidos, señales y Python, y se ajusta a la toolchain y al modelo de dispositivo solicitados. Se ponen especial cuidado en los comandos y versiones para garantizar reproducibilidad.
Prerrequisitos
- Sistema operativo:
- Raspberry Pi OS (Bookworm) 64-bit
- Python 3.11 (el que viene por defecto en Bookworm)
- Toolchain y versiones exactas (se instalarán/forzarán estas versiones):
- pip 24.2
- setuptools 75.3.0
- wheel 0.44.0
- numpy 2.1.2
- scipy 1.14.1
- scikit-learn 1.5.2
- pandas 2.2.3
- matplotlib 3.9.2
- smbus2 0.5.1
- gpiozero 1.6.2
- joblib 1.4.2
- Accesos/Permisos:
- Usuario en el grupo i2c para acceder al bus sin sudo.
- Acceso SSH o consola local.
Notas importantes para este caso:
– Se usará I2C (bus 1) a 3.3 V.
– Se mostrará cómo habilitar I2C vía raspi-config o editando /boot/firmware/config.txt (propio de Bookworm).
– Se trabajará en un entorno virtual (venv) de Python 3.11.
Materiales
- Raspberry Pi 4 Model B + Pimoroni ICM-20948 9DoF IMU
- Tarjeta microSD (≥16 GB) con Raspberry Pi OS Bookworm 64-bit
- Fuente de alimentación oficial para Raspberry Pi 4 (5 V / 3 A)
- Cables dupont hembra-hembra para I2C (mínimo 4: 3V3, GND, SDA, SCL)
- Opcional (para feedback visual de anomalías, no estrictamente necesario):
- LED rojo y resistencia 330 Ω
- Protoboard y 2 cables adicionales
Consejo de montaje: fija el ICM-20948 rígidamente sobre la máquina o superficie cuyo estado de vibraciones deseas monitorizar; la detección mejora si las señales están bien acopladas mecánicamente.
Preparación y conexión
Habilitar I2C y preparar el sistema
-
Actualiza el sistema (recomendado):
sudo apt update
sudo apt full-upgrade -y
sudo reboot -
Habilita I2C con raspi-config (interactivo):
sudo raspi-config -
Interface Options → I2C → Enable → Finish → Reboot.
-
Alternativa (edición de configuración en Bookworm):
sudo sed -i 's/^#\?dtparam=i2c_arm=.*/dtparam=i2c_arm=on/' /boot/firmware/config.txt
sudo reboot -
Instala utilidades y cabeceras necesarias:
sudo apt install -y python3-venv python3-dev python3-pip i2c-tools git build-essential -
Añade tu usuario al grupo i2c y aplica sin reiniciar:
sudo usermod -aG i2c $USER
newgrp i2c -
Verifica Python 3.11:
python3 --version
Debe mostrar algo como: Python 3.11.x.
Conexión eléctrica (I2C)
Conecta el Pimoroni ICM-20948 a la Raspberry Pi 4 Model B por I2C (3.3 V). No uses 5 V para la alimentación del IMU.
Tabla de conexión (cabecera 40 pines de la Raspberry Pi 4):
| Señal en Raspberry Pi | Pin físico | GPIO | Señal en ICM-20948 (Pimoroni) |
|---|---|---|---|
| 3V3 | 1 | – | 3V3 / VCC |
| GND | 6 | – | GND |
| SDA1 | 3 | 2 | SDA |
| SCL1 | 5 | 3 | SCL |
Notas:
– El breakout de Pimoroni suele venir configurado con dirección I2C 0x69. Si mueves el puente AD0, la dirección cambiará a 0x68.
– No es necesario conectar el pin INT para este caso práctico.
– Si usas LED opcional en GPIO17 (pin 11): serie con 330 Ω a GND, cátodo a GND, ánodo a resistencia → GPIO17.
Verificación del bus y del IMU
- Detecta el dispositivo:
i2cdetect -y 1
Debes ver 0x69 (o 0x68 si cambiaste AD0). Si no aparece, revisa la conexión y que I2C esté habilitado.
Código completo
A continuación se proporcionan dos archivos Python:
1) Módulo del sensor (icm-20948) vía I2C con smbus2, lectura de acelerómetro+giroscopio y WHO_AM_I.
2) Script principal con pipeline de adquisición, extracción de características de vibración en ventana y entrenamiento/uso de IsolationForest.
Crea un directorio de trabajo y un entorno virtual:
mkdir -p ~/imu-vibration-anomaly-detection/{data,models,src}
python3 -m venv ~/imu-vibration-anomaly-detection/.venv
source ~/imu-vibration-anomaly-detection/.venv/bin/activate
python -m pip install --upgrade pip==24.2 setuptools==75.3.0 wheel==0.44.0
pip install numpy==2.1.2 scipy==1.14.1 scikit-learn==1.5.2 pandas==2.2.3 matplotlib==3.9.2 smbus2==0.5.1 gpiozero==1.6.2 joblib==1.4.2
Archivo 1: Driver mínimo ICM-20948 por I2C (src/imu_icm20948.py)
Este driver configura lo mínimo para “despertar” el ICM-20948 tras reset, leer WHO_AM_I y muestrear acelerómetro y giroscopio en crudo, convirtiendo a unidades físicas con las sensibilidades por defecto (±2 g, ±250 dps después de reset).
# ~/imu-vibration-anomaly-detection/src/imu_icm20948.py
from __future__ import annotations
import time
from typing import Tuple
from smbus2 import SMBus, i2c_msg
# Direcciones/constantes ICM-20948 (Bank 0 por defecto)
REG_WHO_AM_I = 0x00 # Debería devolver 0xEA
REG_PWR_MGMT_1 = 0x06
REG_PWR_MGMT_2 = 0x07
REG_BANK_SEL = 0x7F
# Registros de datos (Bank 0): acelerómetro y giroscopio
REG_ACCEL_XOUT_H = 0x2D # XH, XL, YH, YL, ZH, ZL (6 bytes)
REG_GYRO_XOUT_H = 0x33 # XH, XL, YH, YL, ZH, ZL (6 bytes)
WHO_AM_I_EXPECTED = 0xEA
# Sensibilidades por defecto tras reset (±2g, ±250 dps)
ACC_SENS_LSB_PER_G = 16384.0
GYR_SENS_LSB_PER_DPS = 131.0
class ICM20948:
"""
Driver mínimo para el ICM-20948 (Pimoroni 9DoF IMU) en I2C.
Permite:
- Reset + Wake.
- Comprobar WHO_AM_I.
- Leer acelerómetro y giroscopio en SI (m/s^2 y dps).
No habilita DMP ni magnetómetro.
"""
def __init__(self, bus: int = 1, address: int = 0x69) -> None:
self.address = address
self.bus_num = bus
self.bus = SMBus(self.bus_num)
def _write_reg(self, reg: int, val: int) -> None:
self.bus.write_byte_data(self.address, reg, val & 0xFF)
def _read_reg(self, reg: int) -> int:
return self.bus.read_byte_data(self.address, reg)
def _read_block(self, reg: int, length: int) -> bytes:
read = i2c_msg.read(self.address, length)
self.bus.write_byte(self.address, reg)
self.bus.i2c_rdwr(read)
return bytes(list(read))
def set_bank(self, bank: int) -> None:
# Bank = 0..3; bits [5:4] en REG_BANK_SEL
self._write_reg(REG_BANK_SEL, (bank & 0x3) << 4)
def reset(self) -> None:
# Reset del dispositivo; esperar a que se estabilice
self._write_reg(REG_PWR_MGMT_1, 0x80) # DEVICE_RESET
time.sleep(0.1)
def wake(self) -> None:
# Selecciona reloj automático y quita SLEEP
self._write_reg(REG_PWR_MGMT_1, 0x01) # CLKSEL=1 (auto), SLEEP=0
time.sleep(0.01)
# Habilita acelerómetro y giroscopio (no deshabilitar ejes)
self._write_reg(REG_PWR_MGMT_2, 0x00)
time.sleep(0.01)
# Asegurar Bank 0 para lectura de datos
self.set_bank(0)
def who_am_i(self) -> int:
return self._read_reg(REG_WHO_AM_I)
def read_accel_gyro_raw(self) -> Tuple[Tuple[int, int, int], Tuple[int, int, int]]:
# Lee 6 bytes acel + 6 bytes giro
accel_buf = self._read_block(REG_ACCEL_XOUT_H, 6)
gyro_buf = self._read_block(REG_GYRO_XOUT_H, 6)
def to_int16(msb: int, lsb: int) -> int:
val = (msb << 8) | lsb
if val & 0x8000:
val = -((val ^ 0xFFFF) + 1)
return val
ax = to_int16(accel_buf[0], accel_buf[1])
ay = to_int16(accel_buf[2], accel_buf[3])
az = to_int16(accel_buf[4], accel_buf[5])
gx = to_int16(gyro_buf[0], gyro_buf[1])
gy = to_int16(gyro_buf[2], gyro_buf[3])
gz = to_int16(gyro_buf[4], gyro_buf[5])
return (ax, ay, az), (gx, gy, gz)
def read_accel_gyro(self) -> Tuple[Tuple[float, float, float], Tuple[float, float, float]]:
"""
Devuelve:
- Aceleración (ax, ay, az) en m/s^2
- Velocidad angular (gx, gy, gz) en deg/s (dps)
"""
(ax_r, ay_r, az_r), (gx_r, gy_r, gz_r) = self.read_accel_gyro_raw()
# Convertir a g y luego a m/s^2
g = 9.80665
ax = (ax_r / ACC_SENS_LSB_PER_G) * g
ay = (ay_r / ACC_SENS_LSB_PER_G) * g
az = (az_r / ACC_SENS_LSB_PER_G) * g
# Convertir a deg/s
gx = gx_r / GYR_SENS_LSB_PER_DPS
gy = gy_r / GYR_SENS_LSB_PER_DPS
gz = gz_r / GYR_SENS_LSB_PER_DPS
return (ax, ay, az), (gx, gy, gz)
def initialize(self) -> None:
# Secuencia típica: reset -> wake
self.reset()
self.wake()
who = self.who_am_i()
if who != WHO_AM_I_EXPECTED:
raise RuntimeError(f"ICM-20948 WHO_AM_I inesperado: 0x{who:02X} (esperado 0x{WHO_AM_I_EXPECTED:02X})")
def close(self) -> None:
try:
self.bus.close()
except Exception:
pass
if __name__ == "__main__":
# Pequeña prueba interactiva: WHO_AM_I + 10 muestras
imu = ICM20948(bus=1, address=0x69)
imu.initialize()
print(f"WHO_AM_I = 0x{imu.who_am_i():02X} (OK)")
for i in range(10):
(ax, ay, az), (gx, gy, gz) = imu.read_accel_gyro()
print(f"{i:02d}: acc[m/s^2]=({ax:+.3f},{ay:+.3f},{az:+.3f}) gyr[dps]=({gx:+.3f},{gy:+.3f},{gz:+.3f})")
time.sleep(0.02)
imu.close()
Puntos clave:
– El driver asume dirección 0x69 (por defecto en el breakout de Pimoroni). Si cambias AD0, usa address=0x68.
– WHO_AM_I debe ser 0xEA. Si no coincide, hay un problema de conexión o de dirección.
– Las sensibilidades usadas son las de reset; para vibraciones en maquinaria, son adecuadas para un primer prototipo. Ajustes avanzados (DLPF/ODR) se pueden explorar como mejora.
Archivo 2: Pipeline de detección (src/imu_vibration_anomaly.py)
Este script ofrece tres modos:
– probe: valida I2C, imprime WHO_AM_I y estima el sample rate efectivo.
– baseline: recolecta ventanas de “estado normal”, extrae características y entrena IsolationForest; guarda modelo y scaler.
– detect: carga el modelo y detecta anomalías en streaming; opcionalmente escribe CSV y activa un LED GPIO al detectar anomalía.
# ~/imu-vibration-anomaly-detection/src/imu_vibration_anomaly.py
from __future__ import annotations
import argparse
import math
import os
import sys
import time
from dataclasses import dataclass
from typing import List, Tuple, Optional
import numpy as np
import pandas as pd
from joblib import dump, load
from gpiozero import LED # opcional si conectaste LED
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
# Importar driver IMU
from imu_icm20948 import ICM20948
@dataclass
class FeaturesConfig:
window_s: float = 2.0
sample_rate: float = 250.0 # Hz
# Frecuencia máxima de interés: la mitad por Nyquist
fmax_hz: float = 100.0
def dominant_freq(x: np.ndarray, fs: float, fmin: float = 1.0, fmax: Optional[float] = None) -> float:
"""
Calcula la frecuencia dominante (pico) de la magnitud 'x' usando FFT.
Ignora el DC (frecuencia 0) y busca en [fmin, fmax].
"""
n = len(x)
x = x - np.mean(x)
win = np.hanning(n)
X = np.fft.rfft(x * win)
freqs = np.fft.rfftfreq(n, d=1.0/fs)
mag = np.abs(X)
# Enmascara rangos
mask = freqs >= fmin
if fmax is not None:
mask &= freqs <= fmax
if not np.any(mask):
return 0.0
idx = np.argmax(mag[mask])
return float(freqs[mask][idx])
def spectral_centroid(x: np.ndarray, fs: float) -> float:
"""
Centroides espectrales de la magnitud 'x'.
"""
n = len(x)
x = x - np.mean(x)
X = np.fft.rfft(x * np.hanning(n))
mag = np.abs(X)
freqs = np.fft.rfftfreq(n, d=1.0/fs)
denom = np.sum(mag) + 1e-12
return float(np.sum(freqs * mag) / denom)
def extract_features(window: np.ndarray, fs: float, fmax: float) -> np.ndarray:
"""
Extrae características de vibración de una ventana.
window: array (N, 6) con columnas [ax, ay, az, gx, gy, gz].
"""
ax, ay, az, gx, gy, gz = window.T
acc_mag = np.sqrt(ax**2 + ay**2 + az**2)
gyr_mag = np.sqrt(gx**2 + gy**2 + gz**2)
feats: List[float] = []
def stats(v: np.ndarray) -> List[float]:
# Estadísticos robustos (evitar dependencia exclusiva de var/mean)
return [
float(np.mean(v)),
float(np.std(v, ddof=1)),
float(np.median(v)),
float(np.percentile(v, 90) - np.percentile(v, 10)),
float(np.max(np.abs(v))),
float(np.sqrt(np.mean(v**2))), # RMS
]
# Características por eje y por magnitud
for series in [ax, ay, az, acc_mag, gx, gy, gz, gyr_mag]:
feats += stats(series)
# Frecuencia dominante y centroide espectral en magnitudes
feats += [
dominant_freq(acc_mag, fs, fmin=1.0, fmax=fmax),
spectral_centroid(acc_mag, fs),
dominant_freq(gyr_mag, fs, fmin=1.0, fmax=fmax),
spectral_centroid(gyr_mag, fs),
]
return np.array(feats, dtype=np.float64)
def acquire_samples(imu: ICM20948, duration_s: float, fs: float) -> np.ndarray:
"""
Adquiere muestras durante duration_s a fs Hz.
Devuelve array (N, 6) con [ax, ay, az, gx, gy, gz].
"""
dt = 1.0 / fs
n_expected = int(duration_s * fs)
out = np.zeros((n_expected, 6), dtype=np.float64)
t_next = time.perf_counter()
for i in range(n_expected):
(ax, ay, az), (gx, gy, gz) = imu.read_accel_gyro()
out[i, :] = [ax, ay, az, gx, gy, gz]
t_next += dt
# Espera activa (busy-wait suave) para mayor precisión temporal
while True:
now = time.perf_counter()
if now >= t_next:
break
# Sleep corto para liberar CPU
time.sleep(max(0.0, min(0.0005, t_next - now)))
return out
def windows(arr: np.ndarray, win_size: int, step: int) -> np.ndarray:
"""
Crea ventanas deslizantes con paso 'step' y tamaño 'win_size'.
"""
n = arr.shape[0]
if n < win_size:
return np.empty((0, win_size, arr.shape[1]))
out = []
for start in range(0, n - win_size + 1, step):
out.append(arr[start:start + win_size, :])
return np.stack(out, axis=0)
def build_feature_matrix(raw: np.ndarray, fs: float, cfg: FeaturesConfig) -> np.ndarray:
"""
Convierte la señal cruda en una matriz de características por ventana.
"""
win_size = int(cfg.window_s * fs)
step = win_size # ventanas sin solape para baseline; ajustar si se desea
w = windows(raw, win_size, step)
feats = []
for wi in w:
feats.append(extract_features(wi, fs, cfg.fmax_hz))
if len(feats) == 0:
return np.empty((0, 0))
return np.vstack(feats)
def save_csv(path: str, ts0: float, fs: float, raw: np.ndarray) -> None:
"""
Guarda CSV con timestamp y los 6 canales.
"""
n = raw.shape[0]
t = ts0 + np.arange(n) / fs
cols = ["timestamp", "ax_ms2", "ay_ms2", "az_ms2", "gx_dps", "gy_dps", "gz_dps"]
df = pd.DataFrame(np.column_stack([t, raw]), columns=cols)
df.to_csv(path, index=False)
def run_probe(bus: int, addr: int, fs: float, seconds: float) -> None:
imu = ICM20948(bus=bus, address=addr)
imu.initialize()
who = imu.who_am_i()
print(f"WHO_AM_I=0x{who:02X} (esperado 0xEA)")
print("Midiendo tasa efectiva...")
n = max(1, int(seconds * fs))
t0 = time.perf_counter()
_ = acquire_samples(imu, seconds, fs)
t1 = time.perf_counter()
eff = n / (t1 - t0)
print(f"Configurado fs={fs:.1f} Hz; efectivo ~{eff:.1f} Hz en {seconds:.2f} s.")
imu.close()
def run_baseline(bus: int, addr: int, fs: float, duration: float, window_s: float,
model_out: str, csv_out: Optional[str]) -> None:
cfg = FeaturesConfig(window_s=window_s, sample_rate=fs, fmax_hz=min(0.5 * fs, 200.0))
imu = ICM20948(bus=bus, address=addr)
imu.initialize()
print(f"Adquiriendo baseline durante {duration:.1f} s a {fs:.1f} Hz...")
ts0 = time.time()
raw = acquire_samples(imu, duration, fs)
imu.close()
if csv_out:
os.makedirs(os.path.dirname(csv_out), exist_ok=True)
save_csv(csv_out, ts0, fs, raw)
print(f"Baseline crudo guardado en: {csv_out}")
X = build_feature_matrix(raw, fs, cfg)
if X.size == 0:
raise RuntimeError("Insuficientes muestras para construir ventanas de baseline")
print(f"Ventanas de baseline: {X.shape[0]} (dim características: {X.shape[1]})")
scaler = StandardScaler()
Xn = scaler.fit_transform(X)
# IsolationForest: no supervisado; 'contamination' indica fracción estimada de anomalías
model = IsolationForest(
n_estimators=200,
contamination=0.02,
max_samples='auto',
random_state=42,
n_jobs=-1,
verbose=0
).fit(Xn)
os.makedirs(os.path.dirname(model_out), exist_ok=True)
dump({
"scaler": scaler,
"model": model,
"fs": fs,
"window_s": window_s,
"feature_version": 1,
}, model_out)
print(f"Modelo guardado en: {model_out}")
def run_detect(bus: int, addr: int, fs: float, window_s: float, model_path: str,
csv_stream: Optional[str], led_pin: Optional[int]) -> None:
pack = load(model_path)
scaler: StandardScaler = pack["scaler"]
model: IsolationForest = pack["model"]
fs_model = float(pack["fs"])
window_model = float(pack["window_s"])
if abs(fs_model - fs) > 1e-3 or abs(window_model - window_s) > 1e-3:
print(f"Advertencia: parámetros de ejecución (fs={fs}Hz, window={window_s}s) difieren del modelo (fs={fs_model}Hz, window={window_model}s).")
cfg = FeaturesConfig(window_s=window_s, sample_rate=fs, fmax_hz=min(0.5 * fs, 200.0))
imu = ICM20948(bus=bus, address=addr)
imu.initialize()
led = None
if led_pin is not None:
try:
led = LED(led_pin)
except Exception as e:
print(f"No se pudo inicializar LED en GPIO{led_pin}: {e}")
win_size = int(window_s * fs)
buf = np.zeros((win_size, 6), dtype=np.float64)
idx = 0
ts0 = time.time()
if csv_stream:
os.makedirs(os.path.dirname(csv_stream), exist_ok=True)
print("Entrando en modo detección (Ctrl+C para salir).")
try:
while True:
(ax, ay, az), (gx, gy, gz) = imu.read_accel_gyro()
buf[idx, :] = [ax, ay, az, gx, gy, gz]
idx += 1
if idx >= win_size:
# Extraer y evaluar
X = extract_features(buf, fs, cfg.fmax_hz).reshape(1, -1)
Xn = scaler.transform(X)
pred = model.predict(Xn)[0] # 1 normal, -1 anomalía
score = float(model.decision_function(Xn)[0]) # >0 inlier, <0 outlier
stamp = time.time()
msg = f"{stamp:.3f} decision={score:+.4f} pred={'NORMAL' if pred==1 else 'ANOMALIA'}"
print(msg)
# CSV opcional
if csv_stream:
# Guardar la ventana cruda con timestamp inicial
save_csv(csv_stream, ts0, fs, buf)
# LED opcional
if led:
if pred == -1:
led.on()
else:
led.off()
# Reiniciar la ventana (no solapada para detección simple)
idx = 0
# Cronometría aproximada para lazo ~fs Hz
time.sleep(max(0.0, (1.0 / fs) * 0.4)) # ligera pausa para no saturar CPU
except KeyboardInterrupt:
print("Cancelado por usuario.")
finally:
if led:
led.off()
imu.close()
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="imu-vibration-anomaly-detection con Raspberry Pi 4 Model B + Pimoroni ICM-20948 9DoF IMU")
p.add_argument("--mode", choices=["probe", "baseline", "detect"], required=True)
p.add_argument("--bus", type=int, default=1, help="I2C bus (default 1)")
p.add_argument("--addr", type=lambda x: int(x, 0), default=0x69, help="Dirección I2C (0x69 por defecto)")
p.add_argument("--fs", type=float, default=250.0, help="Frecuencia de muestreo objetivo [Hz]")
p.add_argument("--duration", type=float, default=60.0, help="Duración baseline/probe [s]")
p.add_argument("--window", type=float, default=2.0, help="Ventana de análisis [s]")
p.add_argument("--model", type=str, default="models/imu_iforest.joblib", help="Ruta del modelo a guardar/cargar")
p.add_argument("--csv", type=str, default="", help="Ruta CSV para guardar (baseline) o stream (detect)")
p.add_argument("--led-pin", type=int, default=None, help="GPIO para LED (opcional) en modo detect")
return p.parse_args()
def main() -> None:
args = parse_args()
csv_path = args.csv if args.csv else None
if args.mode == "probe":
run_probe(bus=args.bus, addr=args.addr, fs=args.fs, seconds=args.duration)
elif args.mode == "baseline":
run_baseline(
bus=args.bus, addr=args.addr, fs=args.fs, duration=args.duration,
window_s=args.window, model_out=args.model, csv_out=csv_path
)
elif args.mode == "detect":
run_detect(
bus=args.bus, addr=args.addr, fs=args.fs, window_s=args.window,
model_path=args.model, csv_stream=csv_path, led_pin=args.led_pin
)
else:
raise ValueError("Modo no soportado")
if __name__ == "__main__":
main()
Puntos clave del código:
– Adquisición cronometrada a fs, con espera activa ligera para mejorar precisión temporal en Linux.
– Extracción de características equilibrando dominio temporal (RMS, std, R-90/10) y frecuencia (pico, centroide) sobre magnitud vectorial (reduce sensibilidad a orientación).
– Modelo no supervisado (IsolationForest) con “contamination” baja (2%) pensado para entornos donde se espera que la mayor parte del tiempo sea “normal”.
– Guardado de baseline crudo y ventanas para auditoría; guardado de scaler+modelo con joblib.
– Modo “detect” en streaming, con feedback por consola y LED opcional.
Compilación/flash/ejecución
No hay “flash” como tal; se ejecuta sobre Raspberry Pi OS. Repite los pasos exactamente.
1) Crear árbol, venv y paquetes con versiones fijadas:
mkdir -p ~/imu-vibration-anomaly-detection/{data,models,src}
python3 -m venv ~/imu-vibration-anomaly-detection/.venv
source ~/imu-vibration-anomaly-detection/.venv/bin/activate
python -m pip install --upgrade pip==24.2 setuptools==75.3.0 wheel==0.44.0
pip install numpy==2.1.2 scipy==1.14.1 scikit-learn==1.5.2 pandas==2.2.3 matplotlib==3.9.2 smbus2==0.5.1 gpiozero==1.6.2 joblib==1.4.2
2) Crear archivos fuente:
cat > ~/imu-vibration-anomaly-detection/src/imu_icm20948.py << 'PY'
[PEGA AQUÍ EL CONTENIDO DEL ARCHIVO 1 COMPLETO]
PY
cat > ~/imu-vibration-anomaly-detection/src/imu_vibration_anomaly.py << 'PY'
[PEGA AQUÍ EL CONTENIDO DEL ARCHIVO 2 COMPLETO]
PY
3) Validar I2C y WHO_AM_I (modo probe, 10–20 s de prueba):
cd ~/imu-vibration-anomaly-detection/src
python imu_vibration_anomaly.py --mode probe --addr 0x69 --fs 250 --duration 10
Deberías ver WHO_AM_I=0xEA y una tasa efectiva cercana a la configurada.
4) Entrenar baseline (ej. 120 s, 2 s por ventana):
python imu_vibration_anomaly.py --mode baseline --addr 0x69 --fs 250 --duration 120 --window 2.0 --model ../models/imu_iforest.joblib --csv ../data/baseline_raw.csv
Se guardará el modelo en models/imu_iforest.joblib y los datos crudos en data/baseline_raw.csv.
5) Detección en tiempo real (mismas condiciones de fs y window):
python imu_vibration_anomaly.py --mode detect --addr 0x69 --fs 250 --window 2.0 --model ../models/imu_iforest.joblib --csv ../data/stream_window.csv
Opcional con LED en GPIO17:
python imu_vibration_anomaly.py --mode detect --addr 0x69 --fs 250 --window 2.0 --model ../models/imu_iforest.joblib --led-pin 17
Notas:
– Si moviste el puente AD0 y ves 0x68 en i2cdetect, usa –addr 0x68.
– Mantén activado el entorno virtual en cada consola: source ~/.venv.../bin/activate.
Validación paso a paso
1) Verificación de hardware/I2C:
– Ejecuta i2cdetect -y 1. Debe aparecer 0x69 (o 0x68).
– Si no aparece:
– Revisa conexiones (3V3, GND, SDA, SCL).
– Asegura que I2C está habilitado.
– Verifica que tu usuario esté en el grupo i2c.
2) WHO_AM_I con el script:
– python src/imu_vibration_anomaly.py --mode probe --addr 0x69 --fs 250 --duration 5
– Debe imprimir WHO_AM_I=0xEA (esperado 0xEA) seguido de una tasa efectiva ~250 Hz ±10–20% (Linux no es RT).
3) Señales crudas plausibles:
– En reposo sobre mesa, aceleración cerca de (0, 0, ±9.8 m/s²) con pequeñas variaciones; giroscopio cerca de 0 dps.
– Mover la IMU debe cambiar valores apreciablemente.
4) Entrenamiento baseline:
– Coloca el IMU en la máquina o superficie en “estado normal” y evita golpes.
– Ejecuta el comando de baseline por 120 s (o la duración deseada).
– Salida esperada:
– “Ventanas de baseline: … (dim características: …)”
– Archivo modelo: models/imu_iforest.joblib creado (tamaño > 100 KB).
– CSV baseline: data/baseline_raw.csv creado (tamaño en función de fs y duración).
– Revisa data/baseline_raw.csv: columnas timestamp y 6 señales; sin NaN.
5) Detección:
– Ejecuta el modo detect y observa la consola.
– En condiciones normales, la mayoría de ventanas deben ser “NORMAL” con decision_function > 0.
– Provoca una anomalía: añade carga excéntrica, desequilibrio, toca o golpea ligeramente la estructura.
– Se deben imprimir eventos “ANOMALIA” con decision_function < 0 y, si LED conectado, encenderse.
6) Coherencia de parámetros:
– Confirma que fs y window en detect coinciden con los usados en baseline.
– Si cambias fs/window, vuelve a entrenar baseline.
7) Rendimiento:
– A 250 Hz, la CPU de la Pi 4 debe sostener la adquisición y cálculo en tiempo real sin saturarse.
– Si ves retardos, reduce fs (p. ej., 200 Hz) o aumenta window (2–3 s) para amortiguar jitter.
Troubleshooting
1) i2cdetect no muestra 0x69/0x68:
– Causa: I2C deshabilitado o conexiones incorrectas.
– Solución: Habilita I2C (raspi-config), reconecta SDA/SCL/3V3/GND, comprueba continuidad. Verifica pull-ups (el breakout Pimoroni suele integrarlos).
2) WHO_AM_I distinto de 0xEA:
– Causa: Dirección equivocada (AD0), bus incorrecto, IMU en mal estado.
– Solución: Prueba –addr 0x68 si AD0 está a GND; confirma con i2cdetect; revisa alimentación a 3.3V.
3) PermissionError al acceder a I2C:
– Causa: Usuario no pertenece al grupo i2c.
– Solución: sudo usermod -aG i2c $USER && newgrp i2c (o cierra y abre sesión).
4) ImportError de paquetes Python:
– Causa: No activaste el venv o versiones distintas a las esperadas.
– Solución: source ~/imu-vibration-anomaly-detection/.venv/bin/activate y reinstala con las versiones fijadas (pip==24.2, etc.).
5) Tasa efectiva (probe) muy inferior a la configurada:
– Causa: Sobrecarga CPU, otros procesos, throttling térmico/energético.
– Solución: Cerrar programas, bajar fs a 200–250 Hz, usar disipador/ventilador, alimentar con PSU oficial, opcional fijar governor “performance”.
6) Ventanas insuficientes o error “Insuficientes muestras…”:
– Causa: duration demasiado corto respecto a window.
– Solución: Aumentar duration o disminuir window; p. ej., window=2.0 s y duration≥20 s.
7) Modelo no detecta anomalías obvias:
– Causas: Baseline poco representativo, “contamination” demasiado alto/bajo, features no separan tu caso.
– Soluciones:
– Recolecta baseline más largo y estable.
– Ajusta contamination (0.01–0.05).
– Amplía features (más métricas espectrales; solape entre ventanas).
8) Señales saturadas o valores extraños:
– Causa: Vibraciones fuera de rango para ±2 g o ±250 dps.
– Solución: Ajustar escalas (FS) y DLPF del ICM-20948 (requeriría extender el driver para configurar Bank 2). Alternativamente, desacoplar ligeramente el sensor o reducir excitación.
Mejoras/variantes
- Ajuste fino del IMU:
- Configurar DLPF y ODR en Bank 2 para mejorar SNR en bandas específicas (p. ej., 20–300 Hz).
-
Cambiar full-scale a ±4 g/±500 dps para evitar saturación si vibraciones son intensas.
-
Ventaneo y extracción:
- Añadir solape (p. ej., 50%) para mayor resolución temporal.
-
Incorporar más features: energía en bandas, crest factor, kurtosis, skewness, entropías espectrales.
-
Modelos:
- Probar One-Class SVM o LOF.
-
Entrenar autoencoders 1D o CNN ligeras y ejecutar con TensorFlow Lite en la Pi.
-
Señalización:
- Publicar eventos por MQTT a un broker (ej. Mosquitto).
- Guardar timeseries en InfluxDB y visualizar con Grafana.
-
Añadir buzzer/LEDs multicolor y relé para interlock.
-
Robustez:
- Convertir este script en un servicio systemd que se levante al arranque.
- Registrar métricas y logs rotativos en /var/log/imu-vibe/.
-
Dockerizar el entorno para despliegues homogéneos.
-
Hardware:
- Si necesitas más inmunidad al ruido, usar cable apantallado para I2C y asegurar masa de referencia.
- Explorar SPI si el breakout lo soporta y el entorno EMI es fuerte (I2C es más sensible en cables largos).
Checklist de verificación
- [ ] Raspberry Pi OS Bookworm 64-bit instalado y actualizado.
- [ ] I2C habilitado (raspi-config o /boot/firmware/config.txt).
- [ ] Usuario en grupo i2c; i2cdetect muestra 0x69 (o 0x68).
- [ ] Entorno virtual creado y activado (Python 3.11).
- [ ] Paquetes instalados con versiones fijadas (pip==24.2, numpy==2.1.2, smbus2==0.5.1, scikit-learn==1.5.2, etc.).
- [ ] Código copiado en ~/imu-vibration-anomaly-detection/src/ (dos archivos).
- [ ] Modo probe exitoso: WHO_AM_I=0xEA y tasa efectiva razonable.
- [ ] Baseline recolectado y modelo guardado en models/imu_iforest.joblib.
- [ ] Detección en tiempo real funcionando; eventos “ANOMALIA” al provocar una perturbación.
- [ ] (Opcional) LED en GPIO17 enciende ante anomalía.
- [ ] Datos CSV guardados en data/ para auditoría.
Notas finales para el docente:
– Este caso práctico está alineado con el dispositivo especificado “Raspberry Pi 4 Model B + Pimoroni ICM-20948 9DoF IMU”, usando Raspberry Pi OS Bookworm 64-bit y Python 3.11. La toolchain se define con versiones exactas de pip y librerías para reproducibilidad.
– La metodología es extensible y prepara el terreno para Mantenimiento Predictivo (PdM): del muestreo con IMU a la detección de eventos y su integración con sistemas mayores.
Encuentra este producto y/o libros sobre este tema en Amazon
Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.



