1use std::sync::Arc;
6
7use crate::integration::error::{SystemError, SystemResult};
8use crate::integration::performance::{SystemPerformance, SystemStatistics};
9use nerve_core_types::Message;
10use nerve_core_traits::{CommunicationComponent, MemoryBufferFactory, MemoryComponent, MemoryPool, MessageBuffer, NodeComponent, NodeRegistry, Publisher, RequestResponse, SubscriptionManager, ThreadComponent, ThreadCoordinator, ThreadWatchdog, MessageRouter};
11use nerve_memory::MemoryManager;
12use nerve_thread::{ThreadManager, SimpleThreadCoordinator};
13use nerve_node::NodeManager;
14use nerve_communication::CommunicationManager;
15
/// Top-level handle that bundles the shared components of a Nerve system.
///
/// All components are stored as `Arc`-wrapped trait objects. The fields are
/// private; read access goes through the accessor methods of `NerveSystem`.
pub struct NerveSystem {
    /// Performance tracker that the statistics and health queries read from.
    performance: Arc<SystemPerformance>,
    /// Configuration the system was constructed with.
    config: SystemConfig,
    /// Memory pool of `Vec<u8>` values.
    memory_pool: Arc<dyn MemoryPool<Vec<u8>>>,
    /// Buffer holding `Message` values.
    message_buffer: Arc<dyn MessageBuffer<Message>>,
    /// Thread coordinator component.
    thread_coordinator: Arc<dyn ThreadCoordinator>,
    /// Thread watchdog component.
    thread_watchdog: Arc<dyn ThreadWatchdog>,
    /// Node registry component.
    node_registry: Arc<dyn NodeRegistry>,
    /// Router used by `process_message` to route binary messages.
    message_router: Arc<dyn MessageRouter>,
    /// Subscription manager component.
    subscription_manager: Arc<dyn SubscriptionManager>,
    /// Publisher used by `process_message` to publish text payloads.
    publisher: Arc<dyn Publisher>,
    /// Request/response component.
    request_response: Arc<dyn RequestResponse>,
}
40
/// Limits and switches used when constructing a `NerveSystem`.
///
/// `NerveSystem::with_config` rejects a configuration in which any of the
/// three numeric limits is zero.
#[derive(Debug, Clone)]
pub struct SystemConfig {
    /// Memory limit in bytes; must be greater than 0.
    pub max_memory_bytes: u64,
    /// Thread limit; must be greater than 0. Passed to `ThreadManager::new`
    /// by `NerveSystem::with_config`.
    pub max_threads: usize,
    /// Queue size limit; must be greater than 0.
    pub max_queue_size: usize,
    /// Whether performance monitoring is requested.
    // NOTE(review): no code visible in this file reads this flag - confirm
    // where (or whether) it takes effect.
    pub performance_monitoring: bool,
}
53
54impl Default for SystemConfig {
55 fn default() -> Self {
56 Self {
57 max_memory_bytes: 1_000_000_000, max_threads: 16,
59 max_queue_size: 10_000,
60 performance_monitoring: true,
61 }
62 }
63}
64
/// Health report produced by `NerveSystem::get_system_status`.
///
/// Every health field is a human-readable string rather than a typed value.
#[derive(Debug, Clone)]
pub struct SystemStatus {
    /// Overall health text, as returned by `NerveSystem::health_status`.
    pub overall_health: String,
    /// Statistics obtained from the performance tracker when the report was built.
    pub statistics: SystemStatistics,
    /// `"Healthy"`, or a `"Warning - N% utilization"` text when the memory
    /// pool utilization is 80.0 or higher.
    pub memory_health: String,
    /// Thread health text (`get_system_status` currently always sets `"Healthy"`).
    pub thread_health: String,
    /// Node health text (`get_system_status` currently always sets `"Healthy"`).
    pub node_health: String,
    /// Communication health text (`get_system_status` currently always sets `"Healthy"`).
    pub communication_health: String,
}
81
82impl NerveSystem {
83 pub fn new() -> SystemResult<Self> {
85 Self::with_config(SystemConfig::default())
86 }
87
88 pub fn with_config(config: SystemConfig) -> SystemResult<Self> {
90 if config.max_memory_bytes == 0 {
92 return Err(SystemError::integration("Maximum memory bytes must be greater than 0"));
93 }
94
95 if config.max_threads == 0 {
96 return Err(SystemError::integration("Maximum threads must be greater than 0"));
97 }
98
99 if config.max_queue_size == 0 {
100 return Err(SystemError::integration("Maximum queue size must be greater than 0"));
101 }
102
103 let performance = SystemPerformance::new();
105
106 let memory_manager = Arc::new(MemoryManager::new(1024));
108 let memory_pool = memory_manager.create_memory_pool::<Vec<u8>>(1000)
109 .map_err(|e| SystemError::memory(format!("Failed to create memory pool: {}", e)))?
110 .into();
111 let message_buffer = memory_manager.create_buffer::<Message>(1000, nerve_core_types::QoS::BestEffort, nerve_core_types::BufferMode::FullFeatures)
112 .map_err(|e| SystemError::memory(format!("Failed to create message buffer: {}", e)))?
113 .into();
114
115 let thread_manager = Arc::new(ThreadManager::new(config.max_threads));
117 let thread_coordinator = thread_manager.create_coordinator()
118 .map_err(|e| SystemError::thread(format!("Failed to create thread coordinator: {}", e)))?
119 .into();
120 let thread_watchdog = thread_manager.create_watchdog(std::time::Duration::from_millis(1000))
121 .map_err(|e| SystemError::thread(format!("Failed to create thread watchdog: {}", e)))?
122 .into();
123
124 let node_manager = Arc::new(NodeManager::new());
126 let node_registry = node_manager.create_registry()
127 .map_err(|e| SystemError::node(format!("Failed to create node registry: {}", e)))?
128 .into();
129
130 let communication_manager = Arc::new(CommunicationManager::new());
132 let message_router = communication_manager.create_router()
133 .map_err(|e| SystemError::communication(format!("Failed to create message router: {}", e)))?
134 .into();
135 let subscription_manager = communication_manager.create_subscription_manager()
136 .map_err(|e| SystemError::communication(format!("Failed to create subscription manager: {}", e)))?
137 .into();
138 let publisher = communication_manager.create_publisher()
139 .map_err(|e| SystemError::communication(format!("Failed to create publisher: {}", e)))?
140 .into();
141 let request_response = communication_manager.create_request_response()
142 .map_err(|e| SystemError::communication(format!("Failed to create request-response: {}", e)))?
143 .into();
144
145 Ok(Self {
146 performance,
147 config,
148 memory_pool,
149 message_buffer,
150 thread_coordinator,
151 thread_watchdog,
152 node_registry,
153 message_router,
154 subscription_manager,
155 publisher,
156 request_response,
157 })
158 }
159
    /// Returns the statistics reported by the performance tracker's
    /// `get_statistics`.
    pub fn get_statistics(&self) -> SystemStatistics {
        self.performance.get_statistics()
    }
164
    /// Returns the configuration this system was constructed with.
    pub fn config(&self) -> &SystemConfig {
        &self.config
    }
169
    /// Returns a reference to the shared `Vec<u8>` memory pool handle.
    pub fn memory_pool(&self) -> &Arc<dyn MemoryPool<Vec<u8>>> {
        &self.memory_pool
    }
174
    /// Returns a reference to the shared `Message` buffer handle.
    pub fn message_buffer(&self) -> &Arc<dyn MessageBuffer<Message>> {
        &self.message_buffer
    }
179
    /// Returns a reference to the shared thread coordinator handle.
    pub fn thread_coordinator(&self) -> &Arc<dyn ThreadCoordinator> {
        &self.thread_coordinator
    }
184
    /// Returns a reference to the shared thread watchdog handle.
    pub fn thread_watchdog(&self) -> &Arc<dyn ThreadWatchdog> {
        &self.thread_watchdog
    }
189
    /// Returns a reference to the shared node registry handle.
    pub fn node_registry(&self) -> &Arc<dyn NodeRegistry> {
        &self.node_registry
    }
194
    /// Returns a reference to the shared message router handle.
    pub fn message_router(&self) -> &Arc<dyn MessageRouter> {
        &self.message_router
    }
199
    /// Returns a reference to the shared subscription manager handle.
    pub fn subscription_manager(&self) -> &Arc<dyn SubscriptionManager> {
        &self.subscription_manager
    }
204
    /// Returns a reference to the shared publisher handle.
    pub fn publisher(&self) -> &Arc<dyn Publisher> {
        &self.publisher
    }
209
    /// Returns a reference to the shared request/response handle.
    pub fn request_response(&self) -> &Arc<dyn RequestResponse> {
        &self.request_response
    }
214
215 pub fn is_healthy(&self) -> bool {
217 let stats = self.performance.get_statistics();
219 stats.messages_processed == 0 || stats.is_healthy()
220 }
221
222 pub fn health_status(&self) -> String {
224 let stats = self.performance.get_statistics();
225
226 if stats.messages_processed == 0 {
228 return "Healthy".to_string();
229 }
230
231 let health = stats.health_status();
232
233 match health {
234 crate::integration::performance::SystemHealth::Healthy => "Healthy".to_string(),
235 crate::integration::performance::SystemHealth::Warning => format!(
236 "Warning - Error rate: {:.2}%, Memory: {} MB",
237 stats.error_rate_percent,
238 stats.memory_usage_bytes / 1_000_000
239 ),
240 crate::integration::performance::SystemHealth::Critical => format!(
241 "Critical - Error rate: {:.2}%, Memory: {} MB",
242 stats.error_rate_percent,
243 stats.memory_usage_bytes / 1_000_000
244 ),
245 }
246 }
247
    /// Presumably meant to clear the collected statistics.
    ///
    /// NOTE(review): the body is empty, so calling this currently has no
    /// effect on the performance tracker. Either wire it to the tracker's
    /// reset facility (not visible from this file) or document the no-op
    /// for callers.
    pub fn reset_statistics(&self) {
    }
253
254 pub fn process_message(&self, topic: &str, payload: Vec<u8>) -> SystemResult<()> {
263 let _timer = crate::integration::performance::PerformanceTimer::start(
265 Arc::clone(&self.performance)
266 );
267
268 let payload_clone = payload.clone();
274 let message = Message::binary(payload);
275 self.message_router.route_message(topic, &message)
284 .map_err(|e| SystemError::communication(format!("Message routing failed: {}", e)))?;
285
286 let payload_str = String::from_utf8_lossy(&payload_clone);
289 self.publisher.publish_text(topic, payload_str.as_ref())
290 .map_err(|e| SystemError::communication(format!("Message publishing failed: {}", e)))?;
291
292 self.performance.record_message_processed(std::time::Duration::from_micros(1)); Ok(())
298 }
299
300 pub fn get_system_status(&self) -> SystemStatus {
302 let stats = self.performance.get_statistics();
303
304 let memory_health = if self.memory_pool.utilization() < 80.0 {
306 "Healthy".to_string()
307 } else {
308 format!("Warning - {:.1}% utilization", self.memory_pool.utilization())
309 };
310
311 let thread_health = "Healthy".to_string();
314
315 let node_health = "Healthy".to_string();
318
319 let communication_health = "Healthy".to_string();
322
323 SystemStatus {
324 overall_health: self.health_status(),
325 statistics: stats,
326 memory_health,
327 thread_health,
328 node_health,
329 communication_health,
330 }
331 }
332
333 pub fn shutdown(self) -> SystemResult<()> {
335 println!("🔄 Starting system shutdown...");
339
340 let stats = self.performance.get_statistics();
378 println!("✅ System shutdown complete.");
379 println!("📊 Final statistics:");
380 println!(" Messages processed: {}", stats.messages_processed);
381 println!(" Errors encountered: {}", stats.errors_encountered);
382 println!(" Memory usage: {} bytes", stats.memory_usage_bytes);
383 println!(" Thread utilization: {}%", stats.thread_utilization_percent);
384 println!(" Error rate: {:.2}%", stats.error_rate_percent);
385
386 Ok(())
387 }
388}
389
390impl Default for NerveSystem {
391 fn default() -> Self {
392 Self::new().expect("Failed to create default NerveSystem")
393 }
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// A system built with the default configuration is created successfully
    /// and reports itself as healthy.
    #[test]
    fn test_nerve_system_creation() {
        let created = NerveSystem::new();
        assert!(created.is_ok());

        let system = created.unwrap();
        assert!(system.is_healthy());
    }

    /// A custom, valid configuration is accepted and stored unchanged.
    #[test]
    fn test_nerve_system_with_config() {
        let requested = SystemConfig {
            max_memory_bytes: 500_000_000,
            max_threads: 8,
            max_queue_size: 5_000,
            performance_monitoring: true,
        };

        let built = NerveSystem::with_config(requested);
        assert!(built.is_ok());

        let system = built.unwrap();
        let applied = system.config();
        assert_eq!(applied.max_memory_bytes, 500_000_000);
        assert_eq!(applied.max_threads, 8);
        assert_eq!(applied.max_queue_size, 5_000);
    }

    /// A zero memory limit is rejected.
    #[test]
    fn test_nerve_system_invalid_config() {
        let zero_memory = SystemConfig {
            max_memory_bytes: 0,
            max_threads: 8,
            max_queue_size: 5_000,
            performance_monitoring: true,
        };

        let outcome = NerveSystem::with_config(zero_memory);
        assert!(outcome.is_err());
    }

    /// A freshly created system reports all-zero counters.
    #[test]
    fn test_system_statistics() {
        let system = NerveSystem::new().unwrap();
        let initial = system.get_statistics();

        assert_eq!(initial.messages_processed, 0);
        assert_eq!(initial.errors_encountered, 0);
        assert_eq!(initial.memory_usage_bytes, 0);
        assert_eq!(initial.thread_utilization_percent, 0);
    }

    /// A freshly created system reports the plain "Healthy" status text.
    #[test]
    fn test_health_status() {
        let system = NerveSystem::new().unwrap();

        assert_eq!(system.health_status(), "Healthy");
    }
}