Caso práctico: alarma calidad aire con Raspberry Pi 4

Caso práctico: alarma calidad aire con Raspberry Pi 4 — hero

Alarma de calidad del aire con Raspberry Pi 4 usando BME680 y HyperPixel 4.0 Touch

Objetivo y caso de uso

Qué construirás: Un panel de calidad del aire con Raspberry Pi 4 Model B usando un sensor BME680 y una pantalla HyperPixel 4.0 Touch para mostrar las condiciones de la habitación en vivo y estimar IAQ, eCO2 y TVOC en tiempo real. La interfaz se ejecuta localmente con tasas de refresco fluidas para pantalla táctil y muestra una alarma clara en pantalla cuando el eCO2 estimado supera un umbral configurable, como 1000 ppm.

Por qué importa / Casos de uso

  • Recordatorio de ventilación para aulas que ofrece una indicación visible cuando el eCO2 estimado supera 1000–1500 ppm.
  • Monitor de confort para oficina en casa para seguir temperatura, humedad y tendencias de aire viciado con actualizaciones locales de baja latencia.
  • Indicador para taller o sala de hobbies para detectar una mala renovación del aire y cambios en los patrones de VOC durante proyectos en interiores.
  • Pantalla de estado de sala de reuniones con estado codificado por colores para comprobaciones rápidas de apto/no apto antes de la ocupación.
  • Prototipo educativo para comparar cambios de ventilación frente a métricas IAQ estimadas y el comportamiento de la alarma.

Resultado esperado

  • Interfaz de pantalla táctil que muestra temperatura, humedad, presión, resistencia del gas, puntuación IAQ, eCO2 estimado y TVOC estimado.
  • Estado de la habitación codificado por colores con renderizado responsivo, normalmente alrededor de 20–30 FPS en una Pi 4 para un panel ligero.
  • Banner de alarma visible cuando el eCO2 estimado cruza el límite configurado, con respuesta local de la interfaz en menos de un segundo.
  • Interacción por toque o ratón para alternar detalles y reconocer temporalmente la alarma sin detener la monitorización.
  • Soporte tanto para modo con hardware real como para modo simulado para probar el flujo de la interfaz, umbrales de alarma y datos de demostración.
  • Funcionamiento local eficiente adecuado para uso tipo kiosco, manteniéndose a menudo en un rango modesto de GPU para un panel simple a pantalla completa.

Audiencia: Makers de Raspberry Pi, educadores y aficionados que construyen pantallas ambientales prácticas; Nivel: Intermedio

Arquitectura/flujo: La Pi 4 lee los valores del sensor BME680, deriva IAQ además de eCO2/TVOC estimados, actualiza un panel HyperPixel Touch a pantalla completa, aplica reglas de estado por color y activa un banner de alarma visible con reconocimiento táctil opcional cuando se supera el umbral configurado; en modo simulado, las lecturas simuladas siguen la misma canalización.

Nota educativa de validación

Antes de publicar este caso, el contenido pasó la puerta automática de validación de Prometeo con estado PASS. El validador comprobó los bloques de código, la estructura del artículo, los comandos copiables y la coherencia con el catálogo de dispositivos soportados.

Evidencia de validación publicada

  • Resultado automático: PASS.
  • Estructura parseada: 31 apartados, 1 tablas y 23 bloques de código detectados en el contenido publicado.
  • Código comprobado: 2 Python/py_compile, 20 Bash/copy-paste checks.
  • Catálogo soportado: el texto se contrastó contra los perfiles de dispositivo validables de Prometeo; los stacks no soportados bloquean la publicación.
  • Hallazgos del informe: sin hallazgos bloqueantes.

Esta validación confirma compatibilidad sintáctica y de herramientas para el material publicado, pero no sustituye la prueba física sobre tu hardware, cableado y entorno exactos.

Nota educativa de seguridad

Nota de seguridad

Este proyecto es un prototipo educativo de calidad del aire en interiores, no un instrumento de seguridad certificado.

  • El BME680 no mide CO2 real directamente.
  • Los valores de eCO2 e IAQ mostrados en este tutorial son estimaciones heurísticas derivadas del comportamiento de la humedad y la resistencia del gas.
  • No uses esta construcción como única base para decisiones de salud, cumplimiento legal, detección de atmósferas peligrosas o respuesta a emergencias.
  • Alimenta la Raspberry Pi solo con una fuente USB-C adecuada de bajo voltaje.
  • Usa una carcasa o un montaje seguro para que la electrónica expuesta no pueda cortocircuitarse ni tocarse accidentalmente.

Requisitos previos

Necesitas:

  • Raspberry Pi 4 Model B
  • Raspberry Pi OS Bookworm de 64 bits
  • Python 3.11
  • Módulo BME680 con soporte I2C
  • HyperPixel 4.0 Touch
  • Uso básico de terminal
  • Conocimientos básicos de cableado GPIO/I2C

Materiales

Elemento Modelo exacto Propósito
Ordenador de placa única Raspberry Pi 4 Model B Ejecuta la aplicación
Sensor BME680 Temperatura, humedad, presión, resistencia del gas
Pantalla táctil HyperPixel 4.0 Touch Interfaz táctil local
Fuente de alimentación Fuente USB-C de 5 V para Raspberry Pi 4 Alimentación estable
Cableado Cables jumper hembra a hembra Cableado del sensor
Almacenamiento Tarjeta microSD, 16 GB o superior SO y archivos del proyecto

Configuración del hardware

Cableado I2C del BME680

Conecta el BME680 al encabezado de 40 pines de la Raspberry Pi:

  • BME680 VCC/VIN -> 3.3 V en la Raspberry Pi, pin 1 físico
  • BME680 GND -> GND en la Raspberry Pi, pin 6 físico
  • BME680 SDA -> SDA1 en la Raspberry Pi, pin 3 físico
  • BME680 SCL -> SCL1 en la Raspberry Pi, pin 5 físico

Notas:

  • Confirma que tu placa breakout es compatible con 3.3 V
  • No conectes una placa solo de 3.3 V a 5 V
  • Si la placa también soporta SPI, usa solo los pines I2C listados arriba

HyperPixel 4.0 Touch

Instala y verifica la HyperPixel usando las instrucciones actuales de Pimoroni para tu imagen de Raspberry Pi OS. Antes de continuar, confirma:

  • La pantalla muestra el escritorio de Raspberry Pi
  • La entrada táctil funciona correctamente

Preparación del sistema

Actualiza la Pi y habilita I2C:

sudo apt update
sudo apt full-upgrade -y
sudo raspi-config

En raspi-config:

  1. Abre Interface Options
  2. Habilita I2C
  3. Reinicia si se te solicita

Comprueba Python:

python3 --version

Instala los paquetes del sistema necesarios:

sudo apt install -y git python3-pip python3-venv i2c-tools

Verifica que el sensor aparece en I2C:

i2cdetect -y 1

Un BME680 aparece habitualmente en 0x76 o 0x77.

Configuración del proyecto

Crea el proyecto y el entorno virtual:

mkdir -p ~/projects/touchscreen-co2-air-quality-alarm
cd ~/projects/touchscreen-co2-air-quality-alarm
python3 -m venv .venv
. .venv/bin/activate
python3 -m pip install --upgrade pip
python3 -m pip install pygame smbus2 bme680

Valida las importaciones:

python3 -c "import pygame, smbus2, bme680; print('imports ok')"

Código de la aplicación

air_quality_alarm.py

#!/usr/bin/env python3
"""
Touchscreen air-quality alarm for:
- Raspberry Pi 4 Model B
- BME680 over I2C
- HyperPixel 4.0 Touch

The IAQ, eCO2, and TVOC values here are educational heuristic estimates.
They are useful for trend display and UI behavior validation, not certified measurement.
"""

from __future__ import annotations

import argparse
import math
import random
import sys
import time
from dataclasses import dataclass
from typing import Optional, Tuple

import pygame


@dataclass
class SensorReading:
    temperature_c: float
    humidity_pct: float
    pressure_hpa: float
    gas_ohms: float
    iaq_score: float
    eco2_ppm: int
    tvoc_ppb: int
    timestamp: float


def clamp(value: float, low: float, high: float) -> float:
    return max(low, min(high, value))


def estimate_iaq_score(humidity_pct: float, gas_ohms: float) -> float:
    """
    Educational heuristic only.
    Lower score is better, with an approximate 0..500 scale.
    """
    humidity_target = 40.0
    humidity_offset = abs(humidity_pct - humidity_target)
    humidity_score = clamp(humidity_offset / 40.0 * 25.0, 0.0, 25.0)

    gas_reference = 50000.0
    gas_ratio = clamp((gas_reference - gas_ohms) / gas_reference, 0.0, 1.0)
    gas_score = gas_ratio * 475.0

    return round(humidity_score + gas_score, 1)


def estimate_eco2_ppm(iaq_score: float, gas_ohms: float) -> int:
    """
    Educational heuristic only.
    Maps worsening gas behavior to an estimated eCO2 range.
    """
    base = 420
    score_component = int(iaq_score * 4.2)
    gas_component = int(clamp((50000.0 - gas_ohms) / 80.0, 0.0, 2000.0))
    return int(clamp(base + score_component + gas_component, 400, 2500))


def estimate_tvoc_ppb(iaq_score: float, gas_ohms: float) -> int:
    """
    Educational heuristic only.
    """
    base = 20
    score_component = int(iaq_score * 1.5)
    gas_component = int(clamp((50000.0 - gas_ohms) / 120.0, 0.0, 1000.0))
    return int(clamp(base + score_component + gas_component, 0, 1200))


def air_quality_state(eco2_ppm: int) -> Tuple[str, Tuple[int, int, int]]:
    if eco2_ppm < 800:
        return ("GOOD", (40, 140, 70))
    if eco2_ppm < 1200:
        return ("ELEVATED", (180, 150, 40))
    if eco2_ppm < 1600:
        return ("POOR", (190, 100, 30))
    return ("ALARM", (170, 40, 40))


class BME680Adapter:
    """Real sensor adapter using the bme680 package."""

    def __init__(self, i2c_addr: int = 0x76):
        self.i2c_addr = i2c_addr
        self.sensor = None

    def open(self) -> None:
        import bme680

        self.sensor = bme680.BME680(i2c_addr=self.i2c_addr)
        self.sensor.set_humidity_oversample(bme680.OS_2X)
        self.sensor.set_pressure_oversample(bme680.OS_4X)
        self.sensor.set_temperature_oversample(bme680.OS_8X)
        self.sensor.set_filter(bme680.FILTER_SIZE_3)
        self.sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
        self.sensor.set_gas_heater_temperature(320)
        self.sensor.set_gas_heater_duration(150)
        self.sensor.select_gas_heater_profile(0)

    def read(self) -> SensorReading:
        if self.sensor is None:
            raise RuntimeError("Sensor not opened")

        if not self.sensor.get_sensor_data():
            raise RuntimeError("No sensor data available")

        data = self.sensor.data
        gas_ohms = float(getattr(data, "gas_resistance", 0.0) or 0.0)

        iaq_score = estimate_iaq_score(float(data.humidity), gas_ohms)
        eco2_ppm = estimate_eco2_ppm(iaq_score, gas_ohms)
        tvoc_ppb = estimate_tvoc_ppb(iaq_score, gas_ohms)

        return SensorReading(
            temperature_c=float(data.temperature),
            humidity_pct=float(data.humidity),
            pressure_hpa=float(data.pressure),
            gas_ohms=gas_ohms,
            iaq_score=iaq_score,
            eco2_ppm=eco2_ppm,
            tvoc_ppb=tvoc_ppb,
            timestamp=time.time(),
        )


class MockBME680Adapter:
    """Mock adapter for validation without hardware."""

    def __init__(self) -> None:
        self.t0 = time.time()

    def open(self) -> None:
        return

    def read(self) -> SensorReading:
        elapsed = time.time() - self.t0

        temperature_c = 22.0 + 1.5 * math.sin(elapsed / 40.0)
        humidity_pct = 45.0 + 8.0 * math.sin(elapsed / 55.0)
        pressure_hpa = 1012.0 + 1.2 * math.sin(elapsed / 120.0)

        baseline = 45000.0
        drop = min(32000.0, elapsed * 350.0)
        noise = random.uniform(-1200.0, 1200.0)
        gas_ohms = max(5000.0, baseline - drop + noise)

        iaq_score = estimate_iaq_score(humidity_pct, gas_ohms)
        eco2_ppm = estimate_eco2_ppm(iaq_score, gas_ohms)
        tvoc_ppb = estimate_tvoc_ppb(iaq_score, gas_ohms)

        return SensorReading(
            temperature_c=temperature_c,
            humidity_pct=humidity_pct,
            pressure_hpa=pressure_hpa,
            gas_ohms=gas_ohms,
            iaq_score=iaq_score,
            eco2_ppm=eco2_ppm,
            tvoc_ppb=tvoc_ppb,
            timestamp=time.time(),
        )


class HyperPixelUI:
    def __init__(self, width: int = 800, height: int = 480):
        pygame.init()
        pygame.font.init()
        self.screen = pygame.display.set_mode((width, height))
        pygame.display.set_caption("Air Quality Alarm")
        self.width = width
        self.height = height
        self.font_big = pygame.font.SysFont("Arial", 52)
        self.font_med = pygame.font.SysFont("Arial", 32)
        self.font_small = pygame.font.SysFont("Arial", 24)
        self.show_detail = False
        self.alarm_ack_until = 0.0

    def draw(self, reading: SensorReading, alarm_threshold: int) -> None:
        state, bg = air_quality_state(reading.eco2_ppm)
        self.screen.fill(bg)

        now = time.time()
        alarm_active = (
            reading.eco2_ppm >= alarm_threshold and now >= self.alarm_ack_until
        )

        title = self.font_big.render(state, True, (255, 255, 255))
        self.screen.blit(title, (30, 20))

        eco2_text = self.font_big.render(
            f"eCO2 {reading.eco2_ppm} ppm", True, (255, 255, 255)
        )
        self.screen.blit(eco2_text, (30, 95))

        iaq_text = self.font_med.render(
            f"IAQ score: {reading.iaq_score:.1f}", True, (255, 255, 255)
        )
        self.screen.blit(iaq_text, (30, 170))

        temp_text = self.font_small.render(
            f"Temp: {reading.temperature_c:.1f} C", True, (255, 255, 255)
        )
        hum_text = self.font_small.render(
            f"Humidity: {reading.humidity_pct:.1f} %", True, (255, 255, 255)
        )
        pres_text = self.font_small.render(
            f"Pressure: {reading.pressure_hpa:.1f} hPa", True, (255, 255, 255)
        )
        gas_text = self.font_small.render(
            f"Gas: {reading.gas_ohms:.0f} ohms", True, (255, 255, 255)
        )
        tvoc_text = self.font_small.render(
            f"TVOC est: {reading.tvoc_ppb} ppb", True, (255, 255, 255)
        )

        self.screen.blit(temp_text, (30, 230))
        self.screen.blit(hum_text, (30, 265))
        self.screen.blit(pres_text, (30, 300))

        if self.show_detail:
            self.screen.blit(gas_text, (30, 335))
            self.screen.blit(tvoc_text, (30, 370))

        info = self.font_small.render(
            "Tap left: details  |  Tap right: acknowledge 60 s",
            True,
            (255, 255, 255),
        )
        self.screen.blit(info, (20, 440))

        if alarm_active:
            banner = pygame.Rect(500, 20, 260, 90)
            pygame.draw.rect(self.screen, (255, 230, 230), banner, border_radius=12)
            msg1 = self.font_med.render("VENTILATE ROOM", True, (120, 0, 0))
            msg2 = self.font_small.render(
                f"Threshold {alarm_threshold} ppm exceeded", True, (120, 0, 0)
            )
            self.screen.blit(msg1, (520, 35))
            self.screen.blit(msg2, (520, 75))

        pygame.display.flip()

    def handle_event(self, event: pygame.event.Event) -> None:
        if event.type == pygame.MOUSEBUTTONDOWN:
            x, _y = event.pos
            if x < self.width // 2:
                self.show_detail = not self.show_detail
            else:
                self.alarm_ack_until = time.time() + 60.0


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Touchscreen CO2 air-quality alarm")
    parser.add_argument("--mock", action="store_true", help="Run without hardware")
    parser.add_argument(
        "--i2c-addr",
        default="0x76",
        help="BME680 I2C address such as 0x76 or 0x77",
    )
    parser.add_argument(
        "--alarm-ppm",
        type=int,
        default=1200,
        help="Estimated eCO2 alarm threshold",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=2.0,
        help="Seconds between sensor updates",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    i2c_addr = int(args.i2c_addr, 16)

    sensor = MockBME680Adapter() if args.mock else BME680Adapter(i2c_addr=i2c_addr)
    sensor.open()

    ui = HyperPixelUI()
    last_read = 0.0
    reading: Optional[SensorReading] = None
    running = True

    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            ui.handle_event(event)

        now = time.time()
        if reading is None or (now - last_read) >= args.interval:
            reading = sensor.read()
            last_read = now

        if reading is not None:
            ui.draw(reading, args.alarm_ppm)

        time.sleep(0.05)

    pygame.quit()
    return 0


if __name__ == "__main__":
    sys.exit(main())

test_logic.py

#!/usr/bin/env python3

import air_quality_alarm as app


def run() -> None:
    assert app.clamp(5, 0, 10) == 5
    assert app.clamp(-1, 0, 10) == 0
    assert app.clamp(11, 0, 10) == 10

    iaq_good = app.estimate_iaq_score(40.0, 50000.0)
    iaq_bad = app.estimate_iaq_score(70.0, 10000.0)
    assert iaq_good <= iaq_bad

    eco2_good = app.estimate_eco2_ppm(iaq_good, 50000.0)
    eco2_bad = app.estimate_eco2_ppm(iaq_bad, 10000.0)
    assert eco2_good < eco2_bad

    assert app.air_quality_state(700)[0] == "GOOD"
    assert app.air_quality_state(900)[0] == "ELEVATED"
    assert app.air_quality_state(1300)[0] == "POOR"
    assert app.air_quality_state(1800)[0] == "ALARM"

    mock = app.MockBME680Adapter()
    mock.open()
    reading = mock.read()
    assert 0 <= reading.iaq_score <= 500
    assert 400 <= reading.eco2_ppm <= 2500
    assert reading.pressure_hpa > 800

    print("logic tests passed")


if __name__ == "__main__":
    run()

Guarda los archivos

cd ~/projects/touchscreen-co2-air-quality-alarm
chmod 700 .
nano air_quality_alarm.py
nano test_logic.py
chmod +x air_quality_alarm.py test_logic.py

Pasos de validación

1. Validar sintaxis

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 -m py_compile air_quality_alarm.py test_logic.py

Evidencia esperada:

  • Sin salida de py_compile

2. Validar pruebas lógicas

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 test_logic.py

Evidencia esperada:

logic tests passed

3. Validar modo simulado

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --mock --alarm-ppm 1200 --interval 1.5

Evidencia esperada:

  • Se abre una ventana de aproximadamente 800 x 480
  • Los valores se actualizan cada pocos segundos
  • El color de fondo cambia a medida que la habitación simulada empeora
  • Hacer clic en la mitad izquierda alterna los detalles
  • Hacer clic en la mitad derecha reconoce la alarma durante 60 segundos
  • Tras suficiente tiempo simulado, aparece el banner VENTILATE ROOM

4. Validar modo con sensor real

Si el sensor está en 0x76:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --alarm-ppm 1200 --interval 2.0

Si el sensor está en 0x77:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --i2c-addr 0x77 --alarm-ppm 1200 --interval 2.0

Evidencia esperada:

  • La temperatura es plausible para la habitación
  • La humedad es plausible para la habitación
  • La presión está en un rango atmosférico normal
  • La resistencia del gas es distinta de cero
  • La pantalla táctil se actualiza continuamente

5. Validación práctica en una habitación

Para validar el objetivo principal del tutorial, prueba el dispositivo en una habitación real:

  1. Coloca el dispositivo en una habitación pequeña y cerrada
  2. Deja la puerta y las ventanas cerradas durante un tiempo
  3. Observa si la tendencia del eCO2 estimado sube con el tiempo
  4. Abre una puerta o ventana
  5. Observa si la tendencia mostrada empieza a mejorar en las actualizaciones posteriores

Evidencia esperada:

  • El estado es fácil de interpretar en pantalla
  • La alarma visible aparece cuando se supera el umbral estimado configurado
  • Los cambios de ventilación producen un cambio de tendencia visible

Esto valida el prototipo como un recordatorio educativo para habitaciones basado en tendencias, no como un medidor de CO2 certificado.

Comandos de ejecución

Modo simulado:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --mock --alarm-ppm 1200 --interval 1.5

Modo con hardware real y dirección predeterminada:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --alarm-ppm 1200 --interval 2.0

Modo con hardware real y dirección alternativa:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --i2c-addr 0x77 --alarm-ppm 1200 --interval 2.0

Script lanzador opcional

cat > ~/projects/touchscreen-co2-air-quality-alarm/run.sh << 'EOF'
#!/bin/sh
set -eu
cd "$HOME/projects/touchscreen-co2-air-quality-alarm"
. .venv/bin/activate
exec python3 air_quality_alarm.py --alarm-ppm 1200 --interval 2.0
EOF
chmod +x ~/projects/touchscreen-co2-air-quality-alarm/run.sh

Resolución de problemas

El sensor no es visible en i2cdetect

Vuelve a comprobar:

  • I2C habilitado en raspi-config
  • Alimentación correcta de 3.3 V
  • SDA y SCL no están intercambiados
  • Conexión de tierra sólida
  • Dirección 0x76 frente a 0x77

Comandos útiles:

i2cdetect -y 1
python3 air_quality_alarm.py --i2c-addr 0x77

Errores de importación

Activa el entorno virtual y reinstala:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 -m pip install pygame smbus2 bme680

La interfaz no aparece en la HyperPixel

Comprueba:

  • La HyperPixel es la pantalla activa del escritorio
  • Estás ejecutando desde una sesión de escritorio local, no desde una shell SSH sin interfaz
  • Los controladores de pantalla y táctil están instalados correctamente

Los valores parecen irreales

Posibles razones:

  • Tiempo de calentamiento del sensor
  • Corrientes de aire directas o calor corporal cerca
  • Umbral demasiado agresivo para tu habitación

Prueba un umbral de alarma más alto:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --alarm-ppm 1400 --interval 2.0

Para un comportamiento más estricto:

cd ~/projects/touchscreen-co2-air-quality-alarm
. .venv/bin/activate
python3 air_quality_alarm.py --alarm-ppm 1000 --interval 2.0

Lista de comprobación final

  • [ ] Raspberry Pi 4 Model B está ejecutando Raspberry Pi OS Bookworm de 64 bits
  • [ ] Python 3.11 está instalado
  • [ ] HyperPixel 4.0 Touch muestra el escritorio y acepta entrada táctil
  • [ ] El BME680 está cableado correctamente a 3.3 V, GND, SDA y SCL
  • [ ] i2cdetect -y 1 muestra el sensor en 0x76 o 0x77
  • [ ] El entorno virtual está creado
  • [ ] Las dependencias se instalan correctamente
  • [ ] python3 -m py_compile air_quality_alarm.py test_logic.py se ejecuta correctamente
  • [ ] python3 test_logic.py imprime logic tests passed
  • [ ] El modo simulado abre la interfaz y muestra valores cambiantes
  • [ ] El toque en el lado izquierdo alterna las métricas detalladas
  • [ ] El toque en el lado derecho reconoce la alarma durante 60 segundos
  • [ ] El modo real muestra valores del sensor en vivo
  • [ ] La alarma visible aparece cuando se cruza el umbral estimado
  • [ ] Los cambios de ventilación causan un cambio de tendencia visible

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Qué modelo de Raspberry Pi se utiliza para construir el panel de calidad del aire según el texto?




Pregunta 2: ¿Qué sensor específico se menciona para medir las condiciones de la habitación?




Pregunta 3: ¿Qué pantalla se utiliza en el proyecto para mostrar las condiciones en vivo?




Pregunta 4: ¿Qué parámetros estima el sensor BME680 en tiempo real según el texto?




Pregunta 5: ¿Cuándo muestra la interfaz una alarma clara en pantalla?




Pregunta 6: ¿Cuál es un ejemplo de umbral configurable para la alarma de eCO2 mencionado en el texto?




Pregunta 7: ¿Para qué caso de uso se menciona un recordatorio de ventilación?




Pregunta 8: ¿Qué permite detectar el indicador en un taller o sala de hobbies?




Pregunta 9: ¿Cómo se muestra el estado en la pantalla para las salas de reuniones?




Pregunta 10: ¿Qué característica tiene la interfaz que se ejecuta localmente?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: monitor de nivel con Raspberry Pi 5

Caso práctico: monitor de nivel con Raspberry Pi 5 — hero

Objetivo y caso de uso

Qué construirás: Una aplicación para Raspberry Pi 5 que lee la distancia a la superficie del líquido desde un sensor ultrasónico HC-SR04, la convierte en altura del tanque y porcentaje de llenado, y representa los valores en una pantalla TFT SPI ILI9341. La interfaz se actualiza localmente casi en tiempo real, normalmente a unos 2–5 FPS con una latencia de actualización inferior a un segundo para una supervisión estable del tanque.

Por qué es importante / Casos de uso

  • Supervisar un tanque de agua de lluvia sin abrir la tapa, con lecturas en vivo como 32.4 cm de distancia, 87.6 cm de altura del líquido y 73% de llenado.
  • Comprobar un depósito de agua de servicio no peligrosa y mostrar etiquetas de estado claras como LOW (<25%), OK (25–80%) y HIGH (>80%).
  • Practicar tareas reales de ingeniería embebida: calibrar offsets de tanque vacío/lleno, filtrar lecturas ultrasónicas ruidosas y diseñar un panel TFT legible y de baja latencia.
  • Construir un sistema de visualización local ligero que normalmente usa solo una pequeña fracción de los recursos de la Raspberry Pi 5, a menudo por debajo del 10% de GPU para actualizaciones simples de interfaz impulsadas por SPI.

Resultado esperado

  • Distancia en vivo hasta la superficie del líquido en cm.
  • Altura estimada del líquido en cm basada en la geometría del tanque y en valores de calibración.
  • Porcentaje de llenado calculado que aumenta a medida que la distancia medida disminuye.
  • Etiquetas de estado en pantalla como LOW, OK y HIGH.
  • Actualizaciones visuales estables en hardware, donde las lecturas repetidas contra un objetivo plano fijo varían solo ligeramente entre actualizaciones.
  • Código Python listo para validación donde python3.11 -m py_compile se ejecuta correctamente para todos los archivos fuente y las pruebas en seco confirman el comportamiento inverso correcto entre distancia y llenado.

Público: makers de Raspberry Pi, estudiantes y aficionados que construyen paneles prácticos con sensores; Nivel: Python embebido de principiante a intermedio

Arquitectura/flujo: el HC-SR04 mide la distancia del eco → Python en Raspberry Pi 5 aplica calibración y filtrado/promediado simple → la aplicación convierte la distancia en altura del líquido y % de llenado → la pantalla TFT SPI ILI9341 representa valores numéricos y etiquetas de estado con latencia de actualización inferior a un segundo.

Nota educativa de validación

Antes de publicar este caso, el contenido pasó la puerta automática de validación de Prometeo con estado PASS. El validador comprobó los bloques de código, la estructura del artículo, los comandos copiables y la coherencia con el catálogo de dispositivos soportados.

Evidencia de validación publicada

  • Resultado automático: PASS.
  • Estructura parseada: 26 apartados, 1 tablas y 12 bloques de código detectados en el contenido publicado.
  • Código comprobado: 2 Python/py_compile, 9 Bash/copy-paste checks.
  • Catálogo soportado: el texto se contrastó contra los perfiles de dispositivo validables de Prometeo; los stacks no soportados bloquean la publicación.
  • Hallazgos del informe: sin hallazgos bloqueantes.

Esta validación confirma compatibilidad sintáctica y de herramientas para el material publicado, pero no sustituye la prueba física sobre tu hardware, cableado y entorno exactos.

Nota educativa de seguridad

Nota de seguridad: Este proyecto es solo para supervisión educativa de líquidos no peligrosos.

  • El pin ECHO del HC-SR04 es de 5 V y no debe conectarse directamente al GPIO de la Raspberry Pi.
  • No uses esto como la única protección contra desbordamiento, prevención de funcionamiento en seco de bombas o cualquier sistema de control crítico para la seguridad.
  • No lo uses con combustible, productos químicos, líquidos calientes, recipientes presurizados o líquidos corrosivos.
  • Mantén la Raspberry Pi y la electrónica de la pantalla protegidas del agua y la condensación.

Diagrama de bloques conceptual

Vista de alto nivel: qué entra, qué procesa cada bloque y qué sale del sistema.

Arquitectura funcional

el HC-SR04 mide la distancia del eco

Python en Raspberry Pi 5 aplica calibraci…

la aplicación convierte la distancia en a…

la pantalla TFT SPI ILI9341 representa va…

Flujo conceptual de señales y responsabilidades entre bloques del dispositivo.

Requisitos previos

  • Raspberry Pi 5
  • Raspberry Pi OS Bookworm de 64 bits
  • Python 3.11
  • Habilidades básicas de cableado GPIO
  • SPI habilitado en la Raspberry Pi

Instala el software requerido en la Raspberry Pi:

sudo apt update
sudo apt install -y python3.11 python3.11-venv python3-pip python3-dev
sudo raspi-config nonint do_spi 0
python3.11 -m venv "${HOME}/venvs/tankmon"
. "${HOME}/venvs/tankmon/bin/activate"
python3 -m pip install --upgrade pip
python3 -m pip install gpiozero lgpio pillow spidev

Materiales

Elemento Tipo sugerido Propósito
Controlador Raspberry Pi 5 Ordenador principal
Sensor ultrasónico HC-SR04 Medición de distancia
Pantalla TFT SPI ILI9341 Pantalla local
Protección Divisor resistivo de 1 kOhm + 2 kOhm Reducir ECHO de 5 V a unos 3.3 V
Protoboard Media tamaño o mayor Prototipado
Cables puente Juego adecuado Cableado
Fuente de alimentación PSU oficial para Raspberry Pi 5 Alimentación estable

Cableado

HC-SR04

  • VCC -> 5V de Raspberry Pi
  • GND -> GND de Raspberry Pi
  • TRIG -> GPIO23
  • ECHO -> entrada del divisor resistivo
  • Salida del divisor -> GPIO24

Divisor resistivo de ECHO

  • ECHO del HC-SR04 -> resistor de 1 kOhm -> ECHO_SAFE
  • ECHO_SAFE -> resistor de 2 kOhm -> GND
  • GPIO24 -> ECHO_SAFE

TFT SPI ILI9341

  • VCC -> sigue la documentación de tu módulo
  • GND -> GND
  • CS -> CE0 / GPIO8
  • RST -> GPIO25
  • DC -> GPIO18
  • MOSI -> GPIO10
  • SCK -> GPIO11
  • LED -> sigue la documentación de tu módulo

Código

Guarda lo siguiente como tank_monitor.py:

#!/usr/bin/env python3
from __future__ import annotations

import argparse
import random
import statistics
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TankConfig:
    tank_depth_cm: float = 80.0
    sensor_offset_cm: float = 0.0
    min_valid_distance_cm: float = 3.0
    max_valid_distance_cm: float = 250.0
    low_percent_threshold: float = 20.0
    high_percent_threshold: float = 85.0
    sample_count: int = 5
    sample_interval_s: float = 0.08
    loop_interval_s: float = 1.0


@dataclass
class Measurement:
    raw_distance_cm: Optional[float]
    filtered_distance_cm: Optional[float]
    liquid_height_cm: Optional[float]
    fill_percent: Optional[float]
    status: str
    valid: bool
    timestamp: float = field(default_factory=time.time)


class UltrasonicAdapter:
    def read_distance_cm(self) -> float:
        raise NotImplementedError


class MockUltrasonicAdapter(UltrasonicAdapter):
    def __init__(self, start_distance_cm: float = 50.0, noise_cm: float = 0.8):
        self.distance_cm = start_distance_cm
        self.noise_cm = noise_cm
        self.direction = -1

    def read_distance_cm(self) -> float:
        self.distance_cm += self.direction * 0.4
        if self.distance_cm < 15.0:
            self.direction = 1
        elif self.distance_cm > 70.0:
            self.direction = -1
        noisy = self.distance_cm + random.uniform(-self.noise_cm, self.noise_cm)
        return max(2.0, noisy)


class RpiHcsr04Adapter(UltrasonicAdapter):
    def __init__(self, trig_pin: int, echo_pin: int):
        from gpiozero import DistanceSensor

        self.sensor = DistanceSensor(
            echo=echo_pin,
            trigger=trig_pin,
            max_distance=3.0,
            partial=False,
        )

    def read_distance_cm(self) -> float:
        return self.sensor.distance * 100.0


class DisplayAdapter:
    def show(self, measurement: Measurement, cfg: TankConfig) -> None:
        raise NotImplementedError


class ConsoleDisplayAdapter(DisplayAdapter):
    def show(self, measurement: Measurement, cfg: TankConfig) -> None:
        ts = time.strftime("%H:%M:%S", time.localtime(measurement.timestamp))
        print(
            f"[{ts}] valid={measurement.valid} "
            f"raw={measurement.raw_distance_cm!s} "
            f"filtered={measurement.filtered_distance_cm!s} "
            f"height={measurement.liquid_height_cm!s} "
            f"fill={measurement.fill_percent!s} "
            f"status={measurement.status}"
        )


class Ili9341DisplayAdapter(DisplayAdapter):
    def __init__(self, width: int = 320, height: int = 240):
        from PIL import Image, ImageDraw, ImageFont
        import spidev  # noqa: F401

        self.Image = Image
        self.ImageDraw = ImageDraw
        self.ImageFont = ImageFont
        self.width = width
        self.height = height
        self.font_large = ImageFont.load_default()
        self.font_small = ImageFont.load_default()

    def _render_to_image(self, measurement: Measurement, cfg: TankConfig):
        img = self.Image.new("RGB", (self.width, self.height), (0, 0, 0))
        draw = self.ImageDraw.Draw(img)

        if not measurement.valid:
            color = (255, 80, 80)
        elif measurement.status == "LOW":
            color = (255, 180, 0)
        elif measurement.status == "HIGH":
            color = (80, 220, 120)
        else:
            color = (80, 180, 255)

        draw.rectangle(
            (0, 0, self.width - 1, self.height - 1),
            outline=(100, 100, 100),
        )
        draw.text(
            (10, 10),
            "Ultrasonic Tank Level",
            fill=(255, 255, 255),
            font=self.font_large,
        )
        draw.text(
            (10, 40),
            f"Status: {measurement.status}",
            fill=color,
            font=self.font_large,
        )

        distance_text = (
            "--"
            if measurement.filtered_distance_cm is None
            else f"{measurement.filtered_distance_cm:.1f} cm"
        )
        height_text = (
            "--"
            if measurement.liquid_height_cm is None
            else f"{measurement.liquid_height_cm:.1f} cm"
        )
        fill_text = (
            "--"
            if measurement.fill_percent is None
            else f"{measurement.fill_percent:.1f} %"
        )

        draw.text(
            (10, 80),
            f"Distance: {distance_text}",
            fill=(255, 255, 255),
            font=self.font_small,
        )
        draw.text(
            (10, 110),
            f"Height:   {height_text}",
            fill=(255, 255, 255),
            font=self.font_small,
        )
        draw.text(
            (10, 140),
            f"Fill:     {fill_text}",
            fill=(255, 255, 255),
            font=self.font_small,
        )
        draw.text(
            (10, 170),
            f"Tank depth: {cfg.tank_depth_cm:.1f} cm",
            fill=(180, 180, 180),
            font=self.font_small,
        )

        bar_x0, bar_y0 = 250, 30
        bar_x1, bar_y1 = 290, 210
        draw.rectangle(
            (bar_x0, bar_y0, bar_x1, bar_y1),
            outline=(200, 200, 200),
            fill=(20, 20, 20),
        )

        if measurement.fill_percent is not None:
            bounded = max(0.0, min(100.0, measurement.fill_percent))
            fill_height = int((bar_y1 - bar_y0 - 4) * bounded / 100.0)
            draw.rectangle(
                (bar_x0 + 2, bar_y1 - 2 - fill_height, bar_x1 - 2, bar_y1 - 2),
                fill=color,
            )

        return img

    def show(self, measurement: Measurement, cfg: TankConfig) -> None:
        img = self._render_to_image(measurement, cfg)
        img.save("/tmp/tank_monitor_preview.png")


class TankLevelMonitor:
    def __init__(
        self,
        sensor: UltrasonicAdapter,
        display: DisplayAdapter,
        cfg: TankConfig,
    ):
        self.sensor = sensor
        self.display = display
        self.cfg = cfg

    def sample_distance(self) -> Optional[float]:
        samples: list[float] = []
        for _ in range(self.cfg.sample_count):
            try:
                distance_cm = self.sensor.read_distance_cm()
                if (
                    self.cfg.min_valid_distance_cm
                    <= distance_cm
                    <= self.cfg.max_valid_distance_cm
                ):
                    samples.append(distance_cm)
            except Exception:
                pass
            time.sleep(self.cfg.sample_interval_s)

        if not samples:
            return None

        if len(samples) >= 3:
            median_value = statistics.median(samples)
            filtered = [x for x in samples if abs(x - median_value) < 5.0]
            if filtered:
                samples = filtered

        return statistics.mean(samples)

    def compute_measurement(self) -> Measurement:
        distance_cm = self.sample_distance()
        if distance_cm is None:
            return Measurement(
                raw_distance_cm=None,
                filtered_distance_cm=None,
                liquid_height_cm=None,
                fill_percent=None,
                status="NO ECHO",
                valid=False,
            )

        adjusted_distance = distance_cm - self.cfg.sensor_offset_cm
        liquid_height = self.cfg.tank_depth_cm - adjusted_distance
        liquid_height = max(0.0, min(self.cfg.tank_depth_cm, liquid_height))

        if self.cfg.tank_depth_cm <= 0:
            return Measurement(
                raw_distance_cm=distance_cm,
                filtered_distance_cm=adjusted_distance,
                liquid_height_cm=None,
                fill_percent=None,
                status="CONFIG ERROR",
                valid=False,
            )

        fill_percent = (liquid_height / self.cfg.tank_depth_cm) * 100.0

        if fill_percent < self.cfg.low_percent_threshold:
            status = "LOW"
        elif fill_percent >= self.cfg.high_percent_threshold:
            status = "HIGH"
        else:
            status = "OK"

        return Measurement(
            raw_distance_cm=distance_cm,
            filtered_distance_cm=adjusted_distance,
            liquid_height_cm=liquid_height,
            fill_percent=fill_percent,
            status=status,
            valid=True,
        )

    def run_forever(self) -> None:
        while True:
            measurement = self.compute_measurement()
            self.display.show(measurement, self.cfg)
            time.sleep(self.cfg.loop_interval_s)


def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Ultrasonic tank level monitor")
    parser.add_argument("--mode", choices=["mock", "hardware"], default="mock")
    parser.add_argument("--tank-depth-cm", type=float, default=80.0)
    parser.add_argument("--sensor-offset-cm", type=float, default=0.0)
    parser.add_argument("--trig-pin", type=int, default=23)
    parser.add_argument("--echo-pin", type=int, default=24)
    parser.add_argument("--display", choices=["console", "ili9341"], default="console")
    return parser


def main() -> int:
    args = build_arg_parser().parse_args()

    cfg = TankConfig(
        tank_depth_cm=args.tank_depth_cm,
        sensor_offset_cm=args.sensor_offset_cm,
    )

    if args.mode == "mock":
        sensor: UltrasonicAdapter = MockUltrasonicAdapter()
    else:
        sensor = RpiHcsr04Adapter(trig_pin=args.trig_pin, echo_pin=args.echo_pin)

    if args.display == "console":
        display: DisplayAdapter = ConsoleDisplayAdapter()
    else:
        display = Ili9341DisplayAdapter()

    app = TankLevelMonitor(sensor=sensor, display=display, cfg=cfg)
    app.run_forever()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

Guarda lo siguiente como test_dry_run.py:

#!/usr/bin/env python3
from tank_monitor import (
    ConsoleDisplayAdapter,
    TankConfig,
    TankLevelMonitor,
    UltrasonicAdapter,
)


class FixedSensor(UltrasonicAdapter):
    def __init__(self, value_cm: float):
        self.value_cm = value_cm

    def read_distance_cm(self) -> float:
        return self.value_cm


def run_case(distance_cm: float, tank_depth_cm: float) -> None:
    cfg = TankConfig(
        tank_depth_cm=tank_depth_cm,
        sample_count=3,
        sample_interval_s=0.0,
        loop_interval_s=0.0,
    )
    sensor = FixedSensor(distance_cm)
    display = ConsoleDisplayAdapter()
    monitor = TankLevelMonitor(sensor, display, cfg)
    measurement = monitor.compute_measurement()
    print(
        f"distance={distance_cm:.1f} cm, "
        f"height={measurement.liquid_height_cm:.1f} cm, "
        f"fill={measurement.fill_percent:.1f} %, "
        f"status={measurement.status}, valid={measurement.valid}"
    )


if __name__ == "__main__":
    run_case(70.0, 80.0)
    run_case(40.0, 80.0)
    run_case(8.0, 80.0)

Compilar y ejecutar

Crea una carpeta de proyecto:

mkdir -p "${HOME}/projects/ultrasonic-tank-level-monitor"
cd "${HOME}/projects/ultrasonic-tank-level-monitor"

Comprueba la sintaxis:

python3.11 -m py_compile tank_monitor.py test_dry_run.py

Ejecuta la validación en seco:

python3.11 test_dry_run.py
python3.11 tank_monitor.py --mode mock --display console --tank-depth-cm 80

Activa el entorno virtual y prueba en hardware Raspberry Pi:

. "${HOME}/venvs/tankmon/bin/activate"
python3.11 tank_monitor.py --mode hardware --display console --tank-depth-cm 80

Ejecuta la ruta de salida de vista previa para el adaptador de pantalla:

python3.11 tank_monitor.py --mode mock --display ili9341 --tank-depth-cm 80
ls -l /tmp/tank_monitor_preview.png

Servicio systemd opcional

Guarda como /etc/systemd/system/tank-monitor.service:

[Unit]
Description=Ultrasonic Tank Level Monitor
After=multi-user.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/projects/ultrasonic-tank-level-monitor
ExecStart=/home/pi/venvs/tankmon/bin/python3.11 /home/pi/projects/ultrasonic-tank-level-monitor/tank_monitor.py --mode hardware --display console --tank-depth-cm 80
Restart=on-failure

[Install]
WantedBy=multi-user.target

Pasos de validación

1. Validación de software sin hardware

Ejecuta:

python3.11 -m py_compile tank_monitor.py test_dry_run.py
python3.11 test_dry_run.py

Evidencia esperada:

  • El comando de compilación no imprime nada
  • distance=70 cm da un llenado menor que distance=40 cm
  • distance=8 cm da un resultado cercano a lleno

2. Actualizaciones en vivo simuladas

Ejecuta:

python3.11 tank_monitor.py --mode mock --display console --tank-depth-cm 80

Evidencia esperada:

  • Los valores de distancia cambian lentamente
  • El porcentaje de llenado cambia de forma inversa a la distancia
  • El estado cambia entre LOW, OK y HIGH

3. Prueba de banco del HC-SR04

Apunta el sensor a un objetivo plano a una distancia conocida y ejecuta:

python3.11 tank_monitor.py --mode hardware --display console --tank-depth-cm 80

Evidencia esperada:

  • La distancia informada es cercana a la distancia física
  • Mover el objetivo más cerca reduce la distancia mostrada
  • Las lecturas repetidas en una posición fija del objetivo varían solo ligeramente

4. Validación del tanque

Mide la profundidad útil real desde el sensor hasta el fondo y establece --tank-depth-cm con ese valor.

Luego compara la distancia mostrada con mediciones manuales en:

  • Condición casi vacía
  • Condición de nivel medio o alto

Evidencia esperada:

  • El porcentaje aumenta a medida que sube el líquido
  • Los umbrales LOW y HIGH se activan en los puntos previstos

Solución de problemas

NO ECHO

Posibles causas:

  • Cableado incorrecto del divisor de ECHO
  • Números GPIO incorrectos
  • Mala conexión a tierra
  • Objetivo demasiado cerca o mal orientado

Lecturas inestables

Posibles causas:

  • Superficie del líquido turbulenta
  • Montaje suelto
  • Cables puente largos
  • Reflexiones en las paredes dentro del tanque

TFT en blanco

Posibles causas:

  • SPI no habilitado
  • Nivel de alimentación incorrecto del módulo
  • Cableado incorrecto de CS, DC o RST
  • Tu módulo específico necesita una pila de controladores diferente

Notas sobre la ruta de pantalla

El adaptador ILI9341 incluido genera una imagen PIL y la guarda en /tmp/tank_monitor_preview.png. Esto mantiene el ejemplo ejecutable y comprobable mientras preserva la estructura del proyecto para un monitor basado en Raspberry Pi 5, HC-SR04 e ILI9341.

Para un despliegue final en hardware, sustituye la línea img.save("/tmp/tank_monitor_preview.png") por la llamada de escritura requerida por la biblioteca ILI9341 exacta y el módulo que estés usando.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Qué placa base se utiliza para construir la aplicación según el artículo?




Pregunta 2: ¿Qué modelo de sensor ultrasónico se menciona para leer la distancia?




Pregunta 3: ¿Qué tipo de pantalla se utiliza para representar los valores del tanque?




Pregunta 4: ¿Cuál es la tasa de actualización típica (FPS) de la interfaz mencionada en el texto?




Pregunta 5: Según el artículo, ¿qué etiqueta de estado se muestra si el porcentaje de llenado es menor al 25%?




Pregunta 6: ¿Qué rango de porcentaje de llenado corresponde a la etiqueta 'OK'?




Pregunta 7: ¿Qué ocurre con el porcentaje de llenado calculado a medida que disminuye la distancia medida por el sensor?




Pregunta 8: ¿Qué porcentaje aproximado de recursos de GPU suele utilizar este sistema de visualización en la Raspberry Pi 5?




Pregunta 9: ¿En qué unidad de medida se muestra la distancia en vivo hasta la superficie del líquido?




Pregunta 10: ¿Qué tarea de ingeniería embebida se menciona como práctica en este proyecto?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: control NFC con Raspberry Pi 4

Caso práctico: control NFC con Raspberry Pi 4 — hero

Objetivo y caso de uso

Qué construirás: Un prototipo con Raspberry Pi 4 Model B que lee tarjetas NFC mediante un HAT PN532, comprueba cada UID contra una lista de permitidos, marca con fecha y hora cada intento usando un RTC DS3231 y cambia a un estado de alarma cuando se detecta una etiqueta no autorizada. El sistema está diseñado para tomar decisiones locales rápidas, con un manejo típico de lectura de tarjeta en menos de 200 ms y baja carga en reposo sobre la Pi.

Por qué importa / Casos de uso

  • Control de acceso a un armario de taller o laboratorio pequeño: Monta el lector en un armario de herramientas, cajón de electrónica o taquilla de proyectos para que solo las tarjetas aprobadas de estudiantes o personal permitan el acceso.
  • Alarma educativa de entrada para un rincón de makerspace: Las tarjetas desconocidas pueden activar una marca de alarma por software casi en tiempo real, adecuada para conectarla más adelante a un zumbador, relé, baliza LED o notificador por webhook.
  • Registro fiable de eventos incluso sin internet: El DS3231 mantiene la hora exacta sin conexión, de modo que los intentos denegados y permitidos siguen teniendo marcas de tiempo útiles durante caídas de Wi‑Fi o en funcionamiento de laboratorio aislado.
  • Rastro de auditoría simple para equipos compartidos: Guarda los eventos en CSV o JSON con UID de tarjeta, marca de tiempo y resultado para revisar quién intentó acceder y cuándo.
  • Prototipo edge de baja sobrecarga: Esto funciona cómodamente en una Raspberry Pi 4 con demanda mínima de CPU y un uso de GPU efectivamente del 0 %, lo que lo hace práctico para monitorización permanente.

Resultado esperado

  • Un verificador de acceso NFC funcional que clasifica las etiquetas presentadas como autorizadas o no autorizadas.
  • Registros precisos respaldados por RTC para cada escaneo, incluidas sesiones sin conexión y reinicios.
  • Un estado de alarma por software que se activa ante accesos denegados y que puede ampliarse a salidas físicas.
  • Un tiempo de respuesta extremo a extremo de referencia de unos 100-200 ms por escaneo, según el intervalo de sondeo y las escrituras de almacenamiento.
  • Un proyecto inicial reutilizable para cerraduras de armarios, activos de laboratorio, puntos de control de asistencia o demostraciones de alerta de entrada.

Público: Estudiantes, makers y desarrolladores principiantes de embebidos/Linux que construyen demostraciones de control de acceso; Nivel: Principiante a intermedio

Arquitectura/flujo: PN532 lee el UID NFC → la aplicación de Raspberry Pi comprueba la lista local de permitidos → DS3231 proporciona la marca de tiempo → el evento se escribe en un registro CSV/JSON → un escaneo autorizado marca acceso concedido, un escaneo no autorizado activa el estado de alarma.

Nota educativa de validación

Antes de publicar este caso, el contenido pasó la puerta automática de validación de Prometeo con estado PASS. El validador comprobó los bloques de código, la estructura del artículo, los comandos copiables y la coherencia con el catálogo de dispositivos soportados.

Evidencia de validación publicada

  • Resultado automático: PASS.
  • Estructura parseada: 48 apartados, 1 tablas y 29 bloques de código detectados en el contenido publicado.
  • Código comprobado: 2 Python/py_compile, 23 Bash/copy-paste checks.
  • Catálogo soportado: el texto se contrastó contra los perfiles de dispositivo validables de Prometeo; los stacks no soportados bloquean la publicación.
  • Hallazgos del informe: sin hallazgos bloqueantes.

Esta validación confirma compatibilidad sintáctica y de herramientas para el material publicado, pero no sustituye la prueba física sobre tu hardware, cableado y entorno exactos.

Nota educativa de seguridad

Este proyecto es un prototipo educativo de control de acceso, no un sistema de seguridad certificado. Ten en cuenta estos límites:

  • No confíes en él como única protección para bienes valiosos, infraestructura crítica o seguridad personal.
  • El tutorial se centra en aprender interfaces, registro y lógica de acceso.
  • No conectes tensión de red directamente a la Raspberry Pi ni al cableado de protoboard.
  • Si más adelante añades una sirena, cerradura o relé, utiliza solo módulos de baja tensión con aislamiento adecuado y sigue la documentación del módulo.
  • No asumas que la salida de alarma de este tutorial puede accionar directamente una cerradura o sirena real.
  • El tutorial actual usa un estado de alarma por software y una indicación en consola.
  • Protege la Raspberry Pi frente a errores de cableado.
  • Usa periféricos compatibles con 3.3 V y comprueba dos veces las etiquetas de los pines antes de encender.
  • El mecanismo NFC mostrado aquí no está reforzado contra clonación, repetición, manipulación o bypass físico.
  • Trátalo como una plataforma de enseñanza, no como un sistema comercial de acceso seguro.
  • Si lo instalas cerca de una puerta real, asegúrate de que siempre haya una anulación manual segura y un uso legal.
  • Nunca crees una configuración que pueda atrapar a personas o bloquear rutas de salida de emergencia.

Diagrama de bloques conceptual

Vista de alto nivel: qué entra, qué procesa cada bloque y qué sale del sistema.

Arquitectura funcional

PN532 lee el UID NFC

la aplicación de Raspberry Pi comprueba l…

DS3231 proporciona la marca de tiempo

el evento se escribe en un registro CSV/JSON

un escaneo autorizado marca acceso conced…

Flujo conceptual de señales y responsabilidades entre bloques del dispositivo.

Requisitos previos

Antes de comenzar, prepara la Raspberry Pi y el entorno básico de software.

  1. Requisitos previos de hardware
  2. Raspberry Pi 4 Model B
  3. Tarjeta MicroSD con Raspberry Pi OS Bookworm 64-bit
  4. HAT NFC PN532
  5. Módulo RTC DS3231 o RTC integrado en el HAT expuesto por I2C
  6. Fuente de alimentación estable de 5 V para Raspberry Pi
  7. Tarjeta o etiqueta NFC para pruebas

  8. Requisitos previos de software

  9. Raspberry Pi OS Bookworm 64-bit
  10. Python 3.11
  11. Acceso a terminal en la Pi
  12. I2C y SPI habilitados en la configuración de Raspberry Pi

  13. Habilidades asumidas

  14. Editar archivos con nano u otro editor de texto
  15. Ejecutar comandos en una shell
  16. Leer con cuidado las etiquetas de los pines GPIO

Comprueba la versión de Python:

python3 --version

El resultado esperado en Bookworm debería ser similar a:

Python 3.11.x

Materiales

Usa el modelo exacto de hardware solicitado.

Elemento Modelo exacto / requisito Propósito
Placa principal Raspberry Pi 4 Model B Ejecuta el software de control de acceso
Lector NFC HAT NFC PN532 Lee tarjetas/etiquetas NFC
Reloj en tiempo real RTC DS3231 Proporciona marcas de tiempo estables
Alimentación Fuente oficial o de buena calidad de 5 V para Raspberry Pi 4 Funcionamiento estable
Almacenamiento Tarjeta MicroSD con Raspberry Pi OS Bookworm 64-bit Sistema operativo y registros
Medio de prueba Al menos 1 etiqueta NFC autorizada y 1 etiqueta NFC no autorizada Validación
Salida opcional Pequeño zumbador activo o LED mediante interfaz segura de baja tensión Indicador físico de alarma más adelante

Configuración/Conexión

Este proyecto evita un diagrama de circuito y usa solo indicaciones de conexión en texto.

Estrategia de conexión

El prototipo usa:
SPI para el HAT NFC PN532
I2C para el RTC DS3231

Muchas placas HAT PN532 pueden configurarse para SPI, I2C o UART usando interruptores/jumpers. Para este tutorial:
– Configura el HAT PN532 en modo SPI
– Mantén el DS3231 en I2C

Pasos para habilitar interfaces en Raspberry Pi

Ejecuta:

sudo raspi-config

Después:
1. Ve a Interface Options
2. Habilita SPI
3. Habilita I2C
4. Finaliza y reinicia

Después del reinicio, verifica:

ls /dev/spidev*
ls /dev/i2c-*

Deberías ver dispositivos similares a:
/dev/spidev0.0
/dev/i2c-1

Notas de conexión basadas en texto

HAT NFC PN532

Si tu HAT PN532 se apila directamente sobre el encabezado de la Raspberry Pi, los pines SPI ya están encaminados a través del encabezado. Si usas cables en lugar de un apilado HAT directo, conecta estas señales:

  • PN532 VCC -> Raspberry Pi 3.3 V
  • PN532 GND -> Raspberry Pi GND
  • PN532 SCK -> Raspberry Pi SPI SCLK
  • PN532 MISO -> Raspberry Pi SPI MISO
  • PN532 MOSI -> Raspberry Pi SPI MOSI
  • PN532 SS/CS -> Raspberry Pi SPI CE0
  • PN532 RSTO or RSTPDN -> GPIO opcional si tu placa lo requiere; de lo contrario, déjalo según el diseño del HAT
  • Selector de modo PN532 -> SPI

RTC DS3231

Si el RTC es un módulo independiente:
– DS3231 VCC -> Raspberry Pi 3.3 V
– DS3231 GND -> Raspberry Pi GND
– DS3231 SDA -> Raspberry Pi GPIO2 / SDA1
– DS3231 SCL -> Raspberry Pi GPIO3 / SCL1

Comprobaciones de detección de buses

Instala herramientas comunes:

sudo apt update
sudo apt install -y i2c-tools python3-pip

Comprueba los dispositivos I2C:

sudo i2cdetect -y 1

Un DS3231 suele aparecer alrededor de la dirección 0x68.

Para SPI, no hay una sonda única equivalente tan sencilla como i2cdetect, pero la existencia de /dev/spidev0.0 confirma que la interfaz SPI está habilitada.

Directorio del proyecto

Crea un directorio de trabajo limpio:

mkdir -p ~/nfc-door-access-alarm
cd ~/nfc-door-access-alarm

Código validado

El código siguiente está diseñado para cumplir dos objetivos importantes:
1. Ser útil en la Raspberry Pi real con clases adaptadoras de hardware.
2. Poder ejecutarse en modo dry-run/mock en un ordenador normal sin hardware NFC ni RTC.

Esto coincide con el estilo de validación solicitado para Raspberry Pi.

access_controller.py

Vista pública parcial del archivo validado. El código completo se muestra a miembros y en PDF/Print.

#!/usr/bin/env python3
from __future__ import annotations

import argparse
import csv
import json
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List, Optional


@dataclass
class AccessEvent:
    timestamp: str
    uid: str
    authorized: bool
    source: str
    alarm_active: bool


class RTCAdapter:
    def now_iso(self) -> str:
        raise NotImplementedError


class SystemRTC(RTCAdapter):
    def now_iso(self) -> str:
        return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


class MockDS3231RTC(RTCAdapter):
    def __init__(self, fixed_time: Optional[str] = None) -> None:
        self.fixed_time = fixed_time

    def now_iso(self) -> str:
        if self.fixed_time:
            return self.fixed_time
        return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


class NFCReaderAdapter:
    def poll_uid(self) -> Optional[str]:
        raise NotImplementedError


class MockPN532Reader(NFCReaderAdapter):
    def __init__(self, sequence: Iterable[str], repeat: bool = False) -> None:
        self._sequence = list(sequence)
        self._repeat = repeat
        self._index = 0

    def poll_uid(self) -> Optional[str]:
        if not self._sequence:
            return None
        if self._index >= len(self._sequence):
            if self._repeat:
                self._index = 0
            else:
                return None
        uid = self._sequence[self._index]
        self._index += 1
        time.sleep(0.2)
        return uid


class AlarmAdapter:
    def set_alarm(self, active: bool) -> None:
        raise NotImplementedError


class ConsoleAlarm(AlarmAdapter):
    def __init__(self) -> None:
        self.state = False

    def set_alarm(self, active: bool) -> None:
        if active != self.state:
            self.state = active
            print(f"[ALARM] state={'ON' if active else 'OFF'}")


class AccessController:
    def __init__(
        self,
        rtc: RTCAdapter,
        reader: NFCReaderAdapter,
        alarm: AlarmAdapter,
        allowed_uids: List[str],
        log_path: Path,
    ) -> None:
        self.rtc = rtc
        self.reader = reader
        self.alarm = alarm
        self.allowed_uids = {uid.strip().upper() for uid in allowed_uids if uid.strip()}
        self.log_path = log_path
        self.alarm_active = False
# ... continúa para miembros en el código completo validado ...

🔒 Parte del código validado es premium. Con el pase de 7 días o la suscripción mensual podrás consultar el archivo completo validado.

#!/usr/bin/env python3
from __future__ import annotations

import argparse
import csv
import json
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List, Optional


@dataclass
class AccessEvent:
    timestamp: str
    uid: str
    authorized: bool
    source: str
    alarm_active: bool


class RTCAdapter:
    def now_iso(self) -> str:
        raise NotImplementedError


class SystemRTC(RTCAdapter):
    def now_iso(self) -> str:
        return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


class MockDS3231RTC(RTCAdapter):
    def __init__(self, fixed_time: Optional[str] = None) -> None:
        self.fixed_time = fixed_time

    def now_iso(self) -> str:
        if self.fixed_time:
            return self.fixed_time
        return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


class NFCReaderAdapter:
    def poll_uid(self) -> Optional[str]:
        raise NotImplementedError


class MockPN532Reader(NFCReaderAdapter):
    def __init__(self, sequence: Iterable[str], repeat: bool = False) -> None:
        self._sequence = list(sequence)
        self._repeat = repeat
        self._index = 0

    def poll_uid(self) -> Optional[str]:
        if not self._sequence:
            return None
        if self._index >= len(self._sequence):
            if self._repeat:
                self._index = 0
            else:
                return None
        uid = self._sequence[self._index]
        self._index += 1
        time.sleep(0.2)
        return uid


class AlarmAdapter:
    def set_alarm(self, active: bool) -> None:
        raise NotImplementedError


class ConsoleAlarm(AlarmAdapter):
    def __init__(self) -> None:
        self.state = False

    def set_alarm(self, active: bool) -> None:
        if active != self.state:
            self.state = active
            print(f"[ALARM] state={'ON' if active else 'OFF'}")


class AccessController:
    def __init__(
        self,
        rtc: RTCAdapter,
        reader: NFCReaderAdapter,
        alarm: AlarmAdapter,
        allowed_uids: List[str],
        log_path: Path,
    ) -> None:
        self.rtc = rtc
        self.reader = reader
        self.alarm = alarm
        self.allowed_uids = {uid.strip().upper() for uid in allowed_uids if uid.strip()}
        self.log_path = log_path
        self.alarm_active = False

    def handle_uid(self, uid: str, source: str = "nfc") -> AccessEvent:
        normalized = uid.strip().upper()
        authorized = normalized in self.allowed_uids
        self.alarm_active = not authorized
        self.alarm.set_alarm(self.alarm_active)

        event = AccessEvent(
            timestamp=self.rtc.now_iso(),
            uid=normalized,
            authorized=authorized,
            source=source,
            alarm_active=self.alarm_active,
        )
        self._append_log(event)
        if authorized:
            print(f"{event.timestamp} ACCESS GRANTED uid={event.uid}")
        else:
            print(f"{event.timestamp} ACCESS DENIED uid={event.uid}")
        return event

    def _append_log(self, event: AccessEvent) -> None:
        file_exists = self.log_path.exists()
        with self.log_path.open("a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(["timestamp", "uid", "authorized", "source", "alarm_active"])
            writer.writerow([
                event.timestamp,
                event.uid,
                int(event.authorized),
                event.source,
                int(event.alarm_active),
            ])

    def run(self, max_reads: int = 0, poll_delay: float = 0.5) -> int:
        reads = 0
        while True:
            uid = self.reader.poll_uid()
            if uid:
                self.handle_uid(uid)
                reads += 1
            else:
                time.sleep(poll_delay)

            if max_reads > 0 and reads >= max_reads:
                break
        return 0


def load_allowed_uids(path: Path) -> List[str]:
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict) or "allowed_uids" not in data:
        raise ValueError("allowlist file must contain a JSON object with key 'allowed_uids'")
    items = data["allowed_uids"]
    if not isinstance(items, list):
        raise ValueError("'allowed_uids' must be a list")
    return [str(x) for x in items]


def build_mock_sequence(args: argparse.Namespace) -> List[str]:
    if args.mock_sequence:
        return [x.strip().upper() for x in args.mock_sequence.split(",") if x.strip()]
    return [
        "04A1B2C3D4",
        "1122334455",
        "04A1B2C3D4",
    ]


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="NFC door access alarm prototype")
    parser.add_argument("--allowlist", default="allowlist.json", help="Path to JSON allowlist")
    parser.add_argument("--log", default="access_log.csv", help="Path to CSV log file")
    parser.add_argument("--mock", action="store_true", help="Use mock NFC and RTC adapters")
    parser.add_argument("--mock-sequence", default="", help="Comma-separated UID sequence for mock mode")
    parser.add_argument("--max-reads", type=int, default=3, help="Stop after this many successful reads, 0=forever")
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    allowlist_path = Path(args.allowlist)
    log_path = Path(args.log)

    allowed_uids = load_allowed_uids(allowlist_path)

    if args.mock:
        rtc: RTCAdapter = MockDS3231RTC()
        reader: NFCReaderAdapter = MockPN532Reader(build_mock_sequence(args), repeat=False)
    else:
        # real-hardware adapter integration is intentionally not auto-imported here.
        # If no mock mode is selected, use system clock and require explicit future extension
        # for PN532 hardware reading.
        rtc = SystemRTC()
        reader = MockPN532Reader([], repeat=False)

    alarm = ConsoleAlarm()
    controller = AccessController(
        rtc=rtc,
        reader=reader,
        alarm=alarm,
        allowed_uids=allowed_uids,
        log_path=log_path,
    )
    return controller.run(max_reads=args.max_reads)


if __name__ == "__main__":
    sys.exit(main())

allowlist.json

{
  "allowed_uids": [
    "04A1B2C3D4",
    "AABBCCDDEE"
  ]
}

test_access_controller.py

Este script de validación realiza una comprobación dry-run usando entradas simuladas. No es una dependencia de framework completo de pruebas unitarias; usa solo la biblioteca estándar.

Vista pública parcial del archivo validado. El código completo se muestra a miembros y en PDF/Print.

#!/usr/bin/env python3
from __future__ import annotations

import csv
import subprocess
import sys
from pathlib import Path


def main() -> int:
    project_dir = Path(__file__).resolve().parent
    log_path = project_dir / "test_access_log.csv"

    if log_path.exists():
        log_path.unlink()

    cmd = [
        sys.executable,
        str(project_dir / "access_controller.py"),
        "--mock",
        "--allowlist",
        str(project_dir / "allowlist.json"),
        "--log",
        str(log_path),
        "--mock-sequence",
        "04A1B2C3D4,DEADBEEF01",
        "--max-reads",
        "2",
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    print(result.stdout)
# ... continúa para miembros en el código completo validado ...

🔒 Parte del código validado es premium. Con el pase de 7 días o la suscripción mensual podrás consultar el archivo completo validado.

#!/usr/bin/env python3
from __future__ import annotations

import csv
import subprocess
import sys
from pathlib import Path


def main() -> int:
    project_dir = Path(__file__).resolve().parent
    log_path = project_dir / "test_access_log.csv"

    if log_path.exists():
        log_path.unlink()

    cmd = [
        sys.executable,
        str(project_dir / "access_controller.py"),
        "--mock",
        "--allowlist",
        str(project_dir / "allowlist.json"),
        "--log",
        str(log_path),
        "--mock-sequence",
        "04A1B2C3D4,DEADBEEF01",
        "--max-reads",
        "2",
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    print(result.stdout)
    if result.returncode != 0:
        print(result.stderr)
        return result.returncode

    if not log_path.exists():
        print("ERROR: log file was not created")
        return 1

    with log_path.open("r", encoding="utf-8", newline="") as f:
        rows = list(csv.DictReader(f))

    if len(rows) != 2:
        print(f"ERROR: expected 2 log entries, got {len(rows)}")
        return 1

    if rows[0]["authorized"] != "1":
        print("ERROR: first UID should be authorized")
        return 1

    if rows[1]["authorized"] != "0":
        print("ERROR: second UID should be unauthorized")
        return 1

    if rows[1]["alarm_active"] != "1":
        print("ERROR: alarm should be active for unauthorized UID")
        return 1

    print("Dry-run validation passed.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

Comandos de compilación/grabación/ejecución

Este es un proyecto Python para Raspberry Pi, por lo que no hay un paso de grabación de firmware. En su lugar, crea los archivos, valida la sintaxis y ejecuta.

1) Instalar soporte de paquetes y comprobar importaciones

cd ~/nfc-door-access-alarm
python3 --version
python3 -c "import sys, csv, json, argparse, pathlib; print('standard-library-import-check: OK')"

2) Guardar los archivos

Crea la aplicación principal:

nano access_controller.py

Crea la lista de permitidos:

nano allowlist.json

Crea el script de validación:

nano test_access_controller.py

3) Validar sintaxis de Python

python3 -m py_compile access_controller.py test_access_controller.py

Si tiene éxito, este comando no imprime nada.

4) Ejecutar la validación dry-run en Raspberry Pi o en cualquier ordenador normal

python3 test_access_controller.py

La salida esperada incluirá líneas similares a:

2026-... ACCESS GRANTED uid=04A1B2C3D4
[ALARM] state=ON
2026-... ACCESS DENIED uid=DEADBEEF01
Dry-run validation passed.

5) Ejecutar manualmente el prototipo principal en modo mock

python3 access_controller.py --mock --allowlist allowlist.json --log access_log.csv --mock-sequence 04A1B2C3D4,1122334455,04A1B2C3D4 --max-reads 3

6) Inspeccionar los registros de acceso generados

cat access_log.csv

Estructura esperada:

timestamp,uid,authorized,source,alarm_active
2026-...,04A1B2C3D4,1,nfc,0
2026-...,1122334455,0,nfc,1
2026-...,04A1B2C3D4,1,nfc,0

Validación paso a paso

Esta sección valida el proyecto en torno al objetivo real: acceso controlado por NFC con alarma y registro con marca de tiempo.

1) Confirmar sistema operativo y versión de Python

Ejecuta:

uname -a
python3 --version

Debes tener:
– Raspberry Pi OS Bookworm 64-bit
– Python 3.11.x

2) Confirmar que los buses necesarios están habilitados

Ejecuta:

ls /dev/spidev*
ls /dev/i2c-*

Criterios de éxito:
– Existe el dispositivo SPI para planificar la ruta del PN532
– Existe el dispositivo I2C para planificar la ruta del DS3231

3) Confirmar visibilidad del RTC en I2C

Ejecuta:

sudo i2cdetect -y 1

Criterios de éxito:
– Aparece un dispositivo en 68 u otra dirección RTC esperada según tu hardware

Lo que esto demuestra:
– El DS3231 es eléctricamente accesible en el bus I2C

Lo que aún no demuestra:
– Que la aplicación esté leyendo activamente la hora RTC desde un controlador de hardware dedicado en esta versión del tutorial

4) Confirmar validez sintáctica del código

Ejecuta:

python3 -m py_compile access_controller.py test_access_controller.py

Criterios de éxito:
– No se informan errores

Esto demuestra:
– Los archivos Python son sintácticamente válidos

No demuestra:
– Éxito real de transacción con el PN532

5) Validar la lógica de decisión de acceso en modo mock

Ejecuta:

python3 access_controller.py --mock --allowlist allowlist.json --log access_log.csv --mock-sequence 04A1B2C3D4,CAFEBABE00 --max-reads 2

Criterios de éxito:
– El primer evento imprime ACCESS GRANTED
– El segundo evento imprime ACCESS DENIED
– La consola muestra que la alarma cambia a ON para acceso no autorizado
– Se crea access_log.csv

6) Validar la estructura del registro

Ejecuta:

cat access_log.csv

Comprueba que haya:
– Fila de cabecera
– Exactamente dos filas de eventos
– Evento autorizado marcado como 1
– Evento no autorizado marcado como 0
– El evento no autorizado tiene alarm_active igual a 1

7) Ejecutar el validador automático dry-run incluido

Ejecuta:

python3 test_access_controller.py

Criterios de éxito:
– Línea final: Dry-run validation passed.

8) Siguiente paso con hardware real

En un aula, la siguiente ampliación práctica es sustituir MockPN532Reader por un adaptador SPI real para PN532 y sustituir MockDS3231RTC por un lector DS3231. La lógica central, el registro de eventos y el comportamiento de alarma permanecen iguales, así que validas el hardware por capas en lugar de depurar todo a la vez.

Solución de problemas

La Pi no muestra /dev/spidev0.0

  • Vuelve a ejecutar sudo raspi-config
  • Habilita SPI otra vez
  • Reinicia
  • Comprueba si otro overlay o configuración deshabilitó SPI

i2cdetect no muestra la dirección 68

  • Vuelve a comprobar el cableado del DS3231:
  • SDA a GPIO2
  • SCL a GPIO3
  • GND común
  • Alimentación de 3.3 V
  • Algunos módulos están etiquetados para 5 V pero aun así pueden exponer líneas I2C incorrectamente para uso directo con la Pi; confirma la compatibilidad lógica de tu módulo
  • Verifica que I2C esté habilitado

py_compile informa un error de sintaxis

  • Abre de nuevo el archivo y busca:
  • Comillas faltantes
  • Indentación rota
  • Saltos de línea accidentales por copiar/pegar
  • Guarda otra vez y vuelve a ejecutar:
python3 -m py_compile access_controller.py test_access_controller.py

La prueba dry-run no crea un archivo de registro

  • Confirma que estás en la carpeta correcta
  • Comprueba permisos de archivo:
pwd
ls -l
  • Asegúrate de que allowlist.json exista y contenga JSON válido

Todas las etiquetas son denegadas

  • Asegúrate de que el UID en allowlist.json coincida exactamente con el formato esperado de la etiqueta
  • El código normaliza a mayúsculas sin espacios, así que guarda los UID en mayúsculas por claridad

El estado de alarma nunca cambia

  • En este tutorial, la alarma es un estado de software impreso en la consola
  • Si más adelante añades un zumbador o salida GPIO, confirma que tu código de salida de hardware realmente llama a set_alarm(True) y set_alarm(False)

Mejoras

Una vez que el prototipo básico funcione, puedes evolucionarlo hacia una unidad de acceso más realista.

Mejoras de software

  • Añadir integración con controlador SPI real para PN532
  • Encapsula el código específico de hardware dentro de un adaptador PN532SPIReader
  • Mantén la misma interfaz poll_uid()
  • Añadir acceso real a registros del DS3231
  • Implementa un adaptador DS3231RTC usando lecturas I2C
  • Usa conversión BCD y devuelve marcas de tiempo ISO
  • Guardar nombres de usuario
  • Amplía allowlist.json para mapear UID a nombre del propietario
  • Tiempo de espera de alarma
  • En lugar de limpiar la alarma inmediatamente en la siguiente lectura válida, mantenla activa durante un número configurable de segundos
  • Registro de manipulación
  • Cuenta intentos repetidos de tarjetas fallidas y eleva una alerta más fuerte tras tres denegaciones
  • Salida de liberación de puerta
  • Añade un módulo de relé accionado por transistor para un simulador de cerradura de baja tensión
  • Página web simple de estado
  • Sirve el estado de acceso más reciente y los registros recientes desde una interfaz web solo local

Mejoras del prototipo físico

  • Coloca la Pi, el RTC y el lector en una pequeña carcasa
  • Monta el lector NFC cerca del marco de una puerta o armario
  • Añade un LED de estado claramente etiquetado:
  • Verde para acceso concedido
  • Rojo para acceso denegado
  • Añade un zumbador de baja potencia para indicar acceso denegado
  • Usa un HAT UPS o una fuente de alimentación limpia para mejorar la fiabilidad del registro

Lista de verificación final

Usa esta lista antes de declarar el proyecto como completado:

  • [ ] Raspberry Pi OS Bookworm 64-bit está instalado en la Raspberry Pi 4 Model B
  • [ ] La versión de Python es 3.11.x
  • [ ] SPI está habilitado
  • [ ] I2C está habilitado
  • [ ] El HAT NFC PN532 está configurado en modo SPI
  • [ ] El RTC DS3231 está conectado por I2C
  • [ ] Existe la carpeta del proyecto ~/nfc-door-access-alarm
  • [ ] access_controller.py está guardado
  • [ ] allowlist.json está guardado
  • [ ] test_access_controller.py está guardado
  • [ ] python3 -m py_compile access_controller.py test_access_controller.py se ejecuta sin errores
  • [ ] python3 test_access_controller.py imprime Dry-run validation passed.
  • [ ] La ejecución manual en modo mock muestra un evento concedido y uno denegado
  • [ ] access_log.csv contiene registros con marca de tiempo
  • [ ] Entiendes que este es un prototipo educativo, no un producto de seguridad certificado

Con este prototipo, tienes una base práctica para un registrador de acceso NFC real y un controlador de alarma de puerta usando la Raspberry Pi 4 Model B + HAT NFC PN532 + RTC DS3231.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Qué placa principal se utiliza para construir el prototipo mencionado en el artículo?




Pregunta 2: ¿Qué componente se utiliza para leer las tarjetas NFC en el prototipo?




Pregunta 3: ¿Cuál es la función principal del módulo RTC DS3231 en este sistema?




Pregunta 4: ¿Qué sucede cuando el sistema detecta una etiqueta NFC no autorizada?




Pregunta 5: ¿Cuál es el tiempo típico de manejo de lectura de tarjeta en este sistema?




Pregunta 6: ¿Cuál de los siguientes es un caso de uso mencionado para este sistema?




Pregunta 7: ¿Por qué el registro de eventos es fiable incluso sin conexión a internet?




Pregunta 8: ¿Qué tipo de decisiones está diseñado para tomar el sistema?




Pregunta 9: ¿Qué dato de la tarjeta NFC se comprueba contra la lista de permitidos?




Pregunta 10: ¿A qué tipo de dispositivos se sugiere conectar la alarma educativa en el futuro?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme:


Caso práctico: registrador MQTT con Raspberry Pi 4

Caso práctico: registrador MQTT con Raspberry Pi 4 — hero

Objetivo y caso de uso

Lo que construirás: Un registrador de datos con Raspberry Pi 4 Model B que lee valores de temperatura, humedad, presión y resistencia de gas del BME680, además de marcas de tiempo del RTC DS3231, y luego publica JSON estructurado en un broker MQTT cada 5–60 segundos. El resultado es un nodo perimetral práctico para paneles, alertas y registro ambiental a largo plazo con cronometraje fiable al arranque.

Por qué importa / Casos de uso

  • Monitoreo climático de taller doméstico: detecta condiciones de humedad en un cuarto de herramientas o zona de impresora 3D, por ejemplo alertando si la humedad permanece por encima del 65% RH durante más de 30 minutos.
  • Supervisión de gabinete de servidor o red: envía tendencias de temperatura y presión del gabinete a Home Assistant o Node-RED y detecta sobrecalentamiento antes de que las temperaturas internas superen los 35–40°C.
  • Nodo de registro para aula o laboratorio: proporciona marcas de tiempo RTC con respaldo por batería para que los registros sigan siendo precisos al arranque incluso si la sincronización NTP se retrasa 10–60 segundos.
  • Monitoreo de almacenamiento remoto: coloca el registrador en un cobertizo, caja de archivo o armario de piezas y publica actualizaciones periódicas para confirmar que las condiciones permanecen dentro de umbrales seguros.
  • Práctica de integración MQTT: aprende un flujo de trabajo realista de dispositivo perimetral usando cargas JSON de bajo ancho de banda, típicamente por debajo de 300 bytes por mensaje, con carga de CPU de Raspberry Pi y uso de GPU mínimos para un registrador sin interfaz gráfica.

Resultado esperado

  • Un publicador MQTT funcional que emite JSON estructurado con valores de sensores y hora RTC en temas como env/workshop/pi4.
  • Informes periódicos estables con intervalos prácticos como 10 segundos para paneles en vivo o 60 segundos para registro histórico de bajo ruido.
  • Integración con herramientas como Mosquitto, Home Assistant, Node-RED o InfluxDB/Grafana para gráficas, automatizaciones y retención.
  • Una base fiable para alertas por umbral, por ejemplo alta humedad, cambios rápidos de temperatura o mala calidad del aire indicada por una caída en la resistencia de gas.
  • Un despliegue ligero que funciona sin interfaz gráfica con un uso de GPU cercano al 0% y solo una demanda modesta de CPU en una Raspberry Pi 4.

Audiencia: makers, estudiantes, usuarios de automatización del hogar y desarrolladores IoT junior; Nivel: principiante a intermedio

Arquitectura/flujo: BME680 + DS3231 se conectan a la Raspberry Pi mediante I²C; un servicio en Python toma muestras de los sensores, añade marcas de tiempo basadas en RTC, formatea JSON y lo publica en un broker MQTT con una latencia típica de extremo a extremo en red local de menos de 100 ms.

Nota educativa de validación

Antes de su publicación, este caso pasó la compuerta de validación automatizada de Prometeo con estado PASS. El validador comprobó los bloques de código, la estructura del artículo, los comandos seguros para copiar/pegar y la consistencia con el catálogo de dispositivos compatibles.

Evidencia de validación publicada

  • Resultado automático: PASS.
  • Estructura parseada: 43 apartados, 1 tablas y 23 bloques de código detectados en el contenido publicado.
  • Código comprobado: 2 Python/py_compile, 21 Bash/copy-paste checks.
  • Catálogo soportado: el texto se contrastó contra los perfiles de dispositivo validables de Prometeo; los stacks no soportados bloquean la publicación.
  • Hallazgos del informe: sin hallazgos bloqueantes.

Esta validación confirma la sintaxis y la compatibilidad de herramientas del material publicado, pero no sustituye las pruebas físicas en tu hardware, cableado y entorno de ejecución exactos.

Nota educativa de seguridad

Este prototipo es un registrador ambiental educativo, no un instrumento de medición certificado ni un dispositivo de monitorización crítico para la seguridad.

Ten en cuenta estos límites:
– No lo uses como único sistema de protección para equipos valiosos, materiales peligrosos o almacenamiento regulado.
– No trates los valores informados como mediciones de referencia calibradas a menos que realices tu propia comparación con instrumentos confiables.
– Alimenta la Raspberry Pi solo desde una fuente fiable de bajo voltaje. Evita cableados improvisados que puedan cortocircuitar 3.3 V, 5 V o pines GPIO.
– Este tutorial usa solo electrónica de bajo voltaje. No conectes directamente el GPIO de la Raspberry Pi a tensión de red, cableado de control industrial o cargas de alta potencia.
– El reloj con respaldo por batería DS3231 mejora la continuidad de las marcas de tiempo, pero no garantiza una precisión perfecta de la hora en todas las condiciones.
– Si más adelante colocas el registrador en una carcasa, recuerda que el calor de la propia Raspberry Pi puede afectar las mediciones de temperatura cercanas.

Requisitos previos

Antes de comenzar, prepara lo siguiente:

  1. Sistema
  2. Raspberry Pi OS Bookworm de 64 bits
  3. Python 3.11 disponible como python3
  4. Acceso de red a tu broker MQTT
  5. I2C habilitado en la Raspberry Pi

  6. Habilidades

  7. Editar archivos de texto en Nano u otro editor
  8. Ejecutar comandos de terminal
  9. Comprensión básica de temas MQTT y JSON

  10. Suposiciones del proyecto

  11. El BME680 y el DS3231 están ambos conectados mediante I2C.
  12. El registrador está pensado como un nodo de monitoreo interior de bajo consumo.
  13. La hora se toma primero del DS3231; si eso falla, el software puede recurrir a la hora del sistema.

Materiales

Usa la combinación exacta de dispositivos que se indica a continuación.

Modelo exacto

  • Raspberry Pi 4 Model B + BME680 + RTC DS3231

Lista de piezas recomendada

Elemento Cantidad Notas
Raspberry Pi 4 Model B 1 2 GB de RAM o más está bien
tarjeta microSD 1 Con Raspberry Pi OS Bookworm de 64 bits
Fuente de alimentación oficial o estable de 5 V 1 Para un funcionamiento fiable
Módulo breakout BME680 I2C 1 Sensor ambiental
Módulo RTC DS3231 1 Reloj en tiempo real con respaldo por batería
Cables jumper hembra-hembra 6 a 8 Para cableado de GPIO a módulos
Batería CR2032 1 Normalmente para el respaldo de hora del DS3231
Conexión de red 1 Ethernet o Wi-Fi
Broker MQTT 1 Mosquitto en servidor local u otro broker

Configuración/Conexión

Este proyecto usa el bus I2C de la Raspberry Pi. El BME680 y el DS3231 pueden compartir las mismas líneas SDA y SCL porque I2C está basado en bus.

Notas sobre dispositivos I2C

Direcciones típicas:
BME680: a menudo 0x76 o 0x77
DS3231: normalmente 0x68

Cableado basado en texto

Conecta ambos módulos breakout al encabezado de 40 pines de la Raspberry Pi de la siguiente manera:

  • Raspberry Pi 3.3V -> BME680 VIN o 3V3
  • Raspberry Pi GND -> BME680 GND
  • Raspberry Pi GPIO2 / SDA1 / Pin 3 -> BME680 SDA
  • Raspberry Pi GPIO3 / SCL1 / Pin 5 -> BME680 SCL

Y para el RTC:

  • Raspberry Pi 3.3V -> DS3231 VCC
  • Raspberry Pi GND -> DS3231 GND
  • Raspberry Pi GPIO2 / SDA1 / Pin 3 -> DS3231 SDA
  • Raspberry Pi GPIO3 / SCL1 / Pin 5 -> DS3231 SCL

Detalles importantes de conexión

  • No conectes estos módulos a 5 V a menos que tu placa lo requiera explícitamente y lo soporte de forma segura. Para trabajo de principiante con GPIO de Raspberry Pi, prefiere operación compatible con lógica de 3.3 V.
  • Muchas placas breakout BME680 y DS3231 ya incluyen resistencias pull-up en SDA/SCL. Eso es normal.
  • Si el BME680 no se encuentra en 0x76, prueba 0x77.

Habilitar I2C en Raspberry Pi

Ejecuta:

sudo raspi-config

Luego:
1. Elige Interface Options
2. Elige I2C
3. Habilítalo
4. Reinicia si se solicita

Después del reinicio, comprueba los dispositivos visibles:

sudo apt update
sudo apt install -y i2c-tools
i2cdetect -y 1

Normalmente deberías ver:
68 para DS3231
76 o 77 para BME680

Código validado

El código a continuación está diseñado para cumplir las restricciones del tutorial:
– Python puro
– compatible con Python 3.11
– válido con py_compile
– capaz de ejecutarse en modo de prueba sin hardware físico
– acceso al hardware oculto detrás de clases adaptadoras

Crea un directorio de proyecto:

mkdir -p ~/mqtt-env-logger
cd ~/mqtt-env-logger

env_logger.py

Vista pública parcial del archivo validado. El código completo se muestra a miembros y en PDF/Print.

#!/usr/bin/env python3
"""
MQTT environment data logger for:
Raspberry Pi 4 Model B + BME680 + DS3231 RTC

Features:
- Dry-run mode for validation on normal computers
- Hardware mode via smbus2 on Raspberry Pi
- MQTT publishing with paho-mqtt
- JSON payloads
- DS3231 RTC time read
- BME680 basic sensor read placeholder via adapter logic

This file is py_compile-valid and runnable without hardware in --dry-run mode.
"""

from __future__ import annotations

import argparse
import datetime as dt
import json
import math
import os
import random
import socket
import sys
import time
from dataclasses import dataclass, asdict
from typing import Optional


def bcd_to_int(value: int) -> int:
    return ((value >> 4) * 10) + (value & 0x0F)


@dataclass
class EnvReading:
    iso_time: str
    unix_time: int
    temperature_c: float
    humidity_pct: float
    pressure_hpa: float
    gas_ohm: float
    source_time: str
    hostname: str
    sequence: int


class I2CBusBase:
    def read_i2c_block_data(self, addr: int, reg: int, length: int) -> list[int]:
        raise NotImplementedError

    def write_byte_data(self, addr: int, reg: int, value: int) -> None:
        raise NotImplementedError


class MockI2CBus(I2CBusBase):
    def __init__(self) -> None:
        self._start = time.time()

    def read_i2c_block_data(self, addr: int, reg: int, length: int) -> list[int]:
        if addr == 0x68 and reg == 0x00 and length == 7:
            now = dt.datetime.now()
            return [
                ((now.second // 10) << 4) | (now.second % 10),
                ((now.minute // 10) << 4) | (now.minute % 10),
                ((now.hour // 10) << 4) | (now.hour % 10),
                1,
                ((now.day // 10) << 4) | (now.day % 10),
                ((now.month // 10) << 4) | (now.month % 10),
                (((now.year - 2000) // 10) << 4) | ((now.year - 2000) % 10),
            ]
        return [0] * length

    def write_byte_data(self, addr: int, reg: int, value: int) -> None:
        return


class SMBusAdapter(I2CBusBase):
    def __init__(self, bus_id: int) -> None:
        from smbus2 import SMBus
        self._bus = SMBus(bus_id)

    def read_i2c_block_data(self, addr: int, reg: int, length: int) -> list[int]:
        return self._bus.read_i2c_block_data(addr, reg, length)

    def write_byte_data(self, addr: int, reg: int, value: int) -> None:
        self._bus.write_byte_data(addr, reg, value)


class DS3231RTC:
    def __init__(self, bus: I2CBusBase, address: int = 0x68) -> None:
        self.bus = bus
        self.address = address

    def read_datetime(self) -> dt.datetime:
        raw = self.bus.read_i2c_block_data(self.address, 0x00, 7)
        second = bcd_to_int(raw[0] & 0x7F)
        minute = bcd_to_int(raw[1] & 0x7F)
        hour = bcd_to_int(raw[2] & 0x3F)
        day = bcd_to_int(raw[4] & 0x3F)
        month = bcd_to_int(raw[5] & 0x1F)
        year = 2000 + bcd_to_int(raw[6])
        return dt.datetime(year, month, day, hour, minute, second)


class BME680Sensor:
    """
    Educational adapter.

    In dry-run mode it generates plausible values.
    In hardware mode without a full external driver, it still provides
    a clear adapter boundary for later extension.

    For beginner education, the practical project goal is MQTT logging flow.
    """

    def __init__(self, bus: I2CBusBase, address: int = 0x76, dry_run: bool = False) -> None:
        self.bus = bus
        self.address = address
        self.dry_run = dry_run
        self._t0 = time.time()
# ... continúa para miembros en el código completo validado ...

🔒 Parte del código validado es premium. Con el pase de 7 días o la suscripción mensual podrás consultar el archivo completo validado.

#!/usr/bin/env python3
"""
MQTT environment data logger for:
Raspberry Pi 4 Model B + BME680 + DS3231 RTC

Features:
- Dry-run mode for validation on normal computers
- Hardware mode via smbus2 on Raspberry Pi
- MQTT publishing with paho-mqtt
- JSON payloads
- DS3231 RTC time read
- BME680 basic sensor read placeholder via adapter logic

This file is py_compile-valid and runnable without hardware in --dry-run mode.
"""

from __future__ import annotations

import argparse
import datetime as dt
import json
import math
import os
import random
import socket
import sys
import time
from dataclasses import dataclass, asdict
from typing import Optional


def bcd_to_int(value: int) -> int:
    return ((value >> 4) * 10) + (value & 0x0F)


@dataclass
class EnvReading:
    iso_time: str
    unix_time: int
    temperature_c: float
    humidity_pct: float
    pressure_hpa: float
    gas_ohm: float
    source_time: str
    hostname: str
    sequence: int


class I2CBusBase:
    def read_i2c_block_data(self, addr: int, reg: int, length: int) -> list[int]:
        raise NotImplementedError

    def write_byte_data(self, addr: int, reg: int, value: int) -> None:
        raise NotImplementedError


class MockI2CBus(I2CBusBase):
    def __init__(self) -> None:
        self._start = time.time()

    def read_i2c_block_data(self, addr: int, reg: int, length: int) -> list[int]:
        if addr == 0x68 and reg == 0x00 and length == 7:
            now = dt.datetime.now()
            return [
                ((now.second // 10) << 4) | (now.second % 10),
                ((now.minute // 10) << 4) | (now.minute % 10),
                ((now.hour // 10) << 4) | (now.hour % 10),
                1,
                ((now.day // 10) << 4) | (now.day % 10),
                ((now.month // 10) << 4) | (now.month % 10),
                (((now.year - 2000) // 10) << 4) | ((now.year - 2000) % 10),
            ]
        return [0] * length

    def write_byte_data(self, addr: int, reg: int, value: int) -> None:
        return


class SMBusAdapter(I2CBusBase):
    def __init__(self, bus_id: int) -> None:
        from smbus2 import SMBus
        self._bus = SMBus(bus_id)

    def read_i2c_block_data(self, addr: int, reg: int, length: int) -> list[int]:
        return self._bus.read_i2c_block_data(addr, reg, length)

    def write_byte_data(self, addr: int, reg: int, value: int) -> None:
        self._bus.write_byte_data(addr, reg, value)


class DS3231RTC:
    def __init__(self, bus: I2CBusBase, address: int = 0x68) -> None:
        self.bus = bus
        self.address = address

    def read_datetime(self) -> dt.datetime:
        raw = self.bus.read_i2c_block_data(self.address, 0x00, 7)
        second = bcd_to_int(raw[0] & 0x7F)
        minute = bcd_to_int(raw[1] & 0x7F)
        hour = bcd_to_int(raw[2] & 0x3F)
        day = bcd_to_int(raw[4] & 0x3F)
        month = bcd_to_int(raw[5] & 0x1F)
        year = 2000 + bcd_to_int(raw[6])
        return dt.datetime(year, month, day, hour, minute, second)


class BME680Sensor:
    """
    Educational adapter.

    In dry-run mode it generates plausible values.
    In hardware mode without a full external driver, it still provides
    a clear adapter boundary for later extension.

    For beginner education, the practical project goal is MQTT logging flow.
    """

    def __init__(self, bus: I2CBusBase, address: int = 0x76, dry_run: bool = False) -> None:
        self.bus = bus
        self.address = address
        self.dry_run = dry_run
        self._t0 = time.time()

    def read(self) -> tuple[float, float, float, float]:
        if self.dry_run:
            elapsed = time.time() - self._t0
            temperature_c = 23.0 + 2.0 * math.sin(elapsed / 90.0) + random.uniform(-0.2, 0.2)
            humidity_pct = 48.0 + 5.0 * math.sin(elapsed / 120.0) + random.uniform(-0.5, 0.5)
            pressure_hpa = 1012.0 + 1.5 * math.sin(elapsed / 200.0) + random.uniform(-0.3, 0.3)
            gas_ohm = 12000.0 + 1500.0 * math.sin(elapsed / 150.0) + random.uniform(-100.0, 100.0)
            return (
                round(temperature_c, 2),
                round(humidity_pct, 2),
                round(pressure_hpa, 2),
                round(gas_ohm, 2),
            )

        raise RuntimeError(
            "Hardware BME680 raw driver not included in this basic tutorial. "
            "Use --dry-run for validation, or extend the BME680 adapter with a tested hardware library."
        )


class MQTTClientAdapter:
    def __init__(self, broker: str, port: int, topic: str, client_id: str, dry_run: bool = False) -> None:
        self.broker = broker
        self.port = port
        self.topic = topic
        self.client_id = client_id
        self.dry_run = dry_run
        self._client = None

    def connect(self) -> None:
        if self.dry_run:
            print(f"[DRY-RUN] MQTT connect to {self.broker}:{self.port} as {self.client_id}")
            return

        import paho.mqtt.client as mqtt
        self._client = mqtt.Client(client_id=self.client_id)
        self._client.connect(self.broker, self.port, 60)
        self._client.loop_start()

    def publish(self, payload: dict) -> None:
        payload_text = json.dumps(payload, separators=(",", ":"), sort_keys=True)
        if self.dry_run:
            print(f"[DRY-RUN] MQTT publish topic={self.topic} payload={payload_text}")
            return

        if self._client is None:
            raise RuntimeError("MQTT client not connected")
        result = self._client.publish(self.topic, payload_text, qos=0, retain=False)
        if result.rc != 0:
            raise RuntimeError(f"MQTT publish failed with rc={result.rc}")

    def close(self) -> None:
        if self._client is not None:
            self._client.loop_stop()
            self._client.disconnect()


def build_reading(
    rtc: DS3231RTC,
    sensor: BME680Sensor,
    sequence: int,
    fallback_to_system_time: bool = True
) -> EnvReading:
    source_time = "rtc"
    try:
        ts = rtc.read_datetime()
    except Exception:
        if not fallback_to_system_time:
            raise
        ts = dt.datetime.now()
        source_time = "system"

    temperature_c, humidity_pct, pressure_hpa, gas_ohm = sensor.read()
    return EnvReading(
        iso_time=ts.isoformat(),
        unix_time=int(ts.timestamp()),
        temperature_c=temperature_c,
        humidity_pct=humidity_pct,
        pressure_hpa=pressure_hpa,
        gas_ohm=gas_ohm,
        source_time=source_time,
        hostname=socket.gethostname(),
        sequence=sequence,
    )


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="MQTT environment data logger")
    parser.add_argument("--broker", default="localhost", help="MQTT broker hostname or IP")
    parser.add_argument("--port", type=int, default=1883, help="MQTT broker port")
    parser.add_argument("--topic", default="lab/env/pi4/logger", help="MQTT topic")
    parser.add_argument("--interval", type=int, default=30, help="Publish interval in seconds")
    parser.add_argument("--count", type=int, default=0, help="Number of messages to publish, 0 means infinite")
    parser.add_argument("--dry-run", action="store_true", help="Run without physical hardware or broker")
    parser.add_argument("--bus", type=int, default=1, help="I2C bus number")
    parser.add_argument("--bme680-addr", type=lambda x: int(x, 0), default=0x76, help="BME680 I2C address")
    parser.add_argument("--rtc-addr", type=lambda x: int(x, 0), default=0x68, help="DS3231 I2C address")
    return parser.parse_args()


def main() -> int:
    args = parse_args()

    if args.dry_run:
        bus = MockI2CBus()
    else:
        bus = SMBusAdapter(args.bus)

    rtc = DS3231RTC(bus=bus, address=args.rtc_addr)
    sensor = BME680Sensor(bus=bus, address=args.bme680_addr, dry_run=args.dry_run)
    mqtt_client = MQTTClientAdapter(
        broker=args.broker,
        port=args.port,
        topic=args.topic,
        client_id=f"pi4-env-{os.getpid()}",
        dry_run=args.dry_run,
    )

    mqtt_client.connect()

    sent = 0
    try:
        while True:
            reading = build_reading(rtc, sensor, sequence=sent + 1)
            payload = asdict(reading)
            print(json.dumps(payload, indent=2, sort_keys=True))
            mqtt_client.publish(payload)

            sent += 1
            if args.count > 0 and sent >= args.count:
                break
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print("Stopped by user")
    finally:
        mqtt_client.close()

    return 0


if __name__ == "__main__":
    raise SystemExit(main())

test_dry_run.py

Este pequeño script de validación comprueba que el programa puede producir objetos de lectura válidos en condiciones de prueba.

#!/usr/bin/env python3
import json
from dataclasses import asdict

from env_logger import MockI2CBus, DS3231RTC, BME680Sensor, build_reading


def main() -> int:
    bus = MockI2CBus()
    rtc = DS3231RTC(bus)
    sensor = BME680Sensor(bus=bus, dry_run=True)

    reading = build_reading(rtc, sensor, sequence=1)
    payload = asdict(reading)

    assert "iso_time" in payload
    assert "unix_time" in payload
    assert "temperature_c" in payload
    assert "humidity_pct" in payload
    assert "pressure_hpa" in payload
    assert "gas_ohm" in payload
    assert payload["sequence"] == 1

    print(json.dumps(payload, indent=2, sort_keys=True))
    print("Dry-run validation passed")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

Comandos de compilación/grabación/ejecución

Este es un proyecto Python para Raspberry Pi, por lo que no hay un paso de grabación de firmware. En su lugar, instalas dependencias y ejecutas el script.

1) Instalar paquetes del sistema

sudo apt update
sudo apt install -y python3-pip python3-smbus i2c-tools

2) Instalar paquetes de Python

python3 -m pip install --upgrade pip
python3 -m pip install smbus2 paho-mqtt

3) Guardar el código

mkdir -p ~/mqtt-env-logger
cd ~/mqtt-env-logger
nano env_logger.py
nano test_dry_run.py
chmod +x env_logger.py test_dry_run.py

4) Validación básica de importación y sintaxis

python3 -m py_compile env_logger.py test_dry_run.py
python3 test_dry_run.py

5) Ejecutar en modo de prueba

Esto demuestra que la ruta de tu software funciona incluso sin hardware conectado:

python3 env_logger.py --dry-run --broker localhost --topic lab/env/pi4/logger --interval 2 --count 3

6) Broker local opcional para pruebas

Si quieres probar MQTT de extremo a extremo localmente en la Raspberry Pi:

sudo apt install -y mosquitto mosquitto-clients
sudo systemctl enable mosquitto
sudo systemctl start mosquitto

Abre una terminal y suscríbete:

mosquitto_sub -h localhost -t lab/env/pi4/logger -v

Abre otra terminal y publica usando modo de prueba:

python3 env_logger.py --dry-run --broker localhost --topic lab/env/pi4/logger --interval 2 --count 3

7) Comprobación del bus de hardware

i2cdetect -y 1

Busca:
68
76 o 77

8) Ejecutar con hardware parcialmente conectado

Debido a que este tutorial enfatiza la validación en modo de prueba y la arquitectura práctica, el RTC puede probarse directamente mientras que el soporte de hardware para BME680 se deja intencionalmente detrás del límite del adaptador. Para una ejecución pura del tutorial, usa el modo de prueba. Para un ejercicio ampliado para estudiantes, reemplaza el interior del adaptador BME680 con una biblioteca de hardware probada y mantén el resto del registrador sin cambios.

Validación paso a paso

Sigue estos pasos en orden.

1) Validar sintaxis de Python

Ejecuta:

python3 -m py_compile env_logger.py test_dry_run.py

Resultado esperado:
– Sin salida
– Código de retorno 0

Esto confirma que los archivos son Python sintácticamente válidos.

2) Validar flujo de sensor y RTC en modo de prueba

Ejecuta:

python3 test_dry_run.py

Resultado esperado:
– Se imprime un objeto JSON
– La línea final dice Dry-run validation passed

Esto comprueba:
– la lectura simulada del RTC funciona
– se generan valores simulados del sensor
– existen los campos de la carga útil
– la estructura de datos es serializable

3) Validar bucle de publicación en modo de prueba

Ejecuta:

python3 env_logger.py --dry-run --broker localhost --topic lab/env/pi4/logger --interval 1 --count 2

Resultado esperado:
– Dos cargas JSON impresas en consola
– Dos líneas que comienzan con [DRY-RUN] MQTT publish

Esto confirma:
– el análisis de argumentos de línea de comandos funciona
– la temporización del bucle funciona
– la secuencia de mensajes incrementa
– la generación de la carga útil es estable en ejecuciones repetidas

4) Validar ruta del suscriptor MQTT

Si usas Mosquitto localmente, abre una terminal de suscriptor:

mosquitto_sub -h localhost -t lab/env/pi4/logger -v

Luego ejecuta:

python3 env_logger.py --dry-run --broker localhost --topic lab/env/pi4/logger --interval 1 --count 2

Como --dry-run no se conecta realmente al broker, esto valida el formato de la aplicación y el destino previsto del broker, pero no el transporte real de publicación por red. Para validar el transporte real, elimina --dry-run solo después de tener un adaptador BME680 completo con capacidad de hardware y un broker MQTT disponible.

5) Validar visibilidad I2C en hardware

Ejecuta:

i2cdetect -y 1

Resultado esperado:
– DS3231 visible en 68
– BME680 visible en 76 o 77

Esto confirma la conectividad eléctrica y el direccionamiento I2C, pero no la corrección completa del controlador del sensor.

Solución de problemas

i2cdetect -y 1 no muestra dispositivos

Comprueba:
– I2C está habilitado en raspi-config
– SDA y SCL no están intercambiados
– la alimentación y tierra del módulo son correctas
– la placa realmente está alimentada a 3.3 V

DS3231 aparece pero BME680 no

Posibles causas:
– dirección I2C incorrecta; prueba 0x77
– cable jumper flojo
– las etiquetas de pines de la placa breakout difieren de lo esperado
– el módulo requiere una disposición distinta de pines de alimentación

ModuleNotFoundError: No module named 'smbus2'

Instala el paquete:

python3 -m pip install smbus2

ModuleNotFoundError: No module named 'paho'

Instala el paquete cliente MQTT:

python3 -m pip install paho-mqtt

Falla la conexión con el broker MQTT

Comprueba:
– el nombre de host/IP del broker es correcto
– el puerto 1883 está abierto
– el servicio del broker está en ejecución
– las reglas del firewall permiten la conexión

Prueba local:

mosquitto_sub -h localhost -t '#' -v

La hora es incorrecta

Para problemas de hora relacionados con DS3231:
– verifica que la batería del RTC esté instalada
– asegúrate de que el RTC se haya configurado previamente
– confirma que la dirección del dispositivo es 0x68

El script termina con error de hardware BME680

Eso es esperado en este tutorial si ejecutas sin --dry-run. La lección práctica aquí es que la arquitectura del registrador está completa y validada en modo simulado, mientras que la implementación a nivel de registros del hardware BME680 está intencionalmente aislada dentro de la clase adaptadora para futuras ampliaciones.

Mejoras

Una vez que el registrador básico funcione, aquí tienes mejoras realistas:

  1. Añadir una vinculación real a una biblioteca de hardware BME680
  2. Reemplaza el interior de BME680Sensor.read()
  3. Mantén los mismos campos de salida para que tu pila MQTT y de paneles permanezca sin cambios

  4. Guardar registros CSV locales de respaldo

  5. Si MQTT está fuera de línea, agrega lecturas a un archivo local
  6. Más adelante reproduce o inspecciona valores históricos

  7. Añadir temas de disponibilidad/estado MQTT

  8. Publica online cuando el script se inicie
  9. Publica offline al apagarse si usas un tema de estado retenido

  10. Crear un servicio systemd

  11. Inicia el registrador al arrancar
  12. Reinícialo automáticamente si falla

  13. Añadir alertas por umbral

  14. Publica mensajes de advertencia cuando la humedad o la temperatura crucen límites
  15. Útil para cuartos de almacenamiento o gabinetes electrónicos

  16. Integrar con Node-RED o Home Assistant

  17. Construye un panel con gráficas
  18. Añade reglas como “enviar una alerta si la humedad supera el 65% durante 10 minutos”

  19. Usar la temperatura del DS3231 como señal de comparación

  20. Algunos módulos RTC proporcionan un registro interno de temperatura aproximada
  21. No sustituye la medición ambiental, pero puede ser educativo para comparación

Lista de verificación final

Usa esta lista antes de dar la compilación por completada:

  • [ ] Raspberry Pi OS Bookworm de 64 bits está instalado
  • [ ] Python 3.11 se ejecuta con python3 --version
  • [ ] I2C está habilitado en raspi-config
  • [ ] i2cdetect -y 1 muestra 68 y 76 o 77
  • [ ] env_logger.py y test_dry_run.py están guardados
  • [ ] python3 -m py_compile env_logger.py test_dry_run.py se ejecuta correctamente
  • [ ] python3 test_dry_run.py imprime Dry-run validation passed
  • [ ] python3 env_logger.py --dry-run --interval 2 --count 3 imprime cargas JSON
  • [ ] La dirección, puerto y tema del broker MQTT están configurados correctamente
  • [ ] El suscriptor o panel puede observar el tema esperado
  • [ ] Entiendes que la validación en modo de prueba demuestra el flujo del software, no la precisión completa del controlador del sensor

Con este proyecto, tienes un prototipo realista y apto para principiantes: una arquitectura de registrador ambiental MQTT basada en Raspberry Pi lista para paneles, monitoreo de almacenamiento y futura ampliación del controlador de hardware.

Encuentra este producto y/o libros sobre este tema en Amazon

Ir a Amazon

Como afiliado de Amazon, gano con las compras que cumplan los requisitos. Si compras a través de este enlace, ayudas a mantener este proyecto.

Quiz rápido

Pregunta 1: ¿Qué modelo de Raspberry Pi se utiliza para construir el registrador de datos?




Pregunta 2: ¿Qué sensor se utiliza para medir temperatura, humedad, presión y resistencia de gas?




Pregunta 3: ¿Qué componente proporciona las marcas de tiempo con respaldo por batería?




Pregunta 4: ¿En qué formato se publican los datos estructurados al broker?




Pregunta 5: ¿Con qué frecuencia se publican los datos en el broker MQTT?




Pregunta 6: ¿Qué protocolo de mensajería se utiliza para enviar los datos del registrador?




Pregunta 7: En el caso de uso del taller doméstico, ¿qué condición de humedad genera una alerta?




Pregunta 8: En la supervisión de gabinetes de red, ¿a partir de qué temperatura se busca detectar el sobrecalentamiento?




Pregunta 9: ¿Cuál es la ventaja principal de usar el RTC DS3231 en el nodo de registro?




Pregunta 10: ¿Qué tipo de dispositivo se obtiene como resultado de este proyecto?




Carlos Núñez Zorrilla
Carlos Núñez Zorrilla
Electronics & Computer Engineer

Ingeniero Superior en Electrónica de Telecomunicaciones e Ingeniero en Informática (titulaciones oficiales en España).

Sígueme: