#!/usr/bin/env python3
"""
Convert the CrowdHuman dataset from ODGT format to COCO format.

Dataset structure:
    /mnt/archive/person_drone/crowd_human/
    ├── annotation_train.odgt
    ├── annotation_val.odgt
    ├── CrowdHuman_train/
    │   └── Images/
    ├── CrowdHuman_val/
    │   └── Images/
    └── CrowdHuman_test/
        └── Images/

ODGT format: One Dict per line in JSON format with:
    - fbox: full body bounding box [x, y, width, height]
    - vbox: visible body bounding box [x, y, width, height]
    - hbox: head bounding box [x, y, width, height]

We use fbox (full body) for the person bboxes, as requested.
"""

import argparse
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple

from PIL import Image

def parse_odgt_file(odgt_path: str) -> List[Dict]:
    """
    Parse an ODGT (One Dict per line) annotation file.

    Args:
        odgt_path: Path to the .odgt annotation file

    Returns:
        List of annotation dictionaries
    """
    annotations = []
    if not os.path.exists(odgt_path):
        print(f"Warning: Annotation file {odgt_path} does not exist")
        return annotations

    with open(odgt_path, 'r') as f:
        for line_num, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                annotations.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f"Warning: Could not parse line {line_num} in {odgt_path}: {e}")
    return annotations
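
# Example (the path follows the dataset layout documented above):
#   records = parse_odgt_file("/mnt/archive/person_drone/crowd_human/annotation_val.odgt")
#   print(records[0]["ID"], len(records[0].get("gtboxes", [])))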

def convert_split_to_coco(
    dataset_root: str,
    split_name: str,
    annotations_data: List[Dict],
    category_mapping: Dict[str, Dict],
    start_img_id: int = 1,
    start_ann_id: int = 1
) -> Tuple[List[Dict], List[Dict], int, int]:
    """
    Convert a CrowdHuman dataset split to COCO format components.

    Args:
        dataset_root: Root directory of the CrowdHuman dataset
        split_name: Name of the split (train/val/test)
        annotations_data: Parsed ODGT annotations for this split
        category_mapping: Mapping of tag to category info
        start_img_id: Starting image ID for this split
        start_ann_id: Starting annotation ID for this split

    Returns:
        images_list, annotations_list, next_img_id, next_ann_id
    """
    dataset_root = Path(dataset_root)
    images_dir = dataset_root / f"CrowdHuman_{split_name}" / "Images"
    if not images_dir.exists():
        print(f"Warning: Images directory {images_dir} does not exist")
        return [], [], start_img_id, start_ann_id

    images_list = []
    annotations_list = []
    img_id = start_img_id
    ann_id = start_ann_id
    print(f"Processing {len(annotations_data)} images in {split_name} split...")

    for ann_data in annotations_data:
        image_filename = f"{ann_data['ID']}.jpg"
        image_path = images_dir / image_filename

        # Skip images that are missing on disk
        if not image_path.exists():
            print(f"Warning: Image {image_path} not found, skipping...")
            continue

        # Read the image dimensions (needed for clipping and the COCO record)
        try:
            with Image.open(image_path) as img:
                img_width, img_height = img.size
        except Exception as e:
            print(f"Error opening image {image_path}: {e}")
            continue

        # Add image info
        images_list.append({
            "id": img_id,
            "file_name": f"CrowdHuman_{split_name}/Images/{image_filename}",
            "width": img_width,
            "height": img_height,
            "license": 1
        })

        # Process ground truth boxes
        for gt_box in ann_data.get('gtboxes', []):
            # Skip if not a person (e.g. "mask" ignore regions)
            if gt_box.get('tag') != 'person':
                continue

            # Skip ignored boxes (flagged on the box itself or on the head)
            if gt_box.get('extra', {}).get('ignore', 0) == 1:
                continue
            if gt_box.get('head_attr', {}).get('ignore', 0) == 1:
                continue

            # Use fbox (full body box) as requested
            if 'fbox' not in gt_box:
                continue

            # fbox format is already [x, y, width, height]
            x, y, width, height = gt_box['fbox']

            # Clip the box to the image bounds. CrowdHuman full-body boxes may
            # extend beyond the image, so clip both corners and recompute the
            # size, e.g. [-10, 5, 50, 40] in a 100x100 image becomes [0, 5, 40, 40].
            x2 = min(x + width, img_width)
            y2 = min(y + height, img_height)
            x = max(0, x)
            y = max(0, y)
            width = x2 - x
            height = y2 - y
            if width <= 0 or height <= 0:  # Box lies fully outside the image
                continue
            area = width * height

            # Determine if crowded (occluded).
            # occ: 0 = no occlusion, 1 = partial occlusion, 2 = heavy occlusion;
            # heavy occlusion is treated as crowd.
            occ_level = gt_box.get('extra', {}).get('occ', 0)
            is_crowd = 1 if occ_level >= 2 else 0

            annotations_list.append({
                "id": ann_id,
                "image_id": img_id,
                "category_id": category_mapping['person']["id"],
                "bbox": [x, y, width, height],
                "area": area,
                "iscrowd": is_crowd,
                "segmentation": []
            })
            ann_id += 1

        img_id += 1

    return images_list, annotations_list, img_id, ann_id
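
# Optional sanity check on a converted file (a minimal sketch; assumes the
# third-party pycocotools package is installed, which this script itself
# does not require):
#   from pycocotools.coco import COCO
#   coco = COCO("annotations_val.json")
#   print(len(coco.getImgIds()), "images,", len(coco.getAnnIds()), "annotations")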

def main():
    parser = argparse.ArgumentParser(description="Convert CrowdHuman dataset to COCO format")
    parser.add_argument(
        "--dataset_root",
        type=str,
        default="/mnt/archive/person_drone/crowd_human",
        help="Path to CrowdHuman dataset root directory"
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default="/home/svakhreev/projects/DEIM/data/crowd_human_coco",
        help="Output directory for COCO format files"
    )
    parser.add_argument(
        "--splits",
        type=str,
        nargs='+',
        default=["train", "val"],
        choices=["train", "val", "test"],
        help="Splits to convert (default: train and val)"
    )
    args = parser.parse_args()

    dataset_root = Path(args.dataset_root)
    output_dir = Path(args.output_dir)

    # Create output directory
    output_dir.mkdir(parents=True, exist_ok=True)

    # Create category mapping for CrowdHuman (single class: person)
    category_mapping = {
        'person': {
            "id": 1,  # COCO category IDs start from 1
            "name": "person",
            "supercategory": "person"
        }
    }
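    # The resulting COCO "categories" list is therefore:
    #   [{"id": 1, "name": "person", "supercategory": "person"}]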
| print(f"Categories: {category_mapping}") | |
| print(f"Converting splits: {args.splits}") | |
| # Process each split separately | |
| for split in args.splits: | |
| print(f"\n{'='*60}") | |
| print(f"Converting {split} split...") | |
| print(f"{'='*60}") | |
| # Initialize COCO data structure for this split | |
| coco_data = { | |
| "info": { | |
| "year": 2024, | |
| "version": "1.0", | |
| "description": f"CrowdHuman Dataset - {split} split in COCO format", | |
| "contributor": "CrowdHuman Dataset", | |
| "url": "https://www.crowdhuman.org/", | |
| "date_created": datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| }, | |
| "licenses": [ | |
| { | |
| "id": 1, | |
| "name": "Academic Use", | |
| "url": "" | |
| } | |
| ], | |
| "categories": list(category_mapping.values()), | |
| "images": [], | |
| "annotations": [] | |
| } | |
| # Load annotations for this split | |
| if split == "test": | |
| # Test split might not have annotations | |
| print(f"Warning: Test split typically doesn't have annotations") | |
| annotation_file = dataset_root / f"annotation_{split}.odgt" | |
| if not annotation_file.exists(): | |
| print(f"No annotation file found for test split, skipping...") | |
| continue | |
| else: | |
| annotation_file = dataset_root / f"annotation_{split}.odgt" | |
| if not annotation_file.exists(): | |
| print(f"Warning: Annotation file {annotation_file} not found, skipping {split}") | |
| continue | |
| # Parse ODGT annotations | |
| print(f"Loading annotations from {annotation_file}...") | |
| annotations_data = parse_odgt_file(str(annotation_file)) | |
| print(f"Loaded {len(annotations_data)} image annotations") | |
| # Convert to COCO format | |
| images_list, annotations_list, _, _ = convert_split_to_coco( | |
| str(dataset_root), | |
| split, | |
| annotations_data, | |
| category_mapping, | |
| start_img_id=1, | |
| start_ann_id=1 | |
| ) | |
| # Add to COCO dataset | |
| coco_data["images"] = images_list | |
| coco_data["annotations"] = annotations_list | |
| # Save COCO annotation file for this split | |
| output_file = output_dir / f"annotations_{split}.json" | |
| with open(output_file, 'w') as f: | |
| json.dump(coco_data, f, indent=2) | |
| print(f"\nSplit {split} complete!") | |
| print(f"Total: {len(coco_data['images'])} images and {len(coco_data['annotations'])} annotations") | |
| print(f"Saved COCO format to {output_file}") | |
    # Also create a combined annotation file covering all requested splits
    if len(args.splits) > 1:
        print(f"\n{'='*60}")
        print("Creating combined annotation file...")
        print(f"{'='*60}")

        combined_coco_data = {
            "info": {
                "year": 2024,
                "version": "1.0",
                "description": f"CrowdHuman Dataset - Combined splits ({', '.join(args.splits)}) in COCO format",
                "contributor": "CrowdHuman Dataset",
                "url": "https://www.crowdhuman.org/",
                "date_created": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },
            "licenses": [
                {
                    "id": 1,
                    "name": "Academic Use",
                    "url": ""
                }
            ],
            "categories": list(category_mapping.values()),
            "images": [],
            "annotations": []
        }

        # Keep image/annotation IDs unique across splits by threading the
        # counters through each conversion
        img_id = 1
        ann_id = 1
        for split in args.splits:
            annotation_file = dataset_root / f"annotation_{split}.odgt"
            if not annotation_file.exists():
                continue
            print(f"Processing {split} for combined file...")
            annotations_data = parse_odgt_file(str(annotation_file))
            images_list, annotations_list, next_img_id, next_ann_id = convert_split_to_coco(
                str(dataset_root),
                split,
                annotations_data,
                category_mapping,
                start_img_id=img_id,
                start_ann_id=ann_id
            )
            combined_coco_data["images"].extend(images_list)
            combined_coco_data["annotations"].extend(annotations_list)
            img_id = next_img_id
            ann_id = next_ann_id

        # Save combined annotation file
        output_file = output_dir / "annotations_combined.json"
        with open(output_file, 'w') as f:
            json.dump(combined_coco_data, f, indent=2)

        print("\nCombined file complete!")
        print(f"Total: {len(combined_coco_data['images'])} images and {len(combined_coco_data['annotations'])} annotations")
        print(f"Saved combined COCO format to {output_file}")
    # Save dataset information for reference. Read each per-split file once
    # (the handles are closed by the context manager) instead of reopening it
    # for every statistic.
    split_stats = {}
    for split in args.splits:
        split_file = output_dir / f"annotations_{split}.json"
        if not split_file.exists():
            continue
        with open(split_file, 'r') as f:
            split_data = json.load(f)
        split_stats[split] = {
            "images": len(split_data['images']),
            "annotations": len(split_data['annotations'])
        }

    info_file = output_dir / "dataset_info.json"
    with open(info_file, 'w') as f:
        json.dump({
            'splits_processed': args.splits,
            'total_images_per_split': {s: v["images"] for s, v in split_stats.items()},
            'total_annotations_per_split': {s: v["annotations"] for s, v in split_stats.items()},
            'categories': category_mapping,
            'args': vars(args)
        }, f, indent=2)

    print(f"\nDataset info saved to {info_file}")
    print("\nConversion complete!")


if __name__ == "__main__":
    main()
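
# Usage (the script filename here is illustrative; both flags may be omitted
# since the defaults above already point at these paths):
#   python convert_crowdhuman_to_coco.py \
#       --dataset_root /mnt/archive/person_drone/crowd_human \
#       --output_dir /home/svakhreev/projects/DEIM/data/crowd_human_coco \
#       --splits train val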