Bridge25's picture
Update app.py
c1c6691 verified
# -*- coding: utf-8 -*-
"""Untitled0.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1RZKKUOkD7gN7jCFfzMWwmIADVoADyNUj
"""
print("🧪 APP.PY 已開始執行")
import zipfile
import gradio as gr
from google import genai
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch
import os
import json
import regex as re
from langchain_community.document_loaders import JSONLoader
from langchain.text_splitter import TokenTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
import io
import copy
# 解壓縮 vector_store.zip(如果還沒解壓縮)
if not os.path.exists("vector_store"):
with zipfile.ZipFile("vector_store.zip", "r") as zip_ref:
zip_ref.extractall("vector_store")
# ==== 初始化向量資料庫與模型 ====
data_path = "vector_store" # 請上傳該資料夾到 HF Space
# 加了一層 vector_store
persist_dir = os.path.join(data_path, "vector_store", "ptt_cvs_articles_chroma")
#這裡是Ver_4新增區(使用者回饋)______________________________________________________________________________________________________________________________
#建立使用者回饋資料夾
USER_BASE_DIR = "/home/user/user_profiles"
os.makedirs(USER_BASE_DIR, exist_ok=True)
GOOGLE_DRIVE_FOLDER_ID = "11bSM6UezPID7fCqfSZk2ykGOTO9FT2x-"
def save_user_profile(name, new_preference):
if not name.strip():
return "❌ 請輸入有效的名字"
# === 先嘗試從 Google Drive 讀取原有檔案 ===
filename = f"{name}_config.json"
existing_data = {}
try:
existing_data = load_user_profile_from_drive(name) or {}
except Exception as e:
print("⚠️ 無法讀取舊檔案,將建立新檔")
# === 建構新的資料 ===
updated_data = {
"name": name,
"feedback": existing_data.get("feedback", []) + [new_preference]
}
# === 上傳新的 JSON 檔案(覆蓋) ===
file_id = upload_json_to_drive(updated_data, filename, GOOGLE_DRIVE_FOLDER_ID)
return f"✅ 新回饋已新增並上傳到 Google Drive(檔案 ID:{file_id})"
profile_interface = gr.Interface(
fn=save_user_profile,
inputs=[
gr.Textbox(label="請輸入您的名字"),
gr.Textbox(label="請輸入您的美食偏好(如:想吃健康、喜歡甜食)")
],
outputs="text",
title="設定個人偏好",
description="這裡可以儲存您專屬的美食偏好,未來問問題時會自動考慮進去。"
)
def upload_json_to_drive(json_data, filename, folder_id):
# 從環境變數讀取金鑰字串
key_json_str = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
key_info = json.loads(key_json_str)
credentials = service_account.Credentials.from_service_account_info(key_info)
service = build("drive", "v3", credentials=credentials)
# 查找是否已存在同名檔案
query = f"'{folder_id}' in parents and name = '{filename}' and trashed = false"
results = service.files().list(q=query, spaces='drive', fields='files(id)', pageSize=1).execute()
files = results.get("files", [])
media = MediaIoBaseUpload(
io.BytesIO(json.dumps(json_data, ensure_ascii=False).encode("utf-8")),
mimetype="application/json"
)
if files:
file_id = files[0]['id']
file = service.files().update(fileId=file_id, media_body=media).execute()
print(f"🔁 已更新 {filename},ID:{file_id}")
else:
file_metadata = {
"name": filename,
"parents": [folder_id],
"mimeType": "application/json"
}
file = service.files().create(body=file_metadata, media_body=media, fields="id").execute()
file_id = file.get("id")
print(f"✅ 已上傳新檔案 {filename},ID:{file_id}")
return file_id
def load_user_profile_from_drive(name):
key_json_str = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
key_info = json.loads(key_json_str)
credentials = service_account.Credentials.from_service_account_info(key_info)
service = build("drive", "v3", credentials=credentials)
query = f"'{GOOGLE_DRIVE_FOLDER_ID}' in parents and name = '{name}_config.json'"
results = service.files().list(q=query, spaces='drive', fields='files(id, name)', pageSize=1).execute()
files = results.get("files", [])
if not files:
print(f"❌ 找不到 {name} 的偏好設定檔")
return None
file_id = files[0]["id"]
request = service.files().get_media(fileId=file_id)
response = request.execute()
json_data = json.loads(response.decode("utf-8"))
print(f"✅ 成功讀取 {name} 的偏好設定:", json_data)
return json_data
#這裡是Ver_4新增區______________________________________________________________________________________________________________________________
print("📁 vector_store 內容:", os.listdir("vector_store"))
print("📁 persist_dir =", persist_dir)
print("📂 persist_dir 存在嗎?", os.path.exists(persist_dir))
if os.path.exists(persist_dir):
print("📄 persist_dir 內容物 =", os.listdir(persist_dir))
embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-m3",
model_kwargs={"device": "cpu"},
encode_kwargs={"normalize_embeddings": True}
)
vector_store = Chroma(
embedding_function=embeddings,
persist_directory=persist_dir
)
# 初始化 Gemini
client = genai.Client(api_key=os.environ.get("GOOGLE_API_KEY"))
# 設定 Gemini 回應參數
generation_config=genai.types.GenerateContentConfig(
temperature= 0.7,
top_p= 1,
top_k= 1,
max_output_tokens= 1024,
tools=[genai.types.Tool(
google_search=GoogleSearch()
)]
)
# 查詢向量資料庫,取出前20筆分數 > 0.3 的文章
def document_retrieval(query):
print("🔍 查詢 query =", query)
results = vector_store.similarity_search_with_relevance_scores(query, k=20)
results = [doc for doc, score in results if score > 0.3]
print("📄 查到篇數 =", len(results))
return results
# 擷取 Gemini 回傳 JSON 格式
'''
def extract_JSON_obj(text):
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1 and start < end:
try:
return json.loads(text[start:end+1])
except Exception as e:
print("❌ extract_JSON-Obj 錯誤:", e)
return None
'''
def extract_JSON_obj(text):
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1 and start < end:
json_str = text[start:end+1]
try:
# 嘗試先用 json.loads()
return json.loads(json_str)
except json.JSONDecodeError as e:
print("❌ json.loads() 失敗,嘗試修正 Unicode:", e)
try:
# 將錯誤的 \uXXXX 替換成轉義後的版本(防止非法 unicode)
json_str_fixed = re.sub(r'\\u(?![0-9a-fA-F]{4})', r'\\\\u', json_str)
return json.loads(json_str_fixed)
except Exception as e2:
print("❌ 修正後仍錯誤:", e2)
return None
# 使用 Gemini 對檢索出的文章進行摘要與推薦
def celebrity_food_query(name):
system_prompt = f"""
你是一現實人物與虛擬角色的吃飯愛好分析師,請「上網搜尋」有關「{name}」的飲食偏好或者是相關資料(請注意中日英人名翻譯及簡稱的問題,要盡量搜尋)。
請給我純文字內容,分析列出人物的食物喜好、忌口、常見飲食習慣等(如:喜歡吃高蛋白、避免糖類、愛喝果昔等)。
如果找不到相關資訊也完全無法推測,請直接回覆「完全搜尋不到也無法推測 {name} 的飲食偏好」。
"""
config_celeb=copy.deepcopy(generation_config)
config_celeb.system_instruction=system_prompt
config_celeb.max_output_tokens= 512
response = client.models.generate_content(
model='gemini-2.0-flash',
contents=f"請列出 {name} 的飲食偏好",
config=config_celeb,
)
print(response.text)
return response.text.strip()
def answer_generation(question, res_docs, style,preference=""):
print("🧠 正在執行 answer_generation()")
base_instruction = '''
你是一位熟悉超商美食的專家,這是我的問題<question></question>與幾篇相關文章<text></text>還有我的偏好回饋「{preference}」,
請從每篇文章中摘出與問題最相關的重點(如:評分、熱量、口感、CP值等),並統整成有風格的回答「{preference}」。
也請判斷問題類型,決定回覆模式:
推薦方面:根據我的狀態或心情推薦我十篇文章中的三篇
選擇類型:結合我問句想要你評判的產品內容,及上面針對不同店家與食品類別的文章分析,說明你覺得我要不要買。整體風格依據如下。
'''
style_prompts = {
"理性分析": "請以資訊條列式、簡潔明確的方式回答,列出推薦商品與原因。",
"可愛輕鬆": "請以輕鬆可愛、有情緒、有表情符號的語氣回答,就像 IG 限動那樣推薦食物,要有趣有梗。",
"中英雙語": "回答請使用中英對照的方式,每個段落先英文、再中文,語氣自然親切、提供外國人食物推薦",
"直白推薦": "不廢話、直接提供簡單回覆"}
sys_instruction = base_instruction.format(preference=preference) + "\n" + style_prompts.get(style, "")
config_answer=copy.deepcopy(generation_config)
config_answer.system_instruction=sys_instruction
chat_answer = client.chats.create(
model='gemini-2.0-flash',
config=config_answer,
history=[]
)
user_input = [f'<question>{question}</question>']
for res in res_docs:
text_str = f'''<text><title>{res.metadata.get("Title", "")}</title><content>{res.page_content}</content></text>'''
user_input.append(text_str)
user_input_str = '\n'.join(user_input)
response = chat_answer.send_message(user_input_str)
return response.text
def gradio_query_processing(preference, question):
print("🔁 正在重新執行 query_processing()")
sys_instruction = '''
你是一位熟悉超商美食的專家,以下將提供給你(使用者的偏好/一位名人的偏好文章)與問題。
請你根據這些資訊,將問題改寫成一個適合檢索超商美食文章的查詢問句。
請務必考慮(使用者的偏好/一位名人的偏好文章)<preference></preference>
輸出格式如下:
{
"question": "...", # 原始問題(可改寫為完整句)
"query": "..." # 用於語意檢索的精簡查詢關鍵詞
}
並只回傳 JSON 物件(請確保內容皆是字元),不需解釋。
'''
system_prompt = sys_instruction.replace("{preference}", preference)
config_query=copy.deepcopy(generation_config)
config_query.system_instruction=system_prompt
print(f'config為{config_query}')
try:
payload = json.dumps({"question": question})
chat_query = client.chats.create(
model='gemini-2.0-flash',
config=config_query,
history=[]
)
preference_input={f'<preference>{preference}</preference>'}
response = chat_query.send_message(f'{preference_input}問題是{payload}')
print("📨 Gemini 回應:", extract_JSON_obj(response.text))
return extract_JSON_obj(response.text)
except Exception as e:
print("⚠️ query_processing 錯誤:", e)
return {"question": question, "query": question}
# 主流程:整合 Gradio + RAG
def gradio_rag(name, question, style):
if name.strip():
user_data = load_user_profile_from_drive(name)
feedback_list = user_data.get("feedback", []) if user_data else []
preference = "\n".join(feedback_list)
else:
preference = ""
yield "正在分析問題..."
query_obj = gradio_query_processing(preference, question)
if not query_obj:
yield "❌ 無法處理問題"
return
yield f"✅ 查詢問題:{query_obj['query']}"
res_docs = document_retrieval(query_obj["query"])
if not res_docs:
yield "找不到相關內容。"
return
yield f"以「{style}」風格生成回答中..."
answer = answer_generation(query_obj["question"], res_docs, style, preference=preference)
yield answer
main_interface = gr.Interface(
fn=gradio_rag,
inputs=[
gr.Textbox(label="請輸入您的名字(選填)"),
gr.Textbox(label="請輸入您想問的問題"),
gr.Radio(
["理性分析", "可愛輕鬆", "中英雙語", "直白推薦"],
label="請選擇你想要的回答風格",
value="理性分析"
)
],
outputs=gr.Markdown(label="回答"),
title="PTT 超商美食推薦小助手",
description="請輸入您的名字與問題,系統將根據您的偏好提供推薦。"
)
def celebrity_rag(celebrity_name, question, style):
yield f"🔍 正在查詢「{celebrity_name}」的飲食偏好..."
preference = celebrity_food_query(celebrity_name)
if not preference.strip() or "完全搜尋不到也無法推測" in preference:
yield f"⚠️ 查無「{celebrity_name}」的飲食偏好,請確認名稱是否正確。"
return
yield "✅ 已獲得名人飲食偏好,開始分析問題..."
query_obj = gradio_query_processing(preference, question)
if not query_obj:
yield "❌ 無法處理問題"
return
yield f"🔎 查詢關鍵詞:{query_obj['query']}"
res_docs = document_retrieval(query_obj["query"])
if not res_docs:
yield "❌ 找不到相關內容。"
return
yield f"🍽️ 以「{style}」風格生成推薦..."
answer = answer_generation(query_obj["question"], res_docs, style, preference=preference)
yield answer
celebrity_interface = gr.Interface(
fn=celebrity_rag,
inputs=[
gr.Textbox(label="請輸入名人姓名", lines=1),
gr.Textbox(label="請輸入你想問的問題(如:適合早餐的推薦?)", lines=2),
gr.Radio(
["理性分析", "可愛輕鬆", "中英雙語", "直白推薦"],
label="請選擇你想要的回答風格",
value="理性分析"
)
],
outputs=gr.Markdown(label="AI 回答"),
title="🌟 名人推薦模式",
description="輸入名人姓名和你的問題,我們會根據該名人的飲食偏好推薦商品。"
)
demo = gr.TabbedInterface(
interface_list=[main_interface, celebrity_interface, profile_interface],
tab_names=["美食推薦小助手", "名人模式", "設定個人偏好"]
)
demo.launch()