0% ont trouvé ce document utile (0 vote)
19 vues8 pages

Code

Le document présente une classe Python, VideoTransformer, qui permet de transformer des vidéos en appliquant divers filtres audio et vidéo tout en supprimant les métadonnées. Il utilise des commandes FFmpeg pour découper la vidéo en segments, appliquer des transformations, et assembler le tout dans un fichier de sortie. La classe gère également les erreurs et le logging pour suivre le processus de transformation.

Transféré par

youssef l'optimiste
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd
0% ont trouvé ce document utile (0 vote)
19 vues8 pages

Code

Le document présente une classe Python, VideoTransformer, qui permet de transformer des vidéos en appliquant divers filtres audio et vidéo tout en supprimant les métadonnées. Il utilise des commandes FFmpeg pour découper la vidéo en segments, appliquer des transformations, et assembler le tout dans un fichier de sortie. La classe gère également les erreurs et le logging pour suivre le processus de transformation.

Transféré par

youssef l'optimiste
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd

import subprocess

import os
import random
import logging
import tempfile
import uuid # Ajouté pour générer des identifiants uniques
from datetime import datetime
from [Link] import ThreadPoolExecutor, as_completed
from typing import List, Tuple

# Configuration du logging
[Link](
level=[Link],
format='%(asctime)s - %(levelname)s - %(module)s - %(message)s',
handlers=[
[Link]('video_processing.log'),
[Link]()
]
)

class VideoTransformError(Exception):
"""Exception personnalisée pour les erreurs de traitement vidéo"""
pass

class VideoTransformer:
def __init__(self):
[Link] = {
'crf': 16,
'preset': 'veryslow',
'audio_bitrate': '320k',
'segment_division': 8,
'min_segment_duration': 2.0,
'output_format': 'mp4',
'max_workers': os.cpu_count() or 4,
'allowed_extensions': ['.mp4', '.mov', '.avi', '.mkv']
}
self.temp_dir = None
self.current_temp_dir = None

def _safe_run_command(self, cmd: List[str], error_context: str = "") -> None:


"""Exécute une commande shell avec gestion d'erreurs détaillée"""
try:
result = [Link](
cmd,
check=True,
stdout=[Link],
stderr=[Link],
universal_newlines=True
)
[Link](f"Commande réussie: {' '.join(cmd)}")
return result
except [Link] as e:
error_msg = f"{error_context}\nErreur: {[Link]}\nCommande: {' '.join(cmd)}"
[Link](error_msg)
raise VideoTransformError(error_msg) from e
except Exception as e:
error_msg = f"Erreur inattendue: {str(e)}"
[Link](error_msg)
raise VideoTransformError(error_msg) from e

def _generate_filters(self) -> Tuple[List[str], List[str]]:


"""Génère les filtres vidéo/audio avec validation"""
vf = []
af = []

try:
# Filtres vidéo plus agressifs pour altérer l'empreinte
scale_factor = [Link](0.65, 1.35)
[Link](f'scale=iw*{scale_factor}:-1:flags=lanczos')

# Rotation et transformations géométriques aléatoires


if [Link]() > 0.4:
[Link]('hflip')
if [Link]() > 0.4:
[Link]('vflip')

[Link]([
f'rotate={[Link](-20,20)}*PI/180:bilinear=1',
f'unsharp=[Link]{[Link](1.0,3.5)}',
f'noise=c0s={[Link](7,25)}:c0f=t+u',
f'eq=contrast={1+[Link](-0.35,0.35)}:brightness={[Link](-0.25,0.25)}',
f'hue=h={[Link](-35,35)}:s={[Link](0.9,1.1)}',

f'colorbalance=rs={[Link](-0.15,0.15)}:gs={[Link](-0.15,0.15)}:bs={[Link]-
form(-0.15,0.15)}',
'pp=al' # Filtre de déblocking léger
])

# Filtres audio renforcés


[Link]([
f'atempo={[Link](0.75,1.25)}',
f'asetrate=48000*{[Link](0.85,1.15)}',
'aresample=48000:resampler=soxr:dither_method=triangular',
f'volume={[Link](0.65,1.35)}',
'afftdn=nr=1:nf=-25', # Réduction de bruit
f'equalizer=f=1000:t=h:w=200:g={[Link](-5,5)}', # EQ aléatoire
'acompressor=threshold=-12dB:ratio=2:attack=200:release=1000' # Compression
])

return vf, af

except Exception as e:
[Link](f"Erreur génération filtres: {str(e)} - Utilisation de filtres par défaut")
return ['scale=iw:-1'], ['aresample=48000']

def _process_segment(self, segment_path: str, index: int) -> str:


"""Traite un segment vidéo individuel avec suppression totale des métadonnées"""
output = [Link](self.current_temp_dir, f'processed_segment_{index}.
{[Link]["output_format"]}')
vf, af = self._generate_filters()

# Génération de métadonnées aléatoires


random_metadata = {
'title': f'Video_{[Link](10000,99999)}',
'artist': f'Creator_{uuid.uuid4().hex[:6]}',
'album': f'Collection_{[Link](1,1000)}',
'date': f'{[Link](2020,2024)}-{[Link](1,12):02d}-
{[Link](1,28):02d}',
'comment': f'Generated_{uuid.uuid4().hex}',
'genre': f'Type_{[Link](1,100)}',
'copyright': f'Rights_{uuid.uuid4().hex[:8]}'
}

cmd = [
'ffmpeg', '-y', '-i', segment_path,
'-vf', ','.join(vf),
'-af', ','.join(af),
'-c:v', 'libx264',
'-preset', [Link]['preset'],
'-crf', str([Link]['crf']),
'-map_metadata', '-1', # Supprime toutes les métadonnées existantes
'-fflags', '+bitexact', # Assure une sortie déterministe
'-flags:v', '+bitexact',
'-flags:a', '+bitexact',
'-timestamp', str([Link](0, 2**32-1)), # Timestamp aléatoire
]

# Ajout des métadonnées aléatoires


for key, value in random_metadata.items():
[Link](['-metadata', f'{key}={value}'])

# Options supplémentaires pour la suppression des métadonnées


[Link]([
'-map_chapters', '-1', # Supprime les chapitres
'-map_metadata:s', '-1', # Supprime les métadonnées des streams
'-map_metadata:c', '-1', # Supprime les métadonnées des codecs
output
])

self._safe_run_command(cmd, f"Échec traitement segment {index}")


return output

def _concat_segments(self, segments: List[str], output_path: str) -> None:


"""Assemble les segments traités avec nouvelles métadonnées"""
list_file = [Link](self.current_temp_dir, 'concat_list.txt')
with open(list_file, 'w') as f:
for seg in segments:
[Link](f"file '{[Link](seg)}'\n")

# Nouvelles métadonnées pour le fichier final


final_metadata = {
'title': f'Processed_{uuid.uuid4().hex[:8]}',
'artist': f'System_{[Link](1000,9999)}',
'date': [Link]().strftime("%Y-%m-%d"),
'comment': f'Compiled_{uuid.uuid4().hex}',
}

cmd = [
'ffmpeg', '-y', '-f', 'concat', '-safe', '0',
'-i', list_file,
'-c', 'copy',
'-map_metadata', '-1', # Supprime les métadonnées des segments
'-movflags', '+faststart',
]

# Ajout des nouvelles métadonnées


for key, value in final_metadata.items():
[Link](['-metadata', f'{key}={value}'])
[Link](output_path)
self._safe_run_command(cmd, "Échec assemblage final")

def apply_transformations(self, input_path: str, output_path: str) -> str:


"""
Applique les transformations à la vidéo avec nettoyage des métadonnées

Args:
input_path: Chemin vers le fichier source
output_path: Chemin de sortie souhaité

Returns:
Chemin du fichier généré
"""
try:
if not [Link](input_path):
raise VideoTransformError(f"Le fichier {input_path} n'existe pas")

# Création répertoire temporaire


self.temp_dir = [Link](prefix='videotransform_')
self.current_temp_dir = self.temp_dir.name

# Découpage initial sans métadonnées


cmd = ['ffmpeg', '-i', input_path]
result = self._safe_run_command(cmd)

# Calcul de la durée pour le découpage


duration = float([Link]('Duration: ')[1].split(',')[0].split(':'))
segment_length = max(duration / [Link]['segment_division'],
[Link]['min_segment_duration'])

segments = []
for i in range([Link]['segment_division']):
segment_path = [Link](self.current_temp_dir, f'segment_{i}.mp4')
cmd = [
'ffmpeg', '-y',
'-i', input_path,
'-ss', str(i * segment_length),
'-t', str(segment_length),
'-map_metadata', '-1', # Supprime les métadonnées dès le découpage
'-c', 'copy',
segment_path
]
self._safe_run_command(cmd)
[Link](segment_path)

# Traitement parallèle des segments


with ThreadPoolExecutor(max_workers=[Link]['max_workers']) as executor:
futures = [[Link](self._process_segment, seg, i)
for i, seg in enumerate(segments)]

processed = [[Link]() for f in as_completed(futures)]

# Assemblage final
self._concat_segments(processed, output_path)

return output_path

except Exception as e:
[Link](f"Échec du traitement: {str(e)}")
raise
finally:
if hasattr(self, 'temp_dir') and self.temp_dir:
self.temp_dir.cleanup()

if __name__ == "__main__":
try:
processor = VideoTransformer()
output = processor.apply_transformations(
"input_video.mp4",
"transformed_output.mp4"
)
[Link](f"Traitement réussi: {output}")
except VideoTransformError as e:
[Link](f"Échec critique: {str(e)}")
exit(1)

Vous aimerez peut-être aussi