File size: 13,653 Bytes
faa740f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 |
"""
Pathfinding and Risk Assessment Module
Uses A* algorithm with custom risk-based heuristics
"""
import heapq
from typing import Dict, List, Tuple, Optional
from .floor_plan import FloorPlan
from .sensor_system import SensorSystem, SensorReading
class PathNode:
    """Node for pathfinding algorithm.

    Nodes order by ``f_cost`` (so they can sit directly in a ``heapq`` heap)
    and compare equal / hash by ``room_id``.
    """

    def __init__(self, room_id: str, g_cost: float, h_cost: float,
                 parent: Optional["PathNode"] = None):
        """
        Args:
            room_id: Identifier of the room this node stands for.
            g_cost: Cost from start.
            h_cost: Heuristic cost to goal.
            parent: Node this one was reached from, or None for the start node.
        """
        self.room_id = room_id
        self.g_cost = g_cost  # Cost from start
        self.h_cost = h_cost  # Heuristic cost to goal
        # Total cost; computed once here, not updated if g_cost/h_cost change later
        self.f_cost = g_cost + h_cost
        self.parent = parent

    def __lt__(self, other):
        # Returning NotImplemented lets Python raise its usual TypeError for
        # foreign types instead of an AttributeError on ``other.f_cost``.
        if not isinstance(other, PathNode):
            return NotImplemented
        return self.f_cost < other.f_cost

    def __eq__(self, other):
        if not isinstance(other, PathNode):
            return NotImplemented
        return self.room_id == other.room_id

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None; keep nodes hashable and
        # consistent with equality (same room_id -> same hash).
        return hash(self.room_id)

    def __repr__(self):
        return (f"{type(self).__name__}(room_id={self.room_id!r}, "
                f"g_cost={self.g_cost!r}, h_cost={self.h_cost!r})")
class RiskAssessment:
    """Assesses risk for different evacuation routes.

    Every method is static; the class only groups the risk helpers.
    """

    @staticmethod
    def calculate_path_risk(path: List[str], sensor_system: "SensorSystem",
                            floor_plan: "FloorPlan") -> Dict:
        """
        Calculate comprehensive risk assessment for a path.

        Args:
            path: Room IDs in travel order.
            sensor_system: Queried via ``get_sensor_reading(room_id)``; rooms
                for which it returns nothing are skipped (they add no danger
                score and do not affect passability).
            floor_plan: Queried via ``get_room(room_id)`` for oxygen cylinders.

        Returns dict with:
        - total_danger: Sum of the per-room danger scores
        - max_danger: Highest per-room danger score
        - max_danger_location: Room ID where max_danger occurs (or None)
        - avg_danger: Mean danger over rooms that have a sensor reading
        - path_length: Number of rooms in the path
        - has_fire: Whether path goes through fire
        - has_oxygen_hazard: Whether path has oxygen cylinder
        - passable: Whether every room with a reading is passable
        - risk_factors: List of specific risks (human-readable strings)
        - danger_scores: Per-room scores, in path order

        An empty path yields the same keys with ``passable`` False and the
        danger figures set to 100.0.
        """
        if not path:
            # Same key set as the normal result so callers can index any key.
            # total_danger keeps its original sentinel of 100.0; max/avg
            # mirror it so an empty path is never reported as low risk.
            return {
                "total_danger": 100.0,
                "max_danger": 100.0,
                "max_danger_location": None,
                "avg_danger": 100.0,
                "path_length": 0,
                "has_fire": False,
                "has_oxygen_hazard": False,
                "passable": False,
                "risk_factors": [],
                "danger_scores": [],
            }

        danger_scores = []
        risk_factors = []
        has_fire = False
        has_oxygen_hazard = False
        passable = True
        max_danger_location = None
        max_danger_score = 0.0

        for room_id in path:
            sensor = sensor_system.get_sensor_reading(room_id)
            if not sensor:
                continue
            room = floor_plan.get_room(room_id)
            danger = sensor.calculate_danger_score()

            # Check specific hazards
            if sensor.fire_detected:
                has_fire = True
                risk_factors.append(f"Fire detected in {room_id}")
            if room and room.has_oxygen_cylinder:
                has_oxygen_hazard = True
                # Increase danger if there's heat near oxygen
                if sensor.temperature > 40 or sensor.fire_detected:
                    danger += 15  # Add explosion risk
                    risk_factors.append(f"Oxygen cylinder explosion risk in {room_id}")
                else:
                    risk_factors.append(f"Oxygen cylinder present in {room_id}")

            # Record the score only after the explosion bonus so that
            # max_danger agrees with danger_scores / total_danger.
            danger_scores.append(danger)
            if danger > max_danger_score:
                max_danger_score = danger
                max_danger_location = room_id

            risk_factors.extend(RiskAssessment._room_hazard_warnings(sensor, room_id))

            # Time pressure: the message names no room, so identical copies
            # from several rooms would only be noise - keep one.
            if sensor.time_since_fire_start > 300:
                pressure = f"Time pressure: {sensor.time_since_fire_start//60} min elapsed"
                if pressure not in risk_factors:
                    risk_factors.append(pressure)

            if not sensor.is_passable():
                passable = False
                risk_factors.append(f"Path blocked at {room_id}")

        total_danger = sum(danger_scores)
        avg_danger = total_danger / len(danger_scores) if danger_scores else 0

        return {
            "total_danger": total_danger,
            "max_danger": max_danger_score,
            "max_danger_location": max_danger_location,
            "avg_danger": avg_danger,
            "path_length": len(path),
            "has_fire": has_fire,
            "has_oxygen_hazard": has_oxygen_hazard,
            "passable": passable,
            "risk_factors": risk_factors,
            "danger_scores": danger_scores,
        }

    @staticmethod
    def _room_hazard_warnings(sensor, room_id: str) -> List[str]:
        """Return the threshold-based warning strings for one room's reading."""
        notes = []
        if sensor.smoke_level > 0.5:
            notes.append(f"Heavy smoke in {room_id}")
        if sensor.temperature > 60:
            notes.append(f"High temperature ({sensor.temperature:.1f}°C) in {room_id}")
        if sensor.oxygen_level < 19.5:
            notes.append(f"Low oxygen ({sensor.oxygen_level:.1f}%) in {room_id}")
        # Toxic gas warnings
        if sensor.carbon_monoxide > 50:
            notes.append(f"TOXIC CO ({sensor.carbon_monoxide:.0f} ppm) in {room_id}")
        if sensor.carbon_dioxide > 5000:
            notes.append(f"High CO2 ({sensor.carbon_dioxide:.0f} ppm) in {room_id}")
        if sensor.hydrogen_cyanide > 20:
            notes.append(f"TOXIC HCN ({sensor.hydrogen_cyanide:.1f} ppm) in {room_id}")
        # Flashover/backdraft warnings
        if sensor.flashover_risk > 0.7:
            notes.append(f"CRITICAL: Flashover risk ({sensor.flashover_risk*100:.0f}%) in {room_id}")
        if sensor.backdraft_risk > 0.6:
            notes.append(f"CRITICAL: Backdraft risk ({sensor.backdraft_risk*100:.0f}%) in {room_id}")
        # Crowd density warnings
        if sensor.occupancy_density > 0.8:
            notes.append(f"High crowd density ({sensor.occupancy_density*100:.0f}%) in {room_id}")
        # Infrastructure failures
        if not sensor.exit_accessible:
            notes.append(f"Exit BLOCKED in {room_id}")
        if not sensor.stairwell_clear:
            notes.append(f"Stairwell BLOCKED in {room_id}")
        if not sensor.emergency_lighting:
            notes.append(f"No emergency lighting in {room_id}")
        return notes

    @staticmethod
    def get_risk_level(avg_danger: float) -> str:
        """Get risk level description.

        Bands: below 20 "LOW", below 40 "MODERATE", below 60 "HIGH",
        otherwise "CRITICAL".
        """
        if avg_danger < 20:
            return "LOW"
        if avg_danger < 40:
            return "MODERATE"
        if avg_danger < 60:
            return "HIGH"
        return "CRITICAL"

    @staticmethod
    def _route_score(risk: Dict) -> float:
        """Return the ranking score of one risk assessment (lower is better)."""
        score = 0
        # Heavily penalize fire
        if risk["has_fire"]:
            score += 100
        # Add oxygen hazard risk (but less than fire)
        if risk["has_oxygen_hazard"]:
            score += 30
        # Add danger scores
        score += risk["total_danger"]
        # Prefer shorter paths (slight preference)
        score += risk["path_length"] * 2
        # Penalize high max danger points
        score += risk["max_danger"] * 0.5
        return score

    @staticmethod
    def recommend_path(paths: List[Tuple[List[str], Dict]]) -> Optional[Tuple[List[str], Dict]]:
        """
        Recommend the best path based on risk assessment.

        Args:
            paths: List of (path, risk_assessment) tuples
        Returns:
            Best (path, risk_assessment) tuple or None when ``paths`` is empty.
            Passable paths are ranked by a combined score; if none is passable
            the candidate with the smallest ``total_danger`` is returned.
        """
        if not paths:
            return None

        # Filter to only passable paths
        passable_paths = [entry for entry in paths if entry[1]["passable"]]
        if not passable_paths:
            # No fully passable paths, return least dangerous
            return min(paths, key=lambda entry: entry[1]["total_danger"])

        # Return path with lowest score
        return min(passable_paths, key=lambda entry: RiskAssessment._route_score(entry[1]))
class PathFinder:
    """Finds evacuation routes with an A* search whose step costs include risk."""

    def __init__(self, floor_plan: "FloorPlan", sensor_system: "SensorSystem"):
        self.floor_plan = floor_plan
        self.sensor_system = sensor_system

    def find_all_evacuation_routes(self, start: str) -> List[Tuple[str, List[str], Dict]]:
        """
        Search a route from ``start`` to every exit of the floor plan.

        Returns: one (exit_id, path, risk_assessment) tuple per exit that can
        be reached; exits without a path are left out.
        """
        found = []
        for exit_id in self.floor_plan.get_all_exits():
            route = self.find_path(start, exit_id)
            if not route:
                continue
            assessment = RiskAssessment.calculate_path_risk(
                route, self.sensor_system, self.floor_plan
            )
            found.append((exit_id, route, assessment))
        return found

    def find_path(self, start: str, goal: str) -> Optional[List[str]]:
        """
        Run A* from ``start`` to ``goal``; each step costs the edge distance
        plus the risk cost of the room being entered.

        Returns: room IDs from start to goal, or None when either endpoint is
        unknown to the floor plan or no route exists.
        """
        if start not in self.floor_plan.rooms or goal not in self.floor_plan.rooms:
            return None

        # Min-heap of PathNode objects; their __lt__ orders them by f_cost.
        # A one-element list is already a valid heap.
        frontier = [PathNode(start, 0, self._heuristic(start, goal))]
        expanded = set()
        # Cheapest g_cost seen so far for each room
        cheapest = {start: 0}

        while frontier:
            node = heapq.heappop(frontier)
            here = node.room_id

            if here == goal:
                return self._reconstruct_path(node)

            # A room may sit in the heap several times; expand it only once
            if here in expanded:
                continue
            expanded.add(here)

            for next_id, step_length in self.floor_plan.get_neighbors(here):
                if next_id in expanded:
                    continue

                entry_risk = self._calculate_risk_cost(next_id)
                candidate_g = node.g_cost + step_length + entry_risk

                # Only queue the neighbor when this route strictly improves on it
                known = cheapest.get(next_id)
                if known is not None and not candidate_g < known:
                    continue

                cheapest[next_id] = candidate_g
                estimate = self._heuristic(next_id, goal)
                heapq.heappush(frontier, PathNode(next_id, candidate_g, estimate, node))

        # Frontier exhausted without reaching the goal
        return None

    def _heuristic(self, room_id1: str, room_id2: str) -> float:
        """
        A* heuristic: Manhattan distance between the two rooms' positions.

        Returns 0 when either room cannot be looked up.
        """
        origin = self.floor_plan.get_room(room_id1)
        target = self.floor_plan.get_room(room_id2)
        if not (origin and target):
            return 0

        ox, oy = origin.position
        tx, ty = target.position
        return abs(tx - ox) + abs(ty - oy)

    def _calculate_risk_cost(self, room_id: str) -> float:
        """
        Extra traversal cost of a room derived from its sensor reading.

        Higher danger means higher cost; a room without a reading costs 0.0.
        """
        reading = self.sensor_system.get_sensor_reading(room_id)
        if not reading:
            return 0.0

        # Base cost: one tenth of the danger score
        cost = reading.calculate_danger_score() / 10.0

        # Fire
        if reading.fire_detected:
            cost += 20.0

        # Oxygen cylinder exposed to heat or fire
        room = self.floor_plan.get_room(room_id)
        if room and room.has_oxygen_cylinder and (
            reading.temperature > 40 or reading.fire_detected
        ):
            cost += 10.0

        # Toxic gases
        if reading.carbon_monoxide > 50:
            cost += 15.0
        if reading.carbon_dioxide > 5000:
            cost += 10.0

        # Flashover risk, two tiers
        if reading.flashover_risk > 0.7:
            cost += 25.0
        elif reading.flashover_risk > 0.5:
            cost += 15.0

        # Blocked exit / stairwell
        if not reading.exit_accessible:
            cost += 30.0
        if not reading.stairwell_clear:
            cost += 20.0

        # Dense crowds slow movement; penalty scales with the density
        if reading.occupancy_density > 0.8:
            cost += reading.occupancy_density * 20.0

        # No emergency lighting
        if not reading.emergency_lighting:
            cost += 5.0

        # Impassable rooms are very expensive, but deliberately not infinite
        if not reading.is_passable():
            cost += 50.0

        return cost

    def _reconstruct_path(self, node: "PathNode") -> List[str]:
        """Follow parent links from ``node`` back to the start and return the room IDs start-first."""
        trail = []
        cursor = node
        while cursor:
            trail.append(cursor.room_id)
            cursor = cursor.parent
        trail.reverse()
        return trail
|