lucid-hf's picture
CI: deploy Docker/PDM Space
98a3af2 verified
#!/usr/bin/env python3
"""
Script to split COCO formatted dataset into train and validation sets.
This script:
- Loads the existing train.json from /media/fast/drone_train/drone_ds
- Creates a validation set with 3000 images:
- 1500 images from drone datasets (equally distributed)
- 1500 images from objects365
- Saves the validation annotations as val.json
- Updates train.json to exclude validation images
"""
import json
import random
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Set, Tuple
import logging
from tqdm import tqdm
import shutil
from datetime import datetime
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TrainValSplitter:
def __init__(self, data_dir: str, val_size: int = 3000, seed: int = 42):
"""
Initialize the train/validation splitter.
Args:
data_dir: Path to the dataset directory containing train.json
val_size: Total number of images for validation set
seed: Random seed for reproducibility
"""
self.data_dir = Path(data_dir)
self.train_json_path = self.data_dir / "train.json"
self.val_json_path = self.data_dir / "val.json"
self.val_size = val_size
self.seed = seed
# Define drone datasets based on coco_dataset.py
self.drone_datasets = [
"rgbt_drone_person",
"search_and_rescue",
"stanford_drone",
"visdrone2019",
"vtsar",
"vtuav",
"wisard"
]
# Set random seed for reproducibility
random.seed(seed)
# Data structures
self.data = None
self.images_by_dataset = defaultdict(list)
self.image_id_to_annotations = defaultdict(list)
def load_data(self):
"""Load the original train.json file."""
logger.info(f"Loading data from {self.train_json_path}")
if not self.train_json_path.exists():
raise FileNotFoundError(f"Train file not found: {self.train_json_path}")
with open(self.train_json_path, 'r') as f:
self.data = json.load(f)
logger.info(f"Loaded {len(self.data['images'])} images and {len(self.data['annotations'])} annotations")
def organize_data(self):
"""Organize images by dataset and create annotation mappings."""
logger.info("Organizing data by dataset...")
# Group images by dataset
for img in tqdm(self.data['images'], desc="Grouping images"):
dataset = img.get('dataset', 'unknown')
self.images_by_dataset[dataset].append(img)
# Map annotations to images
for ann in tqdm(self.data['annotations'], desc="Mapping annotations"):
self.image_id_to_annotations[ann['image_id']].append(ann)
# Print statistics
logger.info("\nDataset statistics:")
for dataset, images in sorted(self.images_by_dataset.items()):
logger.info(f" {dataset}: {len(images)} images")
def select_validation_images(self) -> Tuple[Set[int], Dict[str, List[int]]]:
"""
Select images for validation set according to the rules.
Returns:
Tuple of (set of validation image IDs, dict of dataset to selected image IDs)
"""
val_image_ids = set()
val_images_by_dataset = defaultdict(list)
# Calculate how many images to take from each drone dataset
drone_val_total = self.val_size // 2 # 1500
num_drone_datasets = len(self.drone_datasets)
images_per_drone = drone_val_total // num_drone_datasets # ~214
logger.info(f"\nSelecting validation images:")
logger.info(f" Total validation size: {self.val_size}")
logger.info(f" Drone datasets allocation: {drone_val_total} images")
logger.info(f" Objects365 allocation: {self.val_size - drone_val_total} images")
logger.info(f" Images per drone dataset: ~{images_per_drone}")
# Select from drone datasets
drone_selected_total = 0
for dataset in self.drone_datasets:
if dataset not in self.images_by_dataset:
logger.warning(f" Dataset '{dataset}' not found in data")
continue
available_images = self.images_by_dataset[dataset]
num_to_select = min(images_per_drone, len(available_images))
if num_to_select < images_per_drone:
logger.warning(f" {dataset}: only {len(available_images)} images available, selecting all")
selected = random.sample(available_images, num_to_select)
for img in selected:
val_image_ids.add(img['id'])
val_images_by_dataset[dataset].append(img['id'])
drone_selected_total += num_to_select
logger.info(f" {dataset}: selected {num_to_select} images")
# Adjust objects365 selection based on actual drone selection
objects365_needed = self.val_size - drone_selected_total
# Select from objects365
if 'objects365' in self.images_by_dataset:
available_objects365 = self.images_by_dataset['objects365']
num_to_select = min(objects365_needed, len(available_objects365))
selected = random.sample(available_objects365, num_to_select)
for img in selected:
val_image_ids.add(img['id'])
val_images_by_dataset['objects365'].append(img['id'])
logger.info(f" objects365: selected {num_to_select} images")
else:
logger.warning(" objects365 dataset not found")
logger.info(f"\nTotal validation images selected: {len(val_image_ids)}")
return val_image_ids, val_images_by_dataset
def split_data(self, val_image_ids: Set[int]) -> Tuple[Dict, Dict]:
"""
Split the data into train and validation sets.
Args:
val_image_ids: Set of image IDs selected for validation
Returns:
Tuple of (train_data, val_data) dictionaries
"""
logger.info("\nSplitting data into train and validation sets...")
# Initialize train and val data structures
train_data = {
"info": self.data.get("info", {}),
"licenses": self.data.get("licenses", []),
"categories": self.data.get("categories", []),
"images": [],
"annotations": []
}
val_data = {
"info": self.data.get("info", {}),
"licenses": self.data.get("licenses", []),
"categories": self.data.get("categories", []),
"images": [],
"annotations": []
}
# Split images
for img in tqdm(self.data['images'], desc="Splitting images"):
if img['id'] in val_image_ids:
val_data['images'].append(img)
else:
train_data['images'].append(img)
# Split annotations
for ann in tqdm(self.data['annotations'], desc="Splitting annotations"):
if ann['image_id'] in val_image_ids:
val_data['annotations'].append(ann)
else:
train_data['annotations'].append(ann)
logger.info(f"Train set: {len(train_data['images'])} images, {len(train_data['annotations'])} annotations")
logger.info(f"Val set: {len(val_data['images'])} images, {len(val_data['annotations'])} annotations")
return train_data, val_data
def save_splits(self, train_data: Dict, val_data: Dict, backup: bool = True):
"""
Save the train and validation splits to JSON files.
Args:
train_data: Training data dictionary
val_data: Validation data dictionary
backup: Whether to create a backup of the original train.json
"""
# Create backup of original train.json if requested
if backup and self.train_json_path.exists():
backup_path = self.train_json_path.with_suffix(f'.backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
logger.info(f"Creating backup: {backup_path}")
shutil.copy2(self.train_json_path, backup_path)
# Save validation set
logger.info(f"Saving validation set to {self.val_json_path}")
with open(self.val_json_path, 'w') as f:
json.dump(val_data, f)
# Save updated training set
logger.info(f"Saving updated training set to {self.train_json_path}")
with open(self.train_json_path, 'w') as f:
json.dump(train_data, f)
logger.info("Split completed successfully!")
def print_statistics(self, val_images_by_dataset: Dict[str, List[int]]):
"""Print detailed statistics about the split."""
logger.info("\n" + "="*60)
logger.info("VALIDATION SET STATISTICS")
logger.info("="*60)
total_val_images = sum(len(ids) for ids in val_images_by_dataset.values())
# Drone datasets statistics
drone_total = 0
logger.info("\nDrone Datasets:")
for dataset in self.drone_datasets:
if dataset in val_images_by_dataset:
count = len(val_images_by_dataset[dataset])
drone_total += count
percentage = (count / total_val_images) * 100
logger.info(f" {dataset:20s}: {count:5d} images ({percentage:5.2f}%)")
logger.info(f"\nTotal Drone Images: {drone_total} ({(drone_total/total_val_images)*100:.1f}%)")
# Objects365 statistics
if 'objects365' in val_images_by_dataset:
obj365_count = len(val_images_by_dataset['objects365'])
logger.info(f"Objects365 Images: {obj365_count} ({(obj365_count/total_val_images)*100:.1f}%)")
logger.info(f"\nTotal Validation Images: {total_val_images}")
logger.info("="*60)
def run(self):
"""Execute the train/validation split."""
logger.info("Starting train/validation split...")
logger.info(f"Random seed: {self.seed}")
# Load data
self.load_data()
# Organize data
self.organize_data()
# Select validation images
val_image_ids, val_images_by_dataset = self.select_validation_images()
# Print statistics
self.print_statistics(val_images_by_dataset)
# Split data
train_data, val_data = self.split_data(val_image_ids)
# Save splits
self.save_splits(train_data, val_data, backup=True)
logger.info("\nProcess completed successfully!")
def main():
"""Main function to execute the train/validation split."""
import argparse
parser = argparse.ArgumentParser(
description="Split COCO dataset into train and validation sets"
)
parser.add_argument(
"--data-dir",
type=str,
default="/media/fast/drone_train/drone_ds",
help="Path to dataset directory containing train.json"
)
parser.add_argument(
"--val-size",
type=int,
default=3000,
help="Number of images for validation set"
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="Random seed for reproducibility"
)
parser.add_argument(
"--no-backup",
action="store_true",
help="Don't create backup of original train.json"
)
args = parser.parse_args()
# Create splitter and run
splitter = TrainValSplitter(
data_dir=args.data_dir,
val_size=args.val_size,
seed=args.seed
)
try:
splitter.run()
except Exception as e:
logger.error(f"Error during split: {e}")
raise
if __name__ == "__main__":
main()