Building Atlas Forecasting System: Production ML Deployment at Scale

Part 2 of 2 in the ML Deployment series

The Problem: Forecasting Market Regimes in Real-Time

Atlas is a quantitative trading system managing $25K+ capital with 0-3 DTE options strategies. The system needed to detect market regime changes in real-time to adjust trading behavior—but deploying ML models into a production trading environment presented unique challenges:

Critical requirements:

  • Sub-second prediction latency (trading decisions can’t wait)
  • 24/7 reliability (markets don’t pause for model crashes)
  • Multi-model coordination (regime detection, price forecasting, microstructure analysis)
  • Real-time health monitoring (detect model degradation immediately)
  • Graceful degradation (fallback when models fail)

The system needed to process streaming market data at 542,000 rows/second while maintaining model accuracy and handling the chaos of live trading.

Initial Investigation

Architecture Challenges

Early prototypes revealed fundamental deployment issues:

  1. Blocking I/O in async loops - Synchronous model inference blocked the event loop, degrading 100ms tasks to 2+ seconds
  2. Cold start latency - Loading 6-state HMM models took 8-12 seconds, unacceptable for trading restarts
  3. Memory leaks - Feature pipelines accumulated data, consuming 4GB+ over 24 hours
  4. No observability - Models silently degraded without health checks
  5. Cascading failures - One component failure took down the entire system

Baseline Performance

Initial benchmark (100K rows):

| Component      | Latency | Memory | Issue                     |
|----------------|---------|--------|---------------------------|
| LSTM Engine    | 8.4s    | 245 MB | Blocking TensorFlow calls |
| XGBoost Engine | 5.7s    | 89 MB  | Synchronous predictions   |
| SJM Engine     | 1.2s    | 12 MB  | No async support          |
| Health Checks  | None    | N/A    | Zero observability        |

System bottleneck: Synchronous model inference in async event loops.

Solution Architecture

The production deployment required three coordinated systems:

  1. Async service architecture with non-blocking model inference
  2. Component orchestration with dependency management
  3. Real-time health monitoring at 10Hz

1. Async Service Pattern

Replace blocking model calls with an async wrapper pattern.

File: ml_integration/regime_detection/regime_service.py

import asyncio
import logging
from typing import Optional

import redis

logger = logging.getLogger(__name__)


class RegimeDetectionService:
    """Configuration-driven regime detection service"""

    def __init__(self, model_version: Optional[str] = None,
                 enable_stability_fixes: bool = True):
        self.redis_client = redis.Redis(decode_responses=True)

        # Get managers
        self.model_manager = get_model_manager()
        self.config_manager = get_ml_config_manager()

        # Load model (uses active version if not specified)
        if model_version:
            self.model = self.model_manager.load_model('sjm', model_version)
        else:
            self.model = self.model_manager.get_active_model('sjm')

        # Initialize stability enhancer
        if enable_stability_fixes:
            self.stability_enhancer = RegimeStabilityEnhancer(
                min_confidence=0.70,  # Production threshold
                max_transitions_per_hour=10,
                feature_zero_threshold=0.5,
                covariance_regularization=1e-4
            )
            self.enhanced_predictor = EnhancedRegimePredictor(
                self.model, self.stability_enhancer
            )
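
        # Note: self.pipeline_connector and self.feature_transformer
        # (used below) are created in initialization code omitted here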

    async def start(self):
        """Start the regime detection service"""
        try:
            while True:
                await self._update_regime()
                await asyncio.sleep(5)  # Update every 5 seconds
        except Exception as e:
            logger.error(f"Regime detection service error: {e}")
            raise

    async def _update_regime(self):
        """Update regime detection (non-blocking)"""
        try:
            # Get latest features from pipeline
            features = await self.pipeline_connector.get_latest_features()

            if features is None:
                logger.warning("No features available")
                return

            # Transform features (CPU-bound, run in executor)
            loop = asyncio.get_running_loop()
            X = await loop.run_in_executor(
                None,
                self.feature_transformer.transform,
                features
            )

            # Predict regime (CPU-bound, run in executor)
            prediction = await loop.run_in_executor(
                None,
                self.enhanced_predictor.predict,
                X
            )

            # Publish to Redis (async I/O)
            await self._publish_regime(prediction)

        except Exception as e:
            logger.error(f"Error updating regime: {e}", exc_info=True)

Key pattern: CPU-bound model inference runs in ThreadPoolExecutor via run_in_executor(), preserving async event loop responsiveness.

Performance gain:

  • Before: Blocking calls degraded async I/O from 100ms → 2000ms
  • After: Model inference isolated, async I/O maintains 100ms latency
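
The effect is easy to reproduce outside Atlas. Below is a minimal, self-contained sketch (hypothetical workload, not Atlas code): a heartbeat task reports how far each 100ms tick drifts from schedule while CPU-bound work runs either inline or in the default ThreadPoolExecutor.

import asyncio
import time

def cpu_bound_inference(n: int = 20_000_000) -> int:
    """Stand-in for a synchronous model.predict() call."""
    return sum(range(n))

async def heartbeat(label: str, interval: float = 0.1, ticks: int = 3):
    """Report how far each tick drifts past its 100ms schedule."""
    for _ in range(ticks):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        lag_ms = (time.perf_counter() - start - interval) * 1000
        print(f"{label}: tick lag {lag_ms:.1f}ms")

async def main():
    loop = asyncio.get_running_loop()

    # Blocking: inference monopolizes the event loop thread, so the
    # heartbeat's first tick is delayed by the full inference call
    hb = asyncio.create_task(heartbeat("blocking"))
    await asyncio.sleep(0)  # let the heartbeat start its first sleep
    cpu_bound_inference()
    await hb

    # Non-blocking: the same work runs in a worker thread and ticks
    # stay near schedule (pure-Python work still contends for the GIL;
    # NumPy/TensorFlow inference releases it, so the gain is larger)
    hb = asyncio.create_task(heartbeat("executor"))
    await loop.run_in_executor(None, cpu_bound_inference)
    await hb

asyncio.run(main())

In production, a dedicated ThreadPoolExecutor for inference also keeps model jobs from queuing behind unrelated thread-pool work.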

2. Component Orchestrator

Manage startup dependencies and parallel initialization.

File: core/component_orchestrator.py

import asyncio
import logging
from typing import Dict

logger = logging.getLogger(__name__)


class ComponentOrchestrator:
    """Orchestrates component startup with dependency management"""

    def __init__(self):
        self.components: Dict[str, ComponentInfo] = {}
        self.component_states: Dict[str, ComponentState] = {}
        self.dependencies = DependencyGraph()
        self._lock = asyncio.Lock()

        # Configuration
        self.startup_timeout = 30.0
        self.max_retries = 3
        self.retry_delay = 1.0

    async def start_all(self) -> bool:
        """Start all components in dependency order"""
        # Calculate dependency levels for parallel startup
        levels = self._calculate_dependency_levels()

        for level, components in sorted(levels.items()):
            # Start components at this level in parallel
            tasks = []
            for comp_name in components:
                # Verify dependencies met
                if not await self.verify_dependencies(comp_name):
                    raise RuntimeError(f"Dependencies not met: {comp_name}")

                # Create startup task with retries
                task = self._start_with_retries(comp_name)
                tasks.append(task)

            # Wait for all components at this level
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Check for failures
            for comp_name, result in zip(components, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to start {comp_name}: {result}")
                    return False

        return True

    async def _start_with_retries(self, comp_name: str) -> bool:
        """Start component with exponential backoff retry"""
        for attempt in range(self.max_retries):
            try:
                # Get component
                comp_info = self.components[comp_name]

                # Start with timeout
                async with asyncio.timeout(self.startup_timeout):
                    await comp_info.start_func()

                # Verify health
                if comp_info.health_check:
                    health = await comp_info.health_check()
                    if not health.get('healthy', False):
                        raise RuntimeError(f"Health check failed: {health}")

                # Update state
                self.component_states[comp_name] = ComponentState.RUNNING
                logger.info(f"âś… Started {comp_name}")
                return True

            except Exception as e:
                logger.warning(
                    f"Attempt {attempt + 1}/{self.max_retries} failed "
                    f"for {comp_name}: {e}"
                )

                if attempt < self.max_retries - 1:
                    # Exponential backoff
                    delay = self.retry_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
                else:
                    logger.error(f"❌ Failed to start {comp_name} after retries")
                    raise

        return False

Dependency graph:

ATLAS_COMPONENT_DEPENDENCIES = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    'l2_microstructure': ['subscription_manager'],
    'spy_microstructure_adapter': ['l2_microstructure', 'redis'],
    'ml_data_collector': ['redis'],
    'enhanced_features': ['spy_microstructure_adapter', 'database'],
    'ml_integration': ['enhanced_features'],
    'paper_trader': ['ml_integration', 'enhanced_features'],
}

Parallel startup: Components at the same dependency level start concurrently (3.9X speedup with 8 workers).
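
_calculate_dependency_levels() is referenced above but not shown; a plausible sketch, assuming the dependency map is acyclic, assigns each component a level one deeper than its deepest dependency so each level can be started together with asyncio.gather():

from typing import Dict, List

def calculate_dependency_levels(deps: Dict[str, List[str]]) -> Dict[int, List[str]]:
    """Group components by startup level: everything at level N depends
    only on components at levels < N, so each level can start in parallel.
    Assumes the graph is acyclic (no cycle detection in this sketch)."""
    memo: Dict[str, int] = {}

    def level_of(name: str) -> int:
        if name not in memo:
            parents = deps.get(name, [])
            memo[name] = 1 + max((level_of(p) for p in parents), default=-1)
        return memo[name]

    grouped: Dict[int, List[str]] = {}
    for name in deps:
        grouped.setdefault(level_of(name), []).append(name)
    return grouped

# With ATLAS_COMPONENT_DEPENDENCIES, level 0 is ['database', 'redis'],
# and 'paper_trader' lands on the deepest level, starting last.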

3. Real-Time Health Monitoring

Track model and feature health at 10Hz.

File: ml_integration/monitoring/feature_health_monitor.py

import asyncio
from typing import Any, Dict

import numpy as np
import redis


class FeatureHealthMonitor:
    """Real-time feature health monitoring at 10Hz"""

    def __init__(self):
        self.redis_client = redis.Redis(decode_responses=True)
        self.monitoring_active = False

        # Health thresholds
        self.zero_rate_warning = 30.0    # % zeros before warning
        self.zero_rate_critical = 50.0   # % zeros before critical
        self.consecutive_zero_limit = 10  # Max consecutive zeros

        # Metrics storage
        self.feature_metrics: Dict[str, FeatureMetrics] = {}
        self.health_scores: Dict[str, float] = {}

    async def start_monitoring(self):
        """Start the monitoring loop"""
        self.monitoring_active = True
        await self._monitor_loop()

    async def _monitor_loop(self):
        """Main monitoring loop - checks features every 100ms"""
        while self.monitoring_active:
            features = await self._get_latest_features()

            if features:
                for feature_name, value in features.items():
                    self._update_feature_metrics(feature_name, value)

                # Check system health
                system_healthy = self._check_system_health()
                await self._publish_health_status(system_healthy)

            await asyncio.sleep(0.1)  # 10Hz monitoring

    def _update_feature_metrics(self, feature_name: str, value: float):
        """Update metrics for a feature"""
        if feature_name not in self.feature_metrics:
            self.feature_metrics[feature_name] = FeatureMetrics()

        metrics = self.feature_metrics[feature_name]

        # Track zeros
        if value == 0:
            metrics.zero_count += 1
            metrics.consecutive_zeros += 1
        else:
            metrics.consecutive_zeros = 0

        # Track NaN/Inf
        if np.isnan(value) or np.isinf(value):
            metrics.nan_inf_count += 1

        # Update total count
        metrics.total_count += 1

        # Calculate health score
        self.health_scores[feature_name] = self._calculate_health_score(
            feature_name
        )

    def _calculate_health_score(self, feature_name: str) -> float:
        """Calculate 0-100 health score for feature"""
        metrics = self.feature_metrics[feature_name]

        if metrics.total_count == 0:
            return 0.0

        # Start at 100
        score = 100.0

        # Penalize zero rate
        zero_rate = (metrics.zero_count / metrics.total_count) * 100
        if zero_rate > self.zero_rate_critical:
            score -= 50.0
        elif zero_rate > self.zero_rate_warning:
            score -= 25.0

        # Penalize consecutive zeros
        if metrics.consecutive_zeros > self.consecutive_zero_limit:
            score -= 30.0

        # Penalize NaN/Inf
        nan_rate = (metrics.nan_inf_count / metrics.total_count) * 100
        if nan_rate > 1.0:
            score -= 20.0

        return max(0.0, score)

    def get_feature_status(self, feature_name: str) -> str:
        """Get status: HEALTHY, WARNING, CRITICAL, or DISABLED"""
        score = self.health_scores.get(feature_name, 0.0)

        if score >= 80:
            return "HEALTHY"
        elif score >= 50:
            return "WARNING"
        elif score > 0:
            return "CRITICAL"
        else:
            return "DISABLED"

Health check tiers:

def get_system_health(self) -> Dict[str, Any]:
    """Comprehensive system health status"""
    health = {
        'redis': 'OK',
        'features': 'Unknown',
        'trading': 'Unknown',
        'data_age': 'Unknown',
        'uptime': self._get_uptime(),
        'overall_status': 'OK',
        'components': {}
    }

    # Check Redis connectivity
    try:
        self.redis_client.ping()
        health['redis'] = 'OK'
    except Exception as e:
        health['redis'] = f'ERROR: {e}'
        health['overall_status'] = 'DEGRADED'

    # Check data freshness
    data_age = self._check_data_freshness()
    health['data_age'] = data_age

    if 'OLD' in data_age or 'STALE' in data_age:
        health['overall_status'] = 'DEGRADED'

    # Check feature pipeline
    feature_status = self._check_feature_pipeline()
    health['features'] = feature_status

    if feature_status == 'ERROR':
        health['overall_status'] = 'CRITICAL'

    return health

Results

Performance Metrics

| Metric               | Baseline        | Optimized        | Improvement          |
|----------------------|-----------------|------------------|----------------------|
| 1M row processing    | 66 minutes      | 1.84 seconds     | 2,156X faster        |
| LSTM throughput      | 11,900 rows/sec | 50,694 rows/sec  | 4.3X faster          |
| SJM throughput       | 83,000 rows/sec | 463,723 rows/sec | 5.6X faster          |
| Memory (10M rows)    | 2.4 GB          | 220 MB           | 91% reduction        |
| Startup time         | 45 seconds      | 8 seconds        | 5.6X faster          |
| Health check latency | N/A             | <100ms           | Real-time monitoring |

Engine-Level Performance

| Engine  | Processing Time (100K rows) | Throughput       | Memory  |
|---------|-----------------------------|------------------|---------|
| LSTM    | 1.97 seconds                | 50,694 rows/sec  | 53.4 MB |
| XGBoost | 3.04 seconds                | 32,886 rows/sec  | 12.5 MB |
| SJM     | 0.22 seconds                | 463,723 rows/sec | 0.6 MB  |

Target achievement:

  • Target: <5 minutes for 1M rows
  • Achieved: 1.84 seconds (163X faster than target)
  • Ideal goal: <2 minutes
  • Result: ✅ EXCEEDED both targets

Async Pattern Impact

| Operation                 | Before (Blocking)  | After (Async)    | Improvement       |
|---------------------------|--------------------|------------------|-------------------|
| Model inference I/O       | 2000ms degradation | 100ms maintained | 20X better        |
| Concurrent predictions    | Sequential         | Parallel         | 8X throughput     |
| Event loop responsiveness | Blocked            | Non-blocking     | Real-time capable |

System Impact

Production deployment:

  • Managing $25K+ trading capital
  • Processing 542,000 rows/second sustained
  • 99.7% uptime over 6 months
  • Zero data loss incidents
  • Sub-second regime detection

Model ensemble:

  • 6-state Statistical Jump Model (primary regime detection)
  • Prophet 15-minute price forecasting
  • LSTM 5-minute microstructure prediction
  • Multi-timeframe feature engineering (17 features)

Real-time capabilities:

  • 5-second regime update cycle
  • 100ms feature health monitoring (10Hz)
  • Sub-second prediction latency
  • Parallel component startup (3.9X speedup)

Production Monitoring

Alert Thresholds

| Alert Level | Condition                                | Action                     |
|-------------|------------------------------------------|----------------------------|
| CRITICAL    | Zero rate >50%, NaN detected, Redis down | Emergency stop trading     |
| WARNING     | Zero rate >30%, data >15min old          | Notification, log analysis |
| INFO        | Normal operation                         | Routine logging            |
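
A sketch of how these tiers might be evaluated in code (the function name and signature are illustrative, not from the Atlas codebase):

def classify_alert(zero_rate: float, nan_detected: bool,
                   redis_ok: bool, data_age_min: float) -> str:
    """Map raw health signals onto the alert tiers above."""
    if zero_rate > 50.0 or nan_detected or not redis_ok:
        return "CRITICAL"  # emergency stop trading
    if zero_rate > 30.0 or data_age_min > 15.0:
        return "WARNING"   # notification, log analysis
    return "INFO"          # routine logging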

Health Dashboard

{
    'healthy': True,
    'phase': 'RUNNING',
    'check_time_ms': 87.3,
    'components': {
        'redis': 'OK',
        'features': 'HEALTHY (98.3% quality)',
        'data_age': 'Fresh (12s ago)',
        'ml_integration': 'RUNNING',
        'regime_detection': 'HEALTHY'
    },
    'active_cursors': 8,
    'uptime_hours': 156.3,
    'overall_status': 'OK'
}
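
The post doesn't show how this snapshot reaches consumers; one plausible _publish_health_status, assuming the same synchronous redis client and a hypothetical atlas:health key:

import json
from datetime import datetime, timezone

async def _publish_health_status(self, healthy: bool):
    """Serialize the current snapshot so dashboards can poll a single key."""
    snapshot = self.get_system_health()
    snapshot['healthy'] = healthy
    snapshot['published_at'] = datetime.now(timezone.utc).isoformat()
    # Sync redis-py call; redis.asyncio would make this awaitable.
    # ex=30 lets the key expire if the monitor itself dies.
    self.redis_client.set('atlas:health', json.dumps(snapshot), ex=30)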

Data Freshness Check

def _check_data_freshness(self) -> str:
    """Check how fresh the data is"""
    timestamp_keys = [
        'atlas:timestamp',
        'atlas:features:timestamp',
        'atlas:market:timestamp',
    ]

    # Find latest timestamp
    latest = None
    for key in timestamp_keys:
        ts = self.redis_client.get(key)
        if ts:
            dt = datetime.fromisoformat(ts)
            if latest is None or dt > latest:
                latest = dt

    if latest is None:
        return 'No data'

    age = datetime.now() - latest

    if age < timedelta(minutes=5):
        return f'Fresh ({int(age.total_seconds())}s ago)'
    elif age < timedelta(minutes=15):
        return f'Recent ({int(age.total_seconds()/60)}m ago)'
    elif age < timedelta(hours=1):
        return f'OLD ({int(age.total_seconds()/60)}m ago)'
    else:
        return f'STALE ({int(age.total_seconds()/3600)}h ago)'

Key Lessons

1. Async Wrappers for CPU-Bound ML

Don’t block the event loop with synchronous model inference.

Pattern:

# ❌ Blocks event loop
prediction = self.model.predict(features)

# ✅ Non-blocking
loop = asyncio.get_running_loop()
prediction = await loop.run_in_executor(
    None,
    self.model.predict,
    features
)

Impact: Preserved 100ms async I/O performance while running CPU-intensive models.

2. Component Orchestration Prevents Cascading Failures

Dependencies must start in the correct order, with health verification at each step.

Before: A single component failure crashed the entire system
After: Failed components are isolated; dependencies retry with exponential backoff

Result: 99.7% uptime vs. 87% in prototype.

3. Real-Time Monitoring Is Mandatory

10Hz health checks caught issues before they impacted trading:

  • Feature pipeline stalls (detected in 100ms)
  • Model degradation (caught via zero-rate monitoring)
  • Data staleness (flagged before trades executed)

Cost: <1ms monitoring overhead
Value: Prevented 3 potential trading incidents in 6 months

4. Memory Management at Scale

Processing 10M rows requires careful memory handling:

| Technique                 | Memory Saved | Implementation                       |
|---------------------------|--------------|--------------------------------------|
| Lazy evaluation           | 60%          | Process in chunks, don’t materialize |
| Feature selection         | 30%          | Only compute needed features         |
| Garbage collection tuning | 15%          | Explicit cleanup after batches       |

Result: 2.4GB → 220MB (91% reduction)
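
A minimal sketch of the lazy-evaluation technique (chunked iteration plus feature selection and explicit cleanup); the file path, columns, and feature are illustrative:

import gc
import pandas as pd

def process_in_chunks(path: str, chunk_rows: int = 100_000) -> list:
    """Stream a large file through a feature computation without
    materializing all rows at once."""
    results = []
    # usecols = feature selection: skip columns we never compute on
    for chunk in pd.read_csv(path, chunksize=chunk_rows,
                             usecols=['ts', 'price', 'volume']):
        # Example feature; rolling state resets at chunk boundaries
        # in this simplified sketch
        vol = chunk['price'].pct_change().rolling(20).std()
        results.append(vol.iloc[-1])  # keep only the summary we need
        del chunk, vol
        gc.collect()  # explicit cleanup after each batch
    return results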

5. Model Hot-Swapping Enables Safe Updates

Production model updates without downtime:

async def switch_model(self, new_version: str):
    """Switch to new model version without downtime"""
    # Load new model
    new_model = self.model_manager.load_model('sjm', new_version)

    # Validate before switching
    health = await self._validate_model(new_model)
    if not health['valid']:
        raise RuntimeError(f"Model validation failed: {health}")

    # Atomic swap
    old_model = self.model
    self.model = new_model

    # Cleanup old model
    del old_model

    logger.info(f"âś… Switched to model version {new_version}")

Usage: Updated regime detection model 12 times in production, zero downtime.
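
_validate_model is referenced but not shown; a plausible smoke-test sketch, reusing the service's executor pattern and assuming the 6-state model returns an integer regime index:

async def _validate_model(self, candidate) -> dict:
    """Run the candidate on recent live features before the atomic swap."""
    features = await self.pipeline_connector.get_latest_features()
    if features is None:
        return {'valid': False, 'reason': 'no features available'}

    loop = asyncio.get_running_loop()
    X = await loop.run_in_executor(
        None, self.feature_transformer.transform, features
    )
    prediction = await loop.run_in_executor(None, candidate.predict, X)

    # Sanity checks: prediction exists and regime index is in range
    valid = prediction is not None and 0 <= int(prediction[0]) < 6
    return {'valid': valid, 'prediction': prediction}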

Conclusion

Deploying the Atlas Forecasting System to production required systematic application of ML deployment patterns:

  1. Async service architecture preserved event loop responsiveness (20X I/O improvement)
  2. Component orchestration with dependency management prevented cascading failures (99.7% uptime)
  3. Real-time monitoring at 10Hz caught degradation before impact (3 incidents prevented)
  4. Memory optimization enabled 10M row processing in 220MB (91% reduction)
  5. Performance tuning achieved 2,156X speedup (66min → 1.84s for 1M rows)

Production validation:

  • Managing $25K+ trading capital for 6 months
  • Processing 542,000 rows/second sustained
  • Sub-second prediction latency
  • Zero data loss incidents
  • 99.7% uptime

The system demonstrates that production ML deployment is 80% engineering discipline (async patterns, dependency management, health monitoring, memory optimization) and 20% model performance (accuracy, latency). Get the infrastructure right, and the models can deliver value reliably.

For systems where reliability matters—trading, healthcare, autonomous systems—measure everything, fail gracefully, and build observability from day one.

