|
|
import json |
|
|
import os |
|
|
from typing import Optional, Union |
|
|
from PIL import Image |
|
|
import base64 |
|
|
from io import BytesIO |
|
|
import gradio as gr |
|
|
import markdown |
|
|
import zipfile |
|
|
import tempfile |
|
|
from datetime import datetime |
|
|
import re |
|
|
|
|
|
def export_to_zip(images, conversations, format_type="original"):
    """
    Export images and conversation data to a ZIP file

    The archive holds one ``images/image_<i>.png`` entry per image (in list
    order) and a ``conversations.json`` entry containing ``conversations``
    serialized as UTF-8 JSON with 4-space indentation.

    Args:
        images: List of extracted images; each item must offer a PIL-style
            ``save(fp, format=...)`` method
        conversations: Conversation JSON data (must be JSON-serializable)
        format_type: Format type, "original" or "sharegpt". Not consulted
            here: both values produce the same archive layout

    Returns:
        Path to the generated ZIP file, inside a freshly created temporary
        directory

    Raises:
        Exception: Whatever image encoding or JSON serialization raises is
            re-raised after the partial archive and its temporary directory
            have been removed
    """
    temp_dir = tempfile.mkdtemp()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_filename = os.path.join(temp_dir, f"export_{timestamp}.zip")

    try:
        with zipfile.ZipFile(zip_filename, 'w') as zipf:
            for i, img in enumerate(images):
                # Encode in memory so no intermediate file can be left behind
                # if a later step fails.
                buffer = BytesIO()
                img.save(buffer, format="PNG")
                zipf.writestr(f"images/image_{i}.png", buffer.getvalue())

            # Serialize fully before writing: a non-serializable payload then
            # fails without producing a truncated archive member.
            payload = json.dumps(conversations, ensure_ascii=False, indent=4)
            zipf.writestr("conversations.json", payload.encode("utf-8"))
    except Exception:
        # Do not leak a half-written archive or an orphaned temp directory.
        if os.path.exists(zip_filename):
            os.remove(zip_filename)
        os.rmdir(temp_dir)
        raise

    return zip_filename
|
|
|
|
|
def base64_to_image(
    base64_str: str,
    remove_prefix: bool = True,
    convert_mode: Optional[str] = "RGB"
) -> Union[Image.Image, None]:
    """
    Convert a base64 encoded image string to a PIL Image object

    Args:
        base64_str: Base64 encoded image string (with or without data: prefix)
        remove_prefix: Whether to automatically remove the "data:image/..." prefix (default True)
        convert_mode: Convert to the specified mode (e.g., "RGB"/"RGBA", None means no conversion)

    Returns:
        PIL.Image.Image object, returns None if decoding fails
    """
    try:
        if remove_prefix and "," in base64_str:
            # Everything up to the first comma is treated as the
            # "data:image/...;base64" header; split once and keep the rest.
            base64_str = base64_str.split(",", 1)[1]

        image_data = base64.b64decode(base64_str)
        image = Image.open(BytesIO(image_data))

        if convert_mode:
            image = image.convert(convert_mode)

        return image

    except Exception as e:
        # Deliberate best-effort: any failure (bad base64, unreadable image
        # data, wrong argument type) is reported and turned into None.
        # `Exception` already covers binascii.Error and OSError.
        print(f"Base64 decoding failed: {str(e)}")
        return None
|
|
|
|
|
def process_message_to_sharegpt_format(message):
    """
    Convert messages to ShareGPT format

    Each message's content items are flattened into one string: "text" items
    contribute their text and "image_url" items contribute an "<image>"
    placeholder. The first message is always emitted as "human"; every later
    message is emitted as "gpt" regardless of its role. If a later message
    contains "<interpreter>", it is split at the first occurrence into a
    "gpt" turn followed by an "observation" turn that starts with the marker.

    Args:
        message: Original message data (list of dicts with a 'content' list
            whose items carry a 'type' of "text" or "image_url")

    Returns:
        Data in ShareGPT format: a dict with "conversations" (list of
        {"from", "value"} dicts) and "images" (successfully decoded images)
    """
    sharegpt_images = []
    sharegpt_conversation = []

    for i, message_item in enumerate(message):
        pieces = []
        for content_item in message_item['content']:
            content_type = content_item['type']
            if content_type == "text":
                pieces.append(content_item['text'])
            elif content_type == "image_url":
                content_value = content_item['image_url']['url']
                # NOTE(review): the placeholder is emitted even when decoding
                # fails below, so "<image>" count can exceed len(images).
                pieces.append("<image>")
                image = base64_to_image(content_value)
                if image:
                    sharegpt_images.append(image)
        whole_content = "".join(pieces)

        if i == 0:
            sharegpt_conversation.append({"from": "human", "value": whole_content})
            continue

        if "<interpreter>" in whole_content:
            # Split on the first marker only; later markers stay inside the
            # observation instead of breaking the two-way unpacking.
            gpt_content, _, observation_content = whole_content.partition("<interpreter>")
            sharegpt_conversation.append({"from": "gpt", "value": gpt_content})
            sharegpt_conversation.append({"from": "observation", "value": "<interpreter>" + observation_content})
        else:
            sharegpt_conversation.append({"from": "gpt", "value": whole_content})

    sharegpt_data_item = {
        "conversations": sharegpt_conversation,
        "images": sharegpt_images
    }

    return sharegpt_data_item
|
|
|
|
|
def extract_images_from_messages(messages):
    """
    Collect every inline (data: URL) image found in the messages

    Only messages whose 'content' is a list are inspected, and only content
    items of type 'image_url' whose URL starts with "data:" are decoded.
    Images that fail to decode are skipped.

    Args:
        messages: Message JSON data

    Returns:
        Tuple of (list of decoded images, the same messages object unchanged)
    """
    collected = []

    for msg in messages:
        if 'content' not in msg:
            continue
        parts = msg['content']
        if not isinstance(parts, list):
            continue

        for part in parts:
            if part.get('type') != 'image_url':
                continue
            url = part.get('image_url', {}).get('url', '')
            if not url.startswith('data:'):
                continue
            decoded = base64_to_image(url)
            if decoded:
                collected.append(decoded)

    return collected, messages
|
|
|
|
|
def process_message(file_path):
    """
    Load a conversation JSON file and render it as HTML

    Args:
        file_path: Path to a UTF-8 JSON file holding a list of messages; each
            message needs a 'role' and a 'content' list whose items have a
            'type' of "text" (rendered from Markdown) or "image_url"
            (rendered as an <img>). Other item types are ignored.

    Returns:
        Tuple of (html_output, images, messages, sharegpt_data). On any
        failure the tuple is (error HTML, [], None, None) instead of an
        exception, so the UI can display the problem.
    """
    # Local import: the values interpolated below come from an uploaded file
    # and must be escaped before being placed into markup.
    from html import escape

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            messages = json.load(f)

        images, messages = extract_images_from_messages(messages)
        sharegpt_data = process_message_to_sharegpt_format(messages)

        html_parts = ['<div style="color: black;">']

        for message_item in messages:
            role = message_item['role']
            content = message_item['content']

            if role == "user" or role == "human":
                html_parts.append('<div style="background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>User:</strong><br>')
            elif role == "assistant":
                html_parts.append('<div style="background-color: #e6f7ff; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>Assistant:</strong><br>')
            else:
                html_parts.append(f'<div style="background-color: #f9f9f9; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>{escape(role.capitalize())}:</strong><br>')

            for content_item in content:
                content_type = content_item['type']

                if content_type == "text":
                    md_text = content_item['text']
                    # NOTE(review): Markdown passes raw HTML embedded in the
                    # text through unchanged; sanitize here if uploads are
                    # not trusted.
                    html_text = markdown.markdown(md_text, extensions=['fenced_code', 'codehilite'])
                    html_parts.append(f'<div style="color: black;">{html_text}</div>')

                elif content_type == "image_url":
                    content_value = content_item['image_url']['url']
                    # data: URLs and remote URLs are rendered the same way;
                    # escaping keeps a quote in the URL from closing the
                    # src attribute.
                    html_parts.append(f'<img src="{escape(content_value, quote=True)}" style="max-width: 100%; margin: 10px 0;">')

            html_parts.append('</div>')

        html_parts.append('</div>')
        return "".join(html_parts), images, messages, sharegpt_data

    except Exception as e:
        # UI boundary: report the failure in the page rather than raising.
        return f"<div style='color: red;'>Error processing file: {escape(str(e))}</div>", [], None, None
|
|
|
|
|
def upload_and_process(file):
    """
    Process the uploaded file, or return a prompt when nothing was uploaded

    Args:
        file: Uploaded file object exposing its path as ``.name``, or None

    Returns:
        Tuple of (html_output, images, messages, sharegpt_data)
    """
    if file is not None:
        page, pictures, raw_messages, converted = process_message(file.name)
        return page, pictures, raw_messages, converted

    return "Please upload a JSON file", [], None, None
|
|
|
|
|
def use_example():
    """Process the example file "test_message_gpt.json" from the working directory."""
    return process_message("test_message_gpt.json")
|
|
|
|
|
def handle_export_original(images, conversations):
    """
    Handle export request for original format

    Args:
        images: List of extracted images (may be empty or None)
        conversations: Original conversation JSON data, or None when nothing
            has been loaded

    Returns:
        Path to the generated ZIP file, or None when there is nothing to
        export (no conversation loaded, or both inputs empty)
    """
    if conversations is None:
        return None

    # A text-only conversation is still exportable; only bail out when both
    # parts are empty, matching the ShareGPT export handler.
    if not images and not conversations:
        return None

    zip_path = export_to_zip(images or [], conversations, "original")
    return zip_path
|
|
|
|
|
def handle_export_sharegpt(sharegpt_data):
    """
    Handle export request for ShareGPT format

    Args:
        sharegpt_data: Dict with optional "images" and "conversations"
            entries, or None when nothing has been loaded

    Returns:
        Path to the generated ZIP file, or None when there is no data or both
        entries are empty
    """
    if sharegpt_data is None:
        return None

    pictures = sharegpt_data.get("images", [])
    turns = sharegpt_data.get("conversations", [])

    # Nothing to package when neither part has content.
    if not (pictures or turns):
        return None

    return export_to_zip(pictures, turns, "sharegpt")
|
|
|
|
|
|
|
|
def setup_example_file():
    """
    Create the example conversation file if it does not exist yet

    Writes a two-message (user, assistant) text-only conversation to
    "test_message_gpt.json" in the working directory. An existing file is
    left untouched.
    """
    example_path = "test_message_gpt.json"
    if os.path.exists(example_path):
        return

    def text_message(role, text):
        # One message with a single text content item.
        return {"role": role, "content": [{"type": "text", "text": text}]}

    assistant_reply = (
        "Hello! I am an AI assistant. I can help answer questions, provide information, and have conversations. "
        "I am designed to assist users with a variety of tasks, from simple Q&A to more complex discussions."
        "\n\n"
        "I can handle text information and also understand and describe images. "
        "Although I have some limitations, I will do my best to provide useful, accurate, and helpful responses."
        "\n\n"
        "How can I help you today?"
    )
    example_messages = [
        text_message("user", "Hello, please introduce yourself."),
        text_message("assistant", assistant_reply),
    ]

    with open(example_path, "w", encoding="utf-8") as out:
        json.dump(example_messages, out, ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
# Make sure the example file exists before the UI offers to load it.
setup_example_file()

# UI definition. Components are laid out in the order they are created, so
# the statement order below is significant.
with gr.Blocks(title="ChatGPT Conversation Visualizer", css="div.prose * {color: black !important;}") as demo:
    gr.Markdown("# ChatGPT Conversation Visualization Tool")
    gr.Markdown("Upload a JSON file containing ChatGPT conversation records or use the example file to view visualization results.")

    # Input: the conversation JSON file.
    with gr.Row():
        file_input = gr.File(label="Upload JSON File", file_types=[".json"])

    # Actions: both columns are created first, then filled with one button each.
    with gr.Row():
        col1, col2 = gr.Column(), gr.Column()
        with col1:
            visualize_button = gr.Button("Visualize Uploaded Conversation")
        with col2:
            example_button = gr.Button("Use Example File")

    # Rendered conversation.
    with gr.Row():
        output = gr.HTML(label="Conversation Content")

    # Export controls: one button plus download slot per format.
    with gr.Row():
        with gr.Column():
            export_original_btn = gr.Button("Export Original Format")
            download_original_file = gr.File(label="Download Original Format ZIP")

        with gr.Column():
            export_sharegpt_btn = gr.Button("Export ShareGPT Format")
            download_sharegpt_file = gr.File(label="Download ShareGPT Format ZIP")

    # State carried from the visualize/example handlers to the export
    # handlers: extracted images, original messages, ShareGPT conversion.
    current_images = gr.State([])
    current_json = gr.State(None)
    current_sharegpt = gr.State(None)

    # Both loaders fill the HTML view and all three state slots.
    visualize_button.click(
        fn=upload_and_process,
        inputs=[file_input],
        outputs=[output, current_images, current_json, current_sharegpt]
    )

    example_button.click(
        fn=use_example,
        inputs=[],
        outputs=[output, current_images, current_json, current_sharegpt]
    )

    # Exporters read the stored state and hand the ZIP path to the download slot.
    export_original_btn.click(
        fn=handle_export_original,
        inputs=[current_images, current_json],
        outputs=[download_original_file]
    )

    export_sharegpt_btn.click(
        fn=handle_export_sharegpt,
        inputs=[current_sharegpt],
        outputs=[download_sharegpt_file]
    )

# NOTE(review): runs on import as well as when executed as a script; the
# original indentation was lost, so placement outside the `with` block is
# assumed — confirm.
demo.launch()