Add initial DB - WIP
This commit is contained in:
389
backend/src/database/db.py
Normal file
389
backend/src/database/db.py
Normal file
@@ -0,0 +1,389 @@
|
||||
"""
|
||||
Database module for storing HanchuESS power readings.
|
||||
Uses SQLite with thread-safe operations.
|
||||
"""
|
||||
import os
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Optional
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
class PowerReadingsDB:
|
||||
"""Database manager for power readings."""
|
||||
|
||||
def __init__(self, db_path: str = None):
|
||||
"""
|
||||
Initialize database connection.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database file. If None, reads from SQLITE_DB_PATH env var.
|
||||
"""
|
||||
if db_path is None:
|
||||
db_path = os.getenv("SQLITE_DB_PATH", "./hanchuess_solar.db")
|
||||
|
||||
self.db_path = db_path
|
||||
self._init_database()
|
||||
|
||||
@contextmanager
|
||||
def _get_connection(self):
|
||||
"""Context manager for database connections."""
|
||||
conn = sqlite3.connect(self.db_path, check_same_thread=False)
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
yield conn
|
||||
conn.commit()
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
raise e
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def _init_database(self):
|
||||
"""Create tables if they don't exist."""
|
||||
with self._get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS power_readings (
|
||||
-- Primary key and timestamp
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp DATETIME NOT NULL,
|
||||
data_source VARCHAR(20) DEFAULT 'realtime', -- 'realtime' or 'backfill'
|
||||
|
||||
-- Device information
|
||||
sn VARCHAR(50) NOT NULL, -- Serial number (e.g., "H016164380126")
|
||||
device_model VARCHAR(10), -- devModel: Device model identifier
|
||||
device_status INTEGER, -- devStatus: Device status code
|
||||
|
||||
-- Solar/PV metrics (photovoltaic generation)
|
||||
pv_total_power REAL, -- pvTtPwr: Current PV generation in watts
|
||||
pv_today_generation REAL, -- pvTdGe: Today's total PV generation in kWh
|
||||
pv_today_grid_export REAL, -- pvDge: PV direct to grid today in kWh
|
||||
|
||||
-- Battery metrics
|
||||
battery_power REAL, -- batP: Current charge(+)/discharge(-) power in watts
|
||||
battery_soc REAL, -- batSoc: State of charge percentage (0-100)
|
||||
battery_today_charge REAL, -- batTdChg: Today's charge energy in kWh
|
||||
battery_today_discharge REAL, -- batTdDschg: Today's discharge energy in kWh
|
||||
battery_status INTEGER, -- batSts: Battery status code
|
||||
battery_design_capacity REAL, -- bmsDesignCap: Design capacity in kWh
|
||||
|
||||
-- Load/consumption metrics
|
||||
load_power REAL, -- loadPwr: Current total load in watts
|
||||
load_eps_power REAL, -- loadEpsPwr: EPS (backup) load power in watts
|
||||
load_today_energy REAL, -- loadTdEe: Today's consumption in kWh
|
||||
|
||||
-- Grid metrics
|
||||
grid_today_feed REAL, -- gridTdFe: Today's grid feed energy in kWh
|
||||
grid_today_export REAL, -- gridTdEe: Today's grid export energy in kWh
|
||||
meter_power REAL, -- meterPPwr: Current meter power reading in watts
|
||||
grid_power_sum REAL, -- pwrGridSum: Grid power sum
|
||||
|
||||
-- System status and operating mode
|
||||
work_mode INTEGER, -- workMode: Operating mode code
|
||||
work_mode_combined INTEGER,-- workModeCmb: Combined work mode
|
||||
work_mode_flag INTEGER, -- workModeFlag: Work mode flag
|
||||
total_power INTEGER, -- pPwr: Total system power in watts
|
||||
|
||||
-- Bypass meter (if installed)
|
||||
bypass_meter_switch INTEGER, -- bypMeterSwitch: Bypass meter on/off
|
||||
bypass_meter_total_power REAL, -- bypMeterTotalPower: Bypass meter power
|
||||
bypass_install_direction INTEGER, -- bypInstallDirection: Installation direction
|
||||
|
||||
-- Generator (if present)
|
||||
dg_run_state INTEGER, -- dgRunState: Generator running state
|
||||
dg_grid_state INTEGER, -- dgGridState: Generator grid connection state
|
||||
|
||||
-- Additional features
|
||||
has_charger INTEGER, -- hasCharger: Has EV charger
|
||||
has_heat INTEGER, -- hasHeat: Has heating system
|
||||
has_os INTEGER, -- hasOs: Has operating system feature
|
||||
has_sg INTEGER -- hasSg: Has smart grid feature
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes for common queries
|
||||
cursor.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_timestamp
|
||||
ON power_readings(timestamp DESC)
|
||||
""")
|
||||
|
||||
cursor.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_sn_timestamp
|
||||
ON power_readings(sn, timestamp DESC)
|
||||
""")
|
||||
|
||||
# Create unique constraint to prevent duplicate timestamps per device
|
||||
cursor.execute("""
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_sn_timestamp
|
||||
ON power_readings(sn, timestamp)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
|
||||
def insert_reading(self, data: Dict, data_source: str = 'realtime') -> int:
|
||||
"""
|
||||
Insert a power reading into the database.
|
||||
|
||||
Args:
|
||||
data: Dictionary containing the API response data
|
||||
data_source: Source of data ('realtime' or 'backfill')
|
||||
|
||||
Returns:
|
||||
The ID of the inserted row, or None if duplicate
|
||||
"""
|
||||
with self._get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Extract nested data if present
|
||||
reading_data = data.get('data', data)
|
||||
|
||||
try:
|
||||
cursor.execute("""
|
||||
INSERT INTO power_readings (
|
||||
timestamp, data_source, sn, device_model, device_status,
|
||||
pv_total_power, pv_today_generation, pv_today_grid_export,
|
||||
battery_power, battery_soc, battery_today_charge, battery_today_discharge,
|
||||
battery_status, battery_design_capacity,
|
||||
load_power, load_eps_power, load_today_energy,
|
||||
grid_today_feed, grid_today_export, meter_power, grid_power_sum,
|
||||
work_mode, work_mode_combined, work_mode_flag, total_power,
|
||||
bypass_meter_switch, bypass_meter_total_power, bypass_install_direction,
|
||||
dg_run_state, dg_grid_state,
|
||||
has_charger, has_heat, has_os, has_sg
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
datetime.now().isoformat(),
|
||||
data_source,
|
||||
reading_data.get('sn'),
|
||||
reading_data.get('devModel'),
|
||||
reading_data.get('devStatus'),
|
||||
|
||||
self._to_float(reading_data.get('pvTtPwr')),
|
||||
self._to_float(reading_data.get('pvTdGe')),
|
||||
self._to_float(reading_data.get('pvDge')),
|
||||
|
||||
self._to_float(reading_data.get('batP')),
|
||||
self._to_float(reading_data.get('batSoc')),
|
||||
self._to_float(reading_data.get('batTdChg')),
|
||||
self._to_float(reading_data.get('batTdDschg')),
|
||||
reading_data.get('batSts'),
|
||||
self._to_float(reading_data.get('bmsDesignCap')),
|
||||
|
||||
self._to_float(reading_data.get('loadPwr')),
|
||||
self._to_float(reading_data.get('loadEpsPwr')),
|
||||
self._to_float(reading_data.get('loadTdEe')),
|
||||
|
||||
self._to_float(reading_data.get('gridTdFe')),
|
||||
self._to_float(reading_data.get('gridTdEe')),
|
||||
self._to_float(reading_data.get('meterPPwr')),
|
||||
self._to_float(reading_data.get('pwrGridSum')),
|
||||
|
||||
reading_data.get('workMode'),
|
||||
reading_data.get('workModeCmb'),
|
||||
reading_data.get('workModeFlag'),
|
||||
reading_data.get('pPwr'),
|
||||
|
||||
reading_data.get('bypMeterSwitch'),
|
||||
self._to_float(reading_data.get('bypMeterTotalPower')),
|
||||
reading_data.get('bypInstallDirection'),
|
||||
|
||||
reading_data.get('dgRunState'),
|
||||
reading_data.get('dgGridState'),
|
||||
|
||||
1 if reading_data.get('hasCharger') else 0,
|
||||
1 if reading_data.get('hasHeat') else 0,
|
||||
1 if reading_data.get('hasOs') else 0,
|
||||
1 if reading_data.get('hasSg') else 0
|
||||
))
|
||||
|
||||
return cursor.lastrowid
|
||||
except Exception as e:
|
||||
# If it's a unique constraint violation, return None (duplicate)
|
||||
if "UNIQUE constraint failed" in str(e):
|
||||
return None
|
||||
raise
|
||||
|
||||
def get_readings(
|
||||
self,
|
||||
limit: int = 100,
|
||||
start_date: Optional[str] = None,
|
||||
end_date: Optional[str] = None,
|
||||
sn: Optional[str] = None
|
||||
) -> List[Dict]:
|
||||
"""
|
||||
Query power readings from the database.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of records to return
|
||||
start_date: Start date (ISO format) for filtering
|
||||
end_date: End date (ISO format) for filtering
|
||||
sn: Serial number to filter by
|
||||
|
||||
Returns:
|
||||
List of reading dictionaries
|
||||
"""
|
||||
with self._get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = "SELECT * FROM power_readings WHERE 1=1"
|
||||
params = []
|
||||
|
||||
if sn:
|
||||
query += " AND sn = ?"
|
||||
params.append(sn)
|
||||
|
||||
if start_date:
|
||||
query += " AND timestamp >= ?"
|
||||
params.append(start_date)
|
||||
|
||||
if end_date:
|
||||
query += " AND timestamp <= ?"
|
||||
params.append(end_date)
|
||||
|
||||
query += " ORDER BY timestamp DESC LIMIT ?"
|
||||
params.append(limit)
|
||||
|
||||
cursor.execute(query, params)
|
||||
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
def get_latest_reading(self, sn: Optional[str] = None) -> Optional[Dict]:
|
||||
"""
|
||||
Get the most recent power reading.
|
||||
|
||||
Args:
|
||||
sn: Optional serial number to filter by
|
||||
|
||||
Returns:
|
||||
Latest reading dictionary or None
|
||||
"""
|
||||
readings = self.get_readings(limit=1, sn=sn)
|
||||
return readings[0] if readings else None
|
||||
|
||||
def get_stats(self, sn: Optional[str] = None) -> Dict:
|
||||
"""
|
||||
Get statistics about stored readings.
|
||||
|
||||
Args:
|
||||
sn: Optional serial number to filter by
|
||||
|
||||
Returns:
|
||||
Dictionary with count, first/last timestamps
|
||||
"""
|
||||
with self._get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = """
|
||||
SELECT
|
||||
COUNT(*) as count,
|
||||
MIN(timestamp) as first_reading,
|
||||
MAX(timestamp) as last_reading
|
||||
FROM power_readings
|
||||
"""
|
||||
|
||||
params = []
|
||||
if sn:
|
||||
query += " WHERE sn = ?"
|
||||
params.append(sn)
|
||||
|
||||
cursor.execute(query, params)
|
||||
row = cursor.fetchone()
|
||||
|
||||
return dict(row) if row else {}
|
||||
|
||||
@staticmethod
|
||||
def _to_float(value) -> Optional[float]:
|
||||
"""Convert string or numeric value to float, handling None."""
|
||||
if value is None or value == "":
|
||||
return None
|
||||
try:
|
||||
return float(value)
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
def check_day_coverage(self, date: str, sn: Optional[str] = None) -> Dict:
|
||||
"""
|
||||
Check what data exists for a specific day.
|
||||
|
||||
Args:
|
||||
date: Date string in ISO format (e.g., "2026-01-13")
|
||||
sn: Optional serial number filter
|
||||
|
||||
Returns:
|
||||
Dictionary with coverage information
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
# Parse date and get day boundaries
|
||||
day = datetime.fromisoformat(date.split('T')[0])
|
||||
start = day.replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
|
||||
end = day.replace(hour=23, minute=59, second=59, microsecond=999999).isoformat()
|
||||
|
||||
with self._get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = """
|
||||
SELECT COUNT(*) as count,
|
||||
MIN(timestamp) as first_reading,
|
||||
MAX(timestamp) as last_reading
|
||||
FROM power_readings
|
||||
WHERE timestamp >= ? AND timestamp <= ?
|
||||
"""
|
||||
params = [start, end]
|
||||
|
||||
if sn:
|
||||
query += " AND sn = ?"
|
||||
params.append(sn)
|
||||
|
||||
cursor.execute(query, params)
|
||||
row = cursor.fetchone()
|
||||
|
||||
return {
|
||||
"date": date,
|
||||
"has_data": row['count'] > 0,
|
||||
"reading_count": row['count'],
|
||||
"first_reading": row['first_reading'],
|
||||
"last_reading": row['last_reading'],
|
||||
"expected_readings": 1440 # One per minute for full day
|
||||
}
|
||||
|
||||
def insert_minute_reading(self, timestamp: str, data: Dict, sn: str, data_source: str = 'backfill') -> Optional[int]:
|
||||
"""
|
||||
Insert a single minute reading from historical data.
|
||||
|
||||
Args:
|
||||
timestamp: ISO format timestamp
|
||||
data: Data dictionary for that minute
|
||||
sn: Serial number
|
||||
data_source: 'realtime' or 'backfill'
|
||||
|
||||
Returns:
|
||||
Row ID if inserted, None if duplicate
|
||||
"""
|
||||
with self._get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute("""
|
||||
INSERT INTO power_readings (
|
||||
timestamp, data_source, sn,
|
||||
pv_total_power, battery_power, battery_soc,
|
||||
load_power, grid_power_sum
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
timestamp,
|
||||
data_source,
|
||||
sn,
|
||||
self._to_float(data.get('pvTtPwr')),
|
||||
self._to_float(data.get('batP')),
|
||||
self._to_float(data.get('batSoc')),
|
||||
self._to_float(data.get('loadPwr')),
|
||||
self._to_float(data.get('pwrGridSum'))
|
||||
))
|
||||
|
||||
return cursor.lastrowid
|
||||
except Exception as e:
|
||||
if "UNIQUE constraint failed" in str(e):
|
||||
return None
|
||||
raise
|
||||
Reference in New Issue
Block a user