File size: 20,696 Bytes
faa740f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 |
"""
Dynamic Simulation Module - Real-time fire spread and sensor updates
Simulates how fire conditions change over time and updates evacuation recommendations
"""
import random
import time
from typing import Dict, List, Tuple
from .floor_plan import FloorPlan
from .sensor_system import SensorSystem, SensorReading
from .pathfinding import PathFinder, RiskAssessment
class FireSpreadSimulator:
    """Simulates dynamic fire spread and environmental changes.

    The simulator keeps no readings of its own: every change is written to
    the supplied sensor system through ``update_sensor``, while adjacency and
    room metadata are read from the floor plan (``get_neighbors`` and
    ``get_room``).

    Attributes:
        floor_plan: Building layout used for neighbour and room lookups.
        sensor_system: Object exposing the ``sensors`` mapping,
            ``update_sensor`` and ``get_sensor_reading``.
        time_step: Number of completed ``update_simulation`` calls.
        fire_sources: Locations where fire was started or has spread to.
    """

    # Simulated seconds represented by one ``update_simulation`` call.
    SECONDS_PER_STEP = 30

    def __init__(self, floor_plan: "FloorPlan", sensor_system: "SensorSystem"):
        self.floor_plan = floor_plan
        self.sensor_system = sensor_system
        self.time_step = 0
        self.fire_sources = []  # List of rooms where fire started

    @staticmethod
    def _raise_capped(current, increase, cap):
        """Return ``current + increase`` limited to ``cap``, never below ``current``.

        A plain ``min(current + increase, cap)`` would *lower* a reading that
        is already above the cap; worsening conditions must be monotonic.
        """
        return max(current, min(current + increase, cap))

    @staticmethod
    def _lower_floored(current, decrease, floor):
        """Return ``current - decrease`` limited to ``floor``, never above ``current``.

        A plain ``max(current - decrease, floor)`` would *raise* a reading
        that is already below the floor.
        """
        return min(current, max(current - decrease, floor))

    def initialize_fire(self, fire_locations: List[str]):
        """Initialize fire at specific locations with all real-world factors.

        Args:
            fire_locations: Location ids where the fire starts. Ids without a
                sensor are still recorded in ``fire_sources`` but receive no
                readings.
        """
        # Copy so that later appends in ``_spread_fire`` never mutate the
        # caller's list.
        self.fire_sources = list(fire_locations)
        for location in fire_locations:
            if location not in self.sensor_system.sensors:
                continue
            # Generate comprehensive mock data for fire location
            self.sensor_system.update_sensor(
                location,
                # Basic factors
                fire_detected=True,
                smoke_level=random.uniform(0.7, 0.9),
                temperature=random.uniform(150, 250),
                oxygen_level=random.uniform(14, 16),
                visibility=random.uniform(5, 15),
                structural_integrity=random.uniform(60, 80),
                # Fire-specific factors
                fire_growth_rate=random.uniform(5, 15),  # m²/min
                flashover_risk=random.uniform(0.3, 0.6),
                backdraft_risk=random.uniform(0.2, 0.4),
                heat_radiation=random.uniform(3, 8),  # kW/m²
                fire_type=random.choice(["wood", "electrical", "chemical"]),
                # Toxic gases (high in fire areas)
                carbon_monoxide=random.uniform(50, 200),  # ppm
                carbon_dioxide=random.uniform(5000, 15000),  # ppm
                hydrogen_cyanide=random.uniform(10, 50),  # ppm
                hydrogen_chloride=random.uniform(5, 20),  # ppm
                # Environmental (affected by fire)
                wind_direction=random.uniform(0, 360),
                wind_speed=random.uniform(2, 8),  # m/s
                air_pressure=random.uniform(1000, 1020),
                humidity=random.uniform(30, 60),
                # Human factors
                occupancy_density=random.uniform(0.3, 0.7),
                mobility_limitations=random.randint(0, 3),
                panic_level=random.uniform(0.6, 0.9),
                evacuation_progress=0.0,
                # Infrastructure (may fail near fire)
                sprinkler_active=random.choice([True, False]),
                emergency_lighting=random.choice([True, False]),
                elevator_available=False,  # Elevators disabled in fire
                stairwell_clear=random.choice([True, False]),
                exit_accessible=random.choice([True, False]),
                exit_capacity=random.randint(50, 150),
                ventilation_active=random.choice([True, False]),
                # Time-based
                time_since_fire_start=0,
                estimated_time_to_exit=random.randint(60, 300),
                # Communication
                emergency_comm_working=random.choice([True, False]),
                wifi_signal_strength=random.uniform(40, 80),
                # External
                weather_temperature=random.uniform(15, 25),
                weather_rain=random.choice([True, False]),
                time_of_day=random.randint(8, 18),
                day_of_week=random.randint(0, 6),
            )

    def update_simulation(self, intensity: float = 1.0):
        """Update fire conditions for one time step.

        Args:
            intensity: Fire spread intensity (0.5 = slow, 1.0 = normal,
                2.0 = fast). Expected to be non-negative.
        """
        self.time_step += 1

        # The elapsed time is stamped here, once, for every sensor; the
        # per-phase helpers below must not advance it again.
        elapsed = self.time_step * self.SECONDS_PER_STEP
        for sensor in self.sensor_system.sensors.values():
            sensor.time_since_fire_start = elapsed

        # 1. Intensify existing fires
        self._intensify_fires(intensity)
        # 2. Spread fire to adjacent rooms
        self._spread_fire(intensity)
        # 3. Spread smoke further
        self._spread_smoke(intensity)
        # 4. Update structural integrity
        self._update_structures(intensity)
        # 5. Update evacuation progress
        self._update_evacuation_progress(intensity)

    def _intensify_fires(self, intensity: float):
        """Make existing fires worse over time with all factors.

        Every reading moves only in the worsening direction; caps and floors
        limit how far it can go but never pull a value back.
        """
        worse_up = self._raise_capped
        worse_down = self._lower_floored

        for location_id, sensor in self.sensor_system.sensors.items():
            if not sensor.fire_detected:
                continue

            # Fire gets hotter
            new_temp = worse_up(
                sensor.temperature, random.uniform(5, 15) * intensity, 300
            )
            # More smoke
            new_smoke = worse_up(
                sensor.smoke_level, random.uniform(0.02, 0.05) * intensity, 1.0
            )
            # Less oxygen
            new_oxygen = worse_down(
                sensor.oxygen_level, random.uniform(0.2, 0.5) * intensity, 10.0
            )
            # Worse visibility
            new_visibility = worse_down(
                sensor.visibility, random.uniform(1, 3) * intensity, 0.0
            )
            # Structural damage. ``_update_structures`` can take integrity
            # below this floor of 30; the monotonic clamp keeps this phase
            # from restoring it.
            new_integrity = worse_down(
                sensor.structural_integrity, random.uniform(1, 3) * intensity, 30.0
            )

            # NOTE(review): the three infrastructure flags at the end are
            # re-rolled independently each step, so a failed light or a
            # blocked exit can recover on the next step. Confirm whether
            # failures should persist.
            self.sensor_system.update_sensor(
                location_id,
                temperature=new_temp,
                smoke_level=new_smoke,
                oxygen_level=new_oxygen,
                visibility=new_visibility,
                structural_integrity=new_integrity,
                # Fire growth increases
                fire_growth_rate=worse_up(
                    sensor.fire_growth_rate, random.uniform(0.5, 2) * intensity, 20
                ),
                flashover_risk=worse_up(
                    sensor.flashover_risk, random.uniform(0.02, 0.05) * intensity, 1.0
                ),
                backdraft_risk=worse_up(
                    sensor.backdraft_risk, random.uniform(0.01, 0.03) * intensity, 1.0
                ),
                heat_radiation=worse_up(
                    sensor.heat_radiation, random.uniform(0.2, 0.5) * intensity, 15
                ),
                # Toxic gases increase
                carbon_monoxide=worse_up(
                    sensor.carbon_monoxide, random.uniform(5, 15) * intensity, 500
                ),
                carbon_dioxide=worse_up(
                    sensor.carbon_dioxide, random.uniform(200, 500) * intensity, 20000
                ),
                hydrogen_cyanide=worse_up(
                    sensor.hydrogen_cyanide, random.uniform(1, 3) * intensity, 100
                ),
                hydrogen_chloride=worse_up(
                    sensor.hydrogen_chloride, random.uniform(0.5, 2) * intensity, 50
                ),
                # Panic increases
                panic_level=worse_up(
                    sensor.panic_level, random.uniform(0.01, 0.03) * intensity, 1.0
                ),
                # Infrastructure may fail
                emergency_lighting=random.random() > 0.1,  # 10% chance of failure
                exit_accessible=random.random() > 0.15,  # 15% chance of blockage
                stairwell_clear=random.random() > 0.2,  # 20% chance of blockage
            )

    def _spread_fire(self, intensity: float):
        """Fire spreads to adjacent rooms based on conditions.

        Each burning room rolls once per non-burning neighbour. Ignitions are
        collected first and applied after the scan, so a room that catches
        fire in this step does not spread further until the next one.
        """
        # neighbour id -> its reading at ignition time. Keyed by id so a room
        # adjacent to several fires is ignited (and recorded) only once.
        ignited = {}

        for location_id, sensor in self.sensor_system.sensors.items():
            if not sensor.fire_detected:
                continue
            for neighbor_id, _ in self.floor_plan.get_neighbors(location_id):
                if neighbor_id in ignited:
                    continue
                neighbor = self.sensor_system.get_sensor_reading(neighbor_id)
                if not neighbor or neighbor.fire_detected:
                    continue

                # Chance of fire spreading based on conditions
                spread_chance = 0.15 * intensity  # Base 15% chance per time step
                # Higher chance if already hot or smoky
                if neighbor.temperature > 80:
                    spread_chance += 0.2
                if neighbor.smoke_level > 0.5:
                    spread_chance += 0.15
                # Check if oxygen cylinder present (explosion!)
                room = self.floor_plan.get_room(neighbor_id)
                if room and room.has_oxygen_cylinder and neighbor.temperature > 60:
                    spread_chance += 0.4  # Much higher chance!

                if random.random() < spread_chance:
                    ignited[neighbor_id] = neighbor

        # Apply new fires. Ignition must never improve a room: keep whichever
        # of the current and freshly drawn values is worse.
        for location_id, reading in ignited.items():
            self.sensor_system.update_sensor(
                location_id,
                fire_detected=True,
                smoke_level=max(reading.smoke_level, random.uniform(0.6, 0.8)),
                temperature=max(reading.temperature, random.uniform(120, 180)),
                oxygen_level=min(reading.oxygen_level, random.uniform(15, 17)),
                visibility=min(reading.visibility, random.uniform(15, 30)),
            )
            if location_id not in self.fire_sources:
                self.fire_sources.append(location_id)

    def _spread_smoke(self, intensity: float):
        """Smoke and toxic gases spread to all connected areas.

        Rooms on fire, or with a smoke level above 0.3, degrade each
        non-burning neighbour. Updates are applied while scanning, so a room
        pushed above the smoke threshold earlier in the scan can affect its
        own neighbours within the same step.
        """
        worse_up = self._raise_capped
        worse_down = self._lower_floored

        for location_id, sensor in self.sensor_system.sensors.items():
            # Rooms with fire or high smoke affect neighbors
            if not (sensor.fire_detected or sensor.smoke_level > 0.3):
                continue

            for neighbor_id, _ in self.floor_plan.get_neighbors(location_id):
                neighbor = self.sensor_system.get_sensor_reading(neighbor_id)
                if not neighbor or neighbor.fire_detected:
                    continue

                # Smoke drifts to adjacent areas
                new_smoke = worse_up(
                    neighbor.smoke_level, random.uniform(0.03, 0.08) * intensity, 1.0
                )
                # Temperature rises slightly
                new_temp = worse_up(
                    neighbor.temperature, random.uniform(2, 8) * intensity, 100
                )
                # Oxygen decreases
                new_oxygen = worse_down(
                    neighbor.oxygen_level, random.uniform(0.1, 0.3) * intensity, 16.0
                )
                # Visibility decreases
                new_visibility = worse_down(
                    neighbor.visibility, random.uniform(2, 5) * intensity, 10.0
                )

                # Toxic gases spread (reduced concentration)
                co_spread = sensor.carbon_monoxide * 0.1 * intensity
                co2_spread = sensor.carbon_dioxide * 0.05 * intensity
                hcn_spread = sensor.hydrogen_cyanide * 0.15 * intensity
                hcl_spread = sensor.hydrogen_chloride * 0.2 * intensity
                # Heat radiation spreads
                heat_spread = sensor.heat_radiation * 0.3 * intensity

                self.sensor_system.update_sensor(
                    neighbor_id,
                    smoke_level=new_smoke,
                    temperature=new_temp,
                    oxygen_level=new_oxygen,
                    visibility=new_visibility,
                    # Toxic gases (accumulate)
                    carbon_monoxide=worse_up(neighbor.carbon_monoxide, co_spread, 200),
                    carbon_dioxide=worse_up(neighbor.carbon_dioxide, co2_spread, 10000),
                    hydrogen_cyanide=worse_up(neighbor.hydrogen_cyanide, hcn_spread, 50),
                    hydrogen_chloride=worse_up(neighbor.hydrogen_chloride, hcl_spread, 30),
                    # Heat radiation
                    heat_radiation=worse_up(neighbor.heat_radiation, heat_spread, 5),
                    # Flashover risk increases slightly
                    flashover_risk=worse_up(
                        neighbor.flashover_risk,
                        random.uniform(0.01, 0.02) * intensity,
                        0.5,
                    ),
                    # Occupancy drops as people move away from the smoke
                    # (this rate is not scaled by intensity).
                    occupancy_density=worse_down(
                        neighbor.occupancy_density, random.uniform(0.05, 0.15), 0.0
                    ),
                    panic_level=worse_up(
                        neighbor.panic_level,
                        random.uniform(0.02, 0.05) * intensity,
                        1.0,
                    ),
                )

    def _update_structures(self, intensity: float):
        """Update structural integrity based on fire exposure.

        Rooms that are on fire or hotter than 100 lose between 0.5 and 2.0
        integrity points (scaled by ``intensity``), down to a floor of 20.
        """
        for location_id, sensor in self.sensor_system.sensors.items():
            if not (sensor.fire_detected or sensor.temperature > 100):
                continue
            # Structural damage from heat
            damage = random.uniform(0.5, 2.0) * intensity
            self.sensor_system.update_sensor(
                location_id,
                structural_integrity=self._lower_floored(
                    sensor.structural_integrity, damage, 20.0
                ),
            )

    def _update_evacuation_progress(self, intensity: float):
        """Update evacuation progress and occupancy.

        Occupied rooms empty out and their evacuation progress rises; rooms
        whose type is ``"exit"`` then gain occupancy from arriving evacuees
        while they are below 0.8 (capped at 0.9).
        """
        for location_id, sensor in self.sensor_system.sensors.items():
            # Track occupancy locally so the exit check below sees this
            # step's value whether or not ``update_sensor`` mutates the
            # reading object in place.
            occupancy = sensor.occupancy_density

            # People evacuate from rooms (occupancy decreases)
            if occupancy > 0:
                evacuation_rate = random.uniform(0.05, 0.15) * intensity
                occupancy = max(occupancy - evacuation_rate, 0.0)
                new_progress = min(
                    sensor.evacuation_progress + evacuation_rate * 100, 100.0
                )
                self.sensor_system.update_sensor(
                    location_id,
                    occupancy_density=occupancy,
                    evacuation_progress=new_progress,
                )

            # Exits get more crowded as people evacuate
            room = self.floor_plan.get_room(location_id)
            if room and room.room_type == "exit" and occupancy < 0.8:
                arrival_rate = random.uniform(0.02, 0.08) * intensity
                self.sensor_system.update_sensor(
                    location_id,
                    occupancy_density=min(occupancy + arrival_rate, 0.9),
                )
class DynamicEvacuationSystem:
    """Manages dynamic evacuation with changing conditions.

    Wires a sensor system, a fire simulator and a path finder to one floor
    plan, and tracks the currently recommended route and its history.

    Attributes:
        current_recommendation: ``(path, risk)`` pair produced by
            ``RiskAssessment.recommend_path``, or ``None`` when there is none.
        recommendation_history: One dict per recommendation made, with the
            keys ``time_step``, ``exit``, ``path`` and ``danger``.
    """

    def __init__(self, floor_plan: "FloorPlan"):
        self.floor_plan = floor_plan
        self.sensor_system = SensorSystem(floor_plan)
        self.simulator = FireSpreadSimulator(floor_plan, self.sensor_system)
        self.pathfinder = PathFinder(floor_plan, self.sensor_system)
        self.current_recommendation = None
        self.recommendation_history = []

    def initialize_scenario(self, fire_locations: List[str],
                            affected_areas: Dict[str, Dict] = None,
                            start_location: str = "R1"):
        """Initialize the fire scenario.

        Args:
            fire_locations: Location ids where the fire starts.
            affected_areas: Optional mapping of location id to sensor field
                overrides, applied before the fires are initialized. Unknown
                location ids are ignored.
            start_location: Where the person is evacuating from; used for the
                initial recommendation.
        """
        # Set up initial conditions
        if affected_areas:
            for location, values in affected_areas.items():
                if location in self.sensor_system.sensors:
                    self.sensor_system.update_sensor(location, **values)

        # Initialize fires
        self.simulator.initialize_fire(fire_locations)

        # Get initial recommendation
        self._update_recommendation(start_location)

    def step(self, intensity: float = 1.0, start_location: str = "R1"):
        """Advance simulation by one time step.

        Args:
            intensity: Fire spread intensity.
            start_location: Where person is evacuating from.

        Returns:
            (time_step, routes, recommended_route, route_changed)
        """
        # Update fire conditions
        self.simulator.update_simulation(intensity)

        # Recalculate best route; the routes computed for the recommendation
        # are reused for the return value instead of being searched again.
        previous_rec = self.current_recommendation
        routes = self._update_recommendation(start_location)

        route_changed = self._route_changed(previous_rec, self.current_recommendation)
        return self.simulator.time_step, routes, self.current_recommendation, route_changed

    @staticmethod
    def _route_changed(previous, current) -> bool:
        """Return True when the recommended path differs between two recommendations.

        Either argument may be ``None`` (no recommendation). Losing or gaining
        a recommendation counts as a change; two missing recommendations do
        not.
        """
        previous_path = previous[0] if previous else None
        current_path = current[0] if current else None
        return previous_path != current_path

    @staticmethod
    def _exit_for_path(routes, path):
        """Return the exit of the first route whose path equals ``path``, else None."""
        return next(
            (exit_id for exit_id, route_path, _ in routes if route_path == path),
            None,
        )

    def _update_recommendation(self, start_location: str = "R1"):
        """Update the evacuation recommendation.

        Args:
            start_location: Where person is evacuating from.

        Returns:
            The routes found for ``start_location``, exactly as returned by
            the path finder.
        """
        routes = self.pathfinder.find_all_evacuation_routes(start_location)
        if not routes:
            self.current_recommendation = None
            return routes

        recommended = RiskAssessment.recommend_path(
            [(path, risk) for _, path, risk in routes]
        )
        self.current_recommendation = recommended

        # Track history
        if recommended:
            rec_path, rec_risk = recommended
            self.recommendation_history.append({
                'time_step': self.simulator.time_step,
                'exit': self._exit_for_path(routes, rec_path),
                'path': rec_path,
                'danger': rec_risk['avg_danger'],
            })
        return routes

    def get_status_summary(self, start_location: str = "R1"):
        """Get current status summary.

        Args:
            start_location: Where person is evacuating from.

        Returns:
            Dict with ``time_step``, ``total_fires``, ``passable_routes``,
            ``total_routes``, ``avg_danger``, ``recommended_exit`` and
            ``recommended_danger``. ``recommended_exit`` is ``None`` when
            there is no recommendation or when its path is not among the
            routes currently available from ``start_location``.
        """
        routes = self.pathfinder.find_all_evacuation_routes(start_location)
        summary = {
            'time_step': self.simulator.time_step,
            'total_fires': sum(1 for s in self.sensor_system.sensors.values()
                               if s.fire_detected),
            'passable_routes': sum(1 for _, _, r in routes if r['passable']),
            'total_routes': len(routes),
            'avg_danger': sum(r['avg_danger'] for _, _, r in routes) / len(routes) if routes else 100,
        }

        if self.current_recommendation:
            rec_path, rec_risk = self.current_recommendation
            # The stored recommendation may come from an earlier step or a
            # different start location, so its path is not guaranteed to be
            # in ``routes``.
            summary['recommended_exit'] = self._exit_for_path(routes, rec_path)
            summary['recommended_danger'] = rec_risk['avg_danger']
        else:
            summary['recommended_exit'] = None
            summary['recommended_danger'] = 100.0
        return summary
|