Add simple HanchuESS Helper
This commit is contained in:
@@ -1,2 +1,5 @@
|
||||
fastapi
|
||||
uvicorn[standard]
|
||||
uvicorn[standard]
|
||||
requests>=2.31.0
|
||||
pycryptodome>=3.20.0
|
||||
python-dotenv>=1.0.1
|
||||
@@ -6,6 +6,17 @@ app = FastAPI(title="HanchuESS Solar Backend API")
|
||||
def root():
|
||||
return {"message": "Welcome to the HanchuESS Solar Backend API!"}
|
||||
|
||||
@app.get("/get_access_token", tags=["HanchuESS"])
|
||||
def get_access_token():
|
||||
from service.hanchu_service import HanchuESSService
|
||||
|
||||
hanchu_service = HanchuESSService()
|
||||
try:
|
||||
access_token = hanchu_service.get_access_token()
|
||||
return {"access_token": access_token}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8050)
|
||||
80
backend/src/service/hanchu_service.py
Normal file
80
backend/src/service/hanchu_service.py
Normal file
@@ -0,0 +1,80 @@
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class HanchuESSService:
|
||||
def __init__(self):
|
||||
self.name = "HanchuESS Service"
|
||||
|
||||
# Load config from environment
|
||||
self.aes_key = os.environ["HANCHU_AES_KEY"].encode("utf-8")
|
||||
self.aes_iv = os.environ["HANCHU_AES_IV"].encode("utf-8")
|
||||
self.login_url = os.environ["HANCHU_LOGIN_URL"]
|
||||
self.login_type = os.getenv("HANCHU_LOGIN_TYPE", "ACCOUNT")
|
||||
self.timeout = int(os.getenv("HANCHU_HTTP_TIMEOUT", "10"))
|
||||
self.verify_ssl = os.getenv("HANCHU_VERIFY_SSL", "true").lower() == "true"
|
||||
self.hanchu_username = os.getenv("HANCHU_USERNAME", "")
|
||||
self.hanchu_password = os.getenv("HANCHU_PASSWORD", "")
|
||||
|
||||
# Safety checks
|
||||
if len(self.aes_key) not in (16, 24, 32):
|
||||
raise ValueError("AES key must be 16, 24, or 32 bytes")
|
||||
if len(self.aes_iv) != 16:
|
||||
raise ValueError("AES IV must be exactly 16 bytes")
|
||||
|
||||
def encrypt_payload(self, data: dict | str) -> str:
|
||||
"""
|
||||
Encrypt payload using AES-CBC and return base64 string.
|
||||
"""
|
||||
if not isinstance(data, str):
|
||||
data = json.dumps(data, separators=(",", ":"))
|
||||
|
||||
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.aes_iv)
|
||||
ciphertext = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size))
|
||||
return base64.b64encode(ciphertext).decode("utf-8")
|
||||
|
||||
def get_access_token(self) -> str:
|
||||
"""
|
||||
Authenticate with Hanchu ESS and return access token.
|
||||
"""
|
||||
|
||||
payload = {
|
||||
"account": self.hanchu_username,
|
||||
"password": self.hanchu_password,
|
||||
"loginType": self.login_type,
|
||||
}
|
||||
|
||||
encrypted_payload = self.encrypt_payload(payload)
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"User-Agent": "Mozilla/5.0",
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
self.login_url,
|
||||
json={"data": encrypted_payload},
|
||||
headers=headers,
|
||||
timeout=self.timeout,
|
||||
verify=self.verify_ssl,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
try:
|
||||
return result["data"]["accessToken"]
|
||||
except (KeyError, TypeError):
|
||||
raise RuntimeError(
|
||||
f"Hanchu login failed: {json.dumps(result, ensure_ascii=False)}"
|
||||
)
|
||||
Reference in New Issue
Block a user