"""
|
|
Historical Newspaper Image Cleaning Pipeline
|
|
|
|
This module provides functions to clean and enhance scanned historical newspaper images
|
|
by reducing noise, improving contrast, and sharpening text for better readability.
|
|
"""
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PIL import Image, ImageEnhance
|
|
import os
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
|
|
|
|
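
# Minimal programmatic usage sketch (the file paths are placeholders, not part of
# this repository):
#
#   cleaner = NewspaperImageCleaner()
#   processed, original = cleaner.process_image("scan.jpg", "cleaned_scan.jpg")
#   cleaner.process_directory("scans/", "cleaned/")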


class NewspaperImageCleaner:
    """
    Image processing pipeline specifically designed for historical newspaper scans.
    """

    def __init__(self, config=None):
        """Initialize with default or custom configuration."""
        self.config = config or self._default_config()

    def _default_config(self):
        """Default processing parameters optimized for newspaper scans."""
        return {
            'bilateral_d': 9,              # Neighborhood diameter for bilateral filter
            'bilateral_sigma_color': 75,   # Filter sigma in color space
            'bilateral_sigma_space': 75,   # Filter sigma in coordinate space
            'clahe_clip_limit': 2.0,       # Contrast limiting for CLAHE
            'clahe_grid_size': (8, 8),     # CLAHE grid size
            'gamma': 1.2,                  # Gamma correction value
            'denoise_h': 10,               # Denoising filter strength
            'morph_kernel_size': 2,        # Morphological operation kernel size
            'unsharp_amount': 1.5,         # Unsharp masking amount
            'unsharp_radius': 1.0,         # Unsharp masking radius
            'unsharp_threshold': 0,        # Unsharp masking threshold
        }
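
    # A sample JSON file for the --config command-line option defined below (the
    # filename is only an example).  Because a custom config replaces the defaults
    # rather than merging with them, every key from _default_config() must be
    # present; clahe_grid_size is written as a two-element list and converted back
    # to a tuple by the CLI entry point:
    #
    #   {"bilateral_d": 9, "bilateral_sigma_color": 75, "bilateral_sigma_space": 75,
    #    "clahe_clip_limit": 3.0, "clahe_grid_size": [16, 16], "gamma": 1.1,
    #    "denoise_h": 7, "morph_kernel_size": 2, "unsharp_amount": 1.5,
    #    "unsharp_radius": 1.0, "unsharp_threshold": 0}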

    def reduce_noise(self, image):
        """
        Apply noise reduction techniques to remove speckles and JPEG artifacts.

        Args:
            image: Input BGR or grayscale image

        Returns:
            Denoised image
        """
        # Bilateral filter - preserves edges while reducing noise
        bilateral = cv2.bilateralFilter(
            image,
            self.config['bilateral_d'],
            self.config['bilateral_sigma_color'],
            self.config['bilateral_sigma_space']
        )

        # Non-local means denoising for better noise reduction
        if len(image.shape) == 3:
            # Color image
            denoised = cv2.fastNlMeansDenoisingColored(
                bilateral, None,
                self.config['denoise_h'],
                self.config['denoise_h'],
                7, 21  # template window size, search window size (OpenCV defaults)
            )
        else:
            # Grayscale image
            denoised = cv2.fastNlMeansDenoising(
                bilateral, None,
                self.config['denoise_h'],
                7, 21  # template window size, search window size (OpenCV defaults)
            )

        return denoised
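
    # Quick tuning sketch for denoise_h on a small crop (paths and crop coordinates
    # are placeholders): higher values remove more speckle but also soften type.
    #
    #   cleaner = NewspaperImageCleaner()
    #   cleaner.config['denoise_h'] = 15
    #   crop = cv2.imread("scan.jpg")[1000:1400, 500:900]
    #   cv2.imwrite("crop_denoised.jpg", cleaner.reduce_noise(crop))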

    def enhance_contrast(self, image):
        """
        Improve image contrast using CLAHE and gamma correction.

        Args:
            image: Input BGR or grayscale image

        Returns:
            Contrast-enhanced image
        """
        # Convert to LAB color space for better contrast processing
        if len(image.shape) == 3:
            lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
            l_channel, a_channel, b_channel = cv2.split(lab)
        else:
            l_channel = image

        # Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
        clahe = cv2.createCLAHE(
            clipLimit=self.config['clahe_clip_limit'],
            tileGridSize=self.config['clahe_grid_size']
        )
        l_channel = clahe.apply(l_channel)

        # Reconstruct image
        if len(image.shape) == 3:
            enhanced = cv2.merge([l_channel, a_channel, b_channel])
            enhanced = cv2.cvtColor(enhanced, cv2.COLOR_LAB2BGR)
        else:
            enhanced = l_channel

        # Apply gamma correction: out = 255 * (in / 255) ** (1 / gamma)
        gamma = self.config['gamma']
        inv_gamma = 1.0 / gamma
        table = np.array([((i / 255.0) ** inv_gamma) * 255
                          for i in np.arange(0, 256)]).astype("uint8")
        enhanced = cv2.LUT(enhanced, table)

        return enhanced
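
    # Worked check of the gamma lookup table above: with the default gamma of 1.2,
    # an input of 128 maps to 255 * (128 / 255) ** (1 / 1.2) ≈ 143, so midtones are
    # lifted slightly while 0 and 255 stay fixed.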

    def clean_background(self, image):
        """
        Remove small artifacts and clean background noise.

        Args:
            image: Input image

        Returns:
            Background-cleaned image
        """
        # Convert to grayscale for morphological operations
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image

        # Morphological opening to remove small noise
        kernel = np.ones((self.config['morph_kernel_size'],
                          self.config['morph_kernel_size']), np.uint8)

        # Opening (erosion followed by dilation)
        opened = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel)

        # If original was color, apply the mask
        if len(image.shape) == 3:
            # Create a mask and apply it to the original color image
            mask = opened > 0
            result = image.copy()
            result[~mask] = [255, 255, 255]  # Set background to white
            return result
        else:
            return opened
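
    # Note: on a color input, only pixels whose opened grayscale value is exactly 0
    # are whitened, so the effect is limited to regions that are already pure black
    # after the opening.  The step can also be skipped via the steps parameter, e.g.:
    #
    #   cleaner.process_image("scan.jpg", "out.jpg", steps=['denoise', 'contrast', 'sharpen'])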

    def sharpen_image(self, image):
        """
        Apply unsharp masking to enhance text clarity.

        Args:
            image: Input image

        Returns:
            Sharpened image
        """
        # Convert to float for processing
        float_img = image.astype(np.float32) / 255.0

        # Create Gaussian blur
        radius = self.config['unsharp_radius']
        sigma = radius / 3.0
        blurred = cv2.GaussianBlur(float_img, (0, 0), sigma)

        # Unsharp masking
        amount = self.config['unsharp_amount']
        sharpened = float_img + amount * (float_img - blurred)

        # Threshold and clamp
        threshold = self.config['unsharp_threshold'] / 255.0
        sharpened = np.where(np.abs(float_img - blurred) < threshold,
                             float_img, sharpened)
        sharpened = np.clip(sharpened, 0.0, 1.0)

        return (sharpened * 255).astype(np.uint8)
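
    # With the default unsharp_threshold of 0, every pixel is sharpened; raising it
    # leaves pixels whose difference from the blurred image falls below the
    # threshold untouched, which keeps low-contrast paper texture from being amplified.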

    def process_image(self, image_path, output_path=None, steps=None):
        """
        Process a single image through the complete pipeline.

        Args:
            image_path: Path to input image
            output_path: Path for output image (optional)
            steps: List of processing steps to apply (optional)

        Returns:
            Tuple of (processed image array, original image array)
        """
        if steps is None:
            steps = ['denoise', 'contrast', 'background', 'sharpen']

        # Load image
        image = cv2.imread(str(image_path))
        if image is None:
            raise ValueError(f"Could not load image: {image_path}")

        original = image.copy()

        # Apply processing steps
        if 'denoise' in steps:
            print("Applying noise reduction...")
            image = self.reduce_noise(image)

        if 'contrast' in steps:
            print("Enhancing contrast...")
            image = self.enhance_contrast(image)

        if 'background' in steps:
            print("Cleaning background...")
            image = self.clean_background(image)

        if 'sharpen' in steps:
            print("Sharpening image...")
            image = self.sharpen_image(image)

        # Save output if path provided
        if output_path:
            cv2.imwrite(str(output_path), image)
            print(f"Processed image saved to: {output_path}")

        return image, original
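
    # The (processed, original) pair returned above plugs directly into
    # create_comparison_image() defined at module level, e.g.:
    #
    #   cleaner = NewspaperImageCleaner()
    #   processed, original = cleaner.process_image("scan.jpg")
    #   create_comparison_image(original, processed, "comparison.jpg")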

    def process_directory(self, input_dir, output_dir, extensions=None):
        """
        Process all images in a directory.

        Args:
            input_dir: Input directory path
            output_dir: Output directory path
            extensions: List of file extensions to process
        """
        if extensions is None:
            extensions = ['.jpg', '.jpeg', '.png', '.tif', '.tiff']

        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        for file_path in input_path.iterdir():
            if file_path.suffix.lower() in extensions:
                print(f"\nProcessing: {file_path.name}")
                output_file = output_path / f"cleaned_{file_path.name}"

                try:
                    self.process_image(file_path, output_file)
                except Exception as e:
                    print(f"Error processing {file_path.name}: {str(e)}")

        print(f"\nBatch processing completed. Results in: {output_dir}")
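
    # Note: Path.iterdir() does not recurse, so only images directly inside
    # input_dir are processed; nested subdirectories are skipped.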


def create_comparison_image(original, processed, output_path):
    """
    Create a side-by-side comparison image.

    Args:
        original: Original image array
        processed: Processed image array
        output_path: Path to save comparison
    """
    # Resize images to the same height if needed
    h1, w1 = original.shape[:2]
    h2, w2 = processed.shape[:2]

    if h1 != h2:
        height = min(h1, h2)
        original = cv2.resize(original, (int(w1 * height / h1), height))
        processed = cv2.resize(processed, (int(w2 * height / h2), height))

    # Create side-by-side comparison
    comparison = np.hstack([original, processed])
    cv2.imwrite(str(output_path), comparison)
    print(f"Comparison saved to: {output_path}")
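
# Note: np.hstack() requires both arrays to have matching channel counts.  In the
# CLI flow below this holds because cv2.imread() loads three-channel BGR images,
# but callers passing their own arrays should make sure the shapes line up.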
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Clean historical newspaper images")
|
|
parser.add_argument("input", help="Input image or directory path")
|
|
parser.add_argument("-o", "--output", help="Output path")
|
|
parser.add_argument("-d", "--directory", action="store_true",
|
|
help="Process entire directory")
|
|
parser.add_argument("-c", "--comparison", action="store_true",
|
|
help="Create before/after comparison")
|
|
parser.add_argument("--steps", nargs="+",
|
|
choices=['denoise', 'contrast', 'background', 'sharpen'],
|
|
default=['denoise', 'contrast', 'background', 'sharpen'],
|
|
help="Processing steps to apply")
|
|
parser.add_argument("--config", help="JSON config file with custom parameters")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Load config if provided
|
|
config = None
|
|
if args.config and os.path.exists(args.config):
|
|
with open(args.config, 'r') as f:
|
|
config = json.load(f)
|
|
# Convert list back to tuple if needed
|
|
if config and 'clahe_grid_size' in config:
|
|
config['clahe_grid_size'] = tuple(config['clahe_grid_size'])
|
|
print(f"Loaded config from: {args.config}")
|
|
|
|
cleaner = NewspaperImageCleaner(config)
|
|
|
|
if args.directory:
|
|
output_dir = args.output or "cleaned_images"
|
|
cleaner.process_directory(args.input, output_dir)
|
|
else:
|
|
output_path = args.output
|
|
if not output_path:
|
|
input_path = Path(args.input)
|
|
output_path = input_path.parent / f"cleaned_{input_path.name}"
|
|
|
|
processed, original = cleaner.process_image(args.input, output_path, args.steps)
|
|
|
|
if args.comparison:
|
|
comparison_path = Path(output_path).parent / f"comparison_{Path(args.input).name}"
|
|
create_comparison_image(original, processed, comparison_path) |
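
# Example invocations (the script name "clean_newspaper.py" is an assumption; use
# whatever this file is actually saved as):
#
#   python clean_newspaper.py scan.jpg -o cleaned.jpg --comparison
#   python clean_newspaper.py scans/ -d -o cleaned/
#   python clean_newspaper.py scan.jpg --steps denoise contrast sharpen --config custom_config.json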