Building Atlas Forecasting System: Production ML Deployment at Scale
Part 2 of 2 in the ML Deployment series
The Problem: Forecasting Market Regimes in Real-Time
Atlas is a quantitative trading system managing $25K+ capital with 0-3 DTE options strategies. The system needed to detect market regime changes in real-time to adjust trading behavior—but deploying ML models into a production trading environment presented unique challenges:
Critical requirements:
- Sub-second prediction latency (trading decisions can’t wait)
- 24/7 reliability (markets don’t pause for model crashes)
- Multi-model coordination (regime detection, price forecasting, microstructure analysis)
- Real-time health monitoring (detect model degradation immediately)
- Graceful degradation (fallback when models fail)
The system needed to process streaming market data at 542,000 rows/second while maintaining model accuracy and handling the chaos of live trading.
Initial Investigation
Architecture Challenges
Early prototypes revealed fundamental deployment issues:
- Blocking I/O in async loops - Synchronous model inference blocked the event loop, degrading 100ms tasks to 2+ seconds
- Cold start latency - Loading 6-state HMM models took 8-12 seconds, unacceptable for trading restarts
- Memory leaks - Feature pipelines accumulated data, consuming 4GB+ over 24 hours
- No observability - Models silently degraded without health checks
- Cascading failures - One component failure took down the entire system
Initial benchmark (100K rows):
| Component | Latency | Memory | Issue |
|---|---|---|---|
| LSTM Engine | 8.4s | 245 MB | Blocking TensorFlow calls |
| XGBoost Engine | 5.7s | 89 MB | Synchronous predictions |
| SJM Engine | 1.2s | 12 MB | No async support |
| Health Checks | None | N/A | Zero observability |
System bottleneck: Synchronous model inference in async event loops.
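To make the bottleneck concrete, here is a minimal, self-contained sketch (illustrative, not Atlas code) that measures event-loop lag while a fake half-second model call runs, first blocking the loop and then via `run_in_executor`:

```python
import asyncio
import time

def slow_predict(x):
    """Stand-in for CPU-bound model inference (~0.5s)."""
    time.sleep(0.5)
    return x * 2

async def heartbeat(label):
    """Report event-loop lag: each tick should take ~100ms."""
    for _ in range(5):
        start = time.monotonic()
        await asyncio.sleep(0.1)
        lag = (time.monotonic() - start - 0.1) * 1000
        print(f"{label} tick lag: {lag:.0f}ms")

async def main():
    # Blocking inference: the first heartbeat tick stalls behind the call
    hb = asyncio.create_task(heartbeat("blocking"))
    await asyncio.sleep(0)          # let the heartbeat start
    slow_predict(1)                 # freezes the loop for ~500ms
    await hb

    # Executor-based inference: heartbeat keeps its ~100ms cadence
    hb = asyncio.create_task(heartbeat("executor"))
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, slow_predict, 1)  # non-blocking
    await hb

asyncio.run(main())
```

On the blocking path the first tick arrives roughly 400ms late; on the executor path every tick holds its cadence, which is exactly the degradation pattern seen in the prototype.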
Solution Architecture
The production deployment required three coordinated systems:
- Async service architecture with non-blocking model inference
- Component orchestration with dependency management
- Real-time health monitoring at 10Hz
1. Async Service Pattern
Replace blocking model calls with an async wrapper pattern.
File: ml_integration/regime_detection/regime_service.py
```python
import asyncio
import logging
from typing import Optional

import redis

logger = logging.getLogger(__name__)

# get_model_manager, get_ml_config_manager, RegimeStabilityEnhancer, and
# EnhancedRegimePredictor are project-internal imports; the pipeline
# connector, feature transformer, and _publish_regime are defined
# elsewhere in this file.

class RegimeDetectionService:
    """Configuration-driven regime detection service"""

    def __init__(self, model_version: Optional[str] = None,
                 enable_stability_fixes: bool = True):
        self.redis_client = redis.Redis(decode_responses=True)

        # Get managers
        self.model_manager = get_model_manager()
        self.config_manager = get_ml_config_manager()

        # Load model (uses active version if not specified)
        if model_version:
            self.model = self.model_manager.load_model('sjm', model_version)
        else:
            self.model = self.model_manager.get_active_model('sjm')

        # Initialize stability enhancer
        if enable_stability_fixes:
            self.stability_enhancer = RegimeStabilityEnhancer(
                min_confidence=0.70,  # Production threshold
                max_transitions_per_hour=10,
                feature_zero_threshold=0.5,
                covariance_regularization=1e-4
            )
            self.enhanced_predictor = EnhancedRegimePredictor(
                self.model, self.stability_enhancer
            )

    async def start(self):
        """Start the regime detection service"""
        try:
            while True:
                await self._update_regime()
                await asyncio.sleep(5)  # Update every 5 seconds
        except Exception as e:
            logger.error(f"Regime detection service error: {e}")
            raise

    async def _update_regime(self):
        """Update regime detection (non-blocking)"""
        try:
            # Get latest features from pipeline
            features = await self.pipeline_connector.get_latest_features()
            if features is None:
                logger.warning("No features available")
                return

            # Transform features (CPU-bound, run in executor)
            loop = asyncio.get_event_loop()
            X = await loop.run_in_executor(
                None,
                self.feature_transformer.transform,
                features
            )

            # Predict regime (CPU-bound, run in executor)
            prediction = await loop.run_in_executor(
                None,
                self.enhanced_predictor.predict,
                X
            )

            # Publish to Redis (async I/O)
            await self._publish_regime(prediction)
        except Exception as e:
            logger.error(f"Error updating regime: {e}", exc_info=True)
```
Key pattern: CPU-bound model inference runs in a ThreadPoolExecutor via `run_in_executor()`, preserving async event loop responsiveness.
Performance gain:
- Before: Blocking calls degraded async I/O from 100ms → 2000ms
- After: Model inference isolated, async I/O maintains 100ms latency
2. Component Orchestrator
Manage startup dependencies and parallel initialization.
File: core/component_orchestrator.py
```python
import asyncio
import logging
from typing import Dict

logger = logging.getLogger(__name__)

# ComponentInfo, ComponentState, and DependencyGraph are defined in the
# same module; asyncio.timeout() requires Python 3.11+.

class ComponentOrchestrator:
    """Orchestrates component startup with dependency management"""

    def __init__(self):
        self.components: Dict[str, ComponentInfo] = {}
        self.component_states: Dict[str, ComponentState] = {}
        self.dependencies = DependencyGraph()
        self._lock = asyncio.Lock()

        # Configuration
        self.startup_timeout = 30.0
        self.max_retries = 3
        self.retry_delay = 1.0

    async def start_all(self) -> bool:
        """Start all components in dependency order"""
        # Calculate dependency levels for parallel startup
        levels = self._calculate_dependency_levels()

        for level, components in sorted(levels.items()):
            # Start components at this level in parallel
            tasks = []
            for comp_name in components:
                # Verify dependencies met
                if not await self.verify_dependencies(comp_name):
                    raise RuntimeError(f"Dependencies not met: {comp_name}")

                # Create startup task with retries
                task = self._start_with_retries(comp_name)
                tasks.append(task)

            # Wait for all components at this level
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Check for failures
            for comp_name, result in zip(components, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to start {comp_name}: {result}")
                    return False

        return True

    async def _start_with_retries(self, comp_name: str) -> bool:
        """Start component with exponential backoff retry"""
        for attempt in range(self.max_retries):
            try:
                # Get component
                comp_info = self.components[comp_name]

                # Start with timeout
                async with asyncio.timeout(self.startup_timeout):
                    await comp_info.start_func()

                # Verify health
                if comp_info.health_check:
                    health = await comp_info.health_check()
                    if not health.get('healthy', False):
                        raise RuntimeError(f"Health check failed: {health}")

                # Update state
                self.component_states[comp_name] = ComponentState.RUNNING
                logger.info(f"✅ Started {comp_name}")
                return True
            except Exception as e:
                logger.warning(
                    f"Attempt {attempt + 1}/{self.max_retries} failed "
                    f"for {comp_name}: {e}"
                )
                if attempt < self.max_retries - 1:
                    # Exponential backoff
                    delay = self.retry_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
                else:
                    logger.error(f"❌ Failed to start {comp_name} after retries")
                    raise
        return False
```
Dependency graph:
```python
ATLAS_COMPONENT_DEPENDENCIES = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    'l2_microstructure': ['subscription_manager'],
    'spy_microstructure_adapter': ['l2_microstructure', 'redis'],
    'ml_data_collector': ['redis'],
    'enhanced_features': ['spy_microstructure_adapter', 'database'],
    'ml_integration': ['enhanced_features'],
    'paper_trader': ['ml_integration', 'enhanced_features'],
}
```
Parallel startup: Components at same dependency level start concurrently (3.9x speedup with 8 workers).
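The `_calculate_dependency_levels()` helper referenced in `start_all()` isn't shown above; a minimal sketch of the leveling logic (my reconstruction under the stated dependency graph, not the Atlas source) assigns each component a level one deeper than its deepest dependency, so everything at the same level can start concurrently:

```python
from typing import Dict, List

DEPENDENCIES = {
    'database': [],
    'redis': [],
    'ib_connection': ['database'],
    'subscription_manager': ['ib_connection', 'redis'],
    # ... remaining components as in the graph above
}

def calculate_dependency_levels(
    deps: Dict[str, List[str]]
) -> Dict[int, List[str]]:
    """Group components by startup level: level = 1 + deepest dependency."""
    memo: Dict[str, int] = {}

    def level_of(name: str) -> int:
        if name not in memo:
            memo[name] = 0 if not deps[name] else (
                1 + max(level_of(d) for d in deps[name])
            )
        return memo[name]

    levels: Dict[int, List[str]] = {}
    for name in deps:
        levels.setdefault(level_of(name), []).append(name)
    return levels

# Level 0: database, redis -> start concurrently
# Level 1: ib_connection; Level 2: subscription_manager; etc.
print(calculate_dependency_levels(DEPENDENCIES))
```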
3. Real-Time Health Monitoring
Track model and feature health at 10Hz.
File: ml_integration/monitoring/feature_health_monitor.py
```python
import asyncio
import logging
from typing import Dict

import numpy as np
import redis

logger = logging.getLogger(__name__)

# FeatureMetrics is a small dataclass (zero_count, consecutive_zeros,
# nan_inf_count, total_count) defined alongside this class.

class FeatureHealthMonitor:
    """Real-time feature health monitoring at 10Hz"""

    def __init__(self):
        self.redis_client = redis.Redis(decode_responses=True)
        self.monitoring_active = False

        # Health thresholds
        self.zero_rate_warning = 30.0      # % zeros before warning
        self.zero_rate_critical = 50.0     # % zeros before critical
        self.consecutive_zero_limit = 10   # Max consecutive zeros

        # Metrics storage
        self.feature_metrics: Dict[str, FeatureMetrics] = {}
        self.health_scores: Dict[str, float] = {}

    async def start_monitoring(self):
        """Start the monitoring loop"""
        self.monitoring_active = True
        await self._monitor_loop()

    async def _monitor_loop(self):
        """Main monitoring loop - checks features every 100ms"""
        while self.monitoring_active:
            features = await self._get_latest_features()
            if features:
                for feature_name, value in features.items():
                    self._update_feature_metrics(feature_name, value)

            # Check system health
            system_healthy = self._check_system_health()
            await self._publish_health_status(system_healthy)

            await asyncio.sleep(0.1)  # 10Hz monitoring

    def _update_feature_metrics(self, feature_name: str, value: float):
        """Update metrics for a feature"""
        if feature_name not in self.feature_metrics:
            self.feature_metrics[feature_name] = FeatureMetrics()

        metrics = self.feature_metrics[feature_name]

        # Track zeros
        if value == 0:
            metrics.zero_count += 1
            metrics.consecutive_zeros += 1
        else:
            metrics.consecutive_zeros = 0

        # Track NaN/Inf
        if np.isnan(value) or np.isinf(value):
            metrics.nan_inf_count += 1

        # Update total count
        metrics.total_count += 1

        # Calculate health score
        self.health_scores[feature_name] = self._calculate_health_score(
            feature_name
        )

    def _calculate_health_score(self, feature_name: str) -> float:
        """Calculate 0-100 health score for feature"""
        metrics = self.feature_metrics[feature_name]
        if metrics.total_count == 0:
            return 0.0

        # Start at 100
        score = 100.0

        # Penalize zero rate
        zero_rate = (metrics.zero_count / metrics.total_count) * 100
        if zero_rate > self.zero_rate_critical:
            score -= 50.0
        elif zero_rate > self.zero_rate_warning:
            score -= 25.0

        # Penalize consecutive zeros
        if metrics.consecutive_zeros > self.consecutive_zero_limit:
            score -= 30.0

        # Penalize NaN/Inf
        nan_rate = (metrics.nan_inf_count / metrics.total_count) * 100
        if nan_rate > 1.0:
            score -= 20.0

        return max(0.0, score)

    def get_feature_status(self, feature_name: str) -> str:
        """Get status: HEALTHY, WARNING, CRITICAL, or DISABLED"""
        score = self.health_scores.get(feature_name, 0.0)
        if score >= 80:
            return "HEALTHY"
        elif score >= 50:
            return "WARNING"
        elif score > 0:
            return "CRITICAL"
        else:
            return "DISABLED"
```
Health check tiers:
```python
def get_system_health(self) -> Dict[str, Any]:
    """Comprehensive system health status"""
    health = {
        'redis': 'OK',
        'features': 'Unknown',
        'trading': 'Unknown',
        'data_age': 'Unknown',
        'uptime': self._get_uptime(),
        'overall_status': 'OK',
        'components': {}
    }

    # Check Redis connectivity
    try:
        self.redis_client.ping()
        health['redis'] = 'OK'
    except Exception as e:
        health['redis'] = f'ERROR: {e}'
        health['overall_status'] = 'DEGRADED'

    # Check data freshness
    data_age = self._check_data_freshness()
    health['data_age'] = data_age
    if 'OLD' in data_age or 'STALE' in data_age:
        health['overall_status'] = 'DEGRADED'

    # Check feature pipeline
    feature_status = self._check_feature_pipeline()
    health['features'] = feature_status
    if feature_status == 'ERROR':
        health['overall_status'] = 'CRITICAL'

    return health
```
Results
| Metric | Baseline | Optimized | Improvement |
|---|---|---|---|
| 1M row processing | 66 minutes | 1.84 seconds | 2,156X faster |
| LSTM throughput | 11,900 rows/sec | 50,694 rows/sec | 4.3X faster |
| SJM throughput | 83,000 rows/sec | 463,723 rows/sec | 5.6X faster |
| Memory (10M rows) | 2.4 GB | 220 MB | 91% reduction |
| Startup time | 45 seconds | 8 seconds | 5.6X faster |
| Health check latency | N/A | <100ms | Real-time monitoring |

| Engine | Processing Time (100K rows) | Throughput | Memory |
|---|---|---|---|
| LSTM | 1.97 seconds | 50,694 rows/sec | 53.4 MB |
| XGBoost | 3.04 seconds | 32,886 rows/sec | 12.5 MB |
| SJM | 0.22 seconds | 463,723 rows/sec | 0.6 MB |
Target achievement:
- Target: <5 minutes for 1M rows
- Achieved: 1.84 seconds (163X faster than target)
- Ideal goal: <2 minutes
- Result: ✅ EXCEEDED both targets
Async Pattern Impact
| Operation | Before (Blocking) | After (Async) | Improvement |
|---|---|---|---|
| Model inference I/O | 2000ms degradation | 100ms maintained | 20X better |
| Concurrent predictions | Sequential | Parallel | 8X throughput |
| Event loop responsiveness | Blocked | Non-blocking | Real-time capable |
System Impact
Production deployment:
- Managing $25K+ trading capital
- Processing 542,000 rows/second sustained
- 99.7% uptime over 6 months
- Zero data loss incidents
- Sub-second regime detection
Model ensemble:
- 6-state Statistical Jump Model (primary regime detection)
- Prophet 15-minute price forecasting
- LSTM 5-minute microstructure prediction
- Multi-timeframe feature engineering (17 features)
Real-time capabilities:
- 5-second regime update cycle
- 100ms feature health monitoring (10Hz)
- Sub-second prediction latency
- Parallel component startup (3.9X speedup)
Production Monitoring
Alert Thresholds
| Alert Level | Condition | Action |
|---|---|---|
| CRITICAL | Zero rate >50%, NaN detected, Redis down | Emergency stop trading |
| WARNING | Zero rate >30%, data >15min old | Notification, log analysis |
| INFO | Normal operation | Routine logging |
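A minimal sketch of how these thresholds could map to code (hypothetical `classify_alert` helper mirroring the table above, not the Atlas implementation):

```python
def classify_alert(zero_rate: float, has_nan: bool,
                   redis_up: bool, data_age_min: float) -> str:
    """Map monitor readings to an alert level per the thresholds above."""
    if zero_rate > 50.0 or has_nan or not redis_up:
        return "CRITICAL"   # emergency stop trading
    if zero_rate > 30.0 or data_age_min > 15.0:
        return "WARNING"    # notification, log analysis
    return "INFO"           # routine logging

assert classify_alert(55.0, False, True, 1.0) == "CRITICAL"
assert classify_alert(35.0, False, True, 1.0) == "WARNING"
assert classify_alert(5.0, False, True, 2.0) == "INFO"
```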
Health Dashboard
```python
{
    'healthy': True,
    'phase': 'RUNNING',
    'check_time_ms': 87.3,
    'components': {
        'redis': 'OK',
        'features': 'HEALTHY (98.3% quality)',
        'data_age': 'Fresh (12s ago)',
        'ml_integration': 'RUNNING',
        'regime_detection': 'HEALTHY'
    },
    'active_cursors': 8,
    'uptime_hours': 156.3,
    'overall_status': 'OK'
}
```
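The dashboard payload is plain JSON held in Redis; a hedged sketch of one possible publish/read path (the `atlas:health` key name and 30-second TTL are my assumptions, not Atlas constants):

```python
import json
from typing import Optional

import redis

r = redis.Redis(decode_responses=True)

def publish_health(health: dict) -> None:
    """Publish the dashboard payload; TTL lets stale status expire."""
    r.set('atlas:health', json.dumps(health), ex=30)

def read_health() -> Optional[dict]:
    """Fetch the latest health snapshot, or None if it has expired."""
    raw = r.get('atlas:health')
    return json.loads(raw) if raw else None
```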
Data Freshness Check
```python
def _check_data_freshness(self) -> str:
    """Check how fresh the data is"""
    timestamp_keys = [
        'atlas:timestamp',
        'atlas:features:timestamp',
        'atlas:market:timestamp',
    ]

    # Find latest timestamp
    latest = None
    for key in timestamp_keys:
        ts = self.redis_client.get(key)
        if ts:
            dt = datetime.fromisoformat(ts)
            if latest is None or dt > latest:
                latest = dt

    if latest is None:
        return 'No data'

    age = datetime.now() - latest
    if age < timedelta(minutes=5):
        return f'Fresh ({int(age.total_seconds())}s ago)'
    elif age < timedelta(minutes=15):
        return f'Recent ({int(age.total_seconds()/60)}m ago)'
    elif age < timedelta(hours=1):
        return f'OLD ({int(age.total_seconds()/60)}m ago)'
    else:
        return f'STALE ({int(age.total_seconds()/3600)}h ago)'
```
Key Lessons
1. Async Wrappers for CPU-Bound ML
Don’t block the event loop with synchronous model inference.
Pattern:
```python
# ❌ Blocks event loop
prediction = self.model.predict(features)

# ✅ Non-blocking
loop = asyncio.get_event_loop()
prediction = await loop.run_in_executor(
    None,
    self.model.predict,
    features
)
```
Impact: Preserved 100ms async I/O performance while running CPU-intensive models.
2. Component Orchestration Prevents Cascading Failures
Dependencies must start in correct order with health verification.
Before: Single component failure crashed entire system
After: Failed components isolated, dependencies retry with backoff
Result: 99.7% uptime vs. 87% in prototype.
3. Real-Time Monitoring Is Mandatory
10Hz health checks caught issues before they impacted trading:
- Feature pipeline stalls (detected in 100ms)
- Model degradation (caught via zero-rate monitoring)
- Data staleness (flagged before trades executed)
Cost: <1ms monitoring overhead
Value: Prevented 3 potential trading incidents in 6 months
4. Memory Management at Scale
Processing 10M rows requires careful memory handling:
| Technique | Memory Saved | Implementation |
|---|---|---|
| Lazy evaluation | 60% | Process in chunks, don't materialize |
| Feature selection | 30% | Only compute needed features |
| Garbage collection tuning | 15% | Explicit cleanup after batches |
Result: 2.4GB → 220MB (91% reduction)
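The lazy-evaluation row is the largest single win. A minimal sketch of the chunked pattern (illustrative pandas code with made-up `bid`/`ask` columns, not the Atlas pipeline):

```python
import gc
import pandas as pd

CHUNK_ROWS = 100_000  # tune so peak memory stays bounded

def process_in_chunks(csv_path: str) -> pd.DataFrame:
    """Stream rows in fixed-size chunks instead of materializing 10M rows."""
    results = []
    for chunk in pd.read_csv(csv_path, chunksize=CHUNK_ROWS):
        # Only compute the features actually needed downstream
        features = chunk[['bid', 'ask']].assign(
            spread=chunk['ask'] - chunk['bid']
        )
        results.append(features.agg(['mean', 'std']))
        del chunk, features
        gc.collect()  # explicit cleanup after each batch
    return pd.concat(results)
```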
5. Model Hot-Swapping Enables Safe Updates
Production model updates without downtime:
```python
async def switch_model(self, new_version: str):
    """Switch to new model version without downtime"""
    # Load new model
    new_model = self.model_manager.load_model('sjm', new_version)

    # Validate before switching
    health = await self._validate_model(new_model)
    if not health['valid']:
        raise RuntimeError(f"Model validation failed: {health}")

    # Atomic swap
    old_model = self.model
    self.model = new_model

    # Cleanup old model
    del old_model

    logger.info(f"✅ Switched to model version {new_version}")
```
Usage: Updated regime detection model 12 times in production, zero downtime.
Conclusion
Deploying the Atlas Forecasting System to production required systematic application of ML deployment patterns:
- Async service architecture preserved event loop responsiveness (20X I/O improvement)
- Component orchestration with dependency management prevented cascading failures (99.7% uptime)
- Real-time monitoring at 10Hz caught degradation before impact (3 incidents prevented)
- Memory optimization enabled 10M row processing in 220MB (91% reduction)
- Performance tuning achieved 2,156X speedup (66min → 1.84s for 1M rows)
Production validation:
- Managing $25K+ trading capital for 6 months
- Processing 542,000 rows/second sustained
- Sub-second prediction latency
- Zero data loss incidents
- 99.7% uptime
The system demonstrates that production ML deployment is 80% engineering discipline (async patterns, dependency management, health monitoring, memory optimization) and 20% model performance (accuracy, latency). Get the infrastructure right, and the models can deliver value reliably.
For systems where reliability matters—trading, healthcare, autonomous systems—measure everything, fail gracefully, and build observability from day one.
Part of the ML Deployment series: