"""
Realtime LLM-based route decision script for mock sensor data streams.

Designed to run outside the main project. It reads sensor payloads (JSON per line),
formats a prompt for a Llama-family model (e.g., via Ollama), and logs the AI's
route recommendation plus warnings/actions for each update.
"""

import argparse
import json
import sys
import textwrap
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
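
# Example invocation (hypothetical filenames; assumes an Ollama server is running
# locally and the target model has already been pulled):
#
#   python route_llm_decider.py --input mock_events.jsonl --model llama3 --delay 0.5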
PROMPT_TEMPLATE = """You are an emergency evacuation strategist using live sensor and route telemetry.
|
|
|
Evaluate the candidate routes and choose the safest *passable* option.
|
|
|
|
|
|
Decision rules:
|
|
|
1. Reject any route marked passable=false or containing simultaneous fire and oxygen-cylinder hazards.
|
|
|
2. Avoid rooms with biohazard alerts unless respirators are available; penalize heavily if no PPE.
|
|
|
3. Prefer routes with lower average danger and lower max danger; break ties by shorter path_length.
|
|
|
4. Flag any oxygen cylinders near heat (>40°C) or fire for operator intervention.
|
|
|
5. Provide backup route only if it is passable and within 15 danger points of the best route.
|
|
|
|
|
|
Data:
|
|
|
{data}
|
|
|
|
|
|
Respond in JSON with this schema:
|
|
|
{{
|
|
|
"recommended_route": "<route_id>",
|
|
|
"confidence": "high|medium|low",
|
|
|
"warnings": ["..."],
|
|
|
"actions": ["..."],
|
|
|
"backup_route": "<route_id or null>"
|
|
|
}}
|
|
|
|
|
|
If no safe route exists, set recommended_route=null and explain why in warnings.
|
|
|
"""


def load_event_stream(path: Path):
    """Yield parsed JSON objects from a file (one JSON object per line)."""
    with path.open("r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"Invalid JSON on line {line_num}: {exc}") from exc


def format_payload(event: Dict[str, Any]) -> str:
    """Pretty-print the incoming event for the LLM prompt."""
    return json.dumps(event, indent=2, ensure_ascii=False)


def call_llm(model: str, prompt: str, server_url: str) -> str:
    """
    Call a Llama-family model via Ollama (or any compatible endpoint).

    server_url defaults to http://localhost:11434/api/generate.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
    }
    response = requests.post(server_url, json=payload, timeout=120)
    response.raise_for_status()
    data = response.json()

    if "response" in data:
        return data["response"]

    return data.get("text") or json.dumps(data)


def parse_llm_output(text_output: str) -> Dict[str, Any]:
    """Attempt to parse the LLM JSON; fall back to error record."""
    text_output = text_output.strip()
    try:
        return json.loads(text_output)
    except json.JSONDecodeError:
        return {
            "recommended_route": None,
            "confidence": "low",
            "warnings": ["LLM returned non-JSON output", text_output],
            "actions": [],
            "backup_route": None,
        }
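

def strip_code_fences(text: str) -> str:
    """Optional pre-parse cleanup (a sketch, not wired into the flow above):
    models often wrap their JSON in Markdown code fences, which would trip
    json.loads; this removes a leading ``` (or ```json) line and trailing fence."""
    text = text.strip()
    if text.startswith("```"):
        text = text.split("\n", 1)[-1]   # drop the opening fence line
        text = text.rsplit("```", 1)[0]  # drop the closing fence
    return text.strip()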


def process_event(event: Dict[str, Any], args) -> Dict[str, Any]:
    """Send a single event to the LLM and return the structured decision."""
    payload_str = format_payload(event)
    prompt = PROMPT_TEMPLATE.format(data=payload_str)
    raw_output = call_llm(args.model, prompt, args.server_url)
    decision = parse_llm_output(raw_output)
    return {
        "timestamp": event.get("timestamp_sec"),
        "decision": decision,
        "raw_prompt": prompt if args.debug else None,
        "raw_output": raw_output if args.debug else None,
    }


def log_decision(result: Dict[str, Any]):
    """Print a concise summary to stdout."""
    decision = result["decision"]
    ts = result["timestamp"]
    header = f"[t={ts}s]" if ts is not None else "[t=?]"
    print(f"{header} recommended_route={decision.get('recommended_route')} "
          f"confidence={decision.get('confidence')}")
    if decision.get("warnings"):
        print("  warnings:")
        for warning in decision["warnings"]:
            wrapped = textwrap.fill(warning, width=78, subsequent_indent="      ")
            print(f"    - {wrapped}")
    if decision.get("actions"):
        print("  actions:")
        for action in decision["actions"]:
            wrapped = textwrap.fill(action, width=78, subsequent_indent="      ")
            print(f"    - {wrapped}")
    if decision.get("backup_route"):
        print(f"  backup_route: {decision['backup_route']}")
    print()
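
# Sample console output (illustrative values only, matching the hypothetical
# event sketched under PROMPT_TEMPLATE):
#   [t=12s] recommended_route=north_stair confidence=medium
#     warnings:
#       - Oxygen cylinder stored beside a 47.5°C heat source in lab_2
#     backup_route: east_corridor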


def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Stream sensor events to a Llama model for route decisions.")
    parser.add_argument("--input", required=True,
                        help="Path to JSONL file containing mock sensor events.")
    parser.add_argument("--model", default="llama3",
                        help="Model name served by Ollama (default: llama3).")
    parser.add_argument("--server-url", default="http://localhost:11434/api/generate",
                        help="Generation endpoint URL.")
    parser.add_argument("--delay", type=float, default=0.0,
                        help="Seconds to wait between events (simulate realtime).")
    parser.add_argument("--debug", action="store_true",
                        help="Print raw prompt/output for troubleshooting.")
    return parser


def main(argv: Optional[List[str]] = None):
    parser = build_arg_parser()
    args = parser.parse_args(argv)

    input_path = Path(args.input)
    if not input_path.exists():
        parser.error(f"Input file not found: {input_path}")

    try:
        for event in load_event_stream(input_path):
            result = process_event(event, args)
            log_decision(result)

            if args.debug:
                print("----- RAW PROMPT -----")
                print(result["raw_prompt"])
                print("----- RAW OUTPUT -----")
                print(result["raw_output"])
                print("----------------------\n")

            if args.delay > 0:
                time.sleep(args.delay)
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()