|
|
|
|
|
"""Untitled0.ipynb |
|
|
Automatically generated by Colab. |
|
|
Original file is located at |
|
|
https://colab.research.google.com/drive/1RZKKUOkD7gN7jCFfzMWwmIADVoADyNUj |
|
|
""" |
|
|
# Emit a marker on stdout as soon as the module starts executing, before the
# imports below are processed.
print("🧪 APP.PY 已開始執行")
|
|
|
|
|
import zipfile |
|
|
import gradio as gr |
|
|
from google import genai |
|
|
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch |
|
|
import os |
|
|
import json |
|
|
import regex as re |
|
|
from langchain_community.document_loaders import JSONLoader |
|
|
from langchain.text_splitter import TokenTextSplitter |
|
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
|
from langchain_chroma import Chroma |
|
|
from google.oauth2 import service_account |
|
|
from googleapiclient.discovery import build |
|
|
from googleapiclient.http import MediaIoBaseUpload |
|
|
import io |
|
|
import copy |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Directory the bundled archive is unpacked into.
data_path = "vector_store"

# Unpack the archive only when the target directory does not exist yet.
if not os.path.exists(data_path):
    with zipfile.ZipFile("vector_store.zip", mode="r") as archive:
        archive.extractall(data_path)

# The Chroma files are read from a nested "vector_store" folder inside the
# extraction directory.
persist_dir = os.path.join(
    data_path,
    "vector_store",
    "ptt_cvs_articles_chroma",
)

# Local directory for user profiles; created up front if it is missing.
USER_BASE_DIR = "/home/user/user_profiles"
os.makedirs(USER_BASE_DIR, exist_ok=True)

# Google Drive folder in which the per-user "<name>_config.json" files live.
GOOGLE_DRIVE_FOLDER_ID = "11bSM6UezPID7fCqfSZk2ykGOTO9FT2x-"
|
|
|
|
|
def save_user_profile(name, new_preference):
    """Append one feedback entry to a user's profile stored on Google Drive.

    The profile is a JSON document named ``{name}_config.json`` inside the
    folder identified by ``GOOGLE_DRIVE_FOLDER_ID``. Feedback already stored
    there is kept and ``new_preference`` is appended to it.

    Args:
        name: User name typed into the form; used verbatim in the file name.
        new_preference: Free-text preference to append to the feedback list.

    Returns:
        A status message for display: an error message when ``name`` is
        missing or blank, otherwise a confirmation containing the Drive
        file ID returned by ``upload_json_to_drive``.
    """
    # Guard against None as well as blank strings so an empty form field
    # produces a message instead of an AttributeError.
    if not name or not name.strip():
        return "❌ 請輸入有效的名字"

    filename = f"{name}_config.json"

    try:
        existing_data = load_user_profile_from_drive(name) or {}
    except Exception as e:
        # Best effort: a failed read must not block saving, but the cause is
        # logged so that an overwritten history can be diagnosed afterwards.
        print("⚠️ 無法讀取舊檔案,將建立新檔", e)
        existing_data = {}

    # A stored document that is not a JSON object carries no usable feedback.
    if not isinstance(existing_data, dict):
        existing_data = {}

    # ``or []`` also covers a stored ``"feedback": null``.
    previous_feedback = existing_data.get("feedback") or []

    updated_data = {
        "name": name,
        "feedback": list(previous_feedback) + [new_preference],
    }

    file_id = upload_json_to_drive(updated_data, filename, GOOGLE_DRIVE_FOLDER_ID)
    return f"✅ 新回饋已新增並上傳到 Google Drive(檔案 ID:{file_id})"
|
|
|
|
|
|
|
|
# "Set personal preferences" tab: two text fields (name, preference) are
# passed to save_user_profile and its status message is shown as plain text.
profile_interface = gr.Interface(
    save_user_profile,
    [
        gr.Textbox(label="請輸入您的名字"),
        gr.Textbox(label="請輸入您的美食偏好(如:想吃健康、喜歡甜食)"),
    ],
    "text",
    description="這裡可以儲存您專屬的美食偏好,未來問問題時會自動考慮進去。",
    title="設定個人偏好",
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_json_to_drive(json_data, filename, folder_id):
    """Create or overwrite a JSON file in a Google Drive folder.

    Looks for a non-trashed file called ``filename`` inside ``folder_id``.
    If one exists its content is replaced; otherwise a new file is created.

    Args:
        json_data: JSON-serialisable object to store (UTF-8, non-ASCII kept).
        filename: Name of the file inside the folder.
        folder_id: ID of the Drive folder that holds the file.

    Returns:
        The Drive file ID of the updated or newly created file.

    Raises:
        RuntimeError: If ``GCP_SERVICE_ACCOUNT_JSON`` is not set.
    """
    key_json_str = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
    if not key_json_str:
        # Fail with a clear message instead of the TypeError that
        # json.loads(None) would raise.
        raise RuntimeError("GCP_SERVICE_ACCOUNT_JSON environment variable is not set")
    key_info = json.loads(key_json_str)
    credentials = service_account.Credentials.from_service_account_info(key_info)

    service = build("drive", "v3", credentials=credentials)

    # The file name comes from user input; backslashes and single quotes must
    # be escaped or they terminate the quoted literal in the Drive query.
    safe_filename = filename.replace("\\", "\\\\").replace("'", "\\'")
    query = f"'{folder_id}' in parents and name = '{safe_filename}' and trashed = false"
    results = service.files().list(q=query, spaces='drive', fields='files(id)', pageSize=1).execute()
    files = results.get("files", [])

    media = MediaIoBaseUpload(
        io.BytesIO(json.dumps(json_data, ensure_ascii=False).encode("utf-8")),
        mimetype="application/json",
    )

    if files:
        # A file with this name already exists: replace its content in place.
        file_id = files[0]['id']
        service.files().update(fileId=file_id, media_body=media).execute()
        print(f"🔁 已更新 {filename},ID:{file_id}")
    else:
        file_metadata = {
            "name": filename,
            "parents": [folder_id],
            "mimeType": "application/json",
        }
        file = service.files().create(body=file_metadata, media_body=media, fields="id").execute()
        file_id = file.get("id")
        print(f"✅ 已上傳新檔案 {filename},ID:{file_id}")

    return file_id
|
|
|
|
|
def load_user_profile_from_drive(name):
    """Fetch and parse ``{name}_config.json`` from the shared Drive folder.

    Args:
        name: User name; the file looked up is ``{name}_config.json`` inside
            the folder identified by ``GOOGLE_DRIVE_FOLDER_ID``.

    Returns:
        The parsed JSON content of the first matching file, or ``None`` when
        no matching file exists.

    Raises:
        RuntimeError: If ``GCP_SERVICE_ACCOUNT_JSON`` is not set.
    """
    key_json_str = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
    if not key_json_str:
        # Fail with a clear message instead of the TypeError that
        # json.loads(None) would raise.
        raise RuntimeError("GCP_SERVICE_ACCOUNT_JSON environment variable is not set")
    key_info = json.loads(key_json_str)
    credentials = service_account.Credentials.from_service_account_info(key_info)
    service = build("drive", "v3", credentials=credentials)

    # The name comes from user input; backslashes and single quotes must be
    # escaped or they terminate the quoted literal in the Drive query.
    safe_filename = f"{name}_config.json".replace("\\", "\\\\").replace("'", "\\'")
    # "trashed = false" keeps the lookup aligned with upload_json_to_drive,
    # which only ever updates non-trashed files.
    query = (
        f"'{GOOGLE_DRIVE_FOLDER_ID}' in parents"
        f" and name = '{safe_filename}' and trashed = false"
    )
    results = service.files().list(q=query, spaces='drive', fields='files(id, name)', pageSize=1).execute()
    files = results.get("files", [])

    if not files:
        print(f"❌ 找不到 {name} 的偏好設定檔")
        return None

    file_id = files[0]["id"]
    request = service.files().get_media(fileId=file_id)
    response = request.execute()

    json_data = json.loads(response.decode("utf-8"))
    print(f"✅ 成功讀取 {name} 的偏好設定:", json_data)
    return json_data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Startup diagnostics: show what was unpacked and whether the Chroma
# directory is where the code expects it.
print("📁 vector_store 內容:", os.listdir("vector_store"))
print("📁 persist_dir =", persist_dir)
print("📂 persist_dir 存在嗎?", os.path.exists(persist_dir))
if os.path.exists(persist_dir):
    print("📄 persist_dir 內容物 =", os.listdir(persist_dir))

# Embedding model used for queries against the store: bge-m3 on CPU with
# normalised vectors.
embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-m3",
    encode_kwargs={"normalize_embeddings": True},
    model_kwargs={"device": "cpu"},
)

# Chroma store opened from the persisted directory.
vector_store = Chroma(
    persist_directory=persist_dir,
    embedding_function=embeddings,
)

# Gemini client; the API key is read from the environment.
client = genai.Client(api_key=os.environ.get("GOOGLE_API_KEY"))

# Base generation settings. The request functions deep-copy this object and
# then set their own system instruction on the copy.
generation_config = GenerateContentConfig(
    temperature=0.7,
    top_p=1,
    top_k=1,
    max_output_tokens=1024,
    tools=[Tool(google_search=GoogleSearch())],
)
|
|
|
|
|
|
|
|
|
|
|
def document_retrieval(query):
    """Return the documents from the vector store that are relevant to *query*.

    Asks the store for the 20 best matches together with their relevance
    scores and keeps only the documents whose score is strictly above 0.3.
    """
    print("🔍 查詢 query =", query)
    scored_hits = vector_store.similarity_search_with_relevance_scores(query, k=20)

    kept_docs = []
    for document, relevance in scored_hits:
        if relevance > 0.3:
            kept_docs.append(document)

    print("📄 查到篇數 =", len(kept_docs))
    return kept_docs
|
|
|
|
|
|
|
|
def extract_JSON_obj(text):
    """Extract the JSON object embedded in a model reply.

    The span from the first ``{`` to the last ``}`` is parsed as JSON. If
    that fails, ``\\u`` sequences that are not followed by four hex digits
    are turned into a literal backslash + ``u`` and parsing is retried once.

    Args:
        text: Raw reply text. Anything that is not a string (for example
            ``None`` when the reply has no text) yields ``None``.

    Returns:
        The parsed object, or ``None`` when no parsable object is found.
    """
    if not isinstance(text, str):
        return None

    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end <= start:
        return None

    json_str = text[start:end + 1]

    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        print("❌ json.loads() 失敗,嘗試修正 Unicode:", e)

    def _escape_bad_unicode(match):
        token = match.group(0)
        # An already-escaped backslash pair is consumed and kept as-is so a
        # "u" that follows it is never mistaken for the start of a \u escape.
        return token if token == "\\\\" else "\\\\u"

    json_str_fixed = re.sub(
        r'\\\\|\\u(?![0-9a-fA-F]{4})',
        _escape_bad_unicode,
        json_str,
    )

    try:
        return json.loads(json_str_fixed)
    except json.JSONDecodeError as e2:
        print("❌ 修正後仍錯誤:", e2)

    return None
|
|
|
|
|
|
|
|
|
|
|
def celebrity_food_query(name):
    """Ask Gemini for the eating preferences of a person or character.

    A deep copy of the shared ``generation_config`` is used, with the system
    instruction built below and ``max_output_tokens`` lowered to 512.

    Args:
        name: Name of the real person or fictional character to look up.

    Returns:
        The reply text with surrounding whitespace removed; an empty string
        when the reply carries no text.
    """
    system_prompt = f"""
    你是一現實人物與虛擬角色的吃飯愛好分析師,請「上網搜尋」有關「{name}」的飲食偏好或者是相關資料(請注意中日英人名翻譯及簡稱的問題,要盡量搜尋)。
    請給我純文字內容,分析列出人物的食物喜好、忌口、常見飲食習慣等(如:喜歡吃高蛋白、避免糖類、愛喝果昔等)。
    如果找不到相關資訊也完全無法推測,請直接回覆「完全搜尋不到也無法推測 {name} 的飲食偏好」。
    """

    config_celeb = copy.deepcopy(generation_config)
    config_celeb.system_instruction = system_prompt
    config_celeb.max_output_tokens = 512
    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=f"請列出 {name} 的飲食偏好",
        config=config_celeb,
    )

    # response.text may be None when the reply has no text part; normalise to
    # "" so .strip() cannot raise and callers can treat it as "nothing found".
    reply_text = response.text or ""
    print(reply_text)
    return reply_text.strip()
|
|
|
|
|
|
|
|
|
|
|
def answer_generation(question, res_docs, style, preference=""):
    """Generate the final answer from the question and the retrieved articles.

    The system instruction is the base prompt (with ``preference`` filled in)
    followed by the prompt for the chosen ``style``; an unknown style adds
    nothing. The user message is the question wrapped in ``<question>``
    followed by one ``<text>`` element per document, joined by newlines.

    Returns:
        ``response.text`` of the Gemini reply.
    """
    print("🧠 正在執行 answer_generation()")

    style_prompts = {
        "理性分析": "請以資訊條列式、簡潔明確的方式回答,列出推薦商品與原因。",
        "可愛輕鬆": "請以輕鬆可愛、有情緒、有表情符號的語氣回答,就像 IG 限動那樣推薦食物,要有趣有梗。",
        "中英雙語": "回答請使用中英對照的方式,每個段落先英文、再中文,語氣自然親切、提供外國人食物推薦",
        "直白推薦": "不廢話、直接提供簡單回覆"}

    base_instruction = '''
    你是一位熟悉超商美食的專家,這是我的問題<question></question>與幾篇相關文章<text></text>還有我的偏好回饋「{preference}」,
    請從每篇文章中摘出與問題最相關的重點(如:評分、熱量、口感、CP值等),並統整成有風格的回答「{preference}」。
    也請判斷問題類型,決定回覆模式:
    推薦方面:根據我的狀態或心情推薦我十篇文章中的三篇
    選擇類型:結合我問句想要你評判的產品內容,及上面針對不同店家與食品類別的文章分析,說明你覺得我要不要買。整體風格依據如下。
    '''

    # Base prompt and style prompt are separated by a single newline.
    style_hint = style_prompts.get(style, "")
    sys_instruction = "\n".join(
        [base_instruction.format(preference=preference), style_hint]
    )

    # Work on a private copy so the shared base config is left untouched.
    config_answer = copy.deepcopy(generation_config)
    config_answer.system_instruction = sys_instruction

    chat_answer = client.chats.create(
        history=[],
        config=config_answer,
        model='gemini-2.0-flash',
    )

    message_parts = [f'<question>{question}</question>']
    message_parts.extend(
        f'''<text><title>{doc.metadata.get("Title", "")}</title><content>{doc.page_content}</content></text>'''
        for doc in res_docs
    )

    response = chat_answer.send_message('\n'.join(message_parts))
    return response.text
|
|
|
|
|
def gradio_query_processing(preference, question):
    """Rewrite a user question into a retrieval query with Gemini.

    Args:
        preference: Preference text (the user's own feedback or a celebrity's
            eating habits) that the rewrite should take into account.
        question: The question as typed by the user.

    Returns:
        A dict with ``"question"`` and ``"query"`` keys. ``None`` (or an
        empty dict) is returned when the reply contains no usable JSON
        object. If the request itself fails, the original question is
        returned for both keys.
    """
    print("🔁 正在重新執行 query_processing()")

    sys_instruction = '''
    你是一位熟悉超商美食的專家,以下將提供給你(使用者的偏好/一位名人的偏好文章)與問題。
    請你根據這些資訊,將問題改寫成一個適合檢索超商美食文章的查詢問句。
    請務必考慮(使用者的偏好/一位名人的偏好文章)<preference></preference>
    輸出格式如下:
    {
    "question": "...", # 原始問題(可改寫為完整句)
    "query": "..." # 用於語意檢索的精簡查詢關鍵詞
    }
    並只回傳 JSON 物件(請確保內容皆是字元),不需解釋。
    '''

    # The prompt has no "{preference}" placeholder, so it is used as-is; the
    # preference travels in the user message inside <preference> tags.
    config_query = copy.deepcopy(generation_config)
    config_query.system_instruction = sys_instruction
    print(f'config為{config_query}')

    try:
        # ensure_ascii=False keeps non-ASCII questions readable instead of
        # sending them to the model as \uXXXX escapes.
        payload = json.dumps({"question": question}, ensure_ascii=False)
        chat_query = client.chats.create(
            model='gemini-2.0-flash',
            config=config_query,
            history=[],
        )
        # Plain string, not a set literal: a set would be rendered through
        # repr(), adding braces/quotes and escaping newlines in the text.
        preference_input = f'<preference>{preference}</preference>'
        response = chat_query.send_message(f'{preference_input}問題是{payload}')

        query_obj = extract_JSON_obj(response.text)
        print("📨 Gemini 回應:", query_obj)

        if isinstance(query_obj, dict) and query_obj:
            # Callers index both keys; fill in whichever one the model left out.
            query_obj.setdefault("question", question)
            query_obj.setdefault("query", query_obj["question"])
        return query_obj
    except Exception as e:
        print("⚠️ query_processing 錯誤:", e)
        return {"question": question, "query": question}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gradio_rag(name, question, style):
    """Answer a question, streaming progress messages to the UI.

    Generator used as the Gradio callback of the main tab. Each yielded
    string replaces the previous output; the last one is the answer or an
    error message.

    Args:
        name: Optional user name. When non-blank, the stored feedback for
            that user is loaded and used as the preference text.
        question: The question typed by the user.
        style: Answer style passed through to ``answer_generation``.

    Yields:
        Progress messages, then the generated answer.
    """
    preference = ""
    # ``name`` is optional: None or a blank string simply means "no profile".
    if name and name.strip():
        try:
            user_data = load_user_profile_from_drive(name)
        except Exception as e:
            # Preferences are an optional refinement; a Drive failure should
            # not prevent the question from being answered.
            print("⚠️ 無法讀取使用者偏好,將以無偏好繼續:", e)
            user_data = None
        feedback_list = user_data.get("feedback", []) if user_data else []
        # str() keeps join() from failing on non-string entries.
        preference = "\n".join(str(item) for item in feedback_list)

    yield "正在分析問題..."
    query_obj = gradio_query_processing(preference, question)
    if not query_obj:
        yield "❌ 無法處理問題"
        return

    # Tolerate a reply that omits one of the keys by falling back to the
    # question as typed.
    search_query = query_obj.get("query") or question
    full_question = query_obj.get("question") or question

    yield f"✅ 查詢問題:{search_query}"
    res_docs = document_retrieval(search_query)
    if not res_docs:
        yield "找不到相關內容。"
        return

    yield f"以「{style}」風格生成回答中..."
    answer = answer_generation(full_question, res_docs, style, preference=preference)
    yield answer
|
|
|
|
|
# Main recommendation tab: optional name, question and answer style are passed
# to gradio_rag, whose yielded strings are rendered as Markdown.
main_interface = gr.Interface(
    gradio_rag,
    [
        gr.Textbox(label="請輸入您的名字(選填)"),
        gr.Textbox(label="請輸入您想問的問題"),
        gr.Radio(
            ["理性分析", "可愛輕鬆", "中英雙語", "直白推薦"],
            value="理性分析",
            label="請選擇你想要的回答風格",
        ),
    ],
    gr.Markdown(label="回答"),
    description="請輸入您的名字與問題,系統將根據您的偏好提供推薦。",
    title="PTT 超商美食推薦小助手",
)
|
|
|
|
|
def celebrity_rag(celebrity_name, question, style):
    """Answer a question from the point of view of a celebrity's diet.

    Generator used as the Gradio callback of the celebrity tab. It looks up
    the celebrity's eating preferences with ``celebrity_food_query`` and then
    runs the same rewrite / retrieve / answer steps as the main tab, using
    those preferences. Each yielded string replaces the previous output.

    Args:
        celebrity_name: Name of the person or character to look up.
        question: The question typed by the user.
        style: Answer style passed through to ``answer_generation``.

    Yields:
        Progress messages, then the generated answer or an error message.
    """
    # Without a name there is nothing to look up; skip the model call.
    if not celebrity_name or not celebrity_name.strip():
        yield "❌ 請輸入有效的名字"
        return

    yield f"🔍 正在查詢「{celebrity_name}」的飲食偏好..."
    preference = celebrity_food_query(celebrity_name)

    # The lookup prompt tells the model to answer with this exact phrase when
    # nothing can be found; a None/blank reply is treated the same way.
    if not preference or not preference.strip() or "完全搜尋不到也無法推測" in preference:
        yield f"⚠️ 查無「{celebrity_name}」的飲食偏好,請確認名稱是否正確。"
        return

    yield "✅ 已獲得名人飲食偏好,開始分析問題..."

    query_obj = gradio_query_processing(preference, question)
    if not query_obj:
        yield "❌ 無法處理問題"
        return

    # Tolerate a reply that omits one of the keys by falling back to the
    # question as typed.
    search_query = query_obj.get("query") or question
    full_question = query_obj.get("question") or question

    yield f"🔎 查詢關鍵詞:{search_query}"
    res_docs = document_retrieval(search_query)
    if not res_docs:
        yield "❌ 找不到相關內容。"
        return

    yield f"🍽️ 以「{style}」風格生成推薦..."
    answer = answer_generation(full_question, res_docs, style, preference=preference)
    yield answer
|
|
|
|
|
# Celebrity tab: celebrity name, question and answer style are passed to
# celebrity_rag, whose yielded strings are rendered as Markdown.
celebrity_interface = gr.Interface(
    celebrity_rag,
    [
        gr.Textbox(lines=1, label="請輸入名人姓名"),
        gr.Textbox(lines=2, label="請輸入你想問的問題(如:適合早餐的推薦?)"),
        gr.Radio(
            ["理性分析", "可愛輕鬆", "中英雙語", "直白推薦"],
            value="理性分析",
            label="請選擇你想要的回答風格",
        ),
    ],
    gr.Markdown(label="AI 回答"),
    description="輸入名人姓名和你的問題,我們會根據該名人的飲食偏好推薦商品。",
    title="🌟 名人推薦模式",
)
|
|
|
|
|
# Combine the three interfaces into one tabbed app; the tab names are given in
# the same order as the interfaces.
demo = gr.TabbedInterface(
    [main_interface, celebrity_interface, profile_interface],
    ["美食推薦小助手", "名人模式", "設定個人偏好"],
)

# Start the Gradio server.
demo.launch()