import subprocess
import os
import random
import logging
import tempfile
import uuid # Ajouté pour générer des identifiants uniques
from datetime import datetime
from [Link] import ThreadPoolExecutor, as_completed
from typing import List, Tuple
# Configuration du logging
[Link](
level=[Link],
format='%(asctime)s - %(levelname)s - %(module)s - %(message)s',
handlers=[
[Link]('video_processing.log'),
[Link]()
]
)
class VideoTransformError(Exception):
"""Exception personnalisée pour les erreurs de traitement vidéo"""
pass
class VideoTransformer:
def __init__(self):
[Link] = {
'crf': 16,
'preset': 'veryslow',
'audio_bitrate': '320k',
'segment_division': 8,
'min_segment_duration': 2.0,
'output_format': 'mp4',
'max_workers': os.cpu_count() or 4,
'allowed_extensions': ['.mp4', '.mov', '.avi', '.mkv']
}
self.temp_dir = None
self.current_temp_dir = None
def _safe_run_command(self, cmd: List[str], error_context: str = "") -> None:
"""Exécute une commande shell avec gestion d'erreurs détaillée"""
try:
result = [Link](
cmd,
check=True,
stdout=[Link],
stderr=[Link],
universal_newlines=True
)
[Link](f"Commande réussie: {' '.join(cmd)}")
return result
except [Link] as e:
error_msg = f"{error_context}\nErreur: {[Link]}\nCommande: {' '.join(cmd)}"
[Link](error_msg)
raise VideoTransformError(error_msg) from e
except Exception as e:
error_msg = f"Erreur inattendue: {str(e)}"
[Link](error_msg)
raise VideoTransformError(error_msg) from e
def _generate_filters(self) -> Tuple[List[str], List[str]]:
"""Génère les filtres vidéo/audio avec validation"""
vf = []
af = []
try:
# Filtres vidéo plus agressifs pour altérer l'empreinte
scale_factor = [Link](0.65, 1.35)
[Link](f'scale=iw*{scale_factor}:-1:flags=lanczos')
# Rotation et transformations géométriques aléatoires
if [Link]() > 0.4:
[Link]('hflip')
if [Link]() > 0.4:
[Link]('vflip')
[Link]([
f'rotate={[Link](-20,20)}*PI/180:bilinear=1',
f'unsharp=[Link]{[Link](1.0,3.5)}',
f'noise=c0s={[Link](7,25)}:c0f=t+u',
f'eq=contrast={1+[Link](-0.35,0.35)}:brightness={[Link](-0.25,0.25)}',
f'hue=h={[Link](-35,35)}:s={[Link](0.9,1.1)}',
f'colorbalance=rs={[Link](-0.15,0.15)}:gs={[Link](-0.15,0.15)}:bs={[Link]-
form(-0.15,0.15)}',
'pp=al' # Filtre de déblocking léger
])
# Filtres audio renforcés
[Link]([
f'atempo={[Link](0.75,1.25)}',
f'asetrate=48000*{[Link](0.85,1.15)}',
'aresample=48000:resampler=soxr:dither_method=triangular',
f'volume={[Link](0.65,1.35)}',
'afftdn=nr=1:nf=-25', # Réduction de bruit
f'equalizer=f=1000:t=h:w=200:g={[Link](-5,5)}', # EQ aléatoire
'acompressor=threshold=-12dB:ratio=2:attack=200:release=1000' # Compression
])
return vf, af
except Exception as e:
[Link](f"Erreur génération filtres: {str(e)} - Utilisation de filtres par défaut")
return ['scale=iw:-1'], ['aresample=48000']
def _process_segment(self, segment_path: str, index: int) -> str:
"""Traite un segment vidéo individuel avec suppression totale des métadonnées"""
output = [Link](self.current_temp_dir, f'processed_segment_{index}.
{[Link]["output_format"]}')
vf, af = self._generate_filters()
# Génération de métadonnées aléatoires
random_metadata = {
'title': f'Video_{[Link](10000,99999)}',
'artist': f'Creator_{uuid.uuid4().hex[:6]}',
'album': f'Collection_{[Link](1,1000)}',
'date': f'{[Link](2020,2024)}-{[Link](1,12):02d}-
{[Link](1,28):02d}',
'comment': f'Generated_{uuid.uuid4().hex}',
'genre': f'Type_{[Link](1,100)}',
'copyright': f'Rights_{uuid.uuid4().hex[:8]}'
}
cmd = [
'ffmpeg', '-y', '-i', segment_path,
'-vf', ','.join(vf),
'-af', ','.join(af),
'-c:v', 'libx264',
'-preset', [Link]['preset'],
'-crf', str([Link]['crf']),
'-map_metadata', '-1', # Supprime toutes les métadonnées existantes
'-fflags', '+bitexact', # Assure une sortie déterministe
'-flags:v', '+bitexact',
'-flags:a', '+bitexact',
'-timestamp', str([Link](0, 2**32-1)), # Timestamp aléatoire
]
# Ajout des métadonnées aléatoires
for key, value in random_metadata.items():
[Link](['-metadata', f'{key}={value}'])
# Options supplémentaires pour la suppression des métadonnées
[Link]([
'-map_chapters', '-1', # Supprime les chapitres
'-map_metadata:s', '-1', # Supprime les métadonnées des streams
'-map_metadata:c', '-1', # Supprime les métadonnées des codecs
output
])
self._safe_run_command(cmd, f"Échec traitement segment {index}")
return output
def _concat_segments(self, segments: List[str], output_path: str) -> None:
"""Assemble les segments traités avec nouvelles métadonnées"""
list_file = [Link](self.current_temp_dir, 'concat_list.txt')
with open(list_file, 'w') as f:
for seg in segments:
[Link](f"file '{[Link](seg)}'\n")
# Nouvelles métadonnées pour le fichier final
final_metadata = {
'title': f'Processed_{uuid.uuid4().hex[:8]}',
'artist': f'System_{[Link](1000,9999)}',
'date': [Link]().strftime("%Y-%m-%d"),
'comment': f'Compiled_{uuid.uuid4().hex}',
}
cmd = [
'ffmpeg', '-y', '-f', 'concat', '-safe', '0',
'-i', list_file,
'-c', 'copy',
'-map_metadata', '-1', # Supprime les métadonnées des segments
'-movflags', '+faststart',
]
# Ajout des nouvelles métadonnées
for key, value in final_metadata.items():
[Link](['-metadata', f'{key}={value}'])
[Link](output_path)
self._safe_run_command(cmd, "Échec assemblage final")
def apply_transformations(self, input_path: str, output_path: str) -> str:
"""
Applique les transformations à la vidéo avec nettoyage des métadonnées
Args:
input_path: Chemin vers le fichier source
output_path: Chemin de sortie souhaité
Returns:
Chemin du fichier généré
"""
try:
if not [Link](input_path):
raise VideoTransformError(f"Le fichier {input_path} n'existe pas")
# Création répertoire temporaire
self.temp_dir = [Link](prefix='videotransform_')
self.current_temp_dir = self.temp_dir.name
# Découpage initial sans métadonnées
cmd = ['ffmpeg', '-i', input_path]
result = self._safe_run_command(cmd)
# Calcul de la durée pour le découpage
duration = float([Link]('Duration: ')[1].split(',')[0].split(':'))
segment_length = max(duration / [Link]['segment_division'],
[Link]['min_segment_duration'])
segments = []
for i in range([Link]['segment_division']):
segment_path = [Link](self.current_temp_dir, f'segment_{i}.mp4')
cmd = [
'ffmpeg', '-y',
'-i', input_path,
'-ss', str(i * segment_length),
'-t', str(segment_length),
'-map_metadata', '-1', # Supprime les métadonnées dès le découpage
'-c', 'copy',
segment_path
]
self._safe_run_command(cmd)
[Link](segment_path)
# Traitement parallèle des segments
with ThreadPoolExecutor(max_workers=[Link]['max_workers']) as executor:
futures = [[Link](self._process_segment, seg, i)
for i, seg in enumerate(segments)]
processed = [[Link]() for f in as_completed(futures)]
# Assemblage final
self._concat_segments(processed, output_path)
return output_path
except Exception as e:
[Link](f"Échec du traitement: {str(e)}")
raise
finally:
if hasattr(self, 'temp_dir') and self.temp_dir:
self.temp_dir.cleanup()
if __name__ == "__main__":
try:
processor = VideoTransformer()
output = processor.apply_transformations(
"input_video.mp4",
"transformed_output.mp4"
)
[Link](f"Traitement réussi: {output}")
except VideoTransformError as e:
[Link](f"Échec critique: {str(e)}")
exit(1)