import base64 import json import os import requests from Crypto.Cipher import AES, PKCS1_v1_5 from Crypto.PublicKey import RSA from Crypto.Util.Padding import pad, unpad from dotenv import load_dotenv load_dotenv() class HanchuESSService: # RSA public key from the JavaScript code RSA_PUBLIC_KEY = """-----BEGIN PUBLIC KEY----- MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCVg7RFDLMGM4O98d1zWKI5RQan jci3iY4qlpgsH76fUn3GnZtqjbRk37lCQDv6AhgPNXRPpty81+g909/c4yzySKaP CcDZv7KdCRB1mVxkq+0z4EtKx9EoTXKnFSDBaYi2srdal1tM3gGOsNTDN58CzYPX nDGPX7+EHS1Mm4aVDQIDAQAB -----END PUBLIC KEY-----""" def __init__(self): self.name = "HanchuESS Service" # Load config from environment self.aes_key = os.environ["HANCHU_AES_KEY"].encode("utf-8") self.aes_iv = os.environ["HANCHU_AES_IV"].encode("utf-8") self.login_url = os.environ["HANCHU_LOGIN_URL"] self.login_type = os.getenv("HANCHU_LOGIN_TYPE", "ACCOUNT") self.timeout = int(os.getenv("HANCHU_HTTP_TIMEOUT", "10")) self.verify_ssl = os.getenv("HANCHU_VERIFY_SSL", "true").lower() == "true" self.hanchu_username = os.getenv("HANCHU_USERNAME", "") self.hanchu_password = os.getenv("HANCHU_PASSWORD", "") self.base_serial_number = os.getenv("BASE_SERIAL_NUMBER", "") # Cache for access token self._access_token = None # Safety checks if len(self.aes_key) not in (16, 24, 32): raise ValueError("AES key must be 16, 24, or 32 bytes") if len(self.aes_iv) != 16: raise ValueError("AES IV must be exactly 16 bytes") def encrypt_payload(self, data: dict | str) -> str: """ Encrypt payload using AES-CBC and return base64 string. """ if not isinstance(data, str): data = json.dumps(data, separators=(",", ":")) cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_iv) ciphertext = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size)) return base64.b64encode(ciphertext).decode("utf-8") def decrypt_payload(self, encrypted_data: str) -> dict | str: """ Decrypt base64-encoded AES-CBC payload and return the original data. """ ciphertext = base64.b64decode(encrypted_data) cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_iv) decrypted = unpad(cipher.decrypt(ciphertext), AES.block_size) decrypted_str = decrypted.decode("utf-8") # Try to parse as JSON, return string if it fails try: return json.loads(decrypted_str) except json.JSONDecodeError: return decrypted_str def encrypt_password_rsa(self, password: str) -> str: """ Encrypt password using RSA public key (matches JavaScript GO function). Returns base64-encoded encrypted password. """ public_key = RSA.import_key(self.RSA_PUBLIC_KEY) cipher = PKCS1_v1_5.new(public_key) encrypted = cipher.encrypt(password.encode('utf-8')) return base64.b64encode(encrypted).decode('utf-8') def get_access_token(self) -> str: """ Authenticate with Hanchu ESS and return access token. Uses double encryption: RSA for password, then AES for entire payload. Caches the token to avoid unnecessary logins. """ # Return cached token if available if self._access_token: return self._access_token # Step 1: RSA encrypt the password encrypted_password = self.encrypt_password_rsa(self.hanchu_password) # Step 2: Build payload with encrypted password payload = { "account": self.hanchu_username, "pwd": encrypted_password, "loginType": self.login_type, } # Step 3: AES encrypt the entire payload encrypted_payload = self.encrypt_payload(payload) # Step 4: Send to API with correct headers headers = { "Content-Type": "text/plain", "Accept": "application/json, text/plain, */*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "appplat": "iess", "locale": "en", "timezone": "Africa/Accra", "timeselected": "GMT", "version": "1.0", "crypto-version": "1.0.0", "Origin": "https://iess3.hanchuess.com", "Referer": "https://iess3.hanchuess.com/login", } response = requests.post( self.login_url, data=encrypted_payload, headers=headers, timeout=self.timeout, verify=self.verify_ssl, ) response.raise_for_status() result = response.json() try: # The token is directly in the 'data' field as a JWT string if result.get("success") and result.get("data"): self._access_token = result["data"] return self._access_token else: raise RuntimeError( f"Hanchu login failed: {json.dumps(result, ensure_ascii=False)}" ) except (KeyError, TypeError): raise RuntimeError( f"Hanchu login failed: {json.dumps(result, ensure_ascii=False)}" ) def get_power_chart(self, access_token: str = None) -> dict: """ Get 65-second power chart data from HanchuESS. Args: access_token: Optional JWT token from login. If not provided, will get one automatically. Returns: Power chart data from the API """ # Get access token if not provided if not access_token: # Check if we have a cached token first if self._access_token: access_token = self._access_token else: access_token = self.get_access_token() # Build payload with serial number payload = {"sn": self.base_serial_number} # AES encrypt the payload encrypted_payload = self.encrypt_payload(payload) # Send to API with access token url = "https://iess3.hanchuess.com/gateway/platform/pcs/powerChart" headers = { "Content-Type": "text/plain", "Accept": "application/json, text/plain, */*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "appplat": "iess", "locale": "en", "timezone": "Africa/Accra", "timeselected": "GMT", "version": "1.0", "crypto-version": "1.0.0", "access-token": access_token, "Origin": "https://iess3.hanchuess.com", "Referer": "https://iess3.hanchuess.com/", } response = requests.post( url, data=encrypted_payload, headers=headers, timeout=self.timeout, verify=self.verify_ssl, ) response.raise_for_status() return response.json() def get_power_minute_chart(self, access_token: str = None, start_ts: int = None, end_ts: int = None) -> dict: """ Get minute-by-minute power chart data from HanchuESS. Args: access_token: Optional JWT token from login. If not provided, will get one automatically. start_ts: Start timestamp in milliseconds. If not provided, defaults to start of today. end_ts: End timestamp in milliseconds. If not provided, defaults to end of today. Returns: Power minute chart data from the API """ # Get access token if not provided if not access_token: # Check if we have a cached token first if self._access_token: access_token = self._access_token else: access_token = self.get_access_token() # Set default timestamps if not provided (today's data) if not start_ts or not end_ts: from datetime import datetime, timezone, timedelta now = datetime.now(timezone.utc) # 30 minutes ago start_time = now - timedelta(minutes=30) start_ts = int(start_time.timestamp() * 1000) # Current time end_ts = int(now.timestamp() * 1000) # Build payload payload = { "sn": self.base_serial_number, "devType": "2", "maxCount": 1440, # 24 hours * 60 minutes "dataTimeTsEnd": end_ts, "dataTimeTsStart": start_ts, "masterSum": True } # AES encrypt the payload encrypted_payload = self.encrypt_payload(payload) # Send to API with access token url = "https://iess3.hanchuess.com/gateway/platform/pcs/powerMinuteChart" headers = { "Content-Type": "text/plain", "Accept": "application/json, text/plain, */*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "appplat": "iess", "locale": "en", "timezone": "Africa/Accra", "timeselected": "GMT", "version": "1.0", "crypto-version": "1.0.0", "access-token": access_token, "Origin": "https://iess3.hanchuess.com", "Referer": "https://iess3.hanchuess.com/", } response = requests.post( url, data=encrypted_payload, headers=headers, timeout=self.timeout, verify=self.verify_ssl, ) response.raise_for_status() return response.json()