335 lines
12 KiB
Python
335 lines
12 KiB
Python
import json
|
|
import os
|
|
import re
|
|
from collections import defaultdict
|
|
|
|
from tqdm import tqdm
|
|
|
|
from .image_utils import load_labelmap
|
|
|
|
# Lower-case file extensions (with the leading dot) used to sort the files of a
# dataset folder into images, annotation files and labelmap files.
IMAGE_EXTENSIONS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".bmp",
    ".tiff",
    ".tif",
    ".avif",
    ".heic",
}
ANNOTATION_EXTENSIONS = {".txt", ".json", ".xml", ".csv", ".jsonl"}
LABELMAPS_EXTENSIONS = {".labels", ".yaml", ".yml"}
|
|
|
|
|
|
def _patch_sep(filename):
    """
    Normalize path separators by turning every backslash into a forward slash.

    This keeps filenames consistent regardless of platform; Roboflow depends on
    it server side.
    """
    return "/".join(filename.split("\\"))
|
|
|
|
|
|
def parsefolder(folder, is_classification=False):
    """
    Scan a dataset folder and describe its images together with their annotations.

    The folder is walked recursively. Image descriptors are indexed, assigned a
    split and then paired with annotations: first by matching an annotation file
    to an image by path key, and only if that pairs nothing, by parsing the
    annotation files and extracting the part that belongs to each image.

    Args:
        folder: Path of the dataset folder. Backslashes are converted to forward
            slashes; surrounding whitespace and trailing slashes are dropped.
        is_classification: When true, images still lacking an annotation get a
            label derived from the name of their parent folder.

    Returns:
        A dict with the normalized folder under ``"location"`` and the list of
        image descriptors under ``"images"``.

    Raises:
        Exception: If the folder does not exist.
    """
    folder = _patch_sep(folder).strip().rstrip("/")
    if not os.path.exists(folder):
        raise Exception(f"folder does not exist. {folder}")

    files = _list_files(folder)

    def pick(extensions):
        # Keep the descriptors whose (lower-cased) extension is in the given set.
        return [entry for entry in files if entry["extension"] in extensions]

    images = pick(IMAGE_EXTENSIONS)
    _add_indices(images)
    _decide_split(images)

    annotations = pick(ANNOTATION_EXTENSIONS)
    labelmaps = _load_labelmaps(folder, pick(LABELMAPS_EXTENSIONS))
    _map_labelmaps_to_annotations(annotations, labelmaps)

    paired_one_to_one = _map_annotations_to_images_1to1(images, annotations)
    if not paired_one_to_one:
        # No per-image annotation file matched: fall back to annotation files
        # that describe many images at once.
        annotations = _loadAnnotations(folder, annotations)
        _map_annotations_to_images_1tomany(images, annotations)

    if is_classification:
        _infer_classification_labels_from_folders(images)

    return {
        "location": folder,
        "images": images,
    }
|
|
|
|
|
|
def _alphanumkey(s):
    """
    Build a sort key made of a text prefix and a trailing number.

    The extension is dropped first. The remainder is split into everything
    before its final run of digits and that run converted to ``int`` (``0``
    when the name does not end in digits).

    Returns:
        A ``(prefix, number)`` tuple. If the pattern does not match at all, the
        extension-less string itself is used as prefix and the number is ``0``.
    """
    stem = os.path.splitext(s)[0]
    # Lazy prefix followed by the (possibly empty) digit run that ends the string.
    found = re.match(r"(.*?)(\d*)$", stem)
    if found is None:
        return (stem, 0)
    prefix, digits = found.groups()
    return (prefix, int(digits or 0))
|
|
|
|
|
|
def _list_files(folder):
    """
    Recursively list every file below ``folder`` as a descriptor dict.

    Each file is described by its path relative to ``folder``, prefixed with a
    slash. The result is ordered by the prefix/trailing-number key of that path.
    """
    described = [
        _describe_file("/" + os.path.relpath(os.path.join(root, filename), folder))
        for root, _dirs, filenames in os.walk(folder)
        for filename in filenames
    ]
    described.sort(key=lambda descriptor: _alphanumkey(descriptor["file"]))
    return described
|
|
|
|
|
|
def _add_indices(files):
    """Record each descriptor's zero-based position in ``files`` under its ``"index"`` key (in place)."""
    for position, descriptor in enumerate(files, start=0):
        descriptor["index"] = position
|
|
|
|
|
|
def _describe_file(f):
    """
    Describe a file path as a dict of the pieces used for matching.

    Returns a dict with:
        file: the path with forward slashes.
        dirname: the directory part of the path.
        name: the last path component.
        extension: the lower-cased extension, including the dot.
        key: the lower-cased file name without extension.
        fullkey: the lower-cased path without extension.
        fullkey2: like ``fullkey`` but with every ``"/labels"`` and ``"/images"``
            fragment removed (presumably so an image and its label file kept in
            sibling folders end up with the same key).
    """
    path = _patch_sep(f)
    directory = os.path.dirname(path)
    filename = path.rsplit("/", 1)[-1]
    path_stem, ext = os.path.splitext(path)
    name_stem = os.path.splitext(filename)[0]

    folded_stem = path_stem
    for fragment in ("/labels", "/images"):
        folded_stem = folded_stem.replace(fragment, "")

    return {
        "file": path,
        "dirname": directory,
        "name": filename,
        "extension": ext.lower(),
        "key": name_stem.lower(),
        "fullkey": path_stem.lower(),
        "fullkey2": folded_stem.lower(),
    }
|
|
|
|
|
|
def _map_annotations_to_images_1to1(images, annotations):
    """
    Pair each annotation file with the image that shares its path key.

    Matching is attempted on ``"fullkey"`` first. Only when that pairs nothing
    is it retried on ``"fullkey2"``. A matched image receives the annotation
    descriptor under ``"annotationfile"``.

    Returns:
        ``True`` if at least one annotation was paired with an image, else ``False``.
    """
    for key_name in ("fullkey", "fullkey2"):
        image_by_key = {img[key_name]: img for img in images}
        paired = 0
        for ann in annotations:
            target = image_by_key.get(ann[key_name])
            if target:
                target["annotationfile"] = ann
                paired += 1
        if paired > 0:
            return True
    return False
|
|
|
|
|
|
def _map_annotations_to_images_1tomany(images, annotationFiles):
    """
    Attach to each image its share of an annotation file that covers many images.

    For every image, the parsed annotation files located in the same directory
    are tried in order. The first one that yields something for the image is
    stored under the image's ``"annotationfile"`` key and the rest are skipped.
    """
    files_by_dir = _list_map(annotationFiles, "dirname")
    imgRefMap, annotationMap = _build_image_and_annotation_maps(annotationFiles)

    for image in tqdm(images):
        for candidate in files_by_dir.get(image["dirname"], []):
            extracted = _filterIndividualAnnotations(
                image, candidate, candidate["parsedType"], imgRefMap, annotationMap
            )
            if extracted:
                image["annotationfile"] = extracted
                break
|
|
|
|
|
|
def _build_image_and_annotation_maps(annotationFiles):
    """
    Index the contents of all COCO annotation files for fast per-image lookup.

    Returns:
        A tuple ``(imgRefMap, annotationMap)`` where ``imgRefMap`` maps
        ``"<annotation file path>/<image file_name>"`` to the COCO image entry
        and ``annotationMap`` (a ``defaultdict(list)``) maps
        ``"<annotation file dirname>/<image_id>"`` to the COCO annotations
        carrying that image id. Files of any other parsed type contribute nothing.
    """
    image_refs = {}
    annotations_by_image = defaultdict(list)
    for descriptor in annotationFiles:
        # All four entries are read for every file, whatever its type.
        path = descriptor["file"]
        directory = descriptor["dirname"]
        content = descriptor["parsed"]
        kind = descriptor["parsedType"]
        if kind != "coco":
            continue
        for ref in content["images"]:
            image_refs[f"{path}/{ref['file_name']}"] = ref
        for entry in content["annotations"]:
            annotations_by_image[f"{directory}/{entry['image_id']}"].append(entry)
    return image_refs, annotations_by_image
|
|
|
|
|
|
def _filterIndividualAnnotations(image, annotation, format, imgRefMap, annotationMap):
    """
    Extract from a multi-image annotation file the portion that belongs to ``image``.

    Args:
        image: Image descriptor; its ``name``, ``dirname`` and ``file`` entries are read.
        annotation: Annotation file descriptor; its ``parsed`` content and ``file``
            path are read.
        format: Parsed type of the annotation file. Handled values are ``"coco"``,
            ``"createml"``, ``"csv"``, ``"multilabel_csv"`` and ``"jsonl"``.
        imgRefMap: Lookup from ``"<annotation file>/<image file name>"`` to a COCO
            image entry.
        annotationMap: Lookup from ``"<image dirname>/<COCO image id>"`` to the list
            of COCO annotations for that image.

    Returns:
        A dict describing the annotation of this image (``name`` and ``rawText``
        for COCO, CreateML, CSV and JSONL; ``type`` and ``labels`` for the
        multi-label CSV), or ``None`` when the file holds nothing for this image
        or the format is not handled.
    """
    parsed = annotation["parsed"]
    image_name = image["name"]

    if format == "coco":
        imgReference = imgRefMap.get(f"{annotation['file']}/{image_name}")
        if imgReference:
            # workaround to make Annotations.js correctly identify this as coco in the backend
            fake_annotation = {
                "id": 999999999,
                "image_id": 999999999,
                "category_id": 0,
                "area": 1,
                "segmentation": [],
                "iscrowd": 0,
            }
            annotations_for_image = annotationMap.get(f"{image['dirname']}/{imgReference['id']}", [])
            # A file is treated as COCO as soon as it has "images" and
            # "annotations", so the remaining sections may be absent: fall back
            # to empty values instead of failing with KeyError.
            return {
                "name": "annotation.coco.json",
                "rawText": json.dumps(
                    {
                        "info": parsed.get("info", {}),
                        "licenses": parsed.get("licenses", []),
                        "categories": parsed.get("categories", []),
                        "images": [imgReference],
                        "annotations": annotations_for_image or [fake_annotation],
                    }
                ),
            }
    elif format == "createml":
        # Any JSON list is classified as CreateML, so entries may be of any
        # shape: only dict entries naming this image are considered.
        imgReferences = [
            entry for entry in parsed if isinstance(entry, dict) and entry.get("image") == image_name
        ]
        if len(imgReferences) > 1:
            print(f"warning: found multiple image entries for image {image['file']} in {annotation['file']}")
        if imgReferences:
            return {
                "name": "annotation.createml.json",
                "rawText": json.dumps([imgReferences[0]]),
            }
    elif format == "csv":
        imgLines = [ld["line"] for ld in parsed["lines"] if ld["file_name"] == image_name]
        if imgLines:
            # Header line followed by the raw lines of this image only.
            return {
                "name": "annotation.csv",
                "rawText": "".join([parsed["headers"]] + imgLines),
            }
    elif format == "multilabel_csv":
        rows = [r for r in parsed["rows"] if r["file_name"] == image_name]
        if rows:
            # Only the first row naming this image is used.
            return {"type": "classification_multilabel", "labels": rows[0]["labels"]}
    elif format == "jsonl":
        # Lines that are valid JSON but not objects (or lack "image") are ignored.
        jsonlLines = [
            json.dumps(line) for line in parsed if isinstance(line, dict) and line.get("image") == image_name
        ]
        if jsonlLines:
            return {"name": "annotation.jsonl", "rawText": "\n".join(jsonlLines)}
    return None
|
|
|
|
|
|
def _loadAnnotations(folder, annotations):
    """
    Parse the annotation files that may describe several images at once.

    Only ``.json``, ``.csv`` and ``.jsonl`` descriptors are considered. Every
    descriptor that is parsed successfully gains two entries: ``"parsed"`` (the
    loaded content) and ``"parsedType"`` (``"coco"``, ``"createml"``,
    ``"jsonl"``, ``"csv"`` or the type reported by the CSV parser).

    Args:
        folder: Dataset folder; prepended to each descriptor's ``"file"`` path.
        annotations: Annotation file descriptors.

    Returns:
        The descriptors that now carry ``"parsed"`` and ``"parsedType"``. JSON
        files whose structure is not recognised are left out, because the code
        consuming this list reads both entries unconditionally and would raise
        ``KeyError`` on a descriptor lacking them.
    """
    valid_extensions = {".json", ".csv", ".jsonl"}
    loaded = []
    for ann in annotations:
        extension = ann["extension"]
        if extension not in valid_extensions:
            continue
        path = f"{folder}{ann['file']}"
        if extension == ".json":
            with open(path) as f:
                parsed = json.load(f)
            parsedType = _guessAnnotationFileFormat(parsed, extension)
            if not parsedType:
                # Some unrelated JSON file: nothing to extract from it.
                continue
            ann["parsed"] = parsed
            ann["parsedType"] = parsedType
        elif extension == ".jsonl":
            ann["parsed"] = _read_jsonl(path)
            ann["parsedType"] = "jsonl"
        elif extension == ".csv":
            parsed = _parseAnnotationCSV(path)
            ann["parsed"] = parsed
            ann["parsedType"] = parsed.get("type", "csv")
        loaded.append(ann)
    return loaded
|
|
|
|
|
|
def _read_jsonl(path):
    """
    Read a JSON Lines file into a list of decoded values.

    Blank (empty or whitespace-only) lines are skipped silently. A line that is
    not valid JSON is skipped after printing a warning carrying the file path
    and the 1-based line number.

    Args:
        path: Path of the ``.jsonl`` file.

    Returns:
        The decoded values in file order.
    """
    data = []
    with open(path) as file:
        for linenum, line in enumerate(file, 1):
            # Lines read from a file keep their newline, so strip before
            # testing for emptiness; otherwise blank lines are never detected.
            line = line.strip()
            if not line:
                continue
            try:
                data.append(json.loads(line))
            except json.JSONDecodeError:
                print(f"Warning: Skipping invalid JSON line in {path}:{linenum}")
    return data
|
|
|
|
|
|
def _parseAnnotationCSV(filename):
    """
    Parse an annotation CSV with a naive comma split.

    A file named ``_classes.csv`` is read as a multi-label classification table:
    the first column is the image file name and each further column is a label,
    which applies to the row when its cell is ``"1"``. Any other CSV is kept as
    raw lines indexed by the file name found in the first column.

    Args:
        filename: Path of the CSV file.

    Returns:
        For ``_classes.csv``: ``{"type": "multilabel_csv", "rows": [...],
        "headers": [...]}`` where each row is ``{"file_name", "labels"}``.
        Otherwise: ``{"headers": <raw header line>, "lines": [...]}`` where each
        line is ``{"file_name", "line"}`` with the raw line text. An empty file
        yields the same shape with no rows/lines.
    """
    # TODO: use a proper CSV library?
    with open(filename) as f:
        lines = f.readlines()

    # Multi-label classification csv typically named _classes.csv
    is_multilabel = os.path.basename(filename) == "_classes.csv"

    if not lines:
        # An empty file has no header row to read.
        if is_multilabel:
            return {"type": "multilabel_csv", "rows": [], "headers": []}
        return {"headers": "", "lines": []}

    header_line = lines[0]

    if is_multilabel:
        headers = [h.strip() for h in header_line.split(",")]
        rows = []
        for line in lines[1:]:
            parts = [p.strip() for p in line.split(",")]
            # zip stops at the shorter side, so cells beyond the header width
            # are ignored instead of indexing past the end of the header list.
            labels = [label for label, cell in zip(headers[1:], parts[1:]) if cell == "1"]
            rows.append({"file_name": parts[0], "labels": labels})
        return {"type": "multilabel_csv", "rows": rows, "headers": headers}

    return {
        "headers": header_line,
        "lines": [{"file_name": ld.split(",")[0].strip(), "line": ld} for ld in lines[1:]],
    }
|
|
|
|
|
|
def _guessAnnotationFileFormat(parsed, extension):
    """
    Guess which annotation format a parsed ``.json`` file uses.

    Returns:
        ``"coco"`` for a dict whose ``"annotations"`` and ``"images"`` entries
        are both lists, ``"createml"`` for any list, and ``None`` for everything
        else, including every extension other than ``".json"``.
    """
    if extension != ".json":
        return None
    if isinstance(parsed, list):
        return "createml"
    if not isinstance(parsed, dict):
        return None
    has_coco_sections = all(isinstance(parsed.get(section), list) for section in ("annotations", "images"))
    return "coco" if has_coco_sections else None
|
|
|
|
|
|
def _map_labelmaps_to_annotations(annotations, labelmaps):
    """
    Give each annotation file the labelmap that applies to its directory.

    A labelmap located in the annotation's own directory wins; otherwise the
    one whose dirname is ``"/"`` is used when present. The chosen labelmap
    content is stored under the annotation's ``"labelmap"`` key. When several
    labelmaps share a directory a warning listing all labelmap files is printed
    and the last one of that directory is used.
    """
    if not labelmaps:
        return

    by_directory = {entry["dirname"]: entry for entry in labelmaps}
    root_entry = by_directory.get("/")

    if len(by_directory) < len(labelmaps):
        print("warning: unexpectedly found multiple labelmaps per directory")
        print([entry["file"] for entry in labelmaps])

    for annotation in annotations:
        chosen = by_directory.get(annotation["dirname"]) or root_entry
        if not chosen:
            continue
        annotation["labelmap"] = chosen["labelmap"]
|
|
|
|
|
|
def _load_labelmaps(folder, labelmaps):
    """
    Load the content of every candidate labelmap file.

    The loaded content is stored under each descriptor's ``"labelmap"`` key.

    Returns:
        The descriptors that ended up with a truthy ``"labelmap"`` entry.
    """
    for descriptor in labelmaps:
        try:
            content = load_labelmap(f"{folder}{descriptor['file']}")
        except Exception:
            # Best effort: a candidate that cannot be loaded is skipped
            # instead of aborting the whole folder scan.
            continue
        descriptor["labelmap"] = content
    return [descriptor for descriptor in labelmaps if descriptor.get("labelmap")]
|
|
|
|
|
|
def _decide_split(images):
    """
    Assign a dataset split to every image based on its path key.

    The split is the first of ``"valid"``, ``"train"``, ``"test"`` that occurs
    anywhere in the image's ``"fullkey"``; images matching none of them go to
    ``"train"``. The result is stored under the image's ``"split"`` key.
    """
    for image in images:
        path_key = image["fullkey"]
        # Order matters: "valid" takes precedence over "train", then "test".
        image["split"] = next(
            (name for name in ("valid", "train", "test") if name in path_key),
            "train",
        )
|
|
|
|
|
|
def _list_map(my_list, key):
    """
    Group the items of ``my_list`` by the value each holds under ``key``.

    Returns:
        A dict mapping each distinct ``item[key]`` to the list of items having
        it, in their original order.
    """
    grouped = {}
    for item in my_list:
        group_key = item[key]
        if group_key not in grouped:
            grouped[group_key] = []
        grouped[group_key].append(item)
    return grouped
|
|
|
|
|
|
def _infer_classification_labels_from_folders(images):
    """
    Label not-yet-annotated images with the name of the folder containing them.

    Images that already have a truthy ``"annotationfile"`` are left alone, and
    so are images whose dirname is missing, the root, or ``"."``. Every other
    image receives an ``"annotationfile"`` dict holding the last component of
    its dirname as ``"classification_label"``.
    """
    for image in images:
        if image.get("annotationfile"):
            continue

        folder_path = image.get("dirname", "").strip("/")
        if folder_path in ("", "."):
            # Skip images in root directory or invalid paths
            continue

        label = os.path.basename(folder_path)
        if label in ("", "."):
            continue

        image["annotationfile"] = {"classification_label": label, "type": "classification_folder"}
|