Compare commits

...

6 Commits

Author SHA1 Message Date
95e5e1438f Merge pull request 'Add initial DB - WIP' (#6) from add-storage-system into main
Reviewed-on: #6
2026-01-22 09:32:22 -05:00
Luke Calladine
364f9259b3 Add initial DB - WIP 2026-01-22 14:28:05 +00:00
39a86ee2f9 Merge pull request 'connect-to-hanchuess' (#5) from connect-to-hanchuess into main
Reviewed-on: #5
2026-01-13 14:39:11 -05:00
Preteck
d073456c2f Update README.md 2026-01-13 19:37:22 +00:00
Luke Calladine
680335fba1 Initial HanchuESS services 2026-01-13 16:14:20 +00:00
Luke Calladine
5cf970df62 Add simple HanchuESS Helper 2026-01-12 20:27:00 +00:00
10 changed files with 1535 additions and 4 deletions

24
.env.example Normal file
View File

@@ -0,0 +1,24 @@
# HanchuESS API Configuration
# Copy this file to .env and fill in your actual values
# Required: AES encryption key (must be 16, 24, or 32 bytes)
HANCHU_AES_KEY=your_aes_key_here
# Required: AES initialization vector (must be 16 bytes)
HANCHU_AES_IV=your_aes_iv_here
# Required: Login URL for the HanchuESS API
HANCHU_LOGIN_URL=https://api.example.com/login
# Optional: Login type (default: ACCOUNT)
HANCHU_LOGIN_TYPE=ACCOUNT
# Optional: HTTP timeout in seconds (default: 10)
HANCHU_HTTP_TIMEOUT=10
# Optional: Verify SSL certificates (default: true, set to false for self-signed certs)
HANCHU_VERIFY_SSL=true
# Optional: Username and password
HANCHU_USERNAME=
HANCHU_PASSWORD=

View File

@@ -1,2 +1,5 @@
fastapi
uvicorn[standard]
requests>=2.31.0
pycryptodome>=3.20.0
python-dotenv>=1.0.1

View File

@@ -0,0 +1 @@
# Database package

389
backend/src/database/db.py Normal file
View File

@@ -0,0 +1,389 @@
"""
Database module for storing HanchuESS power readings.
Uses SQLite with thread-safe operations.
"""
import os
import sqlite3
from datetime import datetime
from typing import List, Dict, Optional
from contextlib import contextmanager
class PowerReadingsDB:
"""Database manager for power readings."""
def __init__(self, db_path: str = None):
"""
Initialize database connection.
Args:
db_path: Path to SQLite database file. If None, reads from SQLITE_DB_PATH env var.
"""
if db_path is None:
db_path = os.getenv("SQLITE_DB_PATH", "./hanchuess_solar.db")
self.db_path = db_path
self._init_database()
@contextmanager
def _get_connection(self):
"""Context manager for database connections."""
conn = sqlite3.connect(self.db_path, check_same_thread=False)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def _init_database(self):
"""Create tables if they don't exist."""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS power_readings (
-- Primary key and timestamp
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
data_source VARCHAR(20) DEFAULT 'realtime', -- 'realtime' or 'backfill'
-- Device information
sn VARCHAR(50) NOT NULL, -- Serial number (e.g., "H016164380126")
device_model VARCHAR(10), -- devModel: Device model identifier
device_status INTEGER, -- devStatus: Device status code
-- Solar/PV metrics (photovoltaic generation)
pv_total_power REAL, -- pvTtPwr: Current PV generation in watts
pv_today_generation REAL, -- pvTdGe: Today's total PV generation in kWh
pv_today_grid_export REAL, -- pvDge: PV direct to grid today in kWh
-- Battery metrics
battery_power REAL, -- batP: Current charge(+)/discharge(-) power in watts
battery_soc REAL, -- batSoc: State of charge percentage (0-100)
battery_today_charge REAL, -- batTdChg: Today's charge energy in kWh
battery_today_discharge REAL, -- batTdDschg: Today's discharge energy in kWh
battery_status INTEGER, -- batSts: Battery status code
battery_design_capacity REAL, -- bmsDesignCap: Design capacity in kWh
-- Load/consumption metrics
load_power REAL, -- loadPwr: Current total load in watts
load_eps_power REAL, -- loadEpsPwr: EPS (backup) load power in watts
load_today_energy REAL, -- loadTdEe: Today's consumption in kWh
-- Grid metrics
grid_today_feed REAL, -- gridTdFe: Today's grid feed energy in kWh
grid_today_export REAL, -- gridTdEe: Today's grid export energy in kWh
meter_power REAL, -- meterPPwr: Current meter power reading in watts
grid_power_sum REAL, -- pwrGridSum: Grid power sum
-- System status and operating mode
work_mode INTEGER, -- workMode: Operating mode code
work_mode_combined INTEGER,-- workModeCmb: Combined work mode
work_mode_flag INTEGER, -- workModeFlag: Work mode flag
total_power INTEGER, -- pPwr: Total system power in watts
-- Bypass meter (if installed)
bypass_meter_switch INTEGER, -- bypMeterSwitch: Bypass meter on/off
bypass_meter_total_power REAL, -- bypMeterTotalPower: Bypass meter power
bypass_install_direction INTEGER, -- bypInstallDirection: Installation direction
-- Generator (if present)
dg_run_state INTEGER, -- dgRunState: Generator running state
dg_grid_state INTEGER, -- dgGridState: Generator grid connection state
-- Additional features
has_charger INTEGER, -- hasCharger: Has EV charger
has_heat INTEGER, -- hasHeat: Has heating system
has_os INTEGER, -- hasOs: Has operating system feature
has_sg INTEGER -- hasSg: Has smart grid feature
)
""")
# Create indexes for common queries
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp
ON power_readings(timestamp DESC)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_sn_timestamp
ON power_readings(sn, timestamp DESC)
""")
# Create unique constraint to prevent duplicate timestamps per device
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_sn_timestamp
ON power_readings(sn, timestamp)
""")
conn.commit()
def insert_reading(self, data: Dict, data_source: str = 'realtime') -> int:
"""
Insert a power reading into the database.
Args:
data: Dictionary containing the API response data
data_source: Source of data ('realtime' or 'backfill')
Returns:
The ID of the inserted row, or None if duplicate
"""
with self._get_connection() as conn:
cursor = conn.cursor()
# Extract nested data if present
reading_data = data.get('data', data)
try:
cursor.execute("""
INSERT INTO power_readings (
timestamp, data_source, sn, device_model, device_status,
pv_total_power, pv_today_generation, pv_today_grid_export,
battery_power, battery_soc, battery_today_charge, battery_today_discharge,
battery_status, battery_design_capacity,
load_power, load_eps_power, load_today_energy,
grid_today_feed, grid_today_export, meter_power, grid_power_sum,
work_mode, work_mode_combined, work_mode_flag, total_power,
bypass_meter_switch, bypass_meter_total_power, bypass_install_direction,
dg_run_state, dg_grid_state,
has_charger, has_heat, has_os, has_sg
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
datetime.now().isoformat(),
data_source,
reading_data.get('sn'),
reading_data.get('devModel'),
reading_data.get('devStatus'),
self._to_float(reading_data.get('pvTtPwr')),
self._to_float(reading_data.get('pvTdGe')),
self._to_float(reading_data.get('pvDge')),
self._to_float(reading_data.get('batP')),
self._to_float(reading_data.get('batSoc')),
self._to_float(reading_data.get('batTdChg')),
self._to_float(reading_data.get('batTdDschg')),
reading_data.get('batSts'),
self._to_float(reading_data.get('bmsDesignCap')),
self._to_float(reading_data.get('loadPwr')),
self._to_float(reading_data.get('loadEpsPwr')),
self._to_float(reading_data.get('loadTdEe')),
self._to_float(reading_data.get('gridTdFe')),
self._to_float(reading_data.get('gridTdEe')),
self._to_float(reading_data.get('meterPPwr')),
self._to_float(reading_data.get('pwrGridSum')),
reading_data.get('workMode'),
reading_data.get('workModeCmb'),
reading_data.get('workModeFlag'),
reading_data.get('pPwr'),
reading_data.get('bypMeterSwitch'),
self._to_float(reading_data.get('bypMeterTotalPower')),
reading_data.get('bypInstallDirection'),
reading_data.get('dgRunState'),
reading_data.get('dgGridState'),
1 if reading_data.get('hasCharger') else 0,
1 if reading_data.get('hasHeat') else 0,
1 if reading_data.get('hasOs') else 0,
1 if reading_data.get('hasSg') else 0
))
return cursor.lastrowid
except Exception as e:
# If it's a unique constraint violation, return None (duplicate)
if "UNIQUE constraint failed" in str(e):
return None
raise
def get_readings(
self,
limit: int = 100,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
sn: Optional[str] = None
) -> List[Dict]:
"""
Query power readings from the database.
Args:
limit: Maximum number of records to return
start_date: Start date (ISO format) for filtering
end_date: End date (ISO format) for filtering
sn: Serial number to filter by
Returns:
List of reading dictionaries
"""
with self._get_connection() as conn:
cursor = conn.cursor()
query = "SELECT * FROM power_readings WHERE 1=1"
params = []
if sn:
query += " AND sn = ?"
params.append(sn)
if start_date:
query += " AND timestamp >= ?"
params.append(start_date)
if end_date:
query += " AND timestamp <= ?"
params.append(end_date)
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_latest_reading(self, sn: Optional[str] = None) -> Optional[Dict]:
"""
Get the most recent power reading.
Args:
sn: Optional serial number to filter by
Returns:
Latest reading dictionary or None
"""
readings = self.get_readings(limit=1, sn=sn)
return readings[0] if readings else None
def get_stats(self, sn: Optional[str] = None) -> Dict:
"""
Get statistics about stored readings.
Args:
sn: Optional serial number to filter by
Returns:
Dictionary with count, first/last timestamps
"""
with self._get_connection() as conn:
cursor = conn.cursor()
query = """
SELECT
COUNT(*) as count,
MIN(timestamp) as first_reading,
MAX(timestamp) as last_reading
FROM power_readings
"""
params = []
if sn:
query += " WHERE sn = ?"
params.append(sn)
cursor.execute(query, params)
row = cursor.fetchone()
return dict(row) if row else {}
@staticmethod
def _to_float(value) -> Optional[float]:
"""Convert string or numeric value to float, handling None."""
if value is None or value == "":
return None
try:
return float(value)
except (ValueError, TypeError):
return None
def check_day_coverage(self, date: str, sn: Optional[str] = None) -> Dict:
"""
Check what data exists for a specific day.
Args:
date: Date string in ISO format (e.g., "2026-01-13")
sn: Optional serial number filter
Returns:
Dictionary with coverage information
"""
from datetime import datetime
# Parse date and get day boundaries
day = datetime.fromisoformat(date.split('T')[0])
start = day.replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
end = day.replace(hour=23, minute=59, second=59, microsecond=999999).isoformat()
with self._get_connection() as conn:
cursor = conn.cursor()
query = """
SELECT COUNT(*) as count,
MIN(timestamp) as first_reading,
MAX(timestamp) as last_reading
FROM power_readings
WHERE timestamp >= ? AND timestamp <= ?
"""
params = [start, end]
if sn:
query += " AND sn = ?"
params.append(sn)
cursor.execute(query, params)
row = cursor.fetchone()
return {
"date": date,
"has_data": row['count'] > 0,
"reading_count": row['count'],
"first_reading": row['first_reading'],
"last_reading": row['last_reading'],
"expected_readings": 1440 # One per minute for full day
}
def insert_minute_reading(self, timestamp: str, data: Dict, sn: str, data_source: str = 'backfill') -> Optional[int]:
"""
Insert a single minute reading from historical data.
Args:
timestamp: ISO format timestamp
data: Data dictionary for that minute
sn: Serial number
data_source: 'realtime' or 'backfill'
Returns:
Row ID if inserted, None if duplicate
"""
with self._get_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("""
INSERT INTO power_readings (
timestamp, data_source, sn,
pv_total_power, battery_power, battery_soc,
load_power, grid_power_sum
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
timestamp,
data_source,
sn,
self._to_float(data.get('pvTtPwr')),
self._to_float(data.get('batP')),
self._to_float(data.get('batSoc')),
self._to_float(data.get('loadPwr')),
self._to_float(data.get('pwrGridSum'))
))
return cursor.lastrowid
except Exception as e:
if "UNIQUE constraint failed" in str(e):
return None
raise

View File

@@ -1,11 +1,417 @@
from fastapi import FastAPI
from pydantic import BaseModel
from contextlib import asynccontextmanager
from typing import Optional
import os
import sqlite3
from service.monitoring_service import get_monitoring_service
from service.backfill_service import get_backfill_service
from database.db import PowerReadingsDB
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup and shutdown events."""
# Startup: Initialize database
db = PowerReadingsDB()
print(f"Database initialized at: {db.db_path}")
yield
# Shutdown: Stop monitoring if running
monitoring = get_monitoring_service()
if monitoring.is_running:
await monitoring.stop()
print("Monitoring service stopped")
app = FastAPI(title="HanchuESS Solar Backend API", lifespan=lifespan)
class DecryptRequest(BaseModel):
encrypted_payload: str
app = FastAPI(title="HanchuESS Solar Backend API")
@app.get("/", tags=["Root"])
def root():
return {"message": "Welcome to the HanchuESS Solar Backend API!"}
@app.get("/test_sqlite_health", tags=["Database"])
def test_sqlite_health():
"""Test SQLite database health by creating a test table and inserting a record"""
db_path = os.getenv("SQLITE_DB_PATH")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Create test table
cursor.execute("""
CREATE TABLE IF NOT EXISTS test_health (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_value TEXT NOT NULL
)
""")
# Insert test record
cursor.execute("INSERT INTO test_health (test_value) VALUES (?)", ("health_check",))
conn.commit()
return {"status": "success", "message": "SQLite database is healthy."}
except Exception as e:
return {"status": "error", "message": str(e)}
finally:
cursor.close()
conn.close()
@app.get("/get_access_token", tags=["HanchuESS"])
def get_access_token():
"""Get access token by logging into HanchuESS"""
from service.hanchu_service import HanchuESSService
hanchu_service = HanchuESSService()
try:
access_token = hanchu_service.get_access_token()
return {"access_token": access_token}
except Exception as e:
return {"error": str(e)}
@app.post("/decrypt_payload", tags=["Payload"])
def decrypt_payload(request: DecryptRequest):
"""Decrypt an AES-encrypted HanchuESS payload"""
from service.hanchu_service import HanchuESSService
try:
hanchu_service = HanchuESSService()
decrypted_data = hanchu_service.decrypt_payload(request.encrypted_payload)
return {
"decrypted_data": decrypted_data,
"data_type": type(decrypted_data).__name__
}
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
@app.get("/get_power_chart", tags=["HanchuESS"])
def get_power_chart():
"""Get 65-second power chart data from HanchuESS"""
from service.hanchu_service import HanchuESSService
try:
hanchu_service = HanchuESSService()
# Get power chart data (will automatically handle authentication)
power_data = hanchu_service.get_power_chart()
return power_data
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
# ============================================================================
# MONITORING ENDPOINTS
# ============================================================================
@app.post("/monitoring/start", tags=["Monitoring"])
async def start_monitoring():
"""Start the 65-second monitoring service"""
monitoring = get_monitoring_service()
success = await monitoring.start()
if success:
return {
"status": "started",
"message": "Monitoring service started successfully",
"poll_interval": monitoring.poll_interval
}
else:
return {
"status": "already_running",
"message": "Monitoring service is already running"
}
@app.post("/monitoring/stop", tags=["Monitoring"])
async def stop_monitoring():
"""Stop the monitoring service"""
monitoring = get_monitoring_service()
success = await monitoring.stop()
if success:
return {
"status": "stopped",
"message": "Monitoring service stopped successfully"
}
else:
return {
"status": "not_running",
"message": "Monitoring service was not running"
}
@app.get("/monitoring/status", tags=["Monitoring"])
def get_monitoring_status():
"""Get current monitoring service status"""
monitoring = get_monitoring_service()
return monitoring.get_status()
# ============================================================================
# DATABASE QUERY ENDPOINTS
# ============================================================================
@app.get("/readings", tags=["Database"])
def get_readings(
limit: int = 100,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
sn: Optional[str] = None
):
"""
Get stored power readings from the database.
Args:
limit: Maximum number of records (default: 100)
start_date: Start date in ISO format (e.g., "2026-01-13T00:00:00")
end_date: End date in ISO format
sn: Serial number to filter by
"""
try:
db = PowerReadingsDB()
readings = db.get_readings(
limit=limit,
start_date=start_date,
end_date=end_date,
sn=sn
)
return {
"count": len(readings),
"readings": readings
}
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
@app.get("/readings/latest", tags=["Database"])
def get_latest_reading(sn: Optional[str] = None):
"""Get the most recent power reading"""
try:
db = PowerReadingsDB()
reading = db.get_latest_reading(sn=sn)
if reading:
return reading
else:
return {"message": "No readings found"}
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
@app.get("/readings/stats", tags=["Database"])
def get_reading_stats(sn: Optional[str] = None):
"""Get statistics about stored readings"""
try:
db = PowerReadingsDB()
stats = db.get_stats(sn=sn)
return stats
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
@app.get("/readings/gaps", tags=["Database"])
def find_data_gaps(
threshold_seconds: int = 70,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
sn: Optional[str] = None
):
"""
Find gaps in the stored power readings.
A gap is detected when the time between consecutive readings exceeds the threshold.
Since readings should occur every 65 seconds, a threshold of 70 seconds is reasonable.
Args:
threshold_seconds: Gap threshold in seconds (default: 70)
start_date: Start date in ISO format (optional)
end_date: End date in ISO format (optional)
sn: Serial number to filter by (optional)
Returns:
List of gaps found with start/end times and duration
"""
from datetime import datetime
try:
db = PowerReadingsDB()
# Get readings ordered by timestamp
readings = db.get_readings(
limit=10000, # Large limit to get comprehensive data
start_date=start_date,
end_date=end_date,
sn=sn
)
if len(readings) < 2:
return {
"gaps_found": 0,
"gaps": [],
"message": "Need at least 2 readings to detect gaps"
}
# Reverse to get chronological order (oldest first)
readings = list(reversed(readings))
gaps = []
for i in range(len(readings) - 1):
current = readings[i]
next_reading = readings[i + 1]
# Parse timestamps
current_time = datetime.fromisoformat(current['timestamp'])
next_time = datetime.fromisoformat(next_reading['timestamp'])
# Calculate time difference
time_diff = (next_time - current_time).total_seconds()
# If gap exceeds threshold, record it
if time_diff > threshold_seconds:
gap_duration_minutes = time_diff / 60
missed_readings = int(time_diff / 65) - 1 # Estimate missed readings
gaps.append({
"gap_start": current['timestamp'],
"gap_end": next_reading['timestamp'],
"duration_seconds": round(time_diff, 2),
"duration_minutes": round(gap_duration_minutes, 2),
"estimated_missed_readings": missed_readings,
"last_reading_before_gap_id": current['id'],
"first_reading_after_gap_id": next_reading['id']
})
# Calculate summary statistics
total_gap_time = sum(gap['duration_seconds'] for gap in gaps)
return {
"gaps_found": len(gaps),
"total_gap_duration_seconds": round(total_gap_time, 2),
"total_gap_duration_hours": round(total_gap_time / 3600, 2),
"threshold_seconds": threshold_seconds,
"readings_analyzed": len(readings),
"date_range": {
"start": readings[0]['timestamp'],
"end": readings[-1]['timestamp']
},
"gaps": gaps
}
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
@app.get("/get_day_data", tags=["HanchuESS"])
def get_day_data(date: str):
"""
Get a full day's worth of minute-by-minute power data from HanchuESS.
Args:
date: Date in ISO format (e.g., "2026-01-13" or "2026-01-13T00:00:00")
Returns:
Minute-by-minute power data for the entire day
"""
from service.hanchu_service import HanchuESSService
from datetime import datetime, timezone, timedelta
try:
hanchu_service = HanchuESSService()
# Parse the date string
try:
# Try parsing with time
day = datetime.fromisoformat(date.replace('Z', '+00:00'))
except:
# Try parsing just date
day = datetime.strptime(date, "%Y-%m-%d")
# Set to UTC timezone if not already
if day.tzinfo is None:
day = day.replace(tzinfo=timezone.utc)
# Get start of day (00:00:00)
start_of_day = day.replace(hour=0, minute=0, second=0, microsecond=0)
start_ts = int(start_of_day.timestamp() * 1000)
# Get end of day (23:59:59.999)
end_of_day = day.replace(hour=23, minute=59, second=59, microsecond=999000)
end_ts = int(end_of_day.timestamp() * 1000)
# Get the minute chart data for the full day
chart_data = hanchu_service.get_power_minute_chart(start_ts=start_ts, end_ts=end_ts)
return {
"requested_date": date,
"start_timestamp": start_ts,
"end_timestamp": end_ts,
"start_time": start_of_day.isoformat(),
"end_time": end_of_day.isoformat(),
"data": chart_data
}
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
# ============================================================================
# BACKFILL ENDPOINTS
# ============================================================================
@app.post("/backfill/start", tags=["Backfill"])
async def start_backfill(
start_date: str,
end_date: Optional[str] = None,
delay_seconds: int = 2
):
"""
Start backfilling historical data from start_date to end_date.
Args:
start_date: Start date in ISO format (e.g., "2025-07-02")
end_date: End date in ISO format (defaults to today)
delay_seconds: Delay between API calls for rate limiting (default: 2)
Example:
POST /backfill/start?start_date=2025-07-02&end_date=2026-01-22&delay_seconds=3
"""
backfill = get_backfill_service()
result = await backfill.start_backfill(
start_date=start_date,
end_date=end_date,
delay_seconds=delay_seconds
)
return result
@app.post("/backfill/stop", tags=["Backfill"])
async def stop_backfill():
"""Stop the current backfill process."""
backfill = get_backfill_service()
result = await backfill.stop_backfill()
return result
@app.get("/backfill/status", tags=["Backfill"])
def get_backfill_status():
"""
Get current backfill status and progress.
Returns:
Status including days processed, records inserted, estimated completion time
"""
backfill = get_backfill_service()
return backfill.get_status()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8050)

View File

@@ -0,0 +1,306 @@
"""
Backfill service for historical data collection.
Fetches day-by-day minute data from HanchuESS API and fills database gaps.
"""
import asyncio
import logging
from typing import Optional, Dict, List
from datetime import datetime, timedelta, timezone
from service.hanchu_service import HanchuESSService
from database.db import PowerReadingsDB
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BackfillService:
"""Service for backfilling historical power data."""
def __init__(self):
"""Initialize backfill service."""
self.hanchu_service = HanchuESSService()
self.db = PowerReadingsDB()
self.is_running = False
self.task: Optional[asyncio.Task] = None
# Progress tracking
self.status = {
"is_running": False,
"start_date": None,
"end_date": None,
"current_date": None,
"days_total": 0,
"days_completed": 0,
"days_remaining": 0,
"records_inserted": 0,
"records_skipped": 0,
"conflicts_found": 0,
"errors": [],
"started_at": None,
"estimated_completion": None
}
# Configuration
self.delay_between_days = 2 # seconds between API calls
self.sn = self.hanchu_service.base_serial_number
async def start_backfill(
self,
start_date: str,
end_date: Optional[str] = None,
delay_seconds: int = 2
) -> Dict:
"""
Start backfilling data from start_date to end_date.
Args:
start_date: Start date in ISO format (e.g., "2025-07-02")
end_date: End date in ISO format (defaults to today)
delay_seconds: Delay between API calls for rate limiting
Returns:
Status dictionary
"""
if self.is_running:
return {"error": "Backfill already in progress"}
# Parse dates
start = datetime.fromisoformat(start_date.split('T')[0])
if end_date:
end = datetime.fromisoformat(end_date.split('T')[0])
else:
end = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
# Calculate total days
total_days = (end - start).days + 1
# Initialize status
self.status = {
"is_running": True,
"start_date": start.isoformat(),
"end_date": end.isoformat(),
"current_date": start.isoformat(),
"days_total": total_days,
"days_completed": 0,
"days_remaining": total_days,
"records_inserted": 0,
"records_skipped": 0,
"conflicts_found": 0,
"errors": [],
"started_at": datetime.now().isoformat(),
"estimated_completion": None
}
self.delay_between_days = delay_seconds
self.is_running = True
# Start background task
self.task = asyncio.create_task(
self._backfill_loop(start, end)
)
logger.info(f"Backfill started: {start_date} to {end.isoformat()} ({total_days} days)")
return self.status
async def stop_backfill(self) -> Dict:
"""Stop the backfill process."""
if not self.is_running:
return {"error": "No backfill in progress"}
self.is_running = False
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
logger.info("Backfill stopped")
self.status["is_running"] = False
return self.status
def get_status(self) -> Dict:
"""Get current backfill status."""
# Update estimated completion time
if self.status["is_running"] and self.status["days_completed"] > 0:
elapsed = (
datetime.now() -
datetime.fromisoformat(self.status["started_at"])
).total_seconds()
avg_time_per_day = elapsed / self.status["days_completed"]
remaining_seconds = avg_time_per_day * self.status["days_remaining"]
eta = datetime.now() + timedelta(seconds=remaining_seconds)
self.status["estimated_completion"] = eta.isoformat()
self.status["estimated_time_remaining_minutes"] = round(remaining_seconds / 60, 1)
return self.status
async def _backfill_loop(self, start_date: datetime, end_date: datetime):
"""Main backfill loop."""
current_date = start_date
while current_date <= end_date and self.is_running:
try:
date_str = current_date.strftime("%Y-%m-%d")
self.status["current_date"] = date_str
logger.info(f"Processing {date_str}...")
# Check existing data coverage
coverage = self.db.check_day_coverage(date_str, sn=self.sn)
if coverage["has_data"] and coverage["reading_count"] > 1400:
# Day is well covered, skip
logger.info(f" {date_str} already has {coverage['reading_count']} readings, skipping")
self.status["days_completed"] += 1
self.status["days_remaining"] -= 1
current_date += timedelta(days=1)
continue
# Fetch day data from API
result = await self._fetch_and_store_day(current_date)
self.status["records_inserted"] += result["inserted"]
self.status["records_skipped"] += result["skipped"]
self.status["conflicts_found"] += result["conflicts"]
if result["error"]:
self.status["errors"].append({
"date": date_str,
"error": result["error"]
})
self.status["days_completed"] += 1
self.status["days_remaining"] -= 1
logger.info(
f" {date_str}: inserted={result['inserted']}, "
f"skipped={result['skipped']}, conflicts={result['conflicts']}"
)
# Rate limiting delay
if self.is_running:
await asyncio.sleep(self.delay_between_days)
except Exception as e:
logger.error(f"Error processing {current_date}: {e}", exc_info=True)
self.status["errors"].append({
"date": current_date.strftime("%Y-%m-%d"),
"error": str(e)
})
current_date += timedelta(days=1)
self.is_running = False
self.status["is_running"] = False
logger.info("Backfill completed")
async def _fetch_and_store_day(self, date: datetime) -> Dict:
"""
Fetch minute data for a day and store in database.
Returns:
Dictionary with counts: inserted, skipped, conflicts, error
"""
result = {
"inserted": 0,
"skipped": 0,
"conflicts": 0,
"error": None
}
try:
# Calculate timestamps for the day
start_of_day = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
end_of_day = date.replace(hour=23, minute=59, second=59, microsecond=999000, tzinfo=timezone.utc)
start_ts = int(start_of_day.timestamp() * 1000)
end_ts = int(end_of_day.timestamp() * 1000)
# Fetch minute chart data
api_response = await asyncio.to_thread(
self.hanchu_service.get_power_minute_chart,
start_ts=start_ts,
end_ts=end_ts
)
# Check if successful
if not api_response.get("success"):
result["error"] = f"API returned success=false: {api_response}"
return result
# Extract data points
data = api_response.get("data", {})
# Check if data is a list or dict
if isinstance(data, list):
# Data is an array of readings
logger.info(f" Received {len(data)} data points")
# Process each data point
for idx, point in enumerate(data):
try:
if isinstance(point, dict):
# Extract timestamp from dataTimeTs field (milliseconds)
if 'dataTimeTs' in point:
timestamp_ms = point['dataTimeTs']
point_time = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
timestamp_str = point_time.isoformat()
else:
# Fallback: generate timestamp based on index (minute of the day)
point_time = start_of_day + timedelta(minutes=idx)
timestamp_str = point_time.isoformat()
row_id = self.db.insert_minute_reading(
timestamp=timestamp_str,
data=point,
sn=self.sn,
data_source='backfill'
)
if row_id:
result["inserted"] += 1
else:
result["skipped"] += 1
except Exception as e:
logger.error(f" Error inserting point {idx}: {e}")
result["conflicts"] += 1
elif isinstance(data, dict):
# Data is a dictionary, log keys to understand structure
logger.info(f" Received API response with keys: {list(data.keys())}")
# Check for common array keys that might contain the data
for key in ['data', 'points', 'readings', 'values', 'chart']:
if key in data and isinstance(data[key], list):
logger.info(f" Found array in '{key}' with {len(data[key])} items")
break
# Mark as processed but note we need to implement parsing
result["error"] = f"Need to implement parsing for dict structure: {list(data.keys())}"
else:
result["error"] = f"Unexpected data type: {type(data)}"
except Exception as e:
result["error"] = str(e)
logger.error(f"Error fetching day data: {e}")
return result
# Global backfill service instance
_backfill_service: Optional[BackfillService] = None
def get_backfill_service() -> BackfillService:
"""Get or create the global backfill service instance."""
global _backfill_service
if _backfill_service is None:
_backfill_service = BackfillService()
return _backfill_service

View File

@@ -0,0 +1,266 @@
import base64
import json
import os
import requests
from Crypto.Cipher import AES, PKCS1_v1_5
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import pad, unpad
from dotenv import load_dotenv
load_dotenv()
class HanchuESSService:
# RSA public key from the JavaScript code
RSA_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCVg7RFDLMGM4O98d1zWKI5RQan
jci3iY4qlpgsH76fUn3GnZtqjbRk37lCQDv6AhgPNXRPpty81+g909/c4yzySKaP
CcDZv7KdCRB1mVxkq+0z4EtKx9EoTXKnFSDBaYi2srdal1tM3gGOsNTDN58CzYPX
nDGPX7+EHS1Mm4aVDQIDAQAB
-----END PUBLIC KEY-----"""
def __init__(self):
self.name = "HanchuESS Service"
# Load config from environment
self.aes_key = os.environ["HANCHU_AES_KEY"].encode("utf-8")
self.aes_iv = os.environ["HANCHU_AES_IV"].encode("utf-8")
self.login_url = os.environ["HANCHU_LOGIN_URL"]
self.login_type = os.getenv("HANCHU_LOGIN_TYPE", "ACCOUNT")
self.timeout = int(os.getenv("HANCHU_HTTP_TIMEOUT", "10"))
self.verify_ssl = os.getenv("HANCHU_VERIFY_SSL", "true").lower() == "true"
self.hanchu_username = os.getenv("HANCHU_USERNAME", "")
self.hanchu_password = os.getenv("HANCHU_PASSWORD", "")
self.base_serial_number = os.getenv("BASE_SERIAL_NUMBER", "")
# Cache for access token
self._access_token = None
# Safety checks
if len(self.aes_key) not in (16, 24, 32):
raise ValueError("AES key must be 16, 24, or 32 bytes")
if len(self.aes_iv) != 16:
raise ValueError("AES IV must be exactly 16 bytes")
def encrypt_payload(self, data: dict | str) -> str:
"""
Encrypt payload using AES-CBC and return base64 string.
"""
if not isinstance(data, str):
data = json.dumps(data, separators=(",", ":"))
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_iv)
ciphertext = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size))
return base64.b64encode(ciphertext).decode("utf-8")
def decrypt_payload(self, encrypted_data: str) -> dict | str:
"""
Decrypt base64-encoded AES-CBC payload and return the original data.
"""
ciphertext = base64.b64decode(encrypted_data)
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_iv)
decrypted = unpad(cipher.decrypt(ciphertext), AES.block_size)
decrypted_str = decrypted.decode("utf-8")
# Try to parse as JSON, return string if it fails
try:
return json.loads(decrypted_str)
except json.JSONDecodeError:
return decrypted_str
def encrypt_password_rsa(self, password: str) -> str:
"""
Encrypt password using RSA public key (matches JavaScript GO function).
Returns base64-encoded encrypted password.
"""
public_key = RSA.import_key(self.RSA_PUBLIC_KEY)
cipher = PKCS1_v1_5.new(public_key)
encrypted = cipher.encrypt(password.encode('utf-8'))
return base64.b64encode(encrypted).decode('utf-8')
def get_access_token(self) -> str:
"""
Authenticate with Hanchu ESS and return access token.
Uses double encryption: RSA for password, then AES for entire payload.
Caches the token to avoid unnecessary logins.
"""
# Return cached token if available
if self._access_token:
return self._access_token
# Step 1: RSA encrypt the password
encrypted_password = self.encrypt_password_rsa(self.hanchu_password)
# Step 2: Build payload with encrypted password
payload = {
"account": self.hanchu_username,
"pwd": encrypted_password,
"loginType": self.login_type,
}
# Step 3: AES encrypt the entire payload
encrypted_payload = self.encrypt_payload(payload)
# Step 4: Send to API with correct headers
headers = {
"Content-Type": "text/plain",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"appplat": "iess",
"locale": "en",
"timezone": "Africa/Accra",
"timeselected": "GMT",
"version": "1.0",
"crypto-version": "1.0.0",
"Origin": "https://iess3.hanchuess.com",
"Referer": "https://iess3.hanchuess.com/login",
}
response = requests.post(
self.login_url,
data=encrypted_payload,
headers=headers,
timeout=self.timeout,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
try:
# The token is directly in the 'data' field as a JWT string
if result.get("success") and result.get("data"):
self._access_token = result["data"]
return self._access_token
else:
raise RuntimeError(
f"Hanchu login failed: {json.dumps(result, ensure_ascii=False)}"
)
except (KeyError, TypeError):
raise RuntimeError(
f"Hanchu login failed: {json.dumps(result, ensure_ascii=False)}"
)
def get_power_chart(self, access_token: str = None) -> dict:
"""
Get 65-second power chart data from HanchuESS.
Args:
access_token: Optional JWT token from login. If not provided, will get one automatically.
Returns:
Power chart data from the API
"""
# Get access token if not provided
if not access_token:
# Check if we have a cached token first
if self._access_token:
access_token = self._access_token
else:
access_token = self.get_access_token()
# Build payload with serial number
payload = {"sn": self.base_serial_number}
# AES encrypt the payload
encrypted_payload = self.encrypt_payload(payload)
# Send to API with access token
url = "https://iess3.hanchuess.com/gateway/platform/pcs/powerChart"
headers = {
"Content-Type": "text/plain",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"appplat": "iess",
"locale": "en",
"timezone": "Africa/Accra",
"timeselected": "GMT",
"version": "1.0",
"crypto-version": "1.0.0",
"access-token": access_token,
"Origin": "https://iess3.hanchuess.com",
"Referer": "https://iess3.hanchuess.com/",
}
response = requests.post(
url,
data=encrypted_payload,
headers=headers,
timeout=self.timeout,
verify=self.verify_ssl,
)
response.raise_for_status()
return response.json()
def get_power_minute_chart(self, access_token: str = None, start_ts: int = None, end_ts: int = None) -> dict:
"""
Get minute-by-minute power chart data from HanchuESS.
Args:
access_token: Optional JWT token from login. If not provided, will get one automatically.
start_ts: Start timestamp in milliseconds. If not provided, defaults to start of today.
end_ts: End timestamp in milliseconds. If not provided, defaults to end of today.
Returns:
Power minute chart data from the API
"""
# Get access token if not provided
if not access_token:
# Check if we have a cached token first
if self._access_token:
access_token = self._access_token
else:
access_token = self.get_access_token()
# Set default timestamps if not provided (today's data)
if not start_ts or not end_ts:
from datetime import datetime, timezone, timedelta
now = datetime.now(timezone.utc)
# 30 minutes ago
start_time = now - timedelta(minutes=30)
start_ts = int(start_time.timestamp() * 1000)
# Current time
end_ts = int(now.timestamp() * 1000)
# Build payload
payload = {
"sn": self.base_serial_number,
"devType": "2",
"maxCount": 1440, # 24 hours * 60 minutes
"dataTimeTsEnd": end_ts,
"dataTimeTsStart": start_ts,
"masterSum": True
}
# AES encrypt the payload
encrypted_payload = self.encrypt_payload(payload)
# Send to API with access token
url = "https://iess3.hanchuess.com/gateway/platform/pcs/powerMinuteChart"
headers = {
"Content-Type": "text/plain",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"appplat": "iess",
"locale": "en",
"timezone": "Africa/Accra",
"timeselected": "GMT",
"version": "1.0",
"crypto-version": "1.0.0",
"access-token": access_token,
"Origin": "https://iess3.hanchuess.com",
"Referer": "https://iess3.hanchuess.com/",
}
response = requests.post(
url,
data=encrypted_payload,
headers=headers,
timeout=self.timeout,
verify=self.verify_ssl,
)
response.raise_for_status()
return response.json()

View File

@@ -0,0 +1,119 @@
"""
Monitoring service for continuous power data collection.
Polls the HanchuESS API every 65 seconds and stores data in SQLite.
"""
import asyncio
import logging
from typing import Optional
from datetime import datetime
from service.hanchu_service import HanchuESSService
from database.db import PowerReadingsDB
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MonitoringService:
"""Background service for monitoring power readings."""
def __init__(self):
"""Initialize monitoring service."""
self.hanchu_service = HanchuESSService()
self.db = PowerReadingsDB()
self.is_running = False
self.task: Optional[asyncio.Task] = None
self.poll_interval = 65 # seconds
async def start(self):
"""Start the monitoring service."""
if self.is_running:
logger.warning("Monitoring service is already running")
return False
self.is_running = True
self.task = asyncio.create_task(self._monitoring_loop())
logger.info("Monitoring service started")
return True
async def stop(self):
"""Stop the monitoring service."""
if not self.is_running:
logger.warning("Monitoring service is not running")
return False
self.is_running = False
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
logger.info("Monitoring service stopped")
return True
def get_status(self) -> dict:
"""Get current monitoring status."""
return {
"is_running": self.is_running,
"poll_interval": self.poll_interval,
"db_path": self.db.db_path,
"db_stats": self.db.get_stats()
}
async def _monitoring_loop(self):
"""Main monitoring loop that runs every 65 seconds."""
logger.info(f"Monitoring loop started (polling every {self.poll_interval}s)")
while self.is_running:
try:
# Fetch power data from API
logger.info("Fetching power chart data...")
power_data = await asyncio.to_thread(
self.hanchu_service.get_power_chart
)
# Check if API call was successful
if power_data.get('success'):
# Insert into database with 'realtime' source
reading_id = self.db.insert_reading(power_data, data_source='realtime')
logger.info(
f"Stored reading ID {reading_id} at {datetime.now().isoformat()}"
)
# Log some key metrics
data = power_data.get('data', {})
logger.info(
f" PV: {data.get('pvTtPwr')}W | "
f"Battery: {data.get('batP')}W ({data.get('batSoc')}%) | "
f"Load: {data.get('loadPwr')}W"
)
else:
logger.error(f"API call failed: {power_data}")
except Exception as e:
logger.error(f"Error in monitoring loop: {e}", exc_info=True)
# Wait for next poll interval
try:
await asyncio.sleep(self.poll_interval)
except asyncio.CancelledError:
logger.info("Monitoring loop cancelled")
break
logger.info("Monitoring loop ended")
# Global monitoring service instance
_monitoring_service: Optional[MonitoringService] = None
def get_monitoring_service() -> MonitoringService:
"""Get or create the global monitoring service instance."""
global _monitoring_service
if _monitoring_service is None:
_monitoring_service = MonitoringService()
return _monitoring_service

View File

@@ -6,3 +6,20 @@ services:
container_name: hanchuess-solar-backend
ports:
- "8050:8050"
volumes:
- sqlite_data:/data
env_file:
- .env
environment:
- HANCHU_AES_KEY=${HANCHU_AES_KEY}
- HANCHU_AES_IV=${HANCHU_AES_IV}
- HANCHU_LOGIN_URL=${HANCHU_LOGIN_URL}
- HANCHU_LOGIN_TYPE=${HANCHU_LOGIN_TYPE:-ACCOUNT}
- HANCHU_HTTP_TIMEOUT=${HANCHU_HTTP_TIMEOUT:-10}
- HANCHU_VERIFY_SSL=${HANCHU_VERIFY_SSL:-true}
- HANCHU_USERNAME=${HANCHU_USERNAME:-}
- HANCHU_PASSWORD=${HANCHU_PASSWORD:-}
- SQLITE_DB_PATH=${SQLITE_DB_PATH:-}
volumes:
sqlite_data: