Practical case: Raspberry Pi I2S Network Audio Player

Objective and use case

What you’ll build: An I2S network audio player using a Raspberry Pi Zero 2 W and a PCM5102A DAC, capable of receiving uncompressed PCM audio over the network and outputting it via the I2S interface.

Why it matters / Use cases

  • Stream high-quality audio from a network source to a local audio system, enhancing home audio setups.
  • Utilize the Raspberry Pi Zero 2 W’s compact size for portable audio applications, such as in DIY speaker projects.
  • Integrate with home automation systems to create a smart audio player that responds to network commands.
  • Experiment with audio processing and streaming technologies in a hands-on project for educational purposes.

Expected outcome

  • Achieve low-latency audio playback with less than 50 ms delay from network to output.
  • Maintain audio quality with a signal-to-noise ratio (SNR) exceeding 100 dB.
  • Support streaming audio at a minimum of 16-bit/44.1 kHz resolution.
  • Handle multiple audio streams simultaneously with minimal packet loss (less than 1%).

Audience: Advanced users familiar with Linux, ALSA, and Python; Level: Advanced

Architecture/flow: The system architecture involves a Raspberry Pi Zero 2 W connected to a PCM5102A DAC via I2S, receiving audio data over UDP from a network source.

Advanced Practical Case: Raspberry Pi Zero 2 W + PCM5102A DAC as an I2S Network Audio Player

Objective: Build an I2S network audio player on a Raspberry Pi Zero 2 W using a PCM5102A DAC module. The player will receive uncompressed PCM audio over the network (UDP) and output it via the I2S interface to the DAC.

Target audience: Advanced users familiar with Linux, ALSA, and Python.

OS and language baseline:
– Raspberry Pi OS Bookworm 64‑bit
– Python 3.11


Prerequisites

  • A Linux/macOS/Windows workstation to flash the microSD card.
  • Local network with Wi‑Fi coverage (2.4 GHz) for the Raspberry Pi Zero 2 W.
  • Basic soldering and wiring capability for I2S connections.
  • An external audio amplifier or powered speakers to connect to the PCM5102A DAC outputs.
  • Familiarity with the Linux terminal, SSH, and editing configuration files.

Materials (exact model)

  • Device: Raspberry Pi Zero 2 W + PCM5102A DAC
  • microSD card (≥16 GB, Class 10/UHS‑I recommended)
  • 5 V / 2.5 A USB power supply for the Pi Zero 2 W
  • PCM5102A I2S DAC breakout board (typical module with BCK, LRCK, DIN, GND, VCC pins; many accept 5 V input)
  • Jumper wires (female‑female or mixed, per your board headers)
  • RCA cables (or 3.5 mm cable) from DAC to amplifier/speakers
  • Optional: small breadboard, 10 kΩ resistor, SPST push button (for later improvements)

Setup/Connection

1) Flash Raspberry Pi OS Bookworm 64‑bit

  • Use Raspberry Pi Imager on your workstation.
  • Device: Raspberry Pi Zero 2 W
  • OS: Raspberry Pi OS (64‑bit) Bookworm
  • Storage: your microSD card
  • Click the gear icon to preconfigure:
    • Set hostname (e.g., rpi‑i2s)
    • Enable SSH (password or public key)
    • Configure Wi‑Fi SSID, password, country
    • Set locale, keyboard, timezone
  • Write and verify.

Insert the card into the Pi Zero 2 W, connect power, and SSH into the device when it appears on the network. Verify Python and kernel:

ssh pi@rpi-i2s.local
uname -a
python3 --version

You should see Linux kernel 6.x and Python 3.11.x.

2) Wire the PCM5102A DAC to the Pi Zero 2 W (I2S)

The PCM5102A supports “3‑wire” I2S without MCLK (master clock). The Raspberry Pi’s I2S pins from the primary PCM/I2S controller are routed as follows.

Connection table:

Function | Raspberry Pi Zero 2 W pin (GPIO)  | PCM5102A pin (typical labels)
VCC      | Pin 1 (3V3) or Pin 2/4 (5 V)      | VCC (see module labeling)
GND      | Pin 6 (GND)                       | GND
BCLK     | Pin 12 (GPIO18, PCM_CLK)          | BCK / BCLK
LRCLK    | Pin 35 (GPIO19, PCM_FS)           | LRCK / WS
DIN      | Pin 40 (GPIO21, PCM_DOUT)         | DIN

Notes:
– Many PCM5102A boards accept 5 V on VCC and regulate internally. If your board is explicitly 3.3 V‑only, use Pi Pin 1 (3.3 V). Always confirm the exact marking on your module before powering.
– MCLK is not used on the Raspberry Pi with this overlay; don’t connect it even if present on the module.
– Keep I2S wires short and tidy to avoid signal integrity issues.

3) Enable I2S and the HifiBerry DAC overlay

On Raspberry Pi OS Bookworm the firmware config is under /boot/firmware/config.txt. Disable the on‑board analog audio and enable the PCM5102A‑compatible overlay:

sudo cp /boot/firmware/config.txt /boot/firmware/config.txt.bak
sudo nano /boot/firmware/config.txt

Add the following lines near the end:

dtparam=audio=off

# Enable I2S DAC compatible with PCM5102A
dtoverlay=hifiberry-dac

Save and reboot:

sudo reboot

After reboot, verify the ALSA card:

aplay -l

Expected output includes a card similar to:

  • card 0: sndrpihifiberry [snd_rpi_hifiberry_dac], device 0: HiFiBerry DAC HiFi pcm5102a-hifi-0

If you don’t see it, see Troubleshooting.

4) Set a safe ALSA default with soft volume

Create /etc/asound.conf to define a default PCM with software volume control (the PCM5102A lacks a hardware mixer):

sudo nano /etc/asound.conf

Paste:

pcm.softvol {
    type softvol
    slave {
        pcm "plughw:sndrpihifiberry"
    }
    control {
        name "SoftMaster"
        card "sndrpihifiberry"
    }
    min_dB -51.0
    max_dB 0.0
}

pcm.!default {
    type plug
    slave.pcm "softvol"
}

ctl.!default {
    type hw
    card "sndrpihifiberry"
}

Test playback (sine tone):

speaker-test -D default -c 2 -t sine -f 1000 -l 1

You should hear a brief 1 kHz tone through your amplifier/speakers.


Full Code

We’ll implement a minimal, low‑latency UDP PCM16 network protocol with a jitter buffer on the Raspberry Pi. The sender (for validation) can run on your workstation. Audio format: stereo, 48 kHz, 16‑bit little‑endian, 10 ms frames. Each UDP packet contains:
– 8‑byte header: magic b'P16S' (4 bytes) + sequence (uint32, big‑endian)
– 1920‑byte payload: 480 frames × 2 channels × 2 bytes
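The wire format above can be sketched in a few lines. This pack/unpack pair is purely illustrative (the receiver and sender scripts below do the same inline):

```python
import struct

MAGIC = b"P16S"
FRAME_SAMPLES, CHANNELS, BYTES_PER_SAMPLE = 480, 2, 2
PAYLOAD_SIZE = FRAME_SAMPLES * CHANNELS * BYTES_PER_SAMPLE  # 1920 bytes
HEADER_SIZE = 8

def pack_packet(seq: int, payload: bytes) -> bytes:
    # 4-byte magic + big-endian uint32 sequence, then raw PCM16 payload
    assert len(payload) == PAYLOAD_SIZE
    return MAGIC + struct.pack(">I", seq & 0xFFFFFFFF) + payload

def unpack_packet(packet: bytes) -> tuple[int, bytes]:
    # Raises ValueError on malformed packets
    if len(packet) != HEADER_SIZE + PAYLOAD_SIZE or packet[:4] != MAGIC:
        raise ValueError("bad packet")
    (seq,) = struct.unpack(">I", packet[4:8])
    return seq, packet[HEADER_SIZE:]

pkt = pack_packet(7, bytes(PAYLOAD_SIZE))
seq, payload = unpack_packet(pkt)
print(len(pkt), seq)  # 1928 7
```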

Playback uses the ALSA default device via sounddevice (PortAudio), which will route to our softvol on the PCM5102A.

Receiver (run on the Raspberry Pi)

Create the project folder and receiver script:

mkdir -p ~/i2s-net-player
cd ~/i2s-net-player
nano net_i2s_receiver.py

Paste:

#!/usr/bin/env python3
import argparse
import asyncio
import collections
import contextlib
import logging
import socket
import struct

import numpy as np
import sounddevice as sd

MAGIC = b'P16S'
SAMPLE_RATE = 48000
CHANNELS = 2
FRAME_SAMPLES = 480  # 10 ms @ 48 kHz
BYTES_PER_SAMPLE = 2  # int16
PAYLOAD_SIZE = FRAME_SAMPLES * CHANNELS * BYTES_PER_SAMPLE
HEADER_SIZE = 8  # MAGIC(4) + seq(4)
PACKET_SIZE = HEADER_SIZE + PAYLOAD_SIZE

class UdpAudioReceiver:
    def __init__(self, host, port, device=None, prebuffer_ms=50, gain_db=0.0, rcvbuf=1 << 20):
        self.host = host
        self.port = port
        self.device = device
        self.prebuffer_frames = int((prebuffer_ms / 1000.0) * SAMPLE_RATE)
        # We buffer by packets; each network packet carries FRAME_SAMPLES frames.
        self.prebuffer_packets = max(1, self.prebuffer_frames // FRAME_SAMPLES)
        self.gain = 10 ** (gain_db / 20.0)
        self.rcvbuf = rcvbuf
        # collections.deque: append/popleft are atomic in CPython, so the
        # PortAudio callback thread can consume blocks safely; maxlen drops
        # the oldest block on overflow to bound latency.
        self.queue = collections.deque(maxlen=256)
        self.sock = None
        self.expected_seq = None
        self.drop_count = 0
        self.start_playback = asyncio.Event()
        self.play_started = False

    def _open_socket(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.rcvbuf)
        sock.bind((self.host, self.port))
        self.sock = sock
        logging.info("Listening on UDP %s:%d (SO_RCVBUF=%d)", self.host, self.port, self.rcvbuf)

    async def _producer(self):
        loop = asyncio.get_running_loop()
        self._open_socket()
        self.sock.setblocking(False)

        buffered_packets = 0
        while True:
            try:
                # loop.sock_recvfrom (Python 3.11+) returns (data, addr)
                data, _addr = await loop.sock_recvfrom(self.sock, PACKET_SIZE)
            except asyncio.CancelledError:
                break
            if len(data) != PACKET_SIZE:
                logging.warning("Bad packet size: got %d, expected %d", len(data), PACKET_SIZE)
                continue
            if data[:4] != MAGIC:
                logging.warning("Bad magic")
                continue
            seq = struct.unpack(">I", data[4:8])[0]
            payload = data[HEADER_SIZE:]

            # Track loss via the 32-bit sequence number (wraparound-safe)
            if self.expected_seq is None:
                self.expected_seq = (seq + 1) & 0xFFFFFFFF
            else:
                if seq != self.expected_seq:
                    self.drop_count += (seq - self.expected_seq) & 0xFFFFFFFF
                    self.expected_seq = (seq + 1) & 0xFFFFFFFF
                else:
                    self.expected_seq = (self.expected_seq + 1) & 0xFFFFFFFF

            # Apply digital gain in float, clip, and convert back to int16
            frame = np.frombuffer(payload, dtype=np.int16).reshape(-1, CHANNELS).astype(np.float32)
            frame *= self.gain
            np.clip(frame, -32768.0, 32767.0, out=frame)
            frame_i16 = frame.astype(np.int16)

            self.queue.append(frame_i16)
            if not self.play_started:
                buffered_packets += 1
                if buffered_packets >= self.prebuffer_packets:
                    self.start_playback.set()

    def _sd_callback(self, outdata, frames, time_info, status):
        # Runs in the PortAudio thread, not in the asyncio event loop
        if status:
            logging.debug("SoundDevice status: %s", status)

        needed = frames
        out = np.zeros((frames, CHANNELS), dtype=np.int16)

        filled = 0
        while filled < needed:
            try:
                block = self.queue.popleft()
            except IndexError:
                break  # underrun: remaining samples stay silent
            # Each block is FRAME_SAMPLES frames; with blocksize=FRAME_SAMPLES
            # a block always fits exactly into one callback.
            take = min(needed - filled, block.shape[0])
            out[filled:filled + take, :] = block[:take, :]
            filled += take

        outdata[:] = out

    async def run(self):
        prod_task = asyncio.create_task(self._producer())

        logging.info("Waiting for prebuffer: ~%d packets (~%d ms)", self.prebuffer_packets, int(self.prebuffer_packets * 10))
        await self.start_playback.wait()
        self.play_started = True

        device_name = self.device if self.device else None
        logging.info("Opening ALSA device: %s", device_name if device_name else "default")
        with sd.OutputStream(
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype='int16',
            device=device_name,
            callback=self._sd_callback,
            blocksize=FRAME_SAMPLES,
            latency='low',
        ):
            logging.info("Playback started. Press Ctrl+C to stop.")
            try:
                while True:
                    await asyncio.sleep(1.0)
            except asyncio.CancelledError:
                pass
            except KeyboardInterrupt:
                pass
        prod_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await prod_task

def main():
    parser = argparse.ArgumentParser(description="I2S Network Audio Receiver (PCM16 over UDP) for Raspberry Pi Zero 2 W + PCM5102A DAC")
    parser.add_argument("--host", default="0.0.0.0", help="Listen address (default: 0.0.0.0)")
    parser.add_argument("--port", type=int, default=5005, help="Listen UDP port (default: 5005)")
    parser.add_argument("--device", default=None, help="ALSA/PortAudio device name or index (default: system default)")
    parser.add_argument("--prebuffer-ms", type=int, default=50, help="Initial prebuffer in ms (default: 50)")
    parser.add_argument("--gain-db", type=float, default=0.0, help="Digital gain in dB (default: 0.0)")
    parser.add_argument("--rcvbuf", type=int, default=(1<<20), help="SO_RCVBUF bytes (default: 1 MiB)")
    parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase verbosity")

    args = parser.parse_args()
    level = logging.WARNING - min(args.verbose, 2) * 10
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s: %(message)s")

    try:
        asyncio.run(UdpAudioReceiver(
            host=args.host, port=args.port, device=args.device,
            prebuffer_ms=args.prebuffer_ms, gain_db=args.gain_db, rcvbuf=args.rcvbuf
        ).run())
    except KeyboardInterrupt:
        pass

if __name__ == "__main__":
    import contextlib
    main()

Sender (run on your workstation for validation)

This sender reads a WAV file (stereo, 48 kHz, 16‑bit LE preferred) and streams it as 10 ms UDP packets to the Pi.

nano send_pcm16_udp.py

Paste:

#!/usr/bin/env python3
import argparse
import socket
import struct
import time

import numpy as np
import soundfile as sf

MAGIC = b'P16S'
SAMPLE_RATE = 48000
CHANNELS = 2
FRAME_SAMPLES = 480  # 10 ms
BYTES_PER_SAMPLE = 2

def main():
    p = argparse.ArgumentParser(description="Send PCM16 UDP stream for i2s-network-audio-player")
    p.add_argument("wav", help="Path to stereo 48 kHz 16-bit WAV")
    p.add_argument("--ip", required=True, help="Receiver IP")
    p.add_argument("--port", type=int, default=5005, help="Receiver UDP port (default: 5005)")
    p.add_argument("--repeat", action="store_true", help="Loop playback")
    p.add_argument("--no-throttle", dest="throttle", action="store_false",
                   help="Send as fast as possible (default: throttle to realtime)")
    args = p.parse_args()

    data, sr = sf.read(args.wav, dtype='int16', always_2d=True)
    if sr != SAMPLE_RATE or data.shape[1] != CHANNELS:
        raise SystemExit(f"Input must be {SAMPLE_RATE} Hz, stereo. Got {sr} Hz, {data.shape[1]} ch")

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    seq = 0
    cursor = 0
    sent = 0
    t0 = time.perf_counter()

    while True:
        if cursor + FRAME_SAMPLES > data.shape[0]:
            if args.repeat:
                cursor = 0
            else:
                break

        frame = data[cursor:cursor + FRAME_SAMPLES, :]
        payload = frame.astype(np.int16).tobytes()
        packet = MAGIC + struct.pack(">I", seq) + payload
        sock.sendto(packet, (args.ip, args.port))
        seq = (seq + 1) & 0xFFFFFFFF
        cursor += FRAME_SAMPLES
        sent += 1

        if args.throttle:
            # Drift-free pacing: schedule each packet at t0 + n * 10 ms
            # rather than sleeping a fixed 10 ms (which accumulates error).
            deadline = t0 + sent * 0.010
            delay = deadline - time.perf_counter()
            if delay > 0:
                time.sleep(delay)

if __name__ == "__main__":
    main()

Build/Flash/Run commands

All commands below run on the Raspberry Pi unless otherwise specified.

System update and base packages

sudo apt update
sudo apt full-upgrade -y
sudo apt install -y python3-venv python3-dev git build-essential \
    alsa-utils libasound2-dev libportaudio2 portaudio19-dev \
    libsndfile1 \
    iperf3

Optional but recommended: disable Wi‑Fi power saving (to reduce dropouts):

sudo mkdir -p /etc/NetworkManager/conf.d
echo -e "[connection]\nwifi.powersave = 2" | sudo tee /etc/NetworkManager/conf.d/wifi-powersave-off.conf
sudo systemctl restart NetworkManager

Python 3.11 virtual environment and packages

cd ~/i2s-net-player
python3.11 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install numpy sounddevice soundfile
# Listed in the final checklist (not strictly needed by the core demo):
pip install gpiozero smbus2 spidev

List sound devices to locate the DAC (optional):

python -c "import sounddevice as sd; print(sd.query_devices())"

You should see a device referencing sndrpihifiberry or default ALSA mapping.

Run the receiver

cd ~/i2s-net-player
source .venv/bin/activate
python net_i2s_receiver.py --host 0.0.0.0 --port 5005 --prebuffer-ms 50 -v

The receiver now waits for UDP audio.

Prepare the sender (on your workstation)

Install Python dependencies:

python3 -m pip install --user soundfile numpy

Find the Pi’s IP (on the Pi):

hostname -I

Assume it prints 192.168.1.50. On your workstation, run:

python3 send_pcm16_udp.py /path/to/stereo_48k_16bit.wav --ip 192.168.1.50 --port 5005 --repeat

If you lack a suitable WAV, create a test file (Linux/macOS with ffmpeg):

ffmpeg -f lavfi -i "sine=frequency=1000:duration=5" -ac 2 -ar 48000 -sample_fmt s16 test_1k_stereo.wav
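If ffmpeg isn't available on your workstation, an equivalent test file can be generated with the Python standard library alone. This sketch writes the same 5 s, 1 kHz, stereo, 48 kHz, 16‑bit file under the name used above:

```python
import math
import struct
import wave

SAMPLE_RATE, FREQ, SECONDS, AMP = 48000, 1000, 5, 0.3  # AMP < 1.0 avoids clipping

with wave.open("test_1k_stereo.wav", "wb") as wf:
    wf.setnchannels(2)            # stereo
    wf.setsampwidth(2)            # 16-bit samples
    wf.setframerate(SAMPLE_RATE)
    frames = bytearray()
    for n in range(SECONDS * SAMPLE_RATE):
        s = int(AMP * 32767 * math.sin(2 * math.pi * FREQ * n / SAMPLE_RATE))
        frames += struct.pack("<hh", s, s)   # same sample on L and R channels
    wf.writeframes(frames)
```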

Step‑by‑step Validation

1) Confirm the I2S DAC is recognized by ALSA:
– aplay -l should list snd_rpi_hifiberry_dac.
– Check dmesg for overlay load:
dmesg | grep -i -E "snd|hifiberry|pcm5102"
Expected: lines indicating pcm5102a codec and machine driver probed.

2) Validate basic audio path:
– Speaker test via ALSA default:
speaker-test -D default -c 2 -t sine -f 440 -l 1
Hear a brief tone. If silent, verify connections and amplifier input.

3) Validate ALSA naming and default route:
– List ALSA logical devices:
aplay -L | sed -n '1,120p'
Ensure default maps to softvol and underlying card is sndrpihifiberry.

4) Confirm Python can access the sound device:
– From the venv:
python - <<'PY'
import sounddevice as sd
for i, d in enumerate(sd.query_devices()):
    print(i, d['name'], d['hostapi'], d['max_output_channels'])
PY

Identify an output device with ≥2 channels.

5) Network sanity test:
– Test Wi‑Fi throughput/quality between workstation and Pi:
– On Pi:
iperf3 -s
– On workstation:
iperf3 -c 192.168.1.50
– Expect at least several Mbit/s; our uncompressed stream is ~1.536 Mbit/s (48k × 16‑bit × 2 ch).
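The bandwidth figure follows directly from the format constants; this quick calculation also shows the small overhead added by the 8‑byte packet header:

```python
# Payload bit rate: 48 kHz x 16-bit x 2 channels
SAMPLE_RATE, CHANNELS, BITS = 48000, 2, 16
FRAME_SAMPLES = 480                         # 10 ms per packet

payload_bps = SAMPLE_RATE * CHANNELS * BITS
packets_per_s = SAMPLE_RATE // FRAME_SAMPLES          # 100 packets/s
wire_bps = (8 + 1920) * 8 * packets_per_s             # include the 8-byte header

print(payload_bps / 1e6, packets_per_s, wire_bps / 1e6)  # 1.536 100 1.5424
```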

6) Run the full streaming path:
– Start receiver on the Pi:
cd ~/i2s-net-player
source .venv/bin/activate
python net_i2s_receiver.py --host 0.0.0.0 --port 5005 --prebuffer-ms 50 -v

You should see “Listening on UDP …” and then “Playback started” after a short prebuffer.
– Start sender on workstation:
python3 send_pcm16_udp.py test_1k_stereo.wav --ip 192.168.1.50 --port 5005 --repeat
The tone should play continuously. Replace with music WAV to validate musical playback.

7) Latency and stability checks:
– Adjust prebuffer for dropouts:
– Increase to 80–120 ms:
python net_i2s_receiver.py --prebuffer-ms 100
– If dropouts persist, confirm Wi‑Fi power saving is off and check RSSI quality (iw dev wlan0 link).

8) Volume control:
– Use softvol control:
alsamixer
– Select “SoftMaster” and adjust volume.
– Note: PCM5102A has no hardware volume; all control is digital.


Troubleshooting

  • No sndrpihifiberry device in aplay -l:
  • Ensure /boot/firmware/config.txt contains both:
    • dtparam=audio=off
    • dtoverlay=hifiberry-dac
  • Reboot and recheck:
    sudo reboot
    aplay -l
  • Inspect kernel messages:
    dmesg | grep -i asoc
    dmesg | grep -i hifiberry

  • No audio, but device is present:

  • Verify wiring:
    • GPIO18 -> BCLK
    • GPIO19 -> LRCLK/WS
    • GPIO21 -> DIN
    • Ground shared
    • Correct VCC (5 V or 3.3 V matching your module)
  • Test with speaker-test:
    speaker-test -D default -c 2 -t sine -f 1000
  • Ensure your amplifier input is correct and volume is up.
  • Check that the module’s output uses the right connectors (RCA/3.5 mm).

  • Distorted or choppy audio:

  • Increase jitter buffer:
    python net_i2s_receiver.py --prebuffer-ms 120
  • Disable Wi‑Fi power saving:
    sudo iw dev wlan0 set power_save off

    Persist via the NetworkManager conf file shown earlier.
  • Reduce CPU load:
    • Close other applications.
    • Monitor:
      top
      vcgencmd measure_temp
  • Improve UDP buffering:

    • Increase SO_RCVBUF (already defaulted to 1 MiB). You can raise:
      sudo sysctl -w net.core.rmem_max=4194304
      Then run the receiver with --rcvbuf 4194304.
  • The Python receiver can’t open the sound device:

  • List devices with sounddevice and choose an explicit device:
    python net_i2s_receiver.py --device "default"
    or
    python net_i2s_receiver.py --device "sysdefault"
  • Ensure libportaudio2 and portaudio19-dev are installed, and that sounddevice is installed in the venv.

  • Sender complains about sample rate/channels:

  • Convert your file to stereo 48 kHz 16‑bit:
    ffmpeg -i input.any -ac 2 -ar 48000 -sample_fmt s16 out_48k_16bit_stereo.wav

  • High noise floor or hum:

  • Use a clean 5 V power supply.
  • Keep I2S wires short and away from high‑current cables.
  • Use shielded RCA cables to the amplifier.

Improvements

  • Compression for bandwidth efficiency:
  • Replace raw PCM with Opus or FLAC over RTP. Use GStreamer on both ends:
    • Receiver example (on Pi):
      gst-launch-1.0 udpsrc port=5006 caps="application/x-rtp, media=audio, encoding-name=OPUS, payload=96" \
      ! rtpopusdepay ! opusdec ! audioconvert ! audioresample ! alsasink device=default sync=true
    • Sender example:
      gst-launch-1.0 filesrc location=music.wav ! wavparse ! audioconvert ! audioresample \
      ! opusenc bitrate=128000 ! rtpopuspay ! udpsink host=192.168.1.50 port=5006
  • AirPlay (RAOP) compatibility:
  • Use shairport-sync to turn the Pi into an AirPlay endpoint that outputs via I2S:
    sudo apt install -y shairport-sync
    sudo systemctl enable --now shairport-sync

    Ensure default ALSA routes to your PCM5102A softvol.

  • Multiroom sync:

  • Use Snapcast client/server for synchronized multiroom streaming over the network.

  • System service for auto‑start:

  • Create a systemd unit to start the Python receiver at boot:
    sudo nano /etc/systemd/system/i2s-net-receiver.service
    Contents:

    [Unit]
    Description=I2S Network Audio Receiver (PCM16 UDP)
    After=network-online.target
    Wants=network-online.target

    [Service]
    User=pi
    WorkingDirectory=/home/pi/i2s-net-player
    ExecStart=/home/pi/i2s-net-player/.venv/bin/python /home/pi/i2s-net-player/net_i2s_receiver.py --host 0.0.0.0 --port 5005 --prebuffer-ms 80
    Restart=always
    RestartSec=2

    [Install]
    WantedBy=multi-user.target

    Enable and start it:
    sudo systemctl daemon-reload
    sudo systemctl enable --now i2s-net-receiver.service

  • Hardware controls:

  • Add GPIO buttons via gpiozero for play/pause or mute, and expose a simple HTTP API using Flask for remote control.

  • Better resampling and dithering:

  • Accept arbitrary input formats at the sender and resample to 48 kHz with high‑quality SRC before sending.

  • Monitoring:

  • Add Prometheus metrics for packet loss, jitter buffer level, and CPU usage.

Final Checklist

  • OS and language:
  • Raspberry Pi OS Bookworm 64‑bit installed and updated.
  • Python 3.11 available.
  • Hardware:
  • Raspberry Pi Zero 2 W powered by 5 V/2.5 A supply.
  • PCM5102A DAC wired correctly:
    • GPIO18 → BCLK
    • GPIO19 → LRCLK/WS
    • GPIO21 → DIN
    • GND ↔ GND
    • VCC per module spec (5 V or 3.3 V)
  • RCA/3.5 mm outputs connected to amplifier/speakers.
  • Firmware and ALSA:
  • /boot/firmware/config.txt with:
    • dtparam=audio=off
    • dtoverlay=hifiberry-dac
  • /etc/asound.conf softvol mapping to sndrpihifiberry
  • speaker-test emits tone correctly.
  • Python environment:
  • ~/i2s-net-player/.venv created.
  • pip installed: numpy, sounddevice, soundfile (and gpiozero, smbus2, spidev per requirement).
  • Network:
  • Receiver running: net_i2s_receiver.py on UDP port 5005.
  • Sender running on workstation: send_pcm16_udp.py streaming a 48 kHz stereo WAV.
  • Audio plays without dropouts; prebuffer adjusted as needed.
  • Optional optimizations:
  • Wi‑Fi power save disabled.
  • Systemd unit installed for auto‑start.

With these steps completed, your Raspberry Pi Zero 2 W + PCM5102A DAC functions as a robust I2S network audio player, receiving PCM over UDP and outputting high‑quality audio via the I2S interface.

Carlos Núñez Zorrilla
Electronics & Computer Engineer

Telecommunications Electronics Engineer and Computer Engineer (official degrees in Spain).


Practical case: Drive WS2812B LEDs with Raspberry Pi Pico W

Objective and use case

What you’ll build: Control WS2812B NeoPixel strips in real-time using WebSocket on Raspberry Pi Pico W. This project enables advanced IoT enthusiasts to achieve low-latency LED control.

Why it matters / Use cases

  • Real-time color and brightness adjustments for LED displays in art installations.
  • Dynamic lighting effects for events or performances, allowing for synchronized visual experiences.
  • Integration into smart home systems for ambient lighting that responds to user interactions.
  • Remote control of LED strips for DIY projects, enhancing user engagement through mobile applications.

Expected outcome

  • Achieve less than 50ms latency in LED color changes over Wi-Fi.
  • Control up to 300 WS2812B LEDs with stable performance at 30 FPS.
  • Maintain consistent brightness levels across all LEDs with less than 5% variance.
  • Successfully handle multiple simultaneous WebSocket connections without performance degradation.

Audience: Advanced IoT enthusiasts; Level: Intermediate to Advanced

Architecture/flow: WebSocket server on Raspberry Pi Pico W communicates with WS2812B strip, controlled via JSON commands sent over Wi-Fi.

Advanced Practical: WebSocket NeoPixel IoT Control on Raspberry Pi Pico W + WS2812B Strip

This hands-on project builds a robust, low-latency WebSocket server on a Raspberry Pi Pico W that controls a WS2812B (NeoPixel) LED strip in real time. You’ll send JSON commands over Wi‑Fi to change colors, brightness, and animations. The code runs fully on the Pico W (MicroPython), while we use a Raspberry Pi computer running Raspberry Pi OS Bookworm 64‑bit as the development/flash host and as a validation client.

Focus: websocket-neopixel-iot-control
Device model: Raspberry Pi Pico W + WS2812B strip

The tutorial is designed for advanced users who want precise control, clean asynchronous design (uasyncio), and a protocol you can integrate into larger IoT systems.


Prerequisites

  • A Raspberry Pi computer running Raspberry Pi OS Bookworm 64‑bit (as your development host). A Pi 4/5 recommended.
  • Internet access on the Raspberry Pi (to download firmware and libraries).
  • Comfort with Python 3.11, virtual environments, MicroPython, and basic networking.
  • Familiarity with LED power considerations and safe wiring practices.

Family defaults adapted to this model:
– We use Raspberry Pi OS Bookworm 64‑bit on the host and Python 3.11 to build tooling, manage the Pico W over USB (mpremote), and run validation clients.
– MicroPython runs on the Pico W; all LED control and the WebSocket server are embedded on the microcontroller.
– We will demonstrate enabling system interfaces on the Raspberry Pi host (via raspi-config or config files) for completeness, even though the project itself does not require GPIO/I2C/SPI on the host.


Materials (exact model)

  • Raspberry Pi Pico W (RPI-PICO-W, RP2040 + CYW43439 Wi‑Fi).
  • WS2812B LED strip (5 V), any length from 30 to 300 LEDs; ensure power budget is adequate.
  • 5 V DC power supply sized for the strip:
  • Rule of thumb: each WS2812B can draw up to ~60 mA at full white (255,255,255).
  • Example: 60 LEDs → up to 3.6 A; use a 5 V 4 A (or higher) supply.
  • Micro‑USB cable (data capable) for Pico W.
  • Level shifting (recommended, but sometimes optional):
  • 74AHCT125, SN74HCT245, or a single‑channel logic level shifter to shift Pico’s 3.3 V data to ~5 V.
  • Alternatively, run the strip at 5 V and verify it accepts 3.3 V logic (many do, but not guaranteed).
  • 300–500 Ω series resistor in the data line (to protect the first LED).
  • 1000 µF electrolytic capacitor across strip 5 V and GND (surge protection).
  • Breadboard and jumpers (if prototyping).
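The power rule of thumb above is easy to turn into a quick budget check. A minimal sketch (the 60 mA full‑white figure is the usual worst case; the 20% headroom factor is our own suggestion, not from a datasheet):

```python
def strip_power_budget(num_leds: int, ma_per_led: float = 60.0, headroom: float = 1.2):
    """Worst-case 5 V supply current for a WS2812B strip, plus a safety margin."""
    peak_amps = num_leds * ma_per_led / 1000.0
    return peak_amps, peak_amps * headroom

peak, recommended = strip_power_budget(60)   # the 60-LED example from above
print(f"{peak:.1f} A peak, use a 5 V supply rated >= {recommended:.1f} A")
```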

Setup/Connection

Host system setup (Raspberry Pi OS Bookworm 64‑bit, Python 3.11)

  1. Update and install essential tools:
    sudo apt update
    sudo apt full-upgrade -y
    sudo apt install -y python3.11-venv wget git unzip

  2. Enable common interfaces (not strictly required for this project, but included per family defaults):

  3. Launch raspi-config:
    sudo raspi-config

    • System Options → Wireless LAN → set your country/SSID/password (optional; this is for the Raspberry Pi host, not the Pico).
    • Interface Options → enable SSH if you want remote access.
    • Interface Options → I2C/SPI/UART: you may leave disabled for this project; not required.
    • Finish and reboot if prompted.
  4. Alternatively, to edit the firmware config directly:
    sudo nano /boot/firmware/config.txt

    • You can ensure nothing conflicts with USB. No changes required specifically for the Pico W.
  5. Create and activate a Python virtual environment (for host-side tools and validation clients):
    python3 -m venv ~/venvs/pico-ws
    source ~/venvs/pico-ws/bin/activate
    pip install --upgrade pip
    pip install mpremote websockets

Note: The host will not control GPIO. We don’t need gpiozero/smbus2/spidev for this build.

Wiring the Raspberry Pi Pico W + WS2812B

  • Do NOT power the Pico W from the strip’s 5 V supply directly. Power the Pico via USB; power the strip with its own 5 V supply. Always share grounds.
  • Put a 1000 µF capacitor across strip 5 V and GND (observe polarity).
  • Insert a 300–500 Ω resistor in series with the data line to the strip.
  • If using a level shifter, place it between the Pico’s GPIO and the strip’s DIN.

We’ll use GPIO 2 (GP2) as the NeoPixel data pin. Connect as follows:

Pico W pin          | WS2812B / PSU connection | Notes
GND                 | Strip GND                | Common ground is required.
GP2                 | DIN (Data In)            | Through a 300–500 Ω series resistor; level shifter recommended.
VBUS (5 V from USB) | DO NOT USE               | Power the strip from a dedicated 5 V supply.
5 V PSU +           | Strip VCC                | Size the supply for the strip's current draw.
5 V PSU −           | Strip GND and Pico GND   | Tie grounds together to share the reference.

Electrical cautions:
– If the strip is powered but the Pico is not, avoid backfeeding through the data line. Keep a common ground but avoid connecting Pico’s 5 V pin.
– Ensure the first LED’s DIN sees a clean edge and a valid logic-high. A 74AHCT125 buffer is preferred for long runs or high brightness.


Full Code (MicroPython on Pico W)

We’ll deploy three files to the Pico W: secrets.py, wsproto.py (WebSocket handshake and frames), and main.py (Wi‑Fi setup, server, patterns).

File: secrets.py

Replace with your Wi‑Fi credentials and country code.

SSID = "YOUR_SSID"
PASSWORD = "YOUR_PASSWORD"
COUNTRY = "US"  # 2-letter country code (e.g., "US", "DE", "GB")
HOSTNAME = "pico-w-neopixel"

File: wsproto.py

Minimal WebSocket server primitives for MicroPython, handling handshake, text frames, ping/pong, and close.
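The handshake's Sec‑WebSocket‑Accept value is just base64(SHA‑1(key + GUID)). Before flashing, you can verify the same logic on desktop CPython against the example key given in RFC 6455, section 1.3:

```python
import base64
import hashlib

# Fixed GUID from RFC 6455 (same constant used in wsproto.py below)
GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def accept_for(key: bytes) -> bytes:
    # Sec-WebSocket-Accept = base64(SHA-1(Sec-WebSocket-Key + GUID))
    return base64.b64encode(hashlib.sha1(key + GUID).digest())

# Example key/accept pair from RFC 6455, section 1.3
print(accept_for(b"dGhlIHNhbXBsZSBub25jZQ=="))  # b's3pPLMBiTxaQ9kYGzzhZRbK+xOo='
```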

# wsproto.py
import uasyncio as asyncio
import uhashlib as hashlib
import ubinascii as binascii
import ujson as json

GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

async def _read_exact(reader, n):
    buf = b""
    while len(buf) < n:
        chunk = await reader.read(n - len(buf))
        if not chunk:
            raise OSError("socket closed during read")
        buf += chunk
    return buf

async def handshake(reader, writer):
    # Read HTTP request headers until CRLFCRLF
    data = b""
    while b"\r\n\r\n" not in data:
        chunk = await reader.read(512)
        if not chunk:
            raise OSError("early disconnect")
        data += chunk

    header_text = data.split(b"\r\n\r\n", 1)[0]
    lines = header_text.split(b"\r\n")
    if not lines or not lines[0].startswith(b"GET "):
        raise OSError("invalid HTTP method")

    headers = {}
    for ln in lines[1:]:
        if b":" in ln:
            k, v = ln.split(b":", 1)
            headers[k.strip().lower()] = v.strip()

    if headers.get(b"upgrade", b"").lower() != b"websocket":
        raise OSError("no Upgrade: websocket")
    if b"sec-websocket-key" not in headers:
        raise OSError("missing Sec-WebSocket-Key")

    key = headers[b"sec-websocket-key"]
    sha = hashlib.sha1(key + GUID).digest()
    accept = binascii.b2a_base64(sha).strip()

    resp = b"HTTP/1.1 101 Switching Protocols\r\n" \
           b"Upgrade: websocket\r\n" \
           b"Connection: Upgrade\r\n" \
           b"Sec-WebSocket-Accept: " + accept + b"\r\n\r\n"
    writer.write(resp)
    await writer.drain()

def _encode_frame(opcode, payload=b""):
    # Server->client frames are not masked
    fin_opcode = 0x80 | (opcode & 0x0F)
    length = len(payload)
    if length < 126:
        header = bytes([fin_opcode, length])
    elif length < (1 << 16):
        header = bytes([fin_opcode, 126, (length >> 8) & 0xFF, length & 0xFF])
    else:
        # 64-bit length
        header = bytes([fin_opcode, 127, 0, 0, 0, 0,
                        (length >> 24) & 0xFF, (length >> 16) & 0xFF,
                        (length >> 8) & 0xFF, length & 0xFF])
    return header + payload

async def send_text(writer, text):
    if isinstance(text, str):
        payload = text.encode()
    else:
        payload = text
    frame = _encode_frame(0x1, payload)  # text
    writer.write(frame)
    await writer.drain()

async def send_pong(writer, data=b""):
    frame = _encode_frame(0xA, data)  # pong
    writer.write(frame)
    await writer.drain()

async def send_close(writer, code=1000, reason=b""):
    payload = bytes([code >> 8, code & 0xFF]) + reason
    frame = _encode_frame(0x8, payload)
    writer.write(frame)
    await writer.drain()

async def recv_frame(reader):
    # Returns (opcode, payload_bytes) for a complete frame
    h = await _read_exact(reader, 2)
    b1, b2 = h[0], h[1]
    fin = (b1 & 0x80) != 0
    opcode = b1 & 0x0F
    masked = (b2 & 0x80) != 0
    length = b2 & 0x7F

    if length == 126:
        ext = await _read_exact(reader, 2)
        length = (ext[0] << 8) | ext[1]
    elif length == 127:
        ext = await _read_exact(reader, 8)
        length = 0
        for b in ext:
            length = (length << 8) | b

    mask = b""
    if masked:
        mask = await _read_exact(reader, 4)

    payload = await _read_exact(reader, length) if length else b""

    if masked:
        payload = bytes([payload[i] ^ mask[i % 4] for i in range(length)])

    return opcode, payload

def parse_json_payload(payload_bytes):
    try:
        s = payload_bytes.decode()
        return json.loads(s)
    except Exception:
        return None
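
Before moving on, the two trickiest parts of wsproto.py can be sanity-checked on a desktop Python (CPython): the Sec-WebSocket-Accept derivation used in handshake(), and the XOR unmasking loop in recv_frame. A minimal check, using the worked key/accept pair from RFC 6455, section 1.3:

```python
# Host-side sanity check (CPython) of two pieces of wsproto.py:
# 1) the Sec-WebSocket-Accept derivation (SHA-1 of key+GUID, base64-encoded)
# 2) the XOR unmasking that recv_frame applies to masked client frames
import base64
import hashlib

GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def accept_for(key: bytes) -> bytes:
    # Same computation as handshake()
    return base64.b64encode(hashlib.sha1(key + GUID).digest())

# Key/accept pair taken from RFC 6455, section 1.3
assert accept_for(b"dGhlIHNhbXBsZSBub25jZQ==") == b"s3pPLMBiTxaQ9kYGzzhZRbK+xOo="

# Masking is an involution: XORing twice with the 4-byte mask restores the payload
mask = b"\x37\xfa\x21\x3d"
payload = b'{"op":"off"}'
masked = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
assert bytes(b ^ mask[i % 4] for i, b in enumerate(masked)) == payload
```

If either assertion fails after you modify wsproto.py, the browser or Python client will refuse the handshake or see garbled frames.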

File: main.py

Wi‑Fi, uasyncio server, LED driver with effects, and JSON command handling. Adjust LED_COUNT to your strip length.

# main.py
import uasyncio as asyncio
import network
import time
import machine
import neopixel
import ujson as json
import rp2

from wsproto import handshake, recv_frame, send_text, send_pong, send_close, parse_json_payload
import secrets

# ---------- Configuration ----------
LED_PIN = 2            # GP2
LED_COUNT = 60         # Adjust to your strip length
FPS_DEFAULT = 40
BRIGHTNESS_DEFAULT = 0.5

# Wi-Fi power save off for lower latency
PM_OFF = 0xA11140

# ---------- LED/Color helpers ----------
class StripController:
    def __init__(self, pin, count):
        self.np = neopixel.NeoPixel(machine.Pin(pin, machine.Pin.OUT), count)
        self.count = count
        self.brightness = BRIGHTNESS_DEFAULT
        self.effect = "solid"
        self.color = (255, 0, 0)
        self.range = (0, count - 1)
        self.fps = FPS_DEFAULT
        self._tick = 0
        self._hue = 0
        self._chase_size = 5
        self._chase_gap = 3
        self._rainbow_speed = 2
        self._running = True

    def set_brightness(self, b):
        self.brightness = max(0.0, min(1.0, float(b)))

    def set_color_hex(self, hexstr):
        # Accept "#RRGGBB" or "RRGGBB"
        h = hexstr.strip().lstrip("#")
        if len(h) != 6:
            return False
        r = int(h[0:2], 16)
        g = int(h[2:4], 16)
        b = int(h[4:6], 16)
        self.color = (r, g, b)
        return True

    def set_range(self, start, end):
        start = max(0, min(self.count - 1, int(start)))
        end = max(0, min(self.count - 1, int(end)))
        if end < start:
            start, end = end, start
        self.range = (start, end)

    def clear(self):
        for i in range(self.count):
            self.np[i] = (0, 0, 0)
        self.np.write()

    def apply_solid(self):
        start, end = self.range
        r, g, b = self._apply_brightness(self.color)
        for i in range(self.count):
            if start <= i <= end:
                self.np[i] = (r, g, b)
            else:
                self.np[i] = (0, 0, 0)
        self.np.write()

    def _apply_brightness(self, rgb):
        return tuple(int(c * self.brightness) for c in rgb)

    def _hsv_to_rgb(self, h, s, v):
        # h: 0..360, s:0..1, v:0..1
        h = h % 360
        c = v * s
        x = c * (1 - abs((h / 60) % 2 - 1))
        m = v - c
        if 0 <= h < 60:
            r1, g1, b1 = c, x, 0
        elif 60 <= h < 120:
            r1, g1, b1 = x, c, 0
        elif 120 <= h < 180:
            r1, g1, b1 = 0, c, x
        elif 180 <= h < 240:
            r1, g1, b1 = 0, x, c
        elif 240 <= h < 300:
            r1, g1, b1 = x, 0, c
        else:
            r1, g1, b1 = c, 0, x
        return (int((r1 + m) * 255), int((g1 + m) * 255), int((b1 + m) * 255))

    def render_rainbow(self):
        start, end = self.range
        span = max(1, end - start + 1)
        hue_step = 360 / span
        # self._hue advances frame to frame
        for idx in range(self.count):
            if start <= idx <= end:
                hue = (self._hue + hue_step * (idx - start)) % 360
                r, g, b = self._hsv_to_rgb(hue, 1.0, 1.0)
                self.np[idx] = self._apply_brightness((r, g, b))
            else:
                self.np[idx] = (0, 0, 0)
        self.np.write()
        self._hue = (self._hue + self._rainbow_speed) % 360

    def render_chase(self):
        start, end = self.range
        r, g, b = self._apply_brightness(self.color)
        size = max(1, self._chase_size)
        gap = max(0, self._chase_gap)
        period = size + gap
        for i in range(self.count):
            if start <= i <= end:
                phase = (i + self._tick) % period
                self.np[i] = (r, g, b) if phase < size else (0, 0, 0)
            else:
                self.np[i] = (0, 0, 0)
        self.np.write()
        self._tick = (self._tick + 1) % period

    def render(self):
        if self.effect == "off":
            self.clear()
        elif self.effect == "solid":
            self.apply_solid()
        elif self.effect == "rainbow":
            self.render_rainbow()
        elif self.effect == "chase":
            self.render_chase()
        else:
            self.apply_solid()

    def apply_command(self, cmd):
        # cmd is a dict parsed from JSON
        op = cmd.get("op") or cmd.get("cmd")
        if op in ("off", "solid", "rainbow", "chase"):
            self.effect = op
        if "color" in cmd and isinstance(cmd["color"], str):
            self.set_color_hex(cmd["color"])
        if "brightness" in cmd:
            self.set_brightness(cmd["brightness"])
        if "range" in cmd and isinstance(cmd["range"], list) and len(cmd["range"]) == 2:
            self.set_range(cmd["range"][0], cmd["range"][1])
        if "fps" in cmd:
            self.fps = max(1, min(120, int(cmd["fps"])))
        if "size" in cmd:
            self._chase_size = int(cmd["size"])
        if "gap" in cmd:
            self._chase_gap = int(cmd["gap"])
        if "speed" in cmd:
            self._rainbow_speed = int(cmd["speed"])

# ---------- Wi-Fi ----------
async def wifi_connect():
    rp2.country(secrets.COUNTRY)
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.config(pm=PM_OFF)
    try:
        wlan.config(hostname=secrets.HOSTNAME)
    except Exception:
        pass
    if not wlan.isconnected():
        wlan.connect(secrets.SSID, secrets.PASSWORD)
        t0 = time.ticks_ms()
        while not wlan.isconnected():
            await asyncio.sleep_ms(200)
            if time.ticks_diff(time.ticks_ms(), t0) > 20000:
                raise RuntimeError("Wi-Fi connect timeout")
    print("Wi-Fi connected:", wlan.ifconfig())
    return wlan

# ---------- WebSocket server ----------
class WSServer:
    def __init__(self, strip: StripController):
        self.strip = strip
        self._client_task = None

    async def _client(self, reader, writer):
        try:
            await handshake(reader, writer)
            await send_text(writer, json.dumps({"status": "ok", "msg": "connected"}))
            while True:
                opcode, payload = await recv_frame(reader)
                # Opcodes: 0x1 text, 0x8 close, 0x9 ping, 0xA pong
                if opcode == 0x9:  # ping
                    await send_pong(writer, payload)
                    continue
                if opcode == 0x8:  # close
                    await send_close(writer, 1000, b"bye")
                    break
                if opcode == 0x1:  # text
                    cmd = parse_json_payload(payload)
                    if not cmd:
                        await send_text(writer, json.dumps({"status": "error", "msg": "bad json"}))
                        continue
                    self.strip.apply_command(cmd)
                    ack = {"status": "ok", "effect": self.strip.effect,
                           "brightness": self.strip.brightness, "fps": self.strip.fps}
                    await send_text(writer, json.dumps(ack))
        except Exception:
            # Expected on client disconnects; ignore
            pass
        finally:
            try:
                await send_close(writer, 1000, b"closed")
            except Exception:
                pass
            writer.close()
            await writer.wait_closed()

    async def run(self, host="0.0.0.0", port=8765):
        async def _handler(reader, writer):
            await self._client(reader, writer)

        srv = await asyncio.start_server(_handler, host, port, backlog=2)
        print("WebSocket server listening on ws://{}:{}".format(host, port))
        # MicroPython's asyncio Server has no serve_forever(); block until closed
        await srv.wait_closed()

# ---------- Animation loop ----------
async def animator(strip: StripController):
    while True:
        strip.render()
        await asyncio.sleep_ms(int(1000 / max(1, strip.fps)))

# ---------- Entry ----------
async def main():
    wlan = await wifi_connect()
    strip = StripController(LED_PIN, LED_COUNT)
    strip.clear()
    ws = WSServer(strip)
    anim_task = asyncio.create_task(animator(strip))
    srv_task = asyncio.create_task(ws.run())
    await asyncio.gather(anim_task, srv_task)

try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()
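
The rendering math in StripController is plain Python, so it can be verified on the host without hardware. This standalone sketch reproduces the HSV conversion and the chase phase logic (hsv_to_rgb and chase_on are local re-implementations for checking, not the class methods themselves):

```python
# Host-side check of StripController's rendering math (pure Python, no hardware)

def hsv_to_rgb(h, s, v):
    # Same piecewise conversion as StripController._hsv_to_rgb (h in degrees)
    h = h % 360
    c = v * s
    x = c * (1 - abs((h / 60) % 2 - 1))
    m = v - c
    if h < 60:    r1, g1, b1 = c, x, 0
    elif h < 120: r1, g1, b1 = x, c, 0
    elif h < 180: r1, g1, b1 = 0, c, x
    elif h < 240: r1, g1, b1 = 0, x, c
    elif h < 300: r1, g1, b1 = x, 0, c
    else:         r1, g1, b1 = c, 0, x
    return (int((r1 + m) * 255), int((g1 + m) * 255), int((b1 + m) * 255))

assert hsv_to_rgb(0, 1.0, 1.0) == (255, 0, 0)    # pure red
assert hsv_to_rgb(120, 1.0, 1.0) == (0, 255, 0)  # pure green
assert hsv_to_rgb(240, 1.0, 1.0) == (0, 0, 255)  # pure blue

def chase_on(i, tick, size=5, gap=3):
    # A pixel is lit when its phase falls inside the 'size' window of each period
    return ((i + tick) % (size + gap)) < size

# With size=5, gap=3: pixels 0-4 lit, 5-7 dark at tick 0; the window shifts per tick
assert [chase_on(i, 0) for i in range(8)] == [True] * 5 + [False] * 3
```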

Notes:
– The server expects JSON messages with keys like op, color, brightness, range, fps, etc. Example messages:
– {"op":"solid","color":"#00FF99","brightness":0.6}
– {"op":"off"}
– {"op":"rainbow","speed":3,"fps":50,"brightness":0.4}
– {"op":"chase","color":"#FF0000","size":4,"gap":2,"fps":40,"range":[0,59]}


Build/Flash/Run Commands (Host on Raspberry Pi OS Bookworm)

  1. Activate the virtual environment:
    source ~/venvs/pico-ws/bin/activate

  2. Download the stable MicroPython firmware for the Raspberry Pi Pico W. As of this writing, that is MicroPython v1.22.2 (rp2-pico-w):
    mkdir -p ~/pico-w-firmware && cd ~/pico-w-firmware
    wget https://micropython.org/resources/firmware/RPI_PICO_W-20240222-v1.22.2.uf2 -O micropython-picow.uf2

  3. Put the Pico W in BOOTSEL mode: hold the BOOTSEL button while plugging the USB cable into the Raspberry Pi. A mass storage device named RPI-RP2 appears.

  4. Flash the MicroPython UF2:
    cp micropython-picow.uf2 /media/$USER/RPI-RP2/
    The Pico W reboots into MicroPython and exposes a serial device (e.g., /dev/ttyACM0).

  5. Verify the serial device:
    dmesg | tail -n 20
    ls /dev/ttyACM*

  6. Create a project folder on the host and add your MicroPython files:
    mkdir -p ~/pico-ws/project && cd ~/pico-ws/project
    nano secrets.py
    nano wsproto.py
    nano main.py

    Paste the contents from the Full Code section (ensure your SSID/PASSWORD/COUNTRY are set in secrets.py).

  7. Use mpremote to copy the files to the Pico W:
    mpremote connect list
    # Example output: /dev/ttyACM0 ...
    mpremote connect /dev/ttyACM0 fs cp secrets.py :secrets.py
    mpremote connect /dev/ttyACM0 fs cp wsproto.py :wsproto.py
    mpremote connect /dev/ttyACM0 fs cp main.py :main.py

  8. Optionally, soft reboot to start main.py:
    mpremote connect /dev/ttyACM0 soft-reset

  9. View console logs (optional):
    mpremote connect /dev/ttyACM0 repl
    You should see "Wi‑Fi connected: ('IP', …)" and "WebSocket server listening …".

Validation (Step‑by‑Step)

We’ll validate with a Python 3.11 WebSocket client running on the Raspberry Pi host.

  1. Determine the Pico W IP, either from the REPL logs (mpremote repl), e.g.:
    • Wi‑Fi connected: ('192.168.1.88', '255.255.255.0', '192.168.1.1', '8.8.8.8')
    or from your DHCP server/router.

  2. Create a test client on the host:
    cd ~/pico-ws/project
    nano ws_client.py

    Paste:

```python
# ws_client.py
import asyncio, websockets, json, sys

async def main():
    if len(sys.argv) < 2:
        print("Usage: python ws_client.py ws://<pico-ip>:8765")
        return
    uri = sys.argv[1]
    async with websockets.connect(uri) as ws:
        hello = await ws.recv()
        print("Server:", hello)

        # Solid teal
        cmd = {"op": "solid", "color": "#00CCAA", "brightness": 0.6}
        await ws.send(json.dumps(cmd))
        print("ACK:", await ws.recv())

        # Rainbow
        cmd = {"op": "rainbow", "speed": 3, "fps": 50, "brightness": 0.4}
        await ws.send(json.dumps(cmd))
        print("ACK:", await ws.recv())

        # Chase red
        cmd = {"op": "chase", "color": "#FF0000", "size": 4, "gap": 2, "fps": 40}
        await ws.send(json.dumps(cmd))
        print("ACK:", await ws.recv())

        # Range-limited solid
        cmd = {"op": "solid", "color": "#3344FF", "range": [10, 25], "brightness": 0.8}
        await ws.send(json.dumps(cmd))
        print("ACK:", await ws.recv())

        # Off
        cmd = {"op": "off"}
        await ws.send(json.dumps(cmd))
        print("ACK:", await ws.recv())

if __name__ == "__main__":
    asyncio.run(main())
```

  3. Run the client (replace 192.168.1.88 with your Pico W IP):
    source ~/venvs/pico-ws/bin/activate
    python ws_client.py ws://192.168.1.88:8765

  4. You should see:
    • Server: {"status":"ok","msg":"connected"}
    • ACKs for each command.

  5. Visually confirm the LED strip changes to teal, rainbow animation, red chase, blue range slice, then off.

  6. Try custom commands interactively (python -q):
    ```python
    >>> import asyncio, websockets, json
    >>> async def t():
    ...     ws = await websockets.connect("ws://192.168.1.88:8765")
    ...     print(await ws.recv())
    ...     await ws.send(json.dumps({"op":"solid","color":"#FF00FF","brightness":0.3}))
    ...     print(await ws.recv())
    ...     await ws.close()
    ...
    >>> asyncio.run(t())
    ```

  7. Latency/throughput sanity check: increase fps to 60–100 in rainbow/chase and observe animation smoothness:
    python ws_client.py ws://192.168.1.88:8765
    Modify the "fps" field in the script and rerun to gauge responsiveness.

  8. Optional web browser validation (local file). Create an HTML test page (static file on your Raspberry Pi host):
    nano ~/pico-ws/project/index.html
    Paste:
    <!doctype html>
    <meta charset="utf-8">
    <title>Pico W NeoPixel WS Test</title>
    <input id="ip" value="ws://192.168.1.88:8765" size="30">
    <button id="connect">Connect</button>
    <div id="status"></div>
    <button onclick='send({"op":"solid","color":"#00FFAA","brightness":0.6})'>Solid</button>
    <button onclick='send({"op":"rainbow","speed":3,"fps":50,"brightness":0.4})'>Rainbow</button>
    <button onclick='send({"op":"chase","color":"#FF0000","size":5,"gap":2,"fps":45})'>Chase</button>
    <button onclick='send({"op":"off"})'>Off</button>
    <script>
    let ws;
    document.getElementById("connect").onclick = () => {
      const u = document.getElementById("ip").value;
      ws = new WebSocket(u);
      ws.onopen = () => document.getElementById("status").innerText = "Connected";
      ws.onmessage = (ev) => console.log("RX:", ev.data);
      ws.onclose = () => document.getElementById("status").innerText = "Closed";
      ws.onerror = (e) => console.error("WS error", e);
    };
    function send(obj) {
      if (ws && ws.readyState === 1) ws.send(JSON.stringify(obj));
    }
    </script>
  9. Open it in a browser on your Raspberry Pi, click Connect, and use the buttons to test.

Troubleshooting

  • Wi‑Fi won’t connect:
  • Ensure secrets.py SSID/PASSWORD are correct and COUNTRY is a valid 2‑letter code.
  • Some APs require 2.4 GHz; Pico W does not support 5 GHz.
  • Move closer to the AP; Wi‑Fi power save is already disabled in code via wlan.config(pm=0xA11140).

  • No LED response:

  • Verify GND shared between Pico W and the strip’s 5 V PSU.
  • Confirm you used the strip’s DIN, not DOUT.
  • Try a lower LED_COUNT in code to match your strip length.
  • Use a level shifter if the first LED fails to latch data; long data wires driven at 3.3 V logic can be marginal when the strip is powered at 5 V.
  • Always have the 300–500 Ω series resistor and the 1000 µF capacitor; they improve reliability and protect the first LED.

  • Flicker or random colors:

  • Inadequate power supply or voltage droop; use thicker wires, power injection for long strips (feed 5 V and GND at multiple points).
  • Increase brightness gradually; at 100% white many supplies sag.
  • Ensure data and ground run together to reduce noise coupling.

  • mpremote cannot connect:

  • Check the device path:
    ls /dev/ttyACM*
    mpremote connect /dev/ttyACM0 repl
  • If Thonny or another tool is connected, close it first.

  • WebSocket client fails to connect:

  • Verify the Pico W IP.
  • Ensure port 8765 is not blocked on your LAN.
  • Confirm that the Pico reports “WebSocket server listening …” in REPL.

  • JSON errors:

  • The server sends {"status":"error","msg":"bad json"} if parsing fails. Validate your JSON formatting.

  • Performance tuning:

  • Reduce LED_COUNT or FPS if you need more CPU headroom for networking.
  • Keep message rate modest; the Pico can handle frequent updates but don’t flood with large frames.

Improvements

  • TLS (wss):
  • MicroPython supports ssl.wrap_socket, but a fully secure WebSocket server with TLS on a microcontroller requires certificate handling and more RAM. For production-grade security, consider placing a reverse proxy (nginx/Caddy) on your LAN and proxying wss to the Pico’s ws endpoint, or implement TLS on the Pico with a carefully tuned cert/key.

  • Persistent configuration:

  • Store default effect/brightness in a JSON file on the Pico filesystem and load at boot. Add a command to save current settings.

  • Input validation and schema:

  • Extend command parsing to validate ranges strictly and send detailed error messages with error codes.

  • mDNS:

  • Provide a friendly hostname discovery. MicroPython mDNS on Pico W is non-trivial; you can also run a tiny mDNS reflector or keep IP in a DHCP reservation.

  • OTA-like updates:

  • Use mpremote fs cp to push updates without re‑flashing firmware. Consider packaging code into a single file and versioning it in git.

  • More effects:

  • Theater chase, comet, twinkle, gradients over time, palettes, and segment groups.

  • Home Assistant integration:

  • Create an automation that uses a Python script or Node-RED to send WebSocket JSON commands.

  • Rate limiting:

  • Add a queue or time gate to avoid command storms overwhelming animations.

Reference JSON Commands

The WebSocket server on the Pico W accepts text frames containing JSON objects. Supported fields:

  • op or cmd: "off" | "solid" | "rainbow" | "chase"
  • color: "#RRGGBB" (for solid/chase)
  • brightness: 0.0–1.0
  • range: [start, end] indices inclusive
  • fps: 1–120
  • size: integer (for chase)
  • gap: integer (for chase)
  • speed: integer (for rainbow hue advance per frame)

Examples:
– {"op":"off"}
– {"op":"solid","color":"#00FF99","brightness":0.5}
– {"op":"rainbow","speed":4,"fps":60,"brightness":0.4}
– {"op":"chase","color":"#FF3300","size":3,"gap":2,"range":[0,50]}
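
The "Input validation and schema" improvement suggested earlier can be prototyped on the host. The sketch below is a hypothetical validator for this exact schema (validate_cmd is illustrative and is not part of the Pico code, which applies fields permissively):

```python
# Hypothetical host-side validator for the JSON command schema above.

def validate_cmd(cmd: dict) -> list[str]:
    errors = []
    op = cmd.get("op") or cmd.get("cmd")
    if op not in ("off", "solid", "rainbow", "chase"):
        errors.append("op must be one of: off, solid, rainbow, chase")
    if "brightness" in cmd and not (0.0 <= float(cmd["brightness"]) <= 1.0):
        errors.append("brightness out of range 0.0-1.0")
    if "fps" in cmd and not (1 <= int(cmd["fps"]) <= 120):
        errors.append("fps out of range 1-120")
    if "color" in cmd:
        c = str(cmd["color"]).lstrip("#")
        if len(c) != 6 or any(ch not in "0123456789abcdefABCDEF" for ch in c):
            errors.append("color must be #RRGGBB")
    if "range" in cmd and not (isinstance(cmd["range"], list) and len(cmd["range"]) == 2):
        errors.append("range must be [start, end]")
    return errors

assert validate_cmd({"op": "solid", "color": "#00FF99", "brightness": 0.5}) == []
assert validate_cmd({"op": "blink"}) != []
```

Running such checks on the sending side keeps malformed commands off the air and gives the user a precise error instead of the Pico's generic "bad json" reply.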


Safety and Power Notes

  • Current draw: Full-brightness white is worst case. Use adequate 5 V power and consider multiple injection points for long strips to avoid voltage drop.
  • Thermal: High brightness may heat the strip; ensure ventilation.
  • ESD: Handle the Pico W and strip with care, avoid hot-plugging the data line with power applied.
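
The current-draw bullet can be made concrete with a back-of-the-envelope calculation (rule of thumb: each WS2812B draws up to roughly 60 mA at full-brightness white; treat the result as an estimate, not a measurement):

```python
# Rough 5 V supply sizing for a WS2812B strip.
# Rule of thumb: ~60 mA per LED at full-brightness white (approximate).

def strip_current_amps(led_count: int, ma_per_led: float = 60.0, brightness: float = 1.0) -> float:
    return led_count * ma_per_led * brightness / 1000.0

# The 60-LED strip used in this project, worst case (full white):
assert strip_current_amps(60) == 3.6
# At the default 0.5 brightness the budget roughly halves:
assert strip_current_amps(60, brightness=0.5) == 1.8
```

So the 60-LED build wants a 5 V supply rated for about 4 A to have headroom at full white.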

Final Checklist

  • Raspberry Pi host:
  • Raspberry Pi OS Bookworm 64‑bit installed and updated.
  • Python 3.11 venv created; mpremote and websockets installed.
  • Interfaces enabled via raspi-config as desired; not required for this project.

  • Hardware wiring:

  • Pico W GP2 → 300–500 Ω resistor → WS2812B DIN (via level shifter recommended).
  • Common ground: Pico GND ↔ strip GND ↔ PSU GND.
  • 1000 µF capacitor across strip 5 V and GND.
  • Dedicated 5 V supply sized for total LED count.

  • MicroPython on Pico W:

  • UF2 flashed: RPI_PICO_W-20240222-v1.22.2.uf2 (or a later stable release).
  • Files uploaded: secrets.py, wsproto.py, main.py.

  • Network:

  • secrets.py has correct SSID/PASSWORD/COUNTRY.
  • Pico W obtains an IP; server prints “WebSocket server listening …”.

  • Validation:

  • Python client connects to ws://<pico-ip>:8765 and receives {"status":"ok","msg":"connected"}.
  • Solid, rainbow, chase, range, brightness, fps commands work as expected.
  • Strip responds smoothly without flicker under normal brightness.

By following the steps above, you’ve implemented a clean, low-latency WebSocket interface to a WS2812B LED strip driven by a Raspberry Pi Pico W, ready to integrate into a larger IoT control plane or UI.



Carlos Núñez Zorrilla
Electronics & Computer Engineer

Telecommunications Electronics Engineer and Computer Engineer (official degrees in Spain).

Practical case: ANPR with OpenCV on Raspberry Pi 4 HQ Camera


Objective and use case

What you’ll build: An advanced ANPR system on Raspberry Pi 4 using the HQ Camera and OpenCV to accurately recognize license plates in real-time.

Why it matters / Use cases

  • Automated toll collection systems utilizing real-time license plate recognition to streamline vehicle passage.
  • Parking management solutions that automatically identify vehicles for entry and exit, enhancing user experience.
  • Law enforcement applications for tracking stolen vehicles or monitoring traffic violations through automated surveillance.
  • Smart city initiatives that integrate ANPR for traffic analysis and urban planning.

Expected outcome

  • Achieve a recognition accuracy of over 95% for alphanumeric plates under various lighting conditions.
  • Process frames at a rate of at least 10 FPS, ensuring real-time performance.
  • Maintain latency under 200 ms from capture to recognition output.
  • Successfully identify and log license plates in a database for further analysis with a minimum of 100 entries per hour.

Audience: Advanced users; Level: Advanced

Architecture/flow: The system captures images via the HQ Camera, processes them using OpenCV for plate detection, and employs Tesseract OCR for text recognition.

Advanced Hands‑On: Raspberry Pi 4 + HQ Camera ANPR (OpenCV License Plate Recognition)

Objective: Build an on‑device ANPR (Automatic Number Plate Recognition) system on a Raspberry Pi 4 using the HQ Camera, OpenCV, and Tesseract OCR. The pipeline captures frames, detects plate candidates, rectifies the plate region, and performs text recognition tuned for alphanumeric plates. This guide provides code, exact commands, connections, and a validation path, targeted at advanced users.


Prerequisites

  • Expertise level: Advanced (comfortable with Linux, Python, OpenCV, and basic GPIO).
  • Operating System: Raspberry Pi OS Bookworm 64‑bit with the modern libcamera stack.
  • Python 3.11 on the Raspberry Pi 4.
  • Stable internet connection for package installation.
  • Basic familiarity with camera optics (focusing the C‑mount lens on the HQ Camera).
  • A test environment where you can legally capture license plates or use printed/stock images for validation.

Materials (Exact Models and Versions)

Item                  | Exact Model / Version                                                          | Notes
Single‑board computer | Raspberry Pi 4 Model B (4 GB or 8 GB RAM recommended)                          | 4 GB minimum for smoother OpenCV/OCR workloads
Camera                | Raspberry Pi HQ Camera (IMX477)                                                | With C‑mount
Lens                  | C‑mount lens, 6 mm or 12 mm fixed focal length (e.g., Raspberry Pi 6 mm lens)  | Choose focal length based on distance/field of view
Camera cable          | Official Raspberry Pi CSI‑2 ribbon cable                                       | Shorter cables reduce noise
microSD               | 32 GB microSD card (A1/U1 or better)                                           | Raspberry Pi OS Bookworm 64‑bit
Power                 | 5 V/3 A USB‑C power supply                                                     | Official supply recommended
Mount                 | Tripod or rigid mount for the HQ Camera                                        | Stability is critical
Optional trigger      | Momentary pushbutton and 2x female‑female jumpers                              | For GPIO capture trigger
Optional indicator    | 3.3 V LED + 330 Ω resistor                                                     | Visual feedback on detection
Enclosure             | Any camera mount or case                                                       | Reduces vibrations and stray light

Setup and Connections

1) Flash and update Raspberry Pi OS Bookworm 64‑bit

  • Use Raspberry Pi Imager to flash “Raspberry Pi OS (64‑bit)” (Bookworm).
  • After first boot:
sudo apt update
sudo apt full-upgrade -y
sudo reboot

2) Enable camera and interfaces (raspi‑config)

  • Interactive:
  • Run:
    sudo raspi-config
  • Interface Options:
    • Camera → Enable
    • I2C → Enable (optional; for future sensors)
    • SPI → Enable (optional; for future SPI peripherals)
  • Finish → Reboot when prompted.

  • Alternatively, via /boot/firmware/config.txt (Bookworm path is /boot/firmware):

  • Edit:
    sudo nano /boot/firmware/config.txt
  • Ensure the following lines exist (add if missing):
    camera_auto_detect=1
    dtoverlay=imx477
    gpu_mem=256
  • Save and reboot:
    sudo reboot

Notes:
– Bookworm uses libcamera; you do not need legacy start_x settings.
– gpu_mem=256 is recommended for camera preview/processing pipelines.

3) Connect the Raspberry Pi HQ Camera

  • Power off the Pi before connecting the cable.
  • Lift the CSI connector latch on the Pi; insert the ribbon cable with the exposed contacts facing the HDMI ports; close latch.
  • On the HQ Camera side, match the ribbon orientation and close latch.
  • Mount the lens and perform coarse focus manually. You will refine focus later using a live preview.

4) Optional: Wire a trigger pushbutton to GPIO 17

If you want a hardware trigger to capture and process:

Component        | Raspberry Pi header pin | Signal
Pushbutton leg 1 | Pin 11                  | GPIO 17 (BCM numbering)
Pushbutton leg 2 | Pin 6                   | GND
  • We will use an internal pull‑up. Pressing the button pulls GPIO 17 to GND (active‑low).
  • Optional LED indicator (for visual detection feedback):
  • LED anode → 330 Ω resistor → Pin 13 (GPIO 27)
  • LED cathode → Pin 9 (GND)

No external power required for the button/LED besides Pi’s own 3.3V domain (and we’re using internal pull‑ups).


Software Setup

We will create a Python 3.11 virtual environment that can use system site packages so we can consume Picamera2 from apt, and install OpenCV, OCR, and utilities with pip.

1) Base packages, camera stack, and OCR engine

sudo apt update
sudo apt install -y \
  libcamera-apps \
  python3-picamera2 \
  tesseract-ocr tesseract-ocr-eng \
  libtesseract-dev \
  git curl
  • Validate camera quickly:
    libcamera-hello -t 2000
    libcamera-still -o /tmp/test.jpg

2) Create a project and venv (with system packages)

mkdir -p ~/projects/anpr-rpi4/src ~/projects/anpr-rpi4/out
cd ~/projects/anpr-rpi4
python3 --version
python3 -m venv --system-site-packages .venv
source .venv/bin/activate

3) Install Python dependencies (pip)

Pin versions for reproducibility:

pip install --upgrade pip
pip install \
  numpy==1.26.4 \
  opencv-python==4.8.1.78 \
  imutils==0.5.4 \
  pytesseract==0.3.10 \
  gpiozero==2.0 \
  smbus2==0.4.3 \
  spidev==3.6

Quick import tests:

python - <<'PY'
import cv2, numpy as np, pytesseract
print("cv2:", cv2.__version__)
print("numpy:", np.__version__)
print("tesseract:", pytesseract.get_tesseract_version())
PY

Full Code (OpenCV + Picamera2 + Tesseract OCR)

Create the file:
– Path: ~/projects/anpr-rpi4/src/anpr_cam.py

#!/usr/bin/env python3
import os
import re
import time
import argparse
from datetime import datetime
from typing import List, Tuple

import cv2
import numpy as np
import pytesseract

try:
    from picamera2 import Picamera2
except ImportError as e:
    raise SystemExit("Picamera2 not found. Ensure 'python3-picamera2' is installed via apt and that your venv uses --system-site-packages.") from e

# Optional GPIO trigger/indicator
try:
    from gpiozero import Button, LED
except Exception:
    Button = None
    LED = None


ALNUM_WHITELIST = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
TESSERACT_CONFIG = f"--oem 1 --psm 7 -c tessedit_char_whitelist={ALNUM_WHITELIST}"
PLATE_REGEX = re.compile(r"[A-Z0-9]{5,9}")  # generic; adjust to your region

def rectify_plate(image: np.ndarray, contour: np.ndarray, output_size=(240, 80)) -> np.ndarray:
    """
    Given a contour approximated to 4 points, create a perspective transform
    to rectify the plate ROI to a canonical size suitable for OCR.
    """
    pts = contour.reshape(4, 2).astype(np.float32)

    # Order points: top-left, top-right, bottom-right, bottom-left
    s = pts.sum(axis=1)
    diff = np.diff(pts, axis=1)
    tl = pts[np.argmin(s)]
    br = pts[np.argmax(s)]
    tr = pts[np.argmin(diff)]
    bl = pts[np.argmax(diff)]
    ordered = np.array([tl, tr, br, bl], dtype=np.float32)

    dst = np.array(
        [[0, 0], [output_size[0] - 1, 0], [output_size[0] - 1, output_size[1] - 1], [0, output_size[1] - 1]],
        dtype=np.float32
    )
    M = cv2.getPerspectiveTransform(ordered, dst)
    warp = cv2.warpPerspective(image, M, output_size, flags=cv2.INTER_CUBIC)
    return warp

def detect_plate_candidates(frame: np.ndarray) -> List[np.ndarray]:
    """
    Use a classical pipeline to propose plate-shaped contours:
    - grayscale
    - bilateral filter
    - Canny edges
    - morphological closing
    - contour filtering by area/aspect ratio/rectangularity
    Returns a list of quadrilateral contours (4 points).
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    blur = cv2.bilateralFilter(gray, 9, 75, 75)
    edges = cv2.Canny(blur, 80, 200)

    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    closed = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, kernel, iterations=2)

    cnts, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    candidates = []
    h, w = gray.shape
    img_area = h * w

    for c in cnts:
        area = cv2.contourArea(c)
        if area < 0.001 * img_area or area > 0.2 * img_area:
            continue
        peri = cv2.arcLength(c, True)
        approx = cv2.approxPolyDP(c, 0.02 * peri, True)
        if len(approx) == 4:
            # Aspect ratio filter using bounding box
            x, y, bw, bh = cv2.boundingRect(approx)
            if bh == 0:
                continue
            ar = bw / float(bh)
            if 1.8 <= ar <= 6.5:
                candidates.append(approx)
    return candidates

def ocr_plate(plate_img: np.ndarray) -> Tuple[str, float]:
    """
    Run OCR on the rectified plate image.
    Returns recognized text and an average confidence in [0, 100].
    """
    gray = cv2.cvtColor(plate_img, cv2.COLOR_RGB2GRAY)
    # Adaptive threshold improves robustness under varying lighting
    th = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
                               cv2.THRESH_BINARY, 31, 15)
    th = cv2.medianBlur(th, 3)
    data = pytesseract.image_to_data(th, config=TESSERACT_CONFIG, output_type=pytesseract.Output.DICT, lang="eng")

    words = []
    confs = []
    for txt, conf in zip(data["text"], data["conf"]):
        try:
            cval = float(conf)
        except ValueError:
            cval = -1.0
        clean = "".join([ch for ch in txt.upper() if ch in ALNUM_WHITELIST])
        if clean and cval >= 0:
            words.append(clean)
            confs.append(cval)

    if not words:
        return "", 0.0

    text = "".join(words)
    avg_conf = float(np.mean(confs)) if confs else 0.0

    # Simple regex filter to avoid obvious garbage
    m = PLATE_REGEX.search(text)
    if m:
        text = m.group(0)
    return text, avg_conf

def annotate(frame: np.ndarray, contour: np.ndarray, text: str, conf: float):
    cv2.drawContours(frame, [contour], -1, (0, 255, 0), 2)
    x, y, w, h = cv2.boundingRect(contour)
    label = f"{text} ({conf:.0f}%)"
    cv2.rectangle(frame, (x, y - 24), (x + 8 * len(label), y), (0, 255, 0), -1)
    cv2.putText(frame, label, (x + 4, y - 6),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1, cv2.LINE_AA)

def process_frame(frame: np.ndarray, min_conf: float = 55.0):
    """
    Process a frame: detect candidate plates, OCR the best candidate,
    and return annotated frame plus best (text, conf).
    """
    candidates = detect_plate_candidates(frame)
    best_text, best_conf, best_contour = "", 0.0, None
    for cnt in candidates:
        try:
            roi = rectify_plate(frame, cnt)
        except Exception:
            continue
        text, conf = ocr_plate(roi)
        if conf > best_conf and len(text) >= 5:
            best_text, best_conf, best_contour = text, conf, cnt

    if best_contour is not None and best_conf >= min_conf:
        annotate(frame, best_contour, best_text, best_conf)

    return frame, best_text, best_conf

def run_camera(args):
    picam = Picamera2()
    # 1080p RGB frames for OCR-friendly resolution
    config = picam.create_video_configuration(main={"size": (1920, 1080), "format": "RGB888"})
    picam.configure(config)
    picam.start()
    time.sleep(0.5)

    button = None
    led = None
    if args.trigger_gpio is not None and Button is not None:
        button = Button(args.trigger_gpio, pull_up=True, bounce_time=0.03)
    if args.led_gpio is not None and LED is not None:
        led = LED(args.led_gpio)

    print("Camera running. Press Ctrl+C to exit.")
    try:
        while True:
            if button:
                # Wait for button press to capture/process
                button.wait_for_press()
            frame = picam.capture_array()  # RGB888
            t0 = time.time()
            annotated, text, conf = process_frame(frame, min_conf=args.min_conf)
            dt = (time.time() - t0) * 1000.0

            if text:
                ts = datetime.now().strftime("%Y%m%d_%H%M%S")
                out_path = os.path.join(args.out_dir, f"anpr_{ts}_{text}_{int(conf)}.jpg")
                cv2.imwrite(out_path, cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
                print(f"[{ts}] {text} ({conf:.1f}%) - saved {out_path} - {dt:.1f} ms")
                if led:
                    led.on()
                    time.sleep(0.1)
                    led.off()
            elif args.verbose:
                print(f"No plate | {dt:.1f} ms")

            if args.display:
                cv2.imshow("ANPR", cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            if button:
                # Debounce window after capture
                time.sleep(0.25)
    except KeyboardInterrupt:
        pass
    finally:
        if args.display:
            cv2.destroyAllWindows()
        picam.stop()

def run_on_media(args):
    if args.image:
        frame_bgr = cv2.imread(args.image)
        assert frame_bgr is not None, f"Cannot read image: {args.image}"
        frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        annotated, text, conf = process_frame(frame, min_conf=args.min_conf)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        out_path = os.path.join(args.out_dir, f"anpr_{ts}_{text or 'none'}_{int(conf)}.jpg")
        cv2.imwrite(out_path, cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
        print(f"[IMAGE] {args.image} -> {text} ({conf:.1f}%) -> {out_path}")
        if args.display:
            cv2.imshow("ANPR", cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        return

    if args.video:
        cap = cv2.VideoCapture(args.video)
        assert cap.isOpened(), f"Cannot open video: {args.video}"
        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break
            frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            annotated, text, conf = process_frame(frame, min_conf=args.min_conf)
            if text:
                ts = datetime.now().strftime("%Y%m%d_%H%M%S")
                out_path = os.path.join(args.out_dir, f"anpr_{ts}_{text}_{int(conf)}.jpg")
                cv2.imwrite(out_path, cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
                print(f"[VIDEO] {text} ({conf:.1f}%) -> {out_path}")
            if args.display:
                cv2.imshow("ANPR", cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        cap.release()
        if args.display:
            cv2.destroyAllWindows()

def main():
    parser = argparse.ArgumentParser(description="Raspberry Pi 4 + HQ Camera ANPR (OpenCV + Tesseract)")
    parser.add_argument("--display", action="store_true", help="Show annotated frames")
    parser.add_argument("--out-dir", type=str, default=os.path.expanduser("~/projects/anpr-rpi4/out"), help="Output directory")
    parser.add_argument("--min-conf", type=float, default=55.0, help="Minimum OCR confidence to accept")
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--trigger-gpio", type=int, default=None, help="Button GPIO (BCM) for trigger, e.g., 17")
    parser.add_argument("--led-gpio", type=int, default=None, help="LED GPIO (BCM) to blink on detection, e.g., 27")
    parser.add_argument("--image", type=str, default=None, help="Run once on a single image")
    parser.add_argument("--video", type=str, default=None, help="Run on a video file")
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    if args.image or args.video:
        run_on_media(args)
    else:
        run_camera(args)

if __name__ == "__main__":
    main()

Make it executable:

chmod +x ~/projects/anpr-rpi4/src/anpr_cam.py

Build/Flash/Run Commands

Although “build/flash” is minimal for Python, the environment setup is critical. Use the exact sequence below.

1) Confirm camera works (libcamera)

libcamera-hello -t 3000
libcamera-still -o ~/projects/anpr-rpi4/out/libcamera_test.jpg

2) Validate Tesseract OCR alone

tesseract --version
# Quick smoke test with a synthetic label:
convert -size 400x120 xc:white -fill black -pointsize 72 -gravity center \
  -annotate 0 "ABC1234" /tmp/plate.png
tesseract /tmp/plate.png stdout -l eng --psm 7

If ImageMagick convert is missing:

sudo apt install -y imagemagick

3) Run the ANPR app on an image

source ~/projects/anpr-rpi4/.venv/bin/activate
python ~/projects/anpr-rpi4/src/anpr_cam.py --image /tmp/plate.png --display --verbose

4) Run live with the HQ Camera (with optional GPIO trigger)

  • Continuous processing, headless:
    python ~/projects/anpr-rpi4/src/anpr_cam.py --min-conf 55.0
  • Show annotated preview (requires desktop/X or Wayland):
    python ~/projects/anpr-rpi4/src/anpr_cam.py --display --min-conf 55.0
  • Hardware trigger on GPIO 17 and LED feedback on GPIO 27:
    python ~/projects/anpr-rpi4/src/anpr_cam.py --trigger-gpio 17 --led-gpio 27 --min-conf 55.0

Outputs are saved in:
– ~/projects/anpr-rpi4/out/

Filename format:
– anpr_YYYYMMDD_HHMMSS_PLATETEXT_CONF.jpg
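If you later want to post-process results (e.g., build a log or deduplicate detections), the filename convention above is machine-parseable. A minimal sketch, assuming the exact format written by anpr_cam.py (the helper name `parse_result_name` is ours, not part of the script):

```python
import re
from datetime import datetime

# Matches anpr_YYYYMMDD_HHMMSS_PLATETEXT_CONF.jpg as produced by anpr_cam.py
FNAME_RE = re.compile(r"anpr_(\d{8}_\d{6})_([A-Z0-9]+)_(\d+)\.jpg$")

def parse_result_name(path):
    """Return (timestamp, plate_text, confidence) from an output filename, or None."""
    m = FNAME_RE.search(path)
    if not m:
        return None
    ts = datetime.strptime(m.group(1), "%Y%m%d_%H%M%S")
    return ts, m.group(2), int(m.group(3))
```

Note that image-mode runs without a detection write `none` (lowercase) in the plate slot, which this regex deliberately rejects.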


Step‑by‑Step Validation

Follow this sequence to isolate and validate each subsystem.

1) OS and Python
– Confirm 64‑bit OS and Python 3.11:
uname -m
python3 --version

Expect: aarch64, Python 3.11.x

2) Camera driver and sensor
– Test libcamera alive:
libcamera-hello -t 2000
You should see a live preview and no errors in the terminal.

  • Capture a still:
    libcamera-still -o /tmp/hq_check.jpg
    file /tmp/hq_check.jpg

3) Picamera2 import
– From the venv:
source ~/projects/anpr-rpi4/.venv/bin/activate
python - <<'PY'
from picamera2 import Picamera2
picam = Picamera2()
print("Picamera2 OK:", picam)
PY

4) OpenCV and Tesseract together
– Confirm OpenCV can load and Tesseract can OCR:
python - <<'PY'
import cv2, numpy as np, pytesseract
img = cv2.imread("/tmp/hq_check.jpg")
print("cv2 version:", cv2.__version__, "img:", img.shape if img is not None else None)
print("tesseract:", pytesseract.get_tesseract_version())
PY

5) OCR with synthetic plate
– Create and OCR:
convert -size 400x120 xc:white -fill black -pointsize 72 -gravity center \
-annotate 0 "KJ67ABC" /tmp/fakeplate.png
python ~/projects/anpr-rpi4/src/anpr_cam.py --image /tmp/fakeplate.png --display --verbose

Expect: a console line reporting “KJ67ABC” with roughly 90%+ confidence, depending on how the synthetic plate renders.

6) Optical focus and exposure
– Use a live preview to manually focus the HQ camera:
libcamera-hello -t 0
Point at a printed license plate (or high‑contrast text). Adjust lens focus and back‑focus for maximal sharpness. ANPR is highly sensitive to sharpness.

7) End‑to‑end live ANPR
– Place a test plate in the field of view at realistic distance.
– Run:
python ~/projects/anpr-rpi4/src/anpr_cam.py --display --min-conf 55.0 --verbose
– Observe console logs:
– If a plate is recognized, you will see lines like:
[20250114_143501] ABC1234 (84.0%) - saved /home/pi/projects/anpr-rpi4/out/anpr_20250114_143501_ABC1234_84.jpg - 72.3 ms
– Check saved images under ~/projects/anpr-rpi4/out to verify the bounding box and label.

8) Trigger validation (if wired)
– With the pushbutton wired to GPIO 17, run:
python ~/projects/anpr-rpi4/src/anpr_cam.py --trigger-gpio 17 --led-gpio 27 --min-conf 55.0
– Press the button to capture/process a single frame each press. LED should blink on successful recognition.


Troubleshooting

  • Camera not detected / libcamera errors:
  • Ensure the ribbon cable orientation and latches are correct on both ends.
  • Confirm raspi‑config “Camera” is enabled and rebooted.
  • Check /boot/firmware/config.txt contains:
    camera_auto_detect=1
    gpu_mem=256
    (to force the sensor explicitly, use camera_auto_detect=0 together with dtoverlay=imx477 instead — the two auto/explicit options should not be combined)
  • Inspect dmesg:
    dmesg | grep -i imx477

  • Picamera2 import fails inside venv:

  • You must create the venv with system site packages:
    python3 -m venv --system-site-packages ~/projects/anpr-rpi4/.venv
  • Or install python3-picamera2 globally via apt (already done) and avoid a venv if desired.

  • OpenCV import error or “Illegal instruction”:

  • Reinstall the pinned wheel:
    pip install --force-reinstall opencv-python==4.8.1.78
  • Alternatively, use the distro package:
    deactivate
    sudo apt install -y python3-opencv

    Then either recreate the venv with --system-site-packages or run the system Python 3.11 directly.

  • Tesseract poor accuracy:

  • Increase plate ROI resolution (raise camera resolution).
  • Improve illumination; avoid motion blur (use faster shutter).
  • Use a different Tesseract page segmentation mode (psm 7 vs 8 or 6).
  • Adjust whitelist and remove characters not present in your region.
  • Try a different threshold: Otsu vs adaptive.
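In OpenCV, Otsu is a one-line swap: `cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)` instead of `cv2.adaptiveThreshold`. To make the trade-off concrete, here is what Otsu computes, as a self-contained pure-Python sketch (the real pipeline should use the OpenCV call):

```python
def otsu_threshold(pixels):
    """Pick the global threshold (0-255) that maximizes between-class
    variance over a flat iterable of grayscale values (Otsu's method)."""
    hist = [0] * 256
    for p in pixels:
        hist[p] += 1
    total = sum(hist)
    sum_all = sum(i * h for i, h in enumerate(hist))
    sum_b = 0.0   # cumulative intensity of the "below threshold" class
    w_b = 0       # cumulative pixel count of that class
    best_t, best_var = 0, -1.0
    for t in range(256):
        w_b += hist[t]
        if w_b == 0:
            continue
        w_f = total - w_b
        if w_f == 0:
            break
        sum_b += t * hist[t]
        m_b = sum_b / w_b
        m_f = (sum_all - sum_b) / w_f
        var_between = w_b * w_f * (m_b - m_f) ** 2
        if var_between > best_var:
            best_var, best_t = var_between, t
    return best_t
```

Otsu works well when the plate ROI is bimodal (dark glyphs on a light background, even lighting); adaptive thresholding wins when illumination varies across the ROI.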

  • No contours found or spurious detections:

  • Tune the Canny thresholds and morphological kernel size.
  • Adjust aspect ratio bounds in detect_plate_candidates (for your country’s plate format).
  • Consider cropping search area to expected vertical band (e.g., lower half of frame).
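Cropping the search area is cheap and effective because plates in a fixed camera setup occupy a predictable vertical band. A minimal sketch of the idea (plain row slicing; with a NumPy frame the same `frame[y0:]` slice applies, and detected box coordinates must be offset by `y0` to map back to the full frame):

```python
def lower_band(frame, frac=0.5):
    """Return the bottom `frac` of the frame (by rows) and the row offset
    needed to map detections back to full-frame coordinates."""
    h = len(frame)
    y0 = int(h * (1.0 - frac))
    return frame[y0:], y0

# Example: run detection on the band, then shift boxes up by y0:
#   band, y0 = lower_band(frame, frac=0.5)
#   for (x, y, w, h) in detect(band): full_box = (x, y + y0, w, h)
```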

  • High CPU usage, low FPS:

  • Reduce resolution to 1280×720 in Picamera2 config.
  • Run headless by omitting the --display flag.
  • GPU acceleration (CUDA) is not available on the Pi 4; prefer classical methods or tiny CPU-friendly detectors.
  • Batch OCR only when contour is stable across consecutive frames (temporal smoothing).

  • Permission issues with GPIO:

  • Ensure your user is in gpio group (default on Raspberry Pi OS).
  • Run:
    groups
    If needed:
    sudo usermod -aG gpio $USER
    newgrp gpio

Improvements and Extensions

  • Robust plate detection with a trained detector:
  • Replace classical contour heuristic with a plate detector (e.g., YOLOv5n/YOLOv8n or Haar/LBP cascades). Run a lightweight model via OpenCV DNN or ONNXRuntime on CPU; use 320×320 input to keep latency reasonable.
  • Apply non‑maximum suppression and track boxes across frames (e.g., SORT) for stability and fewer OCR calls.

  • OCR specialization:

  • Train a custom OCR model for your region; or use PaddleOCR/EasyOCR for multi‑language.
  • Use Tesseract with custom language data tuned to plate fonts.
  • Implement character segmentation and per‑glyph classification (SVM/CNN) for strict formats.

  • Post‑processing:

  • Validate OCR against regional patterns (regex) and correct likely confusions (O/0, I/1, B/8, S/5).
  • Temporal voting: require consistent reads across N frames before reporting.
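Temporal voting can be sketched with a sliding window over recent reads; a plate is only reported once it wins a majority. This helper class (`PlateVoter` is our name, not part of anpr_cam.py) could wrap the `text` returned by `process_frame`:

```python
from collections import Counter, deque

class PlateVoter:
    """Report a plate only after it appears in at least `min_votes`
    of the last `window` non-empty OCR reads."""
    def __init__(self, window=5, min_votes=3):
        self.reads = deque(maxlen=window)
        self.min_votes = min_votes

    def add(self, text):
        if text:
            self.reads.append(text)
        if not self.reads:
            return None
        plate, votes = Counter(self.reads).most_common(1)[0]
        return plate if votes >= self.min_votes else None
```

A single misread (e.g., one frame confusing O/0) is outvoted by consistent frames around it.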

  • Camera control:

  • Lock exposure and white balance for consistency:

    • With Picamera2, set:
      picam.set_controls({"AeEnable": False, "AwbEnable": False, "ExposureTime": 8000, "AnalogueGain": 1.5})
    • Tune ExposureTime to avoid motion blur while maintaining SNR under your lighting.
  • Optics and illumination:

  • Use a longer focal length (e.g., 12 mm) for distant plates.
  • Add polarized illumination and a polarizing filter to reduce glare.
  • Consider IR‑sensitive setups with appropriate illumination and filters (within legal constraints).

  • Performance:

  • Use ROI strategy: detect car region first (background subtraction or motion box), then run plate detection inside ROI.
  • Vectorize pre‑processing; keep arrays in RGB and convert once; reuse buffers.
  • Multi‑thread capture and processing (producer/consumer queues).

  • Deployment:

  • Run as a systemd service to auto‑start on boot.
  • Write logs in CSV/JSON with timestamps, confidences, and image paths.
  • Provide a simple REST API (Flask/FastAPI) to fetch last N detections.
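The systemd idea can be wired up with a small unit file; a hedged sketch matching the directories used above (the unit name `anpr.service` and `User=pi` are assumptions — adjust to your user and paths):

```ini
# /etc/systemd/system/anpr.service (hypothetical)
[Unit]
Description=ANPR camera service
After=network-online.target

[Service]
User=pi
WorkingDirectory=/home/pi/projects/anpr-rpi4
ExecStart=/home/pi/projects/anpr-rpi4/.venv/bin/python src/anpr_cam.py --min-conf 55.0
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

Enable it with `sudo systemctl daemon-reload && sudo systemctl enable --now anpr.service`, then follow logs with `journalctl -u anpr.service -f`.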

Final Checklist

  • Hardware
  • Raspberry Pi 4 and Raspberry Pi HQ Camera (IMX477) connected via CSI‑2 ribbon.
  • Lens mounted and focused; camera rigidly mounted.
  • Optional: Button on GPIO 17 to GND; LED on GPIO 27 via 330 Ω to GND.

  • OS and interfaces

  • Raspberry Pi OS Bookworm 64‑bit installed and updated.
  • Camera enabled via raspi‑config; I2C/SPI enabled if needed.
  • /boot/firmware/config.txt contains:
    • camera_auto_detect=1 (or camera_auto_detect=0 with dtoverlay=imx477 to force the sensor)
    • gpu_mem=256
  • Rebooted after changes.

  • Software environment

  • Project directory: ~/projects/anpr-rpi4
  • Python venv with --system-site-packages created and activated.
  • apt packages: libcamera-apps, python3-picamera2, tesseract-ocr, tesseract-ocr-eng, libtesseract-dev.
  • pip packages pinned: numpy==1.26.4, opencv-python==4.8.1.78, imutils==0.5.4, pytesseract==0.3.10, gpiozero==2.0, smbus2==0.4.3, spidev==3.6.
  • Sanity checks: libcamera-hello OK; tesseract --version OK; Python imports OK.

  • Code and execution

  • anpr_cam.py placed under ~/projects/anpr-rpi4/src, made executable.
  • Test image run: python src/anpr_cam.py --image /tmp/plate.png --display.
  • Live run: python src/anpr_cam.py --display --min-conf 55.0.
  • Output images saved under ~/projects/anpr-rpi4/out with timestamp and confidence.

  • Validation

  • Verified plate detection and readable OCR on controlled images.
  • Adjusted lens focus and lighting to improve accuracy.
  • Tuned thresholds (Canny, morphological ops, min_conf) for your scenario.

If all boxes are ticked, your Raspberry Pi 4 + HQ Camera is performing license plate recognition locally using OpenCV and Tesseract, meeting the “opencv-anpr-license-plates” objective with a reproducible, documented setup.



Carlos Núñez Zorrilla
Electronics & Computer Engineer

Telecommunications Electronics Engineer and Computer Engineer (official degrees in Spain).

Practical case: Raspberry Pi 5 + Waveshare 2.9-inch e-Paper


Objective and use case

What you’ll build: A live system dashboard on Raspberry Pi 5 that displays real-time system health and network telemetry on a 2.9-inch e-Paper display using Python and SPI for efficient updates.

Why it matters / Use cases

  • Monitor system performance metrics like CPU temperature and memory usage directly on the e-Paper display.
  • Visualize network statistics such as packet loss and latency for troubleshooting connectivity issues.
  • Provide a low-power solution for remote monitoring applications, ideal for IoT deployments.
  • Utilize the e-Paper display’s readability in various lighting conditions for outdoor installations.

Expected outcome

  • Real-time updates of system metrics with a refresh rate of less than 1 second.
  • Power consumption under 1W during operation, ensuring energy efficiency.
  • Display of critical alerts when CPU temperature exceeds 75°C or memory usage exceeds 80%.
  • Network latency measurements displayed with an accuracy of ±5ms.

Audience: Intermediate developers; Level: Advanced Hands-On

Architecture/flow: Raspberry Pi 5 communicates with the Waveshare 2.9-inch e-Paper display over SPI, fetching data from system sensors and network interfaces.

Advanced Hands‑On: SPI e‑Paper Live Dashboard on Raspberry Pi 5 + Waveshare 2.9″ e‑Paper HAT

Build a responsive, low‑power system dashboard that renders live system health and network telemetry on a black‑and‑white 2.9″ e‑Paper display. You’ll drive the display over SPI from a Raspberry Pi 5 using Python 3.11 on Raspberry Pi OS Bookworm (64‑bit), leveraging partial refresh to keep updates quick and reduce ghosting.

The objective: spi‑epaper‑live‑dashboard on the exact device model “Raspberry Pi 5 + Waveshare 2.9″ e‑Paper HAT”.


Prerequisites

  • Hardware
  • Raspberry Pi 5 (4 GB or 8 GB recommended)
  • Waveshare 2.9″ e‑Paper HAT (black/white; 296×128 px)
  • Reliable 5V USB‑C power supply for Pi 5 (3A recommended)
  • microSD (≥16 GB, A1 or better)
  • Optional: USB keyboard/HDMI for first boot, or SSH

  • OS and tools

  • Raspberry Pi OS Bookworm 64‑bit (latest, with Linux 6.x)
  • Python 3.11 (default on Bookworm)
  • Internet connectivity for package installs

  • Skills

  • Comfortable with Linux shell and editing files
  • Familiarity with GPIO pinouts and SPI

Materials (with exact model)

  • 1× Raspberry Pi 5
  • 1× Waveshare 2.9″ e‑Paper HAT (HAT form factor for RPi 40‑pin header)
  • microSD card (≥16GB)
  • USB‑C 5V/3A PSU
  • Optional standoffs/screws for the HAT

Notes:
– The Waveshare 2.9″ e‑Paper HAT uses SPI0 and three control pins (DC, RST, BUSY). Out of the box, it maps to the Pi 40‑pin header.


Setup/Connection

1) OS install and first boot

  • Flash Raspberry Pi OS Bookworm 64‑bit using Raspberry Pi Imager.
  • On first boot, configure locale, Wi‑Fi, and enable SSH if needed.

2) Enable SPI interface

You can enable SPI with raspi‑config or by editing /boot/firmware/config.txt.

  • Using raspi‑config (interactive):
sudo raspi-config
# Interface Options -> SPI -> Enable, then Finish
sudo reboot
  • Or edit config.txt directly:
sudo nano /boot/firmware/config.txt

Ensure this line exists (uncomment or add):

dtparam=spi=on

Save, exit, and reboot:

sudo reboot

After reboot, verify:

ls -l /dev/spidev0.0
# Expect: /dev/spidev0.0 device present

Optional: verify kernel module:

lsmod | grep spidev || echo "spidev not listed (may be built-in)"

3) Physical attachment and pin mapping

Mount the Waveshare 2.9″ e‑Paper HAT onto the 40‑pin GPIO header of the Raspberry Pi 5. The HAT is keyed to mate directly; no jumper wires are needed. For reference, the e‑Paper HAT uses the following pins by default:

Signal  BCM GPIO  Physical Pin  Notes
3.3V    —         1             Board power for logic
GND     —         6             Ground reference
SCLK    GPIO11    23            SPI0 SCLK
MOSI    GPIO10    19            SPI0 MOSI
MISO    GPIO9     21            SPI0 MISO (not used by some panels)
CS0     GPIO8     24            SPI0 CE0 (display chip select)
DC      GPIO25    22            Data/Command select
RST     GPIO17    11            e‑Paper hardware reset
BUSY    GPIO24    18            Panel busy status (low = busy on most variants)

Important:
– The HAT aligns and routes these pins internally; do not double‑wire them unless you change the defaults.
– Double‑check that the HAT fully seats on the 40‑pin header.

4) Update system packages

sudo apt update
sudo apt full-upgrade -y
sudo reboot

5) Python 3.11 virtual environment and dependencies

We’ll create a project directory under /home/pi/spi-epaper-live-dashboard and install packages in a venv.

# Create project directory
mkdir -p /home/pi/spi-epaper-live-dashboard
cd /home/pi/spi-epaper-live-dashboard

# Base tools
sudo apt install -y python3.11-venv python3-pip python3-dev \
  libatlas-base-dev libjpeg-dev zlib1g-dev \
  fonts-dejavu-core \
  git

# Create and activate venv
python3 -m venv .venv
source .venv/bin/activate

# Upgrade pip tooling
pip install --upgrade pip wheel setuptools

# Install runtime Python deps
pip install pillow psutil gpiozero spidev waveshare-epd

Notes:
– gpiozero interacts with pins via the OS; spidev allows raw SPI access and is a waveshare‑epd dependency.
– pillow handles image buffers and font rendering.
– psutil supplies CPU/memory/network metrics.
– waveshare‑epd provides panel drivers; we’ll use epd2in9_V2.

Check installed versions:

pip show pillow psutil gpiozero spidev waveshare-epd
python -V

6) Permissions

Ensure your user is in the spi group:

sudo usermod -aG spi $USER
# Log out/in or reboot to apply

Full Code

Create the main application at /home/pi/spi-epaper-live-dashboard/dashboard.py.

This script:
– Initializes the Waveshare 2.9″ e‑Paper (296×128 px) in landscape.
– Renders a live dashboard: time, CPU load and temperature, memory and disk usage, IP address, and network throughput.
– Uses a partial refresh most cycles for speed, with periodic full refresh to de‑ghost.

#!/usr/bin/env python3
# /home/pi/spi-epaper-live-dashboard/dashboard.py
# Raspberry Pi 5 + Waveshare 2.9" e-Paper HAT
# spi-epaper-live-dashboard (Bookworm 64-bit, Python 3.11)

import os
import sys
import time
import math
import signal
import socket
import logging
from datetime import datetime

import psutil
from gpiozero import CPUTemperature
from PIL import Image, ImageDraw, ImageFont

# Waveshare driver for 2.9" V2 (296x128), black/white
from waveshare_epd import epd2in9_V2

APP_DIR = os.path.dirname(os.path.abspath(__file__))
FONT_SANS = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
FONT_MONO = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf"

# Layout constants
DARK = 0      # black pixel on B/W panel
LIGHT = 255   # white pixel
W = 296
H = 128

# Update policy
PARTIAL_INTERVAL_SEC = 5        # partial update every 5 seconds
FULL_REFRESH_EVERY = 24         # after 24 partial cycles (~2 minutes), do a full refresh

# Panels prefer to clear after ~5 minutes of partial updates to avoid ghosting
# This policy trades responsiveness for panel health.

log = logging.getLogger("dashboard")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
)

RUN = True

def handle_exit(signum, frame):
    global RUN
    RUN = False
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)

def get_ip():
    try:
        # Attempt to get default route IP by creating a UDP socket
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.settimeout(0.2)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except Exception:
        # Fallback to hostname resolution
        try:
            return socket.gethostbyname(socket.gethostname())
        except Exception:
            return "0.0.0.0"

def human_bytes(n):
    # Format bytes/sec or totals
    units = ["B", "KB", "MB", "GB", "TB"]
    f = float(n)
    u = 0
    while f >= 1024 and u < len(units)-1:
        f /= 1024.0
        u += 1
    return f"{f:.1f}{units[u]}"

def draw_bar(draw, x, y, w, h, frac, label=None, value_text=None):
    # Background
    draw.rectangle((x, y, x+w, y+h), fill=LIGHT, outline=DARK, width=1)
    # Fill proportionally
    fill_w = int(max(0, min(1.0, frac)) * (w-2))
    draw.rectangle((x+1, y+1, x+1+fill_w, y+h-1), fill=DARK)
    # Optional text overlay
    if label:
        draw.text((x+4, y-18), label, font=FONT_SMALL, fill=DARK)
    if value_text:
        # Pillow >= 10 removed draw.textsize(); measure with textbbox() instead
        bbox = draw.textbbox((0, 0), value_text, font=FONT_SMALL)
        tw = bbox[2] - bbox[0]
        draw.text((x+w-tw-4, y-18), value_text, font=FONT_SMALL, fill=DARK)

def layout_dashboard(epd, metrics, do_full=False):
    # Create a fresh canvas (landscape). waveshare drivers often use (H, W) swap.
    image = Image.new('1', (W, H), LIGHT)
    draw = ImageDraw.Draw(image)

    # Title and time
    now = datetime.now()
    title = "LIVE DASH"
    draw.text((8, 6), title, font=FONT_BOLD, fill=DARK)
    timestr = now.strftime("%Y-%m-%d %H:%M:%S")
    # Pillow >= 10 removed draw.textsize(); measure with textbbox() instead
    bbox = draw.textbbox((0, 0), timestr, font=FONT_MED)
    tw = bbox[2] - bbox[0]
    draw.text((W - tw - 8, 6), timestr, font=FONT_MED, fill=DARK)

    # CPU row
    cpu_text = f"CPU {metrics['cpu_percent']:>4.1f}%  {metrics['cpu_temp']:>4.1f}°C  {metrics['freq_mhz']:>4.0f}MHz"
    draw.text((8, 30), cpu_text, font=FONT_MED, fill=DARK)
    draw_bar(draw, 8, 54, 160, 16, metrics['cpu_percent']/100.0, label="CPU", value_text=f"{metrics['cpu_percent']:.0f}%")

    # Memory row
    mem = metrics['mem']
    mem_text = f"MEM {mem['used_mb']:>4.0f}/{mem['total_mb']:>4.0f}MB"
    draw.text((8, 80), mem_text, font=FONT_MED, fill=DARK)
    draw_bar(draw, 8, 102, 160, 16, mem['used_mb']/mem['total_mb'], label="RAM", value_text=f"{mem['used_mb']:.0f}MB")

    # Right column: Disk, Net, IP
    x0 = 180
    draw.text((x0, 30), f"DISK {metrics['disk']['used_gb']:.1f}/{metrics['disk']['total_gb']:.1f}GB", font=FONT_MED, fill=DARK)
    draw.text((x0, 50), f"NET  ↓{metrics['net']['rx']}  ↑{metrics['net']['tx']}", font=FONT_MED, fill=DARK)
    draw.text((x0, 70), f"IP   {metrics['ip']}", font=FONT_MED, fill=DARK)
    draw.text((x0, 90), f"LOAD {metrics['loadavg']}", font=FONT_MED, fill=DARK)
    draw.text((x0, 110), "REF  FULL" if do_full else "REF  PART", font=FONT_MED, fill=DARK)

    return image

def collect_metrics(prev_net):
    cpu = psutil.cpu_percent(interval=None)
    try:
        temp = CPUTemperature().temperature
    except Exception:
        # psutil returns shwtemp namedtuples, not dicts; read .current via getattr
        entries = psutil.sensors_temperatures().get('cpu_thermal', [])
        temp = getattr(entries[0], 'current', 0.0) if entries else 0.0
    freq = psutil.cpu_freq()
    freq_mhz = freq.current if freq else 0.0

    vm = psutil.virtual_memory()
    mem = {
        "total_mb": vm.total / (1024*1024),
        "used_mb": (vm.total - vm.available) / (1024*1024),
    }

    du = psutil.disk_usage("/")
    disk = {
        "total_gb": du.total / (1024*1024*1024),
        "used_gb": du.used / (1024*1024*1024),
    }

    ip = get_ip()

    n = psutil.net_io_counters()
    if prev_net is None:
        rx_rate = tx_rate = 0.0
    else:
        dt = max(1e-6, time.time() - prev_net['t'])
        rx_rate = (n.bytes_recv - prev_net['rx']) / dt
        tx_rate = (n.bytes_sent - prev_net['tx']) / dt

    load1, load5, load15 = os.getloadavg()
    metrics = {
        "cpu_percent": cpu,
        "cpu_temp": temp,
        "freq_mhz": freq_mhz,
        "mem": mem,
        "disk": disk,
        "ip": ip,
        "net": {
            "rx": human_bytes(rx_rate) + "/s",
            "tx": human_bytes(tx_rate) + "/s"
        },
        "loadavg": f"{load1:.2f},{load5:.2f},{load15:.2f}",
    }
    new_prev = {"rx": n.bytes_recv, "tx": n.bytes_sent, "t": time.time()}
    return metrics, new_prev

def main():
    global RUN
    # Load fonts
    size_small = 12
    size_med = 16
    size_bold = 20

    global FONT_SMALL, FONT_MED, FONT_BOLD
    try:
        FONT_SMALL = ImageFont.truetype(FONT_MONO, size_small)
        FONT_MED = ImageFont.truetype(FONT_SANS, size_med)
        FONT_BOLD = ImageFont.truetype(FONT_SANS, size_bold)
    except Exception as e:
        log.warning("Falling back to default PIL fonts: %s", e)
        FONT_SMALL = ImageFont.load_default()
        FONT_MED = ImageFont.load_default()
        FONT_BOLD = ImageFont.load_default()

    log.info("Initializing e-Paper...")
    epd = epd2in9_V2.EPD()
    epd.init()
    epd.Clear(0xFF)

    # Base frame to reduce flashing on partial updates
    base_image = Image.new('1', (W, H), LIGHT)
    epd.displayPartBaseImage(epd.getbuffer(base_image))

    counter = 0
    prev_net = None

    try:
        while RUN:
            do_full = (counter % FULL_REFRESH_EVERY == 0)

            metrics, prev_net = collect_metrics(prev_net)

            canvas = layout_dashboard(epd, metrics, do_full=do_full)

            if do_full:
                log.info("Full refresh")
                epd.init()            # re-init full update waveform
                epd.display(epd.getbuffer(canvas))
                # Reset base after full refresh to reduce ghosting drift
                epd.displayPartBaseImage(epd.getbuffer(canvas))
            else:
                # Partial refresh
                epd.displayPartial(epd.getbuffer(canvas))

            counter += 1
            # Sleep until next partial update
            for _ in range(PARTIAL_INTERVAL_SEC * 10):
                if not RUN:
                    break
                time.sleep(0.1)

    except Exception as e:
        log.exception("Unhandled error: %s", e)
    finally:
        # Put panel to sleep to preserve lifetime
        try:
            epd.init()
            epd.sleep()
        except Exception as e:
            log.warning("Failed to sleep panel cleanly: %s", e)
        log.info("Done.")

if __name__ == "__main__":
    main()

Optional small hardware check script at /home/pi/spi-epaper-live-dashboard/check_spi.py:

#!/usr/bin/env python3
# Minimal SPI sanity check (does not drive panel fully)
import spidev
s = spidev.SpiDev()
s.open(0, 0)      # bus 0, CE0
s.max_speed_hz = 4000000
s.mode = 0
print("spidev opened:", s)
# The panel sends nothing meaningful back here; we only verify the transfer call succeeds:
s.xfer2([0x00])
s.close()
print("SPI xfer2 OK at 4MHz, mode 0")

Make both scripts executable:

chmod +x /home/pi/spi-epaper-live-dashboard/dashboard.py
chmod +x /home/pi/spi-epaper-live-dashboard/check_spi.py

Build/Flash/Run Commands

1) Activate the environment

cd /home/pi/spi-epaper-live-dashboard
source .venv/bin/activate

2) Quick SPI verification

python ./check_spi.py

Expected: prints that SPI opened and xfer2 OK. If not, check SPI enabling and permissions.

3) First dashboard run (foreground)

python ./dashboard.py

The display should clear, then show the dashboard. Every 5 seconds it should partially update. Every ~2 minutes it should do a full refresh (brief flash).

Stop with Ctrl+C.

4) Optional: systemd service for auto‑start at boot

Create /etc/systemd/system/epaper-dashboard.service:

sudo tee /etc/systemd/system/epaper-dashboard.service >/dev/null <<'UNIT'
[Unit]
Description=SPI e-Paper Live Dashboard (Waveshare 2.9") on Raspberry Pi 5
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/spi-epaper-live-dashboard
Environment=PYTHONUNBUFFERED=1
ExecStart=/home/pi/spi-epaper-live-dashboard/.venv/bin/python /home/pi/spi-epaper-live-dashboard/dashboard.py
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
UNIT

Reload, enable, and start:

sudo systemctl daemon-reload
sudo systemctl enable epaper-dashboard.service
sudo systemctl start epaper-dashboard.service
sudo systemctl status epaper-dashboard.service --no-pager

Stop/disable if needed:

sudo systemctl stop epaper-dashboard.service
sudo systemctl disable epaper-dashboard.service

Step‑by‑step Validation

Follow this checklist to validate each layer before blaming the next.

1) OS and Python
– Verify 64‑bit Bookworm and Python 3.11:

uname -m && lsb_release -ds
python3 -V

Expect: aarch64, a Bookworm-based description (e.g. “Debian GNU/Linux 12 (bookworm)”), and Python 3.11.x.

2) SPI enabled and device nodes present
– Confirm /dev/spidev0.0:

ls -l /dev/spidev0.0

– Confirm dtparam is active:

grep -E '^dtparam=spi=on' /boot/firmware/config.txt

– Check that your user is in the spi group:

id -nG | tr ' ' '\n' | grep -x spi || echo "User not in spi group"

3) HAT seating and pin mapping
– Press the HAT gently to ensure it fully seats on the 40‑pin header.
– Ensure no standoffs short pins.
– If using jumpers instead of HAT, confirm connections match the table earlier exactly.

4) Python environment and dependencies
– Activate venv:

source /home/pi/spi-epaper-live-dashboard/.venv/bin/activate

– Check packages:

python -c "import PIL,psutil,spidev,gpiozero; print('deps OK')"
python -c "from waveshare_epd import epd2in9_V2; print('waveshare-epd OK')"

5) SPI sanity
– Run check script:

python /home/pi/spi-epaper-live-dashboard/check_spi.py

6) First image on panel
– Run the dashboard:

python /home/pi/spi-epaper-live-dashboard/dashboard.py

Observe:
– Initial clear to white.
– Dashboard appears with time, CPU/Temp, RAM, DISK, NET, IP, LOAD.
– Partial updates every ~5 seconds show changing CPU percentage and network throughput.
– Full refresh every ~2 minutes briefly flashes, then restores crisp text.

7) Service mode
– Start the systemd service and confirm it holds through reboots.


Troubleshooting

  • Symptom: python import error for waveshare_epd
  • Cause: Package not installed in venv.
  • Fix: Activate venv, then pip install waveshare-epd. Verify with import test.

  • Symptom: PermissionError: [Errno 13] Permission denied: ‘/dev/spidev0.0’

  • Cause: SPI not enabled or user not in spi group.
  • Fix: Enable in raspi-config or config.txt, add user to spi group, reboot.

  • Symptom: /dev/spidev0.0 missing

  • Cause: SPI disabled or kernel missing spidev device.
  • Fix: Ensure dtparam=spi=on in /boot/firmware/config.txt, reboot. Check dmesg | grep -i spi.

  • Symptom: e‑Paper stuck white or black, BUSY never clears

  • Causes:
    • BUSY/DC/RST pins mismatched (only if manually wired).
    • Incomplete init or previous crash during update.
  • Fix:

    • Power‑cycle the Pi to hard reset the panel.
    • Confirm pin mappings per table.
    • Ensure stable 3.3V rail; avoid undervoltage (check dmesg for “Under-voltage detected!”).
  • Symptom: Ghosting accumulates (shadows of old content)

  • Cause: Too many partial updates without full refresh.
  • Fix: Increase rate of full refresh (reduce FULL_REFRESH_EVERY). Consider calling epd.Clear(0xFF) every 5–10 minutes.

  • Symptom: Fonts look jagged or too small

  • Fix: Use a heavier/bold font or larger size. Ensure fonts-dejavu-core is installed. Adjust FONT_SANS/MONO sizes in the script.

  • Symptom: High CPU load from Python drawing

  • Cause: Excessively frequent refresh or large anti‑aliased fonts.
  • Fix: Keep PARTIAL_INTERVAL_SEC ≥ 3–5 seconds. Avoid complex graphics. Stick to 1‑bit images.

  • Symptom: Network throughput always shows 0.0B/s

  • Cause: Baseline sample is the first call; rate needs two measurements.
  • Fix: Wait one cycle; subsequent partial updates will show non‑zero values.
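This is the expected behavior of the two-sample rate logic in collect_metrics. A minimal standalone sketch (with made-up counter values) shows why the first cycle necessarily reads zero:

```python
def net_rate(prev, rx_bytes, tx_bytes, now):
    """Two-sample rate: the first call only records a baseline, so rates are 0.0."""
    if prev is None:
        rx = tx = 0.0
    else:
        dt = max(1e-6, now - prev["t"])
        rx = (rx_bytes - prev["rx"]) / dt
        tx = (tx_bytes - prev["tx"]) / dt
    return rx, tx, {"rx": rx_bytes, "tx": tx_bytes, "t": now}

# First call: baseline only -> rates are 0.0
rx, tx, prev = net_rate(None, 1000, 500, 100.0)
# One second later, 2000 more bytes received and 400 sent -> 2000 B/s, 400 B/s
rx, tx, prev = net_rate(prev, 3000, 900, 101.0)
print(rx, tx)  # 2000.0 400.0
```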

  • Symptom: ModuleNotFoundError: _lgpio or RPi.GPIO warnings

  • Cause: gpiozero backend fallback.
  • Fix: In Bookworm, gpiozero should work out of the box. If not, try installing python3-lgpio via apt for system Python, or avoid gpiozero by using psutil.sensors_temperatures directly.

  • Symptom: Service runs but screen doesn’t update

  • Causes:
    • Service runs before SPI device is ready.
    • Wrong WorkingDirectory or venv path.
  • Fix:

    • Add a short startup delay (e.g. ExecStartPre=/bin/sleep 5); systemd has no dedicated SPI target to order against.
    • Double-check ExecStart path and WorkingDirectory.
Diagnostic commands

  • Check systemd logs:

journalctl -u epaper-dashboard.service -e --no-pager

  • Inspect pin states (BUSY high/low):

pinctrl get 17,24,25   (on older Pi models: raspi-gpio get 17,24,25)

  • Confirm the process actually opens the SPI device node:

sudo strace -f -e trace=openat /home/pi/spi-epaper-live-dashboard/.venv/bin/python /home/pi/spi-epaper-live-dashboard/dashboard.py

Improvements

  • Content sources
  • Subscribe to MQTT topics and render application metrics or sensor values.
  • Add local sensor widgets (e.g., I2C temperature/humidity) and rotate pages every minute.

  • Rendering quality and lifetime

  • Implement scheduled deep clears (epd.Clear(0xFF)) every 10 minutes to purge ghosting.
  • Use dithering for small icons or QR codes while preserving 1‑bit output.

  • Performance tuning

  • Batch drawing into regions; if your panel/driver supports partial window updates, restrict refresh to changing rectangles to reduce flicker and extend panel life.
  • Pre‑render static assets (labels, lines) into a base image and composite only the dynamic overlays each cycle.
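The base-plus-overlay idea can be sketched with Pillow (panel dimensions and drawing coordinates below are assumptions for a 2.9″ panel in landscape):

```python
from PIL import Image, ImageDraw

W, H = 296, 128  # 2.9" panel in landscape (assumed)

# Pre-render the static chrome once: divider line and label
base = Image.new("1", (W, H), 1)
d = ImageDraw.Draw(base)
d.line((0, 20, W - 1, 20), fill=0)
d.text((2, 4), "CPU", fill=0)

def render_frame(base, cpu_pct):
    # Copy the pre-rendered base, then redraw only the dynamic value region
    frame = base.copy()
    d = ImageDraw.Draw(frame)
    d.rectangle((40, 2, 120, 18), fill=1)        # blank the value area
    d.text((42, 4), f"{cpu_pct:.0f}%", fill=0)   # draw the fresh value
    return frame

frame = render_frame(base, 42.0)
# frame would then go to epd.displayPartial(epd.getbuffer(frame))
```

Since `base` is never mutated, every cycle pays only for the copy plus the small dynamic overlay, not for re-laying-out the whole dashboard.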

  • Robustness

  • Wrap updates with a watchdog; if an exception occurs, reset the panel (RST pin toggled by driver init).
  • Add graceful backoff when network metrics are unavailable.

  • Deployment

  • Use a Python packaging structure and a lock file (pip-tools or uv) to pin versions and ensure reproducibility.
  • Containerize on Bookworm with systemd‑nspawn or podman if isolating dependencies is important.

  • Power/use case expansions

  • Pair with a battery HAT and use cron or a button to wake, refresh a snapshot, then sleep the Pi for low‑duty signage.
  • Integrate with Home Assistant via REST/MQTT for smart home status display.

  • UI/UX

  • Implement page flipping via a GPIO button (gpiozero Button on BCM5) to cycle dashboards.
  • Add large glyphs for glanceable status (e.g., WIFI strength bars, USB device count, service health).

Final Checklist

  • Hardware
  • Raspberry Pi 5 powered with an adequate supply (official 5 V/5 A USB-C supply recommended)
  • Waveshare 2.9″ e‑Paper HAT seated on the 40‑pin header
  • No shorts, secure mechanical mounting

  • OS and interfaces

  • Raspberry Pi OS Bookworm 64‑bit installed
  • SPI enabled: dtparam=spi=on in /boot/firmware/config.txt
  • /dev/spidev0.0 present
  • User added to spi group

  • Python environment

  • Project directory: /home/pi/spi-epaper-live-dashboard
  • Virtual environment created: .venv present
  • Dependencies installed in venv: pillow, psutil, gpiozero, spidev, waveshare-epd
  • Fonts installed: fonts-dejavu-core

  • Application

  • dashboard.py executable and configured with correct font paths
  • check_spi.py runs without errors
  • Foreground run shows dashboard and periodic updates
  • Partial refresh cadence ~5s; full refresh ~2 min (configurable)

  • Service (optional)

  • systemd unit at /etc/systemd/system/epaper-dashboard.service
  • Enabled and started; persists across reboot
  • Logs clean in journalctl

  • Validation complete

  • CPU %, temperature, RAM, disk, IP, load average, and net throughput values look reasonable
  • Panel sleeps on exit; no persistent ghosting after periodic deep clears

With this setup, your Raspberry Pi 5 + Waveshare 2.9″ e‑Paper HAT becomes a silent, low‑power SPI‑driven live dashboard suitable for desks, labs, or server racks—updating just enough to stay useful while preserving the panel’s lifetime.



Carlos Núñez Zorrilla
Electronics & Computer Engineer

Telecommunications Electronics Engineer and Computer Engineer (official degrees in Spain).


Practical case: RPi Zero 2 W & Dragino LoRa-MQTT bridge


Objective and use case

What you’ll build: Transform your Raspberry Pi Zero 2 W into a LoRa-to-MQTT gateway bridge using the Dragino LoRa/GPS HAT and Python 3.11. The bridge continuously receives raw LoRa frames and publishes them to an MQTT broker.

Why it matters / Use cases

  • Enables remote sensor data collection in agricultural applications using LoRa technology.
  • Facilitates real-time monitoring of environmental conditions in smart city projects.
  • Supports IoT device communication in areas with limited internet connectivity.
  • Allows for the integration of GPS data for location-based services in logistics.
  • Provides a cost-effective solution for long-range data transmission in industrial automation.

Expected outcome

  • Achieve a data transmission rate of up to 10 packets/s from LoRa devices to the MQTT broker.
  • Maintain latencies under 200ms for end-to-end message delivery.
  • Ensure a successful connection to the MQTT broker with a 99% uptime.
  • Receive structured JSON payloads with metadata including RSSI and SNR for each message.
  • Validate the integrity of received data with a 95% accuracy rate in processing.

Audience: IoT developers and hobbyists; Level: Intermediate

Architecture/flow: Raspberry Pi Zero 2 W with Dragino LoRa/GPS HAT communicates with LoRa devices, processes data, and publishes to an MQTT broker.

Raspberry Pi Zero 2 W + Dragino LoRa/GPS HAT: Advanced LoRa–MQTT Gateway Bridge (Raspberry Pi OS Bookworm, Python 3.11)

This hands-on, end-to-end build turns a Raspberry Pi Zero 2 W with a Dragino LoRa/GPS HAT into a robust, headless LoRa-to-MQTT gateway bridge. It initializes the SX127x LoRa radio over SPI, continuously receives raw LoRa frames, enriches them with metadata (RSSI, SNR, timestamp, optional GPS), and publishes structured JSON payloads to an MQTT broker. We’ll use Raspberry Pi OS Bookworm (64-bit) and Python 3.11 inside a venv, configure SPI and UART, and validate at each layer: SPI, radio configuration, GPS feed, MQTT connectivity, and end-to-end message flow.

The project is intentionally protocol-agnostic (raw LoRa frames, not LoRaWAN). This keeps the focus on the “gateway bridge” pattern and allows you to bring your own over-the-air framing later. The provided code is production-ready to the extent that it properly initializes radio RX, handles interrupts, debounces error flags, reconnects to MQTT, and optionally runs under systemd.
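To make the payload contract concrete, here is an illustrative construction of one published document (all field values are made up; the field names match the bridge code below):

```python
import base64
import json

# Illustrative only: the shape of one published MQTT document
payload = b"\x01\x02hello"
doc = {
    "ts": "2024-06-01T12:00:00+00:00",
    "freq_hz": 868100000,
    "sf": 7,
    "bw_khz": 125,
    "rssi": -92,
    "snr": 7.5,
    "len": len(payload),
    "payload_hex": payload.hex(),
    "payload_b64": base64.b64encode(payload).decode(),
    "gps": {"lat": 40.4168, "lon": -3.7038, "ts": "2024-06-01T11:59:59+00:00"},
}
print(json.dumps(doc, separators=(",", ":")))
```

Carrying both hex and base64 is redundant on purpose: hex is convenient for eyeballing frames in logs, base64 is compact for downstream consumers.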


Prerequisites

  • A clean Raspberry Pi OS Bookworm 64-bit installation on a microSD card.
  • Internet connectivity for the Pi Zero 2 W (Wi‑Fi).
  • Basic familiarity with Linux and Python virtual environments.
  • A power supply capable of powering the Pi Zero 2 W and HAT reliably.
  • Optional but recommended: another LoRa transmitter configured to the same frequency/modem settings for end-to-end testing. If not available, you’ll still validate SPI, radio registers, and MQTT publication using the built-in test mode.

Environment assumptions:
– Hostname: rpi-zero2w
– User: pi
– Python 3.11 installed by default on Bookworm
– Local MQTT broker (mosquitto) on the Pi, or a reachable remote broker
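Before touching the radio, you can sanity-check that a broker is reachable with a plain TCP probe (host and port below are the defaults assumed in this guide):

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """TCP-level check that something is listening on the broker port
    (no MQTT handshake - connectivity only)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(broker_reachable("localhost", 1883))
```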


Materials (exact model)

  • Raspberry Pi Zero 2 W
  • Dragino LoRa/GPS HAT (SX1276/77/78/79-based LoRa + GPS module)
  • 40-pin header (soldered to Pi Zero 2 W) and HAT stacking hardware
  • MicroSD card (16 GB+ recommended), flashed with Raspberry Pi OS Bookworm 64-bit
  • Micro-USB power supply (5 V / 2.5 A recommended)
  • Optional: u.FL or SMA antenna (matched to your frequency band and local regulations)
  • Optional: case/enclosure that accommodates the HAT and antenna connector

Setup/Connection

1) Physical assembly

  • Power off the Pi completely.
  • Seat the Dragino LoRa/GPS HAT onto the 40-pin header of the Raspberry Pi Zero 2 W.
  • Attach a proper LoRa antenna to the HAT’s RF connector. Never transmit without an antenna. For RX-only use, you still need an antenna for reasonable sensitivity.

The HAT uses the standard Raspberry Pi 40-pin interface; the most relevant signal mapping for this project is summarized below.

2) Pins and signal mapping

The Dragino LoRa/GPS HAT connects directly to the 40-pin header; these are the key signals the software uses:

Function             GPIO     Pin #   Notes
SPI0 CE0 (NSS)       GPIO8    24      SX127x chip select
SPI0 SCLK            GPIO11   23      SPI clock
SPI0 MOSI            GPIO10   19      SPI MOSI
SPI0 MISO            GPIO9    21      SPI MISO
SX127x DIO0          GPIO25   22      RX done interrupt
SX127x RST           GPIO17   11      Reset pin for LoRa radio
UART TXD0 (Pi→GPS)   GPIO14   8       Usually unused
UART RXD0 (GPS→Pi)   GPIO15   10      NMEA in
GPS PPS (optional)   GPIO4    7       If the HAT exposes PPS
3.3V / 5V / GND      —        —       Standard power and ground

Notes:
– This mapping matches the common Dragino LoRa/GPS HAT defaults and is silkscreened on most boards. If your board revision differs, adjust the GPIO numbers in the Python code accordingly.
– We will use DIO0 for “RxDone” interrupts and RST to reliably reset the radio on startup.
– GPS runs on the primary UART (serial0). We’ll disable the Bluetooth overlay so serial0 is stable and dedicated to the HAT’s GPS module.

3) Enable interfaces and configure boot files

On Raspberry Pi OS Bookworm, firmware files live under /boot/firmware. We’ll enable SPI and the UART, disable the serial console, and reassign the PL011 UART to GPIO14/15 for GPS by disabling Bluetooth.

  • Update system and install raspi-config:
sudo apt update
sudo apt full-upgrade -y
sudo apt install -y raspi-config
  • Enable SPI:
sudo raspi-config nonint do_spi 0
  • Enable the serial interface but disable the login shell over serial:
sudo raspi-config nonint do_serial 1
  • Edit /boot/firmware/config.txt to ensure SPI enabled and Bluetooth disabled to free PL011 for GPS:
sudo nano /boot/firmware/config.txt

Add (or ensure) the following lines exist (preferably near other dtoverlay entries):

dtparam=spi=on
dtoverlay=disable-bt
  • Remove serial console from cmdline (if present) so the UART isn’t claimed by the console:
sudo sed -i 's/console=serial0,[0-9]* //g' /boot/firmware/cmdline.txt
  • Disable the Bluetooth UART service (if present) and reboot:
sudo systemctl disable --now hciuart.service || true
sudo reboot

After reboot, verify SPI and serial:

ls -l /dev/spidev0.0
ls -l /dev/serial0

You should see /dev/spidev0.0 and /dev/serial0 present.
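A tiny helper can automate this post-reboot check (the function itself is pure stdlib and works anywhere):

```python
import os

def missing_nodes(paths):
    """Return the expected device nodes that are absent."""
    return [p for p in paths if not os.path.exists(p)]

missing = missing_nodes(["/dev/spidev0.0", "/dev/serial0"])
print("all present" if not missing else f"missing: {missing}")
```

If either node is reported missing, re-check /boot/firmware/config.txt and the cmdline edit before moving on to the radio code.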


Full Code

We’ll build a single Python application that:
– Initializes the SX127x (via SPI) in LoRa mode, sets frequency/modem config, and enters RX continuous.
– Uses GPIO interrupt on DIO0 for RxDone, reads the FIFO, computes RSSI/SNR, and publishes to MQTT as JSON.
– Optionally reads GPS NMEA (serial0) to add coordinates/time metadata.
– Has a test mode to inject a synthetic payload directly to MQTT for validation.

Save as ~/lora-mqtt-bridge/bridge.py.

#!/usr/bin/env python3
# Target: Raspberry Pi Zero 2 W + Dragino LoRa/GPS HAT
# Python: 3.11 (Raspberry Pi OS Bookworm)
#
# Requires: spidev, RPi.GPIO, paho-mqtt, pyserial
#
# Default pin mapping (Dragino LoRa/GPS HAT typical):
#   LoRa SPI: /dev/spidev0.0 (CE0)
#   DIO0 = GPIO25 (RX done)
#   RST  = GPIO17
#   GPS  = /dev/serial0 9600-115200 (NMEA)
#
# For raw LoRa RX. Not LoRaWAN (no MAC/crypto).
#
# MQTT topics:
#   publish: lora/rx/raw
#   payload JSON includes: hex/base64 payload, rssi, snr, freq_hz, sf, bw, timestamp, gps (when available).

import os
import time
import json
import base64
import signal
import threading
from datetime import datetime, timezone
from typing import Optional, Tuple

import spidev
import RPi.GPIO as GPIO
import serial
from paho.mqtt.client import Client as MQTTClient

# --------------------
# Configuration (env)
# --------------------
LORA_FREQ_HZ = int(os.getenv("LORA_FREQ_HZ", "868100000"))   # 868.1 MHz default (EU868). Use 915000000 for US915.
LORA_SF = int(os.getenv("LORA_SF", "7"))                     # 7..12
LORA_BW_KHZ = int(os.getenv("LORA_BW_KHZ", "125"))           # 125, 250, 500
LORA_CR = os.getenv("LORA_CR", "4/5")                        # 4/5, 4/6, 4/7, 4/8
LORA_CRC_ON = os.getenv("LORA_CRC_ON", "1") == "1"           # enable CRC expectation
LORA_PREAMBLE = int(os.getenv("LORA_PREAMBLE", "8"))         # symbols

PIN_DIO0 = int(os.getenv("PIN_DIO0", "25"))
PIN_RST = int(os.getenv("PIN_RST", "17"))

SPI_BUS = int(os.getenv("SPI_BUS", "0"))
SPI_DEV = int(os.getenv("SPI_DEV", "0"))
SPI_MAX_HZ = int(os.getenv("SPI_MAX_HZ", "8000000"))  # 8 MHz safe

MQTT_HOST = os.getenv("MQTT_HOST", "localhost")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_USERNAME = os.getenv("MQTT_USERNAME", "")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "lora/rx/raw")
MQTT_CLIENT_ID = os.getenv("MQTT_CLIENT_ID", "rpi-zero2w-lora-bridge")
MQTT_KEEPALIVE = int(os.getenv("MQTT_KEEPALIVE", "30"))
MQTT_TLS = os.getenv("MQTT_TLS", "0") == "1"

GPS_ENABLE = os.getenv("GPS_ENABLE", "1") == "1"
GPS_PORT = os.getenv("GPS_PORT", "/dev/serial0")
GPS_BAUD = int(os.getenv("GPS_BAUD", "9600"))

TEST_PAYLOAD = os.getenv("TEST_PAYLOAD", "")  # If set, inject into MQTT and exit

# SX127x registers
REG_FIFO = 0x00
REG_OP_MODE = 0x01
REG_FRF_MSB = 0x06
REG_FRF_MID = 0x07
REG_FRF_LSB = 0x08
REG_PA_CONFIG = 0x09
REG_LNA = 0x0C
REG_FIFO_ADDR_PTR = 0x0D
REG_FIFO_TX_BASE_ADDR = 0x0E
REG_FIFO_RX_BASE_ADDR = 0x0F
REG_FIFO_RX_CURRENT_ADDR = 0x10
REG_IRQ_FLAGS_MASK = 0x11
REG_IRQ_FLAGS = 0x12
REG_RX_NB_BYTES = 0x13
REG_PKT_SNR_VALUE = 0x19
REG_PKT_RSSI_VALUE = 0x1A
REG_MODEM_CONFIG_1 = 0x1D
REG_MODEM_CONFIG_2 = 0x1E
REG_SYMB_TIMEOUT_LSB = 0x1F
REG_PREAMBLE_MSB = 0x20
REG_PREAMBLE_LSB = 0x21
REG_PAYLOAD_LENGTH = 0x22
REG_MODEM_CONFIG_3 = 0x26
REG_DIO_MAPPING_1 = 0x40
REG_VERSION = 0x42

# OpMode bits
LONG_RANGE_MODE = 0x80  # LoRa
MODE_SLEEP = 0x00
MODE_STDBY = 0x01
MODE_RXCONT = 0x05

# RSSI offset depends on the RF port: HF (>525 MHz) vs LF (see datasheet)
HF_PORT = LORA_FREQ_HZ > 525_000_000

class SX127x:
    def __init__(self, spi_bus, spi_dev, max_hz, pin_dio0, pin_rst):
        self.spi = spidev.SpiDev()
        self.spi.open(spi_bus, spi_dev)
        self.spi.max_speed_hz = max_hz
        self.spi.mode = 0

        self.pin_dio0 = pin_dio0
        self.pin_rst = pin_rst

        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin_rst, GPIO.OUT, initial=GPIO.HIGH)
        GPIO.setup(self.pin_dio0, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

        self.rx_callback = None
        self.lock = threading.Lock()

    def close(self):
        try:
            self.spi.close()
        except Exception:
            pass
        GPIO.cleanup()

    def _write_reg(self, addr, val):
        # bit7=1 for write
        self.spi.xfer2([addr | 0x80, val & 0xFF])

    def _read_reg(self, addr):
        # bit7=0 for read
        return self.spi.xfer2([addr & 0x7F, 0x00])[1]

    def _burst_read(self, addr, length):
        # read 'length' bytes from 'addr'
        resp = self.spi.xfer2([addr & 0x7F] + [0x00] * length)
        return resp[1:]

    def _burst_write(self, addr, data):
        self.spi.xfer2([addr | 0x80] + list(data))

    def reset(self):
        # Hardware reset: drive low, then high
        GPIO.output(self.pin_rst, GPIO.LOW)
        time.sleep(0.01)
        GPIO.output(self.pin_rst, GPIO.HIGH)
        time.sleep(0.01)

    def set_mode(self, mode_bits):
        self._write_reg(REG_OP_MODE, LONG_RANGE_MODE | mode_bits)

    def init_lora(self, freq_hz, sf, bw_khz, cr, crc_on, preamble):
        # Reset & check version
        self.reset()
        time.sleep(0.01)
        ver = self._read_reg(REG_VERSION)
        if ver == 0x00 or ver == 0xFF:
            raise RuntimeError("SX127x not responding on SPI (REG_VERSION read as 0x%02X)" % ver)

        # Sleep LoRa
        self.set_mode(MODE_SLEEP)
        time.sleep(0.01)

        # Frequency: FRF = freq_hz * 2^19 / F_XOSC (32 MHz crystal)
        frf = int(freq_hz / (32e6 / (1 << 19)))
        self._write_reg(REG_FRF_MSB, (frf >> 16) & 0xFF)
        self._write_reg(REG_FRF_MID, (frf >> 8) & 0xFF)
        self._write_reg(REG_FRF_LSB, frf & 0xFF)

        # LNA: max gain, boost on
        self._write_reg(REG_LNA, 0x23)  # 0b0010_0011

        # BW
        bw_map = {125: 0x70, 250: 0x80, 500: 0x90}
        if bw_khz not in bw_map:
            raise ValueError("Unsupported BW kHz: %s" % bw_khz)
        bw_bits = bw_map[bw_khz]

        # Coding rate
        cr_map = {"4/5": 0x02, "4/6": 0x04, "4/7": 0x06, "4/8": 0x08}
        if cr not in cr_map:
            raise ValueError("Unsupported CR: %s" % cr)
        cr_bits = cr_map[cr]

        # Explicit header mode
        modem_config_1 = bw_bits | cr_bits | 0x00
        self._write_reg(REG_MODEM_CONFIG_1, modem_config_1)

        # SF
        if sf < 6 or sf > 12:
            raise ValueError("SF must be 6..12")
        sf_bits = (sf << 4) & 0xF0

        # CRC on?
        crc_bits = 0x04 if crc_on else 0x00
        modem_config_2 = sf_bits | crc_bits | 0x03  # low 2 bits = SymbTimeout(9:8) MSBs
        self._write_reg(REG_MODEM_CONFIG_2, modem_config_2)

        # Modem config 3: LowDataRateOptimize if SF11/12 at low BW; AGC on
        ldo = 0x08 if (bw_khz == 125 and sf >= 11) else 0x00
        self._write_reg(REG_MODEM_CONFIG_3, ldo | 0x04)

        # Preamble
        self._write_reg(REG_PREAMBLE_MSB, (preamble >> 8) & 0xFF)
        self._write_reg(REG_PREAMBLE_LSB, preamble & 0xFF)

        # Set FIFO base addresses
        self._write_reg(REG_FIFO_RX_BASE_ADDR, 0x00)
        self._write_reg(REG_FIFO_TX_BASE_ADDR, 0x80)
        self._write_reg(REG_PAYLOAD_LENGTH, 0xFF)  # allow full 255

        # IRQ: leave all interrupts unmasked; RxDone/CRC are filtered via the flags register
        self._write_reg(REG_IRQ_FLAGS_MASK, 0x00)

        # Map DIO0 to RxDone (00)
        self._write_reg(REG_DIO_MAPPING_1, 0x00)

        # Standby then RX continuous
        self.set_mode(MODE_STDBY)
        time.sleep(0.01)
        self._clear_irq()
        self.set_mode(MODE_RXCONT)

    def _clear_irq(self):
        self._write_reg(REG_IRQ_FLAGS, 0xFF)

    def _read_packet(self) -> Optional[dict]:
        # Called when DIO0 indicates RxDone
        irq = self._read_reg(REG_IRQ_FLAGS)
        # Save then clear
        self._clear_irq()

        if irq & 0x20:  # CRC Error
            return None
        if irq & 0x40 == 0x00:  # RxDone not set
            return None

        # Read FIFO
        current_addr = self._read_reg(REG_FIFO_RX_CURRENT_ADDR)
        nb = self._read_reg(REG_RX_NB_BYTES)
        self._write_reg(REG_FIFO_ADDR_PTR, current_addr)
        data = self._burst_read(REG_FIFO, nb)

        # SNR and RSSI
        raw_snr = self._read_reg(REG_PKT_SNR_VALUE)
        snr = (raw_snr if raw_snr < 128 else raw_snr - 256) / 4.0
        raw_rssi = self._read_reg(REG_PKT_RSSI_VALUE)
        rssi = raw_rssi - (157 if HF_PORT else 164)

        return {
            "payload_bytes": bytes(data),
            "rssi": rssi,
            "snr": snr,
            "num_bytes": nb,
        }

    def on_dio0(self, channel):
        # ISR -> schedule processing in a thread
        if self.rx_callback:
            try:
                with self.lock:
                    pkt = self._read_packet()
                if pkt:
                    self.rx_callback(pkt)
            except Exception as e:
                print(f"[DIO0 handler] Error: {e}")

    def start_rx(self, rx_callback):
        self.rx_callback = rx_callback
        GPIO.add_event_detect(self.pin_dio0, GPIO.RISING, callback=self.on_dio0, bouncetime=1)


class GPSReader(threading.Thread):
    def __init__(self, port, baud):
        super().__init__(daemon=True)
        self.port = port
        self.baud = baud
        self.latest = None  # (lat, lon, fix_time_iso) or None
        self._stop = threading.Event()
        self.ser = None

    def run(self):
        try:
            self.ser = serial.Serial(self.port, self.baud, timeout=1)
        except Exception as e:
            print(f"[GPS] Unable to open {self.port}: {e}")
            return
        while not self._stop.is_set():
            try:
                line = self.ser.readline().decode(errors="ignore").strip()
                if line.startswith("$GPRMC") or line.startswith("$GNRMC"):
                    latlon, ts = self._parse_rmc(line)
                    if latlon:
                        self.latest = (latlon[0], latlon[1], ts)
            except Exception:
                pass
        try:
            self.ser.close()
        except Exception:
            pass

    def stop(self):
        self._stop.set()

    @staticmethod
    def _nmea_to_deg(val, hemi):
        # NMEA ddmm.mmmm or dddmm.mmmm
        if not val or val == "":
            return None
        parts = val.split(".")
        if len(parts) != 2:
            return None
        head = parts[0]
        mins = float(head[-2:] + "." + parts[1])
        deg = int(head[:-2]) if head[:-2] else 0
        coord = deg + mins / 60.0
        if hemi in ("S", "W"):
            coord = -coord
        return coord

    def _parse_rmc(self, line):
        # $GPRMC,hhmmss.sss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,ddmmyy,x.x,a*hh
        f = line.split(",")
        if len(f) < 12:
            return None, None
        status = f[2]
        if status != "A":
            return None, None
        utc = f[1]
        lat, latH = f[3], f[4]
        lon, lonH = f[5], f[6]
        date = f[9]

        lat_deg = self._nmea_to_deg(lat, latH)
        lon_deg = self._nmea_to_deg(lon, lonH)
        if lat_deg is None or lon_deg is None:
            return None, None

        # Build ISO timestamp (UTC). If parsing fails, use now().
        try:
            hh = int(utc[0:2]); mm = int(utc[2:4]); ss = int(utc[4:6])
            dd = int(date[0:2]); mo = int(date[2:4]); yy = int(date[4:6]) + 2000
            dt = datetime(yy, mo, dd, hh, mm, ss, tzinfo=timezone.utc).isoformat()
        except Exception:
            dt = datetime.now(timezone.utc).isoformat()

        return (lat_deg, lon_deg), dt


class MQTTBridge:
    def __init__(self, host, port, username, password, client_id, keepalive, tls):
        # paho-mqtt 1.x API; paho-mqtt 2.x changed the Client constructor
        # (callback_api_version argument) - pin paho-mqtt<2.0 for this code.
        self.client = MQTTClient(client_id=client_id, clean_session=True)
        if username:
            self.client.username_pw_set(username, password or None)
        if tls:
            import ssl
            self.client.tls_set(cert_reqs=ssl.CERT_NONE)
            self.client.tls_insecure_set(True)
        self.host = host
        self.port = port
        self.keepalive = keepalive
        self.connected = False
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        self.connected = (rc == 0)
        print(f"[MQTT] Connected rc={rc}")

    def _on_disconnect(self, client, userdata, rc):
        self.connected = False
        print(f"[MQTT] Disconnected rc={rc}")

    def connect(self):
        self.client.connect(self.host, self.port, self.keepalive)
        self.client.loop_start()

    def publish_json(self, topic, obj, qos=0, retain=False):
        payload = json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
        res = self.client.publish(topic, payload, qos=qos, retain=retain)
        if res.rc != 0:
            print(f"[MQTT] publish rc={res.rc}")
        return res


def main():
    # Test mode (no radio access needed): inject an example payload into MQTT and exit
    if TEST_PAYLOAD:
        mqtt = MQTTBridge(MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_CLIENT_ID, MQTT_KEEPALIVE, MQTT_TLS)
        mqtt.connect()
        time.sleep(1)
        doc = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "freq_hz": LORA_FREQ_HZ,
            "sf": LORA_SF,
            "bw_khz": LORA_BW_KHZ,
            "rssi": None,
            "snr": None,
            "payload_hex": TEST_PAYLOAD.encode().hex(),
            "payload_b64": base64.b64encode(TEST_PAYLOAD.encode()).decode(),
            "note": "injected-test-payload",
        }
        mqtt.publish_json(MQTT_TOPIC, doc)
        time.sleep(0.5)
        return

    lora = SX127x(SPI_BUS, SPI_DEV, SPI_MAX_HZ, PIN_DIO0, PIN_RST)
    gps = GPSReader(GPS_PORT, GPS_BAUD) if GPS_ENABLE else None
    mqtt = MQTTBridge(MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_CLIENT_ID, MQTT_KEEPALIVE, MQTT_TLS)

    def cleanup(*_):
        print("Shutting down...")
        try:
            if gps:
                gps.stop()
        except Exception:
            pass
        try:
            lora.close()
        except Exception:
            pass
        mqtt.client.loop_stop()
        raise SystemExit(0)

    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)

    mqtt.connect()
    if gps:
        gps.start()

    # Initialize radio
    print("[LoRa] Initializing SX127x...")
    lora.init_lora(LORA_FREQ_HZ, LORA_SF, LORA_BW_KHZ, LORA_CR, LORA_CRC_ON, LORA_PREAMBLE)

    # Small print to confirm chip version
    ver = lora._read_reg(REG_VERSION)
    print(f"[LoRa] SX127x REG_VERSION=0x{ver:02X} (expect 0x12 for SX1276)")

    def on_packet(pkt):
        # Prepare JSON document
        now = datetime.now(timezone.utc).isoformat()
        gps_doc = None
        if gps and gps.latest:
            lat, lon, gts = gps.latest
            gps_doc = {"lat": lat, "lon": lon, "ts": gts}

        payload_bytes = pkt["payload_bytes"]
        doc = {
            "ts": now,
            "freq_hz": LORA_FREQ_HZ,
            "sf": LORA_SF,
            "bw_khz": LORA_BW_KHZ,
            "rssi": pkt["rssi"],
            "snr": pkt["snr"],
            "len": pkt["num_bytes"],
            "payload_hex": payload_bytes.hex(),
            "payload_b64": base64.b64encode(payload_bytes).decode(),
            "gps": gps_doc,
        }
        mqtt.publish_json(MQTT_TOPIC, doc)

        # Console log line with signal-quality metrics
        print(f"[LoRa] RX len={pkt['num_bytes']} RSSI={pkt['rssi']:.1f}dBm SNR={pkt['snr']:.1f} dB")

    lora.start_rx(on_packet)

    print("[LoRa] RX continuous. Waiting for packets...")
    print(f"[MQTT] Publishing to mqtt://{MQTT_HOST}:{MQTT_PORT} topic '{MQTT_TOPIC}'")
    if gps:
        print(f"[GPS] Reading NMEA from {GPS_PORT} @ {GPS_BAUD} baud")

    # Idle forever
    while True:
        time.sleep(1)


if __name__ == "__main__":
    main()

Build/Flash/Run commands

We’re not flashing microcontroller firmware; instead we will set up a Python virtual environment, install dependencies, and run the script. This section uses Bookworm 64-bit and Python 3.11.

1) Create project and venv

mkdir -p ~/lora-mqtt-bridge
cd ~/lora-mqtt-bridge

python3 --version
# Expect Python 3.11.x

python3 -m venv .venv
source .venv/bin/activate
python -m pip install --upgrade pip wheel setuptools

2) Install OS-level prerequisites

sudo apt update
sudo apt install -y python3-dev python3-venv python3-rpi.gpio \
    python3-serial python3-spidev \
    mosquitto mosquitto-clients minicom

Note:
– We install mosquitto to run a local broker; if you use a remote broker, you can skip mosquitto.
– minicom is used for optional GPS validation.

3) Install Python packages in the venv

# paho-mqtt is pinned below 2.0 because the bridge uses the 1.x callback API
pip install spidev RPi.GPIO "paho-mqtt<2.0" pyserial gpiozero smbus2

Optional: pin versions via a requirements.txt:

cat > requirements.txt <<'EOF'
spidev>=3.6
RPi.GPIO>=0.7.1
paho-mqtt>=1.6.1,<2.0
pyserial>=3.5
gpiozero>=1.6.2
smbus2>=0.4.3
EOF

pip install -r requirements.txt
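Before touching hardware, you can confirm the venv actually resolves the required modules. This is a small, hypothetical helper (not part of bridge.py); it only checks importability and never opens SPI, serial, or GPIO:

```python
# check_deps.py - verify that required modules resolve inside the venv.
# Hypothetical helper, not part of bridge.py; run with the venv active.
import importlib.util

REQUIRED = ["spidev", "RPi.GPIO", "paho.mqtt.client", "serial", "gpiozero", "smbus2"]

def is_available(module_name: str) -> bool:
    """Return True if the module can be located without fully importing it."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ImportError:
        # find_spec raises for dotted names whose parent package is missing
        return False

if __name__ == "__main__":
    for name in REQUIRED:
        status = "OK     " if is_available(name) else "MISSING"
        print(f"{status} {name}")
```

Any MISSING entry means the corresponding pip install step above was skipped or failed.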

4) Place the code

nano ~/lora-mqtt-bridge/bridge.py
# paste the Full Code content and save
chmod +x ~/lora-mqtt-bridge/bridge.py

5) First run (with environment defaults)

If you’re in EU868, the defaults are already set to 868.1 MHz, SF7, BW125. To run with defaults:

cd ~/lora-mqtt-bridge
source .venv/bin/activate
./bridge.py

If you’re in US915:

export LORA_FREQ_HZ=915000000
export LORA_SF=7
export LORA_BW_KHZ=125
./bridge.py

To point the gateway to a remote MQTT broker:

export MQTT_HOST="your-broker.example.com"
export MQTT_PORT=8883
export MQTT_TLS=1
export MQTT_USERNAME="user"
export MQTT_PASSWORD="pass"
./bridge.py

To run a simple broker on the Pi:

sudo systemctl enable --now mosquitto

6) Optional: systemd service for autostart

Create a service unit:

sudo tee /etc/systemd/system/lora-mqtt-bridge.service >/dev/null <<'EOF'
[Unit]
Description=LoRa to MQTT Bridge (Raspberry Pi Zero 2 W + Dragino LoRa/GPS HAT)
After=network-online.target mosquitto.service
Wants=network-online.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/lora-mqtt-bridge
Environment=PYTHONUNBUFFERED=1
Environment=LORA_FREQ_HZ=868100000
Environment=LORA_SF=7
Environment=LORA_BW_KHZ=125
Environment=LORA_CR=4/5
Environment=LORA_CRC_ON=1
Environment=LORA_PREAMBLE=8
Environment=PIN_DIO0=25
Environment=PIN_RST=17
Environment=MQTT_HOST=localhost
Environment=MQTT_PORT=1883
Environment=MQTT_TOPIC=lora/rx/raw
Environment=MQTT_CLIENT_ID=rpi-zero2w-lora-bridge
Environment=GPS_ENABLE=1
Environment=GPS_PORT=/dev/serial0
Environment=GPS_BAUD=9600
ExecStart=/home/pi/lora-mqtt-bridge/.venv/bin/python /home/pi/lora-mqtt-bridge/bridge.py
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now lora-mqtt-bridge.service

Check status:

systemctl status lora-mqtt-bridge.service -n 50
journalctl -u lora-mqtt-bridge.service -f

Step-by-step Validation

This section validates each layer from bottom (hardware) to top (MQTT messages). Follow in order.

1) Validate SPI device and radio presence

  • Check that /dev/spidev0.0 exists:
ls -l /dev/spidev0.0
  • Read the SX127x version register via the script log. Start the app:
cd ~/lora-mqtt-bridge
source .venv/bin/activate
./bridge.py

Look for:

  • “[LoRa] SX127x REG_VERSION=0x12” — 0x12 indicates SX1276/77/78 family.
  • If you see 0x00 or 0xFF, the radio is not responding. Recheck SPI enable and pin seating.

2) Validate UART/GPS (optional but recommended)

  • Connect to GPS at 9600 baud:
sudo minicom -b 9600 -D /dev/serial0

You should see NMEA sentences like $GPRMC/$GNRMC. Exit minicom with Ctrl-A, X. If you get no data, ensure dtoverlay=disable-bt is set in /boot/firmware/config.txt and that the serial console is disabled in cmdline.txt, then reboot.
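The NMEA sentences you see in minicom are plain ASCII, and RMC carries the fix. As a reference for the lat/lon extraction the script's GPSReader performs, here is a minimal, hypothetical RMC parser (field layout per NMEA 0183; `parse_rmc` is an illustrative name, not a function from bridge.py):

```python
# Minimal $GxRMC parser - illustrates how decimal degrees are derived
# from NMEA's ddmm.mmmm format. Hypothetical sketch, not code from bridge.py.

def _dm_to_decimal(dm: str, hemi: str) -> float:
    """Convert NMEA ddmm.mmmm (or dddmm.mmmm) plus hemisphere to decimal degrees."""
    value = float(dm)
    degrees = int(value // 100)
    minutes = value - degrees * 100
    decimal = degrees + minutes / 60.0
    return -decimal if hemi in ("S", "W") else decimal

def parse_rmc(sentence: str):
    """Return (lat, lon) from a $GPRMC/$GNRMC sentence, or None without a fix."""
    fields = sentence.split(",")
    if not fields[0].endswith("RMC") or len(fields) < 7:
        return None
    if fields[2] != "A":  # 'A' = valid fix, 'V' = void
        return None
    lat = _dm_to_decimal(fields[3], fields[4])
    lon = _dm_to_decimal(fields[5], fields[6])
    return (lat, lon)

# A commonly cited NMEA example sentence:
print(parse_rmc("$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A"))
```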

When the script runs, you should occasionally see gps entries in the published JSON once a fix is obtained.

3) Validate MQTT connectivity

  • If using local mosquitto, ensure it’s active:
sudo systemctl status mosquitto
  • Subscribe to the topic:
mosquitto_sub -h localhost -t 'lora/rx/raw' -v

Keep this subscriber running in a second terminal.

  • Without a LoRa transmitter, test the MQTT path with injection:
cd ~/lora-mqtt-bridge
source .venv/bin/activate
export TEST_PAYLOAD="hello-from-bridge"
./bridge.py

You should see a JSON message on the subscriber with payload_hex and payload_b64 fields and note “injected-test-payload”.

4) Validate DIO0 interrupt and RX behavior

In RX continuous mode there are no RX timeouts (those apply to RX single mode), so DIO0 fires only on RxDone. Even without a transmitter, confirm the bridge reaches the receive state. In the main run (not test mode), look for:

  • “[LoRa] RX continuous. Waiting for packets…”

If you have a second LoRa transmitter (recommended):

  • Configure it to the same parameters:
  • Frequency: match LORA_FREQ_HZ (e.g., 868.1 MHz)
  • BW: 125 kHz
  • SF: 7
  • CR: 4/5
  • CRC: on (optional, match the bridge)
  • Send a short payload (e.g., “PING”).
  • You should see a console line:
  • “RX len=4 RSSI=-xx.xdBm SNR=yy.y dB”
  • On the MQTT subscriber, you will receive a JSON document. Example:
lora/rx/raw {"ts":"2025-11-01T12:34:56.789012+00:00","freq_hz":868100000,"sf":7,"bw_khz":125,"rssi":-92.5,"snr":8.0,"len":4,"payload_hex":"50494e47","payload_b64":"UElORw==","gps":{"lat":52.52001,"lon":13.40495,"ts":"2025-11-01T12:34:55+00:00"}}
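Downstream consumers only need the standard library to unpack these documents. A stdlib-only decoding sketch (wire `decode_doc` into any MQTT client's message callback, e.g. paho-mqtt, to consume live traffic; the function name is illustrative, not part of bridge.py):

```python
# Decode one JSON document as published by the bridge on lora/rx/raw.
# Stdlib-only sketch; hook decode_doc() into an MQTT on_message callback.
import base64
import json

def decode_doc(raw: bytes):
    """Return (payload_bytes, metadata) for one published document."""
    doc = json.loads(raw)
    payload = base64.b64decode(doc["payload_b64"])
    meta = {k: doc.get(k) for k in ("ts", "rssi", "snr", "gps")}
    return payload, meta

# Using the example document shown above (gps omitted for brevity):
sample = (b'{"ts":"2025-11-01T12:34:56.789012+00:00","freq_hz":868100000,'
          b'"sf":7,"bw_khz":125,"rssi":-92.5,"snr":8.0,"len":4,'
          b'"payload_hex":"50494e47","payload_b64":"UElORw==","gps":null}')
payload, meta = decode_doc(sample)
print(payload)  # b'PING'
```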

5) Validate resilience

  • Stop and start mosquitto while the script is running; the bridge should reconnect automatically within the keepalive window.
  • Unplug/replug the GPS UART or temporarily disable GPS_ENABLE to confirm the bridge still functions for LoRa-only RX.

Troubleshooting

  • No /dev/spidev0.0:
  • Ensure SPI is enabled: sudo raspi-config nonint do_spi 0
  • Check /boot/firmware/config.txt has dtparam=spi=on
  • Reboot after enabling SPI.

  • SX127x REG_VERSION reads 0x00 or 0xFF:

  • HAT not fully seated or wrong chip select. This project expects CE0 => /dev/spidev0.0.
  • Confirm your pin mapping; DIO0/RST won’t affect REG_VERSION, but CS must be CE0 (GPIO8).
  • Verify power: The HAT requires 3.3 V and 5 V per design; use a stable supply.

  • “GPIO permission denied” or “module not found”:

  • Ensure you’re running from the venv and that RPi.GPIO is installed: pip show RPi.GPIO.
  • Run the script as a user with GPIO access (pi). If using systemd, our unit sets User=pi.

  • DIO0 interrupt not firing:

  • Confirm PIN_DIO0=25 matches your HAT. Some variants may wire DIO0 differently; check the PCB silkscreen or vendor schematic.
  • Try polling (as a temporary debug) by calling _read_packet() in a loop if interrupts prove unreliable.
  • Ensure you’re in RX continuous: the console should show “RX continuous. Waiting for packets…”.

  • MQTT publish errors:

  • Check broker reachability: mosquitto_pub -h <host> -t test -m hi
  • For TLS: set MQTT_TLS=1 and ensure port is correct (often 8883). If you have CA certs, configure client.tls_set(ca_certs=...) in code.

  • GPS shows no data:

  • Verify /dev/serial0 points to PL011 and isn’t attached to the console or Bluetooth: ensure dtoverlay=disable-bt and console removed from /boot/firmware/cmdline.txt; reboot.
  • Try different baud rates if your HAT’s GPS is configured differently (some use 9600, others 115200).

  • Wrong band or poor reception:

  • Verify antenna is for the correct band (868 vs 915 MHz).
  • Check RSSI/SNR numbers. SNR near or above 0 dB typically yields solid demodulation; negative SNR (down to about −20 dB) can still work.
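As a rule of thumb, the lowest workable SNR depends on the spreading factor. A small helper using the nominal demodulation floors from the SX1276 datasheet (approximate figures, not guarantees; `link_margin_db` is an illustrative name):

```python
# Approximate LoRa demodulation SNR floors per spreading factor,
# nominal values from the SX1276 datasheet.
SNR_LIMIT_DB = {7: -7.5, 8: -10.0, 9: -12.5, 10: -15.0, 11: -17.5, 12: -20.0}

def link_margin_db(sf: int, measured_snr_db: float) -> float:
    """Margin above the demodulation floor; negative means likely packet loss."""
    return measured_snr_db - SNR_LIMIT_DB[sf]

print(link_margin_db(7, 8.0))   # SF7 packet at +8 dB SNR: 15.5 dB of margin
```

If the reported SNR sits close to the floor for your SF, expect CRC errors and missed packets; a higher SF buys margin at the cost of airtime.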

Improvements

  • Protocol: Add a payload decoder or framing (CBOR/JSON/SLIP) to identify device IDs and sensor types; currently, the gateway publishes raw bytes.
  • Security: Enable authenticated MQTT with TLS and client certificates; consider ACLs to restrict topics.
  • Observability: Add Prometheus metrics (packet counts, RSSI histogram), log rotation, and health checks.
  • Backpressure: Implement an internal queue with bounded size and asynchronous publishing to avoid blocking ISR.
  • Resilience: Auto-reinitialize the SX127x on repeated CRC errors or timeouts; add a watchdog timer.
  • GPS: Emit GPS-only heartbeat messages to indicate gateway position and availability; filter NMEA for fix state and HDOP.
  • Packaging: Convert to a Python package and install via setuptools; include a systemd EnvironmentFile for cleaner configuration.
  • Multi-region config: Provide a static config file that sets frequency plans per region and enforces legal duty cycle constraints for TX (if you later add transmit capabilities).
  • LoRaWAN: To handle LoRaWAN packets properly, you need a Semtech concentrator (e.g., SX130x) and a packet forwarder; the SX127x is a single-channel radio not suited for compliant LoRaWAN gateways. For simple, private, single-channel experiments, you can still parse and forward raw LoRaWAN frames to MQTT for offline analysis.
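The backpressure item above can be sketched with a bounded queue and a worker thread, so the RX callback never blocks on the network. A hypothetical sketch (AsyncPublisher is not part of bridge.py):

```python
# Bounded-queue publisher sketch for the "Backpressure" improvement.
# Hypothetical: the RX-callback side calls enqueue(), which never blocks;
# a daemon worker thread drains the queue and performs the slow publish.
import queue
import threading

class AsyncPublisher:
    def __init__(self, publish_fn, maxsize=256):
        self._q = queue.Queue(maxsize=maxsize)
        self._publish = publish_fn
        self.dropped = 0  # count of documents discarded because the queue was full
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def enqueue(self, topic, doc) -> bool:
        """Called from the RX callback; drops (and counts) when the queue is full."""
        try:
            self._q.put_nowait((topic, doc))
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def _drain(self):
        while True:
            topic, doc = self._q.get()
            try:
                self._publish(topic, doc)
            finally:
                self._q.task_done()
```

In on_packet() one could then call publisher.enqueue(MQTT_TOPIC, doc) instead of publishing inline.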

Final Checklist

  • Raspberry Pi OS Bookworm 64-bit installed and updated
  • SPI enabled, UART enabled, Bluetooth disabled for GPS: checked /boot/firmware/config.txt and cmdline.txt
  • Dragino LoRa/GPS HAT seated and antenna attached
  • Python venv created; packages installed: spidev, RPi.GPIO, paho-mqtt, pyserial, gpiozero, smbus2
  • bridge.py placed at ~/lora-mqtt-bridge/ and made executable
  • Service optional: lora-mqtt-bridge.service installed, enabled, and running
  • Validation:
  • REG_VERSION reads 0x12
  • Serial NMEA visible via minicom (optional)
  • mosquitto running or reachable broker configured
  • TEST_PAYLOAD injection confirmed on MQTT subscriber
  • Real LoRa packet received and published with RSSI/SNR metadata
  • Troubleshooting notes at hand for SPI, UART, DIO0, MQTT issues

With this setup, your Raspberry Pi Zero 2 W + Dragino LoRa/GPS HAT operates as a compact, flexible LoRa-to-MQTT gateway bridge, ready to feed downstream systems like Node-RED, InfluxDB, or cloud IoT platforms.


Quick Quiz

Question 1: What is the primary function of the Raspberry Pi Zero 2 W in this project?
Question 2: Which programming language is used in this project?
Question 3: What is the purpose of the Dragino LoRa/GPS HAT?
Question 4: What does the project primarily use for message publishing?
Question 5: Which operating system version is required for this project?
Question 6: What is the recommended power supply for the Pi Zero 2 W?
Question 7: What does the project initialize over SPI?
Question 8: What type of frames does the project focus on?
Question 9: What is the main purpose of using a venv in this project?
Question 10: What additional feature does the project optionally enrich the LoRa frames with?

Carlos Núñez Zorrilla
Electronics & Computer Engineer

Telecommunications Electronics Engineer and Computer Engineer (official degrees in Spain).


Practical case: I2S keyword spotting on RPi Pico W + INMP441


Objective and use case

What you’ll build: This practical case guides you through building a small, on‑device keyword‑spotter running on a Raspberry Pi Pico W using an INMP441 I2S digital microphone. You’ll capture audio via I2S, compute MFCC features on-device, and detect a user-trained keyword using cosine similarity.

Why it matters / Use cases

  • Implementing voice-activated controls in smart home devices using the Raspberry Pi Pico W.
  • Creating a low-power, edge-based keyword detection system for wearable technology.
  • Utilizing I2S microphones for real-time audio processing in robotics applications.
  • Developing educational tools for teaching audio processing and machine learning concepts.

Expected outcome

  • Achieve a keyword detection accuracy of over 90% in controlled environments.
  • Process audio input at a rate of 16 kHz with minimal latency (less than 50 ms).
  • Utilize less than 100 mW of power during keyword detection.
  • Successfully detect keywords with a false positive rate of less than 5%.

Audience: Hobbyists, educators, and developers; Level: Intermediate

Architecture/flow: Audio captured via I2S from INMP441, processed on-device for feature extraction, keyword detection using cosine similarity.

Advanced Hands‑On: I2S Keyword Spotting on Raspberry Pi Pico W + INMP441 I2S Mic

Objective: Build a small, on‑device keyword spotter on a Raspberry Pi Pico W using an INMP441 I2S digital microphone. You’ll capture audio via I2S, compute MFCC features on‑device, and detect a user‑trained keyword using cosine similarity. A Raspberry Pi running Raspberry Pi OS Bookworm 64‑bit (Python 3.11) serves as the host for flashing and file management.

No circuit drawings are used—connections are explained with a pin table and precise steps. Commands are provided exactly as they should be typed on Raspberry Pi OS. The code runs directly on the Pico W.


Prerequisites

  • Host computer: Raspberry Pi (any model with USB ports) running:
  • Raspberry Pi OS Bookworm 64‑bit
  • Python 3.11 preinstalled (default on Bookworm)
  • Internet access
  • USB‑A to micro‑USB cable for Pico W
  • You are comfortable with terminals and editing files.
  • You can follow instructions for flashing a UF2 and copying files to a USB mass storage device.

Enabling interfaces on the Raspberry Pi host (not strictly required for Pico W, but included per family defaults):
– Enable SPI, I2C, and serial over GPIO in case you use them for auxiliary tools.

Commands to enable interfaces:

sudo raspi-config nonint do_i2c 0
sudo raspi-config nonint do_spi 0
sudo raspi-config nonint do_serial 0

Alternatively, ensure the following are present in /boot/firmware/config.txt (add if missing, then reboot):

dtparam=i2c_arm=on
dtparam=spi=on
enable_uart=1

Reboot after changes:

sudo reboot

Materials (exact model)

  • Raspberry Pi Pico W (RP2040, wireless variant)
  • INMP441 I2S Microphone breakout (exact model: INMP441 I2S Mic; 3.3 V)
  • Solderless breadboard and 6x female‑to‑male jumper wires
  • USB‑A to micro‑USB cable (data-capable)

Setup / Connection

We will use the Pico W as the I2S master (generating BCLK and LRCLK) and the INMP441 as the I2S transmitter. The INMP441 must be powered at 3.3 V. Its L/R pin selects which I2S time slot to use; we’ll use “left” to keep things consistent.

  • Power: 3V3(OUT) from Pico to VDD on INMP441
  • Ground: GND to GND
  • I2S clocks/data:
  • BCLK (bit clock): Pico GP14
  • LRCLK (word select): Pico GP15
  • SD (data output from mic): Pico GP13
  • L/R select on the INMP441: tie to GND for left channel

Double‑check your breakout pin labels; most INMP441 breakouts follow this pin pattern: GND, VDD, SD, L/R, SCK (BCLK), WS (LRCLK). Some vary order—always follow the silkscreen on your board.

Pin mapping table

INMP441 Pin   Function       Pico W Pin   Notes
VDD           +3.3 V         3V3(OUT)     Use Pico’s 3.3 V output only
GND           Ground         GND          Common ground
SD            Data Out       GP13         I2S data input to Pico
SCK           Bit Clock      GP14         I2S BCLK from Pico to mic
WS            Word Select    GP15         I2S LRCLK from Pico to mic
L/R           Channel sel.   GND          GND = left, 3.3 V = right

Do not power the INMP441 from 5 V. The Pico W GPIO are 3.3 V only.


Full Code

We’ll use CircuitPython on the Pico W to simplify I2S input and on‑device DSP. The code implements:
– I2S configuration (16 kHz sample rate)
– MFCC extraction (20 mel bands, 13 coefficients)
– Training mode (create a template vector from a few utterances)
– Run mode (continuous detection with cosine similarity)
– LED indication and serial logs

Files to place on the Pico W’s CIRCUITPY drive:
– code.py (main application)
– kws.py (DSP and classifier helpers)
– Optionally: kws_mode.txt (with text “train” or “run”)
– Automatically generated: kws_template.json (saved by training)

code.py and kws.py

Copy both files exactly. They are intended for CircuitPython 9.x on Pico W.

# ===== file: kws.py =====
# Lightweight MFCC + cosine-similarity classifier in CircuitPython
# Tested on CircuitPython 9.x on RP2040 (Pico W)

import math
import json
try:
    from ulab import numpy as np  # fast numeric ops (ulab ships with CircuitPython)
except ImportError:
    # The DSP below depends on ulab for arrays and FFT; fail early with a
    # clear message instead of limping along without numeric support.
    raise ImportError("A CircuitPython build with ulab is required")

def hz_to_mel(f):
    return 2595.0 * math.log10(1.0 + f / 700.0)

def mel_to_hz(m):
    return 700.0 * (10.0**(m / 2595.0) - 1.0)

def mel_filterbank(sr, n_fft, n_mels=20, fmin=20.0, fmax=None):
    if fmax is None:
        fmax = sr / 2.0
    # FFT bins: rfft bins = n_fft//2 + 1
    n_fft_bins = n_fft // 2 + 1
    # Compute mel-spaced points
    m_min = hz_to_mel(fmin)
    m_max = hz_to_mel(fmax)
    m_points = [m_min + i * (m_max - m_min) / (n_mels + 1) for i in range(n_mels + 2)]  # n_mels + 2 points spanning [m_min, m_max]
    f_points = [mel_to_hz(m) for m in m_points]
    bin_points = [int((n_fft * f) / sr) for f in f_points]
    # Build triangular filters
    fbanks = []
    for m in range(1, n_mels + 1):
        fbank = [0.0] * n_fft_bins
        f_left = bin_points[m - 1]
        f_center = bin_points[m]
        f_right = bin_points[m + 1]
        if f_left < 0: f_left = 0
        if f_right > n_fft_bins - 1: f_right = n_fft_bins - 1
        # Rising slope
        for k in range(f_left, f_center):
            if f_center > f_left:
                fbank[k] = (k - f_left) / float(f_center - f_left)
        # Falling slope
        for k in range(f_center, f_right):
            if f_right > f_center:
                fbank[k] = (f_right - k) / float(f_right - f_center)
        fbanks.append(fbank)
    return fbanks  # list of [n_fft_bins] lists

def dct_matrix(n_mfcc, n_mels):
    # DCT-II matrix for MFCC (orthonormalized 0..n_mfcc-1)
    # C[k,n] = sqrt(2/N) * cos( pi/N * (n+0.5) * k ), with k=0..K-1; C[0,:] scaled by sqrt(1/N)
    C = []
    scale0 = math.sqrt(1.0 / n_mels)
    scalek = math.sqrt(2.0 / n_mels)
    for k in range(n_mfcc):
        row = []
        for n in range(n_mels):
            val = math.cos((math.pi / n_mels) * (n + 0.5) * k)
            row.append(val)
        if k == 0:
            row = [scale0 * v for v in row]
        else:
            row = [scalek * v for v in row]
        C.append(row)
    return C  # list shape [n_mfcc, n_mels]

def hamming_window(N):
    return [0.54 - 0.46 * math.cos((2.0 * math.pi * n) / (N - 1)) for n in range(N)]

def pre_emphasis(x, coef=0.97):
    out = [0.0] * len(x)
    prev = 0.0
    for i, xi in enumerate(x):
        out[i] = xi - coef * prev
        prev = xi
    return out

def frame_signal(x, frame_len, frame_step):
    # Returns list of frames, each a list of length frame_len
    frames = []
    i = 0
    while i + frame_len <= len(x):
        frames.append(x[i:i+frame_len])
        i += frame_step
    return frames

def power_spectrum(frame, n_fft):
    # Zero-pad to n_fft; compute the one-sided power spectrum |X|^2 via rfft.
    # Note: rfft availability varies between ulab builds; if yours lacks
    # np.fft.rfft, substitute np.fft.fft and keep the first n_fft//2 + 1 bins.
    from ulab import numpy as np
    tmp = frame + [0.0] * (n_fft - len(frame))
    arr = np.array(tmp, dtype=np.float32)
    spec = np.fft.rfft(arr)  # length n_fft//2 + 1, complex
    ps = (spec.real * spec.real + spec.imag * spec.imag)
    return ps

def log_mel_spectrum(ps, fbanks, eps=1e-10):
    n_mels = len(fbanks)
    mel_spec = [0.0] * n_mels
    for m in range(n_mels):
        s = 0.0
        fbank = fbanks[m]
        # dot product
        for k, w in enumerate(fbank):
            if w != 0.0:
                s += w * ps[k]
        mel_spec[m] = math.log(max(s, eps))
    return mel_spec

def mfcc(x, sr=16000, n_mfcc=13, n_mels=20, frame_ms=32, hop_ms=16, n_fft=512):
    # x: list/array of floats in [-1,1]
    # returns: list of MFCC vectors (per frame)
    frame_len = int(sr * frame_ms / 1000)
    frame_step = int(sr * hop_ms / 1000)
    x = pre_emphasis(x, 0.97)
    frames = frame_signal(x, frame_len, frame_step)
    win = hamming_window(frame_len)
    fbanks = mel_filterbank(sr, n_fft, n_mels)
    D = dct_matrix(n_mfcc, n_mels)
    coeffs = []
    for f in frames:
        wf = [f[i] * win[i] for i in range(frame_len)]
        ps = power_spectrum(wf, n_fft)
        mel_spec = log_mel_spectrum(ps, fbanks)
        # DCT
        mf = []
        for r in D:
            s = 0.0
            for j, rv in enumerate(r):
                s += rv * mel_spec[j]
            mf.append(s)
        coeffs.append(mf)
    return coeffs  # list of [n_mfcc] per frame

def mean_vector(vectors):
    if not vectors:
        return []
    n = len(vectors[0])
    out = [0.0] * n
    for vec in vectors:
        for i in range(n):
            out[i] += vec[i]
    count = float(len(vectors))
    return [v / count for v in out]

def l2norm(x):
    return math.sqrt(sum([xi*xi for xi in x]))

def cosine_similarity(a, b, eps=1e-9):
    if len(a) != len(b) or len(a) == 0:
        return 0.0
    dot = 0.0
    for i in range(len(a)):
        dot += a[i] * b[i]
    na = l2norm(a)
    nb = l2norm(b)
    if na < eps or nb < eps:
        return 0.0
    return dot / (na * nb)

def save_template(vec, path="/kws_template.json"):
    with open(path, "w") as f:
        json.dump({"mfcc_mean": [float(x) for x in vec]}, f)

def load_template(path="/kws_template.json"):
    try:
        with open(path, "r") as f:
            obj = json.load(f)
            return obj.get("mfcc_mean", [])
    except OSError:
        return []

# ===== file: code.py =====
# Keyword Spotting on Raspberry Pi Pico W + INMP441 I2S Mic
# Modes:
#  - train: capture 3 phrases, compute template, save to /kws_template.json
#  - run: continuous detection, print and blink LED on detection

import time
import board
import digitalio
import supervisor

# CircuitPython I2SIn
import audiobusio
from array import array

from kws import mfcc, mean_vector, cosine_similarity, save_template, load_template

# Hardware config
I2S_BCLK = board.GP14
I2S_LRCLK = board.GP15
I2S_SD = board.GP13

SAMPLE_RATE = 16000       # Hz
BIT_DEPTH = 32            # INMP441 produces 24-bit, we capture 32-bit containers
CAPTURE_SEC = 1.0         # seconds per analysis window
CAPTURE_SAMPLES = int(SAMPLE_RATE * CAPTURE_SEC)

# MFCC params
N_MFCC = 13
N_MELS = 20
FRAME_MS = 32
HOP_MS = 16
N_FFT = 512

# Threshold for cosine similarity
DETECTION_THRESHOLD = 0.90  # tune during validation

# LED
led = digitalio.DigitalInOut(board.LED)
led.direction = digitalio.Direction.OUTPUT

# Read mode from optional text file
def read_mode():
    try:
        with open("/kws_mode.txt", "r") as f:
            t = f.read().strip().lower()
            if t in ("train", "run"):
                return t
    except OSError:
        pass
    return "run"

def normalize_i2s_i32_to_float(samples_i32):
    # Convert signed 32-bit I2S samples to float [-1, 1].
    # The INMP441 left-justifies 24-bit data in the 32-bit slot, so drop the
    # low 8 bits first, then scale as 24-bit signed; clamp for safety.
    scale = 1.0 / (1 << 23)
    out = [0.0] * len(samples_i32)
    for i, v in enumerate(samples_i32):
        out[i] = max(-1.0, min(1.0, (v >> 8) * scale))
    return out

def capture_seconds(i2s, seconds):
    n = int(SAMPLE_RATE * seconds)
    # Record into an array of signed 32-bit
    buf = array('i', [0] * n)
    # Some ports need a brief settle after the clocks start; a short sleep helps
    time.sleep(0.01)
    i2s.record(buf, len(buf))
    return list(buf)

def i2s_setup():
    # Create I2SIn instance; mono from left slot (mic L/R pin tied to GND)
    i2s = audiobusio.I2SIn(
        bit_clock=I2S_BCLK,
        word_select=I2S_LRCLK,
        data=I2S_SD,
        sample_rate=SAMPLE_RATE,
        bit_depth=BIT_DEPTH
    )
    return i2s

def blink(times=2, dur=0.1):
    for _ in range(times):
        led.value = True
        time.sleep(dur)
        led.value = False
        time.sleep(dur)

def train_loop():
    print("Mode: TRAIN")
    print("Speak your keyword clearly when prompted.")
    i2s = i2s_setup()
    utterances = []
    TRIALS = 3
    for t in range(1, TRIALS + 1):
        print("Prepare... trial", t)
        blink(1, 0.2)
        time.sleep(1.0)
        print("Recording...")
        led.value = True
        raw = capture_seconds(i2s, CAPTURE_SEC)
        led.value = False
        print("Processing...")
        x = normalize_i2s_i32_to_float(raw)
        mf = mfcc(
            x, sr=SAMPLE_RATE, n_mfcc=N_MFCC, n_mels=N_MELS,
            frame_ms=FRAME_MS, hop_ms=HOP_MS, n_fft=N_FFT
        )
        mv = mean_vector(mf)
        utterances.append(mv)
        print("Captured trial", t, "MFCC mean len:", len(mv))
        time.sleep(0.5)
    # Average template
    n = len(utterances[0])
    template = [0.0] * n
    for mv in utterances:
        for i in range(n):
            template[i] += mv[i]
    template = [v / float(TRIALS) for v in template]
    save_template(template)
    print("Saved template to /kws_template.json")
    blink(3, 0.1)
    print("Switch /kws_mode.txt to 'run' and reset (Ctrl-D in REPL) or power-cycle.")

def run_loop():
    print("Mode: RUN")
    template = load_template()
    if not template:
        print("ERROR: No template found. Please create /kws_mode.txt with 'train' and reset.")
        while True:
            led.value = True
            time.sleep(0.1)
            led.value = False
            time.sleep(0.1)
    i2s = i2s_setup()
    print("Starting continuous detection at", SAMPLE_RATE, "Hz. Threshold:", DETECTION_THRESHOLD)
    blink(2, 0.05)
    # Rolling loop: record, compute, score
    while True:
        raw = capture_seconds(i2s, CAPTURE_SEC)
        x = normalize_i2s_i32_to_float(raw)
        mf = mfcc(
            x, sr=SAMPLE_RATE, n_mfcc=N_MFCC, n_mels=N_MELS,
            frame_ms=FRAME_MS, hop_ms=HOP_MS, n_fft=N_FFT
        )
        mv = mean_vector(mf)
        score = cosine_similarity(mv, template)
        detected = score >= DETECTION_THRESHOLD
        print("score=", "{:.3f}".format(score), "detected=", detected)
        if detected:
            # Blink longer to indicate detection
            led.value = True
            time.sleep(0.2)
            led.value = False

# Entry
mode = read_mode()
if mode == "train":
    train_loop()
else:
    run_loop()

Notes:
– If your CIRCUITPY build doesn’t include ulab, install a CircuitPython UF2 for Pico W that bundles ulab (recommended). See flashing instructions below.
– If you prefer a lower CPU load, reduce MFCC settings (e.g., N_MELS=16, N_MFCC=10, N_FFT=256).
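Before deploying, you can sanity-check the classifier math on the host: mean_vector plus cosine_similarity behave identically in desktop Python. A quick simulation with synthetic vectors (the two functions are reimplemented inline so the snippet runs anywhere; the numbers are made up, not real MFCCs):

```python
# Host-side sanity check of the template/scoring math used on the Pico W.
# The functions mirror mean_vector() and cosine_similarity() from kws.py.
import math

def mean_vector(vectors):
    n = len(vectors[0])
    out = [0.0] * n
    for vec in vectors:
        for i in range(n):
            out[i] += vec[i]
    return [v / len(vectors) for v in out]

def cosine_similarity(a, b, eps=1e-9):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    if na < eps or nb < eps:
        return 0.0
    return dot / (na * nb)

# Three synthetic "utterances" of a keyword cluster tightly; a distractor does not.
trials = [[1.0, 2.0, 0.5], [1.1, 1.9, 0.6], [0.9, 2.1, 0.4]]
template = mean_vector(trials)                       # approx [1.0, 2.0, 0.5]
match = cosine_similarity([1.05, 2.0, 0.55], template)
distractor = cosine_similarity([2.0, -1.0, 3.0], template)
print(round(match, 3), round(distractor, 3))
```

A near-template vector should score close to 1.0 and an unrelated vector well below the 0.90 default threshold.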


Build / Flash / Run Commands

All commands are run on your Raspberry Pi host (Bookworm 64‑bit). We’ll prepare a Python virtual environment for tools, install dependencies, flash CircuitPython UF2 to the Pico W, then copy the code.

1) Host environment setup

sudo apt update
sudo apt install -y python3.11-venv python3-pip git curl unzip screen minicom rsync

# Optional but included per family defaults:
sudo apt install -y cmake build-essential

# Create and activate venv
python3 -m venv ~/venvs/pico-kws
source ~/venvs/pico-kws/bin/activate

# Upgrade pip and install utilities
pip install --upgrade pip

# Install common GPIO/SMBus/SPI libs (not strictly required for this project)
pip install gpiozero smbus2 spidev

# Install helpful microcontroller tooling
pip install rshell mpremote adafruit-ampy circup

Verify Python:

python --version
# Expected: Python 3.11.x

2) Flash CircuitPython 9.x to the Pico W

  • Download the latest stable CircuitPython UF2 for Pico W (with ulab):
  • Example version path (adjust to latest stable): https://downloads.circuitpython.org/bin/raspberry_pi_pico_w/en_US/adafruit-circuitpython-raspberry_pi_pico_w-en_US-9.0.0.uf2

Commands to download:

cd ~/Downloads
curl -LO https://downloads.circuitpython.org/bin/raspberry_pi_pico_w/en_US/adafruit-circuitpython-raspberry_pi_pico_w-en_US-9.0.0.uf2
  • Put the Pico W into BOOTSEL mode:
  • Unplug the Pico W USB.
  • Hold the BOOTSEL button.
  • Plug in the USB.
  • Release BOOTSEL.
  • A mass storage device named RPI-RP2 should mount.

  • Copy the UF2:

# Replace /media/pi/RPI-RP2 with your actual mount (ls /media/$USER/)
cp ~/Downloads/adafruit-circuitpython-raspberry_pi_pico_w-en_US-9.0.0.uf2 /media/$USER/RPI-RP2/

The board will reboot and re-mount as CIRCUITPY.

3) Install the project files

Create kws_mode.txt containing “train” so the board starts in training mode:

echo "train" > /media/$USER/CIRCUITPY/kws_mode.txt

Copy code.py and kws.py:

# Assuming you saved the two code blocks as ~/pico-kws/code.py and ~/pico-kws/kws.py
mkdir -p ~/pico-kws
# (Paste the code into these files using your editor)
# nano ~/pico-kws/code.py
# nano ~/pico-kws/kws.py

cp ~/pico-kws/code.py /media/$USER/CIRCUITPY/
cp ~/pico-kws/kws.py  /media/$USER/CIRCUITPY/
sync

Confirm the files exist on CIRCUITPY:

ls -l /media/$USER/CIRCUITPY/

The board will auto-reload code.py.


Step‑by‑Step Validation

Follow these steps to verify hardware, audio capture, and keyword spotting.

1) Wiring sanity check

  • Ensure INMP441 VDD is connected to Pico 3V3(OUT), not 5 V.
  • Confirm grounds connected (Pico GND ↔ INMP441 GND).
  • Verify:
  • INMP441 SCK ↔ Pico GP14
  • INMP441 WS ↔ Pico GP15
  • INMP441 SD ↔ Pico GP13
  • INMP441 L/R ↔ GND (Left channel)

If uncertain, re-check board silkscreen and the pin table above.

2) USB serial console

In another terminal on the Raspberry Pi host:

ls /dev/ttyACM*
# Example: /dev/ttyACM0

screen /dev/ttyACM0 115200
# or:
minicom -D /dev/ttyACM0 -b 115200

You should see “Mode: TRAIN” logs and prompts as soon as code.py runs.

To exit screen: press Ctrl-A, then K, then Y.

3) Train the keyword template

  • With kws_mode.txt set to “train”, you’ll see “Prepare… trial 1”; the LED blinks and the console prompts you to speak.
  • Say your keyword (e.g., “pico”) clearly and consistently for 1 second when “Recording…” appears.
  • After 3 trials, the device saves /kws_template.json and tells you to switch mode.

Set run mode:

echo "run" > /media/$USER/CIRCUITPY/kws_mode.txt
sync

Reset the board by unplugging/replugging USB, or press Ctrl-D in the serial REPL to soft reset.
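On boot, code.py decides between training and detection by reading kws_mode.txt. A minimal sketch of that selection logic, runnable on the host (the actual code.py may implement it differently; the helper name is illustrative):

```python
def read_mode(path="kws_mode.txt", default="train"):
    """Return 'train' or 'run' from the mode file; fall back to default."""
    try:
        with open(path) as f:
            mode = f.read().strip().lower()
    except OSError:
        # File missing or unreadable: stay in the default mode.
        return default
    return mode if mode in ("train", "run") else default

# Exercise the same logic on the host with a local file:
with open("kws_mode.txt", "w") as f:
    f.write("run\n")
print(read_mode())  # → run
```

Falling back to “train” on a missing or malformed file means a typo in kws_mode.txt can’t strand the device in an undefined state.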

4) Run detection

Re-open the serial console:

screen /dev/ttyACM0 115200

  • You should see “Mode: RUN” and periodic lines like:
    • “score= 0.876 detected= False”
    • “score= 0.932 detected= True” when it hears your keyword
  • The onboard LED blinks longer upon detection.

Validation checklist:
– Speak the trained keyword three times. Expect at least two detections (score ≥ threshold).
– Speak non-keywords. Expect no detections or significantly lower scores.
– If false positives are frequent, increase DETECTION_THRESHOLD in code.py (e.g., 0.93–0.96).
– If misses are frequent, decrease DETECTION_THRESHOLD (e.g., 0.85–0.88), retrain more consistently, or reduce background noise.
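The score= values above are cosine similarities between the live MFCC feature vector and the saved template. A pure-Python sketch of the scoring and threshold decision (on-device this runs over ulab arrays; cosine_score and detect are illustrative names, and the short example vectors stand in for real MFCC features):

```python
import math

DETECTION_THRESHOLD = 0.90  # same role as DETECTION_THRESHOLD in code.py

def cosine_score(a, b):
    """Cosine similarity between two equal-length feature vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def detect(features, template, threshold=DETECTION_THRESHOLD):
    """Return (score, detected) for one capture window."""
    score = cosine_score(features, template)
    return score, score >= threshold

template = [0.2, 0.8, 0.5, 0.1]
score, hit = detect([0.19, 0.82, 0.48, 0.12], template)
print("score=", round(score, 3), "detected=", hit)
```

Because cosine similarity is scale-invariant, it tolerates volume differences between training and detection; the threshold only has to absorb differences in the spectral shape.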

5) Quick numerical checks

  • RMS/levels sanity (optional modification):
    • Temporarily print mean(abs(x)) of the captured float samples after normalization to ensure the mic isn’t saturating or silent.
  • Latency:
    • The current loop processes 1 s windows; reduce CAPTURE_SEC to 0.75 or 0.5 for faster response, at some cost in robustness.
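The level check above can be sketched as follows (names and thresholds are illustrative, not from code.py; on the device you would print these values right after normalization):

```python
def level_stats(samples):
    """Mean absolute level and peak of normalized float samples in [-1, 1]."""
    n = len(samples)
    mean_abs = sum(abs(s) for s in samples) / n
    peak = max(abs(s) for s in samples)
    return mean_abs, peak

# Assumed rules of thumb:
#   mean_abs near 0.0 -> mic silent or miswired
#   peak near 1.0     -> input saturating; lower the gain or back away
samples = [0.01, -0.03, 0.25, -0.18, 0.02]
mean_abs, peak = level_stats(samples)
print(f"mean_abs={mean_abs:.3f} peak={peak:.3f}")
```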

Troubleshooting

  • CIRCUITPY doesn’t appear after flashing:
    • Ensure you copied the UF2 to RPI‑RP2 with BOOTSEL held while plugging in.
    • Try a different USB cable/port; make sure the cable carries data, not just power.
  • No serial logs:
    • Check that /dev/ttyACM0 exists: ls /dev/ttyACM*
    • Try another terminal program. The CircuitPython console usually works fine at 115200 baud.
  • I2S audio seems silent or noisy:
    • Verify 3.3 V power and ground integrity.
    • Confirm L/R is tied to GND (for the left channel).
    • Keep the I2S lines short; re-seat the jumpers.
    • Confirm the pin mapping matches the table (SCK=GP14, WS=GP15, SD=GP13).
    • Power-cycle and replug. Some mics need stable clocks at power-up.
  • High false positives:
    • Increase DETECTION_THRESHOLD in code.py.
    • Retrain in a quieter room; use a consistent speaking pace and volume.
    • Reduce feature noise by lowering N_MELS to 16 and/or using a longer CAPTURE_SEC.
  • Missed detections:
    • Decrease the threshold slightly (e.g., 0.88–0.90).
    • Move closer to the mic; avoid angled placement.
    • Increase CAPTURE_SEC to 1.25 s if your keyword is long.
  • Performance issues (glitches or slow processing):
    • Reduce N_FFT to 256, FRAME_MS to 25, and HOP_MS to 12.
    • Reduce N_MELS to 16 and N_MFCC to 10.
    • Ensure you are running a CircuitPython build with ulab for speed.
  • File write failures:
    • Ensure CIRCUITPY is not write‑protected (it mounts read-only if filesystem errors occurred).
    • If needed, back up your files and reformat CIRCUITPY from the REPL with import storage; storage.erase_filesystem() (use with caution).
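To see why the performance tweaks above help, it’s useful to count how many analysis frames (and thus FFTs) one capture window costs. A small helper, assuming a 16 kHz sample rate and illustrative baseline values of FRAME_MS=30, HOP_MS=15, N_FFT=512 (check code.py for the real defaults):

```python
import math

def frames_per_window(capture_sec, frame_ms, hop_ms, sample_rate=16000):
    """Number of analysis frames (one FFT each) in one capture window."""
    total = int(capture_sec * sample_rate)
    frame = sample_rate * frame_ms // 1000
    hop = sample_rate * hop_ms // 1000
    return 0 if total < frame else 1 + (total - frame) // hop

def fft_work(frames, n_fft):
    """Rough FFT cost estimate: frames * n_fft * log2(n_fft) operations."""
    return int(frames * n_fft * math.log2(n_fft))

# Assumed baseline vs the reduced settings suggested above:
base = fft_work(frames_per_window(1.0, 30, 15), 512)
fast = fft_work(frames_per_window(1.0, 25, 12), 256)
print(base, fast, f"{100 * (base - fast) / base:.0f}% less work")
```

The smaller hop produces more frames, but halving N_FFT more than compensates, so the reduced settings still cut the per-window FFT work substantially.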

Improvements

  • Use a small neural model (e.g., a 1D conv or tiny fully connected network) with TensorFlow Lite for Microcontrollers:
    • Train a model on mel spectrogram features (“yes/no” style, but for your keyword).
    • Convert to TFLM and deploy in C/C++ on the Pico (pico-sdk) or with Arduino‑TFLM.
    • Quantize to int8 for speed and memory savings.
  • Continuous streaming and overlap:
    • Instead of 1 s chunks, maintain a rolling ring buffer with a 0.5 s stride for faster reaction.
  • Multiple keywords:
    • Train multiple templates and compute the argmax of the cosine scores among the N templates.
  • Feature normalization:
    • Add per‑feature mean/variance normalization from training to improve robustness.
  • Wi‑Fi reporting:
    • Use Pico W networking to publish detections via MQTT/HTTP to a central server.
  • External LED/buzzer:
    • Drive a GPIO to trigger a visual or audible alert on keyword detection.
  • Edge Impulse:
    • Collect data, design an impulse, export a C++ library for RP2040, and integrate it with the I2S capture.
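The multiple-keywords idea reduces to scoring the live features against N templates and taking the argmax. A sketch of that extension (all names are illustrative; cosine_score is the same cosine similarity the single-template detector uses, and the short vectors stand in for real MFCC features):

```python
import math

def cosine_score(a, b):
    """Cosine similarity between two equal-length feature vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def best_keyword(features, templates, threshold=0.90):
    """templates maps keyword name -> feature vector.
    Returns (name, score) for the best match, or (None, score) below threshold."""
    name, score = max(
        ((kw, cosine_score(features, t)) for kw, t in templates.items()),
        key=lambda kv: kv[1],
    )
    return (name, score) if score >= threshold else (None, score)

templates = {"pico": [0.9, 0.1, 0.2], "stop": [0.1, 0.9, 0.3]}
print(best_keyword([0.88, 0.12, 0.21], templates))
```

Keeping one shared threshold is the simplest option; per-keyword thresholds (stored alongside each template) can help when some keywords are acoustically harder than others.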

Final Checklist

  • Raspberry Pi host
    • Raspberry Pi OS Bookworm 64‑bit installed
    • Python 3.11 ready
    • Interfaces enabled (I2C, SPI, UART) via raspi-config or /boot/firmware/config.txt (per defaults)
    • venv created; rshell/mpremote/circup installed
  • Hardware
    • Raspberry Pi Pico W
    • INMP441 I2S mic wired to the Pico:
      • VDD ↔ 3V3(OUT)
      • GND ↔ GND
      • SD ↔ GP13
      • SCK ↔ GP14
      • WS ↔ GP15
      • L/R ↔ GND (left)
  • Firmware
    • CircuitPython 9.x UF2 for Pico W flashed (with ulab)
    • CIRCUITPY drive mounts on the host
  • Files on CIRCUITPY
    • code.py and kws.py present
    • kws_mode.txt containing “train” for initial training
  • Training
    • Three consistent utterances recorded
    • /kws_template.json saved
  • Running
    • kws_mode.txt set to “run”
    • Serial logs show score values
    • LED blinks on detection
    • Threshold tuned for acceptable false positives/misses

With this setup, you have a fully working i2s-keyword-spotting-pico flow on Raspberry Pi Pico W + INMP441 I2S Mic: audio capture over I2S, feature extraction, and keyword detection—all on-device, with reproducible commands and a clear path to more advanced ML deployments.

Find this product and/or books on this topic on Amazon


As an Amazon Associate, I earn from qualifying purchases. If you buy through this link, you help keep this project running.

Quick Quiz

Question 1: What is the primary objective of the project described in the article?

Question 2: Which microphone is used in the project?

Question 3: Which operating system is required on the host computer?

Question 4: What programming language is mentioned as preinstalled on the operating system?

Question 5: What type of cable is needed to connect the Pico W to the host computer?

Question 6: What command is used to enable I2C on the Raspberry Pi?

Question 7: What feature is computed on-device for keyword detection?

Question 8: What is the purpose of the cosine similarity in the project?

Question 9: What must be done after modifying the config.txt file?

Question 10: Which model of Raspberry Pi is specifically mentioned for this project?

Carlos Núñez Zorrilla
Electronics & Computer Engineer

Telecommunications Electronics Engineer and Computer Engineer (official degrees in Spain).
