|
|
""" |
|
|
β‘ Performance Optimization & Dataset Integration β‘ |
|
|
|
|
|
1. Website Loading: |
|
|
- Telegram and Gmail email sending are commented out to avoid |
|
|
restrictions on Hugging Face free trial spaces. |
|
|
- Focus on fast page load and smooth API responses. |
|
|
- Images are served from optimized folders for faster rendering. |
|
|
|
|
|
2. Booking Data: |
|
|
- All booking data is now saved directly to Hugging Face Dataset |
|
|
using save_output_to_dataset(). |
|
|
- Local JSON file writes are disabled to reduce disk I/O and latency. |
|
|
- Each booking is stored with a timestamp and full user/product info. |
|
|
|
|
|
3. API Updates: |
|
|
- search_image and search_text APIs are fully functional. |
|
|
- Frontend can use these APIs without delay. |
|
|
- Optimized static files are served via /optimized/ route. |
|
|
|
|
|
Overall, these changes improve website performance, reduce dependency on |
|
|
external services, and securely store all booking data in the cloud dataset. |
|
|
""" |
|
|
|
|
|
from huggingface_hub import HfApi |
|
|
from datetime import datetime |
|
|
from flask import Flask, request, jsonify, render_template, send_from_directory |
|
|
from siglip_search_2 import search_image, search_text |
|
|
import os |
|
|
import json |
|
|
import requests |
|
|
import base64 |
|
|
import io |
|
|
import mimetypes |
|
|
import smtplib |
|
|
from email.mime.multipart import MIMEMultipart |
|
|
from email.mime.text import MIMEText |
|
|
from email.mime.image import MIMEImage |
|
|
from dotenv import load_dotenv |
|
|
load_dotenv() |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
env_blob = os.environ.get("find_it_or_create", "") |
|
|
secrets = {} |
|
|
|
|
|
for line in env_blob.splitlines(): |
|
|
if "=" in line: |
|
|
k, v = line.split("=", 1) |
|
|
secrets[k.strip()] = v.strip().strip('"') |
|
|
|
|
|
|
|
|
def get_secret(key: str, default=None): |
|
|
return os.environ.get(key) or secrets.get(key, default) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GMAIL_USER = get_secret("GMAIL_USER") |
|
|
GMAIL_PASSWORD = get_secret("GMAIL_PASSWORD") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bot_token = get_secret("TELEGRAM_BOT_TOKEN") |
|
|
chat_id = get_secret("TELEGRAM_CHAT_ID") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BOOKING_FILE = "bookings.json" |
|
|
UPLOAD_FOLDER = "uploads" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
env_blob = os.environ.get("find_it_or_create", "") |
|
|
secrets = {} |
|
|
|
|
|
for line in env_blob.splitlines(): |
|
|
if "=" in line: |
|
|
k, v = line.split("=", 1) |
|
|
secrets[k.strip()] = v.strip().strip('"') |
|
|
|
|
|
|
|
|
|
|
|
hf_token = get_secret("HF_TOKEN") |
|
|
dataset_repo = "mlbhanuprakash/data" |
|
|
|
|
|
api = HfApi() |
|
|
|
|
|
def save_output_to_dataset(output): |
|
|
|
|
|
timestamp = datetime.now().isoformat().replace(":", "-") |
|
|
filename = f"data_{timestamp}.json" |
|
|
local_file = f"/tmp/{filename}" |
|
|
|
|
|
|
|
|
entry = { |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"output": output |
|
|
} |
|
|
|
|
|
|
|
|
with open(local_file, "w") as f: |
|
|
json.dump(entry, f, indent=2) |
|
|
|
|
|
|
|
|
api.upload_file( |
|
|
path_or_fileobj=local_file, |
|
|
path_in_repo="JSON_data/"+filename, |
|
|
repo_id=dataset_repo, |
|
|
token=hf_token, |
|
|
repo_type="dataset", |
|
|
commit_message=f"Add new output file {filename}" |
|
|
) |
|
|
|
|
|
print(f"β
Output saved as {filename} in Hugging Face Dataset!") |
|
|
|
|
|
|
|
|
UPLOAD_FOLDER = "/tmp" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""" |
|
|
def send_email(to_email, subject, body, product_image_str=None): |
|
|
msg = MIMEMultipart() |
|
|
msg['From'] = GMAIL_USER |
|
|
msg['To'] = to_email |
|
|
msg['Subject'] = subject |
|
|
|
|
|
# Attach text body |
|
|
msg.attach(MIMEText(body, 'plain')) |
|
|
|
|
|
# If product image is available, try attaching |
|
|
if product_image_str: |
|
|
try: |
|
|
img_bytes = None |
|
|
|
|
|
# Case 1: Data URI (base64) |
|
|
if product_image_str.startswith("data:"): |
|
|
header, b64data = product_image_str.split(",", 1) |
|
|
img_bytes = base64.b64decode(b64data) |
|
|
|
|
|
# Case 2: HTTP(S) URL β download the image |
|
|
elif product_image_str.startswith("http://") or product_image_str.startswith("https://"): |
|
|
resp = requests.get(product_image_str, timeout=10) |
|
|
if resp.status_code == 200: |
|
|
img_bytes = resp.content |
|
|
|
|
|
# Case 3: Local file path |
|
|
elif os.path.exists(product_image_str): |
|
|
with open(product_image_str, "rb") as f: |
|
|
img_bytes = f.read() |
|
|
|
|
|
# If we got image bytes, attach them |
|
|
if img_bytes: |
|
|
image = MIMEImage(img_bytes, name="product_image.jpg") |
|
|
msg.attach(image) |
|
|
|
|
|
except Exception as e: |
|
|
app.logger.error(f"Failed to attach product image: {e}") |
|
|
|
|
|
# Send email via Gmail SMTP |
|
|
try: |
|
|
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server: |
|
|
server.login(GMAIL_USER, GMAIL_PASSWORD) |
|
|
server.sendmail(GMAIL_USER, to_email, msg.as_string()) |
|
|
app.logger.info(f"Email sent to {to_email}") |
|
|
except Exception as e: |
|
|
app.logger.error(f"Email sending failed: {e}") |
|
|
return False |
|
|
return True""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/") |
|
|
def index(): |
|
|
return render_template("index.html") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/search_image", methods=["POST"]) |
|
|
|
|
|
def search_image_api(): |
|
|
"""Handle image upload and run SigLIP image search""" |
|
|
if "file" not in request.files: |
|
|
return jsonify({"error": "No file uploaded"}), 400 |
|
|
|
|
|
file = request.files["file"] |
|
|
if file.filename == "": |
|
|
return jsonify({"error": "Empty filename"}), 400 |
|
|
|
|
|
filepath = os.path.join(UPLOAD_FOLDER, file.filename) |
|
|
file.save(filepath) |
|
|
app.logger.info(f"Received image: {filepath}") |
|
|
try: |
|
|
results = search_image(filepath, topk=1) |
|
|
app.logger.info(f"Returning {len(results)} image results") |
|
|
except Exception as e: |
|
|
app.logger.error(f"{e}") |
|
|
return jsonify({"error": str(e)}), 500 |
|
|
|
|
|
return jsonify(results) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/search_text", methods=["POST"]) |
|
|
def search_text_api(): |
|
|
"""Handle text query and run SigLIP text search""" |
|
|
data = request.get_json() |
|
|
query = data.get("query") if data else None |
|
|
|
|
|
if not query: |
|
|
return jsonify({"error": "No search query provided"}), 400 |
|
|
|
|
|
app.logger.info(f"Received text query: {query}") |
|
|
try: |
|
|
results = search_text(query, topk=5) |
|
|
app.logger.info(f"Returning {len(results)} text results") |
|
|
except Exception as e: |
|
|
app.logger.error(f"{e}") |
|
|
return jsonify({"error": str(e)}), 500 |
|
|
|
|
|
return jsonify(results) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/save_booking", methods=["POST"]) |
|
|
def save_booking(): |
|
|
data = request.get_json() |
|
|
if not data: |
|
|
return jsonify({"error": "No data received"}), 400 |
|
|
|
|
|
|
|
|
product = data.get("productResult", {}) |
|
|
|
|
|
|
|
|
product_image_str = (data.get("productImage") |
|
|
or product.get("image_url") |
|
|
or product.get("uploaded_image") |
|
|
or "") |
|
|
|
|
|
saved_data = { |
|
|
"name": data.get("name"), |
|
|
"email": data.get("email"), |
|
|
"phone": data.get("phone"), |
|
|
"location": data.get("location"), |
|
|
"chest": data.get("chest"), |
|
|
"waist": data.get("waist"), |
|
|
"shoulder": data.get("shoulder"), |
|
|
"sleeve": data.get("sleeve"), |
|
|
"selected Size": data.get("selectedSize"), |
|
|
"product Image": product_image_str, |
|
|
"search Query": (product.get("search_query") or data.get("searchQuery") or ""), |
|
|
"designer ID/Employee ID ": data.get("designerID"), |
|
|
"designer Name": data.get("designerName") |
|
|
} |
|
|
|
|
|
if os.path.exists(BOOKING_FILE): |
|
|
with open(BOOKING_FILE, "r", encoding="utf-8") as f: |
|
|
try: |
|
|
bookings = json.load(f) |
|
|
except json.JSONDecodeError: |
|
|
bookings = [] |
|
|
else: |
|
|
bookings = [] |
|
|
|
|
|
save_output_to_dataset(saved_data) |
|
|
""" |
|
|
bookings.append(saved_data) |
|
|
# with open(BOOKING_FILE, "w", encoding="utf-8") as f: |
|
|
# json.dump(bookings, f, indent=2, ensure_ascii=False) |
|
|
|
|
|
print("sssssssssssssssssssssssssss",saved_data) |
|
|
subject = f"New Task Assigned: {saved_data.get('name')}" |
|
|
|
|
|
# ----------------------------- |
|
|
# Telegram message & photo sending |
|
|
# ----------------------------- |
|
|
global bot_token, chat_id |
|
|
message = ( |
|
|
f"π’ *New Task Assignment* π’\n\n" |
|
|
f"Hello {saved_data.get('designer Name')},\n\n" |
|
|
f"Assigned Designer: {saved_data.get('designer Name')} (Employee ID: {saved_data.get('designer ID/Employee ID ')})\n\n" |
|
|
f"A new customer request has been assigned to you automatically by our system.\n\n" |
|
|
f"--- π€ Customer Details ---\n" |
|
|
f"Name: {saved_data.get('name')}\n" |
|
|
f"Email: {saved_data.get('email')}\n" |
|
|
f"Phone: {saved_data.get('phone')}\n" |
|
|
f"Location: {saved_data.get('location')}\n\n" |
|
|
f"--- π Measurements ---\n" |
|
|
f"Chest: {saved_data.get('chest')} in\n" |
|
|
f"Waist: {saved_data.get('waist')} in\n" |
|
|
f"Shoulder: {saved_data.get('shoulder')} in\n" |
|
|
f"Sleeve: {saved_data.get('sleeve')} in\n" |
|
|
f"Selected Size: {saved_data.get('selected Size')}\n\n" |
|
|
f"--- ποΈ Product Details ---\n" |
|
|
f"Search Query: {saved_data.get('search Query')}\n\n" |
|
|
f"--- π Assignment Info ---\n" |
|
|
f"Task generated automatically by system bot.\n\n" |
|
|
f"If you have any queries, please reach out to Krish (Team Lead).\n\n" |
|
|
f"β‘ Please proceed with this task at your earliest convenience.\n\n" |
|
|
f"Thank you! π" |
|
|
) |
|
|
body = message""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
if product_image_str: |
|
|
try: |
|
|
# Data URI |
|
|
if product_image_str.startswith("data:"): |
|
|
header, b64data = product_image_str.split(",", 1) |
|
|
img_bytes = base64.b64decode(b64data) |
|
|
mime = "image/jpeg" |
|
|
try: |
|
|
mime = header.split(";")[0].split(":")[1] |
|
|
except Exception: |
|
|
pass |
|
|
ext = mimetypes.guess_extension(mime) or ".jpg" |
|
|
filename = f"photo{ext}" |
|
|
bio = io.BytesIO(img_bytes) |
|
|
bio.name = filename |
|
|
files = {'photo': (filename, bio, mime)} |
|
|
resp = requests.post( |
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto', |
|
|
data={'chat_id': chat_id, 'caption': f"Product image for {message}"}, |
|
|
files=files, |
|
|
timeout=30 |
|
|
) |
|
|
if resp.status_code != 200: |
|
|
app.logger.error("Telegram sendPhoto (data URI) failed: %s %s", resp.status_code, resp.text) |
|
|
# HTTP(S) URL |
|
|
elif product_image_str.startswith("http://") or product_image_str.startswith("https://"): |
|
|
resp = requests.post( |
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto', |
|
|
data={'chat_id': chat_id, 'photo': product_image_str, 'caption': f"Product image for {saved_data.get('name')}"}, |
|
|
timeout=15 |
|
|
) |
|
|
if resp.status_code != 200: |
|
|
app.logger.error("Telegram sendPhoto (URL) returned %s: %s", resp.status_code, resp.text) |
|
|
requests.get( |
|
|
f'https://api.telegram.org/bot{bot_token}/sendMessage', |
|
|
params={'chat_id': chat_id, 'text': message}, |
|
|
timeout=10 |
|
|
) |
|
|
# Local file or path |
|
|
else: |
|
|
candidates = [ |
|
|
product_image_str, |
|
|
os.path.join(os.getcwd(), product_image_str), |
|
|
os.path.join(os.getcwd(), UPLOAD_FOLDER, os.path.basename(product_image_str)), |
|
|
os.path.join(os.getcwd(), 'images', os.path.basename(product_image_str)) |
|
|
] |
|
|
found = None |
|
|
for p in candidates: |
|
|
if p and os.path.exists(p): |
|
|
found = p |
|
|
break |
|
|
if found: |
|
|
with open(found, 'rb') as photo_f: |
|
|
filename = os.path.basename(found) |
|
|
mime = mimetypes.guess_type(found)[0] or 'application/octet-stream' |
|
|
files = {'photo': (filename, photo_f, mime)} |
|
|
resp = requests.post( |
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto', |
|
|
data={'chat_id': chat_id, 'caption': f"Product image for {saved_data.get('name')}"}, |
|
|
files=files, |
|
|
timeout=30 |
|
|
) |
|
|
if resp.status_code != 200: |
|
|
app.logger.error("Telegram sendPhoto (local) returned %s: %s", resp.status_code, resp.text) |
|
|
else: |
|
|
try: |
|
|
base = request.host_url.rstrip('/') |
|
|
photo_url = base + '/' + product_image_str.lstrip('/') |
|
|
resp = requests.post( |
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto', |
|
|
data={'chat_id': chat_id, 'photo': photo_url, 'caption': f"Product image for {saved_data.get('name')}"}, |
|
|
timeout=15 |
|
|
) |
|
|
if resp.status_code != 200: |
|
|
app.logger.error("Telegram sendPhoto (fallback URL) returned %s: %s", resp.status_code, resp.text) |
|
|
except Exception as e: |
|
|
app.logger.error("Telegram fallback URL attempt failed: %s", e) |
|
|
except Exception as e: |
|
|
app.logger.exception("Telegram photo send failed: %s", e) |
|
|
else: |
|
|
requests.get( |
|
|
f'https://api.telegram.org/bot{bot_token}/sendMessage', |
|
|
params={'chat_id': chat_id, 'text': message}, |
|
|
timeout=10 |
|
|
) |
|
|
# Send email |
|
|
email_sent = send_email("[email protected]", subject, body, product_image_str) |
|
|
if not email_sent: |
|
|
return jsonify({"error": "Booking saved but email failed"}), 500 |
|
|
""" |
|
|
|
|
|
return jsonify({"message": "Booking successfully saved"}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/optimized/<path:filename>") |
|
|
def serve_images(filename): |
|
|
return send_from_directory("images", filename) |
|
|
|
|
|
@app.route("/uploads/<path:filename>") |
|
|
def serve_uploads(filename): |
|
|
return send_from_directory(UPLOAD_FOLDER, filename) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 5000)), debug=True) |