shanusherly committed on
Commit
94a9ac3
·
verified ·
1 Parent(s): f0e7f66

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +92 -63
app.py CHANGED
@@ -7,7 +7,7 @@ import google.generativeai as genai
7
  from google.api_core.exceptions import ResourceExhausted
8
 
9
  # -----------------------
10
- # Configuration / secrets
11
  # -----------------------
12
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
13
  ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
@@ -17,9 +17,8 @@ AUDIO_TMP_DIR = "/tmp"
17
  if not GEMINI_API_KEY:
18
  raise RuntimeError("Missing GEMINI_API_KEY in environment. Set it in HF Space Secrets.")
19
 
20
- # Configure Gemini SDK
21
  genai.configure(api_key=GEMINI_API_KEY)
22
- # single model instance
23
  gemini_model = genai.GenerativeModel("gemini-2.5-flash")
24
 
25
  # -----------------------
@@ -28,7 +27,7 @@ gemini_model = genai.GenerativeModel("gemini-2.5-flash")
28
  class SimpleMemory:
29
  def __init__(self, max_messages=20):
30
  self.max_messages = max_messages
31
- self.history = [] # tuples (role, text)
32
 
33
  def add(self, role, text):
34
  self.history.append((role, text))
@@ -36,49 +35,85 @@ class SimpleMemory:
36
  self.history = self.history[-self.max_messages :]
37
 
38
  def as_prompt_text(self):
39
- # produce compact prompt history
40
  lines = []
41
- for role, text in self.history:
42
  if role == "user":
43
- lines.append(f"User: {text}")
44
  else:
45
- lines.append(f"Chatbot: {text}")
46
  return "\n".join(lines)
47
 
48
  memory = SimpleMemory(max_messages=20)
49
 
50
  # -----------------------
51
- # Gemini text generation (safe)
52
  # -----------------------
53
  PROMPT_TEMPLATE = """You are a helpful assistant.
54
  {chat_history}
55
  User: {user_message}
56
  Chatbot:"""
57
 
 
 
 
 
58
  def generate_text_with_gemini(user_message):
59
  chat_history_text = memory.as_prompt_text()
60
- prompt = PROMPT_TEMPLATE.format(chat_history=chat_history_text, user_message=user_message)
61
 
 
62
  try:
63
- response = gemini_model.generate_content(prompt)
64
- text = response.text if hasattr(response, "text") else str(response)
 
 
65
  return text, None
66
  except ResourceExhausted as e:
67
- # quota exceeded return friendly message
68
- print("Gemini quota exhausted:", e)
69
  return None, "Gemini quota exceeded. Please try again later."
70
- except Exception as e:
71
- print("Gemini error:", e)
72
- return None, f"Gemini error: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
 
74
  # -----------------------
75
- # ElevenLabs HTTP fallback (robust)
 
76
  # -----------------------
77
  def generate_audio_elevenlabs_http(text):
78
- """
79
- Returns (output_path, error_message). On success: output_path path string, error_message empty.
80
- On failure: output_path '', error_message string.
81
- """
82
  if not ELEVENLABS_API_KEY:
83
  return "", "ELEVENLABS_API_KEY not configured."
84
 
@@ -97,13 +132,12 @@ def generate_audio_elevenlabs_http(text):
97
  try:
98
  resp = requests.post(url, json=payload, headers=headers, timeout=30)
99
  except Exception as e:
100
- err = f"HTTP request to ElevenLabs failed: {e}"
101
  print(err)
102
  return "", err
103
 
104
  if resp.status_code == 200:
105
  try:
106
- # save audio bytes to temp file
107
  filename = f"audio_{int(time.time()*1000)}_{abs(hash(text))%100000}.mp3"
108
  path = os.path.join(AUDIO_TMP_DIR, filename)
109
  with open(path, "wb") as f:
@@ -114,7 +148,6 @@ def generate_audio_elevenlabs_http(text):
114
  print(err)
115
  return "", err
116
  else:
117
- # return response body if available
118
  try:
119
  body = resp.json()
120
  except Exception:
@@ -124,74 +157,70 @@ def generate_audio_elevenlabs_http(text):
124
  return "", err
125
 
126
  # -----------------------
127
- # Main combined workflow
128
  # -----------------------
129
  def process_user_message(user_message):
130
- """
131
- Returns tuple: (chat_history_list, audio_path_or_empty, error_message_or_empty)
132
- chat_history_list is a list of (speaker, message) for the UI chat component.
133
- """
134
- # 1) Get text from Gemini with error handling
135
  text, gen_err = generate_text_with_gemini(user_message)
136
  if gen_err:
137
- # don't crash show friendly message and no audio
138
  memory.add("user", user_message)
139
- fallback_text = "Sorry — the assistant is temporarily unavailable: " + gen_err
140
- memory.add("bot", fallback_text)
141
- # build chat list for UI
142
- chat_list = [(role, msg) for role, msg in memory.history]
143
- return chat_list, "", gen_err
144
 
145
- # 2) Update memory
146
  memory.add("user", user_message)
147
  memory.add("bot", text)
148
 
149
- # 3) Try to generate audio (HTTP fallback)
150
  audio_path, audio_err = generate_audio_elevenlabs_http(text)
151
  if audio_err:
152
  print("Audio generation error:", audio_err)
153
- # Return history and audio (audio path may be empty)
154
- chat_list = [(role, msg) for role, msg in memory.history]
155
- return chat_list, audio_path or "", audio_err or ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
 
157
  # -----------------------
158
- # Gradio UI (Blocks)
159
  # -----------------------
160
  with gr.Blocks() as demo:
161
  gr.Markdown("## 🤖 Gemini + ElevenLabs Chatbot (Text + Audio replies)")
162
- chatbot = gr.Chatbot(elem_id="chatbot")
163
  with gr.Row():
164
  txt = gr.Textbox(show_label=False, placeholder="Type your message and press Enter")
165
  send_btn = gr.Button("Send")
166
-
167
  audio_player = gr.Audio(label="Last reply audio (if available)", visible=False)
168
 
169
- # submit action
170
  def submit_message(message):
171
- # process and return chat content and audio
172
- history, audio_path, audio_err = process_user_message(message)
173
- # format chat history for gr.Chatbot: list of [user, bot] pairs for display
174
- # our memory stores alternating user/bot entries; convert to pairs
175
- pairs = []
176
- temp_user = None
177
- for role, msg in history:
178
- if role == "user":
179
- temp_user = msg
180
- else:
181
- pairs.append((temp_user or "", msg))
182
- temp_user = None
183
-
184
- # show audio if available
185
  if audio_path:
186
  return pairs, gr.update(value=audio_path, visible=True)
187
  else:
188
  return pairs, gr.update(value=None, visible=False)
189
 
190
- # wire button and textbox
191
  send_btn.click(fn=submit_message, inputs=[txt], outputs=[chatbot, audio_player])
192
  txt.submit(fn=submit_message, inputs=[txt], outputs=[chatbot, audio_player])
193
 
194
- # Launch
195
  if __name__ == "__main__":
196
- # Do not enable share=True unless you want a public link
197
  demo.launch(debug=True)
 
7
  from google.api_core.exceptions import ResourceExhausted
8
 
9
  # -----------------------
10
+ # Config / Secrets
11
  # -----------------------
12
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
13
  ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
 
17
  if not GEMINI_API_KEY:
18
  raise RuntimeError("Missing GEMINI_API_KEY in environment. Set it in HF Space Secrets.")
19
 
20
+ # Configure Gemini
21
  genai.configure(api_key=GEMINI_API_KEY)
 
22
  gemini_model = genai.GenerativeModel("gemini-2.5-flash")
23
 
24
  # -----------------------
 
27
class SimpleMemory:
    """Rolling buffer of chat turns, capped at the newest max_messages entries."""

    def __init__(self, max_messages=20):
        self.max_messages = max_messages
        # Each entry is a (role, text) tuple with role in {"user", "bot"}.
        self.history = []

    def add(self, role, text):
        """Record one turn and drop the oldest entries beyond the cap."""
        self.history.append((role, text))
        # Tail-slice is a no-op while the buffer is still under the cap.
        self.history = self.history[-self.max_messages :]

    def as_prompt_text(self):
        """Render the buffered turns as 'User:' / 'Chatbot:' prompt lines."""
        return "\n".join(
            f"User: {txt}" if role == "user" else f"Chatbot: {txt}"
            for role, txt in self.history
        )

memory = SimpleMemory(max_messages=20)
47
 
48
# -----------------------
# Prompt template
# -----------------------
# Filled in on every request: the rendered memory goes into {chat_history},
# the new message into {user_message}; the trailing "Chatbot:" cues the model
# to continue as the assistant.
PROMPT_TEMPLATE = """You are a helpful assistant.
{chat_history}
User: {user_message}
Chatbot:"""
55
 
56
# -----------------------
# Gemini text generation
# Returns: (text_or_None, error_message_or_None)
# -----------------------
def generate_text_with_gemini(user_message):
    """Generate a chatbot reply for user_message with the shared Gemini model.

    The rolling memory is rendered into PROMPT_TEMPLATE and sent as a single
    plain-text prompt. Returns (text, None) on success or (None, error_message)
    on failure; never raises.

    Note: the previous retry ladder passed ``messages=...`` to
    generate_content, but the google-generativeai SDK takes ``contents``
    (positional) and rejects OpenAI-style role/content dicts, so those
    fallbacks could never succeed and have been removed.
    """
    chat_history_text = memory.as_prompt_text()
    full_prompt = PROMPT_TEMPLATE.format(chat_history=chat_history_text, user_message=user_message)

    try:
        resp = gemini_model.generate_content(full_prompt)
        # resp.text is a property that can raise (e.g. blocked prompt or no
        # candidates), so guard the access — getattr() would not catch that.
        try:
            text = resp.text
        except (ValueError, AttributeError):
            text = None
        if not text:
            text = str(resp)
        return text, None
    except ResourceExhausted as e:
        print("Gemini quota exhausted:", e)
        return None, "Gemini quota exceeded. Please try again later."
    except Exception as e:
        print("Gemini generate_content failed:", repr(e))
        return None, f"Gemini error: {repr(e)}"
111
 
112
  # -----------------------
113
+ # ElevenLabs HTTP TTS (fallback, robust)
114
+ # Returns: (output_path_or_empty, error_or_empty)
115
  # -----------------------
116
  def generate_audio_elevenlabs_http(text):
 
 
 
 
117
  if not ELEVENLABS_API_KEY:
118
  return "", "ELEVENLABS_API_KEY not configured."
119
 
 
132
  try:
133
  resp = requests.post(url, json=payload, headers=headers, timeout=30)
134
  except Exception as e:
135
+ err = f"ElevenLabs HTTP request failed: {e}"
136
  print(err)
137
  return "", err
138
 
139
  if resp.status_code == 200:
140
  try:
 
141
  filename = f"audio_{int(time.time()*1000)}_{abs(hash(text))%100000}.mp3"
142
  path = os.path.join(AUDIO_TMP_DIR, filename)
143
  with open(path, "wb") as f:
 
148
  print(err)
149
  return "", err
150
  else:
 
151
  try:
152
  body = resp.json()
153
  except Exception:
 
157
  return "", err
158
 
159
# -----------------------
# Combined workflow
# -----------------------
def process_user_message(user_message):
    """Run one chat turn: Gemini text, memory update, best-effort TTS audio.

    Returns (history_pairs, audio_path_or_empty, error_or_empty), where
    history_pairs is the (user, bot) pair list expected by gr.Chatbot.
    """
    reply, gen_err = generate_text_with_gemini(user_message)

    if gen_err:
        # Text generation failed: record the turn with a friendly notice
        # instead of crashing the UI, and return no audio.
        memory.add("user", user_message)
        memory.add("bot", "Sorry — the assistant is temporarily unavailable: " + gen_err)
        return convert_memory_to_pairs(memory.history), "", gen_err

    memory.add("user", user_message)
    memory.add("bot", reply)

    # Audio is best-effort: a TTS failure still returns the text reply.
    audio_path, audio_err = generate_audio_elevenlabs_http(reply)
    if audio_err:
        print("Audio generation error:", audio_err)

    return convert_memory_to_pairs(memory.history), audio_path or "", audio_err or ""
184
+
185
def convert_memory_to_pairs(history):
    """
    Convert memory's [(role, text), ...] entries into (user_text, bot_text)
    pairs for gr.Chatbot. Roles are expected to alternate user/bot; a bot
    entry with no preceding user entry is paired with "".
    """
    pairs = []
    pending_user = None
    for role, msg in history:
        if role != "user":
            # A bot turn closes the pair currently being built.
            pairs.append((pending_user or "", msg))
            pending_user = None
        else:
            pending_user = msg
    return pairs
200
 
201
# -----------------------
# Gradio UI
# -----------------------
with gr.Blocks() as demo:
    gr.Markdown("## 🤖 Gemini + ElevenLabs Chatbot (Text + Audio replies)")
    chatbot = gr.Chatbot()
    with gr.Row():
        txt = gr.Textbox(show_label=False, placeholder="Type your message and press Enter")
        send_btn = gr.Button("Send")
    audio_player = gr.Audio(label="Last reply audio (if available)", visible=False)

    def submit_message(message):
        # Run the combined pipeline: text generation, memory update, TTS.
        pairs, audio_path, _err = process_user_message(message)
        # Reveal the player only when a fresh audio file exists.
        audio_update = (
            gr.update(value=audio_path, visible=True)
            if audio_path
            else gr.update(value=None, visible=False)
        )
        return pairs, audio_update

    # Both the button and Enter in the textbox trigger the same handler.
    send_btn.click(fn=submit_message, inputs=[txt], outputs=[chatbot, audio_player])
    txt.submit(fn=submit_message, inputs=[txt], outputs=[chatbot, audio_player])

# Launch app
if __name__ == "__main__":
    demo.launch(debug=True)