import base64
import io
import json
import urllib.request
import warnings

import requests
from PIL import Image

from roboflow.config import (
    CLASSIFICATION_MODEL,
    INSTANCE_SEGMENTATION_MODEL,
    OBJECT_DETECTION_MODEL,
    PREDICTION_OBJECT,
    SEMANTIC_SEGMENTATION_MODEL,
)
from roboflow.util.image_utils import mask_image, validate_image_path


def plot_image(image_path):
    """
    Helper method to plot image

    :param image_path: path of image to be plotted (can be hosted or local)
    :return: matplotlib figure and axes containing the image
    """
    import matplotlib.pyplot as plt

    validate_image_path(image_path)
    try:
        img = Image.open(image_path)
    except OSError:
        # Fall back to opening a hosted image over HTTP
        response = requests.get(image_path)
        img = Image.open(io.BytesIO(response.content))

    figure, axes = plt.subplots()
    axes.imshow(img)  # type: ignore[attr-defined]

    return figure, axes


def plot_annotation(axes, prediction=None, stroke=1, transparency=60, colors=None):
    """
    Helper method to plot annotations

    :param axes: Matplotlib axes to draw on
    :param prediction: prediction dictionary from the Roboflow API
    :param stroke: line width to use when drawing rectangles and polygons
    :param transparency: alpha transparency of masks for semantic overlays
    :param colors: optional mapping of class name to stroke color
    """
    from matplotlib import patches

    colors = {} if colors is None else colors
    prediction = prediction or {}
    if not prediction:
        return

    stroke_color = "r"

    # Object Detection annotation
    if prediction["prediction_type"] == OBJECT_DETECTION_MODEL:
        if prediction["class"] in colors.keys():
            stroke_color = colors[prediction["class"]]

        # Get height, width, and center coordinates of prediction
        height = prediction["height"]
        width = prediction["width"]
        x = prediction["x"]
        y = prediction["y"]
        rect = patches.Rectangle(
            (x - width / 2, y - height / 2),
            width,
            height,
            linewidth=stroke,
            edgecolor=stroke_color,
            facecolor="none",
        )
        # Plot Rectangle
        axes.add_patch(rect)
    elif prediction["prediction_type"] == CLASSIFICATION_MODEL:
        axes.set_title("Class: " + prediction["top"] + " | Confidence: " + str(prediction["confidence"]))
    elif prediction["prediction_type"] == INSTANCE_SEGMENTATION_MODEL:
        if prediction["class"] in colors.keys():
            stroke_color = colors[prediction["class"]]

        points = [[p["x"], p["y"]] for p in prediction["points"]]
        polygon = patches.Polygon(points, linewidth=stroke, edgecolor=stroke_color, facecolor="none")
        axes.add_patch(polygon)
    elif prediction["prediction_type"] == SEMANTIC_SEGMENTATION_MODEL:
        import matplotlib.image as mpimg

        encoded_mask = prediction["segmentation_mask"]
        mask_bytes = io.BytesIO(base64.b64decode(encoded_mask))
        mask = mpimg.imread(mask_bytes, format="JPG")

        alpha = transparency / 100
        axes.imshow(mask, alpha=alpha)
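

# A minimal usage sketch for the two helpers above (the file name and the
# prediction dictionary are hypothetical; the keys mirror the object detection
# fields read by plot_annotation):
#
#   import matplotlib.pyplot as plt
#
#   figure, axes = plot_image("dog.jpg")
#   plot_annotation(
#       axes,
#       {
#           "prediction_type": OBJECT_DETECTION_MODEL,
#           "x": 320,
#           "y": 240,
#           "width": 100,
#           "height": 80,
#           "class": "dog",
#       },
#       stroke=2,
#   )
#   plt.show()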


class Prediction:
    def __init__(
        self,
        json_prediction,
        image_path,
        prediction_type=OBJECT_DETECTION_MODEL,
        colors=None,
    ):
        """
        Generalized Prediction for both Object Detection and Classification Models

        :param json_prediction: JSON prediction dictionary returned by the Roboflow API
        :param image_path: path of the image the prediction was made on
        :param prediction_type: type of model that produced the prediction
        :param colors: optional mapping of class name to stroke color
        """
        # Set image path in JSON prediction
        json_prediction["image_path"] = image_path
        json_prediction["prediction_type"] = prediction_type
        self.image_path = image_path
        self.json_prediction = json_prediction
        self.colors = {} if colors is None else colors

    def json(self):
        return self.json_prediction

    def __load_image(self):
        import cv2
        import numpy as np

        # Check if it is a hosted image and open the image as needed
        if "http://" in self.image_path or "https://" in self.image_path:
            req = urllib.request.urlopen(self.image_path)
            arr = np.asarray(bytearray(req.read()), dtype=np.uint8)
            image = cv2.imdecode(arr, -1)  # 'Load it as it is'
            return image

        return cv2.imread(self.image_path)

    def plot(self, stroke=1):
        import matplotlib.pyplot as plt

        # Exception to check if image path exists
        validate_image_path(self["image_path"])

        _, axes = plot_image(self["image_path"])
        plot_annotation(axes, self, stroke, colors=self.colors)
        plt.show()

    def save(self, output_path="predictions.jpg", stroke=2, transparency=60):
        """
        Annotate an image with predictions and save it

        :param output_path: filename to save the image as
        :param stroke: line width to use when drawing rectangles and polygons
        :param transparency: alpha transparency of masks for semantic overlays
        """
        import cv2
        import numpy as np

        image = self.__load_image()

        stroke_color = (255, 0, 0)

        if self["prediction_type"] == OBJECT_DETECTION_MODEL:
            # Get different dimensions/coordinates
            x = self["x"]
            y = self["y"]
            width = self["width"]
            height = self["height"]
            class_name = self["class"]

            if class_name in self.colors.keys():
                stroke_color = self.colors[class_name]

            # Draw bounding boxes for object detection prediction
            cv2.rectangle(
                image,
                (int(x - width / 2), int(y + height / 2)),
                (int(x + width / 2), int(y - height / 2)),
                stroke_color,
                stroke,
            )
            # Get size of text
            text_size = cv2.getTextSize(class_name, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)[0]
            # Draw background rectangle for text
            cv2.rectangle(
                image,
                (int(x - width / 2), int(y - height / 2 + 1)),
                (
                    int(x - width / 2 + text_size[0] + 1),
                    int(y - height / 2 + int(1.5 * text_size[1])),
                ),
                stroke_color,
                -1,
            )
            # Write text onto image
            cv2.putText(
                image,
                class_name,
                (int(x - width / 2), int(y - height / 2 + text_size[1])),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.4,
                (255, 255, 255),
                thickness=1,
            )
        elif self["prediction_type"] == CLASSIFICATION_MODEL:
            if self["top"] in self.colors.keys():
                stroke_color = self.colors[self["top"]]

            # Get image dimensions
            height, width = image.shape[:2]

            # Get the bottom rows of the image
            bottom = image[height - 2 : height, 0:width]
            # Get mean of the bottom rows
            mean = cv2.mean(bottom)[0]

            border_size = 100
            # Apply Border
            image = cv2.copyMakeBorder(
                image,
                top=border_size,
                bottom=border_size,
                left=border_size,
                right=border_size,
                borderType=cv2.BORDER_CONSTANT,
                value=[mean, mean, mean],
            )
            # Add text
            cv2.putText(
                image,
                (self["top"] + " | " + "Confidence: " + str(self["confidence"])),
                (int(width / 2), int(border_size / 2)),
                cv2.FONT_HERSHEY_DUPLEX,
                0.5,
                stroke_color,
                1,
            )
        elif self["prediction_type"] == INSTANCE_SEGMENTATION_MODEL:
            points = [[int(p["x"]), int(p["y"])] for p in self["points"]]
            np_points = np.array(points, dtype=np.int32)

            if self["class"] in self.colors.keys():
                stroke_color = self.colors[self["class"]]

            cv2.polylines(image, [np_points], isClosed=True, color=stroke_color, thickness=stroke)
        elif self["prediction_type"] == SEMANTIC_SEGMENTATION_MODEL:
            image = mask_image(image, self["segmentation_mask"], transparency)

        cv2.imwrite(output_path, image)

    def __str__(self) -> str:
        """
        :return: JSON formatted string of prediction
        """
        # Pretty print the JSON prediction as a String
        prediction_string = json.dumps(self.json_prediction, indent=2)
        return prediction_string

    def __getitem__(self, key):
        """
        Allows the prediction to be accessed like a dictionary

        :param key: key to look up in the underlying JSON prediction
        :return: value stored under that key
        """
        return self.json_prediction[key]

    # Make representation equal to string value
    __repr__ = __str__
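

# A minimal sketch of working with a single Prediction directly (the JSON
# values and file name are hypothetical; the keys mirror the object detection
# fields read by Prediction.save):
#
#   detection = Prediction(
#       {"x": 320, "y": 240, "width": 100, "height": 80, "class": "dog", "confidence": 0.9},
#       "dog.jpg",
#       prediction_type=OBJECT_DETECTION_MODEL,
#   )
#   print(detection["class"], detection["confidence"])  # dictionary-style access
#   detection.save("annotated.jpg", stroke=2)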


class PredictionGroup:
    def __init__(self, image_dims, image_path, *args):
        """
        :param image_dims: dimensions of the image the predictions were made on
        :param image_path: path of the image the predictions were made on
        :param args: The prediction(s) to be added to the prediction group
        """
        # List of predictions (core of the PredictionGroup)
        self.predictions = []
        # Base image path (path of image of first prediction in prediction group)
        self.base_image_path = image_path
        # Base prediction type
        # (prediction type of image of first prediction in prediction group)
        self.base_prediction_type = ""
        self.image_dims = image_dims

        # Iterate through the arguments
        for index, prediction in enumerate(args):
            # Set base image path based on first prediction
            if index == 0:
                self.base_image_path = prediction["image_path"]
                self.base_prediction_type = prediction["prediction_type"]

            # If not a Prediction object then do not allow into the prediction group
            self.__exception_check(is_prediction_check=prediction)
            # Add prediction to prediction group otherwise
            self.predictions.append(prediction)

    def add_prediction(self, prediction=None):
        """
        :param prediction: Prediction to add to the prediction group
        """
        prediction = prediction or {}

        # If not a Prediction object then do not allow into the prediction group
        self.__exception_check(is_prediction_check=prediction)
        # Also check that prediction types are the same
        # (i.e. object detection predictions in object detection groups)
        self.__exception_check(prediction_type_check=prediction["prediction_type"])

        # If there is more than one prediction and the prediction image path is
        # not the group image path then warn user
        if self.__len__() > 0:
            self.__exception_check(image_path_check=prediction["image_path"])
        # If the prediction group is empty, make the prediction's image path the base
        elif self.__len__() == 0:
            self.base_image_path = prediction["image_path"]

        # Append prediction to group
        self.predictions.append(prediction)

    def plot(self, stroke=1):
        import matplotlib.pyplot as plt

        if len(self) > 0:
            validate_image_path(self.base_image_path)

            _, axes = plot_image(self.base_image_path)
            for single_prediction in self:
                plot_annotation(axes, single_prediction, stroke, colors=single_prediction.colors)

        # Show the plot to the user
        plt.show()

    def __load_image(self):
        import cv2
        import numpy as np

        # Check if it is a hosted image and open image as needed
        if "http://" in self.base_image_path or "https://" in self.base_image_path:
            req = urllib.request.urlopen(self.base_image_path)
            arr = np.asarray(bytearray(req.read()), dtype=np.uint8)
            image = cv2.imdecode(arr, -1)  # 'Load it as it is'
            # Return array with image info
            return image

        # Return array with image info of local image
        return cv2.imread(self.base_image_path)

    def save(self, output_path="predictions.jpg", stroke=2):
        import cv2
        import numpy as np

        # Load image based on image path as an array
        image = self.__load_image()

        stroke_color = (255, 0, 0)

        # Iterate through predictions and add each prediction to the image
        for prediction in self.predictions:
            # Check what type of prediction it is
            if self.base_prediction_type == OBJECT_DETECTION_MODEL:
                # Get different dimensions/coordinates
                x = prediction["x"]
                y = prediction["y"]
                width = prediction["width"]
                height = prediction["height"]
                class_name = prediction["class"]
                # Draw bounding boxes for object detection prediction
                cv2.rectangle(
                    image,
                    (int(x - width / 2), int(y + height / 2)),
                    (int(x + width / 2), int(y - height / 2)),
                    stroke_color,
                    stroke,
                )
                # Get size of text
                text_size = cv2.getTextSize(class_name, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)[0]
                # Draw background rectangle for text
                cv2.rectangle(
                    image,
                    (int(x - width / 2), int(y - height / 2 + 1)),
                    (
                        int(x - width / 2 + text_size[0] + 1),
                        int(y - height / 2 + int(1.5 * text_size[1])),
                    ),
                    stroke_color,
                    -1,
                )
                # Write text onto image
                cv2.putText(
                    image,
                    class_name,
                    (int(x - width / 2), int(y - height / 2 + text_size[1])),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.4,
                    (255, 255, 255),
                    thickness=1,
                )
            # Plot for classification model
            elif self.base_prediction_type == CLASSIFICATION_MODEL:
                # Get image dimensions
                height, width = image.shape[:2]

                border_size = 100
                text = "Class: " + prediction["top"] + " | " + "Confidence: " + str(prediction["confidence"])
                text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_COMPLEX, 1, 1)[0]

                # Apply Border
                image = cv2.copyMakeBorder(
                    image,
                    top=border_size,
                    bottom=border_size,
                    left=border_size,
                    right=border_size,
                    borderType=cv2.BORDER_CONSTANT,
                    value=[255, 255, 255],
                )
                # Compute x coordinate that horizontally centers the text on the bordered image
                text_x = (image.shape[1] - text_size[0]) / 2
                # Add text
                cv2.putText(
                    image,
                    text,
                    (int(text_x), int(border_size / 2)),
                    cv2.FONT_HERSHEY_COMPLEX,
                    1,
                    (0, 0, 0),
                    1,
                )
            elif self.base_prediction_type == INSTANCE_SEGMENTATION_MODEL:
                points = [[int(p["x"]), int(p["y"])] for p in prediction["points"]]
                np_points = np.array(points, dtype=np.int32)
                cv2.polylines(
                    image,
                    [np_points],
                    isClosed=True,
                    color=stroke_color,
                    thickness=stroke,
                )
            elif self.base_prediction_type == SEMANTIC_SEGMENTATION_MODEL:
                image = mask_image(image, prediction["segmentation_mask"])

        # Write the annotated image to the output path
        cv2.imwrite(output_path, image)

    def __str__(self):
        """
        :return: JSON formatted string of each prediction in the group
        """
        # final string to be returned for the prediction group
        prediction_group_string = ""

        # Iterate through the predictions and convert
        # each prediction into a string format
        for prediction in self.predictions:
            prediction_group_string += str(prediction) + "\n\n"

        # return the prediction group string
        return prediction_group_string

    def __getitem__(self, index):
        # Allows prediction group to be accessed via an index
        return self.predictions[index]

    def __len__(self):
        # Length of the prediction group, based on the number of predictions
        return len(self.predictions)

    def __exception_check(
        self,
        is_prediction_check=None,
        image_path_check=None,
        prediction_type_check=None,
    ):
        # Ensures only predictions can be added to a prediction group
        if is_prediction_check is not None:
            if type(is_prediction_check).__name__ != PREDICTION_OBJECT:
                raise Exception("Cannot add type " + type(is_prediction_check).__name__ + " to PredictionGroup")

        # Warns user if predictions have different prediction types
        if prediction_type_check is not None:
            if self.__len__() > 0 and prediction_type_check != self.base_prediction_type:
                warnings.warn(
                    "This prediction is a different type ("
                    + prediction_type_check
                    + ") than the prediction group base type ("
                    + self.base_prediction_type
                    + ")"
                )

        # Warns user if the prediction's image path differs from the base image path
        if image_path_check is not None:
            if self.base_image_path != image_path_check:
                warnings.warn(
                    "This prediction has a different image path ("
                    + image_path_check
                    + ") than the prediction group base image path ("
                    + self.base_image_path
                    + ")"
                )

    def json(self):
        """
        :return: dict with each prediction's JSON under "predictions"
            and the image dimensions under "image"
        """
        prediction_group_json = {"predictions": []}
        for prediction in self.predictions:
            prediction_group_json["predictions"].append(prediction.json())
        prediction_group_json["image"] = self.image_dims
        return prediction_group_json

    @staticmethod
    def create_prediction_group(json_response, image_path, prediction_type, image_dims, colors=None):
        """
        Method to create a prediction group based on the JSON Response

        :param json_response: Roboflow JSON response from the Inference API
        :param image_path: path of the image the predictions were made on
        :param prediction_type: type of model that produced the predictions
        :param image_dims: dimensions of the image the predictions were made on
        :param colors: optional mapping of class name to stroke color
        :return: PredictionGroup containing one Prediction per prediction in the response
        """  # noqa: E501 // docs
        colors = {} if colors is None else colors
        prediction_list = []

        if prediction_type in [OBJECT_DETECTION_MODEL, INSTANCE_SEGMENTATION_MODEL]:
            for prediction in json_response["predictions"]:
                prediction = Prediction(
                    prediction,
                    image_path,
                    prediction_type=prediction_type,
                    colors=colors,
                )
                prediction_list.append(prediction)
        elif prediction_type == CLASSIFICATION_MODEL:
            prediction = Prediction(json_response, image_path, prediction_type, colors=colors)
            prediction_list.append(prediction)
        elif prediction_type == SEMANTIC_SEGMENTATION_MODEL:
            prediction = Prediction(json_response, image_path, prediction_type, colors=colors)
            prediction_list.append(prediction)

        # Unpack the list and return it as a prediction group
        return PredictionGroup(image_dims, image_path, *prediction_list)
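

# A minimal end-to-end sketch (the response, file name, and dimensions are
# hypothetical; the response shape mirrors what create_prediction_group and
# PredictionGroup.save read for object detection):
#
#   response = {
#       "predictions": [
#           {"x": 320, "y": 240, "width": 100, "height": 80, "class": "dog", "confidence": 0.9}
#       ]
#   }
#   group = PredictionGroup.create_prediction_group(
#       response,
#       image_path="dog.jpg",
#       prediction_type=OBJECT_DETECTION_MODEL,
#       image_dims={"width": 640, "height": 480},
#   )
#   group.save("annotated.jpg", stroke=2)
#   group.plot()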