AbstractPhil's picture
Update app.py
eb8e393 verified
raw
history blame
20.6 kB
"""
Lyra/Lune Flow-Matching Inference Space
Author: AbstractPhil
License: MIT
SD1.5-based flow matching with geometric crystalline architectures.
"""
import os
import torch
import gradio as gr
import numpy as np
from PIL import Image
from typing import Optional, Dict
import spaces
from diffusers import (
UNet2DConditionModel,
AutoencoderKL,
DPMSolverMultistepScheduler,
EulerDiscreteScheduler
)
from transformers import CLIPTextModel, CLIPTokenizer
from huggingface_hub import hf_hub_download
# ============================================================================
# MODEL LOADING
# ============================================================================
class FlowMatchingPipeline:
"""Custom pipeline for flow-matching inference."""
def __init__(
self,
vae: AutoencoderKL,
text_encoder: CLIPTextModel,
tokenizer: CLIPTokenizer,
unet: UNet2DConditionModel,
scheduler,
device: str = "cuda"
):
self.vae = vae
self.text_encoder = text_encoder
self.tokenizer = tokenizer
self.unet = unet
self.scheduler = scheduler
self.device = device
# VAE scaling factor
self.vae_scale_factor = 0.18215
def encode_prompt(self, prompt: str, negative_prompt: str = ""):
"""Encode text prompts to embeddings."""
# Positive prompt
text_inputs = self.tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="pt",
)
text_input_ids = text_inputs.input_ids.to(self.device)
with torch.no_grad():
prompt_embeds = self.text_encoder(text_input_ids)[0]
# Negative prompt
if negative_prompt:
uncond_inputs = self.tokenizer(
negative_prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="pt",
)
uncond_input_ids = uncond_inputs.input_ids.to(self.device)
with torch.no_grad():
negative_prompt_embeds = self.text_encoder(uncond_input_ids)[0]
else:
negative_prompt_embeds = torch.zeros_like(prompt_embeds)
return prompt_embeds, negative_prompt_embeds
@torch.no_grad()
def __call__(
self,
prompt: str,
negative_prompt: str = "",
height: int = 512,
width: int = 512,
num_inference_steps: int = 20,
guidance_scale: float = 7.5,
shift: float = 2.5,
use_flow_matching: bool = True,
prediction_type: str = "epsilon",
seed: Optional[int] = None,
progress_callback=None
):
"""Generate image using flow matching or standard diffusion."""
# Set seed
if seed is not None:
generator = torch.Generator(device=self.device).manual_seed(seed)
else:
generator = None
# Encode prompts
prompt_embeds, negative_prompt_embeds = self.encode_prompt(
prompt, negative_prompt
)
# Prepare latents
latent_channels = 4
latent_height = height // 8
latent_width = width // 8
latents = torch.randn(
(1, latent_channels, latent_height, latent_width),
generator=generator,
device=self.device,
dtype=torch.float32
)
# Set timesteps
self.scheduler.set_timesteps(num_inference_steps, device=self.device)
timesteps = self.scheduler.timesteps
# Denoising loop
for i, t in enumerate(timesteps):
if progress_callback:
progress_callback(i, num_inference_steps, f"Step {i+1}/{num_inference_steps}")
# Expand latents for classifier-free guidance
latent_model_input = torch.cat([latents] * 2) if guidance_scale > 1.0 else latents
# Apply shift for flow matching
if use_flow_matching and shift > 0:
# Compute sigma from timestep with shift
sigma = t.float() / 1000.0
sigma_shifted = (shift * sigma) / (1 + (shift - 1) * sigma)
# Scale latent input
scaling = torch.sqrt(1 + sigma_shifted ** 2)
latent_model_input = latent_model_input / scaling
# Prepare timestep
timestep = t.expand(latent_model_input.shape[0])
# Predict noise/velocity
text_embeds = torch.cat([negative_prompt_embeds, prompt_embeds]) if guidance_scale > 1.0 else prompt_embeds
noise_pred = self.unet(
latent_model_input,
timestep,
encoder_hidden_states=text_embeds,
return_dict=False
)[0]
# Classifier-free guidance
if guidance_scale > 1.0:
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
# Flow matching step
if use_flow_matching:
# Manual flow matching update
sigma = t.float() / 1000.0
sigma_shifted = (shift * sigma) / (1 + (shift - 1) * sigma)
if prediction_type == "v_prediction":
# Convert v-prediction to epsilon
v_pred = noise_pred
alpha_t = torch.sqrt(1 - sigma_shifted ** 2)
sigma_t = sigma_shifted
noise_pred = alpha_t * v_pred + sigma_t * latents
# Compute next latent
dt = -1.0 / num_inference_steps
latents = latents + dt * noise_pred
else:
# Standard scheduler step
latents = self.scheduler.step(
noise_pred, t, latents, return_dict=False
)[0]
# Decode latents
latents = latents / self.vae_scale_factor
with torch.no_grad():
image = self.vae.decode(latents).sample
# Convert to PIL
image = (image / 2 + 0.5).clamp(0, 1)
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
image = (image * 255).round().astype("uint8")
image = Image.fromarray(image[0])
return image
def load_lune_checkpoint(repo_id: str, filename: str, device: str = "cuda"):
"""Load Lune checkpoint from .pt file."""
print(f"📥 Downloading checkpoint: {repo_id}/{filename}")
checkpoint_path = hf_hub_download(
repo_id=repo_id,
filename=filename,
repo_type="model"
)
print(f"✓ Downloaded to: {checkpoint_path}")
print(f"📦 Loading checkpoint...")
checkpoint = torch.load(checkpoint_path, map_location="cpu")
# Initialize UNet with SD1.5 config
print(f"🏗️ Initializing SD1.5 UNet...")
unet = UNet2DConditionModel.from_pretrained(
"runwayml/stable-diffusion-v1-5",
subfolder="unet",
torch_dtype=torch.float32
)
# Load student weights
student_state_dict = checkpoint["student"]
# Strip "unet." prefix if present
cleaned_dict = {}
for key, value in student_state_dict.items():
if key.startswith("unet."):
cleaned_dict[key[5:]] = value
else:
cleaned_dict[key] = value
# Load weights
unet.load_state_dict(cleaned_dict, strict=False)
step = checkpoint.get("gstep", "unknown")
print(f"✅ Loaded checkpoint from step {step}")
return unet.to(device)
def initialize_pipeline(model_choice: str, device: str = "cuda"):
"""Initialize the complete pipeline."""
print(f"🚀 Initializing {model_choice} pipeline...")
# Load base components
print("Loading VAE...")
vae = AutoencoderKL.from_pretrained(
"runwayml/stable-diffusion-v1-5",
subfolder="vae",
torch_dtype=torch.float32
).to(device)
print("Loading text encoder...")
text_encoder = CLIPTextModel.from_pretrained(
"openai/clip-vit-large-patch14",
torch_dtype=torch.float32
).to(device)
tokenizer = CLIPTokenizer.from_pretrained(
"openai/clip-vit-large-patch14"
)
# Load UNet based on model choice
if model_choice == "Flow-Lune (Latest)":
# Load latest checkpoint from repo
repo_id = "AbstractPhil/sd15-flow-lune"
# Find latest checkpoint - for now use a known one
filename = "sd15_flow_lune_e34_s34000.pt"
unet = load_lune_checkpoint(repo_id, filename, device)
elif model_choice == "SD1.5 Base":
print("Loading SD1.5 base UNet...")
unet = UNet2DConditionModel.from_pretrained(
"runwayml/stable-diffusion-v1-5",
subfolder="unet",
torch_dtype=torch.float32
).to(device)
else:
raise ValueError(f"Unknown model: {model_choice}")
# Initialize scheduler
scheduler = EulerDiscreteScheduler.from_pretrained(
"runwayml/stable-diffusion-v1-5",
subfolder="scheduler"
)
print("✅ Pipeline initialized!")
return FlowMatchingPipeline(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
device=device
)
# ============================================================================
# GLOBAL STATE
# ============================================================================
# Initialize with None, will load on first inference
CURRENT_PIPELINE = None
CURRENT_MODEL = None
def get_pipeline(model_choice: str):
"""Get or create pipeline for selected model."""
global CURRENT_PIPELINE, CURRENT_MODEL
if CURRENT_PIPELINE is None or CURRENT_MODEL != model_choice:
CURRENT_PIPELINE = initialize_pipeline(model_choice, device="cuda")
CURRENT_MODEL = model_choice
return CURRENT_PIPELINE
# ============================================================================
# INFERENCE
# ============================================================================
def estimate_duration(num_steps: int, width: int, height: int) -> int:
"""Estimate GPU duration based on generation parameters."""
# Base time per step (seconds)
base_time_per_step = 0.3
# Resolution scaling
resolution_factor = (width * height) / (512 * 512)
# Total estimate
estimated = num_steps * base_time_per_step * resolution_factor
# Add 15 seconds for model loading overhead
return int(estimated + 15)
@spaces.GPU(duration=lambda *args: estimate_duration(args[3], args[5], args[6]))
def generate_image(
prompt: str,
negative_prompt: str,
model_choice: str,
num_steps: int,
cfg_scale: float,
width: int,
height: int,
shift: float,
use_flow_matching: bool,
prediction_type: str,
seed: int,
randomize_seed: bool,
progress=gr.Progress()
):
"""Generate image with ZeroGPU support."""
# Randomize seed if requested
if randomize_seed:
seed = np.random.randint(0, 2**32 - 1)
# Progress tracking
def progress_callback(step, total, desc):
progress((step + 1) / total, desc=desc)
try:
# Get pipeline
pipeline = get_pipeline(model_choice)
# Generate
progress(0.05, desc="Starting generation...")
image = pipeline(
prompt=prompt,
negative_prompt=negative_prompt,
height=height,
width=width,
num_inference_steps=num_steps,
guidance_scale=cfg_scale,
shift=shift,
use_flow_matching=use_flow_matching,
prediction_type=prediction_type,
seed=seed,
progress_callback=progress_callback
)
progress(1.0, desc="Complete!")
return image, seed
except Exception as e:
print(f"❌ Generation failed: {e}")
raise e
# ============================================================================
# GRADIO UI
# ============================================================================
def create_demo():
"""Create Gradio interface."""
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🌙 Lyra/Lune Flow-Matching Image Generation
**Geometric crystalline diffusion with flow matching** by [AbstractPhil](https://huggingface.co/AbstractPhil)
Generate images using SD1.5-based flow matching with pentachoron geometric structures.
Achieves high quality with dramatically reduced step counts through geometric efficiency.
""")
with gr.Row():
with gr.Column(scale=1):
# Prompt
prompt = gr.TextArea(
label="Prompt",
placeholder="A beautiful landscape with mountains and a lake at sunset...",
lines=3
)
negative_prompt = gr.TextArea(
label="Negative Prompt",
placeholder="blurry, low quality, distorted...",
lines=2
)
# Model selection
model_choice = gr.Dropdown(
label="Model",
choices=[
"Flow-Lune (Latest)",
"SD1.5 Base"
],
value="Flow-Lune (Latest)"
)
# Flow matching settings
with gr.Accordion("Flow Matching Settings", open=True):
use_flow_matching = gr.Checkbox(
label="Enable Flow Matching",
value=True,
info="Use flow matching ODE integration"
)
shift = gr.Slider(
label="Shift",
minimum=0.0,
maximum=5.0,
value=2.5,
step=0.1,
info="Flow matching shift parameter (0=disabled, 1-3 typical)"
)
prediction_type = gr.Radio(
label="Prediction Type",
choices=["epsilon", "v_prediction"],
value="epsilon",
info="Type of model prediction"
)
# Generation settings
with gr.Accordion("Generation Settings", open=True):
num_steps = gr.Slider(
label="Steps",
minimum=1,
maximum=50,
value=20,
step=1,
info="Flow matching typically needs fewer steps (15-25)"
)
cfg_scale = gr.Slider(
label="CFG Scale",
minimum=1.0,
maximum=20.0,
value=7.5,
step=0.5
)
with gr.Row():
width = gr.Slider(
label="Width",
minimum=256,
maximum=1024,
value=512,
step=64
)
height = gr.Slider(
label="Height",
minimum=256,
maximum=1024,
value=512,
step=64
)
seed = gr.Slider(
label="Seed",
minimum=0,
maximum=2**32 - 1,
value=42,
step=1
)
randomize_seed = gr.Checkbox(
label="Randomize Seed",
value=True
)
generate_btn = gr.Button("🎨 Generate", variant="primary", size="lg")
with gr.Column(scale=1):
output_image = gr.Image(
label="Generated Image",
type="pil"
)
output_seed = gr.Number(
label="Used Seed",
precision=0
)
gr.Markdown("""
### Tips:
- **Flow matching** works best with 15-25 steps (vs 50+ for standard diffusion)
- **Shift** controls the flow trajectory (2.0-2.5 recommended for Lune)
- Lower shift = more direct path, higher shift = more exploration
- Try **v_prediction** mode if epsilon gives unstable results
### Model Info:
- **Flow-Lune**: Trained with flow matching on 500k SD1.5 distillation pairs
- **SD1.5 Base**: Standard Stable Diffusion 1.5 for comparison
[📚 Learn more about geometric deep learning](https://github.com/AbstractEyes/lattice_vocabulary)
""")
# Examples
gr.Examples(
examples=[
[
"A serene mountain landscape at golden hour, crystal clear lake reflecting snow-capped peaks, photorealistic, 8k",
"blurry, low quality",
"Flow-Lune (Latest)",
20,
7.5,
512,
512,
2.5,
True,
"epsilon",
42,
False
],
[
"A futuristic cyberpunk city at night, neon lights, rain-slicked streets, highly detailed",
"low quality, blurry",
"Flow-Lune (Latest)",
22,
8.0,
512,
512,
2.5,
True,
"epsilon",
123,
False
],
[
"Portrait of a majestic lion, golden mane, dramatic lighting, wildlife photography",
"cartoon, painting",
"Flow-Lune (Latest)",
18,
7.0,
512,
512,
2.0,
True,
"epsilon",
456,
False
]
],
inputs=[
prompt, negative_prompt, model_choice, num_steps, cfg_scale,
width, height, shift, use_flow_matching, prediction_type,
seed, randomize_seed
],
outputs=[output_image, output_seed],
fn=generate_image,
cache_examples=False
)
# Event handlers
generate_btn.click(
fn=generate_image,
inputs=[
prompt, negative_prompt, model_choice, num_steps, cfg_scale,
width, height, shift, use_flow_matching, prediction_type,
seed, randomize_seed
],
outputs=[output_image, output_seed]
)
return demo
# ============================================================================
# LAUNCH
# ============================================================================
if __name__ == "__main__":
demo = create_demo()
demo.queue(max_size=20)
demo.launch(show_api=False)