# File size: 13,653 Bytes
# faa740f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""
Pathfinding and Risk Assessment Module
Uses A* algorithm with custom risk-based heuristics
"""
import heapq
from typing import Dict, List, Tuple, Optional
from .floor_plan import FloorPlan
from .sensor_system import SensorSystem, SensorReading


class PathNode:
    """Node for pathfinding algorithm.

    Ordering (``<``) is by ``f_cost`` so nodes can be stored directly in a
    ``heapq`` priority queue. Equality and hashing are by ``room_id`` only,
    so two nodes for the same room compare equal regardless of their costs.
    """

    def __init__(self, room_id: str, g_cost: float, h_cost: float,
                 parent: Optional["PathNode"] = None):
        self.room_id = room_id
        self.g_cost = g_cost  # Cost from start
        self.h_cost = h_cost  # Heuristic cost to goal
        self.f_cost = g_cost + h_cost  # Total cost
        self.parent = parent  # Previous node on the path, None for the start

    def __lt__(self, other):
        return self.f_cost < other.f_cost

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to identity
        # comparison instead of raising AttributeError for non-node operands.
        if not isinstance(other, PathNode):
            return NotImplemented
        return self.room_id == other.room_id

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None; keep nodes hashable and
        # consistent with equality (same room_id -> same hash).
        return hash(self.room_id)

    def __repr__(self):
        return (f"{type(self).__name__}(room_id={self.room_id!r}, "
                f"g_cost={self.g_cost!r}, h_cost={self.h_cost!r})")


class RiskAssessment:
    """Assesses risk for different evacuation routes.

    All methods are static; the class only groups the scoring helpers.
    """

    @staticmethod
    def calculate_path_risk(path: List[str], sensor_system: SensorSystem,
                           floor_plan: FloorPlan) -> Dict:
        """
        Calculate comprehensive risk assessment for a path

        Rooms for which ``sensor_system.get_sensor_reading`` returns nothing
        are skipped: they add no danger score or risk factor and do not
        affect passability.

        Args:
            path: Ordered room IDs making up the route.
            sensor_system: Provides ``get_sensor_reading(room_id)``.
            floor_plan: Provides ``get_room(room_id)``.

        Returns dict with:
        - total_danger: Sum of the per-room danger scores
        - max_danger: Highest per-room danger score (after adjustments)
        - max_danger_location: Room ID where max_danger occurs (or None)
        - avg_danger: Average danger over rooms that have a reading
        - path_length: Number of rooms in the path
        - has_fire: Whether path goes through fire
        - has_oxygen_hazard: Whether path has oxygen cylinder
        - passable: Whether path is passable
        - risk_factors: List of specific risks
        - danger_scores: Per-room danger scores, in path order

        An empty path yields the same keys, with ``passable`` False and the
        danger figures pinned to 100.0.
        """
        if not path:
            # Same shape as the normal result so callers can read any key.
            return {
                "total_danger": 100.0,
                "max_danger": 100.0,
                "max_danger_location": None,
                "avg_danger": 100.0,
                "path_length": 0,
                "has_fire": False,
                "has_oxygen_hazard": False,
                "passable": False,
                "risk_factors": [],
                "danger_scores": [],
            }

        danger_scores = []
        risk_factors = []
        has_fire = False
        has_oxygen_hazard = False
        passable = True
        max_danger_location = None
        max_danger_score = 0.0

        for room_id in path:
            sensor = sensor_system.get_sensor_reading(room_id)
            room = floor_plan.get_room(room_id)

            if not sensor:
                continue

            danger = sensor.calculate_danger_score()

            # Check specific hazards
            if sensor.fire_detected:
                has_fire = True
                risk_factors.append(f"Fire detected in {room_id}")

            if room and room.has_oxygen_cylinder:
                has_oxygen_hazard = True
                # Increase danger if there's heat near oxygen
                if sensor.temperature > 40 or sensor.fire_detected:
                    danger += 15  # Add explosion risk
                    risk_factors.append(f"Oxygen cylinder explosion risk in {room_id}")
                else:
                    risk_factors.append(f"Oxygen cylinder present in {room_id}")

            # Record the score only after the explosion adjustment so that
            # max_danger always agrees with max(danger_scores).
            danger_scores.append(danger)
            if danger > max_danger_score:
                max_danger_score = danger
                max_danger_location = room_id

            if sensor.smoke_level > 0.5:
                risk_factors.append(f"Heavy smoke in {room_id}")

            if sensor.temperature > 60:
                risk_factors.append(f"High temperature ({sensor.temperature:.1f}°C) in {room_id}")

            if sensor.oxygen_level < 19.5:
                risk_factors.append(f"Low oxygen ({sensor.oxygen_level:.1f}%) in {room_id}")

            # Toxic gas warnings
            if sensor.carbon_monoxide > 50:
                risk_factors.append(f"TOXIC CO ({sensor.carbon_monoxide:.0f} ppm) in {room_id}")
            if sensor.carbon_dioxide > 5000:
                risk_factors.append(f"High CO2 ({sensor.carbon_dioxide:.0f} ppm) in {room_id}")
            if sensor.hydrogen_cyanide > 20:
                risk_factors.append(f"TOXIC HCN ({sensor.hydrogen_cyanide:.1f} ppm) in {room_id}")

            # Flashover/backdraft warnings
            if sensor.flashover_risk > 0.7:
                risk_factors.append(f"CRITICAL: Flashover risk ({sensor.flashover_risk*100:.0f}%) in {room_id}")
            if sensor.backdraft_risk > 0.6:
                risk_factors.append(f"CRITICAL: Backdraft risk ({sensor.backdraft_risk*100:.0f}%) in {room_id}")

            # Crowd density warnings
            if sensor.occupancy_density > 0.8:
                risk_factors.append(f"High crowd density ({sensor.occupancy_density*100:.0f}%) in {room_id}")

            # Infrastructure failures
            if not sensor.exit_accessible:
                risk_factors.append(f"Exit BLOCKED in {room_id}")
            if not sensor.stairwell_clear:
                risk_factors.append(f"Stairwell BLOCKED in {room_id}")
            if not sensor.emergency_lighting:
                risk_factors.append(f"No emergency lighting in {room_id}")

            # Time pressure
            if sensor.time_since_fire_start > 300:
                risk_factors.append(f"Time pressure: {sensor.time_since_fire_start//60} min elapsed")

            # A single blocked room makes the whole path impassable.
            if not sensor.is_passable():
                passable = False
                risk_factors.append(f"Path blocked at {room_id}")

        total_danger = sum(danger_scores)
        avg_danger = total_danger / len(danger_scores) if danger_scores else 0

        return {
            "total_danger": total_danger,
            "max_danger": max_danger_score,
            "max_danger_location": max_danger_location,
            "avg_danger": avg_danger,
            "path_length": len(path),
            "has_fire": has_fire,
            "has_oxygen_hazard": has_oxygen_hazard,
            "passable": passable,
            "risk_factors": risk_factors,
            "danger_scores": danger_scores
        }

    @staticmethod
    def get_risk_level(avg_danger: float) -> str:
        """Get risk level description

        Thresholds are lower-inclusive: <20 LOW, <40 MODERATE, <60 HIGH,
        otherwise CRITICAL.
        """
        if avg_danger < 20:
            return "LOW"
        elif avg_danger < 40:
            return "MODERATE"
        elif avg_danger < 60:
            return "HIGH"
        else:
            return "CRITICAL"

    @staticmethod
    def recommend_path(paths: List[Tuple[List[str], Dict]]) -> Optional[Tuple[List[str], Dict]]:
        """
        Recommend the best path based on risk assessment

        Passable paths are ranked by a composite score (lower is better).
        If no path is passable, the one with the lowest ``total_danger`` is
        returned instead.

        Args:
            paths: List of (path, risk_assessment) tuples

        Returns:
            Best (path, risk_assessment) tuple or None
        """
        if not paths:
            return None

        # Filter to only passable paths
        passable_paths = [(p, r) for p, r in paths if r["passable"]]

        if not passable_paths:
            # No fully passable paths, return least dangerous
            return min(paths, key=lambda x: x[1]["total_danger"])

        # Score each path (lower is better)
        def score_path(path_info):
            _, risk = path_info
            score = 0

            # Heavily penalize fire
            if risk["has_fire"]:
                score += 100

            # Add oxygen hazard risk (but less than fire)
            if risk["has_oxygen_hazard"]:
                score += 30

            # Add danger scores
            score += risk["total_danger"]

            # Prefer shorter paths (slight preference)
            score += risk["path_length"] * 2

            # Penalize high max danger points
            score += risk["max_danger"] * 0.5

            return score

        # Return path with lowest score
        return min(passable_paths, key=score_path)


class PathFinder:
    """Risk-aware A* route planner over a floor plan and its sensor readings"""

    def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem):
        self.floor_plan = floor_plan
        self.sensor_system = sensor_system

    def find_all_evacuation_routes(self, start: str) -> List[Tuple[str, List[str], Dict]]:
        """
        Compute a route from ``start`` to every exit of the floor plan

        Exits for which no path is found are left out of the result.

        Returns: List of (exit_id, path, risk_assessment) tuples
        """
        found = []

        for exit_id in self.floor_plan.get_all_exits():
            route = self.find_path(start, exit_id)
            if not route:
                continue
            assessment = RiskAssessment.calculate_path_risk(
                route, self.sensor_system, self.floor_plan
            )
            found.append((exit_id, route, assessment))

        return found

    def find_path(self, start: str, goal: str) -> Optional[List[str]]:
        """
        Run A* from start to goal, where each step costs the edge distance
        plus the risk cost of the room being entered

        Returns: List of room IDs representing the path, or None if no path exists
        """
        if not (start in self.floor_plan.rooms and goal in self.floor_plan.rooms):
            return None

        # Min-heap of PathNode objects; PathNode orders itself by f_cost
        frontier = []
        expanded = set()
        heapq.heappush(frontier, PathNode(start, 0, self._heuristic(start, goal)))

        # Cheapest known cost-from-start per room
        cheapest = {start: 0}

        while frontier:
            node = heapq.heappop(frontier)
            here = node.room_id

            if here == goal:
                return self._reconstruct_path(node)

            # A room may sit in the heap several times; expand it only once
            if here in expanded:
                continue
            expanded.add(here)

            for neighbor_id, base_distance in self.floor_plan.get_neighbors(here):
                if neighbor_id in expanded:
                    continue

                step_risk = self._calculate_risk_cost(neighbor_id)
                candidate_g = node.g_cost + base_distance + step_risk

                # Ignore routes that do not improve on the best one so far
                if neighbor_id in cheapest and not candidate_g < cheapest[neighbor_id]:
                    continue

                cheapest[neighbor_id] = candidate_g
                estimate = self._heuristic(neighbor_id, goal)
                heapq.heappush(
                    frontier, PathNode(neighbor_id, candidate_g, estimate, node)
                )

        # Frontier exhausted without reaching the goal
        return None

    def _heuristic(self, room_id1: str, room_id2: str) -> float:
        """
        A* estimate: Manhattan distance between the two rooms' positions,
        or 0 when either room cannot be looked up
        """
        origin = self.floor_plan.get_room(room_id1)
        target = self.floor_plan.get_room(room_id2)

        if origin and target:
            x1, y1 = origin.position
            x2, y2 = target.position
            return abs(x2 - x1) + abs(y2 - y1)

        return 0

    def _calculate_risk_cost(self, room_id: str) -> float:
        """
        Extra traversal cost for entering a room, derived from its sensor
        reading. Rooms without a reading cost nothing extra.
        Higher danger = higher cost
        """
        reading = self.sensor_system.get_sensor_reading(room_id)
        if not reading:
            return 0.0

        # Base cost: one tenth of the sensor's danger score
        cost = reading.calculate_danger_score() / 10.0

        # Active fire
        if reading.fire_detected:
            cost += 20.0

        # Oxygen cylinder exposed to heat or fire
        room = self.floor_plan.get_room(room_id)
        if room and room.has_oxygen_cylinder and (
            reading.temperature > 40 or reading.fire_detected
        ):
            cost += 10.0

        # Toxic gases
        if reading.carbon_monoxide > 50:
            cost += 15.0
        if reading.carbon_dioxide > 5000:
            cost += 10.0

        # Flashover risk, two tiers
        flashover = reading.flashover_risk
        if flashover > 0.7:
            cost += 25.0
        elif flashover > 0.5:
            cost += 15.0

        # Blocked exit / stairwell
        if not reading.exit_accessible:
            cost += 30.0
        if not reading.stairwell_clear:
            cost += 20.0

        # Crowding slows movement; penalty scales with density
        density = reading.occupancy_density
        if density > 0.8:
            cost += density * 20.0

        # No emergency lighting
        if not reading.emergency_lighting:
            cost += 5.0

        # Impassable rooms are very expensive but still finite
        if not reading.is_passable():
            cost += 50.0

        return cost

    def _reconstruct_path(self, node: PathNode) -> List[str]:
        """Follow parent links from the goal node and return rooms start-first"""
        rooms = []
        cursor = node
        while cursor:
            rooms.append(cursor.room_id)
            cursor = cursor.parent
        rooms.reverse()
        return rooms