fire_evac / dynamic_simulation.py
ratyim's picture
Upload 6 files
faa740f verified
"""
Dynamic Simulation Module - Real-time fire spread and sensor updates
Simulates how fire conditions change over time and updates evacuation recommendations
"""
import random
import time
from typing import Dict, List, Tuple
from .floor_plan import FloorPlan
from .sensor_system import SensorSystem, SensorReading
from .pathfinding import PathFinder, RiskAssessment
class FireSpreadSimulator:
"""Simulates dynamic fire spread and environmental changes"""
def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem):
self.floor_plan = floor_plan
self.sensor_system = sensor_system
self.time_step = 0
self.fire_sources = [] # List of rooms where fire started
def initialize_fire(self, fire_locations: List[str]):
"""Initialize fire at specific locations with all real-world factors"""
self.fire_sources = fire_locations
for location in fire_locations:
if location in self.sensor_system.sensors:
# Generate comprehensive mock data for fire location
self.sensor_system.update_sensor(
location,
# Basic factors
fire_detected=True,
smoke_level=random.uniform(0.7, 0.9),
temperature=random.uniform(150, 250),
oxygen_level=random.uniform(14, 16),
visibility=random.uniform(5, 15),
structural_integrity=random.uniform(60, 80),
# Fire-specific factors
fire_growth_rate=random.uniform(5, 15), # m²/min
flashover_risk=random.uniform(0.3, 0.6),
backdraft_risk=random.uniform(0.2, 0.4),
heat_radiation=random.uniform(3, 8), # kW/m²
fire_type=random.choice(["wood", "electrical", "chemical"]),
# Toxic gases (high in fire areas)
carbon_monoxide=random.uniform(50, 200), # ppm
carbon_dioxide=random.uniform(5000, 15000), # ppm
hydrogen_cyanide=random.uniform(10, 50), # ppm
hydrogen_chloride=random.uniform(5, 20), # ppm
# Environmental (affected by fire)
wind_direction=random.uniform(0, 360),
wind_speed=random.uniform(2, 8), # m/s
air_pressure=random.uniform(1000, 1020),
humidity=random.uniform(30, 60),
# Human factors
occupancy_density=random.uniform(0.3, 0.7),
mobility_limitations=random.randint(0, 3),
panic_level=random.uniform(0.6, 0.9),
evacuation_progress=0.0,
# Infrastructure (may fail near fire)
sprinkler_active=random.choice([True, False]),
emergency_lighting=random.choice([True, False]),
elevator_available=False, # Elevators disabled in fire
stairwell_clear=random.choice([True, False]),
exit_accessible=random.choice([True, False]),
exit_capacity=random.randint(50, 150),
ventilation_active=random.choice([True, False]),
# Time-based
time_since_fire_start=0,
estimated_time_to_exit=random.randint(60, 300),
# Communication
emergency_comm_working=random.choice([True, False]),
wifi_signal_strength=random.uniform(40, 80),
# External
weather_temperature=random.uniform(15, 25),
weather_rain=random.choice([True, False]),
time_of_day=random.randint(8, 18),
day_of_week=random.randint(0, 6)
)
def update_simulation(self, intensity: float = 1.0):
"""
Update fire conditions for one time step
Args:
intensity: Fire spread intensity (0.5 = slow, 1.0 = normal, 2.0 = fast)
"""
self.time_step += 1
# Update time for all sensors
for sensor in self.sensor_system.sensors.values():
sensor.time_since_fire_start = self.time_step * 30 # 30 seconds per step
# 1. Intensify existing fires
self._intensify_fires(intensity)
# 2. Spread fire to adjacent rooms
self._spread_fire(intensity)
# 3. Spread smoke further
self._spread_smoke(intensity)
# 4. Update structural integrity
self._update_structures(intensity)
# 5. Update evacuation progress
self._update_evacuation_progress(intensity)
def _intensify_fires(self, intensity: float):
"""Make existing fires worse over time with all factors"""
for location_id, sensor in self.sensor_system.sensors.items():
if sensor.fire_detected:
# Fire gets hotter
temp_increase = random.uniform(5, 15) * intensity
new_temp = min(sensor.temperature + temp_increase, 300)
# More smoke
smoke_increase = random.uniform(0.02, 0.05) * intensity
new_smoke = min(sensor.smoke_level + smoke_increase, 1.0)
# Less oxygen
oxygen_decrease = random.uniform(0.2, 0.5) * intensity
new_oxygen = max(sensor.oxygen_level - oxygen_decrease, 10.0)
# Worse visibility
visibility_decrease = random.uniform(1, 3) * intensity
new_visibility = max(sensor.visibility - visibility_decrease, 0.0)
# Structural damage
integrity_decrease = random.uniform(1, 3) * intensity
new_integrity = max(sensor.structural_integrity - integrity_decrease, 30.0)
# Update all fire-related factors
self.sensor_system.update_sensor(
location_id,
temperature=new_temp,
smoke_level=new_smoke,
oxygen_level=new_oxygen,
visibility=new_visibility,
structural_integrity=new_integrity,
# Fire growth increases
fire_growth_rate=min(sensor.fire_growth_rate + random.uniform(0.5, 2) * intensity, 20),
flashover_risk=min(sensor.flashover_risk + random.uniform(0.02, 0.05) * intensity, 1.0),
backdraft_risk=min(sensor.backdraft_risk + random.uniform(0.01, 0.03) * intensity, 1.0),
heat_radiation=min(sensor.heat_radiation + random.uniform(0.2, 0.5) * intensity, 15),
# Toxic gases increase
carbon_monoxide=min(sensor.carbon_monoxide + random.uniform(5, 15) * intensity, 500),
carbon_dioxide=min(sensor.carbon_dioxide + random.uniform(200, 500) * intensity, 20000),
hydrogen_cyanide=min(sensor.hydrogen_cyanide + random.uniform(1, 3) * intensity, 100),
hydrogen_chloride=min(sensor.hydrogen_chloride + random.uniform(0.5, 2) * intensity, 50),
# Time increases
time_since_fire_start=sensor.time_since_fire_start + 30, # 30 seconds per step
# Panic increases
panic_level=min(sensor.panic_level + random.uniform(0.01, 0.03) * intensity, 1.0),
# Infrastructure may fail
emergency_lighting=random.random() > 0.1, # 10% chance of failure
exit_accessible=random.random() > 0.15, # 15% chance of blockage
stairwell_clear=random.random() > 0.2 # 20% chance of blockage
)
def _spread_fire(self, intensity: float):
"""Fire spreads to adjacent rooms based on conditions"""
new_fires = []
for location_id, sensor in self.sensor_system.sensors.items():
if sensor.fire_detected:
# Get adjacent rooms
neighbors = self.floor_plan.get_neighbors(location_id)
for neighbor_id, _ in neighbors:
neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id)
if neighbor_sensor and not neighbor_sensor.fire_detected:
# Chance of fire spreading based on conditions
spread_chance = 0.15 * intensity # Base 15% chance per time step
# Higher chance if already hot or smoky
if neighbor_sensor.temperature > 80:
spread_chance += 0.2
if neighbor_sensor.smoke_level > 0.5:
spread_chance += 0.15
# Check if oxygen cylinder present (explosion!)
room = self.floor_plan.get_room(neighbor_id)
if room and room.has_oxygen_cylinder and neighbor_sensor.temperature > 60:
spread_chance += 0.4 # Much higher chance!
if random.random() < spread_chance:
new_fires.append(neighbor_id)
# Apply new fires
for location_id in new_fires:
self.sensor_system.update_sensor(
location_id,
fire_detected=True,
smoke_level=random.uniform(0.6, 0.8),
temperature=random.uniform(120, 180),
oxygen_level=random.uniform(15, 17),
visibility=random.uniform(15, 30)
)
self.fire_sources.append(location_id)
def _spread_smoke(self, intensity: float):
"""Smoke and toxic gases spread to all connected areas"""
for location_id, sensor in self.sensor_system.sensors.items():
# Rooms with fire or high smoke affect neighbors
if sensor.fire_detected or sensor.smoke_level > 0.3:
neighbors = self.floor_plan.get_neighbors(location_id)
for neighbor_id, _ in neighbors:
neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id)
if neighbor_sensor and not neighbor_sensor.fire_detected:
# Smoke drifts to adjacent areas
smoke_increase = random.uniform(0.03, 0.08) * intensity
new_smoke = min(neighbor_sensor.smoke_level + smoke_increase, 1.0)
# Temperature rises slightly
temp_increase = random.uniform(2, 8) * intensity
new_temp = min(neighbor_sensor.temperature + temp_increase, 100)
# Oxygen decreases
oxygen_decrease = random.uniform(0.1, 0.3) * intensity
new_oxygen = max(neighbor_sensor.oxygen_level - oxygen_decrease, 16.0)
# Visibility decreases
visibility_decrease = random.uniform(2, 5) * intensity
new_visibility = max(neighbor_sensor.visibility - visibility_decrease, 10.0)
# Toxic gases spread (reduced concentration)
co_spread = sensor.carbon_monoxide * 0.1 * intensity
co2_spread = sensor.carbon_dioxide * 0.05 * intensity
hcn_spread = sensor.hydrogen_cyanide * 0.15 * intensity
hcl_spread = sensor.hydrogen_chloride * 0.2 * intensity
# Heat radiation spreads
heat_spread = sensor.heat_radiation * 0.3 * intensity
self.sensor_system.update_sensor(
neighbor_id,
smoke_level=new_smoke,
temperature=new_temp,
oxygen_level=new_oxygen,
visibility=new_visibility,
# Toxic gases (accumulate)
carbon_monoxide=min(neighbor_sensor.carbon_monoxide + co_spread, 200),
carbon_dioxide=min(neighbor_sensor.carbon_dioxide + co2_spread, 10000),
hydrogen_cyanide=min(neighbor_sensor.hydrogen_cyanide + hcn_spread, 50),
hydrogen_chloride=min(neighbor_sensor.hydrogen_chloride + hcl_spread, 30),
# Heat radiation
heat_radiation=min(neighbor_sensor.heat_radiation + heat_spread, 5),
# Flashover risk increases slightly
flashover_risk=min(neighbor_sensor.flashover_risk + random.uniform(0.01, 0.02) * intensity, 0.5),
# Occupancy may increase (people moving away)
occupancy_density=max(neighbor_sensor.occupancy_density - random.uniform(0.05, 0.15), 0.0),
panic_level=min(neighbor_sensor.panic_level + random.uniform(0.02, 0.05) * intensity, 1.0)
)
def _update_structures(self, intensity: float):
"""Update structural integrity based on fire exposure"""
for location_id, sensor in self.sensor_system.sensors.items():
if sensor.fire_detected or sensor.temperature > 100:
# Structural damage from heat
damage = random.uniform(0.5, 2.0) * intensity
new_integrity = max(sensor.structural_integrity - damage, 20.0)
self.sensor_system.update_sensor(
location_id,
structural_integrity=new_integrity
)
def _update_evacuation_progress(self, intensity: float):
"""Update evacuation progress and occupancy"""
for location_id, sensor in self.sensor_system.sensors.items():
# People evacuate from rooms (occupancy decreases)
if sensor.occupancy_density > 0:
evacuation_rate = random.uniform(0.05, 0.15) * intensity
new_occupancy = max(sensor.occupancy_density - evacuation_rate, 0.0)
new_progress = min(sensor.evacuation_progress + evacuation_rate * 100, 100.0)
self.sensor_system.update_sensor(
location_id,
occupancy_density=new_occupancy,
evacuation_progress=new_progress
)
# Exits may get more crowded as people arrive
room = self.floor_plan.get_room(location_id)
if room and room.room_type == "exit":
# Exits get more crowded as people evacuate
if sensor.occupancy_density < 0.8:
arrival_rate = random.uniform(0.02, 0.08) * intensity
new_occupancy = min(sensor.occupancy_density + arrival_rate, 0.9)
self.sensor_system.update_sensor(
location_id,
occupancy_density=new_occupancy
)
class DynamicEvacuationSystem:
"""Manages dynamic evacuation with changing conditions"""
def __init__(self, floor_plan: FloorPlan):
self.floor_plan = floor_plan
self.sensor_system = SensorSystem(floor_plan)
self.simulator = FireSpreadSimulator(floor_plan, self.sensor_system)
self.pathfinder = PathFinder(floor_plan, self.sensor_system)
self.current_recommendation = None
self.recommendation_history = []
def initialize_scenario(self, fire_locations: List[str],
affected_areas: Dict[str, Dict] = None):
"""Initialize the fire scenario"""
# Set up initial conditions
if affected_areas:
for location, values in affected_areas.items():
if location in self.sensor_system.sensors:
self.sensor_system.update_sensor(location, **values)
# Initialize fires
self.simulator.initialize_fire(fire_locations)
# Get initial recommendation
self._update_recommendation()
def step(self, intensity: float = 1.0, start_location: str = "R1"):
"""
Advance simulation by one time step
Args:
intensity: Fire spread intensity
start_location: Where person is evacuating from
Returns:
(time_step, routes, recommended_route, route_changed)
"""
# Update fire conditions
self.simulator.update_simulation(intensity)
# Recalculate best route
previous_rec = self.current_recommendation
self._update_recommendation(start_location)
# Check if recommendation changed
route_changed = False
if previous_rec and self.current_recommendation:
prev_path = previous_rec[0] if previous_rec else None
curr_path = self.current_recommendation[0] if self.current_recommendation else None
route_changed = (prev_path != curr_path)
# Get all routes
routes = self.pathfinder.find_all_evacuation_routes(start_location)
return self.simulator.time_step, routes, self.current_recommendation, route_changed
def _update_recommendation(self, start_location: str = "R1"):
"""Update the evacuation recommendation"""
routes = self.pathfinder.find_all_evacuation_routes(start_location)
if routes:
recommended = RiskAssessment.recommend_path(
[(path, risk) for _, path, risk in routes]
)
self.current_recommendation = recommended
# Track history
if recommended:
rec_path, rec_risk = recommended
rec_exit = [e for e, p, _ in routes if p == rec_path][0]
self.recommendation_history.append({
'time_step': self.simulator.time_step,
'exit': rec_exit,
'path': rec_path,
'danger': rec_risk['avg_danger']
})
else:
self.current_recommendation = None
def get_status_summary(self, start_location: str = "R1"):
"""Get current status summary"""
routes = self.pathfinder.find_all_evacuation_routes(start_location)
summary = {
'time_step': self.simulator.time_step,
'total_fires': sum(1 for s in self.sensor_system.sensors.values()
if s.fire_detected),
'passable_routes': sum(1 for _, _, r in routes if r['passable']),
'total_routes': len(routes),
'avg_danger': sum(r['avg_danger'] for _, _, r in routes) / len(routes) if routes else 100,
}
if self.current_recommendation:
rec_path, rec_risk = self.current_recommendation
rec_exit = [e for e, p, _ in routes if p == rec_path][0]
summary['recommended_exit'] = rec_exit
summary['recommended_danger'] = rec_risk['avg_danger']
else:
summary['recommended_exit'] = None
summary['recommended_danger'] = 100.0
return summary