Compare commits
4 Commits
6810de2583
...
39a86ee2f9
| Author | SHA1 | Date | |
|---|---|---|---|
| 39a86ee2f9 | |||
|
|
d073456c2f | ||
|
|
680335fba1 | ||
|
|
5cf970df62 |
24
.env.example
Normal file
24
.env.example
Normal file
@@ -0,0 +1,24 @@
|
||||
# HanchuESS API Configuration
|
||||
# Copy this file to .env and fill in your actual values
|
||||
|
||||
# Required: AES encryption key (must be 16, 24, or 32 bytes)
|
||||
HANCHU_AES_KEY=your_aes_key_here
|
||||
|
||||
# Required: AES initialization vector (must be 16 bytes)
|
||||
HANCHU_AES_IV=your_aes_iv_here
|
||||
|
||||
# Required: Login URL for the HanchuESS API
|
||||
HANCHU_LOGIN_URL=https://api.example.com/login
|
||||
|
||||
# Optional: Login type (default: ACCOUNT)
|
||||
HANCHU_LOGIN_TYPE=ACCOUNT
|
||||
|
||||
# Optional: HTTP timeout in seconds (default: 10)
|
||||
HANCHU_HTTP_TIMEOUT=10
|
||||
|
||||
# Optional: Verify SSL certificates (default: true, set to false for self-signed certs)
|
||||
HANCHU_VERIFY_SSL=true
|
||||
|
||||
# Optional: Username and password
|
||||
HANCHU_USERNAME=
|
||||
HANCHU_PASSWORD=
|
||||
@@ -1,2 +1,5 @@
|
||||
fastapi
|
||||
uvicorn[standard]
|
||||
requests>=2.31.0
|
||||
pycryptodome>=3.20.0
|
||||
python-dotenv>=1.0.1
|
||||
@@ -1,11 +1,84 @@
|
||||
from fastapi import FastAPI
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI(title="HanchuESS Solar Backend API")
|
||||
|
||||
|
||||
class DecryptRequest(BaseModel):
|
||||
encrypted_payload: str
|
||||
|
||||
|
||||
@app.get("/", tags=["Root"])
|
||||
def root():
|
||||
return {"message": "Welcome to the HanchuESS Solar Backend API!"}
|
||||
|
||||
@app.get("/get_access_token", tags=["HanchuESS"])
|
||||
def get_access_token():
|
||||
"""Get access token by logging into HanchuESS"""
|
||||
from service.hanchu_service import HanchuESSService
|
||||
|
||||
hanchu_service = HanchuESSService()
|
||||
try:
|
||||
access_token = hanchu_service.get_access_token()
|
||||
return {"access_token": access_token}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@app.post("/decrypt_payload", tags=["Payload"])
|
||||
def decrypt_payload(request: DecryptRequest):
|
||||
"""Decrypt an AES-encrypted HanchuESS payload"""
|
||||
from service.hanchu_service import HanchuESSService
|
||||
|
||||
try:
|
||||
hanchu_service = HanchuESSService()
|
||||
decrypted_data = hanchu_service.decrypt_payload(request.encrypted_payload)
|
||||
|
||||
return {
|
||||
"decrypted_data": decrypted_data,
|
||||
"data_type": type(decrypted_data).__name__
|
||||
}
|
||||
except Exception as e:
|
||||
import traceback
|
||||
return {"error": str(e), "traceback": traceback.format_exc()}
|
||||
|
||||
@app.get("/get_power_chart", tags=["HanchuESS"])
|
||||
def get_power_chart():
|
||||
"""Get 65-second power chart data from HanchuESS"""
|
||||
from service.hanchu_service import HanchuESSService
|
||||
|
||||
try:
|
||||
hanchu_service = HanchuESSService()
|
||||
|
||||
# Get power chart data (will automatically handle authentication)
|
||||
power_data = hanchu_service.get_power_chart()
|
||||
|
||||
return power_data
|
||||
except Exception as e:
|
||||
import traceback
|
||||
return {"error": str(e), "traceback": traceback.format_exc()}
|
||||
|
||||
@app.get("/get_power_minute_chart", tags=["HanchuESS"])
|
||||
def get_power_minute_chart(start_ts: int = None, end_ts: int = None):
|
||||
"""Get minute-by-minute power chart data from HanchuESS
|
||||
|
||||
Args:
|
||||
start_ts: Optional start timestamp in milliseconds
|
||||
end_ts: Optional end timestamp in milliseconds
|
||||
"""
|
||||
from service.hanchu_service import HanchuESSService
|
||||
|
||||
try:
|
||||
hanchu_service = HanchuESSService()
|
||||
|
||||
# Get minute chart data (will automatically handle authentication)
|
||||
chart_data = hanchu_service.get_power_minute_chart(start_ts=start_ts, end_ts=end_ts)
|
||||
|
||||
return chart_data
|
||||
except Exception as e:
|
||||
import traceback
|
||||
return {"error": str(e), "traceback": traceback.format_exc()}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8050)
|
||||
266
backend/src/service/hanchu_service.py
Normal file
266
backend/src/service/hanchu_service.py
Normal file
@@ -0,0 +1,266 @@
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
|
||||
from Crypto.Cipher import AES, PKCS1_v1_5
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class HanchuESSService:
|
||||
# RSA public key from the JavaScript code
|
||||
RSA_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
|
||||
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCVg7RFDLMGM4O98d1zWKI5RQan
|
||||
jci3iY4qlpgsH76fUn3GnZtqjbRk37lCQDv6AhgPNXRPpty81+g909/c4yzySKaP
|
||||
CcDZv7KdCRB1mVxkq+0z4EtKx9EoTXKnFSDBaYi2srdal1tM3gGOsNTDN58CzYPX
|
||||
nDGPX7+EHS1Mm4aVDQIDAQAB
|
||||
-----END PUBLIC KEY-----"""
|
||||
def __init__(self):
|
||||
self.name = "HanchuESS Service"
|
||||
|
||||
# Load config from environment
|
||||
self.aes_key = os.environ["HANCHU_AES_KEY"].encode("utf-8")
|
||||
self.aes_iv = os.environ["HANCHU_AES_IV"].encode("utf-8")
|
||||
self.login_url = os.environ["HANCHU_LOGIN_URL"]
|
||||
self.login_type = os.getenv("HANCHU_LOGIN_TYPE", "ACCOUNT")
|
||||
self.timeout = int(os.getenv("HANCHU_HTTP_TIMEOUT", "10"))
|
||||
self.verify_ssl = os.getenv("HANCHU_VERIFY_SSL", "true").lower() == "true"
|
||||
self.hanchu_username = os.getenv("HANCHU_USERNAME", "")
|
||||
self.hanchu_password = os.getenv("HANCHU_PASSWORD", "")
|
||||
self.base_serial_number = os.getenv("BASE_SERIAL_NUMBER", "")
|
||||
|
||||
# Cache for access token
|
||||
self._access_token = None
|
||||
|
||||
# Safety checks
|
||||
if len(self.aes_key) not in (16, 24, 32):
|
||||
raise ValueError("AES key must be 16, 24, or 32 bytes")
|
||||
if len(self.aes_iv) != 16:
|
||||
raise ValueError("AES IV must be exactly 16 bytes")
|
||||
|
||||
def encrypt_payload(self, data: dict | str) -> str:
|
||||
"""
|
||||
Encrypt payload using AES-CBC and return base64 string.
|
||||
"""
|
||||
if not isinstance(data, str):
|
||||
data = json.dumps(data, separators=(",", ":"))
|
||||
|
||||
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_iv)
|
||||
ciphertext = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size))
|
||||
return base64.b64encode(ciphertext).decode("utf-8")
|
||||
|
||||
def decrypt_payload(self, encrypted_data: str) -> dict | str:
|
||||
"""
|
||||
Decrypt base64-encoded AES-CBC payload and return the original data.
|
||||
"""
|
||||
ciphertext = base64.b64decode(encrypted_data)
|
||||
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_iv)
|
||||
decrypted = unpad(cipher.decrypt(ciphertext), AES.block_size)
|
||||
decrypted_str = decrypted.decode("utf-8")
|
||||
|
||||
# Try to parse as JSON, return string if it fails
|
||||
try:
|
||||
return json.loads(decrypted_str)
|
||||
except json.JSONDecodeError:
|
||||
return decrypted_str
|
||||
|
||||
def encrypt_password_rsa(self, password: str) -> str:
|
||||
"""
|
||||
Encrypt password using RSA public key (matches JavaScript GO function).
|
||||
Returns base64-encoded encrypted password.
|
||||
"""
|
||||
public_key = RSA.import_key(self.RSA_PUBLIC_KEY)
|
||||
cipher = PKCS1_v1_5.new(public_key)
|
||||
encrypted = cipher.encrypt(password.encode('utf-8'))
|
||||
return base64.b64encode(encrypted).decode('utf-8')
|
||||
|
||||
def get_access_token(self) -> str:
|
||||
"""
|
||||
Authenticate with Hanchu ESS and return access token.
|
||||
Uses double encryption: RSA for password, then AES for entire payload.
|
||||
Caches the token to avoid unnecessary logins.
|
||||
"""
|
||||
# Return cached token if available
|
||||
if self._access_token:
|
||||
return self._access_token
|
||||
|
||||
# Step 1: RSA encrypt the password
|
||||
encrypted_password = self.encrypt_password_rsa(self.hanchu_password)
|
||||
|
||||
# Step 2: Build payload with encrypted password
|
||||
payload = {
|
||||
"account": self.hanchu_username,
|
||||
"pwd": encrypted_password,
|
||||
"loginType": self.login_type,
|
||||
}
|
||||
|
||||
# Step 3: AES encrypt the entire payload
|
||||
encrypted_payload = self.encrypt_payload(payload)
|
||||
|
||||
# Step 4: Send to API with correct headers
|
||||
headers = {
|
||||
"Content-Type": "text/plain",
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
||||
"appplat": "iess",
|
||||
"locale": "en",
|
||||
"timezone": "Africa/Accra",
|
||||
"timeselected": "GMT",
|
||||
"version": "1.0",
|
||||
"crypto-version": "1.0.0",
|
||||
"Origin": "https://iess3.hanchuess.com",
|
||||
"Referer": "https://iess3.hanchuess.com/login",
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
self.login_url,
|
||||
data=encrypted_payload,
|
||||
headers=headers,
|
||||
timeout=self.timeout,
|
||||
verify=self.verify_ssl,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
try:
|
||||
# The token is directly in the 'data' field as a JWT string
|
||||
if result.get("success") and result.get("data"):
|
||||
self._access_token = result["data"]
|
||||
return self._access_token
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f"Hanchu login failed: {json.dumps(result, ensure_ascii=False)}"
|
||||
)
|
||||
except (KeyError, TypeError):
|
||||
raise RuntimeError(
|
||||
f"Hanchu login failed: {json.dumps(result, ensure_ascii=False)}"
|
||||
)
|
||||
|
||||
def get_power_chart(self, access_token: str = None) -> dict:
|
||||
"""
|
||||
Get 65-second power chart data from HanchuESS.
|
||||
|
||||
Args:
|
||||
access_token: Optional JWT token from login. If not provided, will get one automatically.
|
||||
|
||||
Returns:
|
||||
Power chart data from the API
|
||||
"""
|
||||
# Get access token if not provided
|
||||
if not access_token:
|
||||
# Check if we have a cached token first
|
||||
if self._access_token:
|
||||
access_token = self._access_token
|
||||
else:
|
||||
access_token = self.get_access_token()
|
||||
|
||||
# Build payload with serial number
|
||||
payload = {"sn": self.base_serial_number}
|
||||
|
||||
# AES encrypt the payload
|
||||
encrypted_payload = self.encrypt_payload(payload)
|
||||
|
||||
# Send to API with access token
|
||||
url = "https://iess3.hanchuess.com/gateway/platform/pcs/powerChart"
|
||||
headers = {
|
||||
"Content-Type": "text/plain",
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
||||
"appplat": "iess",
|
||||
"locale": "en",
|
||||
"timezone": "Africa/Accra",
|
||||
"timeselected": "GMT",
|
||||
"version": "1.0",
|
||||
"crypto-version": "1.0.0",
|
||||
"access-token": access_token,
|
||||
"Origin": "https://iess3.hanchuess.com",
|
||||
"Referer": "https://iess3.hanchuess.com/",
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
url,
|
||||
data=encrypted_payload,
|
||||
headers=headers,
|
||||
timeout=self.timeout,
|
||||
verify=self.verify_ssl,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def get_power_minute_chart(self, access_token: str = None, start_ts: int = None, end_ts: int = None) -> dict:
|
||||
"""
|
||||
Get minute-by-minute power chart data from HanchuESS.
|
||||
|
||||
Args:
|
||||
access_token: Optional JWT token from login. If not provided, will get one automatically.
|
||||
start_ts: Start timestamp in milliseconds. If not provided, defaults to start of today.
|
||||
end_ts: End timestamp in milliseconds. If not provided, defaults to end of today.
|
||||
|
||||
Returns:
|
||||
Power minute chart data from the API
|
||||
"""
|
||||
# Get access token if not provided
|
||||
if not access_token:
|
||||
# Check if we have a cached token first
|
||||
if self._access_token:
|
||||
access_token = self._access_token
|
||||
else:
|
||||
access_token = self.get_access_token()
|
||||
|
||||
# Set default timestamps if not provided (today's data)
|
||||
if not start_ts or not end_ts:
|
||||
from datetime import datetime, timezone, timedelta
|
||||
now = datetime.now(timezone.utc)
|
||||
# 30 minutes ago
|
||||
start_time = now - timedelta(minutes=30)
|
||||
start_ts = int(start_time.timestamp() * 1000)
|
||||
# Current time
|
||||
end_ts = int(now.timestamp() * 1000)
|
||||
|
||||
# Build payload
|
||||
payload = {
|
||||
"sn": self.base_serial_number,
|
||||
"devType": "2",
|
||||
"maxCount": 1440, # 24 hours * 60 minutes
|
||||
"dataTimeTsEnd": end_ts,
|
||||
"dataTimeTsStart": start_ts,
|
||||
"masterSum": True
|
||||
}
|
||||
|
||||
# AES encrypt the payload
|
||||
encrypted_payload = self.encrypt_payload(payload)
|
||||
|
||||
# Send to API with access token
|
||||
url = "https://iess3.hanchuess.com/gateway/platform/pcs/powerMinuteChart"
|
||||
headers = {
|
||||
"Content-Type": "text/plain",
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
||||
"appplat": "iess",
|
||||
"locale": "en",
|
||||
"timezone": "Africa/Accra",
|
||||
"timeselected": "GMT",
|
||||
"version": "1.0",
|
||||
"crypto-version": "1.0.0",
|
||||
"access-token": access_token,
|
||||
"Origin": "https://iess3.hanchuess.com",
|
||||
"Referer": "https://iess3.hanchuess.com/",
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
url,
|
||||
data=encrypted_payload,
|
||||
headers=headers,
|
||||
timeout=self.timeout,
|
||||
verify=self.verify_ssl,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
@@ -6,3 +6,14 @@ services:
|
||||
container_name: hanchuess-solar-backend
|
||||
ports:
|
||||
- "8050:8050"
|
||||
env_file:
|
||||
- .env
|
||||
environment:
|
||||
- HANCHU_AES_KEY=${HANCHU_AES_KEY}
|
||||
- HANCHU_AES_IV=${HANCHU_AES_IV}
|
||||
- HANCHU_LOGIN_URL=${HANCHU_LOGIN_URL}
|
||||
- HANCHU_LOGIN_TYPE=${HANCHU_LOGIN_TYPE:-ACCOUNT}
|
||||
- HANCHU_HTTP_TIMEOUT=${HANCHU_HTTP_TIMEOUT:-10}
|
||||
- HANCHU_VERIFY_SSL=${HANCHU_VERIFY_SSL:-true}
|
||||
- HANCHU_USERNAME=${HANCHU_USERNAME:-}
|
||||
- HANCHU_PASSWORD=${HANCHU_PASSWORD:-}
|
||||
Reference in New Issue
Block a user