""" Backfill service for historical data collection. Fetches day-by-day minute data from HanchuESS API and fills database gaps. """ import asyncio import logging from typing import Optional, Dict, List from datetime import datetime, timedelta, timezone from service.hanchu_service import HanchuESSService from database.db import PowerReadingsDB logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class BackfillService: """Service for backfilling historical power data.""" def __init__(self): """Initialize backfill service.""" self.hanchu_service = HanchuESSService() self.db = PowerReadingsDB() self.is_running = False self.task: Optional[asyncio.Task] = None # Progress tracking self.status = { "is_running": False, "start_date": None, "end_date": None, "current_date": None, "days_total": 0, "days_completed": 0, "days_remaining": 0, "records_inserted": 0, "records_skipped": 0, "conflicts_found": 0, "errors": [], "started_at": None, "estimated_completion": None } # Configuration self.delay_between_days = 2 # seconds between API calls self.sn = self.hanchu_service.base_serial_number async def start_backfill( self, start_date: str, end_date: Optional[str] = None, delay_seconds: int = 2 ) -> Dict: """ Start backfilling data from start_date to end_date. Args: start_date: Start date in ISO format (e.g., "2025-07-02") end_date: End date in ISO format (defaults to today) delay_seconds: Delay between API calls for rate limiting Returns: Status dictionary """ if self.is_running: return {"error": "Backfill already in progress"} # Parse dates start = datetime.fromisoformat(start_date.split('T')[0]) if end_date: end = datetime.fromisoformat(end_date.split('T')[0]) else: end = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) # Calculate total days total_days = (end - start).days + 1 # Initialize status self.status = { "is_running": True, "start_date": start.isoformat(), "end_date": end.isoformat(), "current_date": start.isoformat(), "days_total": total_days, "days_completed": 0, "days_remaining": total_days, "records_inserted": 0, "records_skipped": 0, "conflicts_found": 0, "errors": [], "started_at": datetime.now().isoformat(), "estimated_completion": None } self.delay_between_days = delay_seconds self.is_running = True # Start background task self.task = asyncio.create_task( self._backfill_loop(start, end) ) logger.info(f"Backfill started: {start_date} to {end.isoformat()} ({total_days} days)") return self.status async def stop_backfill(self) -> Dict: """Stop the backfill process.""" if not self.is_running: return {"error": "No backfill in progress"} self.is_running = False if self.task: self.task.cancel() try: await self.task except asyncio.CancelledError: pass logger.info("Backfill stopped") self.status["is_running"] = False return self.status def get_status(self) -> Dict: """Get current backfill status.""" # Update estimated completion time if self.status["is_running"] and self.status["days_completed"] > 0: elapsed = ( datetime.now() - datetime.fromisoformat(self.status["started_at"]) ).total_seconds() avg_time_per_day = elapsed / self.status["days_completed"] remaining_seconds = avg_time_per_day * self.status["days_remaining"] eta = datetime.now() + timedelta(seconds=remaining_seconds) self.status["estimated_completion"] = eta.isoformat() self.status["estimated_time_remaining_minutes"] = round(remaining_seconds / 60, 1) return self.status async def _backfill_loop(self, start_date: datetime, end_date: datetime): """Main backfill loop.""" current_date = start_date while current_date <= end_date and self.is_running: try: date_str = current_date.strftime("%Y-%m-%d") self.status["current_date"] = date_str logger.info(f"Processing {date_str}...") # Check existing data coverage coverage = self.db.check_day_coverage(date_str, sn=self.sn) if coverage["has_data"] and coverage["reading_count"] > 1400: # Day is well covered, skip logger.info(f" {date_str} already has {coverage['reading_count']} readings, skipping") self.status["days_completed"] += 1 self.status["days_remaining"] -= 1 current_date += timedelta(days=1) continue # Fetch day data from API result = await self._fetch_and_store_day(current_date) self.status["records_inserted"] += result["inserted"] self.status["records_skipped"] += result["skipped"] self.status["conflicts_found"] += result["conflicts"] if result["error"]: self.status["errors"].append({ "date": date_str, "error": result["error"] }) self.status["days_completed"] += 1 self.status["days_remaining"] -= 1 logger.info( f" {date_str}: inserted={result['inserted']}, " f"skipped={result['skipped']}, conflicts={result['conflicts']}" ) # Rate limiting delay if self.is_running: await asyncio.sleep(self.delay_between_days) except Exception as e: logger.error(f"Error processing {current_date}: {e}", exc_info=True) self.status["errors"].append({ "date": current_date.strftime("%Y-%m-%d"), "error": str(e) }) current_date += timedelta(days=1) self.is_running = False self.status["is_running"] = False logger.info("Backfill completed") async def _fetch_and_store_day(self, date: datetime) -> Dict: """ Fetch minute data for a day and store in database. Returns: Dictionary with counts: inserted, skipped, conflicts, error """ result = { "inserted": 0, "skipped": 0, "conflicts": 0, "error": None } try: # Calculate timestamps for the day start_of_day = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) end_of_day = date.replace(hour=23, minute=59, second=59, microsecond=999000, tzinfo=timezone.utc) start_ts = int(start_of_day.timestamp() * 1000) end_ts = int(end_of_day.timestamp() * 1000) # Fetch minute chart data api_response = await asyncio.to_thread( self.hanchu_service.get_power_minute_chart, start_ts=start_ts, end_ts=end_ts ) # Check if successful if not api_response.get("success"): result["error"] = f"API returned success=false: {api_response}" return result # Extract data points data = api_response.get("data", {}) # Check if data is a list or dict if isinstance(data, list): # Data is an array of readings logger.info(f" Received {len(data)} data points") # Process each data point for idx, point in enumerate(data): try: if isinstance(point, dict): # Extract timestamp from dataTimeTs field (milliseconds) if 'dataTimeTs' in point: timestamp_ms = point['dataTimeTs'] point_time = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) timestamp_str = point_time.isoformat() else: # Fallback: generate timestamp based on index (minute of the day) point_time = start_of_day + timedelta(minutes=idx) timestamp_str = point_time.isoformat() row_id = self.db.insert_minute_reading( timestamp=timestamp_str, data=point, sn=self.sn, data_source='backfill' ) if row_id: result["inserted"] += 1 else: result["skipped"] += 1 except Exception as e: logger.error(f" Error inserting point {idx}: {e}") result["conflicts"] += 1 elif isinstance(data, dict): # Data is a dictionary, log keys to understand structure logger.info(f" Received API response with keys: {list(data.keys())}") # Check for common array keys that might contain the data for key in ['data', 'points', 'readings', 'values', 'chart']: if key in data and isinstance(data[key], list): logger.info(f" Found array in '{key}' with {len(data[key])} items") break # Mark as processed but note we need to implement parsing result["error"] = f"Need to implement parsing for dict structure: {list(data.keys())}" else: result["error"] = f"Unexpected data type: {type(data)}" except Exception as e: result["error"] = str(e) logger.error(f"Error fetching day data: {e}") return result # Global backfill service instance _backfill_service: Optional[BackfillService] = None def get_backfill_service() -> BackfillService: """Get or create the global backfill service instance.""" global _backfill_service if _backfill_service is None: _backfill_service = BackfillService() return _backfill_service