# fire_evac / llm_route_decider.py
# ratyim's picture
# Upload 6 files
# faa740f verified
"""
Realtime LLM-based route decision script for mock sensor data streams.
Designed to run outside the main project. It reads sensor payloads (JSON per line),
formats a prompt for a Llama-family model (e.g., via Ollama), and logs the AI's
route recommendation plus warnings/actions for each update.
"""
import argparse
import json
import sys
import textwrap
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
import requests
# Prompt sent to the model for each event. `{data}` is the only str.format
# placeholder (process_event fills it with the pretty-printed event JSON); the
# doubled braces around the response schema are format escapes that render as
# single literal braces in the final prompt.
PROMPT_TEMPLATE = """You are an emergency evacuation strategist using live sensor and route telemetry.
Evaluate the candidate routes and choose the safest *passable* option.
Decision rules:
1. Reject any route marked passable=false or containing simultaneous fire and oxygen-cylinder hazards.
2. Avoid rooms with biohazard alerts unless respirators are available; penalize heavily if no PPE.
3. Prefer routes with lower average danger and lower max danger; break ties by shorter path_length.
4. Flag any oxygen cylinders near heat (>40°C) or fire for operator intervention.
5. Provide backup route only if it is passable and within 15 danger points of the best route.
Data:
{data}
Respond in JSON with this schema:
{{
"recommended_route": "<route_id>",
"confidence": "high|medium|low",
"warnings": ["..."],
"actions": ["..."],
"backup_route": "<route_id or null>"
}}
If no safe route exists, set recommended_route=null and explain why in warnings.
"""
def load_event_stream(path: Path):
    """Yield one parsed JSON object per non-blank line of a JSONL file.

    The file is decoded with "utf-8-sig", which reads plain UTF-8 and also
    drops a leading byte-order mark, so the first line of a BOM-prefixed file
    still parses. Whitespace-only lines are skipped.

    Args:
        path: Location of the JSONL file to read.

    Yields:
        The dict decoded from each non-blank line, in file order.

    Raises:
        ValueError: If a line is not valid JSON, or decodes to something other
            than a JSON object. The message names the 1-based line number.
    """
    with path.open("r", encoding="utf-8-sig") as stream:
        for line_num, raw_line in enumerate(stream, start=1):
            text = raw_line.strip()
            if not text:
                continue
            try:
                event = json.loads(text)
            except json.JSONDecodeError as exc:
                raise ValueError(f"Invalid JSON on line {line_num}: {exc}") from exc
            # Downstream code calls event.get(...); reject lists/scalars here,
            # where the offending line number is still known.
            if not isinstance(event, dict):
                raise ValueError(
                    f"Invalid event on line {line_num}: expected a JSON object, "
                    f"got {type(event).__name__}"
                )
            yield event
def format_payload(event: Dict[str, Any]) -> str:
    """Serialize *event* as indented JSON text for embedding in the prompt.

    Uses two-space indentation and writes non-ASCII characters unescaped.
    """
    dump_options = {"indent": 2, "ensure_ascii": False}
    return json.dumps(event, **dump_options)
def call_llm(model: str, prompt: str, server_url: str, timeout: float = 120) -> str:
    """Send *prompt* to a generation endpoint and return the generated text.

    The request body is the non-streaming shape used by Ollama's generate
    endpoint: {"model": ..., "prompt": ..., "stream": false}.

    Args:
        model: Model name to request from the server.
        prompt: Full prompt text.
        server_url: Generation endpoint URL. This function has no default for
            it; the command-line parser supplies
            http://localhost:11434/api/generate when --server-url is omitted.
        timeout: Seconds passed to requests.post as its timeout (default 120,
            the value that was previously hard-coded).

    Returns:
        The "response" field of the JSON reply when present, otherwise a
        non-empty "text" field, otherwise the whole reply re-serialized as
        JSON so the caller still has something to log.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
    }
    response = requests.post(server_url, json=payload, timeout=timeout)
    response.raise_for_status()
    data = response.json()
    # Only a JSON object can carry the expected keys; a list or string reply
    # would break .get() or substring-match "response".
    if isinstance(data, dict):
        # Ollama returns {"response": "...", "done": true, ...}
        if "response" in data:
            return data["response"]
        # Fallback for other APIs
        text = data.get("text")
        if text:
            return text
    return json.dumps(data)
def parse_llm_output(text_output: str) -> Dict[str, Any]:
    """Decode the model's reply into a decision dict, never raising on bad text.

    Models often wrap the JSON in Markdown code fences or add a sentence before
    or after it. The stripped reply is tried first; if that fails, the span
    from the first "{" to the last "}" is tried as well.

    Args:
        text_output: Raw text returned by the model.

    Returns:
        The decoded JSON object when one is found. Otherwise an error record
        with recommended_route=None, confidence="low", empty actions, no
        backup route, and warnings holding the reason plus the stripped reply.
        Valid JSON that is not an object (e.g. a list or null) also yields the
        error record, so callers can always use dict methods on the result.
    """
    cleaned = text_output.strip()

    candidates = [cleaned]
    first_brace = cleaned.find("{")
    last_brace = cleaned.rfind("}")
    if 0 <= first_brace < last_brace:
        embedded = cleaned[first_brace:last_brace + 1]
        if embedded != cleaned:
            candidates.append(embedded)

    reason = "LLM returned non-JSON output"
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
        # Parsed, but not usable as a decision record.
        reason = "LLM returned JSON that is not an object"

    return {
        "recommended_route": None,
        "confidence": "low",
        "warnings": [reason, cleaned],
        "actions": [],
        "backup_route": None,
    }
def process_event(event: Dict[str, Any], args) -> Dict[str, Any]:
    """Run one sensor event through the LLM and package the outcome.

    Args:
        event: Decoded sensor event; its "timestamp_sec" value (if any) is
            copied into the result.
        args: Parsed command-line namespace; reads .model, .server_url and
            .debug.

    Returns:
        A dict with keys "timestamp", "decision", "raw_prompt" and
        "raw_output". The two raw_* entries are None unless args.debug is set.
    """
    prompt = PROMPT_TEMPLATE.format(data=format_payload(event))
    raw_output = call_llm(args.model, prompt, args.server_url)
    decision = parse_llm_output(raw_output)

    result = {
        "timestamp": event.get("timestamp_sec"),
        "decision": decision,
        "raw_prompt": None,
        "raw_output": None,
    }
    # Raw text is only carried along when troubleshooting was requested.
    if args.debug:
        result["raw_prompt"] = prompt
        result["raw_output"] = raw_output
    return result
def _print_bullets(label: str, items: Any) -> None:
    """Print *items* under *label* as wrapped bullet lines; no-op when falsy."""
    if not items:
        return
    # The model may send a bare string (or other scalar) where a list was
    # requested; treat it as one entry instead of iterating its characters.
    if not isinstance(items, (list, tuple)):
        items = [items]
    print(f" {label}:")
    for item in items:
        # str() keeps textwrap.fill from raising on dicts, numbers, etc.
        wrapped = textwrap.fill(str(item), width=78, subsequent_indent=" ")
        print(f" - {wrapped}")


def log_decision(result: Dict[str, Any]):
    """Print a concise summary of one decision to stdout.

    Output is a header line with the timestamp, recommended route and
    confidence, then optional "warnings" and "actions" bullet sections, an
    optional backup route line, and a trailing blank line.

    Args:
        result: Record with a "decision" dict and a "timestamp" entry (as
            built by process_event). A timestamp of None prints as "[t=?]".
            Warning/action entries of any type are printed via str(); a
            non-list value is printed as a single bullet.

    Raises:
        KeyError: If "decision" or "timestamp" is missing from *result*.
    """
    decision = result["decision"]
    ts = result["timestamp"]
    header = f"[t={ts}s]" if ts is not None else "[t=?]"
    print(f"{header} recommended_route={decision.get('recommended_route')} "
          f"confidence={decision.get('confidence')}")
    _print_bullets("warnings", decision.get("warnings"))
    _print_bullets("actions", decision.get("actions"))
    backup = decision.get("backup_route")
    if backup:
        print(f" backup_route: {backup}")
    print()
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the route decider.

    Options: --input (required), --model, --server-url, --delay (float) and
    the --debug switch.
    """
    cli = argparse.ArgumentParser(
        description="Stream sensor events to a Llama model for route decisions.",
    )
    # (flag, add_argument keyword options), registered in display order.
    option_table = (
        ("--input", {
            "required": True,
            "help": "Path to JSONL file containing mock sensor events.",
        }),
        ("--model", {
            "default": "llama3",
            "help": "Model name served by Ollama (default: llama3).",
        }),
        ("--server-url", {
            "default": "http://localhost:11434/api/generate",
            "help": "Generation endpoint URL.",
        }),
        ("--delay", {
            "type": float,
            "default": 0.0,
            "help": "Seconds to wait between events (simulate realtime).",
        }),
        ("--debug", {
            "action": "store_true",
            "help": "Print raw prompt/output for troubleshooting.",
        }),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli
def main(argv: Optional[List[str]] = None):
    """Command-line entry point: stream events from a file through the LLM.

    Args:
        argv: Argument list to parse; None lets argparse read sys.argv.

    Exits via parser.error when the input path does not exist, and with
    status 1 after printing "ERROR: ..." to stderr on any other exception.
    Ctrl-C stops the loop with a short notice on stderr.
    """
    parser = build_arg_parser()
    args = parser.parse_args(argv)

    source = Path(args.input)
    if not source.exists():
        parser.error(f"Input file not found: {source}")

    try:
        for event in load_event_stream(source):
            outcome = process_event(event, args)
            log_decision(outcome)

            if args.debug:
                debug_dump = (
                    "----- RAW PROMPT -----",
                    outcome["raw_prompt"],
                    "----- RAW OUTPUT -----",
                    outcome["raw_output"],
                    "----------------------\n",
                )
                for chunk in debug_dump:
                    print(chunk)

            # Optional pacing to imitate a live sensor feed.
            if args.delay > 0:
                time.sleep(args.delay)
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()