Source code for label_processing.tensorflow_classifier

# Import standard-library and third-party libraries
import numpy as np
import pandas as pd
import cv2
import glob, os
from pathlib import Path
import tensorflow as tf
from tensorflow import keras
import warnings
import platform
import sys

# Import shared helper utilities from the 'label_processing' package
from label_processing import utils


# --------------------------------Predict Classes--------------------------------#


def get_model(path_to_model: str) -> tf.keras.Sequential:
    """
    Load a trained Keras Sequential image classifier model with cross-platform compatibility.

    Several loading strategies are attempted in order; the result of the
    first strategy that does not raise is returned.

    Args:
        path_to_model (str): Path to the model file.

    Returns:
        model (tf.keras.Sequential): Trained Keras Sequential image classifier
            model, or whatever object the SavedModel fallback strategy
            returns when only that strategy succeeds.

    Raises:
        Exception: If every loading strategy fails. The error raised by the
            last strategy is attached as the exception's cause.
    """
    print("\nCalling classification model")

    # Set up cross-platform environment
    _setup_tensorflow_cross_platform_environment()

    # Try multiple loading strategies for cross-platform compatibility
    loading_strategies = [
        # Strategy 1: Standard TensorFlow loading
        lambda: tf.keras.models.load_model(path_to_model),
        # Strategy 2: Loading with compile=False to avoid optimizer issues
        lambda: tf.keras.models.load_model(path_to_model, compile=False),
        # Strategy 3: Loading with custom options for protobuf compatibility
        lambda: _load_with_protobuf_compatibility(path_to_model),
        # Strategy 4: Loading with SavedModel format explicitly
        lambda: _load_with_saved_model_format(path_to_model),
    ]

    last_error = None
    for i, strategy in enumerate(loading_strategies, 1):
        print(f"Trying TensorFlow loading strategy {i}...")
        try:
            model = strategy()
        except Exception as e:
            # Deliberately broad: any failure just means "try the next strategy".
            print(f"TensorFlow strategy {i} failed: {e}")
            last_error = e
        else:
            print("TensorFlow model loaded successfully")
            return model

    # If all strategies fail, raise with a helpful message and keep the
    # original traceback reachable through exception chaining.
    print(f"All TensorFlow loading strategies failed. Last error: {last_error}")
    raise Exception(
        f"Failed to load TensorFlow model from {path_to_model}. "
        f"This might be due to protobuf version incompatibility or "
        f"model corruption. Last error: {last_error}"
    ) from last_error
def class_prediction(
    model: tf.keras.Sequential,
    class_names: list,
    jpg_dir: str,
    out_dir=None,
    batch_size: int = 32,
    max_images: int = 10000,
) -> pd.DataFrame:
    """
    Create a dataframe with predicted classes for each picture with memory-safe batch processing.

    Images are validated and predicted one at a time; ``batch_size`` only
    controls how many files are handled between two garbage-collection
    passes and progress messages.

    Args:
        model (tf.keras.Sequential): Trained Keras Sequential image classifier model.
        class_names (list): Model's predicted classes.
        jpg_dir (str): Path to the directory containing the original jpgs.
        out_dir (str): Path where the CSV file will be stored. Defaults to the
            parent directory of ``jpg_dir``.
        batch_size (int): Number of images to process in each batch (default: 32)
        max_images (int): Maximum number of images to process (default: 10000)

    Returns:
        DataFrame (pd.DataFrame): Pandas DataFrame with the predicted results
            (columns ``filename``, ``class`` and ``score``; the columns are
            present even when no image could be processed).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    import gc  # function-scope import, hoisted out of the batch loop

    # A zero step makes range() fail with an unrelated message and a negative
    # one silently skips every image, so reject both explicitly.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    utils.check_dir(jpg_dir)
    print("\nPredicting classes with memory-safe batch processing")

    # Get all image files (support multiple formats, lower- and upper-case)
    image_patterns = ["*.jpg", "*.jpeg", "*.png", "*.tiff", "*.tif", "*.bmp"]
    image_files = []
    for pattern in image_patterns:
        image_files.extend(glob.glob(os.path.join(jpg_dir, pattern)))
        image_files.extend(glob.glob(os.path.join(jpg_dir, pattern.upper())))
    # set() removes paths matched by both spellings; sorting keeps the order stable.
    image_files = sorted(set(image_files))

    # SECURITY: Limit total number of images to prevent resource exhaustion
    if len(image_files) > max_images:
        print(
            f"SECURITY WARNING: Too many images ({len(image_files)} > {max_images}). Processing only first {max_images}."
        )
        image_files = image_files[:max_images]

    all_predictions = []
    img_width = 180
    img_height = 180
    total_batches = (len(image_files) - 1) // batch_size + 1

    # Process images in batches to prevent memory exhaustion
    for batch_start in range(0, len(image_files), batch_size):
        batch_files = image_files[batch_start:batch_start + batch_size]
        print(
            f"Processing batch {batch_start // batch_size + 1}/{total_batches} ({len(batch_files)} images)"
        )

        # Validate all images in batch first
        valid_batch_files = []
        for file in batch_files:
            if utils.validate_image_integrity(
                file, max_size_mb=10, max_dimensions=(4000, 4000)
            ):
                valid_batch_files.append(file)
            else:
                print(f"SECURITY WARNING: Skipping unsafe image: {file}")

        # Process valid images in current batch
        for file in valid_batch_files:
            try:
                # SECURITY: Use safe image loading with error handling
                image = tf.keras.utils.load_img(
                    file, target_size=(img_height, img_width)
                )
                img_array = tf.keras.utils.img_to_array(image)
                img_array = tf.expand_dims(img_array, 0)  # add the batch axis

                # Note: verbose parameter removed for SavedModel compatibility
                predictions = model.predict(img_array)
                score = tf.nn.softmax(predictions[0])

                all_predictions.append(
                    {
                        "filename": os.path.basename(file),
                        "class": class_names[np.argmax(score)],
                        "score": 100 * np.max(score),
                    }
                )

                # SECURITY: Drop references so the per-image buffers can be freed
                del img_array, predictions, score, image
            except Exception as e:
                # One unreadable image must not abort the whole run.
                print(f"SECURITY ERROR: Failed to process image {file}: {e}")
                continue

        # SECURITY: Force garbage collection after each batch
        gc.collect()

    # Explicit columns keep the schema stable when no image was processed.
    df = pd.DataFrame(all_predictions, columns=["filename", "class", "score"])

    if out_dir is None:
        out_dir = os.path.dirname(os.path.realpath(jpg_dir))
    filename = f"{Path(jpg_dir).stem}_prediction_classifer.csv"
    csv_path = os.path.join(out_dir, filename)
    df.to_csv(csv_path)
    print(f"\nThe CSV file {filename} has been successfully saved in {out_dir}")
    return df
# --------------------------------Save Pictures--------------------------------#
def create_dirs(dataframe: pd.DataFrame, path: str) -> None:
    """
    Create separate directories for every class.

    Rows whose class is missing (``None``/NaN) are ignored, so no directory
    named after a missing value is created.

    Args:
        dataframe (pd.Dataframe): DataFrame containing the classes as a column
            named ``class``.
        path (str): Path of the chosen directory.
    """
    # dropna() first: unique() would otherwise hand back None/NaN as a "class".
    for class_name in dataframe["class"].dropna().unique():
        Path(f"{path}/{class_name}").mkdir(parents=True, exist_ok=True)
def make_file_name(label_id: str, pic_class: str) -> str:
    """
    Create a fitting filename.

    Args:
        label_id (str): String containing the label id.
        pic_class (str): Class of the label.

    Returns:
        filename (str): The created filename, ``<label_id>_<pic_class>.jpg``.
    """
    return f"{label_id}_{pic_class}.jpg"
def rename_picture(
    img_raw: np.ndarray, path: str, filename: str, pic_class: str
) -> None:
    """
    Rename the pictures using the predicted class.

    The image is written to ``<path>/<pic_class>/<filename>``. A warning is
    printed when OpenCV reports that the file could not be written.

    Args:
        img_raw (numpy.ndarray): Input jpg converted to a numpy matrix by cv2.
        path (str): Path where the picture should be saved.
        filename (str): Name of the picture.
        pic_class (str): Class of the label.
    """
    filepath = f"{path}/{pic_class}/{filename}"
    # cv2.imwrite signals failure through its boolean return value; surface it
    # instead of silently dropping the image.
    if not cv2.imwrite(filepath, img_raw):
        print(f"Warning: Could not write image {filepath}")
def filter_pictures(
    jpg_dir: Path, dataframe: pd.DataFrame, out_dir: Path = Path(os.getcwd())
) -> None:
    """
    Create new folders for each class of the newly named classified pictures.

    Every image found in ``jpg_dir`` that has a prediction in ``dataframe`` is
    written to ``<out_dir>/<class>/<label_id>_<class>.jpg``. Images without a
    prediction, unreadable images and rows without a class are skipped with a
    printed warning.

    Args:
        jpg_dir (str): Path to directory with jpgs.
        dataframe (pd.DataFrame): Pandas DataFrame with class predictions
            (needs the columns ``filename`` and ``class``).
        out_dir (Path): Path to the target directory to save the cropped jpgs.
            NOTE(review): the default is evaluated once at import time, so it
            is the working directory at import, not at call time.
    """
    # Check if dataframe has required columns
    if dataframe.empty:
        print("Warning: No predictions available for image filtering")
        return

    if 'class' not in dataframe.columns:
        print(f"Error: DataFrame missing 'class' column. Available columns: {list(dataframe.columns)}")
        return

    if 'filename' not in dataframe.columns:
        print(f"Error: DataFrame missing 'filename' column. Available columns: {list(dataframe.columns)}")
        return

    try:
        create_dirs(dataframe, out_dir)  # Create directories for every class
    except Exception as e:
        print(f"Error creating directories: {e}")
        return

    # Collect the images (lower- and upper-case extensions)
    image_patterns = ["*.jpg", "*.jpeg", "*.png", "*.tiff", "*.tif", "*.bmp"]
    all_image_files = []
    for pattern in image_patterns:
        all_image_files.extend(glob.glob(os.path.join(jpg_dir, pattern)))
        all_image_files.extend(glob.glob(os.path.join(jpg_dir, pattern.upper())))

    for filepath in sorted(set(all_image_files)):
        filename = os.path.basename(filepath)
        match = dataframe[dataframe.filename == filename]

        if match.empty:
            print(f"Warning: No prediction found for image {filename}")
            continue

        try:
            image_raw = cv2.imread(filepath)
            if image_raw is None:
                print(f"Warning: Could not load image {filepath}")
                continue

            label_id = Path(filename).stem

            for _, row in match.iterrows():
                if pd.isna(row["class"]):
                    print(f"Warning: No valid class prediction for {filename}")
                    continue

                pic_class = row["class"]
                # Separate name for the output file: `filename` must keep
                # identifying the source image for later warnings.
                target_name = make_file_name(label_id, pic_class)
                rename_picture(image_raw, out_dir, target_name, pic_class)
        except Exception as e:
            # Keep going: one broken image must not stop the remaining ones.
            print(f"Error processing image {filepath}: {e}")
            continue

    print(f"\nThe images have been successfully saved in {out_dir}")
# --------------------------------Cross-Platform Compatibility--------------------------------#


def _setup_tensorflow_cross_platform_environment():
    """
    Setup TensorFlow environment for cross-platform compatibility.

    Sets environment variables that request CPU-only execution and quieter
    logging, and on Linux additionally limits math-library threading and
    hides GPUs from TensorFlow. All TensorFlow device configuration is
    best-effort: it is skipped with a warning when TensorFlow refuses it.
    """
    # Force CPU-only execution to avoid CUDA/GPU issues
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"  # Suppress TensorFlow logs

    if platform.system() != "Linux":
        return

    # Linux-specific: disable problematic optimizations
    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
    os.environ["OMP_NUM_THREADS"] = "1"
    os.environ["MKL_NUM_THREADS"] = "1"
    os.environ["NUMEXPR_NUM_THREADS"] = "1"

    # Force CPU usage. TensorFlow rejects device changes once its runtime is
    # initialized (e.g. on a second model load), which must not abort loading.
    try:
        tf.config.set_visible_devices([], "GPU")
    except (RuntimeError, ValueError) as e:
        print(f"Warning: Could not hide GPU devices from TensorFlow: {e}")

    # Set memory growth for any GPU that might be detected
    try:
        for gpu in tf.config.experimental.list_physical_devices("GPU"):
            tf.config.experimental.set_memory_growth(gpu, True)
    except (RuntimeError, ValueError):
        # Deliberate best-effort: ignore if GPU configuration fails.
        pass


def _load_with_protobuf_compatibility(path_to_model: str) -> tf.keras.Sequential:
    """
    Load model with protobuf compatibility fixes for Linux.

    Args:
        path_to_model (str): Path to the model file.

    Returns:
        model (tf.keras.Sequential): Loaded Keras Sequential model.

    Raises:
        Exception: If loading fails; the underlying error is attached as the
            exception's cause.
    """
    try:
        # Try to load with specific protobuf handling
        import google.protobuf.message

        # Allow oversize protos for large models when the hook exists.
        # NOTE(review): it appears not every protobuf build exposes this
        # attribute on Message - confirm; when it is missing the load is
        # still attempted instead of failing the whole strategy.
        allow_oversize = getattr(
            google.protobuf.message.Message, "SetAllowOversizeProtos", None
        )
        if callable(allow_oversize):
            allow_oversize(True)

        # Load with explicit options
        return tf.keras.models.load_model(
            path_to_model,
            compile=False,
            custom_objects=None,
            options=tf.saved_model.LoadOptions(experimental_io_device="/job:localhost"),
        )
    except Exception as e:
        raise Exception(f"Protobuf compatibility loading failed: {e}") from e


def _load_with_saved_model_format(path_to_model: str) -> tf.keras.Sequential:
    """
    Load model using explicit SavedModel format.

    Args:
        path_to_model (str): Path to the model file.

    Returns:
        model (tf.keras.Sequential): Loaded Keras Sequential model or a wrapper.
            When the SavedModel has a ``serving_default`` signature, a small
            wrapper exposing ``predict(x)`` is returned; otherwise the object
            returned by ``tf.saved_model.load`` is returned unchanged.

    Raises:
        Exception: If loading fails; the underlying error is attached as the
            exception's cause.
    """

    class SavedModelWrapper:
        """Expose a SavedModel signature through a Keras-like ``predict``."""

        def __init__(self, signature_fn):
            self.signature_fn = signature_fn

        def predict(self, x):
            # Convert numpy array to tensor if needed
            if isinstance(x, np.ndarray):
                x = tf.convert_to_tensor(x, dtype=tf.float32)

            # Call the signature function
            result = self.signature_fn(x)

            # Return numpy array for compatibility
            if isinstance(result, dict):
                # Get the first output if multiple outputs
                return next(iter(result.values())).numpy()
            return result.numpy()

    try:
        # Load using tf.saved_model API directly
        imported = tf.saved_model.load(path_to_model)

        # Wrap the serving signature in a Keras model-like interface if possible
        if hasattr(imported, "signatures") and "serving_default" in imported.signatures:
            return SavedModelWrapper(imported.signatures["serving_default"])

        # If no serving signature, try to use the model directly
        return imported
    except Exception as e:
        raise Exception(f"SavedModel format loading failed: {e}") from e