Spaces:
Running
Running
| # --- Care Count (Volunteers → Visits → Items) --- | |
| # Modern UI/UX with industry-standard design patterns | |
| # Enhanced version with improved user experience | |
| from __future__ import annotations | |
| import os, io, time, base64, re, uuid, json | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| import logging | |
| import pytz | |
| import requests | |
| import pandas as pd | |
| import streamlit as st | |
| from PIL import Image, ImageOps, ImageEnhance | |
| from supabase import create_client, Client | |
| # Import our modern UI components | |
| from ui_improvements import ModernUIComponents, apply_modern_ui, create_modern_layout | |
| # ------------------------ Enhanced Logging ------------------------ | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('care_count.log'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # ------------------------ App config ------------------------ | |
| os.environ.setdefault("HOME", "/tmp") | |
| os.environ["STREAMLIT_BROWSER_GATHER_USAGE_STATS"] = "false" | |
| TZ = os.getenv("APP_TZ", "America/Toronto") | |
| CUTOFF_HOUR = int(os.getenv("CUTOFF_HOUR", "-1")) # -1 disables daily cutoff | |
| INACTIVITY_MIN = int(os.getenv("INACTIVITY_MIN", "30")) | |
| def local_now() -> datetime: | |
| return datetime.now(pytz.timezone(TZ)) | |
| # Enhanced page config with modern settings | |
| st.set_page_config( | |
| page_title="Care Count - Volunteer Management", | |
| page_icon="💜", | |
| layout="wide", | |
| initial_sidebar_state="expanded", | |
| menu_items={ | |
| 'Get Help': 'https://github.com/your-repo/care-count', | |
| 'Report a bug': "https://github.com/your-repo/care-count/issues", | |
| 'About': "# Care Count\nVolunteer management system for community impact tracking" | |
| } | |
| ) | |
| # Apply modern UI styling | |
| apply_modern_ui() | |
| # ------------------------ Enhanced Secrets Management ------------------------ | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except Exception: | |
| pass | |
| try: | |
| _SECRETS = getattr(st, "secrets", {}) | |
| except Exception: | |
| _SECRETS = {} | |
| def _is_useful(val: Optional[str]) -> bool: | |
| return val is not None and str(val).strip() != "" | |
| def get_secret(name: str, default: Optional[str] = None) -> Optional[str]: | |
| """Enhanced secret management with logging""" | |
| v = os.getenv(name) | |
| if _is_useful(v): | |
| logger.info(f"Secret {name} loaded from environment") | |
| return v | |
| try: | |
| if hasattr(_SECRETS, "get"): | |
| v = _SECRETS.get(name, default) | |
| if _is_useful(v): | |
| logger.info(f"Secret {name} loaded from Streamlit secrets") | |
| return v | |
| except Exception as e: | |
| logger.warning(f"Failed to load secret {name} from Streamlit secrets: {e}") | |
| return default | |
| def require_secret(name: str) -> str: | |
| """Enhanced secret requirement with better error handling""" | |
| v = get_secret(name, None) | |
| if not _is_useful(v): | |
| st.error( | |
| f"🔐 **Configuration Error**\n\n" | |
| f"Missing required configuration: `{name}`\n\n" | |
| f"Please ensure this is set in your environment variables or `.streamlit/secrets.toml` file.\n\n" | |
| f"For help, check the [documentation](https://github.com/your-repo/care-count#configuration)." | |
| ) | |
| st.stop() | |
| return str(v) | |
| # Required Supabase credentials | |
| SUPABASE_URL = require_secret("SUPABASE_URL") | |
| SUPABASE_KEY = require_secret("SUPABASE_KEY") | |
| # Initialize Supabase client with error handling | |
| try: | |
| sb: Client = create_client(SUPABASE_URL, SUPABASE_KEY) | |
| logger.info("Supabase client initialized successfully") | |
| except Exception as e: | |
| st.error(f"Failed to connect to Supabase: {e}") | |
| logger.error(f"Supabase connection failed: {e}") | |
| st.stop() | |
| # Optional provider/config | |
| PROVIDER = (get_secret("PROVIDER", "nebius") or "nebius").strip().lower() | |
| GEMMA_MODEL = (get_secret("GEMMA_MODEL", "google/gemma-3-27b-it") or "google/gemma-3-27b-it").strip() | |
| NEBIUS_API_KEY = get_secret("NEBIUS_API_KEY") | |
| NEBIUS_BASE_URL = get_secret("NEBIUS_BASE_URL", "https://api.nebius.ai/v1") | |
| FEATH_API_KEY = get_secret("FEATHERLESS_API_KEY") | |
| FEATH_BASE_URL = get_secret("FEATHERLESS_BASE_URL", "https://api.featherless.ai/v1") | |
| # ------------------------ Enhanced Event Logging ------------------------ | |
| def log_event(action: str, actor: Optional[str], details: dict, level: str = "info"): | |
| """Enhanced event logging with multiple levels""" | |
| log_data = { | |
| "happened_at": datetime.utcnow().isoformat(), | |
| "action": action, | |
| "actor_email": actor, | |
| "details": json.dumps(details) if isinstance(details, dict) else str(details) | |
| } | |
| # Log to file | |
| if level == "error": | |
| logger.error(f"Event: {action} by {actor} - {details}") | |
| elif level == "warning": | |
| logger.warning(f"Event: {action} by {actor} - {details}") | |
| else: | |
| logger.info(f"Event: {action} by {actor} - {details}") | |
| # Log to database (without level field to avoid schema issues) | |
| try: | |
| sb.table("events").insert(log_data).execute() | |
| except Exception as e: | |
| logger.warning(f"Failed to log event to database (continuing): {e}") | |
| # ------------------------ Modern Authentication UI ------------------------ | |
| def auth_block() -> tuple[bool, Optional[str]]: | |
| """Enhanced authentication with modern UI""" | |
| if "auth_email" not in st.session_state: | |
| st.session_state["auth_email"] = None | |
| if "user_email" in st.session_state: | |
| return True, st.session_state["user_email"] | |
| # Modern hero section for login | |
| st.markdown(ModernUIComponents.create_hero_section( | |
| "Care Count", | |
| "Snap items, track visits, and make an impact." | |
| ), unsafe_allow_html=True) | |
| # Modern login form | |
| with st.container(): | |
| st.markdown('<div class="modern-card">', unsafe_allow_html=True) | |
| st.subheader("Sign In") | |
| st.markdown("Enter your email to receive a secure login code.") | |
| with st.form("otp_request", clear_on_submit=False): | |
| email = st.text_input( | |
| "Email Address", | |
| value=st.session_state.get("auth_email") or "", | |
| placeholder="[email protected]", | |
| help="We'll send you a secure 6-digit code" | |
| ) | |
| col1, col2, col3 = st.columns([1, 2, 1]) | |
| with col2: | |
| send = st.form_submit_button("📧 Send Login Code", use_container_width=True) | |
| if send: | |
| if not email or "@" not in email: | |
| st.error("Please enter a valid email address.") | |
| else: | |
| try: | |
| with st.spinner("Sending login code..."): | |
| sb.auth.sign_in_with_otp({"email": email, "shouldCreateUser": True}) | |
| st.session_state["auth_email"] = email | |
| st.success("✅ Login code sent! Check your email.") | |
| log_event("otp_requested", email, {"method": "email"}) | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "429" in error_msg or "Too Many Requests" in error_msg: | |
| st.warning("⏳ Please wait a moment before requesting another code. For security, there's a brief delay between requests.") | |
| elif "Invalid email" in error_msg: | |
| st.error("❌ Please enter a valid email address.") | |
| else: | |
| st.error(f"❌ Could not send code: {error_msg}") | |
| log_event("otp_failed", email, {"error": error_msg}, "error") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| # OTP verification form | |
| if st.session_state.get("auth_email"): | |
| with st.container(): | |
| st.markdown('<div class="modern-card">', unsafe_allow_html=True) | |
| st.subheader("🔢 Verify Code") | |
| st.markdown(f"Enter the 6-digit code sent to **{st.session_state['auth_email']}**") | |
| with st.form("otp_verify", clear_on_submit=True): | |
| code = st.text_input( | |
| "Verification Code", | |
| max_chars=6, | |
| placeholder="123456", | |
| help="Enter the 6-digit code from your email" | |
| ) | |
| col1, col2, col3 = st.columns([1, 2, 1]) | |
| with col2: | |
| ok = st.form_submit_button("✅ Verify & Start Shift", use_container_width=True) | |
| if ok: | |
| if len(code) != 6 or not code.isdigit(): | |
| st.error("Please enter a valid 6-digit code.") | |
| else: | |
| try: | |
| with st.spinner("Verifying code..."): | |
| res = sb.auth.verify_otp({"email": st.session_state["auth_email"], "token": code, "type": "email"}) | |
| if res and res.user: | |
| # Ensure subsequent PostgREST requests carry the user's JWT for RLS | |
| try: | |
| token = getattr(getattr(res, "session", None), "access_token", None) | |
| if token: | |
| sb.postgrest.auth(token) | |
| except Exception: | |
| pass | |
| email = st.session_state["auth_email"] | |
| # Enhanced volunteer upsert | |
| volunteer_data = { | |
| "email": email, | |
| "last_login_at": datetime.utcnow().isoformat(), | |
| "shift_started_at": datetime.utcnow().isoformat(), | |
| "shift_ended_at": None, | |
| "login_count": 1 # Track login frequency | |
| } | |
| sb.table("volunteers").upsert(volunteer_data, on_conflict="email").execute() | |
| st.session_state["user_email"] = email | |
| st.session_state["shift_started"] = True | |
| st.session_state["last_activity_at"] = local_now() | |
| log_event("login_success", email, {"method": "otp"}) | |
| st.success("🎉 Welcome back! Your shift has started.") | |
| st.balloons() | |
| return True, email | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "expired" in error_msg.lower() or "invalid" in error_msg.lower(): | |
| st.error("❌ This code has expired or is invalid. Please request a new code.") | |
| # Clear the auth_email to allow new code request | |
| if "auth_email" in st.session_state: | |
| del st.session_state["auth_email"] | |
| st.rerun() | |
| else: | |
| st.error(f"❌ Verification failed: {error_msg}") | |
| log_event("otp_verification_failed", st.session_state.get("auth_email"), {"error": error_msg}, "error") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| return False, None | |
| def end_shift(email: str, reason: str): | |
| """Enhanced shift ending with better logging""" | |
| try: | |
| end_time = datetime.utcnow().isoformat() | |
| sb.table("volunteers").update({"shift_ended_at": end_time}).eq("email", email).execute() | |
| log_event("shift_end", email, {"reason": reason, "end_time": end_time}) | |
| except Exception as e: | |
| logger.error(f"Failed to end shift for {email}: {e}") | |
| def guard_cutoff_and_idle(email: str): | |
| """Enhanced session management with better UX""" | |
| now = local_now() | |
| # Enforce daily cutoff only when configured | |
| if CUTOFF_HOUR >= 0: | |
| cutoff = now.replace(hour=CUTOFF_HOUR, minute=0, second=0, microsecond=0) | |
| if now >= cutoff: | |
| end_shift(email, "cutoff_8pm") | |
| # Inactivity guard remains unchanged | |
| last = st.session_state.get("last_activity_at") | |
| if last and (now - last).total_seconds() > INACTIVITY_MIN * 60: | |
| end_shift(email, "inactivity") | |
| st.session_state.clear() | |
| st.warning("⏰ You were logged out due to inactivity. Thank you for volunteering today!") | |
| st.stop() | |
| st.session_state["last_activity_at"] = now | |
| # ------------------------ Main App Flow ------------------------ | |
| def main(): | |
| """Main application flow with modern UI""" | |
| # Authentication | |
| signed_in, user_email = auth_block() | |
| if not signed_in: | |
| st.stop() | |
| guard_cutoff_and_idle(user_email) | |
| # Modern welcome section | |
| st.markdown(ModernUIComponents.create_hero_section( | |
| "Care Count Dashboard", | |
| "Manage visits, track items, and make a difference in your community.", | |
| user_email | |
| ), unsafe_allow_html=True) | |
| # Enhanced status cards | |
| vrow = fetch_volunteer_row(user_email) or {} | |
| shift_started_at = vrow.get("shift_started_at") | |
| lifetime_hours = vrow.get("total_hours", 0) | |
| mins_active = 0 | |
| try: | |
| if shift_started_at: | |
| t0 = datetime.fromisoformat(str(shift_started_at).replace("Z","+00:00")) | |
| mins_active = max(0, int((datetime.utcnow() - t0).total_seconds() // 60)) | |
| except Exception: | |
| pass | |
| status_data = { | |
| "shift_active": f"{mins_active} min", | |
| "items_today": items_today(user_email), | |
| "lifetime_hours": f"{round(float(lifetime_hours), 1)} hrs" if isinstance(lifetime_hours, (int, float)) else "0.0 hrs" | |
| } | |
| st.markdown(ModernUIComponents.create_status_cards(status_data), unsafe_allow_html=True) | |
| # Modern sign-out button | |
| col1, col2, col3 = st.columns([1, 1, 1]) | |
| with col2: | |
| if st.button("🔒 Sign Out", use_container_width=True, type="secondary"): | |
| end_shift(user_email, "manual") | |
| st.session_state.clear() | |
| st.success("✅ Signed out successfully. See you next time!") | |
| st.rerun() | |
| # Enhanced visit management section | |
| st.markdown(ModernUIComponents.create_modern_form_section( | |
| "🪪 Visit Management", | |
| "Start and manage student visits with unique tracking codes" | |
| ), unsafe_allow_html=True) | |
| active_visit = st.session_state.get("active_visit") | |
| col1, col2, col3 = st.columns([1, 1, 1]) | |
| with col1: | |
| if not active_visit and st.button("🚀 Start New Visit", use_container_width=True, type="primary"): | |
| try: | |
| with st.spinner("Creating visit..."): | |
| payload = { | |
| "visit_code": fallback_visit_code(), | |
| "started_at": datetime.utcnow().isoformat(), | |
| "ended_at": None, | |
| "created_by": user_email | |
| } | |
| v = sb.table("visits").insert(payload).execute().data[0] | |
| if not v.get("visit_code"): | |
| v["visit_code"] = payload["visit_code"] | |
| st.session_state["active_visit"] = v | |
| st.success(f"✅ Visit #{v['id']} started") | |
| st.info(f"**Visit Code:** `{v['visit_code']}`") | |
| log_event("visit_start", user_email, {"visit_id": v["id"], "visit_code": v["visit_code"]}) | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"❌ Could not start visit: {e}") | |
| log_event("visit_start_failed", user_email, {"error": str(e)}, "error") | |
| with col2: | |
| if active_visit and st.button("🏁 End Visit", use_container_width=True, type="secondary"): | |
| try: | |
| with st.spinner("Ending visit..."): | |
| sb.table("visits").update({"ended_at": datetime.utcnow().isoformat()}) \ | |
| .eq("id", active_visit["id"]).execute() | |
| st.success("✅ Visit completed successfully") | |
| log_event("visit_end", user_email, {"visit_id": active_visit["id"]}) | |
| st.session_state.pop("active_visit", None) | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"❌ Could not end visit: {e}") | |
| log_event("visit_end_failed", user_email, {"error": str(e)}, "error") | |
| with col3: | |
| if st.session_state.get("active_visit"): | |
| v = st.session_state["active_visit"] | |
| st.info(f"**Active Visit:** #{v['id']}\n**Code:** {v.get('visit_code','')}") | |
| else: | |
| st.info("No active visit") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| # Enhanced item identification section | |
| st.markdown(ModernUIComponents.create_modern_form_section( | |
| "📸 Item Identification", | |
| "Use AI-powered image recognition to identify items quickly and accurately" | |
| ), unsafe_allow_html=True) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.subheader("📷 Camera Capture") | |
| cam = st.camera_input("Take a photo of the item", help="Position the item clearly in the frame") | |
| with col2: | |
| st.subheader("📁 File Upload") | |
| up = st.file_uploader( | |
| "Upload an image", | |
| type=["png","jpg","jpeg"], | |
| help="Supported formats: PNG, JPG, JPEG" | |
| ) | |
| img_file = cam or up | |
| if img_file: | |
| try: | |
| img = Image.open(img_file).convert("RGB") | |
| st.image(img, use_container_width=True, caption="Captured Image") | |
| if st.button("🔍 Identify Item with AI", use_container_width=True, type="primary"): | |
| with st.spinner("Analyzing image with AI..."): | |
| t0 = time.time() | |
| try: | |
| pre = preprocess_for_label(img) | |
| raw = gemma_item_name(_to_png_bytes(pre)) | |
| processing_time = time.time() - t0 | |
| norm = normalize_item_name(raw) | |
| if raw: | |
| st.success(f"🤖 **AI Detection:** {raw}") | |
| st.info(f"✨ **Normalized:** {norm or '(unknown)'} · ⏱️ {processing_time:.2f}s") | |
| st.session_state["scanned_item_name"] = norm or raw or "" | |
| st.session_state["last_activity_at"] = local_now() | |
| log_event("item_identified", user_email, { | |
| "raw_name": raw, | |
| "normalized_name": norm, | |
| "processing_time": processing_time | |
| }) | |
| except Exception as e: | |
| st.error(f"❌ AI identification failed: {e}") | |
| log_event("ai_identification_failed", user_email, {"error": str(e)}, "error") | |
| except Exception as e: | |
| st.error(f"❌ Failed to process image: {e}") | |
| log_event("image_processing_failed", user_email, {"error": str(e)}, "error") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| # Enhanced item logging section | |
| st.markdown(ModernUIComponents.create_modern_form_section( | |
| "📬 Item Logging", | |
| "Log items to the current visit with detailed information" | |
| ), unsafe_allow_html=True) | |
| col1, col2 = st.columns([2, 1]) | |
| with col1: | |
| item_name = st.text_input( | |
| "Item Name", | |
| value=st.session_state.get("scanned_item_name",""), | |
| placeholder="Enter item name or use AI detection above", | |
| help="Required field - item name for tracking" | |
| ) | |
| with col2: | |
| quantity = st.number_input( | |
| "Quantity", | |
| min_value=1, | |
| max_value=9999, | |
| step=1, | |
| value=1, | |
| help="Number of items" | |
| ) | |
| col3, col4 = st.columns(2) | |
| with col3: | |
| category = st.text_input( | |
| "Category (optional)", | |
| placeholder="e.g., Food, Hygiene, Clothing", | |
| help="Item category for better organization" | |
| ) | |
| with col4: | |
| unit = st.text_input( | |
| "Unit (optional)", | |
| placeholder="e.g., 500 mL, 1 L, 250 g", | |
| help="Unit of measurement" | |
| ) | |
| barcode = st.text_input( | |
| "Barcode (optional)", | |
| placeholder="Scan or enter barcode", | |
| help="Product barcode for inventory tracking" | |
| ) | |
| save_disabled = not st.session_state.get("active_visit") | |
| if st.button("✅ Save Item to Visit", disabled=save_disabled, use_container_width=True, type="primary"): | |
| v = st.session_state.get("active_visit") | |
| if not v: | |
| st.warning("⚠️ Please start a visit first before logging items.") | |
| else: | |
| name_clean = clean_text(item_name, 120) | |
| if not name_clean: | |
| st.warning("⚠️ Item name is required.") | |
| else: | |
| with st.spinner("Saving item..."): | |
| ts_iso = datetime.utcnow().isoformat() | |
| ingest_id = deterministic_ingest_id(int(v["id"]), user_email, name_clean, int(quantity), ts_iso) | |
| try: | |
| ok, msg = try_rpc_ingest( | |
| email=user_email, v_id=int(v["id"]), name=name_clean, qty=int(quantity), | |
| category=clean_text(category, 80), unit=clean_text(unit, 40), | |
| barcode=clean_text(barcode, 64), ts_iso=ts_iso, ingest_id=ingest_id | |
| ) | |
| if ok: | |
| st.success("✅ Item logged successfully!") | |
| log_event("item_logged", user_email, { | |
| "visit_id": v["id"], | |
| "item_name": name_clean, | |
| "quantity": quantity | |
| }) | |
| else: | |
| st.warning(f"⚠️ {msg}. Trying fallback method...") | |
| fallback_direct_insert(user_email, int(v["id"]), name_clean, int(quantity), | |
| clean_text(category,80), clean_text(unit,40), | |
| clean_text(barcode,64), ts_iso, ingest_id) | |
| st.success("✅ Item logged successfully (fallback method)!") | |
| log_event("item_logged_fallback", user_email, { | |
| "visit_id": v["id"], | |
| "item_name": name_clean, | |
| "quantity": quantity | |
| }) | |
| except Exception as e: | |
| try: | |
| fallback_direct_insert(user_email, int(v["id"]), name_clean, int(quantity), | |
| clean_text(category,80), clean_text(unit,40), | |
| clean_text(barcode,64), ts_iso, ingest_id) | |
| st.success("✅ Item logged successfully (fallback method)!") | |
| log_event("item_logged_fallback", user_email, { | |
| "visit_id": v["id"], | |
| "item_name": name_clean, | |
| "quantity": quantity | |
| }) | |
| except Exception as e2: | |
| st.error(f"❌ Failed to log item: {e2}") | |
| log_event("item_log_failed", user_email, {"error": str(e2)}, "error") | |
| st.session_state["last_activity_at"] = local_now() | |
| st.session_state["scanned_item_name"] = name_clean | |
| st.rerun() | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| # Enhanced visit items view | |
| if st.session_state.get("active_visit"): | |
| st.markdown(ModernUIComponents.create_modern_form_section( | |
| "🧾 Current Visit Items", | |
| "Review and manage items in the current visit" | |
| ), unsafe_allow_html=True) | |
| rows = load_items_for_visit(int(st.session_state["active_visit"]["id"])) | |
| if rows: | |
| df = pd.DataFrame(rows) | |
| # Enhanced dataframe display | |
| st.dataframe( | |
| df, | |
| use_container_width=True, | |
| hide_index=True, | |
| column_config={ | |
| "timestamp": st.column_config.DatetimeColumn("Time", format="MM/DD/YY HH:mm"), | |
| "item_name": st.column_config.TextColumn("Item Name", width="medium"), | |
| "qty": st.column_config.NumberColumn("Quantity", width="small"), | |
| "category": st.column_config.TextColumn("Category", width="small"), | |
| "unit": st.column_config.TextColumn("Unit", width="small") | |
| } | |
| ) | |
| # Enhanced delete functionality | |
| with st.expander("🗑️ Delete Item (if mis-logged)"): | |
| if rows: | |
| item_options = {f"{r['item_name']} (Qty: {r['qty']})": r["id"] for r in rows if "id" in r} | |
| selected_item = st.selectbox("Select item to delete", list(item_options.keys())) | |
| if st.button("🗑️ Delete Selected Item", type="secondary"): | |
| item_id = item_options[selected_item] | |
| try: | |
| # Try both tables safely | |
| try: | |
| delete_item("visit_items_p", int(item_id)) | |
| except Exception: | |
| delete_item("visit_items", int(item_id)) | |
| st.success("✅ Item deleted successfully") | |
| log_event("item_deleted", user_email, {"item_id": item_id}) | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"❌ Failed to delete item: {e}") | |
| log_event("item_delete_failed", user_email, {"error": str(e)}, "error") | |
| else: | |
| st.info("No items to delete") | |
| else: | |
| st.info("📝 No items logged for this visit yet.") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| # Enhanced analytics section | |
| st.markdown(ModernUIComponents.create_modern_form_section( | |
| "📈 Today's Analytics", | |
| "Real-time insights into today's volunteer activity" | |
| ), unsafe_allow_html=True) | |
| try: | |
| daily = sb.table("v_daily_activity").select("*").execute().data | |
| today_str = local_now().strftime("%Y-%m-%d") | |
| today = next((r for r in daily if str(r.get("day",""))[:10]==today_str), None) | |
| if today: | |
| visits = int(today.get("visits", 0)) | |
| items = int(today.get("items", 0)) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.metric("Total Visits Today", visits, delta=None) | |
| with col2: | |
| st.metric("Total Items Processed", items, delta=None) | |
| # Progress indicator | |
| if visits > 0: | |
| st.markdown(ModernUIComponents.create_progress_indicator( | |
| items, visits * 10, "Items per Visit Target" | |
| ), unsafe_allow_html=True) | |
| else: | |
| st.info("📊 Analytics data will appear here as activity increases.") | |
| except Exception as e: | |
| st.info("📊 Analytics view will be available once the database view is set up.") | |
| logger.warning(f"Analytics view not available: {e}") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| # Enhanced footer with helpful information | |
| st.markdown(""" | |
| <div class="modern-card" style="margin-top: var(--space-8); text-align: center;"> | |
| <h4>💡 Quick Tips</h4> | |
| <p style="color: var(--gray-400); margin: 0;"> | |
| When you're done, please <strong>End Visit</strong> and <strong>Sign out</strong>.<br> | |
| We'll auto-sign you out after inactivity or at 8pm local time. 💜 | |
| </p> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # ------------------------ Helper Functions (Enhanced) ------------------------ | |
| def fetch_volunteer_row(email: str) -> dict | None: | |
| """Enhanced volunteer data fetching with error handling""" | |
| try: | |
| return sb.table("volunteers").select("*").eq("email", email).single().execute().data | |
| except Exception as e: | |
| logger.warning(f"Failed to fetch volunteer data for {email}: {e}") | |
| return None | |
| def items_today(email: str) -> int: | |
| """Enhanced item counting with better error handling""" | |
| try: | |
| day = local_now().strftime("%Y-%m-%d") | |
| # Try partitioned table first | |
| data = sb.table("visit_items_p").select("id,timestamp,volunteer") \ | |
| .gte("timestamp", f"{day} 00:00:00").lte("timestamp", f"{day} 23:59:59") \ | |
| .eq("volunteer", email).execute().data | |
| return len(data or []) | |
| except Exception: | |
| try: | |
| data = sb.table("visit_items").select("id,timestamp,volunteer") \ | |
| .gte("timestamp", f"{day} 00:00:00").lte("timestamp", f"{day} 23:59:59") \ | |
| .eq("volunteer", email).execute().data | |
| return len(data or []) | |
| except Exception as e: | |
| logger.warning(f"Failed to count items for {email}: {e}") | |
| return 0 | |
| def fallback_visit_code() -> str: | |
| """Enhanced visit code generation with better uniqueness""" | |
| try: | |
| day = local_now().strftime("%Y-%m-%d") | |
| todays = sb.table("visits").select("id,started_at").gte("started_at", f"{day} 00:00:00") \ | |
| .lte("started_at", f"{day} 23:59:59").execute().data or [] | |
| seq = len(todays) + 1 | |
| except Exception: | |
| seq = int(time.time()) % 1000 | |
| return f"V-{seq}-{local_now().strftime('%Y%m%d')}-{str(uuid.uuid4())[:6]}" | |
| def _to_png_bytes(img: Image.Image) -> bytes: | |
| """Convert image to PNG bytes""" | |
| b = io.BytesIO() | |
| img.save(b, format="PNG") | |
| return b.getvalue() | |
| def preprocess_for_label(img: Image.Image) -> Image.Image: | |
| """Enhanced image preprocessing for better AI recognition""" | |
| img = img.convert("RGB") | |
| w, h = img.size | |
| scale = 1024 / max(w, h) if max(w, h) > 1024 else 1.0 | |
| if scale < 1.0: | |
| img = img.resize((int(w * scale), int(h * scale))) | |
| img = ImageOps.autocontrast(img, cutoff=2) | |
| img = ImageEnhance.Brightness(img).enhance(1.06) | |
| img = ImageEnhance.Contrast(img).enhance(1.05) | |
| img = ImageEnhance.Sharpness(img).enhance(1.15) | |
| return img | |
| def gemma_item_name(img_bytes: bytes) -> str: | |
| """Enhanced AI item identification with better error handling""" | |
| try: | |
| primary_err = None | |
| if PROVIDER == "nebius": | |
| try: | |
| if not NEBIUS_API_KEY: | |
| raise RuntimeError("NEBIUS_API_KEY missing") | |
| return _openai_style_chat(NEBIUS_BASE_URL, NEBIUS_API_KEY, GEMMA_MODEL, img_bytes) | |
| except Exception as e: | |
| primary_err = e | |
| if PROVIDER == "featherless": | |
| try: | |
| if not FEATH_API_KEY: | |
| raise RuntimeError("FEATHERLESS_API_KEY missing") | |
| return _openai_style_chat(FEATH_BASE_URL, FEATH_API_KEY, GEMMA_MODEL, img_bytes) | |
| except Exception as e: | |
| primary_err = e | |
| # Fallback chain if primary failed or unknown provider | |
| if FEATH_API_KEY: | |
| return _openai_style_chat(FEATH_BASE_URL, FEATH_API_KEY, GEMMA_MODEL, img_bytes) | |
| raise primary_err or RuntimeError(f"Unknown PROVIDER: {PROVIDER}") | |
| except Exception as e: | |
| logger.error(f"AI identification failed: {e}") | |
| raise | |
| def _openai_style_chat(base_url: str, api_key: str, model_id: str, img_bytes: bytes) -> str: | |
| """Enhanced API communication with better error handling""" | |
| b64 = base64.b64encode(img_bytes).decode("utf-8") | |
| url = f"{base_url.rstrip('/')}/chat/completions" | |
| headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} | |
| payload = { | |
| "model": model_id, | |
| "temperature": 0, | |
| "messages": [ | |
| {"role": "system", "content": "You label item being held in the image for a food bank. Return ONLY the item name."}, | |
| {"role": "user", "content": [ | |
| {"type": "text", "text": "What is the name of the item in the picture? Return only the item name."}, | |
| {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}} | |
| ]} | |
| ] | |
| } | |
| try: | |
| r = requests.post(url, json=payload, headers=headers, timeout=90) | |
| if r.status_code != 200: | |
| raise RuntimeError(f"LLM HTTP {r.status_code}: {r.text[:200]}") | |
| data = r.json() | |
| return (data["choices"][0]["message"]["content"] or "").strip() | |
| except requests.exceptions.Timeout: | |
| raise RuntimeError("AI service timeout - please try again") | |
| except requests.exceptions.RequestException as e: | |
| raise RuntimeError(f"AI service error: {e}") | |
| def normalize_item_name(s: str) -> str: | |
| """Enhanced item name normalization""" | |
| s = (s or "").strip() | |
| if not s: | |
| return "" | |
| # Enhanced brand and type recognition | |
| BRANDS = { | |
| "whiskas","tetley","kellogg's","kelloggs","campbell's","campbells","heinz", | |
| "nestle","kraft","general mills","cheerios","oreo","oreos","pringles","lays","doritos", | |
| "ice river","green bottle","great value","wheat thins","vegetable thins","raid" | |
| } | |
| GENERIC_TYPES = { | |
| "water","toothpaste","deodorant","antiperspirant","soap","shampoo","conditioner", | |
| "lotion","tea","coffee","cereal","pasta","rice","beans","sauce","salsa","cleaner", | |
| "peanut butter","jam","jelly","tuna","chicken","beef","flour","sugar","salt","oil", | |
| "crackers","cookies","soup","insect killer","spray" | |
| } | |
| low = re.sub(r"[®™]", "", s.lower()) | |
| for b in BRANDS: | |
| low = low.replace(b, "") | |
| chosen = None | |
| for t in GENERIC_TYPES: | |
| if t in low: | |
| chosen = t | |
| break | |
| cleaned = " ".join(low.split()) | |
| return (chosen or cleaned.title())[:120] | |
| def clean_text(v: Optional[str], maxlen: int = 120) -> Optional[str]: | |
| """Enhanced text cleaning""" | |
| if not v: | |
| return None | |
| v = re.sub(r"\s+", " ", v).strip() | |
| return v[:maxlen] if v else None | |
| def deterministic_ingest_id(v_id: int, email: str, name: str, qty: int, ts_iso: str) -> str: | |
| """Enhanced ID generation for data integrity""" | |
| key = f"visit_items::{v_id}::{email}::{name}::{qty}::{ts_iso}" | |
| return str(uuid.uuid5(uuid.NAMESPACE_URL, key)) | |
| def try_rpc_ingest(email: str, v_id: int, name: str, qty: int, | |
| category: Optional[str], unit: Optional[str], | |
| barcode: Optional[str], ts_iso: str, ingest_id: str) -> tuple[bool,str]: | |
| """Enhanced RPC ingestion with better error handling""" | |
| try: | |
| res = sb.rpc("safe_ingest_visit_item", { | |
| "p_email": email, | |
| "p_visit_id": v_id, | |
| "p_item_name": name, | |
| "p_qty": qty, | |
| "p_category": category, | |
| "p_unit": unit, | |
| "p_barcode": barcode, | |
| "p_ts": ts_iso, | |
| "p_ingest_id": ingest_id | |
| }).execute() | |
| rows = res.data if isinstance(res.data, list) else [] | |
| if rows: | |
| r0 = rows[0] | |
| ok = bool(r0.get("ok", False)) | |
| msg = str(r0.get("msg", "")) | |
| return ok, msg or ("ok" if ok else "failed") | |
| return True, "ok" | |
| except Exception as e: | |
| logger.warning(f"RPC ingest failed, using fallback: {e}") | |
| raise e | |
| def fallback_direct_insert(email: str, v_id: int, name: str, qty: int, | |
| category: Optional[str], unit: Optional[str], | |
| barcode: Optional[str], ts_iso: str, ingest_id: str) -> None: | |
| """Enhanced fallback insertion with better error handling""" | |
| payload = { | |
| "visit_id": v_id, | |
| "timestamp": ts_iso, | |
| "volunteer": email, | |
| "item_name": name, | |
| "category": category, | |
| "unit": unit, | |
| "qty": qty, | |
| "barcode": barcode, | |
| "weather_type": None, | |
| "temp_c": None, | |
| "ingest_id": ingest_id | |
| } | |
| try: | |
| sb.table("visit_items_p").insert(payload).execute() | |
| except Exception: | |
| # Legacy table fallback | |
| payload.pop("ingest_id", None) | |
| sb.table("visit_items").insert(payload).execute() | |
| def load_items_for_visit(visit_id: int) -> list[dict]: | |
| """Enhanced item loading with better error handling""" | |
| try: | |
| return sb.table("visit_items_p").select("*").eq("visit_id", visit_id) \ | |
| .order("timestamp", desc=True).limit(500).execute().data or [] | |
| except Exception: | |
| try: | |
| return sb.table("visit_items").select("*").eq("visit_id", visit_id) \ | |
| .order("timestamp", desc=True).limit(500).execute().data or [] | |
| except Exception as e: | |
| logger.warning(f"Failed to load items for visit {visit_id}: {e}") | |
| return [] | |
| def delete_item(table: str, item_id: int): | |
| """Enhanced item deletion with better error handling""" | |
| try: | |
| sb.table(table).delete().eq("id", item_id).execute() | |
| except Exception as e: | |
| logger.error(f"Failed to delete item {item_id} from {table}: {e}") | |
| raise e | |
| # ------------------------ App Configuration Display ------------------------ | |
| st.sidebar.markdown("### ⚙️ Configuration") | |
| st.sidebar.info(f""" | |
| **Provider:** `{PROVIDER}` | |
| **Model:** `{GEMMA_MODEL}` | |
| **Timezone:** `{TZ}` | |
| **Cutoff:** {CUTOFF_HOUR}:00 PM | |
| **Inactivity:** {INACTIVITY_MIN} min | |
| """) | |
| # ------------------------ Main App Execution ------------------------ | |
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except Exception as e: | |
| logger.error(f"Application error: {e}") | |
| st.error(f"❌ Application error: {e}") | |
| st.info("Please refresh the page or contact support if the issue persists.") | |