Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Convert VisDrone dataset from three separate splits to a single unified COCO format dataset. | |
| Dataset structure: | |
| /mnt/archive/person_drone/VisDrone2019-DET-train/ | |
| ├── images/ | |
| └── annotations/ | |
| /mnt/archive/person_drone/VisDrone2019-DET-val/ | |
| ├── images/ | |
| └── annotations/ | |
| /mnt/archive/person_drone/VisDrone2019-DET-test-dev/ | |
| ├── images/ | |
| └── annotations/ | |
| VisDrone annotation format: <bbox_left>,<bbox_top>,<bbox_width>,<bbox_height>,<score>,<object_category>,<truncation>,<occlusion> | |
| - bbox_left, bbox_top: Top-left corner coordinates (absolute pixels) | |
| - bbox_width, bbox_height: Box dimensions (absolute pixels) | |
| - score: Confidence score (usually 0 or 1) | |
| - object_category: Object class ID (0=ignored, 1=pedestrian, 2=people, 3=bicycle, 4=car, 5=van, 6=truck, 7=tricycle, 8=awning-tricycle, 9=bus, 10=motor) | |
| - truncation: Truncation flag (0=not truncated, 1=truncated) | |
| - occlusion: Occlusion flag (0=not occluded, 1=occluded) | |
| COCO format: [x, y, width, height] (absolute coordinates for top-left corner) | |
| """ | |
| import json | |
| import os | |
| import argparse | |
| from pathlib import Path | |
| from PIL import Image | |
| from typing import Dict, List, Tuple | |
| from datetime import datetime | |
def get_visdrone_categories() -> Dict[int, Dict]:
    """Build the COCO category table for the ten VisDrone object classes.

    Returns:
        Mapping of VisDrone class id -> {"id", "name", "supercategory"}.
        Category 0 (ignored regions) is intentionally absent.
    """
    names = (
        "pedestrian", "people", "bicycle", "car", "van",
        "truck", "tricycle", "awning-tricycle", "bus", "motor",
    )
    person_like = {"pedestrian", "people"}
    return {
        cid: {
            "id": cid,  # keep original VisDrone IDs
            "name": label,
            "supercategory": "person" if label in person_like else "vehicle",
        }
        for cid, label in enumerate(names, start=1)
    }
def parse_visdrone_annotation(label_file: str) -> List[List[int]]:
    """
    Parse one VisDrone .txt annotation file into raw detection records.

    Args:
        label_file: Path to .txt annotation file

    Returns:
        List of [bbox_left, bbox_top, bbox_width, bbox_height, score,
        object_category, truncation, occlusion] per detection; empty when
        the file is missing, blank, or every line is malformed.
    """
    parsed: List[List[int]] = []
    if not os.path.exists(label_file):
        return parsed
    with open(label_file, 'r') as fh:
        text = fh.read().strip()
    if not text:
        return parsed
    for idx, raw in enumerate(text.split('\n')):
        row = raw.strip()
        if not row:
            continue
        # Comma-separated ints; drop empty tokens caused by trailing commas.
        fields = [tok.strip() for tok in row.split(',') if tok.strip()]
        try:
            record = [int(tok) for tok in fields]
        except ValueError as e:
            print(f"Warning: Could not parse line {idx + 1} in {label_file}: {row} - {e}")
            continue
        # A valid detection is exactly 8 fields.
        if len(record) != 8:
            print(f"Warning: Unexpected format in {label_file} line {idx + 1}, expected 8 values, got {len(record)}: {row}")
            continue
        parsed.append(record)
    return parsed
def process_split_to_coco(
    split_dir: str,
    split_name: str,
    category_mapping: Dict[int, Dict],
    start_img_id: int = 1,
    start_ann_id: int = 1
) -> Tuple[List[Dict], List[Dict], int, int]:
    """
    Process a VisDrone split and convert it to COCO format components.

    Args:
        split_dir: Directory containing images/ and annotations/ subdirectories
        split_name: Name of the split (train/val/test-dev)
        category_mapping: Mapping of class_id to category info
        start_img_id: Starting image ID for this split
        start_ann_id: Starting annotation ID for this split

    Returns:
        images_list, annotations_list, next_img_id, next_ann_id
    """
    split_path = Path(split_dir)
    images_dir = split_path / "images"
    annotations_dir = split_path / "annotations"
    if not images_dir.exists() or not annotations_dir.exists():
        print(f"Warning: Missing images or annotations directory in {split_dir}")
        return [], [], start_img_id, start_ann_id

    images_list: List[Dict] = []
    annotations_list: List[Dict] = []
    img_id = start_img_id
    ann_id = start_ann_id

    # Collect image files by extension, case-insensitively. A single
    # directory scan matching on the lowercased suffix avoids the duplicate
    # entries that paired glob("*.jpg")/glob("*.JPG") calls produce on
    # case-insensitive filesystems, and also picks up mixed-case
    # extensions such as ".Jpg".
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}
    image_files = sorted(
        p for p in images_dir.iterdir()
        if p.is_file() and p.suffix.lower() in image_extensions
    )
    print(f"Processing {len(image_files)} images in {split_name} split...")

    for img_file in image_files:
        # Annotation file shares the image's stem.
        annotation_file = annotations_dir / f"{img_file.stem}.txt"
        # Open the image only to read its dimensions; skip unreadable files.
        try:
            with Image.open(img_file) as img:
                img_width, img_height = img.size
        except Exception as e:
            print(f"Error opening image {img_file}: {e}")
            continue
        # Prefix file_name with the split so identically named images from
        # different splits cannot collide in the merged dataset.
        relative_path = f"{split_name}_images/{img_file.name}"
        images_list.append({
            "id": img_id,
            "file_name": relative_path,
            "width": img_width,
            "height": img_height,
            "license": 1
        })
        # Parse annotations for this image.
        visdrone_annotations = parse_visdrone_annotation(str(annotation_file))
        for visdrone_ann in visdrone_annotations:
            bbox_left, bbox_top, bbox_width, bbox_height, score, object_category, truncation, occlusion = visdrone_ann
            # Skip ignored regions (category 0).
            if object_category == 0:
                continue
            # Skip unknown categories.
            if object_category not in category_mapping:
                continue
            # Clamp bbox to the image bounds.
            bbox_left = max(0, min(bbox_left, img_width - 1))
            bbox_top = max(0, min(bbox_top, img_height - 1))
            bbox_width = min(bbox_width, img_width - bbox_left)
            bbox_height = min(bbox_height, img_height - bbox_top)
            # Skip degenerate boxes after clamping.
            if bbox_width <= 0 or bbox_height <= 0:
                continue
            area = bbox_width * bbox_height
            annotations_list.append({
                "id": ann_id,
                "image_id": img_id,
                "category_id": category_mapping[object_category]["id"],
                "bbox": [bbox_left, bbox_top, bbox_width, bbox_height],  # COCO format: [x, y, width, height]
                "area": area,
                "iscrowd": 0,
                "segmentation": [],
                # Additional VisDrone specific fields
                "score": score,
                "truncation": truncation,
                "occlusion": occlusion
            })
            ann_id += 1
        img_id += 1
    return images_list, annotations_list, img_id, ann_id
def main() -> None:
    """Entry point: merge the VisDrone train/val/test-dev splits into one COCO file.

    Parses CLI arguments, converts each available split via
    process_split_to_coco(), then writes a single annotations.json, per-split
    image symlinks, and a dataset_info.json summary into --output_dir.
    """
    parser = argparse.ArgumentParser(description="Convert VisDrone dataset splits to single unified COCO format")
    parser.add_argument(
        "--train_dir",
        type=str,
        default="/mnt/archive/person_drone/VisDrone2019-DET-train",
        help="Path to VisDrone train split directory"
    )
    parser.add_argument(
        "--val_dir",
        type=str,
        default="/mnt/archive/person_drone/VisDrone2019-DET-val",
        help="Path to VisDrone val split directory"
    )
    parser.add_argument(
        "--test_dir",
        type=str,
        default="/mnt/archive/person_drone/VisDrone2019-DET-test-dev",
        help="Path to VisDrone test-dev split directory"
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default="/home/svakhreev/projects/DEIM/data/visdrone_coco",
        help="Output directory for unified COCO format files"
    )
    args = parser.parse_args()
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Get category mapping (VisDrone class id -> COCO category dict).
    category_mapping = get_visdrone_categories()
    print(f"Categories: {category_mapping}")
    # Initialize the unified COCO data structure; images/annotations are
    # filled in split by split below.
    coco_data = {
        "info": {
            "year": 2024,
            "version": "1.0",
            "description": "VisDrone Dataset - Combined train/val/test-dev splits in COCO format",
            "contributor": "VisDrone Dataset",
            "url": "https://github.com/VisDrone/VisDrone-Dataset",
            "date_created": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        },
        "licenses": [
            {
                "id": 1,
                "name": "Academic Use",
                "url": ""
            }
        ],
        "categories": list(category_mapping.values()),
        "images": [],
        "annotations": []
    }
    # Process each split in a fixed order so IDs are assigned deterministically.
    # NOTE: the test-dev directory is labeled "test" in file_name prefixes.
    splits = [
        (args.train_dir, "train"),
        (args.val_dir, "val"),
        (args.test_dir, "test")
    ]
    # Image/annotation IDs run contiguously across all splits.
    img_id = 1
    ann_id = 1
    for split_dir, split_name in splits:
        if not Path(split_dir).exists():
            print(f"Warning: {split_dir} does not exist, skipping {split_name}")
            continue
        print(f"\nProcessing {split_name} split from {split_dir}...")
        images_list, annotations_list, next_img_id, next_ann_id = process_split_to_coco(
            split_dir,
            split_name,
            category_mapping,
            img_id,
            ann_id
        )
        # Add to combined dataset.
        coco_data["images"].extend(images_list)
        coco_data["annotations"].extend(annotations_list)
        # Carry the ID counters forward so the next split continues the sequence.
        img_id = next_img_id
        ann_id = next_ann_id
        print(f"Added {len(images_list)} images and {len(annotations_list)} annotations from {split_name}")
    # Save the unified COCO annotation file.
    output_file = output_dir / "annotations.json"
    with open(output_file, 'w') as f:
        json.dump(coco_data, f, indent=2)
    print(f"\nConversion complete!")
    print(f"Total: {len(coco_data['images'])} images and {len(coco_data['annotations'])} annotations")
    print(f"Saved unified COCO format to {output_file}")
    # Create symlinks to the original image directories so the
    # "<split>_images/<file>" paths recorded in file_name resolve from
    # output_dir without copying any images. Best-effort: a failed symlink
    # only produces a warning.
    for split_dir, split_name in splits:
        if not Path(split_dir).exists():
            continue
        original_images_dir = Path(split_dir) / "images"
        symlink_dir = output_dir / f"{split_name}_images"
        if original_images_dir.exists() and not symlink_dir.exists():
            try:
                symlink_dir.symlink_to(original_images_dir.resolve())
                print(f"Created symlink: {symlink_dir} -> {original_images_dir}")
            except Exception as e:
                print(f"Warning: Could not create symlink for {split_name}: {e}")
    # Save dataset information for reference (counts, categories, CLI args).
    info_file = output_dir / "dataset_info.json"
    with open(info_file, 'w') as f:
        json.dump({
            'splits_processed': [split_name for split_dir, split_name in splits if Path(split_dir).exists()],
            'total_images': len(coco_data['images']),
            'total_annotations': len(coco_data['annotations']),
            'categories': category_mapping,
            'args': vars(args)
        }, f, indent=2)
    print(f"Dataset info saved to {info_file}")


if __name__ == "__main__":
    main()