|
|
"""
|
|
|
Dynamic Simulation Module - Real-time fire spread and sensor updates
|
|
|
Simulates how fire conditions change over time and updates evacuation recommendations
|
|
|
"""
|
|
|
import random
|
|
|
import time
|
|
|
from typing import Dict, List, Tuple
|
|
|
from .floor_plan import FloorPlan
|
|
|
from .sensor_system import SensorSystem, SensorReading
|
|
|
from .pathfinding import PathFinder, RiskAssessment
|
|
|
|
|
|
|
|
|
class FireSpreadSimulator:
|
|
|
"""Simulates dynamic fire spread and environmental changes"""
|
|
|
|
|
|
def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem):
|
|
|
self.floor_plan = floor_plan
|
|
|
self.sensor_system = sensor_system
|
|
|
self.time_step = 0
|
|
|
self.fire_sources = []
|
|
|
|
|
|
def initialize_fire(self, fire_locations: List[str]):
|
|
|
"""Initialize fire at specific locations with all real-world factors"""
|
|
|
self.fire_sources = fire_locations
|
|
|
|
|
|
for location in fire_locations:
|
|
|
if location in self.sensor_system.sensors:
|
|
|
|
|
|
self.sensor_system.update_sensor(
|
|
|
location,
|
|
|
|
|
|
fire_detected=True,
|
|
|
smoke_level=random.uniform(0.7, 0.9),
|
|
|
temperature=random.uniform(150, 250),
|
|
|
oxygen_level=random.uniform(14, 16),
|
|
|
visibility=random.uniform(5, 15),
|
|
|
structural_integrity=random.uniform(60, 80),
|
|
|
|
|
|
|
|
|
fire_growth_rate=random.uniform(5, 15),
|
|
|
flashover_risk=random.uniform(0.3, 0.6),
|
|
|
backdraft_risk=random.uniform(0.2, 0.4),
|
|
|
heat_radiation=random.uniform(3, 8),
|
|
|
fire_type=random.choice(["wood", "electrical", "chemical"]),
|
|
|
|
|
|
|
|
|
carbon_monoxide=random.uniform(50, 200),
|
|
|
carbon_dioxide=random.uniform(5000, 15000),
|
|
|
hydrogen_cyanide=random.uniform(10, 50),
|
|
|
hydrogen_chloride=random.uniform(5, 20),
|
|
|
|
|
|
|
|
|
wind_direction=random.uniform(0, 360),
|
|
|
wind_speed=random.uniform(2, 8),
|
|
|
air_pressure=random.uniform(1000, 1020),
|
|
|
humidity=random.uniform(30, 60),
|
|
|
|
|
|
|
|
|
occupancy_density=random.uniform(0.3, 0.7),
|
|
|
mobility_limitations=random.randint(0, 3),
|
|
|
panic_level=random.uniform(0.6, 0.9),
|
|
|
evacuation_progress=0.0,
|
|
|
|
|
|
|
|
|
sprinkler_active=random.choice([True, False]),
|
|
|
emergency_lighting=random.choice([True, False]),
|
|
|
elevator_available=False,
|
|
|
stairwell_clear=random.choice([True, False]),
|
|
|
exit_accessible=random.choice([True, False]),
|
|
|
exit_capacity=random.randint(50, 150),
|
|
|
ventilation_active=random.choice([True, False]),
|
|
|
|
|
|
|
|
|
time_since_fire_start=0,
|
|
|
estimated_time_to_exit=random.randint(60, 300),
|
|
|
|
|
|
|
|
|
emergency_comm_working=random.choice([True, False]),
|
|
|
wifi_signal_strength=random.uniform(40, 80),
|
|
|
|
|
|
|
|
|
weather_temperature=random.uniform(15, 25),
|
|
|
weather_rain=random.choice([True, False]),
|
|
|
time_of_day=random.randint(8, 18),
|
|
|
day_of_week=random.randint(0, 6)
|
|
|
)
|
|
|
|
|
|
def update_simulation(self, intensity: float = 1.0):
|
|
|
"""
|
|
|
Update fire conditions for one time step
|
|
|
|
|
|
Args:
|
|
|
intensity: Fire spread intensity (0.5 = slow, 1.0 = normal, 2.0 = fast)
|
|
|
"""
|
|
|
self.time_step += 1
|
|
|
|
|
|
|
|
|
for sensor in self.sensor_system.sensors.values():
|
|
|
sensor.time_since_fire_start = self.time_step * 30
|
|
|
|
|
|
|
|
|
self._intensify_fires(intensity)
|
|
|
|
|
|
|
|
|
self._spread_fire(intensity)
|
|
|
|
|
|
|
|
|
self._spread_smoke(intensity)
|
|
|
|
|
|
|
|
|
self._update_structures(intensity)
|
|
|
|
|
|
|
|
|
self._update_evacuation_progress(intensity)
|
|
|
|
|
|
def _intensify_fires(self, intensity: float):
|
|
|
"""Make existing fires worse over time with all factors"""
|
|
|
for location_id, sensor in self.sensor_system.sensors.items():
|
|
|
if sensor.fire_detected:
|
|
|
|
|
|
temp_increase = random.uniform(5, 15) * intensity
|
|
|
new_temp = min(sensor.temperature + temp_increase, 300)
|
|
|
|
|
|
|
|
|
smoke_increase = random.uniform(0.02, 0.05) * intensity
|
|
|
new_smoke = min(sensor.smoke_level + smoke_increase, 1.0)
|
|
|
|
|
|
|
|
|
oxygen_decrease = random.uniform(0.2, 0.5) * intensity
|
|
|
new_oxygen = max(sensor.oxygen_level - oxygen_decrease, 10.0)
|
|
|
|
|
|
|
|
|
visibility_decrease = random.uniform(1, 3) * intensity
|
|
|
new_visibility = max(sensor.visibility - visibility_decrease, 0.0)
|
|
|
|
|
|
|
|
|
integrity_decrease = random.uniform(1, 3) * intensity
|
|
|
new_integrity = max(sensor.structural_integrity - integrity_decrease, 30.0)
|
|
|
|
|
|
|
|
|
self.sensor_system.update_sensor(
|
|
|
location_id,
|
|
|
temperature=new_temp,
|
|
|
smoke_level=new_smoke,
|
|
|
oxygen_level=new_oxygen,
|
|
|
visibility=new_visibility,
|
|
|
structural_integrity=new_integrity,
|
|
|
|
|
|
|
|
|
fire_growth_rate=min(sensor.fire_growth_rate + random.uniform(0.5, 2) * intensity, 20),
|
|
|
flashover_risk=min(sensor.flashover_risk + random.uniform(0.02, 0.05) * intensity, 1.0),
|
|
|
backdraft_risk=min(sensor.backdraft_risk + random.uniform(0.01, 0.03) * intensity, 1.0),
|
|
|
heat_radiation=min(sensor.heat_radiation + random.uniform(0.2, 0.5) * intensity, 15),
|
|
|
|
|
|
|
|
|
carbon_monoxide=min(sensor.carbon_monoxide + random.uniform(5, 15) * intensity, 500),
|
|
|
carbon_dioxide=min(sensor.carbon_dioxide + random.uniform(200, 500) * intensity, 20000),
|
|
|
hydrogen_cyanide=min(sensor.hydrogen_cyanide + random.uniform(1, 3) * intensity, 100),
|
|
|
hydrogen_chloride=min(sensor.hydrogen_chloride + random.uniform(0.5, 2) * intensity, 50),
|
|
|
|
|
|
|
|
|
time_since_fire_start=sensor.time_since_fire_start + 30,
|
|
|
|
|
|
|
|
|
panic_level=min(sensor.panic_level + random.uniform(0.01, 0.03) * intensity, 1.0),
|
|
|
|
|
|
|
|
|
emergency_lighting=random.random() > 0.1,
|
|
|
exit_accessible=random.random() > 0.15,
|
|
|
stairwell_clear=random.random() > 0.2
|
|
|
)
|
|
|
|
|
|
def _spread_fire(self, intensity: float):
|
|
|
"""Fire spreads to adjacent rooms based on conditions"""
|
|
|
new_fires = []
|
|
|
|
|
|
for location_id, sensor in self.sensor_system.sensors.items():
|
|
|
if sensor.fire_detected:
|
|
|
|
|
|
neighbors = self.floor_plan.get_neighbors(location_id)
|
|
|
|
|
|
for neighbor_id, _ in neighbors:
|
|
|
neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id)
|
|
|
|
|
|
if neighbor_sensor and not neighbor_sensor.fire_detected:
|
|
|
|
|
|
spread_chance = 0.15 * intensity
|
|
|
|
|
|
|
|
|
if neighbor_sensor.temperature > 80:
|
|
|
spread_chance += 0.2
|
|
|
if neighbor_sensor.smoke_level > 0.5:
|
|
|
spread_chance += 0.15
|
|
|
|
|
|
|
|
|
room = self.floor_plan.get_room(neighbor_id)
|
|
|
if room and room.has_oxygen_cylinder and neighbor_sensor.temperature > 60:
|
|
|
spread_chance += 0.4
|
|
|
|
|
|
if random.random() < spread_chance:
|
|
|
new_fires.append(neighbor_id)
|
|
|
|
|
|
|
|
|
for location_id in new_fires:
|
|
|
self.sensor_system.update_sensor(
|
|
|
location_id,
|
|
|
fire_detected=True,
|
|
|
smoke_level=random.uniform(0.6, 0.8),
|
|
|
temperature=random.uniform(120, 180),
|
|
|
oxygen_level=random.uniform(15, 17),
|
|
|
visibility=random.uniform(15, 30)
|
|
|
)
|
|
|
self.fire_sources.append(location_id)
|
|
|
|
|
|
def _spread_smoke(self, intensity: float):
|
|
|
"""Smoke and toxic gases spread to all connected areas"""
|
|
|
for location_id, sensor in self.sensor_system.sensors.items():
|
|
|
|
|
|
if sensor.fire_detected or sensor.smoke_level > 0.3:
|
|
|
neighbors = self.floor_plan.get_neighbors(location_id)
|
|
|
|
|
|
for neighbor_id, _ in neighbors:
|
|
|
neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id)
|
|
|
|
|
|
if neighbor_sensor and not neighbor_sensor.fire_detected:
|
|
|
|
|
|
smoke_increase = random.uniform(0.03, 0.08) * intensity
|
|
|
new_smoke = min(neighbor_sensor.smoke_level + smoke_increase, 1.0)
|
|
|
|
|
|
|
|
|
temp_increase = random.uniform(2, 8) * intensity
|
|
|
new_temp = min(neighbor_sensor.temperature + temp_increase, 100)
|
|
|
|
|
|
|
|
|
oxygen_decrease = random.uniform(0.1, 0.3) * intensity
|
|
|
new_oxygen = max(neighbor_sensor.oxygen_level - oxygen_decrease, 16.0)
|
|
|
|
|
|
|
|
|
visibility_decrease = random.uniform(2, 5) * intensity
|
|
|
new_visibility = max(neighbor_sensor.visibility - visibility_decrease, 10.0)
|
|
|
|
|
|
|
|
|
co_spread = sensor.carbon_monoxide * 0.1 * intensity
|
|
|
co2_spread = sensor.carbon_dioxide * 0.05 * intensity
|
|
|
hcn_spread = sensor.hydrogen_cyanide * 0.15 * intensity
|
|
|
hcl_spread = sensor.hydrogen_chloride * 0.2 * intensity
|
|
|
|
|
|
|
|
|
heat_spread = sensor.heat_radiation * 0.3 * intensity
|
|
|
|
|
|
self.sensor_system.update_sensor(
|
|
|
neighbor_id,
|
|
|
smoke_level=new_smoke,
|
|
|
temperature=new_temp,
|
|
|
oxygen_level=new_oxygen,
|
|
|
visibility=new_visibility,
|
|
|
|
|
|
|
|
|
carbon_monoxide=min(neighbor_sensor.carbon_monoxide + co_spread, 200),
|
|
|
carbon_dioxide=min(neighbor_sensor.carbon_dioxide + co2_spread, 10000),
|
|
|
hydrogen_cyanide=min(neighbor_sensor.hydrogen_cyanide + hcn_spread, 50),
|
|
|
hydrogen_chloride=min(neighbor_sensor.hydrogen_chloride + hcl_spread, 30),
|
|
|
|
|
|
|
|
|
heat_radiation=min(neighbor_sensor.heat_radiation + heat_spread, 5),
|
|
|
|
|
|
|
|
|
flashover_risk=min(neighbor_sensor.flashover_risk + random.uniform(0.01, 0.02) * intensity, 0.5),
|
|
|
|
|
|
|
|
|
occupancy_density=max(neighbor_sensor.occupancy_density - random.uniform(0.05, 0.15), 0.0),
|
|
|
panic_level=min(neighbor_sensor.panic_level + random.uniform(0.02, 0.05) * intensity, 1.0)
|
|
|
)
|
|
|
|
|
|
def _update_structures(self, intensity: float):
|
|
|
"""Update structural integrity based on fire exposure"""
|
|
|
for location_id, sensor in self.sensor_system.sensors.items():
|
|
|
if sensor.fire_detected or sensor.temperature > 100:
|
|
|
|
|
|
damage = random.uniform(0.5, 2.0) * intensity
|
|
|
new_integrity = max(sensor.structural_integrity - damage, 20.0)
|
|
|
|
|
|
self.sensor_system.update_sensor(
|
|
|
location_id,
|
|
|
structural_integrity=new_integrity
|
|
|
)
|
|
|
|
|
|
def _update_evacuation_progress(self, intensity: float):
|
|
|
"""Update evacuation progress and occupancy"""
|
|
|
for location_id, sensor in self.sensor_system.sensors.items():
|
|
|
|
|
|
if sensor.occupancy_density > 0:
|
|
|
evacuation_rate = random.uniform(0.05, 0.15) * intensity
|
|
|
new_occupancy = max(sensor.occupancy_density - evacuation_rate, 0.0)
|
|
|
new_progress = min(sensor.evacuation_progress + evacuation_rate * 100, 100.0)
|
|
|
|
|
|
self.sensor_system.update_sensor(
|
|
|
location_id,
|
|
|
occupancy_density=new_occupancy,
|
|
|
evacuation_progress=new_progress
|
|
|
)
|
|
|
|
|
|
|
|
|
room = self.floor_plan.get_room(location_id)
|
|
|
if room and room.room_type == "exit":
|
|
|
|
|
|
if sensor.occupancy_density < 0.8:
|
|
|
arrival_rate = random.uniform(0.02, 0.08) * intensity
|
|
|
new_occupancy = min(sensor.occupancy_density + arrival_rate, 0.9)
|
|
|
self.sensor_system.update_sensor(
|
|
|
location_id,
|
|
|
occupancy_density=new_occupancy
|
|
|
)
|
|
|
|
|
|
|
|
|
class DynamicEvacuationSystem:
|
|
|
"""Manages dynamic evacuation with changing conditions"""
|
|
|
|
|
|
def __init__(self, floor_plan: FloorPlan):
|
|
|
self.floor_plan = floor_plan
|
|
|
self.sensor_system = SensorSystem(floor_plan)
|
|
|
self.simulator = FireSpreadSimulator(floor_plan, self.sensor_system)
|
|
|
self.pathfinder = PathFinder(floor_plan, self.sensor_system)
|
|
|
self.current_recommendation = None
|
|
|
self.recommendation_history = []
|
|
|
|
|
|
def initialize_scenario(self, fire_locations: List[str],
|
|
|
affected_areas: Dict[str, Dict] = None):
|
|
|
"""Initialize the fire scenario"""
|
|
|
|
|
|
if affected_areas:
|
|
|
for location, values in affected_areas.items():
|
|
|
if location in self.sensor_system.sensors:
|
|
|
self.sensor_system.update_sensor(location, **values)
|
|
|
|
|
|
|
|
|
self.simulator.initialize_fire(fire_locations)
|
|
|
|
|
|
|
|
|
self._update_recommendation()
|
|
|
|
|
|
def step(self, intensity: float = 1.0, start_location: str = "R1"):
|
|
|
"""
|
|
|
Advance simulation by one time step
|
|
|
|
|
|
Args:
|
|
|
intensity: Fire spread intensity
|
|
|
start_location: Where person is evacuating from
|
|
|
|
|
|
Returns:
|
|
|
(time_step, routes, recommended_route, route_changed)
|
|
|
"""
|
|
|
|
|
|
self.simulator.update_simulation(intensity)
|
|
|
|
|
|
|
|
|
previous_rec = self.current_recommendation
|
|
|
self._update_recommendation(start_location)
|
|
|
|
|
|
|
|
|
route_changed = False
|
|
|
if previous_rec and self.current_recommendation:
|
|
|
prev_path = previous_rec[0] if previous_rec else None
|
|
|
curr_path = self.current_recommendation[0] if self.current_recommendation else None
|
|
|
route_changed = (prev_path != curr_path)
|
|
|
|
|
|
|
|
|
routes = self.pathfinder.find_all_evacuation_routes(start_location)
|
|
|
|
|
|
return self.simulator.time_step, routes, self.current_recommendation, route_changed
|
|
|
|
|
|
def _update_recommendation(self, start_location: str = "R1"):
|
|
|
"""Update the evacuation recommendation"""
|
|
|
routes = self.pathfinder.find_all_evacuation_routes(start_location)
|
|
|
|
|
|
if routes:
|
|
|
recommended = RiskAssessment.recommend_path(
|
|
|
[(path, risk) for _, path, risk in routes]
|
|
|
)
|
|
|
self.current_recommendation = recommended
|
|
|
|
|
|
|
|
|
if recommended:
|
|
|
rec_path, rec_risk = recommended
|
|
|
rec_exit = [e for e, p, _ in routes if p == rec_path][0]
|
|
|
self.recommendation_history.append({
|
|
|
'time_step': self.simulator.time_step,
|
|
|
'exit': rec_exit,
|
|
|
'path': rec_path,
|
|
|
'danger': rec_risk['avg_danger']
|
|
|
})
|
|
|
else:
|
|
|
self.current_recommendation = None
|
|
|
|
|
|
def get_status_summary(self, start_location: str = "R1"):
|
|
|
"""Get current status summary"""
|
|
|
routes = self.pathfinder.find_all_evacuation_routes(start_location)
|
|
|
|
|
|
summary = {
|
|
|
'time_step': self.simulator.time_step,
|
|
|
'total_fires': sum(1 for s in self.sensor_system.sensors.values()
|
|
|
if s.fire_detected),
|
|
|
'passable_routes': sum(1 for _, _, r in routes if r['passable']),
|
|
|
'total_routes': len(routes),
|
|
|
'avg_danger': sum(r['avg_danger'] for _, _, r in routes) / len(routes) if routes else 100,
|
|
|
}
|
|
|
|
|
|
if self.current_recommendation:
|
|
|
rec_path, rec_risk = self.current_recommendation
|
|
|
rec_exit = [e for e, p, _ in routes if p == rec_path][0]
|
|
|
summary['recommended_exit'] = rec_exit
|
|
|
summary['recommended_danger'] = rec_risk['avg_danger']
|
|
|
else:
|
|
|
summary['recommended_exit'] = None
|
|
|
summary['recommended_danger'] = 100.0
|
|
|
|
|
|
return summary
|
|
|
|
|
|
|