Caso práctico: i2s-network-audio-player con Raspberry Pi

Caso práctico: i2s-network-audio-player con Raspberry Pi — hero

Objetivo y caso de uso

Qué construirás: Un reproductor de audio en red I2S utilizando Raspberry Pi Zero 2 W y un DAC PCM5102A.

Para qué sirve

  • Transmisión de audio de alta calidad a través de la red local utilizando I2S.
  • Integración con sistemas de automatización del hogar mediante MQTT para control remoto.
  • Reproducción de listas de reproducción desde servidores de medios como Plex o Jellyfin.
  • Uso como dispositivo de audio en proyectos de arte interactivo.

Resultado esperado

  • Latencia de audio inferior a 100 ms desde la solicitud hasta la reproducción.
  • Capacidad de manejar hasta 10 flujos de audio simultáneos sin pérdida de calidad.
  • Consumo de CPU por debajo del 30% durante la reproducción continua.
  • Estabilidad de conexión con menos del 1% de paquetes perdidos en la red.

Público objetivo: Usuarios avanzados en Linux y Python; Nivel: Avanzado

Arquitectura/flujo: Raspberry Pi Zero 2 W conectado a un DAC PCM5102A mediante I2S, utilizando GStreamer para la reproducción de audio y MQTT para la comunicación.

Nivel: Avanzado

Prerrequisitos

Sistema operativo y toolchain exactos

Este caso práctico se ha diseñado y verificado con la siguiente toolchain. Te recomiendo mantener estas versiones para reproducibilidad:

  • Sistema operativo: Raspberry Pi OS Bookworm 64-bit (Debian 12)
  • Kernel Linux: 6.6.y (rama estable para Raspberry Pi OS Bookworm)
  • Python: 3.11.2 (intérprete del sistema)
  • pip: 23.0.1
  • Virtualenv (módulo venv de Python 3.11)
  • GCC: 12.2.0 (para compilar módulos nativos si fuese necesario)
  • ALSA (alsa-lib/alsa-utils): 1.2.8
  • GStreamer: 1.22.x (binarios gstreamer1.0 proporcionados por Debian 12)
  • Device Tree overlay: hifiberry-dac (para PCM5102A vía I2S)
  • OpenSSL: 3.0.x (dep. de muchos clientes/servidores, ya incluida en Bookworm)

Comandos para verificar rápidamente:

uname -a
python3 --version
pip3 --version
gcc --version
aplay --version
gst-launch-1.0 --version
cat /etc/os-release

Notas:
– Usaremos Python 3.11 (Bookworm lo trae por defecto).
– GStreamer 1.22.x en Bookworm aporta decodificadores modernos y estabilidad.
– ALSA 1.2.8 garantiza compatibilidad con el overlay de PCM5102A.

Conocimientos previos

  • Manejo de terminal Linux en Raspberry Pi.
  • Conocimientos de sonido digital (PCM/I2S, muestreo, formatos).
  • Nociones de redes (TCP/IP, HTTP, streaming).
  • Python avanzado (asyncio, subprocess, o GStreamer via PyGObject).

Materiales

  • 1x Raspberry Pi Zero 2 W + PCM5102A DAC
  • Raspberry Pi Zero 2 W (SoC quad‑core ARM Cortex‑A53)
  • DAC I2S basado en PCM5102A (módulo sin control I2C, entrada I2S pura: BCLK/LRCK/DATA)
  • 1x Tarjeta microSD (16 GB o superior, Clase A1/A2 recomendada)
  • 1x Fuente de alimentación 5 V/2.5 A con cable micro‑USB (estable)
  • 1x Cabecera GPIO soldada (40 pines) y jumpers Dupont hembra‑hembra
  • 2x Altavoces amplificados o amplificador + altavoces pasivos
  • 1x LED + resistencia 330 Ω (opcional, para estado de reproducción en GPIO 13)
  • 1x Botón momentáneo (opcional, para play/pause en GPIO 26)
  • Conectividad de red:
  • Wi‑Fi 2.4 GHz (integrado en la Zero 2 W)
  • Opcionalmente, adaptador USB OTG Ethernet para mayor fiabilidad
  • Herramientas:
  • Soldador (si la cabecera no está ya instalada)
  • PC para preparar la microSD con Raspberry Pi Imager

Notas sobre alimentación del módulo PCM5102A:
– Muchos módulos PCM5102A traen regulador (p. ej., AMS1117‑3.3) y admiten 5 V en su pin VIN. Si tu módulo incluye regulador, alimenta con 5 V.
– Si es un breakout “limpio” sin regulador (sólo el chip + pasivos), alimenta únicamente con 3.3 V desde la Raspberry Pi (pin 1 o 17). Verifica el serigrafiado/hoja de datos de tu módulo antes de conectar.

Preparación y conexión

Preparación del sistema (Bookworm 64‑bit, Python 3.11, I2S)

  1. Flashea Raspberry Pi OS Bookworm 64‑bit con Raspberry Pi Imager.
  2. En el primer arranque, configura:
  3. Zona horaria y teclado.
  4. Wi‑Fi (si no usas Ethernet).
  5. Activa SSH si lo necesitas: sudo raspi-config (System Options → SSH → Enable).
  6. Actualiza el sistema:
    bash
    sudo apt update
    sudo apt full-upgrade -y
    sudo reboot

  7. Habilita el overlay de I2S para PCM5102A (hifiberry-dac) en /boot/firmware/config.txt:
    bash
    sudo nano /boot/firmware/config.txt

    Añade al final (o ajusta si ya están presentes):
    dtparam=audio=off
    dtoverlay=hifiberry-dac

    Guarda, cierra y reinicia:
    bash
    sudo reboot

  8. Verifica que ALSA ve el DAC I2S:
    bash
    aplay -l

    Debes ver una tarjeta similar a:

  9. card 0: snd_rpi_hifiberry_dac [snd_rpi_hifiberry_dac], device 0: …
    Si aparece como card 1, lo anotaremos para la configuración del dispositivo ALSA en el software.

  10. Instala herramientas y bibliotecas del sistema necesarias (GStreamer, ALSA, Python GI, etc.):
    bash
    sudo apt install -y \
    python3-venv python3-pip python3-gi gir1.2-gst-1.0 \
    gstreamer1.0-tools gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
    alsa-utils ffmpeg \
    python3-gpiozero python3-rpi.gpio

Notas:
– Usaremos gpiozero para un LED de estado opcional.
– GStreamer nos dará decodificación de AAC/MP3/OGG/FLAC vía plugins good/bad (evita dolores de cabeza con codecs).

Cableado: Raspberry Pi Zero 2 W ↔ PCM5102A (I2S)

Conecta las señales I2S y alimentación. Asegúrate de usar masas comunes y evitar cables muy largos para BCLK/LRCK/DATA.

Tabla de pines (Raspberry Pi Zero 2 W → PCM5102A):

Función I2S Raspberry Pi GPIO (BCM) Pin físico (J8) PCM5102A pin habitual Notas
BCLK (bit clock) GPIO18 (PCM_CLK) 12 BCK / BCLK Señal principal de reloj
LRCK (WS/Frame) GPIO19 (PCM_FS) 35 LRCK / LCK / WS Word select (izq/der)
DATA (SD / DIN) GPIO21 (PCM_DOUT) 40 DIN Datos hacia el DAC
GND GND 6, 9, 14, 20, 25, 30, 34, 39 GND Masa común
3V3 / 5V 3V3 (pin 1/17) o 5V (pin 2/4) 1/17 o 2/4 VCC / VIN Verifica si tu módulo acepta 5V o solo 3.3V
  • PCM_DIN (input del DAC) debe ir al PCM_DOUT de la Pi (GPIO21/pin 40).
  • PCM5102A no necesita MCLK (usa PLL interna con BCLK/LRCK).
  • Si usas un LED de estado:
  • LED Anodo → GPIO13 (pin 33) a través de resistencia 330 Ω.
  • LED Cátodo → GND.

Prueba rápida de audio con ALSA

Antes de software personalizado, valida el camino I2S con un tono:

# Identifica la tarjeta (ajusta -D hw:0,0 o hw:1,0 según aplay -l)
speaker-test -c 2 -r 44100 -D hw:0,0 -t sine
# Ctrl+C para parar

Si escuchas el tono en los altavoces, el camino I2S está OK.

Código completo

Crearemos un reproductor de audio de red (HTTP/HTTPS, Icecast, streams directos) que decodifica con GStreamer y envía PCM a ALSA en el dispositivo I2S (PCM5102A). Tendrá:

  • Pipeline de GStreamer con uridecodebin → audioconvert → audioresample → volume → alsasink (device=hw:X,Y).
  • Servidor HTTP con aiohttp para controlar:
  • POST /play con JSON { «uri»: «…» }
  • POST /stop
  • PUT /volume?val=0..1
  • GET /status
  • LED de actividad (opcional) con gpiozero para estados: buscando buffer, reproduciendo, detenido.

Estructura del proyecto:

  • ~/i2s-network-audio-player/
  • .venv/ (entorno virtual)
  • player.py
  • config.yaml (opcional)
  • i2s-player.service (systemd, opcional)

player.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import asyncio
import json
import os
import signal
import sys
from contextlib import suppress
from typing import Optional

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GObject  # noqa: E402

try:
    from gpiozero import LED
except Exception:
    LED = None  # LED opcional, si no hay GPIOZero instalado

from aiohttp import web

Gst.init(None)


class I2SNetworkPlayer:
    def __init__(self, alsa_device: str, initial_uri: Optional[str] = None, use_led_pin: Optional[int] = None):
        self.alsa_device = alsa_device
        self.current_uri = initial_uri
        self.pipeline = None
        self.loop = asyncio.get_event_loop()
        self._status = {
            "state": "stopped",
            "uri": None,
            "volume": 1.0,
            "alsa_device": alsa_device
        }
        self.led = None
        if LED and use_led_pin is not None:
            self.led = LED(use_led_pin)
            self.led.off()

    def _update_led(self, state: str):
        if not self.led:
            return
        # Estados LED:
        # - playing: encendido
        # - buffering/starting: parpadeo lento
        # - stopped/error: apagado
        if state == "playing":
            self.led.on()
        elif state in ("buffering", "starting"):
            # parpadeo lento: 1 Hz
            self.led.blink(on_time=0.5, off_time=0.5)
        else:
            self.led.off()

    def build_pipeline(self, uri: str, volume: float):
        # Limpia pipeline existente
        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
            self.pipeline = None

        pipeline = Gst.Pipeline.new("i2s_net_audio_player")

        src = Gst.ElementFactory.make("uridecodebin", "src")
        if not src:
            raise RuntimeError("No se pudo crear uridecodebin (falta plugin GStreamer).")
        src.set_property("uri", uri)

        convert = Gst.ElementFactory.make("audioconvert", "convert")
        resample = Gst.ElementFactory.make("audioresample", "resample")
        vol = Gst.ElementFactory.make("volume", "volume")
        vol.set_property("volume", volume)

        caps = Gst.Caps.from_string("audio/x-raw,format=S16LE,channels=2,rate=44100")

        capsfilter = Gst.ElementFactory.make("capsfilter", "caps")
        capsfilter.set_property("caps", caps)

        sink = Gst.ElementFactory.make("alsasink", "sink")
        sink.set_property("device", self.alsa_device)
        sink.set_property("sync", True)  # sincroniza timestamps
        # sink.set_property("buffer-time", 500000)  # 500ms (ajustar para latencia/robustez)

        for elem in (convert, resample, vol, capsfilter, sink):
            if not elem:
                raise RuntimeError("Falta un elemento de GStreamer. Verifica plugins instalados.")
            pipeline.add(elem)

        # uridecodebin es dinámico: conectamos su pad 'src' cuando esté listo
        def on_pad_added(_src, pad):
            sink_pad = convert.get_static_pad("sink")
            if not sink_pad.is_linked():
                pad.link(sink_pad)

        src.connect("pad-added", on_pad_added)
        pipeline.add(src)

        # Enlaza elementos fijos
        assert convert.link(resample)
        assert resample.link(vol)
        assert vol.link(capsfilter)
        assert capsfilter.link(sink)

        # Gestión de mensajes del bus (estado, errores, EOS)
        bus = pipeline.get_bus()
        bus.add_signal_watch()

        def on_message(_bus, msg):
            t = msg.type
            if t == Gst.MessageType.EOS:
                self._status["state"] = "stopped"
                self._update_led("stopped")
                pipeline.set_state(Gst.State.NULL)
            elif t == Gst.MessageType.ERROR:
                err, dbg = msg.parse_error()
                self._status["state"] = "error"
                self._status["error"] = str(err)
                if dbg:
                    self._status["debug"] = dbg
                self._update_led("stopped")
                pipeline.set_state(Gst.State.NULL)
            elif t == Gst.MessageType.STATE_CHANGED:
                if msg.src == pipeline:
                    old, new, _ = msg.parse_state_changed()
                    if new == Gst.State.PLAYING:
                        self._status["state"] = "playing"
                        self._update_led("playing")
                    elif new == Gst.State.PAUSED:
                        self._status["state"] = "paused"
                        self._update_led("buffering")
                    elif new in (Gst.State.READY, Gst.State.NULL):
                        self._status["state"] = "stopped"
                        self._update_led("stopped")
            return True

        bus.connect("message", on_message)

        self.pipeline = pipeline

    async def play(self, uri: str):
        self.current_uri = uri
        self._status["uri"] = uri
        self._status["state"] = "starting"
        self._update_led("starting")
        if not self.pipeline:
            self.build_pipeline(uri, self._status["volume"])
        else:
            # reconstruye pipeline para nueva URI
            self.build_pipeline(uri, self._status["volume"])
        self.pipeline.set_state(Gst.State.PLAYING)

    async def stop(self):
        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
        self._status["state"] = "stopped"
        self._update_led("stopped")

    async def set_volume(self, val: float):
        val = max(0.0, min(1.5, val))  # permite boost leve hasta 150% si se desea
        self._status["volume"] = val
        if self.pipeline:
            vol_elem = self.pipeline.get_by_name("volume")
            if vol_elem:
                vol_elem.set_property("volume", val)

    def status(self):
        return dict(self._status)


async def create_app(player: I2SNetworkPlayer):
    routes = web.RouteTableDef()

    @routes.get("/status")
    async def status(_request):
        return web.json_response(player.status())

    @routes.post("/play")
    async def play(request):
        data = await request.json()
        uri = data.get("uri")
        if not uri:
            return web.json_response({"error": "Falta 'uri'."}, status=400)
        await player.play(uri)
        return web.json_response({"ok": True, "uri": uri})

    @routes.post("/stop")
    async def stop(_request):
        await player.stop()
        return web.json_response({"ok": True})

    @routes.put("/volume")
    async def volume(request):
        qs = request.rel_url.query
        v = qs.get("val")
        if v is None:
            return web.json_response({"error": "Falta 'val' en querystring."}, status=400)
        try:
            val = float(v)
        except ValueError:
            return web.json_response({"error": "Valor no numérico."}, status=400)
        await player.set_volume(val)
        return web.json_response({"ok": True, "volume": val})

    app = web.Application()
    app.add_routes(routes)
    return app


def parse_args():
    p = argparse.ArgumentParser(description="i2s-network-audio-player para Raspberry Pi Zero 2 W + PCM5102A")
    p.add_argument("--device", default="hw:0,0", help="Dispositivo ALSA (ej: hw:0,0 o hw:1,0)")
    p.add_argument("--uri", default=None, help="URI inicial a reproducir (http(s)://, icecast, etc.)")
    p.add_argument("--port", type=int, default=8080, help="Puerto HTTP de control")
    p.add_argument("--led-pin", type=int, default=None, help="GPIO BCM para LED de estado (opcional)")
    return p.parse_args()


async def main_async():
    args = parse_args()
    player = I2SNetworkPlayer(alsa_device=args.device, initial_uri=args.uri, use_led_pin=args.led_pin)
    app = await create_app(player)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, host="0.0.0.0", port=args.port)
    await site.start()

    # Reproduce URI inicial si fue proporcionada
    if args.uri:
        await player.play(args.uri)

    # Mantén el bucle corriendo hasta señal de terminación
    stop_event = asyncio.Event()

    def _handle_sig(*_):
        stop_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _handle_sig)

    await stop_event.wait()
    await player.stop()
    await runner.cleanup()


def main():
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        pass
    return 0


if __name__ == "__main__":
    sys.exit(main())

Puntos clave del código:
– uridecodebin detecta y decodifica automáticamente el formato de la URI (MP3/AAC/FLAC/OGG).
– capsfilter fuerza la salida a PCM 16 bits, 44.1 kHz estéreo, típico de streams musicales.
– alsasink apunta a hw:X,Y (por defecto hw:0,0) que mapea al “snd_rpi_hifiberry_dac”.
– Aiohttp expone una API de control simple para play/stop/status/volumen.
– LED de estado opcional en GPIO13.

Compilación/flash/ejecución

No hay compilación en sentido estricto, pero configuraremos el entorno y lanzaremos el servicio. Todos los comandos son para Raspberry Pi OS Bookworm 64‑bit con Python 3.11.

1) Crear entorno de trabajo y venv

# Directorio del proyecto
cd ~
mkdir -p i2s-network-audio-player
cd i2s-network-audio-player

# Entorno virtual con Python 3.11
python3 -m venv .venv
source .venv/bin/activate

# Actualiza pip dentro del venv
pip install --upgrade pip==23.0.1

# Instala dependencias Python de la app (aiohttp, pyyaml si quieres añadir config)
pip install aiohttp==3.9.5 PyYAML==6.0.2

Nota: PyGObject (gi) y GStreamer los instalamos por apt previamente; no uses pip para PyGObject en la Pi salvo que sepas lo que haces.

2) Guardar el script

nano player.py
# (pega el código anterior y guarda)
chmod +x player.py

3) Validación de GStreamer en CLI

Antes de usar Python, asegúrate que el pipeline base funciona:

# Sustituye la URI por un stream válido; por ejemplo FIP (AAC):
gst-launch-1.0 uridecodebin uri=https://icecast.radiofrance.fr/fip-hifi.aac ! audioconvert ! audioresample ! \
  audio/x-raw,format=S16LE,channels=2,rate=44100 ! alsasink device=hw:0,0 sync=true

Si oyes audio, la ruta GStreamer → ALSA → I2S funciona.

4) Ejecutar el reproductor

Ejemplo: iniciar con una URI y LED en GPIO13

source .venv/bin/activate
python ./player.py --device hw:0,0 --port 8080 \
  --uri "https://icecast.radiofrance.fr/fip-hifi.aac" \
  --led-pin 13
  • Accede a http://:8080/status para ver el estado.
  • Cambia de stream:
    bash
    curl -X POST http://<IP>:8080/play \
    -H 'Content-Type: application/json' \
    -d '{"uri":"http://ice1.somafm.com/groovesalad-128-mp3"}'
  • Ajusta volumen (0.0 a 1.5):
    bash
    curl -X PUT "http://<IP>:8080/volume?val=0.8"
  • Detener:
    bash
    curl -X POST http://<IP>:8080/stop

5) Arranque automático con systemd (opcional)

Crea la unidad:

nano i2s-player.service

Contenido:

[Unit]
Description=I2S Network Audio Player (Raspberry Pi Zero 2 W + PCM5102A)
After=network-online.target sound.target
Wants=network-online.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/i2s-network-audio-player
Environment=PYTHONUNBUFFERED=1
ExecStart=/home/pi/i2s-network-audio-player/.venv/bin/python /home/pi/i2s-network-audio-player/player.py --device hw:0,0 --port 8080 --led-pin 13 --uri https://icecast.radiofrance.fr/fip-hifi.aac
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target

Instala y habilita:

sudo cp i2s-player.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now i2s-player.service
sudo systemctl status i2s-player.service

Para ver logs:

journalctl -u i2s-player.service -f

Validación paso a paso

1) Verifica que el overlay I2S está cargado

  • dmesg | grep -i hifiberry
  • aplay -l debe listar “snd_rpi_hifiberry_dac”.

Si no aparece, revisa /boot/firmware/config.txt y que reiniciaste la Pi.

2) Prueba ALSA con tono

  • speaker-test -D hw:0,0 -c 2 -r 44100 -t sine
  • Debes oír tono alternando canales izquierdo/derecho.
  • Sin ruido ni chasquidos a 44.1 kHz.

3) Prueba GStreamer en CLI

  • gst-launch-1.0 con una URI comprobada (ver arriba). Si se reproduce, el pipeline base está correcto.

4) Verifica API y flujo con la app Python

  • GET /status:
  • Debe devolver JSON con state (“playing”, “stopped”, “starting” o “error”), URI, volumen y ALSA device.
  • POST /play:
  • Respuesta 200 y ok:true. Debes oír audio tras 1–3 s (buffering + decodificación).
  • PUT /volume:
  • Cambia volumen percibido de forma suave.
  • POST /stop:
  • Corta la reproducción, state pasa a “stopped”.
  • LED (si instalado):
  • “starting”/“buffering”: parpadeo.
  • “playing”: encendido fijo.
  • “stopped”/“error”: apagado.

5) Confirmaciones técnicas

  • Uso de CPU:
    bash
    top -p $(pgrep -f player.py | tr '\n' ' ')

    En Zero 2 W debería ser moderado (10–35%) según códec/bitrates.

  • Latencia:

  • Observa tiempo hasta escuchar audio tras POST /play. Debería rondar 1–3 s. Ajustable con buffer-time y propiedades de alsasink/queue si buscas menos latencia (compromete robustez).

  • Sin dropouts:

  • Con Wi‑Fi estable, no deberían ocurrir cortes. Si ves underruns (XRUN) en logs, consulta la sección de troubleshooting.

Troubleshooting

1) No aparece la tarjeta “snd_rpi_hifiberry_dac” en aplay -l
– Causas:
– Falta de overlay en /boot/firmware/config.txt.
– Escribiste dtoverlay=mal (nombre incorrecto).
– No reiniciaste.
– Solución:
– Edita y añade:
dtparam=audio=off
dtoverlay=hifiberry-dac

– sudo reboot
– Verifica cables I2S (aunque no impiden enumeración, sí es buena práctica revisar).

2) No se oye nada con speaker-test
– Causas:
– Dispositivo ALSA incorrecto (hw:1,0 en vez de hw:0,0).
– Cableado I2S incorrecto (BCLK/LRCK/DATA invertidos).
– Alimentación del módulo PCM5102A errónea (módulo sin regulador conectado a 5 V).
– Solución:
– aplay -l para identificar card, ajusta -D hw:X,0.
– Revisa tabla de pines; la línea DATA debe ser desde GPIO21 (PCM_DOUT) de la Pi al DIN del DAC.
– Verifica VCC del módulo; si no tiene regulador, usa 3.3 V.

3) Chasquidos o audio entrecortado
– Causas:
– Buffer insuficiente, Wi‑Fi inestable, CPU saturada.
– Solución:
– Conéctate a 2.4 GHz con buena señal, o usa Ethernet USB.
– Ajusta buffers de GStreamer:
– Añade “queue” entre elementos:
uridecodebin ! queue max-size-buffers=0 max-size-time=400000000 ! …
– Aumenta buffer-time en alsasink (p. ej., 600 ms).
– Evita streams con bitrates muy altos si la red es limitada.

4) “No se pudo crear uridecodebin” o “falta plugin”
– Causas:
– Faltan paquetes de GStreamer.
– Solución:
– Reinstala:
bash
sudo apt install -y python3-gi gir1.2-gst-1.0 \
gstreamer1.0-tools gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad

5) Volumen sin efecto
– Causas:
– Propiedad “volume” no conectada (elemento no está en pipeline) o GStreamer usa ruta alternativa.
– Solución:
– Confirma que “volume” se inserta después de resample/convert y antes de alsasink.
– En logs de GStreamer (GST_DEBUG=2) verifica el grafo real.

6) LED no enciende
– Causas:
– No instalaste python3-gpiozero/python3-rpi.gpio.
– LED conectado al pin incorrecto o invertido.
– Solución:
– Instala dependencias:
bash
sudo apt install -y python3-gpiozero python3-rpi.gpio

– Verifica conexión: GPIO13 (BCM), anodo al GPIO con resistencia, cátodo a GND.

7) Distorsión al 100% de volumen
– Causas:
– Overdrive digital o saturación en el amplificador aguas abajo.
– Solución:
– Limita a 0.9–1.0 en la API de volumen.
– Ajusta ganancia en el amplificador/altavoces.

8) La app no arranca en systemd
– Causas:
– Ruta incorrecta a Python o player.py en ExecStart.
– Falta del venv.
– Solución:
– Verifica paths, vuelve a activar venv y reinstala dependencias.
– Revisa logs:
bash
journalctl -u i2s-player.service -b -e

Mejoras/variantes

  • Entrada múltiples URIs y lista de reproducción:
  • Amplía la API para soportar POST /enqueue, POST /next, GET /queue.
  • Soporte de mDNS/Avahi y SSDP:
  • Anuncia el servicio HTTP para descubrimiento automático en LAN.
  • UPnP/DLNA o AirPlay (RAOP):
  • Usa GStreamer con plugins adecuados o integra con shairport-sync (en este caso tu Pi Zero 2 W seguiría usando PCM5102A como salida ALSA por defecto).
  • Botones físicos:
  • GPIO para Play/Pause, Next/Prev, Mute. Con gpiozero.Button es trivial.
  • Pantalla OLED I2C (SSD1306):
  • Muestra estado, volumen, título de la pista si el stream emite metadatos ICY.
  • Latencia ultrabaja:
  • Reduce buffer-time, usa pipelines específicos y QoS; en entornos Wi‑Fi esto sacrificará robustez.
  • Configuración persistente:
  • Carga config.yaml con ALSA device, volumen por defecto, URI inicial, puertos, etc.
  • Resampling de alta calidad:
  • Ajusta audioresample (quality=10) y caps a 48 kHz si tu resto de cadena lo prefiere.

Checklist de verificación

Marca cada ítem al completar:

  • [ ] Usas Raspberry Pi OS Bookworm 64‑bit con Python 3.11.2, pip 23.0.1 y kernel 6.6.y.
  • [ ] Has añadido en /boot/firmware/config.txt: dtparam=audio=off y dtoverlay=hifiberry-dac.
  • [ ] Tras reiniciar, aplay -l muestra snd_rpi_hifiberry_dac (card X).
  • [ ] Con speaker-test -D hw:X,0 escuchas tono estéreo sin artefactos.
  • [ ] Has instalado GStreamer 1.22.x y plugins base/good/bad via apt.
  • [ ] Has creado venv (.venv), instalado aiohttp y probado player.py.
  • [ ] GET /status responde con JSON; POST /play reproduce un stream audible.
  • [ ] PUT /volume modifica el nivel; POST /stop detiene la reproducción.
  • [ ] LED de estado en GPIO13 funciona como se espera (opcional).
  • [ ] Has verificado el consumo de CPU y ausencia de dropouts en uso normal.
  • [ ] (Opcional) El servicio systemd arranca automáticamente y logs están limpios.

Resumen final

Con la Raspberry Pi Zero 2 W y un PCM5102A (vía overlay hifiberry-dac), has construido un i2s-network-audio-player robusto que:
– Recibe audio por red (HTTP/HTTPS/Icecast),
– Decodifica con GStreamer 1.22,
– Entrega PCM 16‑bit 44.1 kHz por I2S al PCM5102A,
– Y expone una API HTTP para control en LAN.

Toda la configuración, materiales, conexiones, código y comandos son coherentes con el modelo “Raspberry Pi Zero 2 W + PCM5102A DAC” y el objetivo del proyecto.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Cuál es el sistema operativo recomendado para este caso práctico?




Pregunta 2: ¿Qué versión de Python se debe utilizar?




Pregunta 3: ¿Qué comando se utiliza para verificar la versión de GCC?




Pregunta 4: ¿Cuál es la versión de ALSA recomendada?




Pregunta 5: ¿Qué herramienta se menciona para el manejo de audio en este artículo?




Pregunta 6: ¿Qué tipo de DAC se utiliza en este proyecto?




Pregunta 7: ¿Qué tipo de conexión se utiliza para el DAC?




Pregunta 8: ¿Cuál es la recomendación para la tarjeta microSD?




Pregunta 9: ¿Qué tipo de alimentación se recomienda para la Raspberry Pi?




Pregunta 10: ¿Cuál es el kernel Linux recomendado para Raspberry Pi OS Bookworm?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: Control tira WS2812B con Raspberry Pi Pico W

Caso práctico: Control tira WS2812B con Raspberry Pi Pico W — hero

Objetivo y caso de uso

Qué construirás: Controlar tiras WS2812B NeoPixel en tiempo real utilizando WebSocket en Raspberry Pi Pico W.

Para qué sirve

  • Iluminación dinámica en proyectos de arte interactivo.
  • Control de efectos visuales en instalaciones de eventos.
  • Desarrollo de prototipos de sistemas de iluminación para hogares inteligentes.
  • Integración con sensores para respuestas visuales en tiempo real.

Resultado esperado

  • Latencia de respuesta menor a 50 ms en el control de los LEDs.
  • Capacidad de controlar hasta 300 LEDs WS2812B simultáneamente.
  • Consumo de ancho de banda de aproximadamente 1.5 kbps por tira de LEDs.
  • Estabilidad en la conexión WebSocket con menos del 1% de pérdida de paquetes.

Público objetivo: Entusiastas de IoT y desarrolladores; Nivel: Avanzado

Arquitectura/flujo: Raspberry Pi Pico W ejecutando MicroPython con servidor WebSocket, controlado desde un cliente en Python en un host.

Nivel: Avanzado

Prerrequisitos

Sistema operativo y entorno de desarrollo (host)

  • Raspberry Pi OS Bookworm 64‑bit (kernel 6.1.x o superior)
  • Python 3.11 (verificable con python3 --version, p.ej. 3.11.2)
  • Entorno de red 2.4 GHz con acceso a Internet para instalación de paquetes y para conectar la Pico W
  • Usuario con permisos sudo, con acceso a puertos serie/USB

Toolchain exacta y versiones

  • Firmware de la placa:
  • MicroPython para Raspberry Pi Pico W: micropython-1.21.0-pico-w.uf2
  • Herramientas en el host (instaladas dentro de un entorno virtual):
  • pip 23.2+ (gestión de paquetes)
  • mpremote 1.22.0 (carga y control de ficheros en MicroPython)
  • websockets 11.0.3 (cliente de prueba en el host para WebSocket)
  • Opcionales (solo si prefieres IDE gráfico):
  • Thonny 4.1.x (depuración/flash; Bookworm suele incluir Thonny 4.x)

Notas:
– No necesitamos gpiozero, smbus2 ni spidev en este caso concreto.
– El servidor WebSocket corre en la Pico W (MicroPython), y la validación incluye un cliente desde el host.

Habilitar interfaces en Raspberry Pi OS (host)

Aunque la Pico W se programa por USB (no necesita I2C/SPI/Serial del host), asegúrate de:
– Configurar país de Wi‑Fi para cumplir normativa local (útil si el host se conecta por Wi‑Fi).
– Habilitar acceso serial del usuario al dispositivo USB CDC (pertenecer al grupo dialout).

Pasos (en la Raspberry Pi con Raspberry Pi OS):
1. Ajustar país/región Wi‑Fi (si procede):
– sudo raspi-config
– Localisation Options → WLAN Country → elige tu país
2. Añadir el usuario al grupo dialout para acceso a /dev/ttyACM0:
– sudo usermod -aG dialout $USER
– Cierra sesión y vuelve a entrar (o reinicia) para que surta efecto.
3. (Opcional) Habilitar SSH si quieres administrar el host de forma remota:
– sudo raspi-config
– Interface Options → SSH → Enable

Materiales

  • 1× Raspberry Pi Pico W (RP2040 con Wi‑Fi)
  • 1× Tira LED WS2812B (Neopixel). Para el caso didáctico se recomienda una sección de 8–30 LEDs.
  • Resistencias y capacitores recomendados:
  • 1× resistencia 330 Ω en serie con la línea de datos (DIN)
  • 1× condensador electrolítico 1000 µF/6.3 V o superior entre +5 V y GND de la tira (para evitar picos)
  • 1× Fuente de alimentación 5 V regulada, capaz de suministrar al menos 60 mA por LED a brillo máximo (pico)
  • Para 16 LEDs, presupuesto ≈ 16 × 60 mA = 960 mA (pico). Para uso seguro en pruebas, limita brillo.
  • 1× Convertidor de nivel recomendado (lógico 3.3 V → 5 V), por ejemplo SN74AHCT125 o 74HCT245
  • Nota: muchas tiras WS2812B funcionan con señal de 3.3 V si VCC está cerca de 5 V y el entorno es limpio, pero para máxima fiabilidad y entornos industriales se recomienda nivelado lógico.
  • Protoboard y cables Dupont
  • 1× Cable micro‑USB para conectar la Pico W al host
  • 1× Raspberry Pi (4/400/5) o equipo x86_64 con Raspberry Pi OS Bookworm 64‑bit

Observación de coherencia de modelo: Este caso práctico usa exactamente “Raspberry Pi Pico W + WS2812B strip” para el control de LEDs mediante WebSocket en la Pico W (IoT), cumpliendo el objetivo “websocket‑neopixel‑iot‑control”.

Preparación y conexión

Conexiones eléctricas recomendadas

  • Elige GP2 (pin físico 4 de la Pico W) como línea de datos para la WS2812B.
  • Alimenta la tira con 5 V y GND. Si tu tira tiene conector, respeta la dirección “DIN”.
  • Coloca el condensador de 1000 µF entre +5 V y GND cerca de la tira.
  • Inserta la resistencia de 330 Ω en serie en la línea de datos entre GP2 y DIN.
  • Si usas convertidor de nivel, colócalo entre la Pico (3.3 V) y la tira (5 V) para la línea DIN.

Tabla de mapeo de pines y alimentación:

Elemento Pico W (señal) Pin Pico W (físico) Tira WS2812B Notas
Datos (DIN) GP2 4 DIN En serie 330 Ω; ideal con nivelador 3.3→5 V
Alimentación +5 V de tira VBUS (5 V) 40 +5 V VBUS alimenta desde USB; suficiente para pocas decenas de LEDs a bajo brillo. Para más, usa fuente 5 V externa compartiendo GND
Tierra común GND 38 (u otro GND) GND GND común Pico–tira–fuente
Señal de referencia (opcional) Mantén cables cortos; evita interferencias

Advertencia de potencia:
– Si alimentas desde el puerto USB de la Raspberry Pi host, el límite puede rondar 1 A compartido. En producción, usa fuente 5 V externa para la tira y alimenta la Pico por su puerto USB (GND común).
– Controla el brillo por software para no exceder la corriente del suministro.

Preparación del entorno Python en el host (Raspberry Pi OS Bookworm 64‑bit)

1) Actualiza e instala paquetes base:

sudo apt update
sudo apt full-upgrade -y
sudo apt install -y python3-venv python3-pip wget unzip screen

2) Crea y activa un entorno virtual (venv) específico:

python3 -m venv ~/venvs/pico-w-ws
source ~/venvs/pico-w-ws/bin/activate
python -m pip install --upgrade pip

3) Instala herramientas exactas:

pip install mpremote==1.22.0 websockets==11.0.3

4) Verifica versiones:

python --version
pip --version
python -c "import mpremote, websockets; print('mpremote', mpremote.__version__)"
python -c "import websockets; print('websockets', websockets.__version__)"

Flasheo del firmware MicroPython (1.21.0) en la Pico W

1) Descarga el UF2 de MicroPython para Pico W:

mkdir -p ~/pico-firmware
cd ~/pico-firmware
wget https://micropython.org/resources/firmware/micropython-1.21.0-pico-w.uf2 -O micropython-1.21.0-pico-w.uf2

2) Pon la Pico W en modo BOOTSEL:
– Mantén pulsado el botón BOOTSEL de la Pico W.
– Conéctala al host por USB.
– Suelta BOOTSEL; debe montarse como unidad USB (RPI-RP2).

3) Copia el UF2 a la unidad RPI-RP2:
– Opción A (gestor de archivos): arrastra y suelta el UF2.
– Opción B (línea de comandos; ajusta la ruta de montaje si difiere):

cp micropython-1.21.0-pico-w.uf2 /media/$USER/RPI-RP2/
sync

La Pico W se reiniciará y expondrá un puerto serie USB (p. ej., /dev/ttyACM0). Ya está lista para recibir scripts MicroPython.

Código completo

En este caso, la Raspberry Pi Pico W actúa como servidor HTTP+WebSocket minimalista para controlar una tira WS2812B (Neopixel) en tiempo real desde un navegador o un cliente de prueba. El servidor implementa:
– Conexión Wi‑Fi
– Servido de una página HTML muy simple
– Handshake RFC6455 y manejo de frames WebSocket (texto) sin librerías externas
– Cola/estado de efectos: sólido, apagado, arcoíris, “chase”
– Control de brillo global

Arquitectura de archivos en la Pico W:
– main.py (servidor y control de LEDs)
– index.html (cliente web con controles)

Ajusta SSID y contraseña Wi‑Fi en main.py antes de desplegar.

Archivo: main.py (MicroPython, servidor WebSocket y control de neopixel)

# main.py - MicroPython 1.21.0 para Raspberry Pi Pico W
# Objetivo: websocket-neopixel-iot-control (servidor HTTP+WS minimalista)

import network, socket, time, uasyncio as asyncio
import machine
import json
import ubinascii
import hashlib
from neopixel import NeoPixel

# ======== CONFIGURACIÓN ========
WIFI_SSID = "TU_SSID"
WIFI_PASS = "TU_PASSWORD"
HOST = "0.0.0.0"
PORT = 80

LED_PIN = 2         # GP2 (pin físico 4)
LED_COUNT = 16      # Ajusta al número de LEDs de tu tira
BRIGHTNESS = 0.2    # 0.0..1.0

# ======== LEDS ========
np = NeoPixel(machine.Pin(LED_PIN, machine.Pin.OUT), LED_COUNT)

def apply_brightness(color, brightness):
    r, g, b = color
    return (int(r * brightness), int(g * brightness), int(b * brightness))

def fill_color(color, brightness=None):
    if brightness is None:
        brightness = state["brightness"]
    c = apply_brightness(color, brightness)
    for i in range(LED_COUNT):
        np[i] = c
    np.write()

def wheel(pos):
    # Arcoíris de 0..255
    if pos < 0 or pos > 255:
        return (0, 0, 0)
    if pos < 85:
        return (255 - pos * 3, pos * 3, 0)
    if pos < 170:
        pos -= 85
        return (0, 255 - pos * 3, pos * 3)
    pos -= 170
    return (pos * 3, 0, 255 - pos * 3)

# ======== ESTADO GLOBAL ========
state = {
    "effect": "off",            # "off", "solid", "rainbow", "chase"
    "color": (255, 0, 0),       # usado en "solid"
    "brightness": BRIGHTNESS,
    "clients": 0
}

# ======== WIFI ========
def wifi_connect(ssid, password, timeout_s=20):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.config(pm=0xa11140)  # modo power-save desactivado
    wlan.connect(ssid, password)
    t0 = time.ticks_ms()
    while not wlan.isconnected():
        if time.ticks_diff(time.ticks_ms(), t0) > timeout_s * 1000:
            raise OSError("Wi-Fi timeout")
        time.sleep_ms(200)
    return wlan

# ======== HTTP RESPUESTAS ========
HTTP_200 = "HTTP/1.1 200 OK\r\n"
HTTP_400 = "HTTP/1.1 400 Bad Request\r\n\r\nBad Request"
HTTP_404 = "HTTP/1.1 404 Not Found\r\n\r\nNot Found"
HTTP_HEADERS_HTML = "Content-Type: text/html; charset=utf-8\r\nConnection: close\r\n\r\n"
HTTP_HEADERS_CSS = "Content-Type: text/css; charset=utf-8\r\nConnection: close\r\n\r\n"

# Cargamos index.html desde el sistema de archivos
def load_index_html():
    try:
        with open("index.html", "r") as f:
            return f.read()
    except:
        # Página mínima de contingencia
        return """<!doctype html><html><body>
        <h1>Falta index.html</h1>
        <p>Sube el archivo index.html a la Pico W.</p>
        </body></html>"""

# ======== WEBSOCKET (mínimo viable RFC6455) ========
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def ws_handshake(client, headers):
    # Extrae Sec-WebSocket-Key y responde con Sec-WebSocket-Accept
    key = None
    for h in headers:
        if h.lower().startswith("sec-websocket-key:"):
            key = h.split(":", 1)[1].strip()
            break
    if not key:
        return False
    sha1 = hashlib.sha1((key + WS_GUID).encode()).digest()
    accept = ubinascii.b2a_base64(sha1).strip().decode()
    resp = (
        "HTTP/1.1 101 Switching Protocols\r\n"
        "Upgrade: websocket\r\n"
        "Connection: Upgrade\r\n"
        f"Sec-WebSocket-Accept: {accept}\r\n\r\n"
    )
    client.send(resp.encode())
    return True

def ws_recv_frame(client):
    # Devuelve (opcode, data_bytes) de un frame WS
    # Implementación básica: soporta payloads < 126, sin extensiones
    hdr = client.recv(2)
    if not hdr or len(hdr) < 2:
        return None, None
    b1, b2 = hdr[0], hdr[1]
    fin = b1 & 0x80
    opcode = b1 & 0x0F
    masked = b2 & 0x80
    length = b2 & 0x7F
    if length == 126:
        ext = client.recv(2)
        length = (ext[0] << 8) | ext[1]
    elif length == 127:
        ext = client.recv(8)
        # Tomamos los últimos 4 para longitudes pequeñas; en este caso evitamos longitudes enormes
        length = 0
        for b in ext[-4:]:
            length = (length << 8) | b
    mask = b""
    if masked:
        mask = client.recv(4)
    payload = b""
    to_read = length
    while to_read > 0:
        chunk = client.recv(to_read)
        if not chunk:
            break
        payload += chunk
        to_read -= len(chunk)
    if masked and payload:
        payload = bytes([payload[i] ^ mask[i % 4] for i in range(len(payload))])
    return opcode, payload

def ws_send_text(client, text):
    data = text.encode()
    header = bytearray()
    header.append(0x81)  # FIN + opcode=1 (text)
    l = len(data)
    if l < 126:
        header.append(l)
    elif l < (1 << 16):
        header.append(126)
        header.append((l >> 8) & 0xFF)
        header.append(l & 0xFF)
    else:
        header.append(127)
        for shift in (56, 48, 40, 32, 24, 16, 8, 0):
            header.append((l >> shift) & 0xFF)
    client.send(header + data)

def handle_command(cmd):
    # cmd es dict decodificado de JSON
    ctype = cmd.get("action")
    if ctype == "solid":
        r = int(cmd.get("r", 255))
        g = int(cmd.get("g", 0))
        b = int(cmd.get("b", 0))
        state["color"] = (r, g, b)
        state["effect"] = "solid"
    elif ctype == "off":
        state["effect"] = "off"
    elif ctype == "rainbow":
        state["effect"] = "rainbow"
    elif ctype == "chase":
        state["effect"] = "chase"
    elif ctype == "brightness":
        br = float(cmd.get("value", state["brightness"]))
        br = max(0.0, min(1.0, br))
        state["brightness"] = br
    # Respuesta del estado actual
    return {
        "ok": True,
        "state": {
            "effect": state["effect"],
            "color": state["color"],
            "brightness": state["brightness"]
        }
    }

async def effect_loop():
    pos = 0
    chase_idx = 0
    while True:
        eff = state["effect"]
        if eff == "off":
            fill_color((0, 0, 0))
            await asyncio.sleep_ms(60)
        elif eff == "solid":
            fill_color(state["color"])
            await asyncio.sleep_ms(60)
        elif eff == "rainbow":
            for i in range(LED_COUNT):
                np[i] = apply_brightness(wheel((i + pos) & 255), state["brightness"])
            np.write()
            pos = (pos + 2) % 256
            await asyncio.sleep_ms(30)
        elif eff == "chase":
            # fondo oscuro
            dim = int(10 * state["brightness"])
            for i in range(LED_COUNT):
                np[i] = (dim, dim, dim)
            # píxel en carrera en color base
            c = apply_brightness(state["color"], state["brightness"])
            np[chase_idx % LED_COUNT] = c
            np.write()
            chase_idx = (chase_idx + 1) % LED_COUNT
            await asyncio.sleep_ms(80)
        else:
            await asyncio.sleep_ms(100)

async def http_ws_server():
    addr = socket.getaddrinfo(HOST, PORT)[0][-1]
    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(addr)
    s.listen(2)
    print("Servidor en http://{}:{}".format(wlan.ifconfig()[0], PORT))
    while True:
        client, remote = s.accept()
        client.settimeout(5)
        try:
            req = b""
            # Lee hasta cabeceras completas
            while b"\r\n\r\n" not in req:
                chunk = client.recv(1024)
                if not chunk:
                    break
                req += chunk
            if not req:
                client.close()
                continue
            header_text = req.decode(errors="ignore")
            lines = header_text.split("\r\n")
            request_line = lines[0]
            headers = lines[1:]
            method, path, _ = request_line.split(" ", 2)

            if path == "/":
                body = load_index_html()
                resp = HTTP_200 + "Content-Length: {}\r\n".format(len(body)) + HTTP_HEADERS_HTML + body
                client.send(resp.encode())
                client.close()
                continue

            if path == "/ws" and method == "GET":
                # WebSocket upgrade
                ok = ws_handshake(client, headers)
                if not ok:
                    client.send(HTTP_400.encode())
                    client.close()
                    continue
                state["clients"] += 1
                try:
                    ws_send_text(client, json.dumps({
                        "hello": "pico-w",
                        "led_count": LED_COUNT,
                        "state": {
                            "effect": state["effect"],
                            "brightness": state["brightness"]
                        }
                    }))
                    while True:
                        opcode, payload = ws_recv_frame(client)
                        if opcode is None:
                            break
                        if opcode == 8:  # close
                            break
                        if opcode == 1:  # text
                            try:
                                cmd = json.loads(payload.decode())
                                resp = handle_command(cmd)
                                ws_send_text(client, json.dumps(resp))
                            except Exception as e:
                                ws_send_text(client, json.dumps({"ok": False, "error": str(e)}))
                        # Ignora binarios y pings en este mínimo
                finally:
                    state["clients"] = max(0, state["clients"] - 1)
                    try:
                        client.close()
                    except:
                        pass
                continue

            # Rutas no encontradas
            client.send(HTTP_404.encode())
            client.close()

        except Exception as e:
            try:
                client.send(HTTP_400.encode())
                client.close()
            except:
                pass

# ======== MAIN ========
print("Conectando Wi-Fi...")
wlan = wifi_connect(WIFI_SSID, WIFI_PASS)
print("Wi-Fi OK:", wlan.ifconfig())

# Arranque seguro: LEDs en off
state["effect"] = "off"
fill_color((0, 0, 0), brightness=0.0)

loop = asyncio.get_event_loop()
loop.create_task(effect_loop())
loop.create_task(http_ws_server())
loop.run_forever()

Explicación breve de partes clave:
– Conexión Wi‑Fi: wifi_connect() prepara la interfaz STA, desactiva ahorro de energía y espera conexión.
– Servidor HTTP: Responde en “/” con index.html y hace upgrade a WebSocket en “/ws”.
– Handshake WebSocket: ws_handshake() implementa el cálculo de Sec-WebSocket-Accept conforme a RFC6455.
– Frames WebSocket: ws_recv_frame() y ws_send_text() manejan mensajes de texto con longitudes pequeñas y enmascarado del cliente.
– Motor de efectos: effect_loop() corre con uasyncio e interpreta el estado global para pintar la tira.
– Comandos JSON soportados: {"action":"solid","r":255,"g":0,"b":0}, {"action":"off"}, {"action":"rainbow"}, {"action":"chase"}, {"action":"brightness","value":0.4}.

Archivo: index.html (cliente web básico con WebSocket)

<!doctype html>
<html lang="es">
<head>
<meta charset="utf-8">
<title>Pico W • WebSocket Neopixel IoT Control</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body { font-family: system-ui, sans-serif; margin: 1rem; }
fieldset { margin-bottom: 1rem; }
label { display: inline-block; width: 8rem; }
#status { padding: 0.5rem; border: 1px solid #ccc; border-radius: 6px; margin-bottom: 1rem; }
.btn { padding: 0.4rem 0.8rem; margin-right: 0.5rem; }
</style>
</head>
<body>
<h1>Pico W → WS2812B por WebSocket</h1>
<div id="status">Desconectado</div>

<fieldset>
  <legend>Conexión</legend>
  <button id="btnConnect" class="btn">Conectar</button>
  <button id="btnDisconnect" class="btn">Cerrar</button>
</fieldset>

<fieldset>
  <legend>Color sólido</legend>
  <label for="color">Color:</label>
  <input id="color" type="color" value="#ff0000">
  <button id="btnSolid" class="btn">Aplicar</button>
  <button id="btnOff" class="btn">Apagar</button>
</fieldset>

<fieldset>
  <legend>Efectos</legend>
  <button id="btnRainbow" class="btn">Arcoíris</button>
  <button id="btnChase" class="btn">Chase</button>
</fieldset>

<fieldset>
  <legend>Brillo</legend>
  <input id="brightness" type="range" min="0" max="100" value="20">
  <span id="bval">20%</span>
  <button id="btnBr" class="btn">Fijar brillo</button>
</fieldset>

<pre id="log"></pre>

<script>
let ws = null;

function log(msg) {
  const el = document.getElementById('log');
  el.textContent += msg + "\n";
  el.scrollTop = el.scrollHeight;
}

function setStatus(text, ok) {
  const s = document.getElementById('status');
  s.textContent = text;
  s.style.background = ok ? "#e6ffed" : "#ffecec";
  s.style.borderColor = ok ? "#34c759" : "#ff3b30";
}

function rgbHexToObj(hex) {
  // "#rrggbb" → {r,g,b}
  const m = /^#?([0-9a-f]{6})$/i.exec(hex);
  const n = parseInt(m[1], 16);
  return { r: (n >> 16) & 255, g: (n >> 8) & 255, b: n & 255 };
}

function connect() {
  if (ws && ws.readyState === WebSocket.OPEN) return;
  const proto = location.protocol === "https:" ? "wss://" : "ws://";
  const url = proto + location.host + "/ws";
  log("Conectando a " + url + " ...");
  ws = new WebSocket(url);
  ws.onopen = () => { setStatus("Conectado", true); log("WS abierto"); };
  ws.onclose = () => { setStatus("Desconectado", false); log("WS cerrado"); };
  ws.onerror = (e) => { setStatus("Error WS", false); log("WS error: " + e); };
  ws.onmessage = (ev) => { log("← " + ev.data); };
}

function disconnect() {
  if (ws) {
    ws.close();
  }
}

function send(obj) {
  if (!ws || ws.readyState !== WebSocket.OPEN) {
    log("No conectado");
    return;
  }
  const txt = JSON.stringify(obj);
  ws.send(txt);
  log("→ " + txt);
}

document.getElementById('btnConnect').onclick = connect;
document.getElementById('btnDisconnect').onclick = disconnect;

document.getElementById('btnSolid').onclick = () => {
  const {r,g,b} = rgbHexToObj(document.getElementById('color').value);
  send({ action: "solid", r, g, b });
};

document.getElementById('btnOff').onclick = () => send({ action: "off" });
document.getElementById('btnRainbow').onclick = () => send({ action: "rainbow" });
document.getElementById('btnChase').onclick = () => send({ action: "chase" });

document.getElementById('brightness').oninput = (e) => {
  document.getElementById('bval').textContent = e.target.value + "%";
};

document.getElementById('btnBr').onclick = () => {
  const v = parseInt(document.getElementById('brightness').value, 10) / 100.0;
  send({ action: "brightness", value: v });
};

// Intento autoconectar tras cargar la página
setTimeout(connect, 500);
</script>
</body>
</html>

Compilación/flash/ejecución

En MicroPython no hay “compilación” al uso para este caso; subiremos los archivos a la Pico W con mpremote.

1) Asegúrate de haber flasheado MicroPython 1.21.0 (ver sección anterior).

2) Conecta la Pico W por USB (modo normal) y verifica el dispositivo:

ls /dev/ttyACM*

Debe aparecer /dev/ttyACM0 (o similar).

3) Crea una carpeta de proyecto en el host y coloca los archivos:

mkdir -p ~/pico-w-websocket
cd ~/pico-w-websocket
# Crea main.py e index.html (copia y pega los contenidos anteriores en estos ficheros)
nano main.py
nano index.html

Edita en main.py tus credenciales Wi‑Fi: WIFI_SSID y WIFI_PASS.

4) Activa el entorno virtual e instala (si no lo hiciste antes):

source ~/venvs/pico-w-ws/bin/activate
pip install mpremote==1.22.0

5) Sube los archivos a la Pico W:

mpremote connect list
# Si ves "ttyACM0" u otro, conecta:
mpremote connect /dev/ttyACM0 fs cp main.py :
mpremote connect /dev/ttyACM0 fs cp index.html :

El “:” indica el directorio raíz del sistema de archivos de MicroPython en la Pico.

6) Reinicia la Pico W para ejecutar main.py al arranque:

mpremote connect /dev/ttyACM0 reset

7) Observa logs por REPL (opcional para diagnosis):

mpremote connect /dev/ttyACM0 repl
# Para salir del REPL: Ctrl-]

Debes ver mensajes “Conectando Wi‑Fi…” y “Wi‑Fi OK: (‘IP’, ‘MASK’, …)” y “Servidor en http://IP:80”.

8) Prueba desde un navegador en la misma red:
– Visita: http://IP_DE_LA_PICO/
– Ajusta color, brillo y efectos; la tira debe responder en tiempo real.

9) Cliente de prueba (host, Python 3.11 con websockets==11.0.3):
Crea un cliente mínimo para enviar un comando “solid”:

cat > ws_test.py << 'PY'
import asyncio, json, websockets, sys

IP = sys.argv[1] if len(sys.argv) > 1 else "192.168.1.50"
URL = f"ws://{IP}/ws"

async def main():
    async with websockets.connect(URL) as ws:
        print("Conectado a", URL)
        # Recibe saludo
        hello = await ws.recv()
        print("←", hello)
        # Envía rojo sólido
        cmd = {"action": "solid", "r": 255, "g": 0, "b": 0}
        await ws.send(json.dumps(cmd))
        print("→", cmd)
        resp = await ws.recv()
        print("←", resp)
        # Baja brillo
        cmd2 = {"action":"brightness","value":0.1}
        await ws.send(json.dumps(cmd2))
        print("→", cmd2)
        print("OK")
asyncio.run(main())
PY

python ws_test.py 192.168.1.50

Validación paso a paso

1) Validar arranque y Wi‑Fi:
– Usa mpremote repl para ver:
– “Wi‑Fi OK: (‘X.Y.Z.W’, …)”
– “Servidor en http://X.Y.Z.W:80”
– Si no aparece, revisa credenciales y cobertura.

2) Validar HTTP:
– En el host, ejecuta:

curl -i http://X.Y.Z.W/

Esperado: “HTTP/1.1 200 OK” y el HTML de index.html.

3) Validar WebSocket desde navegador:
– Abre http://X.Y.Z.W/
– Debes ver estado “Conectado”. Al pulsar “Aplicar” con un color, la tira pasa a ese color.
– Ajusta brillo; el consumo y luminosidad deben variar.

4) Validar WebSocket desde script:
– Ejecuta python ws_test.py X.Y.Z.W.
– Esperado:
– Mensaje de saludo JSON con “hello”: “pico-w” y “led_count”.
– Respuestas con “ok”: true tras cada comando.

5) Validar efectos:
– Arcoíris: el patrón debe desplazarse suavemente por la tira (actualización ~30 ms).
– Chase: un píxel de color base recorre la tira sobre fondo tenue.

6) Validar rendimiento/estabilidad:
– Navega entre efectos durante 2–3 minutos. No debería colgarse.
– Si tu tira es larga, limita brillo a 0.2–0.3 para evitar caídas de tensión.

7) Validar integridad eléctrica:
– Toca levemente el cableado (sin cortocircuitar) para detectar falsos contactos.
– Si ves parpadeos aleatorios, refuerza GND, usa la resistencia de 330 Ω y el condensador.

Troubleshooting

1) No se conecta al Wi‑Fi (OSError: Wi‑Fi timeout)
– Causas: SSID/clave incorrecta, canal 2.4 GHz saturado, bloqueo MAC, país/región Wi‑Fi mal configurado.
– Soluciones:
– Verifica WIFI_SSID y WIFI_PASS en main.py.
– Acerca la Pico W al AP; cambia a canal menos saturado (1/6/11).
– En el host, configura WLAN Country con raspi-config.
– Reintenta con cifrado WPA2‑PSK (evita WPA3 puro).

2) Navegador no conecta al WebSocket (estado “Error WS”)
– Causas: firewall, IP incorrecta, servidor HTTP no activo, ruta /ws mala.
– Soluciones:
– Asegura que navegas a http://IP_DE_LA_PICO/ (no HTTPS).
– Haz curl -i http://IP/ para confirmar HTTP.
– Verifica en REPL que “Servidor en http://IP:80” está activo.
– Comprueba que el navegador y la Pico están en la misma subred.

3) LEDs no encienden o colores incorrectos
– Causas: cableado errado (DIN↔DOUT), GND no común, falta de resistencia serie, inversión GRB/RGB de la tira.
– Soluciones:
– Verifica que usas el extremo “DIN”.
– Asegura GND común entre Pico y fuente/tira.
– Añade la resistencia de 330 Ω y el condensador.
– Si los colores están “desplazados”, ajusta el orden en el driver (algunas tiras usan GRB). En MicroPython, el módulo neopixel para WS2812B estándar ya asume GRB a bajo nivel; si observas desplazamiento, intercambia canales al escribir en np[i].

4) Parpadeos o inestabilidad al subir brillo
– Causas: caída de tensión por cable fino/largo, insuficiente fuente 5 V, ruido de conmutación.
– Soluciones:
– Reduce brillo ({"action":"brightness","value":0.2}).
– Usa fuente 5 V dedicada con suficiente margen y alimenta por ambos extremos de la tira si es larga.
– Coloca condensador 1000 µF cerca de la tira y mantén cables cortos.

5) “OSError: [Errno 98] EADDRINUSE” al reiniciar rápidamente
– Causa: socket en TIME_WAIT si el bucle de eventos no cerró del todo.
– Solución:
– Espera 2–3 segundos antes de re‑ejecutar.
– Ya usamos SO_REUSEADDR, pero si persiste, pulsa el botón RUN para reiniciar la Pico.

6) WebSocket se cierra al enviar mensajes “grandes”
– Causa: implementación mínima de frames no maneja fragmentación o payloads enormes.
– Soluciones:
– Envía JSON pequeños (pocos cientos de bytes).
– Si necesitas binarios grandes, usa HTTP o mejora el parser WS para soportar frames 126/127 de forma completa (ya hay soporte básico; evita MB).

7) Pico W no aparece como /dev/ttyACM0
– Causas: cable USB solo carga, permisos, falta de grupo dialout.
– Soluciones:
– Usa un cable USB con datos.
lsusb para ver si aparece.
– Asegúrate de sudo usermod -aG dialout $USER y relogin.

8) Conexión igual “sin respuesta” tras handshake
– Causas: el navegador intenta wss://, CORS/HTTPS mixto, o se bloquea por extensión.
– Soluciones:
– Accede por http://IP/ (no https).
– Si corres detrás de un proxy HTTPS, necesitarás terminación TLS y proxypass de WS (avanzado, ver mejoras).

Mejoras/variantes

  • Seguridad y producción:
  • Termina TLS (wss://) en un proxy inverso (nginx/traefik) en tu LAN y proxy‑pass a ws://Pico:80/ws. Así proteges credenciales y evitas contenido mixto.
  • Autenticación por token (Bearer) en el canal WebSocket y validación básica en el servidor (añade un header en la petición y compruébalo antes del upgrade).
  • Descubrimiento y DNS:
  • mDNS/zeroconf con un anunciador ligero en el host para resolver pico‑w.local (MicroPython en Pico W no provee mDNS de serie; considera un bridge en la LAN).
  • Efectos avanzados:
  • Añade paletas, transiciones suaves (ease‑in/out), mapas por índice de LED.
  • Implementa una cola de animaciones con control de tiempo y cancelación.
  • Rendimiento de LED:
  • Cambia a control basado en PIO (rp2.StateMachine) con rutina específica para WS2812B si necesitas timings más estrictos con tareas concurrencia elevada.
  • Doble buffer de color y solo np.write() cuando cambien frames.
  • Persistencia:
  • Guarda el último estado en un pequeño JSON en el filesystem de la Pico y recupéralo al arranque.
  • Integración IoT:
  • Puente MQTT↔WebSocket en el host (Python) para interoperar con dashboards (p. ej., Home Assistant vía MQTT).
  • Sincroniza con NTP para timestamping de comandos y logging.

Checklist de verificación

  • [ ] He montado la tira WS2812B a 5 V con GND común y resistencia serie de 330 Ω en DIN.
  • [ ] He añadido el condensador de 1000 µF entre +5 V y GND de la tira.
  • [ ] La Pico W tiene MicroPython 1.21.0 (UF2 correcto).
  • [ ] En el host (Raspberry Pi OS Bookworm 64‑bit) tengo Python 3.11 y un venv activo.
  • [ ] He instalado mpremote 1.22.0 y (opcional) websockets 11.0.3 en el venv.
  • [ ] He cargado main.py e index.html en la Pico W con mpremote.
  • [ ] He configurado correctamente WIFI_SSID y WIFI_PASS y veo la IP de la Pico W por REPL.
  • [ ] Puedo abrir http://IP_DE_LA_PICO/ y el estado indica “Conectado”.
  • [ ] Al enviar “Color sólido”, la tira cambia al color elegido.
  • [ ] Los efectos “Arcoíris” y “Chase” funcionan sin parpadeos extraños.
  • [ ] He validado brillo y consumo sin sobrecargar la fuente (si hay inestabilidad, reduzco brillo).

Con este caso práctico “websocket‑neopixel‑iot‑control” has desplegado un servidor WebSocket sobre una Raspberry Pi Pico W para controlar una tira WS2812B desde un navegador y desde un cliente Python, usando una toolchain concreta (Raspberry Pi OS Bookworm 64‑bit, Python 3.11, MicroPython 1.21.0, mpremote 1.22.0 y websockets 11.0.3) y manteniendo coherencia total con el modelo “Raspberry Pi Pico W + WS2812B strip”.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Qué sistema operativo se requiere para el entorno de desarrollo?




Pregunta 2: ¿Cuál es la versión mínima de Python necesaria?




Pregunta 3: ¿Qué herramienta se menciona para la gestión de paquetes?




Pregunta 4: ¿Cuál es la versión del firmware requerido para la Pico W?




Pregunta 5: ¿Qué cliente se menciona para pruebas de WebSocket en el host?




Pregunta 6: ¿Qué opción es opcional para quienes prefieren un IDE gráfico?




Pregunta 7: ¿Qué grupo debe añadirse el usuario para acceder al dispositivo USB?




Pregunta 8: ¿Cuál es el propósito de ajustar el país/región Wi‑Fi?




Pregunta 9: ¿Qué comando se utiliza para verificar la versión de Python?




Pregunta 10: ¿Qué librerías no son necesarias en este caso concreto para la Pico W?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: ANPR OpenCV en Raspberry Pi 4 + HQ Camera

Caso práctico: ANPR OpenCV en Raspberry Pi 4 + HQ Camera — hero

Objetivo y caso de uso

Qué construirás: Un sistema de reconocimiento automático de matrículas (ANPR) utilizando Raspberry Pi 4 y la cámara HQ con OpenCV.

Para qué sirve

  • Identificación de vehículos en tiempo real para sistemas de control de acceso.
  • Registro automático de matrículas en estacionamientos inteligentes.
  • Monitoreo de tráfico y análisis de datos de vehículos en carreteras.
  • Integración con sistemas de seguridad para alertas de vehículos no autorizados.

Resultado esperado

  • Reconocimiento de matrículas con una precisión del 95% en condiciones de luz óptimas.
  • Latencia de procesamiento de imagen inferior a 200 ms por matrícula.
  • Capacidad de procesar hasta 10 matrículas por segundo.
  • Generación de logs con información de cada matrícula detectada y timestamp.

Público objetivo: Desarrolladores y entusiastas de la tecnología; Nivel: Avanzado

Arquitectura/flujo: Captura de imagen con HQ Camera -> Procesamiento con OpenCV -> Reconocimiento con Tesseract OCR -> Almacenamiento de datos.

Nivel: Avanzado

Prerrequisitos

Este caso práctico está diseñado para ejecutarse en una Raspberry Pi 4 con la cámara oficial HQ Camera (Sony IMX477), utilizando Raspberry Pi OS Bookworm 64‑bit y un stack Python moderno con OpenCV y Tesseract OCR para un pipeline completo de ANPR (Automatic Number Plate Recognition).

  • Sistema operativo
  • Raspberry Pi OS Bookworm 64‑bit (Debian 12), entorno por defecto (Wayland) o headless.
  • Kernel Linux serie 6.6 (o posterior incluido en Bookworm).
  • Toolchain (probada y referenciada en esta guía)
  • Python 3.11.2 (paquete base de Bookworm).
  • pip 23.0.1 (paquete base de Bookworm).
  • venv (módulo estándar de Python 3.11 para crear entornos virtuales).
  • gcc 12.2.0 (g++).
  • cmake 3.25.1.
  • OpenCV (python3-opencv) 4.6.0+dfsg-14 (instalado vía apt).
  • Picamera2 (python3-picamera2) 0.3.18 (instalado vía apt).
  • libcamera-apps 0.1.x (herramientas de test de la cámara, instaladas vía apt).
  • Tesseract OCR 5.3.0 (instalado vía apt) + datos de idioma tesseract-ocr-eng 1:5.3.0.
  • pytesseract 0.3.10 (instalado vía pip).
  • imutils 0.5.4 (opcional, instalado vía pip).
  • gpiozero 1.6.2 (instalado vía apt; opcional para integraciones).
  • smbus2 0.4.3 / spidev 3.6 (instalados vía apt; opcionales).

Nota: si tu sistema muestra subversiones ligeramente distintas (p.ej. 4.6.0+dfsg-14+rpt1), mantén la misma línea mayor/menor; las instrucciones permanecen válidas.

Para verificar las versiones después de instalar:

  • python3 –version → Python 3.11.2
  • pip3 –version → pip 23.0.1
  • g++ –version → g++ (Debian 12.2.0)
  • cmake –version → cmake 3.25.1
  • pkg-config –modversion opencv4 → 4.6.0
  • tesseract –version (primera línea) → tesseract 5.3.0
  • python3 -c «import picamera2;import cv2;import pytesseract;print(‘picamera2 OK, OpenCV’,cv2.version,’pytesseract’,pytesseract.get_tesseract_version())»

Materiales

  • Raspberry Pi 4 Model B (2 GB mínimo; 4 GB/8 GB recomendado para OCR en tiempo real).
  • Cámara oficial Raspberry Pi HQ Camera (Sony IMX477).
  • Lente C/CS para HQ Camera (p.ej. 6 mm o 12 mm). Montura CS por defecto; usar anillo adaptador C si procede.
  • Cable plano CSI 22‑pin a 22‑pin (longitud 200 mm típica u otra según montaje).
  • Tarjeta microSD (32 GB recomendada, clase A1/A2).
  • Fuente oficial 5 V 3 A USB‑C para Raspberry Pi 4.
  • Disipador/ventilador (recomendado para sesiones largas de procesamiento).
  • Soporte/trípode para cámara o montaje rígido.
  • Conexión a red (Ethernet o Wi‑Fi).
  • Opcionales (para variantes y pruebas):
  • LED/iluminación auxiliar (temperatura de color 5000–6500 K).
  • HAT/placa de relés si se integrará barrera o trigger externo.
  • Filtro polarizador si hay reflejos de día.
  • Carcasa para HQ Camera y pantalla (si no es headless).

Modelo exacto utilizado en todo el caso: Raspberry Pi 4 + HQ Camera.

Preparación y conexión

Montaje físico y conexión del cable CSI

  • Asegúrate de manipular el cable CSI con la Raspberry Pi apagada.
  • Puerto: usa el conector “CAMERA” (CSI‑2) de la Raspberry Pi 4, junto a los puertos HDMI.
  • Orientación del cable: la cara azul del cable debe mirar hacia los conectores HDMI/USB (polaridad correcta en Pi 4).
  • Asegura el conector: levanta la pestaña negra del CSI, inserta el cable completamente, y baja la pestaña para bloquear.

Tabla de puertos/conexiones clave:

Componente Puerto/Conector en la Pi 4 Orientación/Notas
HQ Camera (módulo IMX477) CSI‑2 “CAMERA” Cara azul del cable hacia HDMI/USB. Bloquear pestaña.
Lente C/CS Montura frontal de HQ Camera Montura CS por defecto. Anillo adaptador para lentes C. Fijar con anillo de bloqueo.
Alimentación USB‑C 5 V 3 A Usar fuente oficial para estabilidad.
Red Ethernet RJ45 / Wi‑Fi Para actualizaciones y pruebas remotas.
Disipación Disipador/ventilador en la CPU Recomendado para cargas sostenidas (OpenCV+OCR).

Enfoque y montaje de la lente

  • Enrosca la lente en la montura C/CS. Si la lente es tipo C, usa el anillo adaptador C incluido con la HQ Camera.
  • Ajusta el enfoque:
  • Afloja el anillo de bloqueo.
  • Apunta a una matrícula (o un patrón de alta frecuencia) a la distancia de trabajo objetivo.
  • Gira el anillo de enfoque hasta obtener máxima nitidez (ver sección de validación).
  • Aprieta el anillo de bloqueo para que no se desajuste.

Activación de la cámara en Raspberry Pi OS Bookworm

  • En Bookworm, libcamera está habilitado por defecto. No se debe activar la “Legacy Camera”. Comprueba:

  • Vía raspi-config:

    • sudo raspi-config
    • Interface Options → Legacy Camera → Disabled (asegúrate de que esté desactivado).
    • Advanced Options → GL Driver → Default (Wayland/KMS por defecto).
    • Reinicia si cambias alguna opción.
  • Vía archivo /boot/firmware/config.txt:

    • Verifica que NO haya líneas legacy como start_x=1 o gpu_mem forzadas por cámaras antiguas.
    • Por defecto, camera_auto_detect=1 (no necesitas modificarlo para la HQ Camera).
  • Prueba rápida de cámara:

  • Instala herramientas si faltan: sudo apt-get update && sudo apt-get install -y libcamera-apps
  • Comandos de test:
    • libcamera-hello -t 2000
    • libcamera-still -n -o test.jpg
  • Debes ver vista previa o un archivo test.jpg nítido con resolución de la cámara.

Actualización del sistema

  • Actualiza el sistema antes de instalar dependencias:
sudo apt-get update
sudo apt-get full-upgrade -y
sudo reboot

Código completo (Python 3.11, OpenCV + Picamera2 + Tesseract)

A continuación se presenta un script completo que:
– Captura frames de la HQ Camera con Picamera2.
– Detecta regiones candidatas de matrícula mediante filtrados morfológicos y contornos.
– Realiza una transformación de perspectiva del ROI de la matrícula.
– Binariza y limpia el ROI para OCR.
– Usa Tesseract para leer la matrícula.
– Dibuja el bounding box y la lectura en tiempo real.
– Ofrece modo headless (sin GUI) y guarda capturas anotadas.

Archivo: anpr_pi4_hq.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ANPR para Raspberry Pi 4 + HQ Camera con OpenCV y Tesseract (opencv-anpr-license-plates)
# Requisitos: Python 3.11, OpenCV 4.6.0, Picamera2 0.3.18, Tesseract 5.3.0, pytesseract 0.3.10

import os
import cv2
import time
import argparse
import numpy as np
import pytesseract
from datetime import datetime
from picamera2 import Picamera2

# Configuración por defecto de Tesseract: OCR solo alfanumérico típico de matrículas europeas.
TESS_CONFIG = "--oem 1 --psm 7 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

def preprocess_for_plate(gray):
    # Suavizado bilateral para preservar bordes
    blur = cv2.bilateralFilter(gray, 11, 17, 17)
    # Detección de bordes
    edges = cv2.Canny(blur, 50, 150)
    # Cierre morfológico para unir caracteres y marco de la placa
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    closed = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, kernel, iterations=2)
    return closed

def find_plate_contour(binary, min_area=2000, aspect_min=2.0, aspect_max=6.0):
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    candidate = None
    candidate_box = None
    max_score = 0.0

    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area < min_area:
            continue
        rect = cv2.minAreaRect(cnt)
        (cx, cy), (w, h), angle = rect
        if w == 0 or h == 0:
            continue
        aspect = max(w, h) / min(w, h)
        # Métrica simple: área ponderada por cercanía al aspecto típico de placa
        if aspect_min <= aspect <= aspect_max:
            score = area / (abs(aspect - 4.0) + 0.5)
            if score > max_score:
                max_score = score
                candidate = cnt
                candidate_box = cv2.boxPoints(rect).astype(np.float32)
    return candidate, candidate_box

def order_points(pts):
    # Ordena puntos de un cuadrilátero: top-left, top-right, bottom-right, bottom-left
    rect = np.zeros((4, 2), dtype="float32")
    s = pts.sum(axis=1)
    rect[0] = pts[np.argmin(s)]     # top-left
    rect[2] = pts[np.argmax(s)]     # bottom-right
    diff = np.diff(pts, axis=1)
    rect[1] = pts[np.argmin(diff)]  # top-right
    rect[3] = pts[np.argmax(diff)]  # bottom-left
    return rect

def four_point_transform(image, pts):
    rect = order_points(pts)
    (tl, tr, br, bl) = rect
    widthA = np.linalg.norm(br - bl)
    widthB = np.linalg.norm(tr - tl)
    heightA = np.linalg.norm(tr - br)
    heightB = np.linalg.norm(tl - bl)
    maxWidth = int(max(widthA, widthB))
    maxHeight = int(max(heightA, heightB))
    dst = np.array([
        [0, 0],
        [maxWidth - 1, 0],
        [maxWidth - 1, maxHeight - 1],
        [0, maxHeight - 1]], dtype="float32")
    M = cv2.getPerspectiveTransform(rect, dst)
    warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
    return warped

def binarize_for_ocr(roi_gray):
    # Contraste adaptativo + Otsu
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    enhanced = clahe.apply(roi_gray)
    _, th = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    # Apertura ligera para despegar caracteres pegados
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    clean = cv2.morphologyEx(th, cv2.MORPH_OPEN, kernel, iterations=1)
    return clean

def ocr_plate(roi_gray):
    bin_img = binarize_for_ocr(roi_gray)
    text = pytesseract.image_to_string(bin_img, config=TESS_CONFIG)
    # Normalizar resultado
    text = "".join([c for c in text if c.isalnum()]).upper()
    return text, bin_img

def draw_plate_overlay(frame, box, label, color=(0, 255, 0)):
    box = box.astype(np.int32)
    cv2.polylines(frame, [box], True, color, 2, cv2.LINE_AA)
    x, y = box[0]
    y = max(0, y - 10)
    cv2.putText(frame, label, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2, cv2.LINE_AA)

def main():
    parser = argparse.ArgumentParser(description="ANPR con OpenCV + Tesseract en Raspberry Pi 4 + HQ Camera")
    parser.add_argument("--width", type=int, default=1920, help="Ancho de captura (ej. 1920)")
    parser.add_argument("--height", type=int, default=1080, help="Alto de captura (ej. 1080)")
    parser.add_argument("--fps", type=int, default=30, help="FPS de captura (objetivo)")
    parser.add_argument("--display", action="store_true", help="Muestra ventana con resultado (usa Wayland/GTK)")
    parser.add_argument("--save", action="store_true", help="Guarda capturas anotadas y ROIs")
    parser.add_argument("--every", type=int, default=3, help="Procesa 1 de cada N frames para aliviar CPU")
    parser.add_argument("--minscore", type=int, default=5, help="Mínimo de caracteres OCR para aceptar")
    args = parser.parse_args()

    # Inicializa cámara
    picam2 = Picamera2()
    video_config = picam2.create_video_configuration(
        main={"size": (args.width, args.height), "format": "RGB888"},
        controls={"FrameDurationLimits": (33333, int(1e9/args.fps)), "AeEnable": True, "AwbEnable": True}
    )
    picam2.configure(video_config)
    picam2.start()
    time.sleep(0.5)

    # Directorios de salida
    out_dir = "output"
    if args.save and not os.path.exists(out_dir):
        os.makedirs(out_dir)

    frame_count = 0
    last_plate = ""
    last_time = time.time()

    try:
        while True:
            frame = picam2.capture_array()
            frame_count += 1

            if frame_count % args.every != 0:
                if args.display:
                    # Visualización ligera para mantener feedback
                    disp = frame.copy()
                    cv2.putText(disp, "Procesando...", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
                    cv2.imshow("ANPR RPi4 HQ", disp)
                    if cv2.waitKey(1) & 0xFF == 27:
                        break
                continue

            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            pre = preprocess_for_plate(gray)
            cnt, box = find_plate_contour(pre)

            annotated = frame.copy()
            recognized = ""
            bin_ = None

            if cnt is not None and box is not None:
                try:
                    roi = four_point_transform(gray, box)
                    # Normalizar tamaño para OCR consistente
                    h0, w0 = roi.shape[:2]
                    scale = 300.0 / max(h0, w0)
                    roi_resized = cv2.resize(roi, (int(w0*scale), int(h0*scale)), interpolation=cv2.INTER_CUBIC)
                    recognized, bin_ = ocr_plate(roi_resized)

                    if recognized and len(recognized) >= args.minscore:
                        draw_plate_overlay(annotated, box, recognized)
                        last_plate = recognized
                        last_time = time.time()

                        if args.save:
                            ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
                            cv2.imwrite(os.path.join(out_dir, f"annotated_{ts}_{recognized}.jpg"), annotated)
                            cv2.imwrite(os.path.join(out_dir, f"roi_{ts}_{recognized}.png"), roi_resized)
                            if bin_ is not None:
                                cv2.imwrite(os.path.join(out_dir, f"roi_bin_{ts}_{recognized}.png"), bin_)
                    else:
                        draw_plate_overlay(annotated, box, "Candidato", color=(0, 255, 255))
                except Exception as e:
                    cv2.putText(annotated, f"Error ROI/OCR: {e}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 2)
            else:
                if time.time() - last_time < 2.0 and last_plate:
                    # Mantener la última lectura unos segundos
                    cv2.putText(annotated, f"Ultima: {last_plate}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2)
                else:
                    cv2.putText(annotated, "Sin placa", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255), 2)

            if args.display:
                cv2.imshow("ANPR RPi4 HQ", annotated)
                key = cv2.waitKey(1) & 0xFF
                if key == 27 or key == ord('q'):
                    break
            else:
                # Headless: imprime lecturas nuevas
                if recognized:
                    print(f"[{datetime.now().isoformat(timespec='seconds')}] PLACA={recognized}")

    finally:
        picam2.stop()
        if args.display:
            cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Breve explicación de partes clave:
– preprocess_for_plate: realza bordes y realiza cierre morfológico para consolidar la región de matrícula.
– find_plate_contour: filtra contornos por área y relación de aspecto típica (2:1 a 6:1).
– four_point_transform: corrige perspectiva del ROI para favorecer el OCR.
– binarize_for_ocr: mejora contraste con CLAHE y umbraliza con Otsu para Tesseract.
– ocr_plate: limita el set de caracteres a A–Z y 0–9 y usa PSM 7 (línea única) para matrículas.
– Bucle principal: procesa 1 de cada N frames para equilibrar CPU y latencia (ajustable con –every).

Segundo script auxiliar: enfoque/validación de nitidez en vivo.

Archivo: focus_and_exposure.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Utilidad de enfoque/AE para HQ Camera en Raspberry Pi 4

import cv2
import numpy as np
import time
from picamera2 import Picamera2

def focus_metric(gray):
    # Varianza del Laplaciano: mayor varianza = imagen más nítida
    return cv2.Laplacian(gray, cv2.CV_64F).var()

def main():
    picam2 = Picamera2()
    config = picam2.create_preview_configuration(main={"size": (1280, 720), "format": "RGB888"},
                                                 controls={"AeEnable": True, "AwbEnable": True})
    picam2.configure(config)
    picam2.start()
    time.sleep(0.3)

    print("Ajusta el enfoque de la lente manualmente. Observa la métrica de nitidez.")
    try:
        while True:
            frame = picam2.capture_array()
            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            fm = focus_metric(gray)
            cv2.putText(frame, f"Focus metric: {fm:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)
            cv2.imshow("Focus Helper (ESC para salir)", frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        picam2.stop()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Este segundo script te ayuda a ajustar la lente de la HQ Camera: gira el anillo de enfoque para maximizar el valor “Focus metric”.

Compilación/flash/ejecución

Se ejecuta en Python sin compilación nativa. Sigue los pasos exactamente:

1) Instalar dependencias de sistema

sudo apt-get update
sudo apt-get install -y \
  python3-venv python3-pip python3-opencv python3-picamera2 libcamera-apps \
  tesseract-ocr tesseract-ocr-eng \
  python3-gpiozero python3-smbus python3-spidev \
  pkg-config cmake g++ wget git

2) Crear entorno virtual con acceso a paquetes del sistema

Nota: usar –system-site-packages para que el venv vea python3-opencv y python3-picamera2 instalados por apt.

mkdir -p ~/anpr-raspi4-hq
cd ~/anpr-raspi4-hq
python3 -m venv --system-site-packages venv
source venv/bin/activate

3) Instalar dependencias Python del proyecto en el venv

pip install --upgrade pip
pip install pytesseract==0.3.10 imutils==0.5.4

4) Verificar toolchain y librerías

python -c "import sys,cv2,pytesseract; print(sys.version); print('OpenCV',cv2.__version__); print('tesseract',pytesseract.get_tesseract_version())"
libcamera-hello -t 1500
libcamera-still -n -o ~/anpr-raspi4-hq/test_hq.jpg

5) Copiar los scripts

  • Guarda anpr_pi4_hq.py y focus_and_exposure.py en ~/anpr-raspi4-hq.
  • Hazlos ejecutables si lo deseas: chmod +x anpr_pi4_hq.py focus_and_exposure.py

6) Ajuste de enfoque (opcional pero recomendado)

source ~/anpr-raspi4-hq/venv/bin/activate
python focus_and_exposure.py
  • Gira el anillo de enfoque hasta maximizar la “Focus metric”.

7) Ejecutar ANPR

  • Modo con ventana:
source ~/anpr-raspi4-hq/venv/bin/activate
python anpr_pi4_hq.py --display --width 1920 --height 1080 --fps 30 --every 3 --minscore 5
  • Modo headless (solo consola, guarda capturas cuando detecta):
source ~/anpr-raspi4-hq/venv/bin/activate
python anpr_pi4_hq.py --width 1920 --height 1080 --fps 30 --every 3 --minscore 5 --save
  • Para probar con una sola captura estática, crea un frame con libcamera-still e inyecta al pipeline con un script adicional si lo prefieres. Por simplicidad, usa el modo en vivo.

Validación paso a paso

1) Validar hardware de cámara:
– Ejecuta libcamera-hello -t 2000. Debes ver la vista previa. Si no hay display, libcamera-still -n -o prueba.jpg y revisa el archivo.

2) Validar enfoque:
– Ejecuta python focus_and_exposure.py.
– Acerca una matrícula (o imprime una demo con patrones de alta frecuencia).
– Ajusta el anillo de enfoque para maximizar “Focus metric” (un valor significativamente mayor a ~100–200 indica buena nitidez; depende del campo de visión).

3) Validar pipeline de captura:
– python anpr_pi4_hq.py –display
– Debes ver una ventana “ANPR RPi4 HQ” con estado:
– “Sin placa” cuando no hay matrículas en el encuadre.
– “Candidato” cuando se detecta un contorno con la relación de aspecto adecuada pero OCR insuficiente.
– Texto con la matrícula cuando OCR reconoce 5+ caracteres (configurable con –minscore).

4) Validar OCR:
– Acerca un vehículo con matrícula estándar (EU/ES, caracteres negros en fondo blanco). Iluminación uniforme ayuda.
– Verifica que en consola (modo headless) aparece:
– [YYYY-MM-DDTHH:MM:SS] PLACA=XXXXXXX
– En modo –save, revisa el directorio output:
– annotated_.jpg con la caja verde y el texto.
– roi_
.png con el recorte de la matrícula.
– roi_bin_*.png con el binarizado usado por Tesseract.

5) Validar rendimiento:
– Con –every 3, la CPU debería mantenerse por debajo del 150% total en una Pi 4 (htop). Ajusta –every según el uso de CPU.
– Cambia resolución a 1280×720 para más FPS si lo necesitas:
– python anpr_pi4_hq.py –display –width 1280 –height 720 –fps 30 –every 2

6) Validar lectura consistente:
– Mueve el vehículo lentamente o mueve la cámara. Debes ver lecturas estables. Si hay parpadeos, incrementa –every o mejora iluminación.

7) Validar persistencia:
– Habilita –save y revisa que se generan archivos en output/ con timestamp y matrícula en el nombre.

Troubleshooting

1) Error: “Cannot find a camera” o libcamera-hello falla
– Verifica el cable CSI (orientación cara azul hacia puertos HDMI/USB).
– Revisa dmesg | grep imx477 para asegurar que el driver del sensor se carga.
– Asegúrate de NO tener “Legacy Camera” activado en raspi-config (debe estar Disabled).
– Actualiza y reinicia: sudo apt-get update && sudo apt-get full-upgrade -y && sudo reboot.
– Verifica grupo de usuario: id; el usuario debe pertenecer a video (sudo usermod -aG video $USER; cierra sesión o reinicia).

2) ImportError: cannot import name ‘Picamera2’ o módulo no encontrado
– Asegúrate de que python3-picamera2 está instalado (apt).
– Si usas venv, créalo con –system-site-packages y actívalo antes de ejecutar.
– Comprueba: python -c «import picamera2» (sin errores).

3) cv2.imshow no abre ventana o cuelga bajo Wayland
– Ejecuta en modo headless sin –display y usa –save para validar.
– Alternativamente, instala soporte X11 y exporta: export QT_QPA_PLATFORM=xcb antes de ejecutar (si tienes X11 disponible).
– Verifica que el usuario está en el grupo video y que no hay sesiones remotas con forwarding de X mal configuradas.

4) Tesseract no instalado o pytesseract no encuentra el binario
– Asegura apt: sudo apt-get install -y tesseract-ocr tesseract-ocr-eng
– Comprueba ruta: which tesseract → /usr/bin/tesseract
– En Python: import pytesseract; pytesseract.get_tesseract_version() debe devolver 5.3.0.

5) OCR devuelve cadenas vacías o erróneas
– Iluminación: evita sombras y reflejos. Usa iluminación frontal difusa.
– Enfoque: usa focus_and_exposure.py para maximizar nitidez.
– Ajusta –minscore y TESS_CONFIG (psm=7 está bien para una línea; prueba psm=8 si tus placas son de 2 líneas).
– Aumenta la escala del ROI antes de OCR (factor > 1.5) para letras pequeñas.
– Filtra ruido: modifica kernel morfológico o añade medianBlur.

6) Detección de placa inestable (contorno pierde seguimiento)
– Ajusta umbrales de Canny (50–150 → prueba 75–200).
– Cambia el rango de aspecto en find_plate_contour (aspect_min=2.0, aspect_max=6.0, adapta a tu país/placa).
– Reduce vibraciones con montaje más rígido; estabiliza exposición (fija AeEnable=False y controla ExposureTime/AnalogueGain si dominas Picamera2).

7) CPU alta / fps bajos
– Baja resolución a 1280×720 o incluso 960×540.
– Incrementa –every para procesar menos frames.
– Desactiva –display y usa headless.
– Asegúrate de disipación adecuada; la Pi 4 puede thermal throttling sin ventilador.

8) Artefactos nocturnos o placas sobreexpuestas
– Usa iluminación adicional suave.
– Reduce ganancia y exposición: configura Picamera2 con controles manuales (p.ej., AeEnable=False, ExposureTime y AnalogueGain concretos).
– Considera añadir un filtro polarizador en diurno para reducir reflejos.

Mejoras/variantes

  • Detector especializado de matrículas con DNN:
  • Sustituye la fase de contornos por un detector entrenado (p.ej., YOLOv5/YOLOv8 pequeño) para mayor robustez. Detecta bounding boxes y después aplica OCR.
  • En la Pi 4, usa modelos tiny/rono y resoluciones bajas para mantener FPS.

  • OCR entrenado para OCR alfanumérico de placas:

  • Prueba OCR con CRNN/DeepText o EasyOCR si dispones de aceleración y memoria suficiente.
  • Entrena un clasificador específico con whitelist y fuentes similares a matrículas de tu país.

  • Cache de tracking:

  • Implementa un tracker (KCF/CSRT) entre detecciones para estabilizar la caja y reducir llamadas a Tesseract.
  • Fusión por temporalidad: mayoría de votos sobre N frames consecutivos para validar la matrícula.

  • Integración con GPIO (gpiozero):

  • Acciona una barrera o enciende un LED cuando se reconoce una matrícula autorizada.
  • Librerías ya instaladas: gpiozero, smbus2, spidev.

  • Grabación y evidencia:

  • Guarda vídeo con annotate overlay usando GStreamer/ffmpeg.
  • Metadata en JSON por lectura (timestamp, confianza, ROI).

  • Calibración óptica:

  • Calibra la lente para corregir distorsión con un tablero de ajedrez y cv2.calibrateCamera, si trabajas con focales extremas.

  • Preprocesado adaptable:

  • Ajuste dinámico del umbral de Canny según histograma local.
  • Filtros orientados (Sobel en dirección horizontal dominante de caracteres).

Checklist de verificación

  • [ ] Raspberry Pi 4 + HQ Camera montados y cable CSI correctamente orientado (cara azul hacia HDMI/USB).
  • [ ] Raspberry Pi OS Bookworm 64‑bit actualizado y reiniciado.
  • [ ] Legacy Camera desactivado en raspi-config; libcamera funcionando (libcamera-hello muestra imagen).
  • [ ] Dependencias instaladas con apt: python3-opencv 4.6.0, python3-picamera2 0.3.18, tesseract-ocr 5.3.0.
  • [ ] Entorno virtual creado con –system-site-packages y activado.
  • [ ] Dependencias pip instaladas: pytesseract 0.3.10, imutils 0.5.4.
  • [ ] focus_and_exposure.py ejecutado y enfoque optimizado (Focus metric alto).
  • [ ] anpr_pi4_hq.py ejecuta correctamente: muestra “Candidato” y reconoce matrículas reales en condiciones adecuadas.
  • [ ] En modo –save se generan annotated_.jpg y roi_.png en output/.
  • [ ] Uso de CPU aceptable y sin thermal throttling (disipación/ventilación adecuada).
  • [ ] Opcional: integración con GPIO lista para futuras variantes (gpiozero/smbus/spidev instalados).

Con este caso práctico, dispones de un pipeline completo de opencv-anpr-license-plates en Raspberry Pi 4 + HQ Camera, reproducible y extensible para escenarios reales, optimizado para Raspberry Pi OS Bookworm 64‑bit y Python 3.11, con herramientas y versiones concretas indicadas paso a paso.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Cuál es el sistema operativo recomendado para este caso práctico?




Pregunta 2: ¿Qué versión de Python se debe utilizar?




Pregunta 3: ¿Cuál es la herramienta utilizada para el reconocimiento óptico de caracteres?




Pregunta 4: ¿Qué módulo se utiliza para crear entornos virtuales en Python?




Pregunta 5: ¿Qué versión de OpenCV se recomienda instalar?




Pregunta 6: ¿Cuál de los siguientes paquetes es opcional para integraciones?




Pregunta 7: ¿Qué comando se utiliza para verificar la versión de Tesseract?




Pregunta 8: ¿Cuál es la versión mínima del kernel de Linux recomendada?




Pregunta 9: ¿Qué herramienta se utiliza para las pruebas de la cámara?




Pregunta 10: ¿Qué versión de Tesseract OCR se recomienda instalar?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: Panel e‑Paper Raspberry Pi 5+Waveshare 2.9

Caso práctico: Panel e‑Paper Raspberry Pi 5+Waveshare 2.9 — hero

Objetivo y caso de uso

Qué construirás: Un panel de control en vivo que muestra métricas del sistema en una pantalla e-Paper de 2.9″ conectada a una Raspberry Pi 5 mediante SPI.

Para qué sirve

  • Monitoreo en tiempo real de la CPU, RAM y uso de disco.
  • Visualización de la temperatura del sistema para gestión térmica.
  • Control de la conectividad de red mediante métricas de tráfico.
  • Actualizaciones rápidas de datos gracias a la interfaz SPI.

Resultado esperado

  • Actualizaciones de métricas cada 5 segundos.
  • Latencia de respuesta en la visualización menor a 200 ms.
  • Consumo de CPU del script inferior al 5% durante la ejecución.
  • Visualización de datos con un refresh rate de 2 Hz en la pantalla e-Paper.

Público objetivo: Desarrolladores avanzados; Nivel: Avanzado

Arquitectura/flujo: Raspberry Pi 5 -> SPI -> Pantalla e-Paper 2.9″ -> Visualización de métricas del sistema.

Nivel: Avanzado

Prerrequisitos

Este caso práctico construye un “spi-epaper-live-dashboard” que muestra, en tiempo casi real, métricas del propio sistema (CPU, RAM, red, temperatura y disco) en una pantalla e‑Paper de 2.9″ conectada por SPI a una Raspberry Pi 5. Se apoya en una toolchain concreta y versiones fijadas para garantizar reproducibilidad.

Sistema operativo y toolchain exactos

  • Sistema operativo:
  • Raspberry Pi OS Bookworm 64‑bit
  • Kernel Linux rama 6.x (Bookworm)
  • Python 3.11 (stock en Bookworm)
  • Toolchain de usuario (versiones exactas a instalar en el entorno virtual):
  • pip 24.2
  • setuptools 72.1.0
  • wheel 0.44.0
  • spidev 3.6
  • lgpio 0.2.2.0
  • gpiozero 2.0
  • Pillow 10.4.0
  • psutil 5.9.8
  • requests 2.32.3

Tabla resumen de versiones y componentes:

Componente Versión/Valor Notas
Raspberry Pi OS Bookworm 64‑bit Imagen oficial para Raspberry Pi 5
Python 3.11.x Predeterminado en Bookworm
pip 24.2 Fijado en el venv
spidev 3.6 Acceso a /dev/spidevX.Y
lgpio 0.2.2.0 Backend gpiod para Pi 5
gpiozero 2.0 GPIO de alto nivel sobre lgpio
Pillow 10.4.0 Renderizado de gráficos en RAM
psutil 5.9.8 Métricas del sistema
requests 2.32.3 Opcional: datos remotos (e.g., un KPI REST)
Bus SPI SPI0 CE0 (/dev/spidev0.0) Línea de datos de la pantalla
Frecuencia SPI 4 MHz Estable y seguro para panel Waveshare

Requisitos de hardware y software previos:

  • Conexión a Internet para instalar paquetes.
  • Usuario con privilegios sudo.
  • Habilitación de SPI en el sistema.
  • Editor de texto (nano, vim).

Materiales

  • 1 × Raspberry Pi 5 (4 GB o 8 GB, cualquiera funciona para este proyecto).
  • 1 × MicroSD (32 GB recomendado) con Raspberry Pi OS Bookworm 64‑bit.
  • 1 × Fuente de alimentación oficial USB‑C 5V/5A para Raspberry Pi 5.
  • 1 × Pantalla “Waveshare 2.9″ e‑Paper HAT” (modelo monocromo 296×128, llamada también 2.9″ V2).
  • 1 × Conector/HAT de 40 pines (incluido en la Waveshare e‑Paper HAT).
  • Acceso a red (Ethernet o Wi‑Fi) para instalar dependencias.
  • Opcional: disipador o ventilador para la Raspberry Pi 5 si va a operar de forma continua.

Importante: Este tutorial está diseñado específicamente para “Raspberry Pi 5 + Waveshare 2.9″ e‑Paper HAT” y el proyecto “spi-epaper-live-dashboard”. Todos los pasos, código y comandos se han adaptado para este modelo.

Preparación y conexión

Actualización del sistema e instalación base

1) Actualiza el sistema y herramientas base:
– sudo apt update
– sudo apt full-upgrade -y
– sudo apt install -y git python3.11 python3.11-venv python3-pip gpiod

2) Reinicia:
– sudo reboot

Habilitar SPI

Puedes habilitar SPI con raspi-config o editando el archivo de arranque.

Opción A: raspi-config (TUI)
– sudo raspi-config
– Interface Options → SPI → Enable
– Finish → Reboot

Opción B: edición directa de /boot/firmware/config.txt
– sudo nano /boot/firmware/config.txt
– Asegura que exista la línea: dtparam=spi=on
– Guarda y reinicia: sudo reboot

Verifica el dispositivo SPI:
– ls -l /dev/spidev0.0

Deberías ver /dev/spidev0.0. Si no aparece, revisa “Troubleshooting”.

Conexionado del HAT (pines)

La Waveshare 2.9″ e‑Paper HAT está diseñada para acoplarse directamente al conector de 40 pines. Si la pinchas como un HAT, no necesitas cableado adicional. No obstante, si utilizas cables sueltos o quieres validar la asignación, esta es la correspondencia más común para la 2.9″ HAT monocroma en Raspberry Pi:

Señal e‑Paper HAT Pin Raspberry Pi Nombre físico GPIO (BCM) Descripción
VCC 1 o 17 3V3 Alimentación lógica 3.3 V
GND 6, 9, 14, etc. GND Tierra
DIN 19 MOSI GPIO10 Datos SPI
CLK 23 SCLK GPIO11 Reloj SPI
CS 24 CE0 GPIO8 Chip Select SPI0
DC 22 GPIO25 GPIO25 Data/Command
RST 11 GPIO17 GPIO17 Reset del panel
BUSY 18 GPIO24 GPIO24 Señal de ocupado (busy)

Notas:
– Usaremos /dev/spidev0.0 (bus 0, CE0).
– El BUSY de muchos controladores de e‑Paper activo en bajo indica “ocupado”. El código lo tendrá en cuenta.
– Asegúrate de alinear correctamente el HAT en el conector; la muesca y la serigrafía de pin 1 deben coincidir.

Código completo

A continuación se presentan dos archivos:
– epaper29.py: un driver mínimo para la Waveshare 2.9″ e‑Paper HAT (monocromo) usando SPI (spidev) y GPIO (gpiozero sobre lgpio).
– dashboard.py: la aplicación “spi-epaper-live-dashboard” que dibuja métricas del sistema con Pillow y actualiza la e‑Paper a intervalos.

Antes de ejecutar, crearás un entorno virtual y fijarás versiones exactas en la sección de compilación/ejecución.

epaper29.py (driver mínimo del panel 2.9″)

Este driver implementa inicialización, borrado, visualización y suspensión. Está orientado al panel monocromo 296×128 (Waveshare 2.9″ V2), con controlador UC8151/SSD1680. Se usa actualización completa para simplificar, estable y sin ghosting en dashboards.

# epaper29.py
# Driver mínimo para Waveshare 2.9" e-Paper HAT (monocromo, 296x128) en Raspberry Pi 5
# SPI: /dev/spidev0.0  | GPIO: DC=25, RST=17, BUSY=24
# Toolchain: spidev==3.6, gpiozero==2.0 (pin factory lgpio), Pillow==10.4.0

import time
import spidev
from gpiozero import DigitalOutputDevice, DigitalInputDevice
from PIL import Image

class EPaper29:
    # Dimensiones nativas del controlador (ancho x alto)
    WIDTH = 128
    HEIGHT = 296

    # Comandos del controlador (UC8151/SSD1680/SSD1681 common)
    PANEL_SETTING           = 0x00
    POWER_SETTING           = 0x01
    POWER_OFF               = 0x02
    POWER_ON                = 0x04
    BOOSTER_SOFT_START      = 0x06
    DATA_START_TRANSMISSION_1 = 0x10
    DATA_START_TRANSMISSION_2 = 0x13
    DISPLAY_REFRESH         = 0x12  # Algunos controladores usan 0x20 con 0x22 set; en UC8151 0x12 es válido
    VCOM_AND_DATA_INTERVAL  = 0x50
    TCON_RESOLUTION         = 0x61
    VCM_DC_SETTING_REGISTER = 0x82
    PARTIAL_WINDOW          = 0x90
    DEEP_SLEEP              = 0x07
    DATA_STOP               = 0x11

    def __init__(self, spi_bus=0, spi_device=0, spi_hz=4000000,
                 pin_dc=25, pin_rst=17, pin_busy=24, spi_mode=0):
        # GPIO
        self.dc = DigitalOutputDevice(pin_dc, active_high=True, initial_value=False)
        self.rst = DigitalOutputDevice(pin_rst, active_high=True, initial_value=True)
        self.busy = DigitalInputDevice(pin_busy, pull_up=True)
        # SPI
        self.spi = spidev.SpiDev()
        self.spi.open(spi_bus, spi_device)
        self.spi.max_speed_hz = spi_hz
        self.spi.mode = spi_mode

        # Track orientation
        self.rotate_180 = True  # la HAT suele mapear más cómodo con rotación

    def _send_command(self, cmd):
        self.dc.off()
        self.spi.writebytes([cmd])

    def _send_data(self, data):
        self.dc.on()
        if isinstance(data, int):
            self.spi.writebytes([data])
        else:
            # data es una secuencia/bytes
            self.spi.writebytes(list(data))

    def _reset(self):
        # Reset por hardware
        self.rst.on()
        time.sleep(0.01)
        self.rst.off()
        time.sleep(0.01)
        self.rst.on()
        time.sleep(0.05)

    def _wait_until_idle(self, timeout=5.0):
        start = time.time()
        # BUSY = 0 => ocupado; 1 => listo (en muchos controladores de esta familia)
        while not self.busy.value:
            if (time.time() - start) > timeout:
                # time-out de seguridad
                break
            time.sleep(0.01)

    def init(self):
        # Secuencia de init validada para 2.9" B/W V2 (UC8151/SSD1680)
        self._reset()

        # POWER ON
        self._send_command(self.POWER_ON)
        self._wait_until_idle(timeout=5.0)

        # PANEL SETTING
        # 0xAF: KW-BF=1, KWR=1, LUT from OTP, B/W mode
        self._send_command(self.PANEL_SETTING)
        self._send_data(0xAF)

        # VCOM AND DATA INTERVAL
        # 0xF0: default (reduce ghosting)
        self._send_command(self.VCOM_AND_DATA_INTERVAL)
        self._send_data(0xF0)

        # TCON RESOLUTION (Width, Height)
        self._send_command(self.TCON_RESOLUTION)
        self._send_data(self.WIDTH & 0xFF)         # 0x80 (128)
        self._send_data((self.HEIGHT >> 8) & 0xFF) # 0x01
        self._send_data(self.HEIGHT & 0xFF)        # 0x28 (296)

        # VCM DC SETTING
        self._send_command(self.VCM_DC_SETTING_REGISTER)
        self._send_data(0x12)

        self._wait_until_idle(timeout=1.0)

    def clear(self, color=1):
        # color=1 (blanco), 0 (negro)
        # Escritura monocapa al RAM de imagen
        fill_byte = 0xFF if color else 0x00
        pixels = (self.WIDTH * self.HEIGHT) // 8
        buf = bytes([fill_byte] * pixels)

        self._send_command(self.DATA_START_TRANSMISSION_1)
        self._send_data(buf)
        self._send_command(self.DATA_STOP)

        self._update()

    def _update(self):
        # DISPLAY REFRESH
        self._send_command(self.DISPLAY_REFRESH)
        self._wait_until_idle(timeout=10.0)

    def display_image(self, image: Image.Image):
        # Acepta PIL Image en modo '1' o 'L' y la empaqueta a bits (1bpp)
        img = image.convert('1')
        if self.rotate_180:
            img = img.rotate(180, expand=True)

        # Ajusta tamaño exacto del framebuffer
        if img.size != (self.WIDTH, self.HEIGHT):
            img = img.resize((self.WIDTH, self.HEIGHT))

        # Empaquetar 8 pixeles por byte. Convención: 1=blanco, 0=negro
        pixels = img.load()
        packed = bytearray()
        for y in range(self.HEIGHT):
            byte = 0
            bit_count = 0
            for x in range(self.WIDTH):
                pixel = pixels[x, y]
                bit = 1 if pixel == 255 else 0
                byte = (byte << 1) | bit
                bit_count += 1
                if bit_count == 8:
                    packed.append(byte & 0xFF)
                    byte = 0
                    bit_count = 0
            if bit_count != 0:
                # relleno si WIDTH no múltiplo de 8 (no aplica, 128 es múltiplo de 8)
                byte <<= (8 - bit_count)
                packed.append(byte & 0xFF)

        # Transmisión de imagen
        self._send_command(self.DATA_START_TRANSMISSION_1)
        self._send_data(packed)
        self._send_command(self.DATA_STOP)

        # Refrescar
        self._update()

    def sleep(self):
        # POWER OFF + DEEP SLEEP
        self._send_command(self.POWER_OFF)
        self._wait_until_idle(timeout=2.0)
        self._send_command(self.DEEP_SLEEP)
        self._send_data(0xA5)

    def close(self):
        try:
            self.spi.close()
        except Exception:
            pass

Puntos clave del driver:
– Usa gpiozero con la factoría lgpio (verás cómo la establecemos con una variable de entorno al ejecutar).
– Controla DC, RST y BUSY por GPIO; el BUSY se lee en polling al refrescar.
– Escribe el framebuffer en modo 1 bit/pixel con convención 1=blanco, 0=negro.
– Fuerza rotación 180° para que el texto sea natural con el HAT en la orientación típica; puedes desactivarlo si tu montaje es distinto.

dashboard.py (aplicación “spi-epaper-live-dashboard”)

La aplicación recolecta datos del propio sistema (psutil) y los dibuja con Pillow. El layout es de alta legibilidad monocroma. Actualiza cada minuto con refresco completo para evitar ghosting en sesiones largas.

# dashboard.py
# "spi-epaper-live-dashboard" en Raspberry Pi 5 + Waveshare 2.9" e-Paper HAT
# Toolchain: Pillow==10.4.0, psutil==5.9.8, requests==2.32.3 (opcional), spidev==3.6, gpiozero==2.0
import os
import time
import socket
import psutil
from datetime import datetime
from PIL import Image, ImageDraw, ImageFont

from epaper29 import EPaper29

# Fuentes del sistema (Bookworm): DejaVu Sans Mono es una opción segura
FONT_MONO = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf"
FONT_SANS = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"

def get_ip_address():
    try:
        hostname = socket.gethostname()
        ip = socket.gethostbyname(hostname)
        # Si devuelve 127.0.0.1, intenta otra vía
        if ip.startswith("127."):
            # Prueba con una conexión UDP dummy para resolver interfaz principal
            import socket as s
            sckt = s.socket(s.AF_INET, s.SOCK_DGRAM)
            sckt.connect(("8.8.8.8", 80))
            ip = sckt.getsockname()[0]
            sckt.close()
        return ip
    except Exception:
        return "0.0.0.0"

def gather_metrics():
    cpu_pct = psutil.cpu_percent(interval=0.5)
    load1, load5, load15 = psutil.getloadavg()
    mem = psutil.virtual_memory()
    disk = psutil.disk_usage("/")
    temp_c = None
    # Temperatura de CPU (VCGENCMD) o psutil.sensors_temperatures
    try:
        temps = psutil.sensors_temperatures()
        if "cpu_thermal" in temps and temps["cpu_thermal"]:
            temp_c = temps["cpu_thermal"][0].current
        elif "coretemp" in temps and temps["coretemp"]:
            temp_c = temps["coretemp"][0].current
    except Exception:
        pass
    # Si no obtuvimos temp, intenta vcgencmd
    if temp_c is None:
        try:
            import subprocess
            out = subprocess.check_output(["vcgencmd", "measure_temp"], text=True).strip()
            # Ej: temp=50.0'C
            if "=" in out and "'C" in out:
                temp_c = float(out.split("=")[1].split("'C")[0])
        except Exception:
            temp_c = 0.0

    net_bytes = psutil.net_io_counters()
    ip = get_ip_address()

    now = datetime.now()
    return {
        "time": now.strftime("%Y-%m-%d %H:%M"),
        "cpu_pct": cpu_pct,
        "load1": load1,
        "load5": load5,
        "load15": load15,
        "mem_used": mem.used,
        "mem_total": mem.total,
        "mem_pct": mem.percent,
        "disk_used": disk.used,
        "disk_total": disk.total,
        "disk_pct": disk.percent,
        "temp_c": temp_c,
        "ip": ip,
        "bytes_sent": net_bytes.bytes_sent,
        "bytes_recv": net_bytes.bytes_recv,
    }

def human_bytes(n):
    # Conversión amigable
    step = 1024.0
    for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
        if n < step:
            return f"{n:3.1f}{unit}"
        n /= step
    return f"{n:.1f}PiB"

def draw_dashboard(metrics, width=128, height=296):
    # Crea una imagen monocroma 1bpp para la e-Paper
    img = Image.new("1", (width, height), 1)  # 1=blanco
    draw = ImageDraw.Draw(img)

    # Fuentes (tamaños ajustados a 296x128 vertical)
    font_title = ImageFont.truetype(FONT_SANS, 16)
    font_mono_small = ImageFont.truetype(FONT_MONO, 12)
    font_mono_tiny = ImageFont.truetype(FONT_MONO, 10)

    # Márgenes
    x0, y0 = 4, 4
    line = 16

    # Encabezado
    draw.text((x0, y0), "SPI e-Paper Live Dashboard", font=font_title, fill=0)
    y = y0 + line + 6

    # Hora e IP
    draw.text((x0, y), f"{metrics['time']}  IP:{metrics['ip']}", font=font_mono_small, fill=0)
    y += line

    # CPU y carga
    draw.text((x0, y), f"CPU: {metrics['cpu_pct']:>5.1f}%  T:{metrics['temp_c']:>4.1f}C", font=font_mono_small, fill=0)
    y += line
    draw.text((x0, y), f"Load: {metrics['load1']:.2f} {metrics['load5']:.2f} {metrics['load15']:.2f}", font=font_mono_small, fill=0)
    y += line

    # Memoria
    mem_used = human_bytes(metrics["mem_used"])
    mem_total = human_bytes(metrics["mem_total"])
    draw.text((x0, y), f"RAM: {mem_used}/{mem_total} ({metrics['mem_pct']:>5.1f}%)", font=font_mono_small, fill=0)
    y += line

    # Disco
    disk_used = human_bytes(metrics["disk_used"])
    disk_total = human_bytes(metrics["disk_total"])
    draw.text((x0, y), f"Disk: {disk_used}/{disk_total} ({metrics['disk_pct']:>5.1f}%)", font=font_mono_small, fill=0)
    y += line

    # Red
    draw.text((x0, y), f"Net: Tx {human_bytes(metrics['bytes_sent'])}", font=font_mono_small, fill=0)
    y += line
    draw.text((x0, y), f"     Rx {human_bytes(metrics['bytes_recv'])}", font=font_mono_small, fill=0)
    y += line

    # Footer
    y_footer = height - 18
    draw.line([(x0, y_footer-4), (width-4, y_footer-4)], fill=0, width=1)
    draw.text((x0, y_footer), "Raspberry Pi 5 + Waveshare 2.9\" e-Paper HAT", font=font_mono_tiny, fill=0)

    return img

def main():
    # Usa gpiozero sobre lgpio en Raspberry Pi 5 (Bookworm)
    # Exporta antes de ejecutar: GPIOZERO_PIN_FACTORY=lgpio
    epd = EPaper29(spi_bus=0, spi_device=0, spi_hz=4_000_000, pin_dc=25, pin_rst=17, pin_busy=24)

    try:
        epd.init()
        epd.clear(color=1)  # Blanco

        # bucle principal: refresco completo cada 60s
        refresh_sec = 60
        while True:
            m = gather_metrics()
            img = draw_dashboard(m, width=EPaper29.WIDTH, height=EPaper29.HEIGHT)
            epd.display_image(img)
            # Cada minuto, suficiente para la dinámica del sistema sin exprimir el panel
            time.sleep(refresh_sec)

    except KeyboardInterrupt:
        pass
    finally:
        try:
            epd.sleep()
        except Exception:
            pass
        epd.close()

if __name__ == "__main__":
    # Asegura la ruta de fuentes; si no existen, utiliza una fuente por defecto
    if not os.path.exists(FONT_MONO):
        # fallback simple a PIL default (menos estético)
        pass
    main()

Breve explicación de las partes clave:
– gather_metrics usa psutil para obtener CPU, carga, RAM, disco, red y temperatura. Incluye una ruta alternativa con vcgencmd si psutil no expone sensores.
– draw_dashboard monta un layout monocromo para 296×128, con tipografías del sistema DejaVu.
– El bucle principal refresca cada 60 s con actualización completa; la e‑Paper es lenta por naturaleza, y un minuto es un buen equilibrio entre estática y dinámica. Se puede ajustar.

Compilación/flash/ejecución

No hay “flash” como tal; es Python. Aun así, fijamos un entorno aislado, versiones exactas y un servicio opcional para arranque automático.

1) Crear proyecto y entorno virtual

  • mkdir -p ~/spi-epaper-live-dashboard
  • cd ~/spi-epaper-live-dashboard
  • python3.11 -m venv .venv
  • source .venv/bin/activate
  • python -m pip install –upgrade pip==24.2 setuptools==72.1.0 wheel==0.44.0
  • python -m pip install spidev==3.6 lgpio==0.2.2.0 gpiozero==2.0 Pillow==10.4.0 psutil==5.9.8 requests==2.32.3

Verifica versiones:
– python -V
– python -c «import spidev, lgpio, gpiozero, PIL, psutil, requests; print(‘OK’)»

2) Crear los archivos con el código

  • nano epaper29.py
  • Pega el contenido del driver epaper29.py y guarda.
  • nano dashboard.py
  • Pega el contenido de dashboard.py y guarda.

3) Habilitar el backend GPIO adecuado

Para Raspberry Pi 5 en Bookworm, usa gpiozero con la factoría lgpio. Exporta la variable antes de ejecutar:

  • export GPIOZERO_PIN_FACTORY=lgpio

Para hacerlo persistente en el shell actual:
– echo ‘export GPIOZERO_PIN_FACTORY=lgpio’ >> ~/.bashrc
– source ~/.bashrc

(Esta exportación garantiza que gpiozero no intente usar el backend RPi.GPIO clásico, que en Pi 5/Bookworm ya no es la opción recomendada.)

4) Probar el driver (prueba en seco)

  • ls -l /dev/spidev0.0
  • python -c «import spidev; d=spidev.SpiDev(); d.open(0,0); print(d.max_speed_hz); d.close()»

Si esto funciona, el bus SPI está operativo.

5) Ejecutar la aplicación del dashboard

  • cd ~/spi-epaper-live-dashboard
  • source .venv/bin/activate
  • export GPIOZERO_PIN_FACTORY=lgpio
  • python dashboard.py

La pantalla debería:
1) Inicializarse (parpadeo típico de e‑Paper),
2) Limpiarse a blanco y
3) Mostrar el tablero con hora, IP, CPU, carga, RAM, disco y tráfico de red.

Se refrescará cada 60 s.

6) Ejecutarlo como servicio (opcional, arranque automático)

Crea un servicio systemd para que se inicie tras el boot.

  • sudo nano /etc/systemd/system/spi-epaper-live-dashboard.service

Contenido (ajusta “User” si procede):

[Unit]
Description=SPI e-Paper Live Dashboard (Raspberry Pi 5 + Waveshare 2.9" e-Paper HAT)
After=network-online.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/spi-epaper-live-dashboard
Environment=GPIOZERO_PIN_FACTORY=lgpio
Environment=PYTHONUNBUFFERED=1
ExecStart=/home/pi/spi-epaper-live-dashboard/.venv/bin/python /home/pi/spi-epaper-live-dashboard/dashboard.py
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target

Habilita e inicia:
– sudo systemctl daemon-reload
– sudo systemctl enable spi-epaper-live-dashboard.service
– sudo systemctl start spi-epaper-live-dashboard.service
– sudo systemctl status spi-epaper-live-dashboard.service

Para ver logs:
– journalctl -u spi-epaper-live-dashboard.service -f

Validación paso a paso

1) Comprobar que SPI está activo:
– ls -l /dev/spidev0.0
– Debe existir el dispositivo de caracteres.

2) Verificar dependencias en el venv:
– source ~/spi-epaper-live-dashboard/.venv/bin/activate
– python -c «import spidev, gpiozero, PIL, psutil; print(‘deps OK’)»

3) Exportar la factoría correcta de GPIO:
– export GPIOZERO_PIN_FACTORY=lgpio
– python -c «from gpiozero import LED; print(‘gpiozero OK’)»

4) Ejecución de dashboard:
– python dashboard.py
– Observa un refresco inicial y luego el layout. El parpadeo fuerte indica actualización completa; los textos deben ser nítidos.

5) Confirmar datos:
– La hora debe coincidir con la del sistema.
– La IP debe ser la de la interfaz activa (evita 127.0.0.1; si la ves, revisa conectividad).
– CPU% y Load deben variar si ejecutas una carga: por ejemplo, en otra terminal:
– yes > /dev/null &
– Observa la subida de CPU% tras uno o dos ciclos de 60 s.
– Luego mata la carga: killall yes
– RAM y disco: deben coincidir con free -h y df -h.
– Temperatura: sube con carga sostenida; ver también:
– vcgencmd measure_temp

6) Validación de servicio systemd:
– sudo systemctl status spi-epaper-live-dashboard.service
– Debe estar “active (running)”.
– Tras reiniciar (sudo reboot), a los 30–60 s el dashboard debe estar visible sin intervención.

Troubleshooting

1) No aparece /dev/spidev0.0
– Causa: SPI no habilitado o dtparam ausente.
– Solución:
– sudo raspi-config → Interface Options → SPI → Enable → Reboot
– Verifica /boot/firmware/config.txt contenga dtparam=spi=on

2) Permisos o error: cannot open SPI device
– Causa: usuario sin permisos o dispositivo en uso.
– Solución:
– Ejecuta con usuario estándar pero con pertenencia a grupo “spi” (normalmente ya configurado en Raspberry Pi OS).
– Revisa que otro proceso no haya abierto SPI.

3) El script denuncia GPIO: “No pin factory found” o “RPi.GPIO missing”
– Causa: gpiozero usando backend incorrecto en Bookworm.
– Solución:
– export GPIOZERO_PIN_FACTORY=lgpio
– Instala lgpio en el venv: pip install lgpio==0.2.2.0
– Evita depender de RPi.GPIO en Pi 5/Bookworm.

4) Pantalla no refresca o se queda en blanco
– Causas probables:
– Pines DC/RST/BUSY diferentes a los del código.
– HAT mal asentado o invertido.
– Frecuencia SPI demasiado alta para tu cableado.
– Soluciones:
– Verifica mapeo de pines: DC=GPIO25, RST=GPIO17, BUSY=GPIO24.
– Reduce frecuencia a 2 MHz: en EPaper29(…, spi_hz=2_000_000).
– Revisa que el HAT esté correctamente acoplado y con 3V3/GND operativos.

5) Ghosting o artefactos tras mucho tiempo
– Causa: actualizaciones parciales (no usadas aquí) o falta de borrados completos.
– Solución:
– El driver usa update completo por defecto; si ves ghosting, fuerza un clear() cada N ciclos.
– Aumenta descansos entre refrescos si la temperatura ambiente es baja.

6) Fuentes no encontradas
– Causa: ruta de TTF distinta.
– Solución:
– Verifica que existan las rutas en /usr/share/fonts/truetype/dejavu/.
– Cambia a ImageFont.load_default() si falta la fuente:
– Reemplaza las cargas de TTF por ImageFont.load_default() temporalmente.

7) Temperatura no aparece
– Causa: psutil no expone sensor en tu kernel/firmware.
– Solución:
– El código intenta vcgencmd; instala firmware tools si faltan:
– sudo apt install -y libraspberrypi-bin
– Reintenta.

8) Error al iniciar como servicio systemd
– Causa: ruta de venv o WorkingDirectory incorrecta.
– Solución:
– Revisa ExecStart y WorkingDirectory en el unit file.
– journalctl -u spi-epaper-live-dashboard.service -f para ver el traceback exacto.

Mejoras/variantes

  • Actualización parcial de regiones:
  • Para paneles 2.9″ V2 es posible usar partial updates para refrescar solo números (CPU, hora) con menor parpadeo y mayor frecuencia (por ejemplo cada 10 s) y un full update cada 5–10 minutos.
  • Requiere añadir la ventana parcial (comando PARTIAL_WINDOW 0x90) y escribir solo esa región. Debes mantener LUTs adecuados si el controlador lo exige.

  • Modo bajo consumo:

  • Si el panel solo debe refrescar unas pocas veces por hora, puedes llamar a sleep() tras cada actualización y re‑init() antes de la siguiente. Esto reduce consumo y ghosting.
  • Ten en cuenta el tiempo adicional de init.

  • KPI remotos:

  • Integra requests para consultar métricas de un servicio REST/MQTT (por ejemplo, estado de un pipeline CI, ocupación de colas, o SLA externo) y visualízalas en una banda inferior.

  • Diseño alternativo:

  • Cambia a orientación horizontal (128×296) rotando la composición y ajustando el driver.
  • Emplea tipografías condensadas para maximizar información.
  • Añade iconografía minimalista (dibujada en 1bpp) para CPU, red, disco.

  • Programación de refrescos inteligente:

  • Si la IP no cambia y el sistema está idle, aumenta el intervalo a 2–5 minutos.
  • Reduce el intervalo si load1 supera un umbral.

  • Exportación de logs:

  • Genera un log con los valores mostrados para correlacionar con eventos del sistema (spikes de temperatura, caídas de red).

Checklist de verificación

Marca cada punto al avanzar:

  • [ ] Raspberry Pi OS Bookworm 64‑bit instalado y actualizado (sudo apt full-upgrade).
  • [ ] SPI habilitado (dtparam=spi=on) y /dev/spidev0.0 visible.
  • [ ] HAT “Waveshare 2.9″ e‑Paper HAT” correctamente insertado en la Raspberry Pi 5.
  • [ ] Proyecto creado en ~/spi-epaper-live-dashboard.
  • [ ] Entorno virtual .venv creado con Python 3.11 y pip 24.2.
  • [ ] Paquetes instalados en venv: spidev 3.6, lgpio 0.2.2.0, gpiozero 2.0, Pillow 10.4.0, psutil 5.9.8, requests 2.32.3.
  • [ ] Variable de entorno GPIOZERO_PIN_FACTORY=lgpio exportada.
  • [ ] Archivo epaper29.py creado y sin errores de importación.
  • [ ] Archivo dashboard.py creado y ejecuta sin fallos.
  • [ ] La pantalla muestra el dashboard y refresca cada 60 s con datos coherentes.
  • [ ] Servicio systemd creado y en estado “active (running)” (opcional).
  • [ ] Validación cruzada de métricas (free -h, df -h, vcgencmd measure_temp) coincide con lo mostrado.

Con este flujo, habrás construido un “spi-epaper-live-dashboard” estable y reproducible sobre “Raspberry Pi 5 + Waveshare 2.9″ e‑Paper HAT”, usando Raspberry Pi OS Bookworm 64‑bit, Python 3.11 y la toolchain fijada. El proyecto queda listo para evolucionar hacia paneles de control más ricos (KPI de servicios, alertas, modos nocturnos, parcial updates) manteniendo las mismas bases de SPI y renderizado monocromo con Pillow.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Cuál es el sistema operativo utilizado en el proyecto?




Pregunta 2: ¿Qué versión de Python se utiliza en el entorno?




Pregunta 3: ¿Cuál es la versión de pip que debe instalarse?




Pregunta 4: ¿Qué herramienta se utiliza para acceder a /dev/spidevX.Y?




Pregunta 5: ¿Qué versión de lgpio se requiere en el entorno virtual?




Pregunta 6: ¿Cuál es la versión de Pillow que se debe instalar?




Pregunta 7: ¿Qué biblioteca se utiliza para obtener información del sistema como CPU y RAM?




Pregunta 8: ¿Qué versión de requests se debe utilizar en el entorno?




Pregunta 9: ¿Qué componente proporciona GPIO de alto nivel sobre lgpio?




Pregunta 10: ¿Qué es un 'spi-epaper-live-dashboard'?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: Puente LoRa-MQTT con RPi Zero 2 W y Dragino

Caso práctico: Puente LoRa-MQTT con RPi Zero 2 W y Dragino — hero

Objetivo y caso de uso

Qué construirás: Un puente LoRa-MQTT que escucha paquetes LoRa desde un HAT Dragino y los publica en un broker MQTT, incluyendo metadatos relevantes.

Para qué sirve

  • Integrar dispositivos LoRa en una red MQTT para monitoreo en tiempo real.
  • Recibir comandos desde MQTT para enviar datos a dispositivos LoRa.
  • Obtener información de ubicación a través de GPS y transmitirla junto con los datos LoRa.
  • Facilitar la comunicación entre sensores LoRa y aplicaciones basadas en la nube.

Resultado esperado

  • Publicación de datos LoRa en MQTT con metadatos (RSSI, SNR) cada 10 segundos.
  • Latencia de transmisión de comandos LoRa de menos de 2 segundos.
  • Recepción de al menos 100 paquetes LoRa por hora con éxito.
  • Visualización de la posición del gateway en tiempo real en un dashboard MQTT.

Público objetivo: Desarrolladores y entusiastas de IoT; Nivel: Alto

Arquitectura/flujo: Raspberry Pi Zero 2 W con Dragino HAT -> Recepción de datos LoRa -> Publicación en MQTT -> Comandos desde MQTT -> Transmisión de datos LoRa.

Nivel: alto

Prerrequisitos

  • Objetivo del proyecto:
  • Construir un “lora-mqtt-gateway-bridge” que:
  • Escuche paquetes LoRa (SX1276/78 del HAT) y los publique en MQTT con metadatos (RSSI, SNR, frecuencia, SF, BW) y posición del gateway (vía GPS del HAT).
  • Acepte comandos desde MQTT para transmitir paquetes LoRa (downlink).
  • Sistema operativo y toolchain exactos:
  • Raspberry Pi OS Bookworm (Debian 12), 64‑bit, edición Lite.
  • Núcleo Linux 6.6 (línea Bookworm para Raspberry Pi).
  • Python 3.11 (usaremos venv con Python 3.11 y versiones fijadas de librerías).
  • Pip y paquetes Python fijados para reproducibilidad:
  • pip 24.2
  • setuptools 70.0.0
  • wheel 0.44.0
  • paho-mqtt 2.1.0
  • pyserial 3.5
  • pynmea2 1.18.0
  • spidev 3.6
  • RPi.GPIO 0.7.1
  • gpiozero 1.6.2
  • Servicios y utilidades de sistema:
  • Mosquitto (broker MQTT) 2.0.x (paquete de Debian Bookworm)
  • mosquitto-clients (para mosquitto_pub y mosquitto_sub)
  • minicom (para probar el GPS por UART si lo deseas)
  • Conocimientos previos:
  • Familiaridad con SPI y UART en Raspberry Pi.
  • Conocimientos de Python y MQTT.
  • Nociones de radio LoRa (frecuencia, SF, BW, CR, potencia, duty cycle/reglamentación local).

Nota: aunque Bookworm tiende a traer Python 3.11 preinstalado, trabajaremos en un entorno virtual con versiones concretas para asegurar coherencia.

Materiales

  • Raspberry Pi Zero 2 W + Dragino LoRa/GPS HAT (modelo con SX1276/78 y GNSS L80).
  • Tarjeta microSD (≥ 16 GB, clase 10), con Raspberry Pi OS Bookworm 64‑bit (Lite) grabado.
  • Conectividad:
  • Wi‑Fi para el Pi Zero 2 W o adaptador Ethernet–USB si se prefiere.
  • Fuente de alimentación 5 V / 2 A estable.
  • Accesorios:
  • Antena para la banda correspondiente (EU868/US915, etc.). Imprescindible conectar la antena al HAT antes de energizar.
  • Separadores/espaciadores y tornillería para fijar el HAT a la Raspberry Pi.
  • Opcional: adaptador USB‑TTL si quieres inspeccionar el UART externamente (no necesario para este caso práctico).

Preparación y conexión

Montaje mecánico y eléctrico

  • Apaga y desconecta la Raspberry Pi Zero 2 W antes de montar.
  • Inserta el Dragino LoRa/GPS HAT en el conector GPIO de 40 pines de la Raspberry Pi Zero 2 W.
  • Atornilla con separadores para evitar esfuerzos en el conector.
  • Conecta la antena LoRa a la salida RF del HAT.
  • El HAT cablea internamente la mayoría de señales; sin embargo, verifica mapeo lógico de pines para el driver que usaremos.

Mapeo de pines (Raspberry Pi Zero 2 W ↔ Dragino LoRa/GPS HAT)

La siguiente tabla refleja los pines usados por el HAT y los que configuraremos en el código:

Función HAT Chip/Interfaz Pin Raspberry Pi GPIO (BCM) Notas
LoRa NSS/CS SPI0 CE0 Pin 24 GPIO8 Chip select SPI LoRa
LoRa SCK SPI0 SCLK Pin 23 GPIO11 Reloj SPI
LoRa MOSI SPI0 MOSI Pin 19 GPIO10 SPI MOSI
LoRa MISO SPI0 MISO Pin 21 GPIO9 SPI MISO
LoRa DIO0 GPIO a DIO0 Pin 22 GPIO25 RxDone/TxDone IRQ (usado en driver)
LoRa DIO1 GPIO a DIO1 Pin 18 GPIO24 Opcional (no imprescindible)
LoRa RESET GPIO a RST Pin 11 GPIO17 Reset por software del SX1276
GPS TX UART0 TXD Pin 8 GPIO14 Salida GPS hacia Pi (RX del Pi)
GPS RX UART0 RXD Pin 10 GPIO15 Entrada GPS desde Pi (TX del Pi)
GPS PPS GPIO opcional Pin 7 GPIO4 Opción de PPS (no requerido)
3V3 3V3 Pin 1 Alimentación HAT
5V 5V Pin 2 Alimentación HAT
GND GND Pin 6 Referencia

Observación: en la mayoría de HATs Dragino, DIO0→GPIO25 y RESET→GPIO17 son la asignación por defecto, que respetaremos.

Habilitar interfaces (SPI y UART) y deshabilitar consola por serie

Opción A: usando raspi-config (interactivo)
1. sudo raspi-config
2. Interface Options:
– I4 SPI → Enable
– I2 Serial Port:
– Login shell over serial? → No
– Enable serial port hardware? → Yes
3. Finish → Reboot.

Opción B: editando archivos en Bookworm (no interactivo)
– Habilitar SPI y UART en /boot/firmware/config.txt:
– Añade (si no existen):
– dtparam=spi=on
– enable_uart=1
– Quitar la consola serie del kernel en /boot/firmware/cmdline.txt:
– Elimina cualquier trozo “console=serial0,115200” o similar.

Comandos exactos para hacerlo por terminal:

# 1) Habilitar SPI y UART
sudo sed -i '/^dtparam=spi=/d' /boot/firmware/config.txt
echo 'dtparam=spi=on' | sudo tee -a /boot/firmware/config.txt
sudo sed -i '/^enable_uart=/d' /boot/firmware/config.txt
echo 'enable_uart=1' | sudo tee -a /boot/firmware/config.txt

# 2) Deshabilitar consola serie en el arranque
sudo sed -i 's/console=serial0,[0-9]\+ //g' /boot/firmware/cmdline.txt

# 3) Reboot
sudo reboot

Tras el reinicio, verifica:
– SPI: ls -l /dev/spidev0.0 debe existir.
– UART: ls -l /dev/serial0 debe existir (alias a /dev/ttyAMA0 en Zero 2 W).

Grupos de usuario

Para acceder a SPI y UART sin sudo, añade tu usuario a los grupos:

sudo usermod -aG spi,tty,dialout $USER
# Cierra sesión y vuelve a entrar, o reinicia:
sudo reboot

Código y toolchain de Python (entorno reproducible)

Crear entorno virtual y toolchain

# Actualiza el sistema
sudo apt update
sudo apt install -y python3.11 python3.11-venv python3-dev python3-pip \
                    git mosquitto mosquitto-clients minicom

# Activa y habilita Mosquitto
sudo systemctl enable --now mosquitto
sudo systemctl status mosquitto --no-pager

# Crea proyecto y venv
mkdir -p ~/projects/lora-mqtt-gateway-bridge
cd ~/projects/lora-mqtt-gateway-bridge
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip==24.2 setuptools==70.0.0 wheel==0.44.0

# Instala dependencias con versiones exactas
pip install paho-mqtt==2.1.0 pyserial==3.5 pynmea2==1.18.0 spidev==3.6 RPi.GPIO==0.7.1 gpiozero==1.6.2

# Congela versiones para registro
pip freeze > requirements.txt

Verifica versiones:

python -V   # Python 3.11.x
pip -V      # pip 24.2 (python 3.11)
python -c "import paho.mqtt,serial,pynmea2,spidev,RPi.GPIO,gpiozero; \
print('paho-mqtt',paho.mqtt.__version__)"

Preparación y conexión (detallada)

Verificación de GPS por UART (opcional pero recomendado)

  • Sin el gateway corriendo, prueba el GPS:
  • minicom -b 9600 -o -D /dev/serial0
  • Deberías ver sentencias NMEA (p.ej., $GPRMC, $GPGGA). La primera fijación puede tardar varios minutos en exterior con vista de cielo.

Para salir de minicom: Ctrl-A, Z, luego X y confirma.

Configuración regional de LoRa

  • Define tu región y frecuencia:
  • EU868 (Europa): ejemplo 868.1 MHz, BW 125 kHz, SF7, CR 4/5, SyncWord 0x12 (no LoRaWAN).
  • US915 (América): frecuencias distintas (p.ej., 903.9 MHz, etc.). Ajusta en el código.

Este caso práctico usará EU868 por defecto (868.1 MHz). Cambia parámetros si tu HAT y normativa son distintos.

Código completo (Python 3.11)

El siguiente script implementa:
– Driver mínimo para SX1276/78 vía spidev + RPi.GPIO (reset, init, RX continuo, TX).
– Lector GPS por UART con pyserial + pynmea2.
– Cliente MQTT (paho-mqtt) para publicar RX y recibir órdenes de TX.
– Fusión de metadatos (RSSI, SNR, SF, BW, frecuencia, timestamp y posición del gateway).

Guarda como gateway.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import json
import time
import threading
import binascii
import datetime
import signal

import RPi.GPIO as GPIO
import spidev
import serial
import pynmea2
from paho.mqtt.client import Client as MqttClient

# --------------------
# Configuración global
# --------------------
GATEWAY_ID = os.environ.get("GW_ID", "zero2w-dragino")
MQTT_HOST = os.environ.get("MQTT_HOST", "127.0.0.1")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_KEEPALIVE = 60

# LoRa (EU868 por defecto)
LORA_FREQ_HZ = int(os.environ.get("LORA_FREQ_HZ", str(868100000)))
LORA_BW = int(os.environ.get("LORA_BW", str(125000)))       # 125 kHz
LORA_SF = int(os.environ.get("LORA_SF", "7"))               # 7..12
LORA_CR = int(os.environ.get("LORA_CR", "5"))               # 5->4/5, 6->4/6...
LORA_SYNC_WORD = int(os.environ.get("LORA_SYNC_WORD", "0x12"), 16)
LORA_TX_POWER_DBM = int(os.environ.get("LORA_TX_POWER_DBM", "14"))

# Pines (BCM)
PIN_RESET = 17
PIN_DIO0 = 25
PIN_DIO1 = 24

SPI_BUS = 0
SPI_DEV = 0
SPI_MAX_HZ = 8000000

SERIAL_PORT = os.environ.get("GPS_PORT", "/dev/serial0")
SERIAL_BAUD = 9600

# Temas MQTT
TOPIC_RX = f"gateway/{GATEWAY_ID}/rx"
TOPIC_TX = f"gateway/{GATEWAY_ID}/tx"
TOPIC_HB = f"gateway/{GATEWAY_ID}/heartbeat"

# --------------------
# Utilidades de tiempo
# --------------------
def now_iso():
    return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()

# --------------------
# Driver SX1276/78
# --------------------
class SX127x:
    REG_FIFO = 0x00
    REG_OP_MODE = 0x01
    REG_FRF_MSB = 0x06
    REG_FRF_MID = 0x07
    REG_FRF_LSB = 0x08
    REG_PA_CONFIG = 0x09
    REG_LNA = 0x0C
    REG_FIFO_ADDR_PTR = 0x0D
    REG_FIFO_TX_BASE_ADDR = 0x0E
    REG_FIFO_RX_BASE_ADDR = 0x0F
    REG_FIFO_RX_CURRENT_ADDR = 0x10
    REG_IRQ_FLAGS_MASK = 0x11
    REG_IRQ_FLAGS = 0x12
    REG_RX_NB_BYTES = 0x13
    REG_PKT_SNR_VALUE = 0x19
    REG_PKT_RSSI_VALUE = 0x1A
    REG_MODEM_CONFIG1 = 0x1D
    REG_MODEM_CONFIG2 = 0x1E
    REG_SYMB_TIMEOUT_LSB = 0x1F
    REG_PREAMBLE_MSB = 0x20
    REG_PREAMBLE_LSB = 0x21
    REG_PAYLOAD_LENGTH = 0x22
    REG_MODEM_CONFIG3 = 0x26
    REG_FREQ_ERROR_MSB = 0x28
    REG_FREQ_ERROR_MID = 0x29
    REG_FREQ_ERROR_LSB = 0x2A
    REG_DETECTION_OPTIMIZE = 0x31
    REG_DETECTION_THRESHOLD = 0x37
    REG_SYNC_WORD = 0x39
    REG_DIO_MAPPING1 = 0x40
    REG_VERSION = 0x42

    # Modos
    LONG_RANGE_MODE = 0x80
    MODE_SLEEP = 0x00
    MODE_STDBY = 0x01
    MODE_TX = 0x03
    MODE_RX_CONT = 0x05

    # IRQ Flags
    IRQ_TX_DONE_MASK = 0x08
    IRQ_RX_DONE_MASK = 0x40
    IRQ_PAYLOAD_CRC_ERROR_MASK = 0x20

    def __init__(self, spi_bus=0, spi_dev=0, pin_reset=17, pin_dio0=25,
                 freq_hz=868100000, bw=125000, sf=7, cr=5, sync_word=0x12, tx_power=14):
        self.freq_hz = int(freq_hz)
        self.bw = int(bw)
        self.sf = int(sf)
        self.cr = int(cr)
        self.sync_word = int(sync_word)
        self.tx_power = int(tx_power)

        self.spi = spidev.SpiDev()
        self.spi.open(spi_bus, spi_dev)
        self.spi.max_speed_hz = SPI_MAX_HZ
        self.spi.mode = 0

        self.pin_reset = pin_reset
        self.pin_dio0 = pin_dio0

        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)
        GPIO.setup(self.pin_reset, GPIO.OUT, initial=GPIO.HIGH)
        GPIO.setup(self.pin_dio0, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

        self._reset()
        self._init_lora()

    def _reset(self):
        GPIO.output(self.pin_reset, GPIO.LOW)
        time.sleep(0.01)
        GPIO.output(self.pin_reset, GPIO.HIGH)
        time.sleep(0.01)

    def _read_reg(self, addr):
        resp = self.spi.xfer2([addr & 0x7F, 0x00])
        return resp[1]

    def _write_reg(self, addr, val):
        self.spi.xfer2([addr | 0x80, val & 0xFF])

    def _set_mode(self, mode):
        self._write_reg(self.REG_OP_MODE, self.LONG_RANGE_MODE | mode)

    def _set_frequency(self, freq_hz):
        frf = int(freq_hz / 61.03515625)  # Fstep = FXOSC / 2^19 = 32e6/2^19
        self._write_reg(self.REG_FRF_MSB, (frf >> 16) & 0xFF)
        self._write_reg(self.REG_FRF_MID, (frf >> 8) & 0xFF)
        self._write_reg(self.REG_FRF_LSB, frf & 0xFF)

    def _set_tx_power(self, dbm):
        # Usaremos PA_BOOST si el HAT lo soporta. 2 dB..17 dB típicos.
        # PA_CONFIG: [PA_SELECT:7][MAX_POWER:6..4][OUTPUT_POWER:3..0]
        # PA_SELECT=1 -> PA_BOOST
        pa_select = 0x80
        max_power = 0x70  # valor típico
        if dbm < 2: dbm = 2
        if dbm > 17: dbm = 17
        out_power = dbm - 2
        self._write_reg(self.REG_PA_CONFIG, pa_select | max_power | (out_power & 0x0F))

    def _set_lna(self):
        # LNA boost HF
        self._write_reg(self.REG_LNA, 0x23)

    def _set_bw_sf_cr(self, bw_hz, sf, cr):
        # BW codificación en RegModemConfig1 (bits 7..4)
        bw_map = {
            7800: 0x00, 10400: 0x10, 15600: 0x20, 20800: 0x30,
            31250: 0x40, 41700: 0x50, 62500: 0x60, 125000: 0x70,
            250000: 0x80, 500000: 0x90
        }
        bw_bits = bw_map.get(int(bw_hz), 0x70)  # default 125k
        cr_bits = {5: 0x02, 6: 0x04, 7: 0x06, 8: 0x08}.get(int(cr), 0x02)
        self._write_reg(self.REG_MODEM_CONFIG1, bw_bits | cr_bits | 0x00)  # explicit header

        # SF codificación en RegModemConfig2 (bits 7..4)
        if sf < 6: sf = 6
        if sf > 12: sf = 12
        self._write_reg(self.REG_MODEM_CONFIG2, ((sf << 4) & 0xF0) | 0x04)  # CRC on

        # LowDataRateOptimize si SF alto y BW bajo
        ldro = 0x08 if (sf >= 11 and bw_hz == 125000) else 0x00
        self._write_reg(self.REG_MODEM_CONFIG3, 0x04 | ldro)  # AGC auto + LDRO

        # Timeout simbólico para Single RX (no usado aquí)
        self._write_reg(self.REG_SYMB_TIMEOUT_LSB, 0x08)

    def _init_lora(self):
        # Verifica versión
        ver = self._read_reg(self.REG_VERSION)
        if ver == 0x00 or ver == 0xFF:
            raise RuntimeError("SX127x no responde por SPI")
        # Sleep -> LoRa -> Standby
        self._set_mode(self.MODE_SLEEP)
        time.sleep(0.01)
        self._set_mode(self.MODE_STDBY)
        time.sleep(0.01)

        # Frecuencia y parámetros
        self._set_frequency(self.freq_hz)
        self._set_tx_power(self.tx_power)
        self._set_lna()
        # FIFO bases
        self._write_reg(self.REG_FIFO_TX_BASE_ADDR, 0x00)
        self._write_reg(self.REG_FIFO_RX_BASE_ADDR, 0x00)
        # Sync Word
        self._write_reg(self.REG_SYNC_WORD, self.sync_word & 0xFF)
        # BW/SF/CR
        self._set_bw_sf_cr(self.bw, self.sf, self.cr)

        # Mapea DIO0: RxDone (00)
        self._write_reg(self.REG_DIO_MAPPING1, 0x00)

        # Mask IRQs no usados, deja RxDone y TxDone habilitados
        self._write_reg(self.REG_IRQ_FLAGS_MASK, 0x00)

        # RX continuo
        self._set_mode(self.MODE_RX_CONT)

    def read_packet(self, timeout_s=5.0):
        # Espera DIO0 alto (RxDone)
        start = time.time()
        while time.time() - start < timeout_s:
            if GPIO.input(self.pin_dio0) == GPIO.HIGH:
                flags = self._read_reg(self.REG_IRQ_FLAGS)
                self._write_reg(self.REG_IRQ_FLAGS, 0xFF)  # limpiar todo
                if flags & self.IRQ_RX_DONE_MASK:
                    if flags & self.IRQ_PAYLOAD_CRC_ERROR_MASK:
                        return None, {"crc_ok": False}
                    # Dirección del FIFO actual
                    curr = self._read_reg(self.REG_FIFO_RX_CURRENT_ADDR)
                    self._write_reg(self.REG_FIFO_ADDR_PTR, curr)
                    nbytes = self._read_reg(self.REG_RX_NB_BYTES)
                    data = []
                    for _ in range(nbytes):
                        data.append(self._read_reg(self.REG_FIFO))
                    # RSSI y SNR
                    snr_raw = self._read_reg(self.REG_PKT_SNR_VALUE)
                    snr = (snr_raw if snr_raw < 128 else snr_raw - 256) / 4.0
                    rssi_raw = self._read_reg(self.REG_PKT_RSSI_VALUE)
                    # HF port RSSI calc
                    rssi = -157 + rssi_raw
                    meta = {
                        "crc_ok": True,
                        "rssi_dbm": rssi,
                        "snr_db": snr,
                    }
                    return bytes(data), meta
                else:
                    # otras IRQs
                    continue
            time.sleep(0.005)
        return None, {"timeout": True}

    def transmit(self, payload: bytes, timeout_s=5.0):
        # Standby
        self._set_mode(self.MODE_STDBY)
        time.sleep(0.005)
        # Mapea DIO0: TxDone (01)
        current = self._read_reg(self.REG_DIO_MAPPING1)
        self._write_reg(self.REG_DIO_MAPPING1, (current & 0x3F) | 0x40)
        # Carga FIFO
        self._write_reg(self.REG_FIFO_ADDR_PTR, self._read_reg(self.REG_FIFO_TX_BASE_ADDR))
        for b in payload:
            self._write_reg(self.REG_FIFO, b)
        self._write_reg(self.REG_PAYLOAD_LENGTH, len(payload))
        # TX
        self._set_mode(self.MODE_TX)
        # Espera TxDone vía DIO0
        start = time.time()
        while time.time() - start < timeout_s:
            if GPIO.input(self.pin_dio0) == GPIO.HIGH:
                flags = self._read_reg(self.REG_IRQ_FLAGS)
                self._write_reg(self.REG_IRQ_FLAGS, 0xFF)
                if flags & self.IRQ_TX_DONE_MASK:
                    # Vuelve a RX continuo
                    self._write_reg(self.REG_DIO_MAPPING1, 0x00)
                    self._set_mode(self.MODE_RX_CONT)
                    return True
            time.sleep(0.002)
        # Timeout: vuelve a RX
        self._write_reg(self.REG_DIO_MAPPING1, 0x00)
        self._set_mode(self.MODE_RX_CONT)
        return False

    def close(self):
        try:
            self._set_mode(self.MODE_SLEEP)
            self.spi.close()
        finally:
            GPIO.cleanup((self.pin_reset, self.pin_dio0))

# --------------------
# Lector GPS
# --------------------
class GPSReader(threading.Thread):
    def __init__(self, port="/dev/serial0", baud=9600):
        super().__init__(daemon=True)
        self.ser = serial.Serial(port=port, baudrate=baud, timeout=1)
        self.lock = threading.Lock()
        self.last_fix = None  # dict con lat, lon, alt, timestamp
        self.running = True

    def run(self):
        while self.running:
            try:
                line = self.ser.readline().decode(errors="ignore").strip()
                if not line or not line.startswith("$"):
                    continue
                msg = pynmea2.parse(line)
                if msg.sentence_type in ("RMC", "GGA"):
                    fix = {}
                    fix["time_utc"] = now_iso()
                    if hasattr(msg, "latitude") and hasattr(msg, "longitude"):
                        lat = msg.latitude
                        lon = msg.longitude
                        if lat != 0.0 and lon != 0.0:
                            fix["lat"] = lat
                            fix["lon"] = lon
                    if hasattr(msg, "altitude") and msg.altitude:
                        try:
                            fix["alt"] = float(msg.altitude)
                        except Exception:
                            pass
                    with self.lock:
                        self.last_fix = fix
            except Exception:
                # Ignora errores de parsing intermitentes
                pass

    def get_fix(self):
        with self.lock:
            return dict(self.last_fix) if self.last_fix else None

    def stop(self):
        self.running = False
        try:
            self.ser.close()
        except Exception:
            pass

# --------------------
# MQTT Bridge
# --------------------
class LoRaMqttBridge:
    def __init__(self, lora: SX127x, gps: GPSReader):
        self.lora = lora
        self.gps = gps
        self.mq = MqttClient(client_id=f"{GATEWAY_ID}-client", clean_session=True)
        self.mq.on_connect = self._on_connect
        self.mq.on_message = self._on_message
        self.mq.will_set(TOPIC_HB, json.dumps({"gateway": GATEWAY_ID, "status": "offline", "ts": now_iso()}), qos=1, retain=True)

    def connect(self):
        self.mq.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE)
        self.mq.loop_start()

    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        print(f"[MQTT] Conectado ({reason_code}), suscribiendo a {TOPIC_TX}")
        client.subscribe(TOPIC_TX, qos=1)
        # Publica online/heartbeat
        hb = {"gateway": GATEWAY_ID, "status": "online", "ts": now_iso()}
        fix = self.gps.get_fix()
        if fix:
            hb["gps"] = fix
        client.publish(TOPIC_HB, json.dumps(hb), qos=1, retain=True)

    def _on_message(self, client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode())
            hex_str = payload.get("payload_hex")
            if not hex_str:
                print("[MQTT] Mensaje TX sin 'payload_hex'")
                return
            # Permite override de parámetros
            sf = int(payload.get("sf", self.lora.sf))
            bw = int(payload.get("bw", self.lora.bw))
            cr = int(payload.get("cr", self.lora.cr))
            freq = int(payload.get("freq_hz", self.lora.freq_hz))
            txp = int(payload.get("tx_power", self.lora.tx_power))
            # Reconfigura si cambian
            self.lora._set_mode(SX127x.MODE_STDBY)
            self.lora._set_frequency(freq)
            self.lora._set_bw_sf_cr(bw, sf, cr)
            self.lora._set_tx_power(txp)

            data = binascii.unhexlify(hex_str)
            ok = self.lora.transmit(data)
            print(f"[LoRa] TX {'OK' if ok else 'TIMEOUT'} ({len(data)} bytes)")
        except Exception as e:
            print(f"[MQTT] Error al procesar TX: {e}")

    def publish_rx(self, payload: bytes, meta: dict):
        fix = self.gps.get_fix()
        msg = {
            "gateway": GATEWAY_ID,
            "ts": now_iso(),
            "freq_hz": self.lora.freq_hz,
            "bw": self.lora.bw,
            "sf": self.lora.sf,
            "cr": self.lora.cr,
            "sync_word": self.lora.sync_word,
            "payload_hex": binascii.hexlify(payload).decode(),
            "rssi_dbm": meta.get("rssi_dbm"),
            "snr_db": meta.get("snr_db"),
        }
        if fix:
            msg["gps"] = fix
        self.mq.publish(TOPIC_RX, json.dumps(msg), qos=0, retain=False)

    def loop(self):
        try:
            while True:
                data, meta = self.lora.read_packet(timeout_s=1.0)
                if data and meta.get("crc_ok", False):
                    print(f"[LoRa] RX {len(data)} bytes | RSSI {meta.get('rssi_dbm'):.1f} dBm | SNR {meta.get('snr_db'):.1f} dB")
                    self.publish_rx(data, meta)
                # Heartbeat periódico (cada 30 s)
                if int(time.time()) % 30 == 0:
                    hb = {"gateway": GATEWAY_ID, "status": "online", "ts": now_iso()}
                    fix = self.gps.get_fix()
                    if fix:
                        hb["gps"] = fix
                    self.mq.publish(TOPIC_HB, json.dumps(hb), qos=0, retain=False)
                time.sleep(0.05)
        except KeyboardInterrupt:
            pass

    def close(self):
        try:
            self.mq.loop_stop()
            self.mq.disconnect()
        except Exception:
            pass

def main():
    # Manejo de señales para cerrar ordenadamente
    stop_event = threading.Event()

    def handle_sigterm(signum, frame):
        stop_event.set()

    signal.signal(signal.SIGTERM, handle_sigterm)

    gps = GPSReader(port=SERIAL_PORT, baud=SERIAL_BAUD)
    gps.start()

    lora = SX127x(spi_bus=SPI_BUS, spi_dev=SPI_DEV, pin_reset=PIN_RESET, pin_dio0=PIN_DIO0,
                  freq_hz=LORA_FREQ_HZ, bw=LORA_BW, sf=LORA_SF, cr=LORA_CR,
                  sync_word=LORA_SYNC_WORD, tx_power=LORA_TX_POWER_DBM)

    bridge = LoRaMqttBridge(lora=lora, gps=gps)
    bridge.connect()
    try:
        bridge.loop()
    finally:
        bridge.close()
        gps.stop()
        lora.close()

if __name__ == "__main__":
    main()

Explicación breve de partes clave

  • Inicialización SX127x:
  • Reset por GPIO17.
  • Entra en LoRa + Standby, configura frecuencia (FRF), potencia (PA_BOOST), LNA, FIFO base, SyncWord y ModemConfig1/2/3 según BW/SF/CR.
  • Mapea DIO0 a RxDone (recepción) y deja IRQs sin enmascarar.
  • Pasa a RX continuo.

  • Recepción:

  • Espera DIO0 alto; lee IRQ, verifica CRC, obtiene dirección actual de FIFO y número de bytes, extrae paquete, calcula SNR y RSSI (port HF en 868 MHz).
  • Devuelve bytes y metadatos.

  • Transmisión:

  • Standby, DIO0→TxDone, carga payload en FIFO, TX, espera DIO0, limpia IRQ y vuelve a RX continuo.

  • GPS:

  • Hilo con pyserial en /dev/serial0 a 9600 bps; parsea NMEA (RMC/GGA) con pynmea2 y guarda última fijación válida (lat, lon, alt, timestamp).

  • MQTT:

  • Conecta a broker local; publica heartbeat y metadatos GPS.
  • Publica uplinks LoRa en topic gateway//rx como JSON.
  • Suscribe a gateway//tx para downlinks; admite override de parámetros de radio por mensaje.

Compilación/ejecución: comandos exactos y ordenados

1) Verifica interfaces:
– SPI: ls -l /dev/spidev0.0
– UART: ls -l /dev/serial0

2) Arranca el broker MQTT local y comprueba:

sudo systemctl enable --now mosquitto
systemctl is-active mosquitto

3) Crea el proyecto y entorno Python (si no lo hiciste antes):

mkdir -p ~/projects/lora-mqtt-gateway-bridge
cd ~/projects/lora-mqtt-gateway-bridge
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip==24.2 setuptools==70.0.0 wheel==0.44.0
pip install paho-mqtt==2.1.0 pyserial==3.5 pynmea2==1.18.0 spidev==3.6 RPi.GPIO==0.7.1 gpiozero==1.6.2
nano gateway.py  # pega el código completo y guarda

4) Ejecuta el gateway:

source .venv/bin/activate
python gateway.py

Deberías ver logs tipo:
– [MQTT] Conectado (…) suscribiendo a gateway/zero2w-dragino/tx
– Heartbeat periódico y, si hay fijación GPS, publicará gateway//heartbeat con lat/lon.

5) Suscríbete en otra terminal para observar mensajes:

mosquitto_sub -h 127.0.0.1 -t "gateway/zero2w-dragino/#" -v

6) Prueba un downlink de ejemplo (desde otra terminal):

# Enviará "Hola LoRa" como hex a los nodos a 868.1 MHz, SF7, BW125, CR4/5, 14dBm
mosquitto_pub -h 127.0.0.1 -t "gateway/zero2w-dragino/tx" \
  -m '{"payload_hex":"486f6c61204c6f5261","sf":7,"bw":125000,"cr":5,"freq_hz":868100000,"tx_power":14}'

Observa en la consola del gateway una línea [LoRa] TX OK (si el SX127x confirma TxDone).

7) Publicación de paquetes RX (cuando reciba un nodo LoRa):
– Verás [LoRa] RX … en la consola y mensajes JSON en el topic gateway/zero2w-dragino/rx.

Validación paso a paso

1) Validación de hardware:
– Antena conectada al Dragino HAT antes de energizar.
– HAT firmemente acoplado al GPIO de 40 pines.

2) Validación de interfaces:
– SPI: /dev/spidev0.0 presente.
– UART: /dev/serial0 presente y consola serie deshabilitada en cmdline.
– Grupos: id muestra que tu usuario está en spi, tty y dialout.

3) Validación de GPS:
– minicom -b 9600 -o -D /dev/serial0 y verifica NMEA.
– Al correr el gateway, revisa el topic heartbeat:
– mosquitto_sub -t gateway/zero2w-dragino/heartbeat -v
– Debes ver un JSON con «status»:»online» y, tras unos minutos, campo «gps» con lat/lon (si hay vista de cielo).

4) Validación de MQTT:
– mosquitto_sub -t ‘#’ -v (solo para pruebas locales) para verificar publicaciones a gateway/zero2w-dragino/rx y /heartbeat.
– Enviar un downlink de prueba como en el paso de ejecución. Debes ver «TX OK» o «TIMEOUT». «OK» confirma que el SX127x disparó TxDone.

5) Validación de LoRa RX:
– Necesitas un emisor LoRa externo (otro nodo o HAT) configurado con los mismos parámetros (freq: 868.1e6, BW: 125k, SF: 7, CR: 4/5, SyncWord: 0x12).
– Envía una trama corta (p.ej., «01020304»). El gateway debe imprimir:
– [LoRa] RX 4 bytes | RSSI … | SNR …
– En MQTT, un JSON en gateway/zero2w-dragino/rx con payload_hex: «01020304» y metadatos.

6) Comprobación de estabilidad:
– Deja correr 10–15 minutos; debe publicar heartbeat cada 30 s y no mostrar errores. La memoria y CPU del Pi Zero 2 W deberían mantenerse bajos.

Troubleshooting (errores típicos y soluciones)

1) No existe /dev/spidev0.0
– Causa: SPI no habilitado.
– Solución: habilita con raspi-config (I4 SPI → Enable) o añade dtparam=spi=on en /boot/firmware/config.txt y reinicia.

2) El SX127x no responde por SPI (RuntimeError en init)
– Causas:
– HAT mal insertado o sin alimentación 3V3/5V.
– Pines CE0/SCK/MOSI/MISO reconfigurados por otro overlay.
– Daño físico en la línea SPI.
– Soluciones:
– Revisa el montaje.
– Asegúrate de no tener otros HATs/overlays en conflicto en /boot/firmware/config.txt.
– Verifica continuidad de pines y que CE0 (GPIO8) esté libre.

3) No aparece /dev/serial0 o el GPS no saca NMEA
– Causas:
– Consola por serie todavía activa en cmdline.txt.
– UART deshabilitado (enable_uart=0).
– Soluciones:
– Edita /boot/firmware/cmdline.txt para quitar console=serial0,115200 y en config.txt habilita enable_uart=1. Reboot.
– Comprueba que el HAT está bien asentado y, si procede, prueba minicom.

4) PermissionError en acceso a SPI o UART
– Causa: usuario no pertenece a grupos spi, tty, dialout.
– Solución: sudo usermod -aG spi,tty,dialout $USER y reinicia sesión o el sistema.

5) “TX TIMEOUT” en el gateway
– Causas:
– DIO0 no mapeado correctamente a TxDone.
– DIO0 no está cableado a GPIO25 o GPIO25 no cambia por pull-ups.
– Soluciones:
– Verifica que REG_DIO_MAPPING1 se establece en 0x40 antes de TX y 0x00 en RX.
– Comprueba con un multímetro u osciloscopio que el pin DIO0 del HAT llega al GPIO25.

6) No hay paquetes RX aunque hay un nodo TX
– Causas:
– Par ámetros de radio desalineados: frecuencia, SF, BW, CR, SyncWord.
– Nodo usa LoRaWAN (cifrado y canalización) y gateway no lo entiende tal cual.
– Soluciones:
– Alinea exactamente freq/BW/SF/CR/SyncWord en ambos lados.
– Si es LoRaWAN, usa stack apropiado (p.ej., ChirpStack) y un forwarder compatible; este gateway es para LoRa “raw”.

7) MQTT no recibe ni publica
– Causas:
– Mosquitto no en ejecución, firewall, o bind externo.
– Soluciones:
– systemctl status mosquitto.
– Revisa /etc/mosquitto/mosquitto.conf; por defecto escucha en 0.0.0.0:1883 en Debian. Si cambiaste configuración, ajusta MQTT_HOST/MQTT_PORT.

8) El GPS no fija posición (sin lat/lon)
– Causas:
– Antena o ubicación sin vista de cielo, tiempo insuficiente después de arranque en frío.
– Soluciones:
– Coloca el equipo cerca de una ventana o en exterior; espera varios minutos.
– Verifica que recibes NMEA (aunque sin fix al principio).

Mejoras/variantes

  • Seguridad MQTT:
  • TLS y autenticación con usuario/contraseña.
  • Certificados (server y cliente) y cifrado de publicaciones.

  • Integración con plataformas IoT:

  • Bridge a un broker externo (ej. AWS IoT Core, Eclipse Mosquitto remoto, EMQX).
  • Normalización del payload en JSON con esquema fijo.

  • Persistencia:

  • Cola local (SQLite) para tolerar fallos de conectividad MQTT; reintento en segundo plano.

  • Supervisión:

  • Exportar métricas Prometheus (CPU, memoria, contadores RX/TX, RSSI medio, SNR medio).
  • Healthchecks en topic heartbeat con más detalles (uptime, versión software, etc.).

  • Servicio de sistema:

  • systemd unit para iniciar el gateway al boot.
  • watchdog para reiniciar el proceso en caso de fallo.

  • Multi-parámetro y regiones:

  • Escaneo de múltiples frecuencias (ciclando FRF).
  • Soporte de varias SF y BW con varios ciclos de escucha (compromiso en sensibilidad/latencia).

  • Cumplimiento normativo:

  • Duty cycle por región (p.ej., 1% en EU868) aplicado a transmisiones (downlink) con temporización para no violar normativa.

  • GPS avanzado:

  • Uso del PPS y gpsd para registros temporales más precisos.
  • Inclusión de HDOP/VDOP y número de satélites en metadatos.

Checklist de verificación

  • [ ] Raspberry Pi Zero 2 W con Raspberry Pi OS Bookworm 64‑bit operativo.
  • [ ] Dragino LoRa/GPS HAT montado y antena conectada.
  • [ ] SPI habilitado (ls /dev/spidev0.0).
  • [ ] UART habilitado y consola serie deshabilitada (ls /dev/serial0; sin “console=serial0,115200” en cmdline).
  • [ ] Usuario en grupos spi, tty y dialout (id).
  • [ ] Mosquitto activo (systemctl is-active mosquitto).
  • [ ] Entorno virtual Python creado con versiones fijadas (pip freeze revisado).
  • [ ] gateway.py creado y ejecutándose sin errores.
  • [ ] Heartbeat publicado en MQTT (mosquitto_sub en gateway//heartbeat).
  • [ ] Downlink de prueba enviado vía mosquitto_pub y “TX OK” observado.
  • [ ] Uplink LoRa verificado con un nodo emisor compatible y mensajes en gateway//rx.

Notas finales importantes

  • Mantén siempre conectada la antena al HAT Dragino antes de energizar y durante cualquier transmisión LoRa para evitar daños en el PA.
  • Ajusta frecuencia y potencia conforme a la normativa de tu país/región.
  • Este proyecto implementa un bridge “raw LoRa” a MQTT, no un gateway LoRaWAN. Para LoRaWAN, integra con stacks/forwarders específicos (p.ej., Semtech UDP packet forwarder o BasicStation) y servidores de red (p.ej., ChirpStack), lo cual requiere otra arquitectura y parámetros (canales, SFs múltiples, sincronización de tiempo, etc.).

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Cuál es el objetivo principal del proyecto mencionado?




Pregunta 2: ¿Qué sistema operativo se utilizará en el proyecto?




Pregunta 3: ¿Qué núcleo de Linux se debe usar?




Pregunta 4: ¿Qué versión de Python se utilizará en el entorno virtual?




Pregunta 5: ¿Cuál de los siguientes paquetes no se menciona como necesario?




Pregunta 6: ¿Qué servicio se utilizará como broker MQTT?




Pregunta 7: ¿Qué tipo de antena se necesita para el proyecto?




Pregunta 8: ¿Qué tipo de conectividad es necesaria para el Raspberry Pi Zero 2 W?




Pregunta 9: ¿Qué se necesita para asegurar la coherencia de las librerías?




Pregunta 10: ¿Qué protocolo se utiliza para la comunicación entre LoRa y el gateway?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: Keyword spotting I2S en RPi Pico W + INMP441

Caso práctico: Keyword spotting I2S en RPi Pico W + INMP441 — hero

Objetivo y caso de uso

Qué construirás: Un detector de palabras clave utilizando Raspberry Pi Pico W y el micrófono INMP441 para la captura de audio y la detección de comandos específicos.

Para qué sirve

  • Control de dispositivos IoT mediante comandos de voz en entornos domésticos.
  • Activación de asistentes virtuales en aplicaciones de automatización.
  • Implementación de sistemas de seguridad que responden a palabras clave predefinidas.
  • Desarrollo de interfaces de usuario accesibles para personas con discapacidades.

Resultado esperado

  • Detección de palabras clave con una latencia menor a 200 ms.
  • Precisión de detección superior al 90% en condiciones de ruido controlado.
  • Capacidad de procesar hasta 5 comandos por segundo.
  • Consumo de energía inferior a 100 mW durante la operación activa.

Público objetivo: Desarrolladores de software y hardware; Nivel: Alto

Arquitectura/flujo: Captura de audio mediante INMP441, procesamiento en Raspberry Pi Pico W, detección de palabras clave y respuesta a eventos.

Nivel: alto

Prerrequisitos

Sistema operativo y entorno de trabajo

  • Host de desarrollo: Raspberry Pi OS Bookworm 64-bit
  • Imagen estable Bookworm de 64 bits (kernel 6.x), instalada en una Raspberry Pi (4/400/5).
  • Python del sistema: Python 3.11 (3.11.2 en Bookworm).
  • Acceso a terminal con privilegios sudo y conexión a Internet.

Toolchain exacta (versiones probadas)

  • cmake 3.25.1 (paquete Debian: 3.25.1-1)
  • ninja-build 1.11.1 (paquete Debian: 1.11.1-1)
  • gcc-arm-none-eabi 10.3-2021.10 (paquete Debian: 15:10.3-2021.10+rpi1)
  • libnewlib-arm-none-eabi 4.1.0-202202 (paquete Debian: 4.1.0.202202-1+rpi1)
  • git 2.39.2 (o superior en Bookworm)
  • picotool 1.1.2 (paquete Debian: 1.1.2-1)
  • Raspberry Pi Pico SDK v2.0.0 (tag oficial)
  • pico-extras v2.0.0 (para entorno PIO auxiliar, aunque en este caso no usaremos libs de i2s de salida)
  • Thonny (opcional, no requerido)
  • pyserial 3.5 (en venv de Python 3.11 para monitorización por USB CDC)

Verifica las versiones instaladas (opcional):
– cmake –version → 3.25.1
– ninja –version → 1.11.1
– arm-none-eabi-gcc –version → 10.3-2021.10
– picotool version → 1.1.2

Habilitar interfaces y ajustes del sistema (host)

Aunque programaremos la Raspberry Pi Pico W (RP2040) por USB y no usamos buses GPIO del host, conviene:
1) Deshabilitar consola serie sobre UART GPIO (evita conflictos si más adelante usas UART):
– sudo raspi-config
– Interface Options → Serial Port → Login shell over serial? No → Enable serial port hardware? Yes
– Reboot si lo pide.

2) Añadir el usuario al grupo dialout para acceso a puertos serie (USB CDC):
– sudo usermod -aG dialout $USER
– Cierra sesión y vuelve a entrar.

3) Crear un entorno virtual de Python 3.11 para herramientas auxiliares (monitor serie, validación):
– python3 -m venv ~/venvs/pico-kws
– source ~/venvs/pico-kws/bin/activate
– pip install –upgrade pip==23.2.1
– pip install pyserial==3.5

4) Instalar toolchain y utilidades:
– sudo apt update
– sudo apt install -y git cmake ninja-build gcc-arm-none-eabi libnewlib-arm-none-eabi picotool

Materiales

  • 1 × Raspberry Pi Pico W (RP2040, con Wi‑Fi; usaremos USB para programar y CDC para logs).
  • 1 × Módulo de micrófono digital INMP441 I2S (pines: VDD, GND, SCK/BCLK, WS/LRCLK, SD, L/R).
  • 1 × Cable micro‑USB de datos (no solo carga).
  • 1 × Protoboard y 6–7 cables dupont macho‑macho.
  • 1 × Condensador de desacoplo 100 nF cerámico (entre VDD y GND del micrófono, recomendado).
  • 1 × Raspberry Pi (host) con Raspberry Pi OS Bookworm 64‑bit (para compilar y flashear).
  • Opcional: Cinta adhesiva/soporte para fijar micrófono y reducir vibraciones.

Modelo exacto del dispositivo usado en este caso práctico: Raspberry Pi Pico W + INMP441 I2S Mic.

Preparación y conexión

Consideraciones eléctricas

  • El INMP441 funciona a 3.3 V. No alimentes a 5 V.
  • Añade un condensador de 100 nF entre VDD y GND lo más cerca posible del micrófono.
  • El pin L/R define el canal que entrega por SD. Conéctalo a GND para canal izquierdo (L).

Mapeo de pines y cableado

Usaremos la PIO del RP2040 para recibir I2S. Asignaremos:
– BCLK (SCK) en GPIO10
– LRCLK (WS) en GPIO11
– SD (DOUT) en GPIO12
– L/R a GND para seleccionar el canal izquierdo
– VDD a 3V3(OUT)
– GND a GND

Tabla de conexiones (Pico W vs INMP441):

Señal INMP441 Pin INMP441 Pico W (señal) GPIO Pico Pin físico Pico
VDD VDD 3V3(OUT) 36
GND GND GND 38 (cualquiera GND)
SCK (BCLK) SCK BCLK GPIO10 14
WS (LRCLK) WS LRCLK GPIO11 15
SD SD Datos (in) GPIO12 16
L/R L/R Forzar IZQ — (a GND)

Notas:
– El INMP441 no requiere MCLK externo.
– Mantén cortos los cables de BCLK/LRCLK/SD; cruza GND cerca para retorno.

Código completo

Objetivo: i2s-keyword-spotting-pico. Implementaremos:
– PIO para capturar I2S (solo canal izquierdo).
– Extracción de energía por tramas de 10 ms.
– Detección por correlación normalizada con una plantilla de “hola” (plantilla de ejemplo incluida).
– Señalización por LED y logs por USB CDC.

Estructura mínima del proyecto:
– i2s-keyword-spotting-pico/
– CMakeLists.txt
– i2s_rx.pio
– src/
– main.cpp
– tools/
– monitor.py

CMakeLists.txt

cmake_minimum_required(VERSION 3.25)
include($ENV{PICO_SDK_PATH}/external/pico_sdk_import.cmake)

project(i2s_keyword_spotting_pico C CXX ASM)
set(CMAKE_C_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)

pico_sdk_init()

add_executable(i2s_keyword_spotting_pico
    src/main.cpp
    i2s_rx.pio
)

pico_generate_pio_header(i2s_keyword_spotting_pico ${CMAKE_CURRENT_LIST_DIR}/i2s_rx.pio)

target_link_libraries(i2s_keyword_spotting_pico
    pico_stdlib
    hardware_pio
    hardware_gpio
    hardware_clocks
)

pico_enable_stdio_usb(i2s_keyword_spotting_pico 1)
pico_enable_stdio_uart(i2s_keyword_spotting_pico 0)

pico_add_extra_outputs(i2s_keyword_spotting_pico)

i2s_rx.pio

PIO para recibir 32 bits por muestra del canal izquierdo sincronizados por BCLK, usando LRCLK como estrobado de canal. Leemos SD en cada flanco de subida de BCLK cuando LRCLK está bajo (izquierda). Ignoramos el canal derecho.

.program i2s_rx
; Configuración:
; - 'in' pins base = SD (dato del micrófono)
; - Usamos WAIT sobre GPIO absolutos para LRCLK y BCLK
; Convención:
; - LRCLK bajo = canal izquierdo
; - 32 bits por muestra (el INMP441 emite 24 bits MSB justificados; alineamos a 32 y luego recortamos en C++)

; Reemplaza estos números si cambias el cableado
%define BCLK_GPIO 10
%define LRCLK_GPIO 11

.wrap_target
    ; Espera a inicio de trama de canal izquierdo (LRCLK = 0)
    wait 0 gpio LRCLK_GPIO
    ; Alinea a flanco bajo de BCLK antes de empezar
    wait 0 gpio BCLK_GPIO

    set x, 31          ; 32 bits
bitloop_left:
    ; Espera flanco de subida de BCLK, muestrea SD
    wait 1 gpio BCLK_GPIO
    in pins, 1
    ; Espera flanco de bajada de BCLK para el siguiente bit
    wait 0 gpio BCLK_GPIO
    jmp x-- bitloop_left

    ; Empuja los 32 bits del canal izquierdo
    push block

    ; Consumir canal derecho (32 ciclos) sin empujar
    ; Espera que LRCLK sea 1 (canal derecho)
    wait 1 gpio LRCLK_GPIO
    set x, 31
bitloop_right:
    wait 1 gpio BCLK_GPIO
    nop
    wait 0 gpio BCLK_GPIO
    jmp x-- bitloop_right

    jmp .wrap_target
.wrap

src/main.cpp

Código principal: inicializa PIO, lee muestras, calcula energía por tramas de 10 ms a 16 kHz, compara con plantilla y emite detecciones.

#include <cstdio>
#include <cmath>
#include "pico/stdlib.h"
#include "hardware/pio.h"
#include "hardware/gpio.h"
#include "hardware/clocks.h"
#include "i2s_rx.pio.h"

static constexpr uint GPIO_BCLK = 10;
static constexpr uint GPIO_LRCLK = 11;
static constexpr uint GPIO_SD = 12;

static constexpr uint LED_PIN = PICO_DEFAULT_LED_PIN;

// Muestreo y DSP
static constexpr float SAMPLE_RATE = 16000.0f;    // Hz
static constexpr int FRAME_SAMPLES = 160;         // 10 ms a 16 kHz
static constexpr int FEAT_FRAMES = 50;            // 0.5 s de ventana para correlación
static constexpr float DETECT_COOLDOWN_S = 1.0f;
static constexpr float DETECT_THRESHOLD = 0.86f;

// Plantilla de energía (log-energía normalizada) para "hola" (50 frames, ~0.5 s).
// Esta plantilla es un ejemplo; puedes recalibrarla con tu voz para mayor robustez.
static float TEMPLATE_HOLA[FEAT_FRAMES] = {
    -0.57f,-0.40f,-0.22f, 0.05f, 0.33f, 0.62f, 0.81f, 0.93f, 0.75f, 0.40f,
    0.10f,-0.08f,-0.19f,-0.25f,-0.30f,-0.35f,-0.38f,-0.36f,-0.32f,-0.28f,
    -0.20f,-0.10f, 0.02f, 0.18f, 0.35f, 0.48f, 0.50f, 0.42f, 0.30f, 0.18f,
    0.06f,-0.05f,-0.15f,-0.22f,-0.26f,-0.28f,-0.29f,-0.28f,-0.26f,-0.23f,
    -0.19f,-0.15f,-0.11f,-0.07f,-0.05f,-0.04f,-0.05f,-0.08f,-0.12f,-0.16f
};

// Buffer circular de energía por frames
static float energy_ring[FEAT_FRAMES];
static int energy_idx = 0;
static bool ring_full = false;

// Filtro pasa-altos sencillo para quitar DC: y[n] = x[n] - x[n-1] + 0.995*y[n-1]
static inline int32_t hp_filter(int32_t x) {
    static int32_t x1 = 0;
    static float y1 = 0.0f;
    float y = (float)(x - x1) + 0.995f * y1;
    x1 = x;
    y1 = y;
    return (int32_t)y;
}

// Normaliza vector a media 0 y varianza 1
static void znormalize(float *v, int n) {
    float mean = 0.f, var = 0.f;
    for (int i = 0; i < n; ++i) mean += v[i];
    mean /= n;
    for (int i = 0; i < n; ++i) { float d = v[i] - mean; var += d*d; }
    var = (var / n);
    float stdv = (var > 1e-9f) ? sqrtf(var) : 1.0f;
    for (int i = 0; i < n; ++i) v[i] = (v[i] - mean) / stdv;
}

// Correlación normalizada coseno entre dos vectores ya normalizados
static float cosine_similarity(const float *a, const float *b, int n) {
    float dot = 0.f;
    for (int i = 0; i < n; ++i) dot += a[i]*b[i];
    return dot / (float)n;
}

// Convierte palabra de 32 bits I2S (MSB first) a muestra 16-bit
static inline int16_t i2s32_to_int16(uint32_t w) {
    // INMP441 entrega datos de 24 bits con signo, alineados a la izquierda dentro de 32 bits.
    // w: [S23..S0][pad 8 bits]. Desplazamos para obtener 16 bits significativos.
    int32_t s24 = (int32_t)(w) >> 8;     // 24 bits con signo en bits 31..8
    int16_t s16 = (int16_t)(s24 >> 8);   // Quita los 8 LSB -> 16-bit aproximado
    return s16;
}

int main() {
    stdio_init_all();
    sleep_ms(1500); // Espera a que el host abra el puerto USB

    // LED
    gpio_init(LED_PIN);
    gpio_set_dir(LED_PIN, true);
    gpio_put(LED_PIN, 0);

    // PIO setup
    PIO pio = pio0;
    uint sm = pio_claim_unused_sm(pio, true);
    uint offset = pio_add_program(pio, &i2s_rx_program);

    // Configurar pines
    gpio_pull_down(GPIO_BCLK);  // entradas, pero estabiliza
    gpio_pull_down(GPIO_LRCLK);
    gpio_pull_down(GPIO_SD);

    // Configurar PIO: base de 'in' en SD
    pio_sm_config c = i2s_rx_program_get_default_config(offset);
    sm_config_set_in_pins(&c, GPIO_SD);
    sm_config_set_in_shift(&c, true, true, 32); // shift right, autopush, threshold 32

    // Mapear set/out no usados, pero obligatorio setear pin base si fuera necesario
    // Velocidad: State machine corre al clock de sistema; usamos WAIT en GPIO (externos)
    float sys_clk_khz = frequency_count_khz(CLOCKS_FC0_SRC_VALUE_CLK_SYS);
    (void)sys_clk_khz;

    // Inicializa el SM
    pio_sm_init(pio, sm, offset, &c);

    // Habilitar SM
    pio_sm_set_enabled(pio, sm, true);

    // Variables de DSP
    uint64_t last_detect_us = 0;
    uint32_t frame_count = 0;

    // Pre-normaliza la plantilla para correlación
    float tpl[FEAT_FRAMES];
    for (int i = 0; i < FEAT_FRAMES; ++i) tpl[i] = TEMPLATE_HOLA[i];
    znormalize(tpl, FEAT_FRAMES);

    // Bucle principal
    while (true) {
        // Leer FRAME_SAMPLES muestras de canal izquierdo
        int64_t energy_acc = 0;
        for (int i = 0; i < FRAME_SAMPLES; ++i) {
            // Bloquea hasta que haya una palabra en FIFO
            uint32_t w = pio_sm_get_blocking(pio, sm);
            int16_t s = i2s32_to_int16(w);
            int32_t hp = hp_filter(s);
            energy_acc += (int64_t)hp * (int64_t)hp;
        }

        // Energía log
        float e = logf((float)energy_acc + 1.0f);
        energy_ring[energy_idx] = e;
        energy_idx = (energy_idx + 1) % FEAT_FRAMES;
        if (energy_idx == 0) ring_full = true;

        frame_count++;

        // Cada frame ~10ms. Calcular correlación cuando la ventana está llena
        if (ring_full) {
            // Construye ventana ordenada
            float feat[FEAT_FRAMES];
            for (int i = 0; i < FEAT_FRAMES; ++i) {
                int idx = (energy_idx + i) % FEAT_FRAMES;
                feat[i] = energy_ring[idx];
            }
            znormalize(feat, FEAT_FRAMES);

            float score = cosine_similarity(feat, tpl, FEAT_FRAMES);

            // Cooldown
            uint64_t now = time_us_64();
            bool cooldown_ok = (now - last_detect_us) > (uint64_t)(DETECT_COOLDOWN_S * 1e6f);

            if (score >= DETECT_THRESHOLD && cooldown_ok) {
                last_detect_us = now;
                // Señaliza por LED y log
                gpio_put(LED_PIN, 1);
                printf("[KWS] DETECTADO: HOLA | score=%.3f | t=%.2fs\n", score, now / 1e6f);
            } else {
                gpio_put(LED_PIN, 0);
            }

            // Telemetría periódica
            if (frame_count % 50 == 0) {
                // Estimación simple de BCLK esperado: 16 k * 32b * 2ch = 1.024 MHz (referencia)
                printf("[STAT] frame=%lu score=%.3f thr=%.2f SR=%.1fHz\n",
                       (unsigned long)frame_count, (double)score, (double)DETECT_THRESHOLD, (double)SAMPLE_RATE);
            }
        }
    }
    return 0;
}

tools/monitor.py

Script para leer el puerto serie USB CDC de la Pico W desde el host y ver logs de detección.

#!/usr/bin/env python3
import sys, time, serial

def main():
    if len(sys.argv) < 2:
        print("Uso: python tools/monitor.py /dev/ttyACM0 [baud]")
        sys.exit(1)
    port = sys.argv[1]
    baud = int(sys.argv[2]) if len(sys.argv) > 2 else 115200
    while True:
        try:
            with serial.Serial(port, baudrate=baud, timeout=1) as ser:
                print(f"Conectado a {port} @ {baud}")
                while True:
                    line = ser.readline().decode(errors="ignore").strip()
                    if line:
                        print(line)
        except serial.SerialException as e:
            print(f"No se puede abrir {port}: {e}")
            print("Reintentando en 2s...")
            time.sleep(2)

if __name__ == "__main__":
    main()

Breve explicación de las partes clave:
– i2s_rx.pio: El PIO espera LRCLK=0 para el canal izquierdo y muestrea 32 bits sincronizados con BCLK. Empuja la palabra de 32 bits al FIFO RX. Luego consume el canal derecho sin empujar.
– main.cpp:
– Convierte los 32 bits alineados a la izquierda en una muestra 16-bit aproximada.
– Aplica un filtro pasa‑altos simple para eliminar DC.
– Calcula energía por trama (suma de cuadrados).
– Mantiene una ventana deslizante de 50 tramas (~0.5 s) y calcula correlación con una plantilla pre‑normalizada de “hola”.
– Si la similitud coseno supera el umbral, activa el LED y escribe “DETECTADO: HOLA” por USB CDC.
– monitor.py: Facilita la monitorización de logs en el host vía /dev/ttyACM0.

Compilación, flash y ejecución

1) Preparar SDKs

  • Clona pico-sdk y pico-extras en versiones exactas y exporta variables:
# En tu Raspberry Pi con Bookworm 64-bit
mkdir -p ~/pico
cd ~/pico
git clone -b 2.0.0 https://github.com/raspberrypi/pico-sdk.git
cd pico-sdk
git submodule update --init

cd ..
git clone -b 2.0.0 https://github.com/raspberrypi/pico-extras.git

# Exporta PICO_SDK_PATH en tu shell y en ~/.bashrc
echo 'export PICO_SDK_PATH=$HOME/pico/pico-sdk' >> ~/.bashrc
echo 'export PICO_EXTRAS_PATH=$HOME/pico/pico-extras' >> ~/.bashrc
source ~/.bashrc

2) Crear el proyecto

# Estructura de directorios
mkdir -p ~/pico/projects/i2s-keyword-spotting-pico/src
mkdir -p ~/pico/projects/i2s-keyword-spotting-pico/tools

# Copia los archivos del caso práctico
# (Crea los ficheros CMakeLists.txt, i2s_rx.pio, src/main.cpp, tools/monitor.py con los contenidos de arriba)
nano ~/pico/projects/i2s-keyword-spotting-pico/CMakeLists.txt
nano ~/pico/projects/i2s-keyword-spotting-pico/i2s_rx.pio
nano ~/pico/projects/i2s-keyword-spotting-pico/src/main.cpp
nano ~/pico/projects/i2s-keyword-spotting-pico/tools/monitor.py
chmod +x ~/pico/projects/i2s-keyword-spotting-pico/tools/monitor.py

3) Construir (CMake + Ninja)

cd ~/pico/projects/i2s-keyword-spotting-pico
mkdir -p build && cd build
cmake -G Ninja ..
ninja
# Resultado: i2s_keyword_spotting_pico.uf2 en el directorio build

4) Flashear la Raspberry Pi Pico W

Método A: UF2 por Mass Storage
1. Desconecta la Pico W del USB.
2. Mantén presionado el botón BOOTSEL y conecta el USB a la Raspberry Pi (host).
3. Suelta BOOTSEL; aparecerá una unidad RPI-RP2.
4. Copia el UF2:
– cp i2s_keyword_spotting_pico.uf2 /media/$USER/RPI-RP2/

Método B: picotool (si no quieres usar Mass Storage)
1. Entra en modo BOOTSEL (como arriba).
2. Usa picotool:
bash
picotool load -v i2s_keyword_spotting_pico.uf2
picotool reboot

5) Ejecutar y monitorizar

  • La Pico W se reiniciará y abrirá un puerto USB CDC (/dev/ttyACM0).
  • Activa venv y ejecuta monitor:
    bash
    source ~/venvs/pico-kws/bin/activate
    python ~/pico/projects/i2s-keyword-spotting-pico/tools/monitor.py /dev/ttyACM0
  • Habla “hola” a ~10–20 cm del micrófono.

Validación paso a paso

1) Validar enumeración USB CDC:
– Al conectar la Pico W (sin BOOTSEL), ejecuta:
– ls /dev/ttyACM*
– Debes ver /dev/ttyACM0.
– Si no aparece, revisa cable y permisos (grupo dialout).

2) Ver logs básicos:
– Con monitor.py activo, deben aparecer líneas [STAT] cada ~0.5 s indicando frame y SR=16000.0Hz aprox.
– Ejemplo:
– [STAT] frame=50 score=0.123 thr=0.86 SR=16000.0Hz

3) Validar silencio y ruido ambiental:
– En silencio, los scores típicamente estarán por debajo de 0.4–0.6.
– Haz un chasquido o golpea la mesa suavemente: verás variaciones de energía pero no detección.

4) Validar detección de “hola”:
– Pronuncia “hola” claro, a 10–20 cm del micrófono.
– Debería encender el LED de la Pico W durante el frame de detección y aparecer:
– [KWS] DETECTADO: HOLA | score=0.90 | t=12.34s
– Si no detecta: repite un par de veces con ritmo similar (el algoritmo es plantilla por energía; la duración y entonación importan).

5) Medición de estabilidad:
– Repite “hola” 5–10 veces con pausas de >1 s.
– Cuenta detecciones. Espera al menos un 70% de aciertos en ambiente moderado.
– Si tienes falsos positivos, sube DETECT_THRESHOLD en main.cpp (p.ej. a 0.90) y recompila.

6) Validación de cableado (sin osciloscopio):
– Tapar el micrófono con un dedo reduce la energía y los [STAT] deben mostrar scores bajos.
– Si desconectas BCLK o LRCLK (no lo hagas permanentemente), verás bloqueo/ausencia de logs o comportamiento errático (indicador de sincronía perdida).

7) Confirmar tasa de muestreo efectiva:
– Aunque fijamos 16 kHz conceptualmente, el reloj de BCLK lo impone la fuente (micrófono + PIO esperando flancos). INMP441 se sincroniza a LRCLK/BCLK externos; nosotros leemos su BCLK. Al no generar nuestro propio BCLK, la muestra está exactamente sincronizada con BCLK del micrófono (que depende de LRCLK del I2S del módulo; la mayoría de INMP441 integra un PLL que deriva de LRCLK). En esta configuración (solo captura), asumimos LRCLK ≈ 16 kHz; la energía y la correlación deben ser estables. Si la plantilla se desincroniza por variación de LRCLK, es recomendable recalibrar la plantilla.

Nota: Hay módulos INMP441 que exigen un BCLK/LRCLK externos. Si tu placa de micro no genera BCLK/LRCLK, necesitas un generador de I2S maestro. Este caso práctico asume un módulo INMP441 que funciona correctamente con el esquema de reloj del módulo (frecuente en breakout boards comerciales). Si tu módulo requiere reloj maestro, ver “Troubleshooting”.

Troubleshooting (5–8 problemas típicos y soluciones)

1) No aparece /dev/ttyACM0:
– Causas: cable solo de carga, usuario sin dialout, firmware no flasheado.
– Solución:
– Cambia a un cable de datos.
– sudo usermod -aG dialout $USER; cierra sesión y entra de nuevo.
– Reflashea vía BOOTSEL con el .uf2.

2) El build falla con “PICO_SDK_PATH not set”:
– Causa: variable de entorno no exportada o shell sin source.
– Solución:
– echo ‘export PICO_SDK_PATH=$HOME/pico/pico-sdk’ >> ~/.bashrc
– source ~/.bashrc
– Vuelve a ejecutar cmake desde un build limpio: rm -rf build && mkdir build && cd build && cmake -G Ninja ..

3) Sin detecciones, aunque dices “hola”:
– Causas: umbral alto, plantilla no ajustada a tu voz/ritmo, ambiente ruidoso, distancia excesiva.
– Solución:
– Baja DETECT_THRESHOLD a 0.82–0.85 y recompila.
– Acércate a 10–15 cm.
– Habla con cadencia más corta (~0.5 s) y energía constante.
– Mejora acústica: apantalla el micrófono del viento.

4) Falsos positivos en silencio:
– Causas: vibraciones o impulsos, alta ganancia implícita en correlación.
– Solución:
– Sube DETECT_THRESHOLD a 0.90.
– Implementa un umbral de energía mínima (p. ej., requiere e promedio > cierto valor antes de correlacionar).
– Revisa que L/R esté a GND y GND esté firme.

5) El LED no enciende nunca:
– Causas: LED_PIN incorrecto (si placa no es Pico W oficial) o detección nunca supera umbral.
– Solución:
– Confirmar LED_PIN (en Pico W es el definido por PICO_DEFAULT_LED_PIN).
– Añadir printf de “score” y bajar el umbral de detección.

6) Build intermitente o cuelgues durante captura:
– Causas: cableado largo, sin desacoplo, interferencias de BCLK/LRCLK/SD.
– Solución:
– Añade el condensador 100 nF en VDD‑GND del INMP441.
– Acorta cables de señal y usa un GND común bien conectado.

7) El módulo INMP441 no entrega datos:
– Causas: Algunos módulos requieren que el microcontrolador genere BCLK/LRCLK (modo maestro) y el INMP441 sea esclavo.
– Solución:
– Este proyecto asume captura con reloj del módulo. Si tu módulo requiere reloj maestro, deberás:
– Generar BCLK≈1.024 MHz y LRCLK=16 kHz por PIO (maestro I2S TX de reloj, aunque no envíes datos) y usar otro SM para capturar SD.
– Ajustar el PIO para emitir BCLK/LRCLK y sincronizar el SM de RX. Consulta pico-extras y ejemplos de I2S master con PIO.

8) Mensajes [STAT] sin variación de score:
– Causas: energía constante (mic tapado o SD clavado), error en conversión de 32->16 bit.
– Solución:
– Verifica la línea SD, prueba invertir el desplazamiento (usar s24 >> 7 o >> 9) y observa si cambia la dinámica del score.
– Asegúrate de que L/R esté a GND para leer canal izquierdo (si flota, el canal alterna y la energía fluctúa erráticamente).

Mejoras y variantes

  • Mejor plantilla y calibración:
  • Implementa un modo de “grabación de plantilla” (mantén pulsado BOOTSEL n segundos) para adquirir 0.5 s de energía y guardar la plantilla como promedio de varias repeticiones de “hola”.
  • Guarda la plantilla en flash (usando pico_flash) para persistencia.

  • DSP más rico:

  • Sustituye la energía bruta por un banco de filtros (p. ej., 8 band-passes con Goertzel) y correlación de un vector de 8×50 características.
  • Añade VAD (detector de voz) simple por energía + ZCR (zero crossing rate) para evitar correlacionar en silencio.

  • Modelos TinyML:

  • Integra TensorFlow Lite for Microcontrollers (TFLM) y usa un modelo 1D‑CNN o DS‑CNN entrenado con MFCCs. En RP2040 es viable si optimizas RAM y reduces el modelo.
  • Pipeline: PIO I2S → MFCC en fixed‑point → TFLM inferencia → decisión.

  • Conectividad (Pico W):

  • Envía eventos de detección por Wi‑Fi a un endpoint HTTP/UDP o MQTT.
  • Usa LED y además un aviso sonoro con un buzzer (GPIO).

  • Robustez acústica:

  • Normaliza por RMS de ventana larga (AGC suave).
  • Filtra ruido de baja frecuencia con un HPF más agresivo (coeficiente 0.99→0.95).

  • Herramientas de diagnóstico:

  • Implementa un modo de volcado de audio por USB (p. ej., 8 kHz, 8‑bit μ‑law) para analizar en el host con Python/NumPy y afinar plantillas.

Checklist de verificación

  • [ ] Raspberry Pi OS Bookworm 64‑bit instalado y actualizado.
  • [ ] Toolchain instalada con versiones: cmake 3.25.1, ninja 1.11.1, gcc-arm-none-eabi 10.3-2021.10, picotool 1.1.2.
  • [ ] PICO_SDK_PATH exportado y pico-sdk v2.0.0 clonado.
  • [ ] Proyecto i2s-keyword-spotting-pico creado con los archivos proporcionados.
  • [ ] Conexión de hardware:
  • [ ] INMP441 VDD→3V3(OUT), GND→GND, L/R→GND.
  • [ ] SCK→GPIO10, WS→GPIO11, SD→GPIO12.
  • [ ] Condensador 100 nF entre VDD y GND del micrófono.
  • [ ] Compilación exitosa con cmake + ninja; UF2 generado.
  • [ ] Flasheo correcto por BOOTSEL o picotool; /dev/ttyACM0 visible.
  • [ ] monitor.py mostrando logs [STAT] periódicos.
  • [ ] LED parpadea y aparece “[KWS] DETECTADO: HOLA” al decir “hola”.
  • [ ] Sin falsos positivos frecuentes en silencio.
  • [ ] Umbral DETECT_THRESHOLD ajustado si es necesario.

Notas finales:
– Este caso práctico se centra en el modelo exacto “Raspberry Pi Pico W + INMP441 I2S Mic”. El código, conexión y comandos están alineados a ese hardware.
– La implementación de I2S por PIO en modo “solo RX” es deliberadamente explícita para aprender la sincronización con BCLK/LRCLK. Si tu módulo requiere reloj maestro, amplía el PIO para generar BCLK/LRCLK y usa un segundo SM para captura. Esto añade complejidad de temporización, pero el RP2040 lo soporta.
– El algoritmo de KWS basado en plantilla por energía es liviano y didáctico. Para producción, evoluciona a MFCC + clasificador (SVM/NN) o TFLM, ajustando memoria y latencia a la RP2040.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Cuál es la versión de Python recomendada para el sistema?




Pregunta 2: ¿Qué herramienta se menciona como opcional para el desarrollo?




Pregunta 3: ¿Cuál es la versión de cmake que se debe instalar?




Pregunta 4: ¿Qué comando se utiliza para verificar la versión de ninja?




Pregunta 5: ¿Qué paquete se usa para la monitorización por USB CDC?




Pregunta 6: ¿Qué versión de picotool se debe instalar?




Pregunta 7: ¿Cuál es el primer paso para habilitar interfaces en el sistema?




Pregunta 8: ¿Qué comando se utiliza para añadir un usuario al grupo dialout?




Pregunta 9: ¿Qué versión de gcc-arm-none-eabi se debe instalar?




Pregunta 10: ¿Qué imagen de Raspberry Pi OS se recomienda?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme: