# data-view / app.py
# Latest commit 4dcfc4d (verified) by stzhao: "Update app.py"
import json
import os
from typing import Optional, Union
from PIL import Image
import base64
from io import BytesIO
import gradio as gr
import markdown
import zipfile
import tempfile
from datetime import datetime
import re
def export_to_zip(images, conversations, format_type="original"):
    """
    Export images and conversation data to a ZIP file.

    Images are PNG-encoded in memory and the conversation data is serialized
    to JSON in memory; both are written straight into the archive, so no
    intermediate image or JSON files are created on disk.

    Args:
        images: Iterable of image objects exposing a PIL-style
            ``save(fp, format=...)`` method; ``None`` is treated as empty
        conversations: JSON-serializable conversation data
        format_type: Format type, "original" or "sharegpt". Accepted for
            interface compatibility; the archive layout does not depend on it

    Returns:
        Path to the generated ZIP file. The archive contains
        ``images/image_<i>.png`` entries and ``conversations.json``.
    """
    # A fresh directory per call keeps exports made within the same second
    # from overwriting each other.
    temp_dir = tempfile.mkdtemp()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_filename = os.path.join(temp_dir, f"export_{timestamp}.zip")

    with zipfile.ZipFile(zip_filename, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
        # Save images
        for i, img in enumerate(images or []):
            buffer = BytesIO()
            # The format must be explicit: there is no file name to infer it from.
            img.save(buffer, format="PNG")
            zipf.writestr(f"images/image_{i}.png", buffer.getvalue())

        # Save conversation data
        payload = json.dumps(conversations, ensure_ascii=False, indent=4)
        zipf.writestr("conversations.json", payload.encode("utf-8"))

    return zip_filename
def base64_to_image(
    base64_str: str,
    remove_prefix: bool = True,
    convert_mode: Optional[str] = "RGB"
) -> Union[Image.Image, None]:
    """
    Decode a base64 image string into a PIL Image.

    Args:
        base64_str: Base64 payload, optionally preceded by a "data:image/...," prefix
        remove_prefix: When True and the string contains a comma, only the
            segment following the first comma (up to the next comma, if any)
            is decoded
        convert_mode: Target PIL mode (e.g. "RGB"/"RGBA"); a falsy value
            skips the conversion

    Returns:
        The decoded PIL.Image.Image, or None if any step raises
    """
    try:
        payload = base64_str
        if remove_prefix and "," in payload:
            payload = payload.split(",")[1]
        decoded = Image.open(BytesIO(base64.b64decode(payload)))
        return decoded.convert(convert_mode) if convert_mode else decoded
    except Exception as exc:
        # Every failure is reported on stdout and turned into a None result.
        print(f"Base64 decoding failed: {str(exc)}")
        return None
def process_message_to_sharegpt_format(message):
    """
    Convert messages to ShareGPT format.

    The first message always becomes a "human" turn. Every later message
    becomes a "gpt" turn; if its text contains "<interpreter>", it is split at
    the first occurrence into a "gpt" turn (text before the marker) and an
    "observation" turn (the marker and everything after it).

    Args:
        message: List of message dicts. Each has a 'content' that is either a
            plain string or a list of parts whose 'type' is "text" (key
            'text') or "image_url" (key 'image_url' -> 'url'). Parts of any
            other type are ignored.

    Returns:
        Dict with "conversations" (list of {"from", "value"} dicts) and
        "images" (successfully decoded images, in order of appearance). Every
        image part adds an "<image>" placeholder to the text, even when its
        decoding fails.
    """
    marker = "<interpreter>"
    sharegpt_images = []
    sharegpt_conversation = []

    for i, message_item in enumerate(message):
        content_list = message_item['content']
        if isinstance(content_list, str):
            # Treat a bare string as a single text part.
            content_list = [{"type": "text", "text": content_list}]

        parts = []
        for content_item in content_list:
            content_type = content_item['type']
            if content_type == "text":
                parts.append(content_item['text'])
            elif content_type == "image_url":
                parts.append("<image>")
                image = base64_to_image(content_item['image_url']['url'])
                if image:
                    sharegpt_images.append(image)
        whole_content = "".join(parts)

        if i == 0:
            sharegpt_conversation.append({"from": "human", "value": whole_content})
            continue

        if marker in whole_content:
            # maxsplit=1: later markers stay inside the observation text instead
            # of breaking the two-way unpacking.
            gpt_content, observation_content = whole_content.split(marker, 1)
            sharegpt_conversation.append({"from": "gpt", "value": gpt_content})
            sharegpt_conversation.append({"from": "observation", "value": marker + observation_content})
        else:
            sharegpt_conversation.append({"from": "gpt", "value": whole_content})

    return {
        "conversations": sharegpt_conversation,
        "images": sharegpt_images,
    }
def extract_images_from_messages(messages):
    """
    Collect every inline (data: URL) image found in the messages.

    Messages without a list-valued 'content' are skipped, as are image parts
    whose URL does not start with "data:".

    Args:
        messages: Message JSON data
    Returns:
        Tuple of (list of decoded images, the same messages object that was
        passed in)
    """
    collected = []
    for entry in messages:
        if 'content' not in entry or not isinstance(entry['content'], list):
            continue
        for part in entry['content']:
            if part.get('type') != 'image_url':
                continue
            url = part.get('image_url', {}).get('url', '')
            if not url.startswith('data:'):
                continue
            # Only base64 data URLs can be decoded locally
            decoded = base64_to_image(url)
            if decoded:
                collected.append(decoded)
    return collected, messages
def process_message(file_path):
    """
    Load a conversation JSON file and prepare everything the UI needs.

    Args:
        file_path: Path to a JSON file holding a list of message dicts, each
            with a 'role' and a 'content' (list of parts; a plain string is
            rendered as a single text part)

    Returns:
        Tuple of (html_output, images, messages, sharegpt_data). If anything
        fails, the tuple is (red error HTML, [], None, None).
    """
    from html import escape  # local import keeps this function self-contained

    role_styles = {
        "user": ("#f0f0f0", "User"),
        "human": ("#f0f0f0", "User"),
        "assistant": ("#e6f7ff", "Assistant"),
    }

    try:
        # Read JSON file
        with open(file_path, "r", encoding="utf-8") as f:
            messages = json.load(f)
        # Extract images
        images, messages = extract_images_from_messages(messages)
        # Convert to ShareGPT format
        sharegpt_data = process_message_to_sharegpt_format(messages)

        # Wrapper div forces black text for all content
        html_parts = ['<div style="color: black;">']
        for message_item in messages:
            role = message_item['role']
            content = message_item['content']
            if isinstance(content, str):
                content = [{"type": "text", "text": content}]

            # Style based on role
            if role in role_styles:
                background, label = role_styles[role]
            else:
                # The role comes from the uploaded file, so escape it before
                # placing it in markup.
                background, label = "#f9f9f9", escape(role.capitalize())
            html_parts.append(
                f'<div style="background-color: {background}; padding: 10px; margin: 10px 0; '
                f'border-radius: 10px; color: black;"><strong>{label}:</strong><br>'
            )

            # Handle content
            for content_item in content:
                content_type = content_item['type']
                if content_type == "text":
                    # Convert Markdown text to HTML.
                    # NOTE: Python-Markdown passes raw HTML in the text through
                    # unchanged, so message text is not sanitized here.
                    html_text = markdown.markdown(
                        content_item['text'], extensions=['fenced_code', 'codehilite']
                    )
                    html_parts.append(f'<div style="color: black;">{html_text}</div>')
                elif content_type == "image_url":
                    # Data URLs and remote URLs are rendered the same way; the
                    # value is escaped so it cannot break out of the attribute.
                    src = escape(content_item['image_url']['url'], quote=True)
                    html_parts.append(f'<img src="{src}" style="max-width: 100%; margin: 10px 0;">')
            html_parts.append('</div>')
        html_parts.append('</div>')  # Close outermost div

        return "".join(html_parts), images, messages, sharegpt_data
    except Exception as e:
        # UI boundary: report the failure in the HTML panel instead of raising.
        return f"<div style='color: red;'>Error processing file: {escape(str(e))}</div>", [], None, None
def upload_and_process(file):
    """
    Process the uploaded file, or return a prompt when nothing was uploaded.

    Args:
        file: Uploaded-file object whose ``name`` attribute is the file path, or None
    Returns:
        The (html, images, messages, sharegpt_data) tuple from process_message,
        or ("Please upload a JSON file", [], None, None) when file is None
    """
    if file is not None:
        html_output, images, messages, sharegpt_data = process_message(file.name)
        return html_output, images, messages, sharegpt_data
    return "Please upload a JSON file", [], None, None
def use_example():
    """Run the bundled example file ("test_message_gpt.json") through process_message."""
    return process_message("test_message_gpt.json")
def handle_export_original(images, conversations):
    """
    Handle export request for original format.

    Args:
        images: List of extracted images (may be empty or None)
        conversations: Original message JSON data, or None if nothing is loaded
    Returns:
        Path to the generated ZIP file, or None when there is nothing to export
    """
    # A text-only conversation is still worth exporting, so missing images
    # alone must not block the export. This mirrors handle_export_sharegpt,
    # which only bails out when both images and conversations are empty.
    if conversations is None or (not images and not conversations):
        return None
    return export_to_zip(images or [], conversations, "original")
def handle_export_sharegpt(sharegpt_data):
    """
    Handle export request for ShareGPT format.

    Args:
        sharegpt_data: Dict with optional "images" and "conversations" keys, or None
    Returns:
        Path to the generated ZIP file, or None when sharegpt_data is None or
        both the images and the conversations are empty
    """
    if sharegpt_data is None:
        return None
    images = sharegpt_data.get("images", [])
    conversations = sharegpt_data.get("conversations", [])
    if images or conversations:
        return export_to_zip(images, conversations, "sharegpt")
    return None
# Ensure example file exists
def setup_example_file():
    """
    Make sure the example conversation file exists in the working directory.

    Writes a small two-message placeholder conversation to
    "test_message_gpt.json" when that file is missing; an existing file is
    left untouched. In a real deployment the original test_message_gpt.json
    should be placed in the root directory instead.
    """
    target = "test_message_gpt.json"
    if os.path.exists(target):
        return

    def text_message(role, text):
        # One message holding a single text part
        return {"role": role, "content": [{"type": "text", "text": text}]}

    placeholder = [
        text_message("user", "Hello, please introduce yourself."),
        text_message(
            "assistant",
            "Hello! I am an AI assistant. I can help answer questions, provide information, and have conversations. I am designed to assist users with a variety of tasks, from simple Q&A to more complex discussions.\n\nI can handle text information and also understand and describe images. Although I have some limitations, I will do my best to provide useful, accurate, and helpful responses.\n\nHow can I help you today?",
        ),
    ]
    with open(target, "w", encoding="utf-8") as out:
        json.dump(placeholder, out, ensure_ascii=False, indent=2)
# Make sure the example file exists before the UI offers it
setup_example_file()

# Build the Gradio interface
with gr.Blocks(
    title="ChatGPT Conversation Visualizer",
    css="div.prose * {color: black !important;}",
) as demo:
    gr.Markdown("# ChatGPT Conversation Visualization Tool")
    gr.Markdown("Upload a JSON file containing ChatGPT conversation records or use the example file to view visualization results.")

    # Input: file upload
    with gr.Row():
        file_input = gr.File(label="Upload JSON File", file_types=[".json"])

    # Actions: visualize the upload or load the example, side by side
    with gr.Row():
        col1, col2 = gr.Column(), gr.Column()
        with col1:
            visualize_button = gr.Button("Visualize Uploaded Conversation")
        with col2:
            example_button = gr.Button("Use Example File")

    # Rendered conversation
    with gr.Row():
        output = gr.HTML(label="Conversation Content")

    # Export buttons, each paired with its download slot
    with gr.Row():
        with gr.Column():
            export_original_btn = gr.Button("Export Original Format")
            download_original_file = gr.File(label="Download Original Format ZIP")
        with gr.Column():
            export_sharegpt_btn = gr.Button("Export ShareGPT Format")
            download_sharegpt_file = gr.File(label="Download ShareGPT Format ZIP")

    # State variables holding the most recent processing results
    current_images = gr.State([])
    current_json = gr.State(None)
    current_sharegpt = gr.State(None)

    # Both loaders fill the HTML panel and the three state slots
    result_outputs = [output, current_images, current_json, current_sharegpt]

    visualize_button.click(fn=upload_and_process, inputs=[file_input], outputs=result_outputs)
    example_button.click(fn=use_example, inputs=[], outputs=result_outputs)

    export_original_btn.click(
        fn=handle_export_original,
        inputs=[current_images, current_json],
        outputs=[download_original_file],
    )
    export_sharegpt_btn.click(
        fn=handle_export_sharegpt,
        inputs=[current_sharegpt],
        outputs=[download_sharegpt_file],
    )

# Launch Gradio app
demo.launch()