Multimodal speaker identification for video: selectable diarization, transcription, face recognition, voice matching, pixel mouth-gap active-speaker cues, and evidence fusion.
Project description
WhoSpoke
WhoSpoke is a multimodal speaker-identification package for video. It is designed to answer a research workflow question: who spoke, when, what did they say, which known people were visible, and what evidence supports each identity assignment?
Version 0.4.0 converts the latest working tutorial pipeline into a package release and updates the default mouth-open cue to pixel_mouth_gap, a per-face dark-aperture detector designed for ordinary speech in video frames. The dlib/mauckc MAR implementation, MediaPipe Face Mesh, and face_recognition landmark backends remain available. Users can switch between selectable speaker diarization, speech transcription, face recognition, mouth-open visual active-speaker, and voice matching backends through YAML configuration or CLI flags. The package does not silently return mock transcripts. When a selected backend is unavailable or fails, WhoSpoke records a clear stage-specific error in the result object and in the output JSON.
Main capabilities
WhoSpoke coordinates the following stages:
- Audio extraction from video with ffmpeg.
- Speech transcription / ASR using
faster_whisper, OpenAIwhisper,vosk, or Hugging Facetransformers_wav2vec. - Speaker diarization using
pyannote,pyannote_community, simpleenergyVAD segmentation,fixed_windows, ornone. - Voice matching against user-provided reference voice samples using SpeechBrain ECAPA-TDNN, SpeechBrain x-vector, or Resemblyzer.
- Face recognition against user-provided portrait images using DeepFace or
face_recognition/dlib. - All-faces frame evidence: every detected and matchable face in sampled frames is exported, not only the single best face.
- Mouth-open / visual active-speaker evidence using configurable per-face detectors. The default backend is
pixel_mouth_gap, which detects a dark mouth aperture separately for every detected face in every sampled frame;dlib_68_mauckc, MediaPipe Face Mesh, andface_recognitionlandmark backends remain available. - Evidence fusion combining diarization, ASR timing, voice identity, all visible face evidence, and mouth-open cues.
- CSV/JSON export with transparent evidence columns.
Output columns for all-face evidence
The final CSV contains one row per transcript segment. Face-related columns include:
face_best_speaker_idface_best_speaker_nameface_all_speaker_idsface_all_speaker_namesface_all_match_count_by_speakerface_all_matches_detailface_all_detected_countface_all_usable_candidate_countface_all_strict_match_countface_analyzed_frame_countface_frames_with_detected_faces
face_all_matches_detail is JSON text containing each candidate face observation, frame timestamp, face index, matched person, distance, strict/candidate status, crop path, and mouth-open fields when available.
Output columns for mouth-open / visual active-speaker evidence
WhoSpoke can estimate whether the mouth is open for each detected/matched face crop. This is useful as a lightweight visual active-speaker cue: if multiple known people are visible in the same sampled frames, the person with the most open-mouth observations during a transcript segment is exported as the visual active-speaker candidate. This cue is evidence, not a guarantee of speech; it should be interpreted alongside diarization, ASR timing, voice matching, and face identity.
Mouth-related final CSV columns include:
mouth_open_speaker_idsmouth_open_speaker_namesmouth_open_count_by_speakermouth_open_score_by_speakermouth_open_observation_countmouth_open_max_scoremouth_open_all_detailvisual_active_speaker_idvisual_active_speaker_namevisual_active_speaker_confidence
At the individual face-observation level, JSON output also records mouth_open, mouth_open_score, mouth_open_threshold, mouth_backend, mouth_landmarks, mouth_error, and per-face state/debug fields such as mouth_open_state, mouth_open_speech_cue, and mouth_open_debug_image_path when the selected backend provides them. The default pixel_mouth_gap backend looks for a dark aperture in the lower-central mouth region of each detected face crop and uses a speech-sensitive default threshold of 0.045. The optional dlib_68_mauckc backend follows the mauckc/mouth-open MAR formula: MAR = (distance(mouth[2], mouth[10]) + distance(mouth[4], mouth[8])) / (2 * distance(mouth[0], mouth[6])); its original strict threshold is 0.79.
Installation
Base package:
python -m pip install -e .
Install the backends you want:
# Faster Whisper ASR
python -m pip install -e '.[asr-faster-whisper]'
# OpenAI Whisper ASR
python -m pip install -e '.[asr-whisper]'
# Pyannote diarization
python -m pip install -e '.[diarization-pyannote]'
# DeepFace recognition
python -m pip install -e '.[face-deepface]'
# Mouth-open visual active-speaker cue with dlib68/mauckc MAR plus MediaPipe fallback
python -m pip install -e '.[active-speaker-mouth]'
# Recommended dlib install when using the default mauckc backend
conda install -c conda-forge dlib -y
# SpeechBrain voice matching
python -m pip install -e '.[voice-speechbrain]'
# Everything optional
python -m pip install -e '.[all]'
You also need ffmpeg:
# macOS
brew install ffmpeg
# conda
conda install -c conda-forge ffmpeg -y
Mouth-open backend choices
The default mouth-open backend is pixel_mouth_gap. It requires OpenCV and Pillow but does not require dlib, MediaPipe, or a shape-predictor data file. It is usually the best first option for press-conference or bilateral-meeting videos because it scores each detected face crop directly.
The optional dlib_68_mauckc backend is adapted from mauckc/mouth-open. It requires dlib and the 68-point shape-predictor data file:
conda install -c conda-forge dlib -y
# then place the file in your project folder, or set an environment variable:
export WHOSPOKE_DLIB_SHAPE_PREDICTOR=/path/to/shape_predictor_68_face_landmarks.dat
If the predictor file is not available, use the default mouth_backend: pixel_mouth_gap, or set mouth_backend: mediapipe_face_mesh / mouth_backend: auto to use fallback behavior.
Hugging Face access tokens
Some WhoSpoke backends download models from the Hugging Face Hub. Public models may download without authentication, but the full set of diarization and transcription options can require a Hugging Face account, accepted model terms, and a User Access Token with read access.
This is especially important for pyannote diarization models such as:
pyannote/speaker-diarization-3.1pyannote/speaker-diarization-community-1pyannote/segmentation-3.0
Before running these models, open the relevant Hugging Face model pages in a browser, sign in, and accept/request access when the model is gated. Then create a read token from your Hugging Face account settings and expose it to WhoSpoke in one of these ways:
# Recommended shell option
export HF_TOKEN=hf_your_read_token_here
# Also supported by WhoSpoke and the tutorial
export PYANNOTE_AUTH_TOKEN=hf_your_read_token_here
export HUGGINGFACE_TOKEN=hf_your_read_token_here
export HUGGING_FACE_HUB_TOKEN=hf_your_read_token_here
The YAML configuration can point to the environment variable:
diarization:
algorithm: pyannote
model_name: pyannote/speaker-diarization-3.1
num_speakers: 2
auth_token_env: HF_TOKEN
Or, for local experiments only, you can pass a token directly in the config:
diarization:
algorithm: pyannote
model_name: pyannote/speaker-diarization-3.1
auth_token: hf_your_read_token_here
For reproducibility and security, environment variables are preferred over storing tokens in YAML files, notebooks, Git repositories, or shared project folders.
If you use transformers_wav2vec with a private or gated ASR model, authenticate with the Hugging Face CLI or set HF_TOKEN before running WhoSpoke. The merged tutorial file also asks for a Hugging Face read token when it runs pyannote.
Recommended DeepFace environment on macOS/Anaconda
DeepFace/TensorFlow/Keras can conflict with the main research environment. WhoSpoke can run DeepFace in a separate subprocess environment:
conda create -n whospoke_deepface python=3.10 -y
conda activate whospoke_deepface
python -m pip install --upgrade pip setuptools wheel
conda install -c conda-forge dlib -y
python -m pip install --no-cache-dir "numpy==1.26.4" "tensorflow==2.15.0" "keras==2.15.0" "tf-keras==2.15.0" "opencv-python-headless==4.9.0.80" "deepface==0.0.89" "mediapipe>=0.10"
# install WhoSpoke in that env too, so python -m WhoSpoke.deepface_worker works
python -m pip install -e /path/to/WhoSpoke
Then set in your config:
face:
backend: deepface
model_name: SFace
run_in_subprocess: true
subprocess_python: /opt/anaconda3/envs/whospoke_deepface/bin/python
Example people file
Create people.yaml:
people:
- id: emmanuel_macron
name: Emmanuel Macron
portraits:
- /path/to/macron.jpg
voice_samples:
- /path/to/macron_voice_sample.wav
- id: donald_j_trump
name: Donald J. Trump
portraits:
- /path/to/trump.jpg
voice_samples:
- /path/to/trump_voice_sample.wav
Example config
Create a default config:
whospoke write-example-config whospoke_config.yaml
A practical multilingual config:
asr:
backend: faster_whisper
model_name: small
language: null
task: transcribe
device: cpu
compute_type: int8
beam_size: 1
vad_filter: false
# For pyannote, set HF_TOKEN in your shell or put auth_token here.
diarization:
algorithm: pyannote
model_name: pyannote/speaker-diarization-3.1
num_speakers: 2
auth_token_env: HF_TOKEN
voice:
backend: speechbrain_ecapa
similarity_threshold: 0.20
margin_threshold: 0.03
force_best_match: true
face:
backend: deepface
model_name: SFace
detector_backends: [opencv, ssd, mediapipe, mtcnn, retinaface]
sample_fps: 2.0
strict_threshold: 0.60
candidate_threshold: 0.95
run_in_subprocess: true
subprocess_python: /opt/anaconda3/envs/whospoke_deepface/bin/python
detect_mouth_open: true
mouth_backend: pixel_mouth_gap # pixel_mouth_gap, auto, dlib_68_mauckc, mediapipe_face_mesh, face_recognition_landmarks, none
mouth_open_threshold: 0.045 # speech-sensitive threshold for pixel_mouth_gap
# Needed only for dlib_68_mauckc:
mouth_shape_predictor_path: /path/to/shape_predictor_68_face_landmarks.dat
active_speaker:
backend: mouth_open
threshold: 0.045
min_open_observations: 1
fusion:
mode: voice_primary
use_face_to_override_voice: false
use_face_when_voice_unknown: true
Run from the command line
whospoke analyze /path/to/video.mp4 --config whospoke_config.yaml --people people.yaml --output-dir output_macron_trump
Outputs:
output_macron_trump/whospoke_result.json
output_macron_trump/whospoke_segments.csv
Run from Python
from WhoSpoke import AnalysisConfig, PersonProfile, SpeakerVideoAnalyzer
config = AnalysisConfig.from_yaml("whospoke_config.yaml")
people = [
PersonProfile(
id="emmanuel_macron",
name="Emmanuel Macron",
portraits=["/path/to/macron.jpg"],
voice_samples=["/path/to/macron_voice_sample.wav"],
),
PersonProfile(
id="donald_j_trump",
name="Donald J. Trump",
portraits=["/path/to/trump.jpg"],
voice_samples=["/path/to/trump_voice_sample.wav"],
),
]
result = SpeakerVideoAnalyzer(config).analyze(
"/path/to/macron_trump_test_video.mp4",
people=people,
output_dir="output_macron_trump",
)
result.to_json("output_macron_trump/result.json")
result.to_csv("output_macron_trump/segments.csv")
Backend choices
List all available backend names:
whospoke list-backends
Current implemented backend names:
ASR
faster_whisperwhispervosktransformers_wav2vec
Diarization
pyannotepyannote_communityenergyfixed_windowsnone
Face recognition
deepfaceface_recognitionnone
For DeepFace, set face.model_name to any DeepFace-supported model name, such as SFace, ArcFace, Facenet, Facenet512, VGG-Face, Dlib, OpenFace, DeepFace, DeepID, GhostFaceNet, or Buffalo_L.
Active-speaker / mouth-open visual cue
mouth_opennone
The mouth_open backend is based on face detections produced by the face-recognition stage. With mouth_backend: pixel_mouth_gap, WhoSpoke analyzes the lower-central mouth region of every detected face crop and looks for a dark mouth aperture; this is the default because it worked better in the Trump/Macron tutorial when landmark-based methods did not produce open-mouth cues. With mouth_backend: dlib_68_mauckc, WhoSpoke uses the dlib 68-point shape predictor and the MAR formula from mauckc/mouth-open. With mouth_backend: mediapipe_face_mesh, WhoSpoke uses MediaPipe Face Mesh lip landmarks. With mouth_backend: face_recognition_landmarks, WhoSpoke uses lip landmarks exposed by face_recognition. With mouth_backend: auto, it tries the pixel-gap backend first and then falls back to the landmark-based methods.
Voice matching
speechbrain_ecapaspeechbrain_xvectorresemblyzernone
Tutorial code included
The complete merged tutorial code is included here:
examples/tutorial_multilingual_voice_face_mouth_pixel_gap.py
The older debug tutorial name is also kept for convenience:
examples/tutorial_multilingual_voice_face_debug.py
That tutorial preserves the debug-friendly, cell-by-cell logic from the research notebook while using the same methodological components as the package: pyannote diarization, faster-whisper ASR, SpeechBrain voice matching, DeepFace face recognition in a subprocess, dense speech-turn face sampling, all-faces evidence extraction, per-face pixel-gap mouth-state detection, mouth-open visual active-speaker evidence, and final multimodal fusion.
Development checks
python -m pip install -e '.[dev]'
pytest
python -m compileall src examples
Notes
- WhoSpoke intentionally keeps heavy ML dependencies optional.
- Mouth-open detection is a visual cue, not a full lip-reading model. It works best when faces are frontal, sufficiently large, and sampled at enough frames per second.
- Hugging Face access tokens may be required for gated/private pyannote or Transformers models; set
HF_TOKENor configurediarization.auth_token_env. - If a backend is selected but its package is missing, the pipeline raises a clear dependency error for that stage.
- ASR is the only essential stage for transcript text. Diarization, voice, and face stages may fail independently and will be reported in
result.errors. - No stage returns fabricated transcript content. Use
diarization.algorithm: none,voice.backend: none, orface.backend: noneto explicitly skip stages.
Full pixel-gap mouth-open tutorial
The package includes the full tutorial under examples/tutorial_multilingual_voice_face_mouth_pixel_gap.py. The same tutorial is embedded below so the README is self-contained. This tutorial is intentionally verbose and debug-friendly: it prints checkpoints, isolates pyannote and DeepFace work, samples frames densely inside speaking turns, detects every face in each sampled frame, assigns all visible/matchable identities, and classifies each detected face's mouth state using the pixel_mouth_gap algorithm.
Show full tutorial code
# WhoSpoke multilingual voice-ID debug tutorial — merged latest script
# Generated from the multi-cell tutorial by merging cells into one run-through Python file.
# Includes dedicated DeepFace/TensorFlow subprocess environment diagnostics, tf_keras fallback shim,
# all-face evidence, dense speech-turn sampling, and per-face pixel-gap mouth-state cues.
# Original notebook title:
# # WhoSpoke multilingual voice-ID debug notebook
#
# This version is designed to avoid the “no output at all” problem.
#
# Key changes:
# - The first cell prints immediately before any heavy imports.
# - The dependency check uses `importlib.util.find_spec` instead of importing heavy modules.
# - Heavy imports (`pyannote.audio`, `faster_whisper`, `speechbrain`, `torch`, `torchaudio`) happen only inside the cells that need them.
# - Pyannote model loading has explicit progress messages and fallbacks.
# - The notebook is split into small cells so you can see exactly where a stall occurs.
# --- Merged former notebook cell 2 ---
# =============================================================================
# CELL 1 — ZERO-DEPENDENCY STARTUP CHECK
# =============================================================================
import sys
import os
import time
from pathlib import Path
# Fallback for running this merged notebook as a plain Python script.
# In Jupyter, IPython.display.display will be used; outside Jupyter,
# objects are printed in a readable form.
try:
from IPython.display import display
except Exception:
def display(obj):
print(obj)
GLOBAL_START_TIME = time.time()
def checkpoint(message):
elapsed = time.time() - GLOBAL_START_TIME
print(f"\n[{elapsed:8.1f}s] {message}", flush=True)
checkpoint("Cell 1 started successfully")
base = Path("/Users/cantayus/Dropbox/ders/research/WhoSpoke_Python_Package")
video_path = base / "macron_trump_test_video.mp4"
audio_path = base / "macron_trump_test_video_16k_mono.wav"
macron_voice_sample = base / "macron_voice_sample.wav"
trump_voice_sample = base / "trump_voice_sample.wav"
# Reference face images for optional face matching.
macron_face_image = base / "macron.jpg"
trump_face_image = base / "trump.jpg"
direct_csv_output_path = base / "result_macron_trump_multilingual_pyannote_asr.csv"
direct_json_output_path = base / "result_macron_trump_multilingual_pyannote_asr.json"
diarization_csv_path = base / "result_macron_trump_diarization_segments.csv"
merged_turns_csv_path = base / "result_macron_trump_merged_turns.csv"
cluster_audio_dir = base / "speaker_cluster_voice_audio"
turn_audio_dir = base / "turn_audio_for_multilingual_asr"
speechbrain_cache_dir = base / "cache" / "speechbrain_ecapa"
named_csv_output_path = base / "result_macron_trump_voice_identified_multilingual.csv"
named_json_output_path = base / "result_macron_trump_voice_identified_multilingual.json"
similarity_csv_output_path = base / "result_macron_trump_voice_similarity_matrix.csv"
# Face-matching outputs.
face_frame_dir = base / "face_sampled_frames"
face_crop_dir = base / "face_detected_crops"
face_frame_manifest_csv_path = base / "result_macron_trump_face_sampled_frames_manifest.csv"
face_match_csv_path = base / "result_macron_trump_face_matches_by_frame.csv"
face_evidence_csv_path = base / "result_macron_trump_face_evidence_by_segment.csv"
deepface_input_csv_path = base / "result_macron_trump_voice_identified_for_deepface_input.csv"
deepface_config_path = base / "deepface_face_matching_config.json"
deepface_metadata_json_path = base / "result_macron_trump_deepface_metadata.json"
deepface_runner_script_path = base / "run_deepface_face_matching_subprocess.py"
# Final multimodal voice + face outputs.
voice_face_csv_output_path = base / "result_macron_trump_voice_face_identified_multilingual.csv"
voice_face_json_output_path = base / "result_macron_trump_voice_face_identified_multilingual.json"
print("Python executable:", sys.executable, flush=True)
print("Base folder:", base, flush=True)
print("Base exists:", base.exists(), flush=True)
print("Video exists:", video_path.exists(), "->", video_path, flush=True)
print("Macron voice sample exists:", macron_voice_sample.exists(), "->", macron_voice_sample, flush=True)
print("Trump voice sample exists:", trump_voice_sample.exists(), "->", trump_voice_sample, flush=True)
print("Macron face image exists:", macron_face_image.exists(), "->", macron_face_image, flush=True)
print("Trump face image exists:", trump_face_image.exists(), "->", trump_face_image, flush=True)
checkpoint("Cell 1 complete")
# --- Merged former notebook cell 3 ---
# =============================================================================
# CELL 2 — LIGHTWEIGHT SETTINGS AND BASIC HELPERS
# =============================================================================
checkpoint("Cell 2 started")
import json
import shutil
import subprocess
from getpass import getpass
from collections import defaultdict
# Do NOT import pyannote, faster_whisper, speechbrain, torch, or torchaudio here.
FASTER_WHISPER_MODEL = "small"
FASTER_WHISPER_COMPUTE_TYPE = "int8"
NUM_SPEAKERS = 2
MIN_TURN_DURATION_FOR_ASR = 0.60
MAX_TURN_DURATION_FOR_ASR = 18.0
MERGE_GAP_SECONDS = 0.45
TURN_ASR_PAD_SECONDS = 0.08
FORCE_BEST_MATCH = True
ENFORCE_ONE_TO_ONE_ASSIGNMENT = True
VOICE_ASSIGNMENT_THRESHOLD = 0.20
VOICE_ASSIGNMENT_MARGIN = 0.03
MAX_CLUSTER_SEGMENTS = 8
MIN_CLUSTER_SEGMENT_DURATION = 0.50
PAD_SECONDS = 0.05
MAX_CLUSTER_AUDIO_SECONDS = 30.0
# Optional face-matching settings.
# This v6 cell runs DeepFace in a separate Python subprocess.
# This prevents DeepFace / TensorFlow / OpenCV native crashes from restarting
# the Jupyter kernel.
# -------------------------------------------------------------------------
# Face-frame sampling strategy
# -------------------------------------------------------------------------
# Uniform 2 fps sampling can easily miss transient mouth openings during speech.
# The tutorial now samples densely *inside diarized/ASR speaking turns* and uses
# small temporal offsets around each sampled timestamp. This makes it much more
# likely to catch open-mouth frames for each detected face.
#
# Strategy options:
# speech_turn_dense -> dense sampling within transcript/diarized turns
# hybrid_speech_uniform -> speech_turn_dense + low-rate global baseline
# uniform -> old behavior: uniform sampling across full video
# -------------------------------------------------------------------------
FACE_SAMPLING_STRATEGY = "hybrid_speech_uniform"
# Legacy/global sampling rate. Used for the uniform strategy and as the low-rate
# baseline when FACE_SAMPLING_STRATEGY="hybrid_speech_uniform".
FACE_SAMPLE_FPS = 2.0
FACE_GLOBAL_SAMPLE_FPS = 0.50
# Dense sampling rate within diarized/ASR turns. Increase to 10–12 for short
# clips when mouth openings are still missed. Lower for long videos.
FACE_SPEECH_SAMPLE_FPS = 8.0
# Add a small pad around each spoken turn so the sampler catches mouth movement
# immediately before/after the diarized interval.
FACE_SPEECH_TURN_PAD_SECONDS = 0.20
# For each dense-sampled speech timestamp, also sample these offsets. This is the
# key change for transient mouth-open events: if one exact frame hits a closed
# mouth between syllables, a nearby offset may catch the open-mouth frame.
FACE_SPEECH_SAMPLE_OFFSET_SECONDS = [0.0, 0.05, -0.05]
# De-duplicate near-identical timestamps and cap the total number of frames to
# keep DeepFace runtime manageable. Set FACE_MAX_SAMPLED_FRAMES=None to disable.
FACE_SAMPLE_MIN_GAP_SECONDS = 0.025
FACE_MAX_SAMPLED_FRAMES = 1500
FACE_SAMPLE_ONLY_TURNS_WITH_TEXT = False
# Safer default model. SFace is usually more stable because it is OpenCV-based.
# You can switch to "ArcFace" later if your environment is stable.
DEEPFACE_MODEL_NAME = "SFace"
# Try these detectors in order. "opencv" is the safest first choice.
DEEPFACE_DETECTOR_BACKENDS = [
"opencv",
"ssd",
"mediapipe",
"mtcnn",
"retinaface",
]
DEEPFACE_ALIGN = True
DEEPFACE_NORMALIZATION = "base"
# Strict match threshold for cosine distance. Lower means more similar.
FACE_DISTANCE_THRESHOLD = 0.60
# Loose candidate threshold used for segment-level evidence.
FACE_CANDIDATE_MAX_DISTANCE = 0.95
# Resize very large frames before DeepFace. This keeps the subprocess stable.
FACE_RESIZE_MAX_WIDTH = 1600
# Expand detected face boxes before saving face crops.
FACE_CROP_EXPAND_MARGIN = 0.25
# Crop and save each detected face from sampled frames for inspection.
SAVE_FACE_CROPS = True
# -------------------------------------------------------------------------
# Mouth-open / visual active-speaker settings
# -------------------------------------------------------------------------
# These settings estimate whether each detected/matched face has an open mouth.
# The signal is not lip-reading; it is a visual active-speaker cue that should be
# interpreted alongside diarization, ASR timing, voice matching, and face matching.
#
# Backend options:
# pixel_mouth_gap -> OpenCV dark-aperture detector on every detected face crop
# dlib_68_mauckc -> dlib 68-point landmarks + mauckc mouth-open MAR formula
# auto -> try pixel_mouth_gap, dlib_68_mauckc, MediaPipe, then face_recognition
# mediapipe_face_mesh -> dense lip landmarks from MediaPipe Face Mesh
# face_recognition_landmarks-> dlib/face_recognition lip landmarks
# none -> disable mouth-open estimation
#
# Default formula follows the mauckc/mouth-open implementation:
# MAR = (dist(mouth[2], mouth[10]) + dist(mouth[4], mouth[8])) / (2 * dist(mouth[0], mouth[6]))
# where the 20 dlib mouth landmarks are sliced from the 68-point face-landmark array.
# The original mauckc threshold is 0.79. That is useful for a clearly open
# mouth / yawn-like state, but it is often too strict for ordinary speech.
# This tutorial therefore keeps the strict threshold and adds a separate,
# lower speech-cue threshold used for visual active-speaker evidence.
# If you still receive no cues, inspect mouth_open_max_score and lower
# MOUTH_SPEAKING_CUE_THRESHOLD gradually.
# -------------------------------------------------------------------------
MOUTH_OPEN_DETECTION_ENABLED = True
MOUTH_OPEN_BACKEND = "pixel_mouth_gap"
# Strict binary open-mouth threshold from mauckc/mouth-open.
MOUTH_OPEN_THRESHOLD = 0.79
# Softer threshold for speech-related mouth motion. This is what drives the
# visual_active_speaker_* cue when no frame reaches the strict 0.79 threshold.
MOUTH_SPEAKING_CUE_THRESHOLD = 0.045
# Last-resort diagnostic cue: if no frame exceeds MOUTH_SPEAKING_CUE_THRESHOLD,
# still report the highest-scoring matched face if it clears this minimum.
# This prevents silent failure and helps tune thresholds per video.
MOUTH_OPEN_REPORT_BEST_SCORED_FACE = True
MOUTH_OPEN_MIN_SCORE_FOR_BEST_CUE = 0.02
MOUTH_OPEN_TOP_DEBUG_ROWS = 25
# Per-face open/closed classification thresholds. The strict 0.79 threshold
# remains available, but ordinary speech often needs a lower threshold. These
# thresholds are used to classify every detected face in every sampled frame as
# open, closed, or unknown.
MOUTH_OPEN_STATE_THRESHOLD_DLIB = 0.12
MOUTH_OPEN_STATE_THRESHOLD_MEDIAPIPE = 0.050
MOUTH_OPEN_STATE_THRESHOLD_FACE_RECOGNITION = 0.065
# New default algorithm: pixel-based mouth aperture / dark-gap detection.
# This runs separately for every detected face crop and does not depend on dlib,
# MediaPipe, or 68-point landmarks. It looks for a dark vertical aperture in the
# lower-central mouth region of each detected face. This is more forgiving for
# press-conference videos where landmark models may fail or yield very small MAR
# values even when a person is visibly speaking.
MOUTH_OPEN_STATE_THRESHOLD_PIXEL_GAP = 0.045
MOUTH_PIXEL_ROI_LEFT = 0.18
MOUTH_PIXEL_ROI_RIGHT = 0.82
MOUTH_PIXEL_ROI_TOP = 0.52
MOUTH_PIXEL_ROI_BOTTOM = 0.92
MOUTH_PIXEL_DARK_PERCENTILE = 22
MOUTH_PIXEL_DARK_STD_FACTOR = 0.15
MOUTH_PIXEL_MIN_COMPONENT_AREA_RATIO = 0.0015
MOUTH_PIXEL_MIN_COMPONENT_HEIGHT_RATIO = 0.030
MOUTH_PIXEL_SAVE_DEBUG_ROIS = True
MOUTH_PIXEL_DEBUG_ROI_DIR = base / "mouth_open_pixel_gap_debug_rois"
MOUTH_OPEN_RECT_EXPAND_MARGIN = 0.45
MOUTH_OPEN_MIN_SCORABLE_FACE_WIDTH = 20
MOUTH_OPEN_MIN_SCORABLE_FACE_HEIGHT = 20
MOUTH_OPEN_DEBUG_PER_FACE_CSV_PATH = base / "result_macron_trump_mouth_open_per_face_debug.csv"
MOUTH_OPEN_MIN_DETECTION_CONFIDENCE = 0.50
DLIB_SHAPE_PREDICTOR_PATH = Path(
os.environ.get(
"WHOSPOKE_DLIB_SHAPE_PREDICTOR",
str(base / "shape_predictor_68_face_landmarks.dat"),
)
)
# Run DeepFace outside the notebook kernel.
DEEPFACE_SUBPROCESS_TIMEOUT_SECONDS = 1800
# -------------------------------------------------------------------------
# Dedicated DeepFace subprocess Python environment
# -------------------------------------------------------------------------
# DeepFace is run in a separate clean conda environment because TensorFlow/Keras
# can crash or conflict with the main notebook environment.
#
# Recommended setup in Terminal:
#
# conda create -n whospoke_deepface python=3.10 -y
# conda activate whospoke_deepface
# python -m pip install --upgrade pip setuptools wheel
# python -m pip uninstall -y deepface tensorflow tensorflow-macos tensorflow-metal keras tf-keras tf_keras opencv-python opencv-contrib-python opencv-python-headless
# python -m pip install --no-cache-dir "numpy==1.26.4" "tensorflow==2.15.0" "keras==2.15.0" "tf-keras==2.15.0" "opencv-python-headless==4.9.0.80" "mediapipe" "dlib" "deepface==0.0.89"
#
# Then test:
# conda activate whospoke_deepface
# python -c "from tensorflow.keras.models import Sequential; print('Sequential OK')"
# python -c "from deepface import DeepFace; print('deepface OK')"
# python -c "import mediapipe as mp; print('mediapipe', mp.__version__)"
# python -c "import dlib; print('dlib', getattr(dlib, '__version__', 'unknown'))"
# -------------------------------------------------------------------------
DEEPFACE_PYTHON = Path("/opt/anaconda3/envs/whospoke_deepface/bin/python")
DEEPFACE_REQUIRE_WORKING_IMPORT = True
deepface_environment_diagnostic_json_path = base / "deepface_environment_diagnostic.json"
def make_deepface_subprocess_env():
"""Environment variables for the isolated DeepFace subprocess."""
env = os.environ.copy()
# Force CPU-only DeepFace/TensorFlow.
env["CUDA_VISIBLE_DEVICES"] = "-1"
env["TF_CPP_MIN_LOG_LEVEL"] = "3"
# Native-library safety settings.
env["KMP_DUPLICATE_LIB_OK"] = "TRUE"
env["OMP_NUM_THREADS"] = "1"
env["MKL_NUM_THREADS"] = "1"
env["OPENBLAS_NUM_THREADS"] = "1"
env["VECLIB_MAXIMUM_THREADS"] = "1"
env["NUMEXPR_NUM_THREADS"] = "1"
# Important for TensorFlow/Keras compatibility:
# avoid a shell-level setting that forces a mismatched Keras path.
env.pop("TF_USE_LEGACY_KERAS", None)
return env
def run_deepface_env_test(label, code, timeout_seconds=60):
"""Run a small Python import test inside the dedicated DeepFace environment."""
result = subprocess.run(
[str(DEEPFACE_PYTHON), "-c", code],
capture_output=True,
text=True,
timeout=timeout_seconds,
env=make_deepface_subprocess_env(),
)
return {
"label": label,
"ok": result.returncode == 0,
"returncode": result.returncode,
"stdout": result.stdout or "",
"stderr": result.stderr or "",
"code": code,
}
def diagnose_deepface_environment():
"""Verify that the isolated DeepFace environment is usable before face matching."""
checkpoint("Checking dedicated DeepFace subprocess environment")
if not DEEPFACE_PYTHON.exists():
raise FileNotFoundError(
"The configured DeepFace Python executable does not exist:\n"
f"{DEEPFACE_PYTHON}\n\n"
"Create the environment with:\n"
"conda create -n whospoke_deepface python=3.10 -y"
)
tests = [
("python executable", "import sys; print(sys.executable)"),
("numpy", "import numpy as np; print('numpy', np.__version__)"),
("cv2", "import cv2; print('cv2', cv2.__version__)"),
("mediapipe", "import mediapipe as mp; print('mediapipe', mp.__version__)"),
("dlib", "import dlib; print('dlib', getattr(dlib, '__version__', 'unknown'))"),
("tensorflow", "import tensorflow as tf; print('tensorflow', tf.__version__)"),
("tensorflow.keras", "import tensorflow.keras as tk; print('tensorflow.keras OK')"),
("tensorflow.keras.models.Sequential", "from tensorflow.keras.models import Sequential; print('Sequential OK')"),
("tf_keras", "import tf_keras; print('tf_keras', tf_keras.__version__)"),
("deepface direct import", "from deepface import DeepFace; print('deepface OK')"),
(
"deepface import with tf_keras shim",
r"""
import sys
try:
from tensorflow.keras.models import Sequential
print("tensorflow.keras.models.Sequential direct OK")
except Exception as exc:
print("Direct tensorflow.keras Sequential failed:", repr(exc))
import tf_keras
sys.modules.setdefault("tensorflow.keras", tf_keras)
sys.modules.setdefault("tensorflow.keras.models", tf_keras.models)
sys.modules.setdefault("tensorflow.keras.layers", tf_keras.layers)
sys.modules.setdefault("tensorflow.keras.backend", tf_keras.backend)
sys.modules.setdefault("tensorflow.keras.optimizers", tf_keras.optimizers)
sys.modules.setdefault("tensorflow.keras.utils", tf_keras.utils)
sys.modules.setdefault("tensorflow.keras.callbacks", tf_keras.callbacks)
sys.modules.setdefault("tensorflow.keras.losses", tf_keras.losses)
sys.modules.setdefault("tensorflow.keras.metrics", tf_keras.metrics)
from tensorflow.keras.models import Sequential
print("tensorflow.keras.models.Sequential OK after shim")
from deepface import DeepFace
print("deepface OK with compatibility path")
""",
),
]
records = []
for label, code in tests:
print("\n" + "-" * 80, flush=True)
print(f"DeepFace environment test: {label}", flush=True)
print("-" * 80, flush=True)
record = run_deepface_env_test(label, code)
records.append(record)
print("returncode:", record["returncode"], flush=True)
print("stdout:", record["stdout"].strip() or "[empty]", flush=True)
print("stderr:", record["stderr"].strip() or "[empty]", flush=True)
diagnostic = {
"deepface_python": str(DEEPFACE_PYTHON),
"created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
"tests": records,
}
deepface_environment_diagnostic_json_path.write_text(
json.dumps(diagnostic, indent=2, ensure_ascii=False),
encoding="utf-8",
)
direct_ok = any(r["label"] == "deepface direct import" and r["ok"] for r in records)
shim_ok = any(r["label"] == "deepface import with tf_keras shim" and r["ok"] for r in records)
if not direct_ok and not shim_ok:
raise RuntimeError(
"The dedicated DeepFace environment is not ready.\n\n"
f"Diagnostic written to: {deepface_environment_diagnostic_json_path}\n\n"
"Fix with:\n\n"
"conda activate whospoke_deepface\n"
"python -m pip uninstall -y deepface tensorflow tensorflow-macos tensorflow-metal keras tf-keras tf_keras "
"opencv-python opencv-contrib-python opencv-python-headless\n"
"python -m pip install --upgrade pip setuptools wheel\n"
"python -m pip install --no-cache-dir "
"\"numpy==1.26.4\" "
"\"tensorflow==2.15.0\" "
"\"keras==2.15.0\" "
"\"tf-keras==2.15.0\" "
"\"opencv-python-headless==4.9.0.80\" "
"\"deepface==0.0.89\""
)
checkpoint("Dedicated DeepFace subprocess environment passed")
# Keep voice as primary speaker identity.
# Face matching is treated as supporting evidence by default.
USE_FACE_TO_OVERRIDE_VOICE = False
USE_FACE_WHEN_VOICE_UNKNOWN = True
# Even when no strict match is found, keep the nearest face candidate.
USE_BEST_FACE_CANDIDATE_EVEN_IF_NOT_STRICT = True
VERBOSE_FFMPEG = False
cluster_audio_dir.mkdir(parents=True, exist_ok=True)
turn_audio_dir.mkdir(parents=True, exist_ok=True)
speechbrain_cache_dir.mkdir(parents=True, exist_ok=True)
face_frame_dir.mkdir(parents=True, exist_ok=True)
face_crop_dir.mkdir(parents=True, exist_ok=True)
def run_command(cmd, description=None, check=True, verbose=None):
if verbose is None:
verbose = VERBOSE_FFMPEG
if description:
checkpoint(description)
if verbose:
print("Command:", " ".join(str(x) for x in cmd), flush=True)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
)
if verbose and result.stdout:
print(result.stdout[-4000:], flush=True)
if verbose and result.stderr:
print(result.stderr[-4000:], flush=True)
if check and result.returncode != 0:
print("Command:", " ".join(str(x) for x in cmd), flush=True)
print("STDOUT:", result.stdout[-4000:], flush=True)
print("STDERR:", result.stderr[-4000:], flush=True)
raise RuntimeError(
f"Command failed with return code {result.returncode}: "
f"{' '.join(str(x) for x in cmd)}"
)
return result
def extract_turn_audio(source_wav, start, end, output_wav, pad_seconds=0.08):
start_padded = max(0.0, float(start) - pad_seconds)
end_padded = float(end) + pad_seconds
duration = max(0.1, end_padded - start_padded)
cmd = [
"ffmpeg",
"-y",
"-ss",
f"{start_padded:.3f}",
"-t",
f"{duration:.3f}",
"-i",
str(source_wav),
"-ac",
"1",
"-ar",
"16000",
"-acodec",
"pcm_s16le",
str(output_wav),
]
run_command(cmd, check=True)
def extract_wav_clip(source_wav, start, duration, output_wav):
cmd = [
"ffmpeg",
"-y",
"-ss",
f"{start:.3f}",
"-t",
f"{duration:.3f}",
"-i",
str(source_wav),
"-ac",
"1",
"-ar",
"16000",
"-acodec",
"pcm_s16le",
str(output_wav),
]
run_command(cmd, check=True)
def concat_wav_files(wav_paths, output_wav):
concat_list_path = output_wav.parent / f"{output_wav.stem}_concat_list.txt"
with concat_list_path.open("w", encoding="utf-8") as f:
for wav_path in wav_paths:
f.write(f"file '{wav_path.as_posix()}'\n")
cmd = [
"ffmpeg",
"-y",
"-f",
"concat",
"-safe",
"0",
"-i",
str(concat_list_path),
"-ac",
"1",
"-ar",
"16000",
"-acodec",
"pcm_s16le",
str(output_wav),
]
run_command(cmd, check=True)
checkpoint("Cell 2 complete")
# --- Merged former notebook cell 4 ---
# =============================================================================
# CELL 3 — FILES, FFMPEG, AND LIGHTWEIGHT DEPENDENCY DISCOVERY
# =============================================================================
checkpoint("Cell 3 started")
required_files = [
video_path,
macron_voice_sample,
trump_voice_sample,
]
missing_files = []
for path in required_files:
print(f"{path.name}: {path.exists()} -> {path}", flush=True)
if not path.exists():
missing_files.append(path)
if missing_files:
raise FileNotFoundError(
"Missing required files:\n" + "\n".join(str(path) for path in missing_files)
)
ffmpeg_path = shutil.which("ffmpeg")
if not ffmpeg_path:
raise RuntimeError(
"ffmpeg is not visible to this Jupyter kernel. "
"Install it with: conda install -c conda-forge ffmpeg -y"
)
print("ffmpeg found at:", ffmpeg_path, flush=True)
ffmpeg_check = subprocess.run(
["ffmpeg", "-version"],
capture_output=True,
text=True,
)
if ffmpeg_check.returncode != 0:
print(ffmpeg_check.stderr, flush=True)
raise RuntimeError("ffmpeg exists but failed to run.")
print("ffmpeg check:", ffmpeg_check.stdout.splitlines()[0], flush=True)
checkpoint("Checking installed packages without importing heavy libraries")
import importlib.util
required_modules = [
"faster_whisper",
"pyannote.audio",
"huggingface_hub",
"speechbrain",
"torch",
"torchaudio",
"pandas",
"numpy",
]
missing_modules = []
for module_name in required_modules:
spec = importlib.util.find_spec(module_name)
status = "FOUND" if spec is not None else "MISSING"
print(f"{module_name}: {status}", flush=True)
if spec is None:
missing_modules.append(module_name)
if missing_modules:
print("\nInstall missing packages with:", flush=True)
print(
f"{sys.executable} -m pip install --upgrade "
"faster-whisper pyannote.audio huggingface_hub "
"speechbrain torch torchaudio pandas numpy",
flush=True
)
raise RuntimeError("Missing modules: " + ", ".join(missing_modules))
# Optional face-matching package.
# It is checked here but not required until the face-matching cell runs.
face_recognition_spec = importlib.util.find_spec("face_recognition")
print(
"face_recognition:",
"FOUND" if face_recognition_spec is not None else "OPTIONAL MISSING",
flush=True,
)
cv2_spec = importlib.util.find_spec("cv2")
print(
"cv2 / opencv-python:",
"FOUND" if cv2_spec is not None else "OPTIONAL MISSING",
flush=True,
)
deepface_spec = importlib.util.find_spec("deepface")
print(
"deepface:",
"FOUND" if deepface_spec is not None else "OPTIONAL MISSING",
flush=True,
)
if deepface_spec is None:
print(
"DeepFace-based face matching will require: "
f"{sys.executable} -m pip install deepface",
flush=True,
)
if face_recognition_spec is None:
print(
"Optional face matching will require: "
"conda install -c conda-forge dlib face_recognition -y",
flush=True,
)
if cv2_spec is None:
print(
"Robust video-frame extraction will require: "
"conda install -c conda-forge opencv -y",
flush=True,
)
mediapipe_spec = importlib.util.find_spec("mediapipe")
print(
"mediapipe:",
"FOUND" if mediapipe_spec is not None else "OPTIONAL MISSING",
flush=True,
)
if mediapipe_spec is None:
print(
"MediaPipe fallback detector is optional. To install: "
f"{sys.executable} -m pip install mediapipe",
flush=True,
)
# dlib shape-predictor check for mauckc/mouth-open style MAR.
dlib_spec = importlib.util.find_spec("dlib")
print(
"dlib:",
"FOUND" if dlib_spec is not None else "OPTIONAL MISSING",
flush=True,
)
print(
"shape_predictor_68_face_landmarks.dat:",
"FOUND" if DLIB_SHAPE_PREDICTOR_PATH.exists() else "OPTIONAL MISSING",
"->",
DLIB_SHAPE_PREDICTOR_PATH,
flush=True,
)
if dlib_spec is None:
print(
"The mauckc mouth-open backend requires dlib. Install with conda-forge when possible: "
"conda install -c conda-forge dlib -y",
flush=True,
)
if not DLIB_SHAPE_PREDICTOR_PATH.exists():
print(
"The mauckc mouth-open backend requires shape_predictor_68_face_landmarks.dat. "
"Place it in the base folder or set WHOSPOKE_DLIB_SHAPE_PREDICTOR.",
flush=True,
)
# -----------------------------------------------------------------------------
# Dedicated DeepFace environment check
# -----------------------------------------------------------------------------
# The main notebook/script does not need to import DeepFace directly. Instead,
# Cell 18B runs DeepFace in DEEPFACE_PYTHON. This diagnostic checks that
# external environment before any expensive face-matching work begins.
if DEEPFACE_REQUIRE_WORKING_IMPORT:
diagnose_deepface_environment()
else:
print(
"Skipping strict DeepFace subprocess environment check because "
"DEEPFACE_REQUIRE_WORKING_IMPORT=False",
flush=True,
)
checkpoint("Cell 3 complete")
# --- Merged former notebook cell 5 ---
# =============================================================================
# CELL 4 — IMPORT PANDAS/NUMPY ONLY AFTER LIGHTWEIGHT CHECK
# =============================================================================
checkpoint("Cell 4 started: importing pandas and numpy")
import numpy as np
import pandas as pd
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_rows", 300)
def cosine_similarity(a, b):
a = np.asarray(a, dtype=np.float32).reshape(-1)
b = np.asarray(b, dtype=np.float32).reshape(-1)
denom = np.linalg.norm(a) * np.linalg.norm(b)
if denom == 0:
return float("nan")
return float(np.dot(a, b) / denom)
checkpoint("Cell 4 complete")
# =============================================================================
# OUTPUT CLEANUP HELPERS FOR MOUTH-OPEN / VISUAL ACTIVE-SPEAKER FIELDS
# =============================================================================
NO_MOUTH_OPEN_CUE_LABEL = "No mouth-open cue detected"
NO_VISUAL_ACTIVE_SPEAKER_LABEL = "No visual active-speaker cue"
def is_missing_output_value(value):
"""Return True for None, NaN, empty strings, and common serialized nulls."""
if value is None:
return True
try:
if pd.isna(value):
return True
except Exception:
pass
text_value = str(value).strip()
if text_value == "":
return True
if text_value.lower() in {"nan", "none", "null", "<na>"}:
return True
return False
def safe_display_value(value, default=""):
"""Human-readable value for final transcript printing."""
if is_missing_output_value(value):
return default
return value
def clean_mouth_open_output_columns(df):
"""
Normalize mouth-open / visual active-speaker columns after CSV round-trips.
Pandas reads empty CSV fields as NaN by default. Without this cleanup, a
segment with no mouth-open cue prints as `nan`, which is confusing even
though the pipeline ran correctly. These defaults make the absence of a
cue explicit and keep the final transcript readable.
"""
if df is None:
return df
df = df.copy()
text_defaults = {
"mouth_open_speaker_ids": "[]",
"mouth_open_speaker_names": NO_MOUTH_OPEN_CUE_LABEL,
"mouth_open_count_by_speaker": "{}",
"mouth_open_score_by_speaker": "{}",
"mouth_open_all_detail": "[]",
"visual_active_speaker_id": "none",
"visual_active_speaker_name": NO_VISUAL_ACTIVE_SPEAKER_LABEL,
"mouth_open_detection_mode": "none",
}
numeric_defaults = {
"mouth_open_observation_count": 0,
"mouth_open_strict_observation_count": 0,
"mouth_open_cue_observation_count": 0,
"mouth_open_closed_observation_count": 0,
"mouth_open_scored_observation_count": 0,
"mouth_open_max_score": 0.0,
"visual_active_speaker_confidence": 0.0,
}
for col, default in text_defaults.items():
if col in df.columns:
df[col] = df[col].apply(
lambda value: default if is_missing_output_value(value) else value
)
for col, default in numeric_defaults.items():
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(default)
return df
# --- Merged former notebook cell 6 ---
# =============================================================================
# CELL 5 — HUGGING FACE AUTHENTICATION
# =============================================================================
checkpoint("Cell 5 started: Hugging Face authentication")
print("Importing huggingface_hub...", flush=True)
from huggingface_hub import login, whoami, HfApi
print("Imported huggingface_hub.", flush=True)
for key in [
"PYANNOTE_AUTH_TOKEN",
"HF_TOKEN",
"HUGGINGFACE_TOKEN",
"HUGGING_FACE_HUB_TOKEN",
]:
if key in os.environ:
del os.environ[key]
hf_token = getpass("Paste your Hugging Face READ token for pyannote: ").strip()
if not hf_token.startswith("hf_"):
raise ValueError("This does not look like a Hugging Face token. It should start with 'hf_'.")
os.environ["PYANNOTE_AUTH_TOKEN"] = hf_token
os.environ["HF_TOKEN"] = hf_token
os.environ["HUGGINGFACE_TOKEN"] = hf_token
os.environ["HUGGING_FACE_HUB_TOKEN"] = hf_token
login(token=hf_token, add_to_git_credential=False)
user_info = whoami(token=hf_token)
print("Logged in as:", user_info.get("name") or user_info, flush=True)
api = HfApi()
required_repos = [
"pyannote/speaker-diarization-3.1",
"pyannote/segmentation-3.0",
"pyannote/speaker-diarization-community-1",
]
for repo_id in required_repos:
checkpoint(f"Checking access to {repo_id}")
try:
api.model_info(repo_id, token=hf_token)
print(f"{repo_id}: OK", flush=True)
except Exception as exc:
print(f"{repo_id}: FAILED", flush=True)
print(exc, flush=True)
raise RuntimeError(
f"You do not have access to {repo_id}. "
f"Open the Hugging Face page for {repo_id}, accept/request access, "
"then create a new READ token from the same Hugging Face account."
)
checkpoint("Cell 5 complete")
# --- Merged former notebook cell 7 ---
# =============================================================================
# CELL 6 — EXTRACT 16 KHZ MONO AUDIO FROM VIDEO
# =============================================================================
checkpoint("Cell 6 started: extracting 16 kHz mono WAV from video")
cmd = [
"ffmpeg",
"-y",
"-i",
str(video_path),
"-vn",
"-ac",
"1",
"-ar",
"16000",
"-acodec",
"pcm_s16le",
str(audio_path),
]
run_command(cmd, description="Running ffmpeg audio extraction")
if not audio_path.exists():
raise FileNotFoundError(f"Audio extraction failed: {audio_path}")
print("Audio file:", audio_path, flush=True)
print("Audio file size:", audio_path.stat().st_size, "bytes", flush=True)
checkpoint("Cell 6 complete")
# =============================================================================
# CELL 7 + 8 REPLACEMENT
# USE CACHED DIARIZATION FIRST; ONLY RUN PYANNOTE IN A TIME-LIMITED SUBPROCESS IF NEEDED
# =============================================================================
checkpoint("Cell 7+8 started: loading cached diarization or running pyannote safely")
# Set this to True only when you really want to force pyannote to run again.
FORCE_RERUN_DIARIZATION = False
# Time limit for pyannote subprocess. If pyannote hangs, the notebook will return.
PYANNOTE_TIMEOUT_SECONDS = 600
diarization_meta_path = base / "result_macron_trump_diarization_meta.json"
pyannote_runner_script_path = base / "run_pyannote_diarization_subprocess.py"
required_diarization_columns = {"start", "end", "speaker_cluster"}
# -----------------------------------------------------------------------------
# 1. Prefer cached diarization CSV if it exists
# -----------------------------------------------------------------------------
if diarization_csv_path.exists() and not FORCE_RERUN_DIARIZATION:
checkpoint(f"Using cached diarization CSV: {diarization_csv_path}")
diarization_df = pd.read_csv(
diarization_csv_path,
encoding="utf-8-sig",
)
missing_columns = required_diarization_columns - set(diarization_df.columns)
if missing_columns:
raise RuntimeError(
"Cached diarization CSV exists, but it is missing required columns: "
f"{sorted(missing_columns)}\n"
f"CSV path: {diarization_csv_path}"
)
if "track" not in diarization_df.columns:
diarization_df["track"] = ""
diarization_segments = diarization_df[
["start", "end", "speaker_cluster", "track"]
].to_dict(orient="records")
loaded_pyannote_model = f"cached_csv:{diarization_csv_path.name}"
checkpoint(f"Loaded {len(diarization_segments)} diarization segments from cached CSV")
display(diarization_df)
# -----------------------------------------------------------------------------
# 2. If no cached CSV exists, run pyannote in a subprocess with timeout
# -----------------------------------------------------------------------------
else:
checkpoint("No cached diarization CSV found, or FORCE_RERUN_DIARIZATION=True")
checkpoint("Writing isolated pyannote runner script")
pyannote_runner_code = f'''
import os
import sys
import json
import time
from pathlib import Path
import pandas as pd
print("Subprocess Python:", sys.executable, flush=True)
print("Starting isolated pyannote diarization runner", flush=True)
os.environ["HF_HUB_DOWNLOAD_TIMEOUT"] = "60"
os.environ["HF_HUB_ETAG_TIMEOUT"] = "30"
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "0"
audio_path = Path({str(audio_path)!r})
out_csv = Path({str(diarization_csv_path)!r})
out_meta = Path({str(diarization_meta_path)!r})
num_speakers = {NUM_SPEAKERS}
token = (
os.environ.get("HF_TOKEN")
or os.environ.get("PYANNOTE_AUTH_TOKEN")
or os.environ.get("HUGGINGFACE_TOKEN")
or os.environ.get("HUGGING_FACE_HUB_TOKEN")
)
if not token:
raise RuntimeError("No Hugging Face token found in subprocess environment.")
print("Audio path:", audio_path, flush=True)
print("Audio exists:", audio_path.exists(), flush=True)
print("Output CSV:", out_csv, flush=True)
print("Importing pyannote.audio Pipeline...", flush=True)
from pyannote.audio import Pipeline
print("Imported pyannote.audio Pipeline.", flush=True)
model_candidates = [
"pyannote/speaker-diarization-community-1",
"pyannote/speaker-diarization-3.1",
]
pipeline = None
loaded_model = None
last_error = None
for model_id in model_candidates:
print("=" * 80, flush=True)
print("Trying model:", model_id, flush=True)
start_time = time.time()
try:
pipeline = Pipeline.from_pretrained(
model_id,
token=token,
)
elapsed = time.time() - start_time
if pipeline is not None:
loaded_model = model_id
print(f"Loaded {{model_id}} in {{elapsed:.1f}} seconds", flush=True)
break
print(f"Model {{model_id}} returned None after {{elapsed:.1f}} seconds", flush=True)
except TypeError as exc:
print("token= failed; trying use_auth_token=", flush=True)
print(repr(exc), flush=True)
try:
pipeline = Pipeline.from_pretrained(
model_id,
use_auth_token=token,
)
elapsed = time.time() - start_time
if pipeline is not None:
loaded_model = model_id
print(f"Loaded {{model_id}} with use_auth_token= in {{elapsed:.1f}} seconds", flush=True)
break
except Exception as exc2:
last_error = exc2
print("use_auth_token= also failed:", repr(exc2), flush=True)
except Exception as exc:
last_error = exc
print("Failed:", repr(exc), flush=True)
if pipeline is None:
raise RuntimeError(f"Could not load pyannote pipeline. Last error: {{repr(last_error)}}")
print("Running diarization...", flush=True)
run_start = time.time()
try:
diarization_output = pipeline(
str(audio_path),
num_speakers=num_speakers,
)
except TypeError:
diarization_output = pipeline(str(audio_path))
print(f"Diarization finished in {{time.time() - run_start:.1f}} seconds", flush=True)
diarization = diarization_output
for attr in [
"speaker_diarization",
"exclusive_speaker_diarization",
"diarization",
"annotation",
]:
if hasattr(diarization_output, attr):
candidate = getattr(diarization_output, attr)
if candidate is not None:
diarization = candidate
print("Using diarization_output." + attr, flush=True)
break
if not hasattr(diarization, "itertracks"):
print("Returned type:", type(diarization_output), flush=True)
print("Selected type:", type(diarization), flush=True)
print("Available attributes:", [a for a in dir(diarization_output) if not a.startswith("_")], flush=True)
raise RuntimeError("Could not find pyannote Annotation object with itertracks().")
rows = []
for segment, track, speaker in diarization.itertracks(yield_label=True):
rows.append({{
"start": float(segment.start),
"end": float(segment.end),
"speaker_cluster": str(speaker),
"track": str(track),
}})
if not rows:
raise RuntimeError("pyannote returned no diarization segments.")
df = pd.DataFrame(rows)
df.to_csv(out_csv, index=False, encoding="utf-8-sig")
meta = {{
"loaded_pyannote_model": loaded_model,
"audio_path": str(audio_path),
"out_csv": str(out_csv),
"num_segments": len(rows),
}}
out_meta.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
print("Wrote diarization CSV:", out_csv, flush=True)
print("Wrote metadata:", out_meta, flush=True)
print("Number of diarization segments:", len(rows), flush=True)
'''
pyannote_runner_script_path.write_text(
pyannote_runner_code,
encoding="utf-8",
)
checkpoint(f"Wrote pyannote runner script: {pyannote_runner_script_path}")
env = os.environ.copy()
env["HF_HUB_DOWNLOAD_TIMEOUT"] = "60"
env["HF_HUB_ETAG_TIMEOUT"] = "30"
env["HF_HUB_ENABLE_HF_TRANSFER"] = "0"
# Make sure the token is visible to the subprocess.
if "hf_token" in globals():
env["HF_TOKEN"] = hf_token
env["PYANNOTE_AUTH_TOKEN"] = hf_token
env["HUGGINGFACE_TOKEN"] = hf_token
env["HUGGING_FACE_HUB_TOKEN"] = hf_token
checkpoint(f"Running pyannote in subprocess with {PYANNOTE_TIMEOUT_SECONDS}s timeout")
try:
result = subprocess.run(
[sys.executable, str(pyannote_runner_script_path)],
capture_output=True,
text=True,
timeout=PYANNOTE_TIMEOUT_SECONDS,
env=env,
)
print("\n" + "=" * 80)
print("PYANNOTE SUBPROCESS STDOUT")
print("=" * 80)
print(result.stdout)
print("\n" + "=" * 80)
print("PYANNOTE SUBPROCESS STDERR")
print("=" * 80)
print(result.stderr)
if result.returncode != 0:
raise RuntimeError(
"Pyannote subprocess failed with return code "
f"{result.returncode}"
)
except subprocess.TimeoutExpired as exc:
print("\n" + "=" * 80)
print("PYANNOTE SUBPROCESS TIMED OUT")
print("=" * 80)
print("STDOUT so far:")
print(exc.stdout or "")
print("STDERR so far:")
print(exc.stderr or "")
raise RuntimeError(
"Pyannote model loading or diarization timed out. "
"Use a cached diarization CSV, or set FORCE_RERUN_DIARIZATION=False "
"after one successful run."
)
if not diarization_csv_path.exists():
raise FileNotFoundError(
f"Pyannote subprocess completed but did not create: {diarization_csv_path}"
)
diarization_df = pd.read_csv(
diarization_csv_path,
encoding="utf-8-sig",
)
if "track" not in diarization_df.columns:
diarization_df["track"] = ""
diarization_segments = diarization_df[
["start", "end", "speaker_cluster", "track"]
].to_dict(orient="records")
if diarization_meta_path.exists():
diarization_meta = json.loads(
diarization_meta_path.read_text(encoding="utf-8")
)
loaded_pyannote_model = diarization_meta.get(
"loaded_pyannote_model",
"pyannote_subprocess_unknown",
)
else:
loaded_pyannote_model = "pyannote_subprocess_unknown"
checkpoint(f"Loaded {len(diarization_segments)} diarization segments from subprocess output")
display(diarization_df)
checkpoint("Cell 7+8 replacement complete")
# --- Merged former notebook cell 10 ---
# =============================================================================
# CELL 9 — MERGE DIARIZATION SEGMENTS INTO ASR-FRIENDLY TURNS
# =============================================================================
checkpoint("Cell 9 started: merging diarization segments")
diarization_segments_sorted = sorted(
diarization_segments,
key=lambda x: (x["start"], x["end"]),
)
merged_turns = []
for seg in diarization_segments_sorted:
start = float(seg["start"])
end = float(seg["end"])
speaker_cluster = seg["speaker_cluster"]
if end <= start:
continue
if not merged_turns:
merged_turns.append({
"start": start,
"end": end,
"speaker_cluster": speaker_cluster,
})
continue
prev = merged_turns[-1]
same_speaker = prev["speaker_cluster"] == speaker_cluster
small_gap = start - prev["end"] <= MERGE_GAP_SECONDS
combined_duration = end - prev["start"]
if same_speaker and small_gap and combined_duration <= MAX_TURN_DURATION_FOR_ASR:
prev["end"] = max(prev["end"], end)
else:
merged_turns.append({
"start": start,
"end": end,
"speaker_cluster": speaker_cluster,
})
merged_turns = [
turn for turn in merged_turns
if (turn["end"] - turn["start"]) >= MIN_TURN_DURATION_FOR_ASR
]
if not merged_turns:
raise RuntimeError("No usable merged speaker turns were produced for ASR.")
checkpoint(f"Created {len(merged_turns)} merged speaker turns")
merged_turns_df = pd.DataFrame(merged_turns)
merged_turns_df.to_csv(
merged_turns_csv_path,
index=False,
encoding="utf-8-sig",
)
print("Wrote merged turns:", merged_turns_csv_path, flush=True)
display(merged_turns_df)
checkpoint("Cell 9 complete")
# --- Merged former notebook cell 11 ---
# =============================================================================
# CELL 10 — LOAD FASTER-WHISPER MODEL
# =============================================================================
checkpoint("Cell 10 started: importing and loading faster-whisper")
print("Importing faster_whisper.WhisperModel...", flush=True)
from faster_whisper import WhisperModel
print("Imported faster_whisper.WhisperModel.", flush=True)
checkpoint(f"Loading faster-whisper model: {FASTER_WHISPER_MODEL}")
asr_model = WhisperModel(
FASTER_WHISPER_MODEL,
device="cpu",
compute_type=FASTER_WHISPER_COMPUTE_TYPE,
)
checkpoint("Cell 10 complete: faster-whisper model loaded")
# --- Merged former notebook cell 12 ---
# =============================================================================
# CELL 11 — RUN MULTILINGUAL ASR PER DIARIZED TURN
# =============================================================================
checkpoint("Cell 11 started: multilingual ASR per diarized turn")
direct_segments = []
asr_segments = []
for i, turn in enumerate(merged_turns):
turn_start = float(turn["start"])
turn_end = float(turn["end"])
speaker_cluster = turn["speaker_cluster"]
turn_audio_path = turn_audio_dir / f"turn_{i:04d}_{speaker_cluster}.wav"
extract_turn_audio(
source_wav=audio_path,
start=turn_start,
end=turn_end,
output_wav=turn_audio_path,
pad_seconds=TURN_ASR_PAD_SECONDS,
)
checkpoint(
f"Transcribing turn {i + 1}/{len(merged_turns)} "
f"[{turn_start:.2f}–{turn_end:.2f}] {speaker_cluster}"
)
segments_generator, info = asr_model.transcribe(
str(turn_audio_path),
language=None,
task="transcribe",
beam_size=1,
vad_filter=False,
word_timestamps=False,
condition_on_previous_text=False,
)
detected_language = info.language
detected_language_probability = float(info.language_probability)
local_segments = list(segments_generator)
if not local_segments:
text = ""
else:
text = " ".join(seg.text.strip() for seg in local_segments).strip()
duration = max(0.001, turn_end - turn_start)
row = {
"segment_index": i,
"asr_index": i,
"start": turn_start,
"end": turn_end,
"speaker_cluster": speaker_cluster,
"speaker_id": speaker_cluster,
"speaker_name": speaker_cluster,
"confidence": 1.0,
"overlap_seconds": round(duration, 4),
"overlap_detail": {speaker_cluster: duration},
"text": text,
"text_original_language": text,
"asr_language": detected_language,
"asr_language_probability": detected_language_probability,
"turn_audio_path": str(turn_audio_path),
}
direct_segments.append(row)
asr_segments.append({
"asr_index": i,
"start": turn_start,
"end": turn_end,
"text": text,
"text_original_language": text,
"asr_language": detected_language,
"asr_language_probability": detected_language_probability,
"speaker_cluster": speaker_cluster,
"turn_audio_path": str(turn_audio_path),
})
print("=" * 80, flush=True)
print(f"Turn: {i}", flush=True)
print(f"Time: {turn_start:.2f}–{turn_end:.2f}", flush=True)
print("Speaker cluster:", speaker_cluster, flush=True)
print("Detected language:", detected_language, flush=True)
print("Language probability:", detected_language_probability, flush=True)
print("Text:", text, flush=True)
if not direct_segments:
raise RuntimeError("No ASR segments were produced from diarized turns.")
checkpoint(f"Multilingual turn-level ASR completed with {len(direct_segments)} segments")
del asr_model
direct_df = pd.DataFrame(direct_segments)
display(direct_df[[
"segment_index",
"start",
"end",
"speaker_cluster",
"asr_language",
"asr_language_probability",
"text",
]])
print("\nLanguage counts:", flush=True)
display(direct_df["asr_language"].value_counts(dropna=False).to_frame("n_segments"))
checkpoint("Cell 11 complete")
# --- Merged former notebook cell 13 ---
# =============================================================================
# CELL 12 — SAVE INTERMEDIATE MULTILINGUAL ASR + DIARIZATION OUTPUTS
# =============================================================================
checkpoint("Cell 12 started: saving intermediate outputs")
direct_df.to_csv(
direct_csv_output_path,
index=False,
encoding="utf-8-sig",
)
diarization_df.to_csv(
diarization_csv_path,
index=False,
encoding="utf-8-sig",
)
direct_payload = {
"video_path": str(video_path),
"audio_path": str(audio_path),
"asr_backend": "faster_whisper",
"asr_model": FASTER_WHISPER_MODEL,
"asr_mode": "turn_level_language_detection",
"diarization_backend": loaded_pyannote_model,
"num_speakers": NUM_SPEAKERS,
"text_language_policy": "preserve_original_asr_language_no_translation",
"asr_segments": asr_segments,
"diarization_segments": diarization_segments,
"merged_turns": merged_turns,
"final_segments": direct_segments,
}
with direct_json_output_path.open("w", encoding="utf-8") as f:
json.dump(direct_payload, f, ensure_ascii=False, indent=2)
print("Wrote:", direct_csv_output_path, flush=True)
print("Wrote:", diarization_csv_path, flush=True)
print("Wrote:", direct_json_output_path, flush=True)
checkpoint("Cell 12 complete")
# --- Merged former notebook cell 14 ---
# =============================================================================
# CELL 13 — CREATE CLUSTER-LEVEL AUDIO FILES FOR VOICE IDENTITY MATCHING
# =============================================================================
checkpoint("Cell 13 started: creating cluster-level voice audio")
diarization_df["duration"] = diarization_df["end"] - diarization_df["start"]
cluster_audio_paths = {}
for cluster_name in sorted(diarization_df["speaker_cluster"].dropna().unique()):
checkpoint(f"Creating cluster audio for {cluster_name}")
cluster_rows = diarization_df[diarization_df["speaker_cluster"] == cluster_name].copy()
cluster_rows["duration"] = cluster_rows["end"] - cluster_rows["start"]
cluster_rows = cluster_rows[cluster_rows["duration"] >= MIN_CLUSTER_SEGMENT_DURATION]
cluster_rows = cluster_rows.sort_values("duration", ascending=False)
selected_rows = []
total_duration = 0.0
for row in cluster_rows.itertuples(index=False):
duration = float(row.end) - float(row.start)
if len(selected_rows) >= MAX_CLUSTER_SEGMENTS:
break
if total_duration + duration > MAX_CLUSTER_AUDIO_SECONDS:
continue
selected_rows.append(row)
total_duration += duration
if not selected_rows:
raise RuntimeError(f"No usable diarization segments found for {cluster_name}")
temp_dir = cluster_audio_dir / f"tmp_{cluster_name}"
temp_dir.mkdir(parents=True, exist_ok=True)
clip_paths = []
for i, row in enumerate(selected_rows):
start = max(0.0, float(row.start) - PAD_SECONDS)
end = float(row.end) + PAD_SECONDS
duration = max(0.10, end - start)
clip_path = temp_dir / f"{cluster_name}_{i:03d}.wav"
extract_wav_clip(
source_wav=audio_path,
start=start,
duration=duration,
output_wav=clip_path,
)
clip_paths.append(clip_path)
cluster_wav_path = cluster_audio_dir / f"{cluster_name}_cluster_voice.wav"
concat_wav_files(
wav_paths=clip_paths,
output_wav=cluster_wav_path,
)
cluster_audio_paths[cluster_name] = cluster_wav_path
print("\nCreated cluster audio files:", flush=True)
for cluster_name, path in cluster_audio_paths.items():
print(cluster_name, "->", path, flush=True)
checkpoint("Cell 13 complete")
# --- Merged former notebook cell 15 ---
# =============================================================================
# CELL 14 — LOAD SPEECHBRAIN ECAPA-TDNN
# =============================================================================
checkpoint("Cell 14 started: importing torch, torchaudio, and SpeechBrain")
print("Importing torch...", flush=True)
import torch
print("Imported torch.", flush=True)
print("Importing torchaudio...", flush=True)
import torchaudio
print("Imported torchaudio.", flush=True)
print("Importing SpeechBrain EncoderClassifier...", flush=True)
try:
from speechbrain.inference.speaker import EncoderClassifier
except Exception:
from speechbrain.pretrained import EncoderClassifier
print("Imported SpeechBrain EncoderClassifier.", flush=True)
checkpoint("Loading SpeechBrain ECAPA-TDNN speaker model")
classifier = EncoderClassifier.from_hparams(
source="speechbrain/spkrec-ecapa-voxceleb",
savedir=str(speechbrain_cache_dir),
run_opts={"device": "cpu"},
)
checkpoint("Cell 14 complete: SpeechBrain ECAPA-TDNN loaded")
# --- Merged former notebook cell 16 ---
# =============================================================================
# CELL 15 — COMPUTE VOICE EMBEDDINGS
# =============================================================================
checkpoint("Cell 15 started: computing voice embeddings")
def load_audio_for_embedding(wav_path):
waveform, sample_rate = torchaudio.load(str(wav_path))
if waveform.shape[0] > 1:
waveform = waveform.mean(dim=0, keepdim=True)
if sample_rate != 16000:
waveform = torchaudio.functional.resample(
waveform,
orig_freq=sample_rate,
new_freq=16000,
)
waveform = waveform.squeeze(0).unsqueeze(0)
return waveform
def compute_embedding(wav_path):
waveform = load_audio_for_embedding(wav_path)
with torch.no_grad():
embedding = classifier.encode_batch(waveform)
embedding = embedding.squeeze().detach().cpu().numpy()
return embedding
reference_people = {
"emmanuel_macron": {
"speaker_id": "emmanuel_macron",
"speaker_name": "Emmanuel Macron",
"voice_sample": macron_voice_sample,
},
"donald_j_trump": {
"speaker_id": "donald_j_trump",
"speaker_name": "Donald J. Trump",
"voice_sample": trump_voice_sample,
},
}
reference_embeddings = {}
for person_key, person in reference_people.items():
checkpoint(f"Computing reference embedding for {person_key}")
emb = compute_embedding(person["voice_sample"])
reference_embeddings[person_key] = emb
print(person_key, "embedding shape:", emb.shape, flush=True)
cluster_embeddings = {}
for cluster_name, cluster_wav_path in cluster_audio_paths.items():
checkpoint(f"Computing cluster embedding for {cluster_name}")
emb = compute_embedding(cluster_wav_path)
cluster_embeddings[cluster_name] = emb
print(cluster_name, "embedding shape:", emb.shape, flush=True)
checkpoint("Cell 15 complete")
# --- Merged former notebook cell 17 ---
# =============================================================================
# CELL 16 — COMPUTE VOICE SIMILARITY MATRIX
# =============================================================================
checkpoint("Cell 16 started: computing voice similarity matrix")
similarity_rows = []
for cluster_name, cluster_emb in cluster_embeddings.items():
for person_key, ref_emb in reference_embeddings.items():
score = cosine_similarity(cluster_emb, ref_emb)
similarity_rows.append({
"speaker_cluster": cluster_name,
"reference_person_key": person_key,
"reference_speaker_id": reference_people[person_key]["speaker_id"],
"reference_speaker_name": reference_people[person_key]["speaker_name"],
"cosine_similarity": float(score),
})
similarity_df = pd.DataFrame(similarity_rows)
similarity_df = similarity_df.sort_values(
["speaker_cluster", "cosine_similarity"],
ascending=[True, False],
)
display(similarity_df)
similarity_df.to_csv(
similarity_csv_output_path,
index=False,
encoding="utf-8-sig",
)
print("Wrote similarity matrix:", similarity_csv_output_path, flush=True)
checkpoint("Cell 16 complete")
# --- Merged former notebook cell 18 ---
# =============================================================================
# CELL 17 — ASSIGN REAL SPEAKER IDENTITIES FROM VOICE SAMPLES
# =============================================================================
checkpoint("Cell 17 started: assigning real speaker identities")
cluster_to_person = {}
assignment_rows = []
if ENFORCE_ONE_TO_ONE_ASSIGNMENT:
used_clusters = set()
used_people = set()
all_pairs = similarity_df.sort_values(
"cosine_similarity",
ascending=False,
).to_dict(orient="records")
for pair in all_pairs:
cluster_name = pair["speaker_cluster"]
person_key = pair["reference_person_key"]
if cluster_name in used_clusters:
continue
if person_key in used_people:
continue
rows = similarity_df[similarity_df["speaker_cluster"] == cluster_name].copy()
rows = rows.sort_values("cosine_similarity", ascending=False)
best_score = float(pair["cosine_similarity"])
second_score = rows.iloc[1]["cosine_similarity"] if len(rows) > 1 else float("-inf")
margin = best_score - float(second_score)
passes_threshold = best_score >= VOICE_ASSIGNMENT_THRESHOLD
passes_margin = margin >= VOICE_ASSIGNMENT_MARGIN
assigned = FORCE_BEST_MATCH or (passes_threshold and passes_margin)
if assigned:
cluster_to_person[cluster_name] = {
"speaker_id": reference_people[person_key]["speaker_id"],
"speaker_name": reference_people[person_key]["speaker_name"],
"voice_similarity": float(best_score),
"voice_margin": float(margin),
"passes_threshold": bool(passes_threshold),
"passes_margin": bool(passes_margin),
"reference_person_key": person_key,
}
used_clusters.add(cluster_name)
used_people.add(person_key)
for cluster_name in sorted(cluster_embeddings.keys()):
if cluster_name not in cluster_to_person:
rows = similarity_df[similarity_df["speaker_cluster"] == cluster_name].copy()
rows = rows.sort_values("cosine_similarity", ascending=False)
best = rows.iloc[0]
second_score = rows.iloc[1]["cosine_similarity"] if len(rows) > 1 else float("-inf")
best_score = float(best["cosine_similarity"])
margin = best_score - float(second_score)
cluster_to_person[cluster_name] = {
"speaker_id": f"unknown_{cluster_name.lower()}",
"speaker_name": f"Unknown {cluster_name}",
"voice_similarity": float(best_score),
"voice_margin": float(margin),
"passes_threshold": bool(best_score >= VOICE_ASSIGNMENT_THRESHOLD),
"passes_margin": bool(margin >= VOICE_ASSIGNMENT_MARGIN),
"reference_person_key": None,
}
else:
for cluster_name in sorted(cluster_embeddings.keys()):
rows = similarity_df[similarity_df["speaker_cluster"] == cluster_name].copy()
rows = rows.sort_values("cosine_similarity", ascending=False)
best = rows.iloc[0]
second_score = rows.iloc[1]["cosine_similarity"] if len(rows) > 1 else float("-inf")
best_score = float(best["cosine_similarity"])
margin = best_score - float(second_score)
passes_threshold = best_score >= VOICE_ASSIGNMENT_THRESHOLD
passes_margin = margin >= VOICE_ASSIGNMENT_MARGIN
assigned = FORCE_BEST_MATCH or (passes_threshold and passes_margin)
if assigned:
cluster_to_person[cluster_name] = {
"speaker_id": best["reference_speaker_id"],
"speaker_name": best["reference_speaker_name"],
"voice_similarity": float(best_score),
"voice_margin": float(margin),
"passes_threshold": bool(passes_threshold),
"passes_margin": bool(passes_margin),
"reference_person_key": best["reference_person_key"],
}
else:
cluster_to_person[cluster_name] = {
"speaker_id": f"unknown_{cluster_name.lower()}",
"speaker_name": f"Unknown {cluster_name}",
"voice_similarity": float(best_score),
"voice_margin": float(margin),
"passes_threshold": bool(passes_threshold),
"passes_margin": bool(passes_margin),
"reference_person_key": None,
}
for cluster_name in sorted(cluster_to_person.keys()):
assignment_rows.append({
"speaker_cluster": cluster_name,
"assigned_speaker_id": cluster_to_person[cluster_name]["speaker_id"],
"assigned_speaker_name": cluster_to_person[cluster_name]["speaker_name"],
"best_similarity": cluster_to_person[cluster_name]["voice_similarity"],
"margin_over_second_best": cluster_to_person[cluster_name]["voice_margin"],
"passes_threshold": cluster_to_person[cluster_name]["passes_threshold"],
"passes_margin": cluster_to_person[cluster_name]["passes_margin"],
"force_best_match": FORCE_BEST_MATCH,
"enforce_one_to_one_assignment": ENFORCE_ONE_TO_ONE_ASSIGNMENT,
})
assignment_df = pd.DataFrame(assignment_rows)
display(assignment_df)
print(json.dumps(cluster_to_person, indent=2, ensure_ascii=False), flush=True)
checkpoint("Cell 17 complete")
# --- Merged former notebook cell 19 ---
# =============================================================================
# CELL 18 — APPLY SPEAKER NAMES TO TRANSCRIPT
# =============================================================================
checkpoint("Cell 18 started: applying speaker names")
df_named = direct_df.copy()
df_named["text_original_language"] = df_named["text"].astype(str)
df_named["text"] = df_named["text_original_language"]
df_named["speaker_id_original"] = df_named["speaker_id"]
df_named["speaker_name_original"] = df_named["speaker_name"]
def assign_speaker_id_from_cluster(cluster):
if pd.isna(cluster):
return None
cluster = str(cluster)
if cluster in cluster_to_person:
return cluster_to_person[cluster]["speaker_id"]
return f"unknown_{cluster.lower()}"
def assign_speaker_name_from_cluster(cluster):
if pd.isna(cluster):
return None
cluster = str(cluster)
if cluster in cluster_to_person:
return cluster_to_person[cluster]["speaker_name"]
return f"Unknown {cluster}"
def assign_voice_similarity_from_cluster(cluster):
if pd.isna(cluster):
return None
cluster = str(cluster)
if cluster in cluster_to_person:
return cluster_to_person[cluster]["voice_similarity"]
return None
def assign_voice_margin_from_cluster(cluster):
if pd.isna(cluster):
return None
cluster = str(cluster)
if cluster in cluster_to_person:
return cluster_to_person[cluster]["voice_margin"]
return None
df_named["speaker_id"] = df_named["speaker_cluster"].apply(assign_speaker_id_from_cluster)
df_named["speaker_name"] = df_named["speaker_cluster"].apply(assign_speaker_name_from_cluster)
df_named["voice_similarity"] = df_named["speaker_cluster"].apply(assign_voice_similarity_from_cluster)
df_named["voice_margin"] = df_named["speaker_cluster"].apply(assign_voice_margin_from_cluster)
preferred_columns = [
"segment_index",
"start",
"end",
"speaker_cluster",
"speaker_id",
"speaker_name",
"speaker_id_original",
"speaker_name_original",
"confidence",
"voice_similarity",
"voice_margin",
"overlap_seconds",
"overlap_detail",
"text",
"text_original_language",
"asr_language",
"asr_language_probability",
"turn_audio_path",
]
remaining_columns = [
col for col in df_named.columns
if col not in preferred_columns
]
df_named = df_named[preferred_columns + remaining_columns]
display(df_named[[
"segment_index",
"start",
"end",
"speaker_cluster",
"speaker_id",
"speaker_name",
"asr_language",
"asr_language_probability",
"voice_similarity",
"voice_margin",
"text",
]])
checkpoint("Cell 18 complete")
# --- Merged former notebook cell 20 ---
# =============================================================================
# CELL 18B — DEEPFACE FACE MATCHING IN AN ISOLATED SUBPROCESS
# =============================================================================
# This version fixes kernel restarts caused by DeepFace / TensorFlow / OpenCV
# native-library crashes by running all DeepFace work in a separate Python process.
#
# The notebook kernel only:
# 1. Saves df_named to CSV.
# 2. Writes a JSON config file.
# 3. Writes a standalone DeepFace runner script.
# 4. Runs that script in a subprocess with a timeout.
# 5. Reads the completed CSV/JSON outputs back into df_named.
#
# If DeepFace crashes, only the subprocess fails. The notebook kernel survives.
# =============================================================================
checkpoint("Cell 18B started: DeepFace matching in isolated subprocess")
face_matching_metadata = None
required_face_files = [
macron_face_image,
trump_face_image,
]
missing_face_files = []
for path in required_face_files:
print(f"{path.name}: {path.exists()} -> {path}", flush=True)
if not path.exists():
missing_face_files.append(path)
if missing_face_files:
print(
"Skipping DeepFace matching because the following reference face image files are missing:",
flush=True,
)
for path in missing_face_files:
print(" -", path, flush=True)
df_named["face_best_speaker_id"] = None
df_named["face_best_speaker_name"] = None
df_named["face_match_count"] = 0
df_named["face_strict_match_count"] = 0
df_named["face_candidate_count"] = 0
df_named["face_mean_distance"] = None
df_named["face_min_distance"] = None
df_named["face_evidence_detail"] = "{}"
df_named["face_all_speaker_ids"] = "[]"
df_named["face_all_speaker_names"] = ""
df_named["face_all_match_count_by_speaker"] = "{}"
df_named["face_all_matches_detail"] = "[]"
df_named["face_all_detected_count"] = 0
df_named["face_all_usable_candidate_count"] = 0
df_named["face_all_strict_match_count"] = 0
df_named["face_analyzed_frame_count"] = 0
df_named["face_frames_with_detected_faces"] = 0
df_named["mouth_open_speaker_ids"] = "[]"
df_named["mouth_open_speaker_names"] = "Face matching skipped"
df_named["mouth_open_count_by_speaker"] = "{}"
df_named["mouth_open_score_by_speaker"] = "{}"
df_named["mouth_open_observation_count"] = 0
df_named["mouth_open_strict_observation_count"] = 0
df_named["mouth_open_cue_observation_count"] = 0
df_named["mouth_open_closed_observation_count"] = 0
df_named["mouth_open_scored_observation_count"] = 0
df_named["mouth_open_detection_mode"] = "none"
df_named["mouth_open_max_score"] = 0.0
df_named["mouth_open_all_detail"] = "[]"
df_named["visual_active_speaker_id"] = "none"
df_named["visual_active_speaker_name"] = "No visual active-speaker cue"
df_named["visual_active_speaker_confidence"] = 0.0
df_named["voice_face_agree"] = False
df_named["speaker_id_multimodal"] = df_named["speaker_id"]
df_named["speaker_name_multimodal"] = df_named["speaker_name"]
df_named["multimodal_identity_source"] = "voice_no_deepface_matching"
df_named["speaker_id_final"] = df_named["speaker_id"]
df_named["speaker_name_final"] = df_named["speaker_name"]
else:
checkpoint("Saving current voice-identified transcript for DeepFace subprocess")
df_named.to_csv(
deepface_input_csv_path,
index=False,
encoding="utf-8-sig",
)
deepface_config = {
"python_executable": str(DEEPFACE_PYTHON),
"base": str(base),
"video_path": str(video_path),
"audio_path": str(audio_path),
"macron_face_image": str(macron_face_image),
"trump_face_image": str(trump_face_image),
"deepface_input_csv_path": str(deepface_input_csv_path),
"face_frame_dir": str(face_frame_dir),
"face_crop_dir": str(face_crop_dir),
"face_frame_manifest_csv_path": str(face_frame_manifest_csv_path),
"face_match_csv_path": str(face_match_csv_path),
"face_evidence_csv_path": str(face_evidence_csv_path),
"voice_face_csv_output_path": str(voice_face_csv_output_path),
"voice_face_json_output_path": str(voice_face_json_output_path),
"deepface_metadata_json_path": str(deepface_metadata_json_path),
"face_sampling_strategy": FACE_SAMPLING_STRATEGY,
"face_sample_fps": FACE_SAMPLE_FPS,
"face_global_sample_fps": FACE_GLOBAL_SAMPLE_FPS,
"face_speech_sample_fps": FACE_SPEECH_SAMPLE_FPS,
"face_speech_turn_pad_seconds": FACE_SPEECH_TURN_PAD_SECONDS,
"face_speech_sample_offset_seconds": FACE_SPEECH_SAMPLE_OFFSET_SECONDS,
"face_sample_min_gap_seconds": FACE_SAMPLE_MIN_GAP_SECONDS,
"face_max_sampled_frames": FACE_MAX_SAMPLED_FRAMES,
"face_sample_only_turns_with_text": FACE_SAMPLE_ONLY_TURNS_WITH_TEXT,
"deepface_model_name": DEEPFACE_MODEL_NAME,
"deepface_detector_backends": DEEPFACE_DETECTOR_BACKENDS,
"deepface_align": DEEPFACE_ALIGN,
"deepface_normalization": DEEPFACE_NORMALIZATION,
"face_distance_threshold": FACE_DISTANCE_THRESHOLD,
"face_candidate_max_distance": FACE_CANDIDATE_MAX_DISTANCE,
"face_resize_max_width": FACE_RESIZE_MAX_WIDTH,
"face_crop_expand_margin": FACE_CROP_EXPAND_MARGIN,
"save_face_crops": SAVE_FACE_CROPS,
"use_face_to_override_voice": USE_FACE_TO_OVERRIDE_VOICE,
"use_face_when_voice_unknown": USE_FACE_WHEN_VOICE_UNKNOWN,
"use_best_face_candidate_even_if_not_strict": USE_BEST_FACE_CANDIDATE_EVEN_IF_NOT_STRICT,
"mouth_open_detection_enabled": MOUTH_OPEN_DETECTION_ENABLED,
"mouth_open_backend": MOUTH_OPEN_BACKEND,
"mouth_open_threshold": MOUTH_OPEN_THRESHOLD,
"mouth_speaking_cue_threshold": MOUTH_SPEAKING_CUE_THRESHOLD,
"mouth_open_report_best_scored_face": MOUTH_OPEN_REPORT_BEST_SCORED_FACE,
"mouth_open_min_score_for_best_cue": MOUTH_OPEN_MIN_SCORE_FOR_BEST_CUE,
"mouth_open_top_debug_rows": MOUTH_OPEN_TOP_DEBUG_ROWS,
"mouth_open_min_detection_confidence": MOUTH_OPEN_MIN_DETECTION_CONFIDENCE,
"mouth_open_state_threshold_dlib": MOUTH_OPEN_STATE_THRESHOLD_DLIB,
"mouth_open_state_threshold_mediapipe": MOUTH_OPEN_STATE_THRESHOLD_MEDIAPIPE,
"mouth_open_state_threshold_face_recognition": MOUTH_OPEN_STATE_THRESHOLD_FACE_RECOGNITION,
"mouth_open_state_threshold_pixel_gap": MOUTH_OPEN_STATE_THRESHOLD_PIXEL_GAP,
"mouth_pixel_roi_left": MOUTH_PIXEL_ROI_LEFT,
"mouth_pixel_roi_right": MOUTH_PIXEL_ROI_RIGHT,
"mouth_pixel_roi_top": MOUTH_PIXEL_ROI_TOP,
"mouth_pixel_roi_bottom": MOUTH_PIXEL_ROI_BOTTOM,
"mouth_pixel_dark_percentile": MOUTH_PIXEL_DARK_PERCENTILE,
"mouth_pixel_dark_std_factor": MOUTH_PIXEL_DARK_STD_FACTOR,
"mouth_pixel_min_component_area_ratio": MOUTH_PIXEL_MIN_COMPONENT_AREA_RATIO,
"mouth_pixel_min_component_height_ratio": MOUTH_PIXEL_MIN_COMPONENT_HEIGHT_RATIO,
"mouth_pixel_save_debug_rois": MOUTH_PIXEL_SAVE_DEBUG_ROIS,
"mouth_pixel_debug_roi_dir": str(MOUTH_PIXEL_DEBUG_ROI_DIR),
"mouth_open_rect_expand_margin": MOUTH_OPEN_RECT_EXPAND_MARGIN,
"mouth_open_min_scorable_face_width": MOUTH_OPEN_MIN_SCORABLE_FACE_WIDTH,
"mouth_open_min_scorable_face_height": MOUTH_OPEN_MIN_SCORABLE_FACE_HEIGHT,
"mouth_open_debug_per_face_csv_path": str(MOUTH_OPEN_DEBUG_PER_FACE_CSV_PATH),
"dlib_shape_predictor_path": str(DLIB_SHAPE_PREDICTOR_PATH),
}
deepface_config_path.write_text(
json.dumps(deepface_config, ensure_ascii=False, indent=2),
encoding="utf-8",
)
checkpoint(f"Wrote DeepFace config: {deepface_config_path}")
deepface_runner_code = r'''
import os
import sys
import json
import time
import subprocess
from pathlib import Path
# Native-library safety settings. Set before importing DeepFace/OpenCV/TensorFlow.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
# Avoid shell-level Keras settings interfering with TensorFlow/Keras imports.
os.environ.pop("TF_USE_LEGACY_KERAS", None)
start_time_global = time.time()
def log(message):
elapsed = time.time() - start_time_global
print(f"[subprocess {elapsed:8.1f}s] {message}", flush=True)
if len(sys.argv) != 2:
raise RuntimeError("Usage: python run_deepface_face_matching_subprocess.py CONFIG_JSON")
config_path = Path(sys.argv[1])
config = json.loads(config_path.read_text(encoding="utf-8"))
base = Path(config["base"])
video_path = Path(config["video_path"])
audio_path = Path(config["audio_path"])
macron_face_image = Path(config["macron_face_image"])
trump_face_image = Path(config["trump_face_image"])
deepface_input_csv_path = Path(config["deepface_input_csv_path"])
face_frame_dir = Path(config["face_frame_dir"])
face_crop_dir = Path(config["face_crop_dir"])
face_frame_manifest_csv_path = Path(config["face_frame_manifest_csv_path"])
face_match_csv_path = Path(config["face_match_csv_path"])
face_evidence_csv_path = Path(config["face_evidence_csv_path"])
voice_face_csv_output_path = Path(config["voice_face_csv_output_path"])
voice_face_json_output_path = Path(config["voice_face_json_output_path"])
deepface_metadata_json_path = Path(config["deepface_metadata_json_path"])
FACE_SAMPLING_STRATEGY = str(config.get("face_sampling_strategy", "hybrid_speech_uniform"))
FACE_SAMPLE_FPS = float(config.get("face_sample_fps", 2.0))
FACE_GLOBAL_SAMPLE_FPS = float(config.get("face_global_sample_fps", 0.50))
FACE_SPEECH_SAMPLE_FPS = float(config.get("face_speech_sample_fps", 8.0))
FACE_SPEECH_TURN_PAD_SECONDS = float(config.get("face_speech_turn_pad_seconds", 0.20))
FACE_SPEECH_SAMPLE_OFFSET_SECONDS = list(config.get("face_speech_sample_offset_seconds", [0.0, 0.05, -0.05]))
FACE_SAMPLE_MIN_GAP_SECONDS = float(config.get("face_sample_min_gap_seconds", 0.025))
_face_max_sampled_frames_value = config.get("face_max_sampled_frames", 1500)
FACE_MAX_SAMPLED_FRAMES = None if _face_max_sampled_frames_value is None else int(_face_max_sampled_frames_value)
FACE_SAMPLE_ONLY_TURNS_WITH_TEXT = bool(config.get("face_sample_only_turns_with_text", False))
DEEPFACE_MODEL_NAME = str(config["deepface_model_name"])
DEEPFACE_DETECTOR_BACKENDS = list(config["deepface_detector_backends"])
DEEPFACE_ALIGN = bool(config["deepface_align"])
DEEPFACE_NORMALIZATION = str(config["deepface_normalization"])
FACE_DISTANCE_THRESHOLD = float(config["face_distance_threshold"])
FACE_CANDIDATE_MAX_DISTANCE = float(config["face_candidate_max_distance"])
FACE_RESIZE_MAX_WIDTH = config["face_resize_max_width"]
FACE_CROP_EXPAND_MARGIN = float(config["face_crop_expand_margin"])
SAVE_FACE_CROPS = bool(config["save_face_crops"])
USE_FACE_TO_OVERRIDE_VOICE = bool(config["use_face_to_override_voice"])
USE_FACE_WHEN_VOICE_UNKNOWN = bool(config["use_face_when_voice_unknown"])
USE_BEST_FACE_CANDIDATE_EVEN_IF_NOT_STRICT = bool(config["use_best_face_candidate_even_if_not_strict"])
MOUTH_OPEN_DETECTION_ENABLED = bool(config.get("mouth_open_detection_enabled", True))
MOUTH_OPEN_BACKEND = str(config.get("mouth_open_backend", "pixel_mouth_gap"))
MOUTH_OPEN_THRESHOLD = float(config.get("mouth_open_threshold", 0.79))
MOUTH_SPEAKING_CUE_THRESHOLD = float(config.get("mouth_speaking_cue_threshold", 0.20))
MOUTH_OPEN_REPORT_BEST_SCORED_FACE = bool(config.get("mouth_open_report_best_scored_face", True))
MOUTH_OPEN_MIN_SCORE_FOR_BEST_CUE = float(config.get("mouth_open_min_score_for_best_cue", 0.10))
MOUTH_OPEN_TOP_DEBUG_ROWS = int(config.get("mouth_open_top_debug_rows", 25))
MOUTH_OPEN_MIN_DETECTION_CONFIDENCE = float(config.get("mouth_open_min_detection_confidence", 0.50))
MOUTH_OPEN_STATE_THRESHOLD_DLIB = float(config.get("mouth_open_state_threshold_dlib", 0.12))
MOUTH_OPEN_STATE_THRESHOLD_MEDIAPIPE = float(config.get("mouth_open_state_threshold_mediapipe", 0.050))
MOUTH_OPEN_STATE_THRESHOLD_FACE_RECOGNITION = float(config.get("mouth_open_state_threshold_face_recognition", 0.065))
MOUTH_OPEN_STATE_THRESHOLD_PIXEL_GAP = float(config.get("mouth_open_state_threshold_pixel_gap", 0.045))
MOUTH_PIXEL_ROI_LEFT = float(config.get("mouth_pixel_roi_left", 0.18))
MOUTH_PIXEL_ROI_RIGHT = float(config.get("mouth_pixel_roi_right", 0.82))
MOUTH_PIXEL_ROI_TOP = float(config.get("mouth_pixel_roi_top", 0.52))
MOUTH_PIXEL_ROI_BOTTOM = float(config.get("mouth_pixel_roi_bottom", 0.92))
MOUTH_PIXEL_DARK_PERCENTILE = float(config.get("mouth_pixel_dark_percentile", 22))
MOUTH_PIXEL_DARK_STD_FACTOR = float(config.get("mouth_pixel_dark_std_factor", 0.15))
MOUTH_PIXEL_MIN_COMPONENT_AREA_RATIO = float(config.get("mouth_pixel_min_component_area_ratio", 0.0025))
MOUTH_PIXEL_MIN_COMPONENT_HEIGHT_RATIO = float(config.get("mouth_pixel_min_component_height_ratio", 0.045))
MOUTH_PIXEL_SAVE_DEBUG_ROIS = bool(config.get("mouth_pixel_save_debug_rois", True))
MOUTH_PIXEL_DEBUG_ROI_DIR = Path(config.get("mouth_pixel_debug_roi_dir", str(base / "mouth_open_pixel_gap_debug_rois")))
MOUTH_OPEN_RECT_EXPAND_MARGIN = float(config.get("mouth_open_rect_expand_margin", 0.45))
MOUTH_OPEN_MIN_SCORABLE_FACE_WIDTH = int(config.get("mouth_open_min_scorable_face_width", 20))
MOUTH_OPEN_MIN_SCORABLE_FACE_HEIGHT = int(config.get("mouth_open_min_scorable_face_height", 20))
MOUTH_OPEN_DEBUG_PER_FACE_CSV_PATH = Path(config.get("mouth_open_debug_per_face_csv_path", str(base / "result_macron_trump_mouth_open_per_face_debug.csv")))
DLIB_SHAPE_PREDICTOR_PATH = Path(config.get("dlib_shape_predictor_path", "shape_predictor_68_face_landmarks.dat"))
log("Importing numpy/pandas/PIL/cv2")
import numpy as np
import pandas as pd
from PIL import Image
import cv2
try:
MOUTH_PIXEL_DEBUG_ROI_DIR.mkdir(parents=True, exist_ok=True)
except Exception:
pass
def install_tf_keras_compatibility_shim():
"""
Some TensorFlow/Keras/DeepFace combinations fail on:
from tensorflow.keras.models import Sequential
even when TensorFlow itself imports. If tf_keras is installed, this shim
exposes tf_keras under selected tensorflow.keras module names before
importing DeepFace.
"""
try:
from tensorflow.keras.models import Sequential
log("tensorflow.keras.models.Sequential import OK")
return True
except Exception as exc:
log(f"tensorflow.keras.models.Sequential import failed before shim: {repr(exc)}")
try:
import tf_keras
sys.modules.setdefault("tensorflow.keras", tf_keras)
sys.modules.setdefault("tensorflow.keras.models", tf_keras.models)
sys.modules.setdefault("tensorflow.keras.layers", tf_keras.layers)
sys.modules.setdefault("tensorflow.keras.backend", tf_keras.backend)
sys.modules.setdefault("tensorflow.keras.optimizers", tf_keras.optimizers)
sys.modules.setdefault("tensorflow.keras.utils", tf_keras.utils)
sys.modules.setdefault("tensorflow.keras.callbacks", tf_keras.callbacks)
sys.modules.setdefault("tensorflow.keras.losses", tf_keras.losses)
sys.modules.setdefault("tensorflow.keras.metrics", tf_keras.metrics)
sys.modules.setdefault("tensorflow.keras.initializers", tf_keras.initializers)
sys.modules.setdefault("tensorflow.keras.regularizers", tf_keras.regularizers)
sys.modules.setdefault("tensorflow.keras.constraints", tf_keras.constraints)
try:
sys.modules.setdefault("tensorflow.keras.preprocessing", tf_keras.preprocessing)
except Exception:
pass
try:
sys.modules.setdefault("tensorflow.keras.applications", tf_keras.applications)
except Exception:
pass
from tensorflow.keras.models import Sequential
log("Installed tf_keras compatibility shim successfully")
log("tensorflow.keras.models.Sequential import OK after shim")
return True
except Exception as exc:
log(f"tf_keras compatibility shim failed: {repr(exc)}")
return False
log("Checking TensorFlow/Keras compatibility before importing DeepFace")
install_tf_keras_compatibility_shim()
log("Importing DeepFace")
from deepface import DeepFace
log("Imported DeepFace")
def run_ffprobe_duration(video_file):
cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(video_file),
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
try:
duration = float(result.stdout.strip())
if duration > 0:
return duration
except Exception:
pass
return None
def get_video_duration_with_opencv(video_file):
cap = cv2.VideoCapture(str(video_file))
if not cap.isOpened():
return None
fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
cap.release()
if fps > 0 and frame_count > 0:
return frame_count / fps
return None
def extract_frame_at_timestamp(video_file, timestamp, output_path):
output_path = Path(output_path)
if output_path.exists():
output_path.unlink()
cmd_accurate = [
"ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
"-i", str(video_file),
"-ss", f"{timestamp:.3f}",
"-frames:v", "1", "-q:v", "2", str(output_path),
]
result = subprocess.run(cmd_accurate, capture_output=True, text=True)
if output_path.exists() and output_path.stat().st_size > 0:
return True, "ffmpeg_accurate", result.stderr
cmd_fast = [
"ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
"-ss", f"{timestamp:.3f}",
"-i", str(video_file),
"-frames:v", "1", "-q:v", "2", str(output_path),
]
result_fast = subprocess.run(cmd_fast, capture_output=True, text=True)
if output_path.exists() and output_path.stat().st_size > 0:
return True, "ffmpeg_fast", result_fast.stderr
return False, "failed", (result.stderr + "\n" + result_fast.stderr)
def resize_image_file_for_deepface(input_path, output_path, max_width=1600):
input_path = Path(input_path)
output_path = Path(output_path)
with Image.open(input_path) as img:
img = img.convert("RGB")
width, height = img.size
if max_width is None or width <= max_width:
return input_path
scale = float(max_width) / float(width)
resized = img.resize((int(width * scale), int(height * scale)))
resized.save(output_path)
return output_path
def crop_with_margin(image_path, facial_area, output_path, margin=0.25):
image_path = Path(image_path)
output_path = Path(output_path)
with Image.open(image_path) as img:
img = img.convert("RGB")
width, height = img.size
x = int(facial_area.get("x", 0))
y = int(facial_area.get("y", 0))
w = int(facial_area.get("w", facial_area.get("width", 0)))
h = int(facial_area.get("h", facial_area.get("height", 0)))
if w <= 0 or h <= 0:
return None
pad_x = int(round(w * margin))
pad_y = int(round(h * margin))
left = max(0, x - pad_x)
top = max(0, y - pad_y)
right = min(width, x + w + pad_x)
bottom = min(height, y + h + pad_y)
if right <= left or bottom <= top:
return None
crop = img.crop((left, top, right, bottom))
crop.save(output_path)
return output_path
def cosine_distance(a, b):
a = np.asarray(a, dtype=np.float32).reshape(-1)
b = np.asarray(b, dtype=np.float32).reshape(-1)
denom = np.linalg.norm(a) * np.linalg.norm(b)
if denom == 0:
return float("inf")
return float(1.0 - float(np.dot(a, b) / denom))
def point_distance(a, b):
return float(((float(a[0]) - float(b[0])) ** 2 + (float(a[1]) - float(b[1])) ** 2) ** 0.5)
def get_mouth_state_threshold(backend):
"""Backend-specific threshold for per-face open/closed classification."""
backend_text = str(backend or "").lower()
if any(token in backend_text for token in ["pixel", "opencv", "dark_gap", "mouth_gap"]):
return float(MOUTH_OPEN_STATE_THRESHOLD_PIXEL_GAP)
if "mediapipe" in backend_text:
return float(MOUTH_OPEN_STATE_THRESHOLD_MEDIAPIPE)
if "face_recognition" in backend_text:
return float(MOUTH_OPEN_STATE_THRESHOLD_FACE_RECOGNITION)
return float(MOUTH_OPEN_STATE_THRESHOLD_DLIB)
def classify_mouth_state(score, backend):
"""Classify one detected face as open, closed, or unknown."""
if score is None:
return {
"mouth_open_state": "unknown",
"mouth_open_is_open": None,
"mouth_open_is_closed": None,
"mouth_open_speech_cue": None,
"mouth_open_strict_is_open": None,
"mouth_open_state_threshold": get_mouth_state_threshold(backend),
"mouth_open_strict_threshold": float(MOUTH_OPEN_THRESHOLD),
"mouth_open_state_confidence": None,
"mouth_open_state_rule": "no_score",
}
score = float(score)
state_threshold = get_mouth_state_threshold(backend)
is_state_open = bool(score >= state_threshold)
strict_is_open = bool(score >= float(MOUTH_OPEN_THRESHOLD))
if is_state_open:
state = "open"
confidence = float(min(1.0, score / max(state_threshold, 1e-6)))
rule = "score_gte_backend_open_closed_threshold"
else:
state = "closed"
confidence = float(min(1.0, max(0.0, (state_threshold - score) / max(state_threshold, 1e-6))))
rule = "score_lt_backend_open_closed_threshold"
return {
"mouth_open_state": state,
"mouth_open_is_open": is_state_open,
"mouth_open_is_closed": bool(not is_state_open),
"mouth_open_speech_cue": is_state_open,
"mouth_open_strict_is_open": strict_is_open,
"mouth_open_state_threshold": float(state_threshold),
"mouth_open_strict_threshold": float(MOUTH_OPEN_THRESHOLD),
"mouth_open_state_confidence": confidence,
"mouth_open_state_rule": rule,
}
def empty_mouth_result(error=None, backend=None):
result = {
"mouth_open_backend": backend,
"mouth_open_is_open": None,
"mouth_open_is_closed": None,
"mouth_open_state": "unknown",
"mouth_open_speech_cue": None,
"mouth_open_strict_is_open": None,
"mouth_open_score": None,
"mouth_open_threshold": get_mouth_state_threshold(backend),
"mouth_open_state_threshold": get_mouth_state_threshold(backend),
"mouth_open_strict_threshold": float(MOUTH_OPEN_THRESHOLD),
"mouth_open_state_confidence": None,
"mouth_open_state_rule": "no_score",
"mouth_open_vertical_distance": None,
"mouth_open_horizontal_distance": None,
"mouth_open_landmarks": "{}",
"mouth_open_debug_image_path": None,
"mouth_open_error": error,
}
return result
def mouth_aspect_ratio_mauckc(mouth_points):
"""
Exact MAR formula adapted from mauckc/mouth-open:
A = dist(mouth[2], mouth[10])
B = dist(mouth[4], mouth[8])
C = dist(mouth[0], mouth[6])
MAR = (A + B) / (2.0 * C)
"""
pts = [(float(x), float(y)) for x, y in mouth_points]
if len(pts) <= 10:
return None, {"error": f"need at least 11 mouth points; received {len(pts)}"}
A = point_distance(pts[2], pts[10])
B = point_distance(pts[4], pts[8])
C = point_distance(pts[0], pts[6])
if C <= 0:
return None, {"error": "mouth horizontal width was zero"}
score = float((A + B) / (2.0 * C))
debug = {
"formula": "mauckc_mouth_open_mar",
"A_distance_mouth_2_10": float(A),
"B_distance_mouth_4_8": float(B),
"C_distance_mouth_0_6": float(C),
"mouth_0": pts[0],
"mouth_2": pts[2],
"mouth_4": pts[4],
"mouth_6": pts[6],
"mouth_8": pts[8],
"mouth_10": pts[10],
}
return score, debug
def resolve_dlib_shape_predictor_path():
candidates = []
try:
candidates.append(Path(DLIB_SHAPE_PREDICTOR_PATH))
except Exception:
pass
for env_name in ["WHOSPOKE_DLIB_SHAPE_PREDICTOR", "DLIB_SHAPE_PREDICTOR", "SHAPE_PREDICTOR_68_FACE_LANDMARKS"]:
value = os.environ.get(env_name)
if value:
candidates.append(Path(value))
candidates.extend([
base / "shape_predictor_68_face_landmarks.dat",
base / "models" / "shape_predictor_68_face_landmarks.dat",
Path.cwd() / "shape_predictor_68_face_landmarks.dat",
Path.cwd() / "models" / "shape_predictor_68_face_landmarks.dat",
])
for candidate in candidates:
try:
if candidate.exists() and candidate.is_file():
return candidate
except Exception:
continue
return None
def dlib_shape_to_np(shape):
return [(float(shape.part(i).x), float(shape.part(i).y)) for i in range(shape.num_parts)]
def bbox_to_dlib_rect(facial_area, image_shape, margin=None):
if not facial_area:
return None
try:
import dlib
except Exception:
return None
height = int(image_shape[0])
width = int(image_shape[1])
x = int(float(facial_area.get("x", facial_area.get("left", 0))))
y = int(float(facial_area.get("y", facial_area.get("top", 0))))
w = int(float(facial_area.get("w", facial_area.get("width", 0))))
h = int(float(facial_area.get("h", facial_area.get("height", 0))))
if w <= 0 and "right" in facial_area:
w = int(float(facial_area["right"]) - x)
if h <= 0 and "bottom" in facial_area:
h = int(float(facial_area["bottom"]) - y)
if w <= 0 or h <= 0:
return None
if w < int(MOUTH_OPEN_MIN_SCORABLE_FACE_WIDTH) or h < int(MOUTH_OPEN_MIN_SCORABLE_FACE_HEIGHT):
return None
if margin is None:
margin = float(MOUTH_OPEN_RECT_EXPAND_MARGIN)
margin = max(0.0, float(margin))
pad_x = int(round(w * margin))
pad_y = int(round(h * margin))
left = max(0, x - pad_x)
top = max(0, y - pad_y)
right = min(width - 1, x + w + pad_x)
bottom = min(height - 1, y + h + pad_y)
if right <= left or bottom <= top:
return None
return dlib.rectangle(left=left, top=top, right=right, bottom=bottom)
def parse_facial_area(value):
if value is None:
return {}
if isinstance(value, dict):
return value
try:
if pd.isna(value):
return {}
except Exception:
pass
try:
return json.loads(str(value))
except Exception:
return {}
def clamp_fraction(value, default, low=0.0, high=1.0):
try:
value = float(value)
except Exception:
value = float(default)
return max(float(low), min(float(high), value))
def crop_face_region_for_pixel_gap(frame_path=None, facial_area=None, crop_path=None):
"""
Return a face image for pixel-gap mouth analysis.
Priority:
1. Full video frame + DeepFace facial_area, expanded modestly.
2. Saved face crop from DeepFace.
This avoids depending on landmarks. It analyzes the lower-central portion of
the detected face region and looks for a dark mouth aperture.
"""
facial_area = facial_area or {}
# Try full frame + detected face bounding box first.
if frame_path is not None and str(frame_path).strip() and str(frame_path).lower() != "nan":
frame_bgr = cv2.imread(str(frame_path))
if frame_bgr is not None and facial_area:
height, width = frame_bgr.shape[:2]
try:
x = int(float(facial_area.get("x", facial_area.get("left", 0))))
y = int(float(facial_area.get("y", facial_area.get("top", 0))))
w = int(float(facial_area.get("w", facial_area.get("width", 0))))
h = int(float(facial_area.get("h", facial_area.get("height", 0))))
if w <= 0 and "right" in facial_area:
w = int(float(facial_area["right"]) - x)
if h <= 0 and "bottom" in facial_area:
h = int(float(facial_area["bottom"]) - y)
except Exception:
x = y = w = h = 0
if w >= int(MOUTH_OPEN_MIN_SCORABLE_FACE_WIDTH) and h >= int(MOUTH_OPEN_MIN_SCORABLE_FACE_HEIGHT):
pad_x = int(round(w * 0.12))
pad_y_top = int(round(h * 0.08))
pad_y_bottom = int(round(h * 0.18))
left = max(0, x - pad_x)
top = max(0, y - pad_y_top)
right = min(width, x + w + pad_x)
bottom = min(height, y + h + pad_y_bottom)
if right > left and bottom > top:
return frame_bgr[top:bottom, left:right].copy(), {
"source": "frame_deepface_bbox",
"source_path": str(frame_path),
"face_box": {"left": int(left), "top": int(top), "right": int(right), "bottom": int(bottom)},
}
# Fall back to saved crop.
if crop_path is not None and str(crop_path).strip() and str(crop_path).lower() != "nan":
crop_bgr = cv2.imread(str(crop_path))
if crop_bgr is not None:
return crop_bgr, {
"source": "saved_face_crop",
"source_path": str(crop_path),
"face_box": None,
}
return None, {"source": "none", "source_path": None, "face_box": None}
def estimate_mouth_open_pixel_gap(image_path=None, facial_area=None, crop_path=None):
"""
Pixel-based mouth-open algorithm.
This is intentionally different from MAR/landmark approaches. It detects a
mouth aperture using image evidence only:
1. take each detected face independently;
2. isolate the lower-central mouth region;
3. threshold dark pixels adaptively;
4. locate the largest dark connected component;
5. classify open/closed from component height/area and darkness.
It is useful when dlib/MediaPipe landmarks fail or return conservative MAR
scores for ordinary speech. It is not lip reading; it is a visual cue.
"""
backend = "pixel_mouth_gap"
facial_area = facial_area or {}
face_bgr, source_info = crop_face_region_for_pixel_gap(
frame_path=image_path,
facial_area=facial_area,
crop_path=crop_path,
)
if face_bgr is None:
return empty_mouth_result(error="could not read face region for pixel-gap mouth detector", backend=backend)
h, w = face_bgr.shape[:2]
if w < int(MOUTH_OPEN_MIN_SCORABLE_FACE_WIDTH) or h < int(MOUTH_OPEN_MIN_SCORABLE_FACE_HEIGHT):
return empty_mouth_result(error=f"face region too small for pixel-gap detector: {w}x{h}", backend=backend)
left_f = clamp_fraction(MOUTH_PIXEL_ROI_LEFT, 0.18)
right_f = clamp_fraction(MOUTH_PIXEL_ROI_RIGHT, 0.82)
top_f = clamp_fraction(MOUTH_PIXEL_ROI_TOP, 0.52)
bottom_f = clamp_fraction(MOUTH_PIXEL_ROI_BOTTOM, 0.92)
if right_f <= left_f:
left_f, right_f = 0.18, 0.82
if bottom_f <= top_f:
top_f, bottom_f = 0.52, 0.92
x1 = int(round(w * left_f))
x2 = int(round(w * right_f))
y1 = int(round(h * top_f))
y2 = int(round(h * bottom_f))
x1 = max(0, min(w - 1, x1))
x2 = max(x1 + 1, min(w, x2))
y1 = max(0, min(h - 1, y1))
y2 = max(y1 + 1, min(h, y2))
roi_bgr = face_bgr[y1:y2, x1:x2]
roi_h, roi_w = roi_bgr.shape[:2]
if roi_w < 8 or roi_h < 8:
return empty_mouth_result(error=f"mouth ROI too small: {roi_w}x{roi_h}", backend=backend)
gray = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2GRAY)
gray_blur = cv2.GaussianBlur(gray, (3, 3), 0)
# Improve local contrast without depending on a global brightness assumption.
try:
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(4, 4))
gray_eq = clahe.apply(gray_blur)
except Exception:
gray_eq = gray_blur
# Adaptive dark threshold. A true mouth aperture is usually among the darker
# pixels in the mouth ROI, even when the video is bright.
percentile_threshold = float(np.percentile(gray_eq, float(MOUTH_PIXEL_DARK_PERCENTILE)))
median = float(np.median(gray_eq))
std = float(np.std(gray_eq))
std_threshold = median - float(MOUTH_PIXEL_DARK_STD_FACTOR) * std
dark_threshold = max(0.0, min(255.0, min(percentile_threshold, std_threshold)))
dark_mask = (gray_eq <= dark_threshold).astype(np.uint8) * 255
# Clean up isolated pixels and fill a small aperture region.
kernel_open = np.ones((2, 2), dtype=np.uint8)
kernel_close = np.ones((3, 3), dtype=np.uint8)
dark_mask = cv2.morphologyEx(dark_mask, cv2.MORPH_OPEN, kernel_open)
dark_mask = cv2.morphologyEx(dark_mask, cv2.MORPH_CLOSE, kernel_close)
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(dark_mask, connectivity=8)
roi_area = float(max(1, roi_w * roi_h))
dark_area_ratio = float((dark_mask > 0).sum() / roi_area)
best = None
for label_id in range(1, num_labels):
x, y, cw, ch, area = stats[label_id]
area = float(area)
if area <= 0:
continue
area_ratio = area / roi_area
height_ratio = float(ch) / float(max(1, roi_h))
width_ratio = float(cw) / float(max(1, roi_w))
cx, cy = centroids[label_id]
center_distance = abs(float(cx) - (roi_w / 2.0)) / max(1.0, roi_w / 2.0)
centrality = max(0.0, 1.0 - center_distance)
# Mouth apertures should usually be central and not span the entire ROI.
component_score = (
0.45 * min(1.0, height_ratio / 0.18)
+ 0.35 * min(1.0, area_ratio / 0.035)
+ 0.20 * min(1.0, dark_area_ratio / 0.10)
) * (0.70 + 0.30 * centrality)
candidate = {
"component_label": int(label_id),
"x": int(x),
"y": int(y),
"w": int(cw),
"h": int(ch),
"area": int(area),
"area_ratio": float(area_ratio),
"height_ratio": float(height_ratio),
"width_ratio": float(width_ratio),
"centrality": float(centrality),
"component_score": float(component_score),
}
if best is None or candidate["component_score"] > best["component_score"]:
best = candidate
if best is None:
score = 0.0
best = {
"component_label": None,
"x": None,
"y": None,
"w": 0,
"h": 0,
"area": 0,
"area_ratio": 0.0,
"height_ratio": 0.0,
"width_ratio": 0.0,
"centrality": 0.0,
"component_score": 0.0,
}
else:
score = float(best["component_score"])
# Very small components should not become open-mouth cues unless they also
# have enough height. This reduces false positives from shadows or wrinkles.
if best["area_ratio"] < float(MOUTH_PIXEL_MIN_COMPONENT_AREA_RATIO) and best["height_ratio"] < float(MOUTH_PIXEL_MIN_COMPONENT_HEIGHT_RATIO):
score = min(score, float(MOUTH_OPEN_STATE_THRESHOLD_PIXEL_GAP) * 0.75)
state = classify_mouth_state(score, backend)
debug_image_path = None
if bool(MOUTH_PIXEL_SAVE_DEBUG_ROIS):
try:
stem_source = Path(str(crop_path or image_path or "mouth_roi")).stem
# Include frame/face data if present to avoid overwriting files.
debug_name = f"{stem_source}_mouth_roi.png"
debug_image_path = MOUTH_PIXEL_DEBUG_ROI_DIR / debug_name
debug_canvas = roi_bgr.copy()
if best.get("x") is not None:
cv2.rectangle(
debug_canvas,
(int(best["x"]), int(best["y"])),
(int(best["x"] + best["w"]), int(best["y"] + best["h"])),
(0, 0, 255),
1,
)
cv2.imwrite(str(debug_image_path), debug_canvas)
except Exception:
debug_image_path = None
debug = {
"formula": "opencv_pixel_dark_mouth_gap",
"source_info": source_info,
"face_size": {"width": int(w), "height": int(h)},
"mouth_roi_box_in_face": {"x1": int(x1), "y1": int(y1), "x2": int(x2), "y2": int(y2)},
"mouth_roi_size": {"width": int(roi_w), "height": int(roi_h)},
"dark_percentile": float(MOUTH_PIXEL_DARK_PERCENTILE),
"dark_threshold": float(dark_threshold),
"dark_area_ratio": float(dark_area_ratio),
"largest_component": best,
"state_threshold": float(state["mouth_open_state_threshold"]),
"strict_threshold": float(MOUTH_OPEN_THRESHOLD),
"debug_image_path": str(debug_image_path) if debug_image_path else None,
}
return {
"mouth_open_backend": backend,
**state,
"mouth_open_score": float(score),
"mouth_open_threshold": float(state["mouth_open_state_threshold"]),
"mouth_open_vertical_distance": float(best.get("h") or 0),
"mouth_open_horizontal_distance": float(best.get("w") or 0),
"mouth_open_landmarks": json.dumps(debug, ensure_ascii=False),
"mouth_open_debug_image_path": str(debug_image_path) if debug_image_path else None,
"mouth_open_error": None,
}
def estimate_mouth_open_dlib68_mauckc(image_path, facial_area=None, crop_path=None):
"""
Use the mauckc/mouth-open dlib 68-landmark implementation.
The preferred path is full-frame + the face rectangle from DeepFace. This is
much more reliable than running landmarks on tight crops, because tight crops
may remove chin/cheek context needed by dlib's shape predictor.
"""
backend = "dlib_68_mauckc"
predictor_path = resolve_dlib_shape_predictor_path()
if predictor_path is None:
return empty_mouth_result(
error="shape_predictor_68_face_landmarks.dat not found. Put it in the base folder or set WHOSPOKE_DLIB_SHAPE_PREDICTOR.",
backend=backend,
)
try:
import dlib
except Exception as exc:
return empty_mouth_result(error=f"dlib import failed: {repr(exc)}", backend=backend)
def load_gray(path):
image_bgr = cv2.imread(str(path))
if image_bgr is None:
return None, None
return image_bgr, cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
image_path = Path(image_path)
image_bgr, gray = load_gray(image_path)
source_path = image_path
if image_bgr is None:
return empty_mouth_result(error=f"could not read image: {image_path}", backend=backend)
try:
predictor = dlib.shape_predictor(str(predictor_path))
except Exception as exc:
return empty_mouth_result(error=f"could not load shape predictor {predictor_path}: {repr(exc)}", backend=backend)
rect = bbox_to_dlib_rect(facial_area, image_bgr.shape, margin=MOUTH_OPEN_RECT_EXPAND_MARGIN)
rect_source = "provided_deepface_bbox"
if rect is None:
detector = dlib.get_frontal_face_detector()
try:
rects = list(detector(gray, 0))
except Exception:
rects = []
if rects:
rect = max(rects, key=lambda r: max(1, r.width()) * max(1, r.height()))
rect_source = "dlib_detector_on_frame"
elif crop_path is not None:
crop_bgr, crop_gray = load_gray(crop_path)
if crop_bgr is not None:
source_path = Path(crop_path)
image_bgr, gray = crop_bgr, crop_gray
try:
rects = list(detector(gray, 0))
except Exception:
rects = []
if rects:
rect = max(rects, key=lambda r: max(1, r.width()) * max(1, r.height()))
rect_source = "dlib_detector_on_crop"
else:
h, w = crop_gray.shape[:2]
rect = dlib.rectangle(left=0, top=0, right=max(1, w - 1), bottom=max(1, h - 1))
rect_source = "whole_crop_as_rect"
if rect is None:
return empty_mouth_result(error="no face rectangle available for dlib landmarks", backend=backend)
try:
shape = predictor(gray, rect)
points = dlib_shape_to_np(shape)
except Exception as exc:
return empty_mouth_result(error=f"dlib landmark prediction failed: {repr(exc)}", backend=backend)
if len(points) < 68:
return empty_mouth_result(error=f"expected 68 landmarks; received {len(points)}", backend=backend)
# The referenced repo uses mStart=49, mEnd=68, but common iBUG indexing uses
# 48:68 for the full mouth. Compute both and use the larger MAR, which avoids
# under-detecting clear open-mouth cases while preserving the mauckc formula.
candidates = {
"mauckc_literal_shape_49_68": points[49:68],
"ibug_standard_shape_48_68": points[48:68],
}
scored = []
for name, mouth_points in candidates.items():
score, debug = mouth_aspect_ratio_mauckc(mouth_points)
if score is not None:
debug["slice_name"] = name
scored.append((float(score), debug))
if not scored:
return empty_mouth_result(error="could not compute mauckc mouth aspect ratio", backend=backend)
score, debug = max(scored, key=lambda item: item[0])
debug.update({
"source_path": str(source_path),
"shape_predictor_path": str(predictor_path),
"rect_source": rect_source,
"rect_expand_margin": float(MOUTH_OPEN_RECT_EXPAND_MARGIN),
"state_threshold": get_mouth_state_threshold(backend),
"strict_threshold": float(MOUTH_OPEN_THRESHOLD),
"rect": {"left": int(rect.left()), "top": int(rect.top()), "right": int(rect.right()), "bottom": int(rect.bottom())},
})
state = classify_mouth_state(score, backend)
return {
"mouth_open_backend": backend,
**state,
"mouth_open_score": float(score),
"mouth_open_threshold": float(state["mouth_open_state_threshold"]),
"mouth_open_vertical_distance": float(debug["A_distance_mouth_2_10"] + debug["B_distance_mouth_4_8"]),
"mouth_open_horizontal_distance": float(2.0 * debug["C_distance_mouth_0_6"]),
"mouth_open_landmarks": json.dumps(debug, ensure_ascii=False),
"mouth_open_error": None,
}
def estimate_mouth_open_mediapipe(crop_path):
backend = "mediapipe_face_mesh"
try:
import mediapipe as mp
except Exception as exc:
return empty_mouth_result(error=f"mediapipe import failed: {repr(exc)}", backend=backend)
image_bgr = cv2.imread(str(crop_path))
if image_bgr is None:
return empty_mouth_result(error=f"could not read crop: {crop_path}", backend=backend)
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
try:
face_mesh = mp.solutions.face_mesh.FaceMesh(
static_image_mode=True,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=float(MOUTH_OPEN_MIN_DETECTION_CONFIDENCE),
)
try:
result = face_mesh.process(image_rgb)
finally:
try:
face_mesh.close()
except Exception:
pass
except Exception as exc:
return empty_mouth_result(error=f"mediapipe FaceMesh failed: {repr(exc)}", backend=backend)
if not getattr(result, "multi_face_landmarks", None):
return empty_mouth_result(error="no Face Mesh landmarks found in crop", backend=backend)
landmarks = result.multi_face_landmarks[0].landmark
try:
pts = {
"upper_inner_lip_13": (float(landmarks[13].x), float(landmarks[13].y)),
"lower_inner_lip_14": (float(landmarks[14].x), float(landmarks[14].y)),
"left_mouth_corner_61": (float(landmarks[61].x), float(landmarks[61].y)),
"right_mouth_corner_291": (float(landmarks[291].x), float(landmarks[291].y)),
}
vertical = point_distance(pts["upper_inner_lip_13"], pts["lower_inner_lip_14"])
horizontal = point_distance(pts["left_mouth_corner_61"], pts["right_mouth_corner_291"])
except Exception as exc:
return empty_mouth_result(error=f"required mouth landmarks missing: {repr(exc)}", backend=backend)
if horizontal <= 0:
return empty_mouth_result(error="mouth horizontal width was zero", backend=backend)
score = float(vertical / horizontal)
state = classify_mouth_state(score, backend)
return {
"mouth_open_backend": backend,
**state,
"mouth_open_score": score,
"mouth_open_threshold": float(state["mouth_open_state_threshold"]),
"mouth_open_vertical_distance": float(vertical),
"mouth_open_horizontal_distance": float(horizontal),
"mouth_open_landmarks": json.dumps(pts, ensure_ascii=False),
"mouth_open_error": None,
}
def mean_point(points):
pts = list(points)
if not pts:
return (0.0, 0.0)
return (sum(p[0] for p in pts) / len(pts), sum(p[1] for p in pts) / len(pts))
def estimate_mouth_open_face_recognition(crop_path):
backend = "face_recognition_landmarks"
try:
import face_recognition
except Exception as exc:
return empty_mouth_result(error=f"face_recognition import failed: {repr(exc)}", backend=backend)
try:
image = face_recognition.load_image_file(str(crop_path))
landmarks_rows = face_recognition.face_landmarks(image)
except Exception as exc:
return empty_mouth_result(error=f"face_recognition landmarks failed: {repr(exc)}", backend=backend)
if not landmarks_rows:
return empty_mouth_result(error="no dlib/face_recognition landmarks found in crop", backend=backend)
landmarks = landmarks_rows[0]
top_lip = [tuple(map(float, p)) for p in (landmarks.get("top_lip") or [])]
bottom_lip = [tuple(map(float, p)) for p in (landmarks.get("bottom_lip") or [])]
if len(top_lip) < 12 or len(bottom_lip) < 12:
return empty_mouth_result(error="insufficient lip landmarks returned", backend=backend)
left_corner = top_lip[0]
right_corner = top_lip[6]
upper_inner_left = top_lip[9]
upper_inner_right = top_lip[10]
lower_inner_left = bottom_lip[9]
lower_inner_right = bottom_lip[10]
A = point_distance(upper_inner_left, lower_inner_right)
B = point_distance(upper_inner_right, lower_inner_left)
C = point_distance(left_corner, right_corner)
if C <= 0:
return empty_mouth_result(error="mouth horizontal width was zero", backend=backend)
score = float((A + B) / (2.0 * C))
state = classify_mouth_state(score, backend)
pts = {
"formula": "mauckc_style_face_recognition_fallback",
"left_corner": left_corner,
"right_corner": right_corner,
"upper_inner_left": upper_inner_left,
"upper_inner_right": upper_inner_right,
"lower_inner_left": lower_inner_left,
"lower_inner_right": lower_inner_right,
"A_distance": float(A),
"B_distance": float(B),
"C_distance": float(C),
"state_threshold": float(state["mouth_open_state_threshold"]),
"strict_threshold": float(MOUTH_OPEN_THRESHOLD),
}
return {
"mouth_open_backend": backend,
**state,
"mouth_open_score": score,
"mouth_open_threshold": float(state["mouth_open_state_threshold"]),
"mouth_open_vertical_distance": float(A + B),
"mouth_open_horizontal_distance": float(2.0 * C),
"mouth_open_landmarks": json.dumps(pts, ensure_ascii=False),
"mouth_open_error": None,
}
def estimate_mouth_open_for_match_row(match_row):
if not MOUTH_OPEN_DETECTION_ENABLED:
return empty_mouth_result(error="mouth-open detection disabled", backend="none")
crop_path = match_row.get("crop_path")
frame_for_deepface = match_row.get("frame_for_deepface") or match_row.get("frame_path")
facial_area = parse_facial_area(match_row.get("facial_area"))
if frame_for_deepface is None or str(frame_for_deepface).strip() == "" or str(frame_for_deepface).lower() == "nan":
frame_for_deepface = crop_path
if frame_for_deepface is None or str(frame_for_deepface).strip() == "" or str(frame_for_deepface).lower() == "nan":
return empty_mouth_result(error="no frame or crop path", backend=MOUTH_OPEN_BACKEND)
backend = str(MOUTH_OPEN_BACKEND or "dlib_68_mauckc").lower().replace("-", "_")
if backend in {"none", "off", "disabled", "false"}:
return empty_mouth_result(error="mouth-open detection disabled", backend="none")
if backend in {"pixel", "pixel_mouth_gap", "opencv", "opencv_dark_gap", "dark_gap", "mouth_gap"}:
return estimate_mouth_open_pixel_gap(frame_for_deepface, facial_area=facial_area, crop_path=crop_path)
if backend in {"dlib68", "dlib_68", "dlib_68_mauckc", "mauckc", "mouth_open_repo"}:
return estimate_mouth_open_dlib68_mauckc(frame_for_deepface, facial_area=facial_area, crop_path=crop_path)
if backend in {"mediapipe", "mediapipe_face_mesh", "face_mesh"}:
if crop_path is None or str(crop_path).strip() == "" or str(crop_path).lower() == "nan":
return empty_mouth_result(error="no crop path", backend=backend)
return estimate_mouth_open_mediapipe(crop_path)
if backend in {"face_recognition", "face_recognition_landmarks", "dlib"}:
if crop_path is None or str(crop_path).strip() == "" or str(crop_path).lower() == "nan":
return empty_mouth_result(error="no crop path", backend=backend)
return estimate_mouth_open_face_recognition(crop_path)
# auto: use the pixel-gap detector first because it always works per detected
# face crop and does not depend on dlib/MediaPipe landmark success. Then try
# landmark methods as fallbacks/alternatives.
first = estimate_mouth_open_pixel_gap(frame_for_deepface, facial_area=facial_area, crop_path=crop_path)
if first.get("mouth_open_score") is not None:
return first
second = estimate_mouth_open_dlib68_mauckc(frame_for_deepface, facial_area=facial_area, crop_path=crop_path)
if second.get("mouth_open_score") is not None:
return second
if crop_path is not None and str(crop_path).strip() and str(crop_path).lower() != "nan":
third = estimate_mouth_open_mediapipe(crop_path)
if third.get("mouth_open_score") is not None:
return third
fourth = estimate_mouth_open_face_recognition(crop_path)
if fourth.get("mouth_open_score") is not None:
return fourth
return empty_mouth_result(
error=(first.get("mouth_open_error") or "pixel_gap failed") + " | " + (second.get("mouth_open_error") or "dlib68_mauckc failed") + " | " + (third.get("mouth_open_error") or "mediapipe failed") + " | " + (fourth.get("mouth_open_error") or "face_recognition failed"),
backend="auto",
)
return empty_mouth_result(
error=(first.get("mouth_open_error") or "pixel_gap failed") + " | " + (second.get("mouth_open_error") or "dlib68_mauckc failed"),
backend="auto",
)
def extract_embedding_from_deepface_obj(obj):
if not isinstance(obj, dict):
return None
embedding = obj.get("embedding")
if embedding is None:
return None
return np.asarray(embedding, dtype=np.float32)
def deepface_extract_faces(image_path, detector_backends, align=True):
error_log = []
for backend in detector_backends:
try:
faces = DeepFace.extract_faces(
img_path=str(image_path),
detector_backend=backend,
align=align,
enforce_detection=True,
)
if faces is None:
faces = []
if isinstance(faces, dict):
faces = [faces]
usable_faces = [
face for face in faces
if isinstance(face, dict) and face.get("facial_area") is not None
]
if usable_faces:
return usable_faces, backend, error_log
error_log.append({"backend": backend, "error": "no_faces_returned"})
except Exception as exc:
error_log.append({"backend": backend, "error": repr(exc)})
return [], None, error_log
def represent_crop_with_deepface(crop_path):
attempts = [
{"detector_backend": "skip", "align": False, "enforce_detection": False},
{"detector_backend": "opencv", "align": False, "enforce_detection": False},
]
errors = []
for attempt in attempts:
try:
objs = DeepFace.represent(
img_path=str(crop_path),
model_name=DEEPFACE_MODEL_NAME,
detector_backend=attempt["detector_backend"],
align=attempt["align"],
normalization=DEEPFACE_NORMALIZATION,
enforce_detection=attempt["enforce_detection"],
)
if objs is None:
objs = []
if isinstance(objs, dict):
objs = [objs]
for obj in objs:
embedding = extract_embedding_from_deepface_obj(obj)
if embedding is not None:
return embedding, attempt["detector_backend"], errors
errors.append({"backend": attempt["detector_backend"], "error": "no_embedding_returned"})
except Exception as exc:
errors.append({"backend": attempt["detector_backend"], "error": repr(exc)})
return None, None, errors
def load_reference_embedding_with_deepface(image_path, person_label):
faces, backend_used, error_log = deepface_extract_faces(
image_path=image_path,
detector_backends=DEEPFACE_DETECTOR_BACKENDS,
align=DEEPFACE_ALIGN,
)
if not faces:
print("DeepFace reference extraction errors:", json.dumps(error_log, indent=2), flush=True)
raise RuntimeError(f"DeepFace could not detect a face for {person_label}: {image_path}")
def face_area(face):
area = face.get("facial_area", {}) or {}
return int(area.get("w", 0)) * int(area.get("h", 0))
selected_face = max(faces, key=face_area)
facial_area = selected_face.get("facial_area", {}) or {}
ref_crop_dir = base / "reference_face_crops"
ref_crop_dir.mkdir(parents=True, exist_ok=True)
safe_label = person_label.lower().replace(" ", "_").replace(".", "")
crop_path = ref_crop_dir / f"{safe_label}_reference_crop.jpg"
crop_with_margin(image_path=image_path, facial_area=facial_area, output_path=crop_path, margin=FACE_CROP_EXPAND_MARGIN)
embedding, represent_backend, represent_errors = represent_crop_with_deepface(crop_path)
if embedding is None:
print("DeepFace reference representation errors:", json.dumps(represent_errors, indent=2), flush=True)
raise RuntimeError(f"DeepFace could not represent reference crop for {person_label}: {crop_path}")
log(f"Loaded DeepFace reference embedding for {person_label}: detector={backend_used}; represent_backend={represent_backend}; dim={embedding.shape[0]}")
return embedding, backend_used, represent_backend, str(crop_path)
def speaker_is_unknown(value):
if value is None:
return True
if pd.isna(value):
return True
value = str(value).lower()
return value.startswith("unknown") or value.startswith("speaker_") or value in {"none", "nan", ""}
log(f"Reading input transcript: {deepface_input_csv_path}")
df_named = pd.read_csv(deepface_input_csv_path, encoding="utf-8-sig")
log("Loading DeepFace reference embeddings")
reference_faces = {
"emmanuel_macron": {"speaker_id": "emmanuel_macron", "speaker_name": "Emmanuel Macron", "image_path": macron_face_image},
"donald_j_trump": {"speaker_id": "donald_j_trump", "speaker_name": "Donald J. Trump", "image_path": trump_face_image},
}
reference_face_embeddings = {}
reference_backend_rows = []
for person_key, person in reference_faces.items():
embedding, detector_backend, represent_backend, crop_path = load_reference_embedding_with_deepface(person["image_path"], person["speaker_name"])
reference_face_embeddings[person_key] = embedding
reference_backend_rows.append({
"person_key": person_key,
"speaker_id": person["speaker_id"],
"speaker_name": person["speaker_name"],
"image_path": str(person["image_path"]),
"reference_crop_path": crop_path,
"deepface_detector_backend": detector_backend,
"deepface_represent_backend": represent_backend,
"embedding_dim": int(embedding.shape[0]),
})
reference_backend_df = pd.DataFrame(reference_backend_rows)
log("Sampling frames from video with transcript-aware dense speech sampling")
face_frame_dir.mkdir(parents=True, exist_ok=True)
face_crop_dir.mkdir(parents=True, exist_ok=True)
for old_frame in face_frame_dir.glob("face_frame_*.jpg"):
old_frame.unlink()
for old_frame in face_frame_dir.glob("face_frame_resized_*.jpg"):
old_frame.unlink()
for old_crop in face_crop_dir.glob("face_crop_*.jpg"):
old_crop.unlink()
video_duration = run_ffprobe_duration(video_path)
if video_duration is None:
video_duration = get_video_duration_with_opencv(video_path)
if video_duration is None or video_duration <= 0:
raise RuntimeError("Could not determine video duration with ffprobe or OpenCV.")
log(f"Video duration seconds: {video_duration}")
def _coerce_float_or_none(value):
try:
if value is None or pd.isna(value):
return None
except Exception:
if value is None:
return None
try:
value = float(value)
except Exception:
return None
if not np.isfinite(value):
return None
return value
def _add_sample(samples, timestamp, source, source_segment_index=None, source_start=None, source_end=None):
timestamp = _coerce_float_or_none(timestamp)
if timestamp is None:
return
timestamp = max(0.0, min(float(video_duration) - 0.001, float(timestamp)))
if timestamp < 0 or timestamp >= float(video_duration):
return
samples.append({
"frame_time": float(timestamp),
"sampling_source": str(source),
"source_segment_index": None if source_segment_index is None else int(source_segment_index),
"source_start": None if source_start is None else float(source_start),
"source_end": None if source_end is None else float(source_end),
})
def _add_uniform_samples(samples, fps, source):
fps = max(0.01, float(fps))
step = 1.0 / fps
for timestamp in np.arange(0.0, max(0.0, float(video_duration) - 0.05), step):
_add_sample(samples, float(timestamp), source)
def build_transcript_aware_sampling_plan(transcript_df):
"""
Build a frame-sampling plan optimized for mouth-open detection.
The previous tutorial sampled the whole video uniformly. That can miss open
mouths because speech mouth openings are brief. This plan samples densely
inside diarized/ASR turns and adds small temporal offsets around each sample.
It also preserves a low-rate global baseline when requested.
"""
strategy = str(FACE_SAMPLING_STRATEGY or "hybrid_speech_uniform").lower().replace("-", "_")
samples = []
if strategy in {"uniform", "global", "global_uniform"}:
_add_uniform_samples(samples, FACE_SAMPLE_FPS, "uniform_global")
else:
if strategy in {"hybrid_speech_uniform", "hybrid", "speech_plus_global"}:
_add_uniform_samples(samples, FACE_GLOBAL_SAMPLE_FPS, "global_baseline")
if transcript_df is None or transcript_df.empty:
log("Transcript table is empty; falling back to uniform global sampling")
_add_uniform_samples(samples, FACE_SAMPLE_FPS, "uniform_fallback_no_transcript")
else:
offsets = []
for value in FACE_SPEECH_SAMPLE_OFFSET_SECONDS:
try:
offsets.append(float(value))
except Exception:
pass
if not offsets:
offsets = [0.0]
speech_fps = max(0.01, float(FACE_SPEECH_SAMPLE_FPS))
speech_step = 1.0 / speech_fps
turn_pad = max(0.0, float(FACE_SPEECH_TURN_PAD_SECONDS))
for row_number, row in enumerate(transcript_df.to_dict(orient="records")):
start = _coerce_float_or_none(row.get("start"))
end = _coerce_float_or_none(row.get("end"))
if start is None or end is None or end <= start:
continue
text_value = str(row.get("text", "") or "").strip()
if FACE_SAMPLE_ONLY_TURNS_WITH_TEXT and not text_value:
continue
segment_index = row.get("segment_index", row.get("asr_index", row_number))
try:
segment_index = int(segment_index)
except Exception:
segment_index = int(row_number)
window_start = max(0.0, float(start) - turn_pad)
window_end = min(float(video_duration) - 0.001, float(end) + turn_pad)
if window_end <= window_start:
continue
# Always include anchors, because these are easy to inspect and
# help diagnose whether landmarks work on each turn.
anchor_times = [
window_start,
float(start),
float(start) + 0.25 * (float(end) - float(start)),
float(start) + 0.50 * (float(end) - float(start)),
float(start) + 0.75 * (float(end) - float(start)),
float(end),
window_end,
]
for anchor in anchor_times:
_add_sample(samples, anchor, "speech_turn_anchor", segment_index, start, end)
# Dense speech window with small jitter/offset samples. This is
# the important part for catching syllable-level mouth openings.
for base_time in np.arange(window_start, window_end + 1e-9, speech_step):
for offset in offsets:
_add_sample(samples, float(base_time) + float(offset), "speech_turn_dense_offset", segment_index, start, end)
if not samples:
_add_sample(samples, 0.0, "emergency_zero_timestamp")
# Sort and de-duplicate. Keep speech samples when duplicate timestamps occur.
priority = {
"speech_turn_dense_offset": 0,
"speech_turn_anchor": 1,
"global_baseline": 2,
"uniform_global": 3,
}
samples = sorted(samples, key=lambda item: (float(item["frame_time"]), priority.get(item["sampling_source"], 9)))
deduped = []
min_gap = max(0.0, float(FACE_SAMPLE_MIN_GAP_SECONDS))
for sample in samples:
if not deduped:
deduped.append(sample)
continue
if abs(float(sample["frame_time"]) - float(deduped[-1]["frame_time"])) >= min_gap:
deduped.append(sample)
else:
# If two samples are nearly identical, keep the one that is more
# speech-focused, because mouth openness is our goal here.
current_priority = priority.get(sample["sampling_source"], 9)
previous_priority = priority.get(deduped[-1]["sampling_source"], 9)
if current_priority < previous_priority:
deduped[-1] = sample
if FACE_MAX_SAMPLED_FRAMES is not None and len(deduped) > int(FACE_MAX_SAMPLED_FRAMES):
max_frames = int(FACE_MAX_SAMPLED_FRAMES)
log(f"Sampling plan has {len(deduped)} frames; reducing to FACE_MAX_SAMPLED_FRAMES={max_frames}")
# Evenly thin the sorted plan. This preserves the full temporal span.
keep_indices = np.linspace(0, len(deduped) - 1, max_frames).round().astype(int)
keep_indices = sorted(set(int(i) for i in keep_indices))
deduped = [deduped[i] for i in keep_indices]
for i, sample in enumerate(deduped):
sample["sample_plan_index"] = int(i)
return deduped
sampling_plan = build_transcript_aware_sampling_plan(df_named)
log(
"Face sampling strategy: "
f"{FACE_SAMPLING_STRATEGY}; requested/scheduled frames={len(sampling_plan)}; "
f"speech_fps={FACE_SPEECH_SAMPLE_FPS}; global_fps={FACE_GLOBAL_SAMPLE_FPS}; "
f"offsets={FACE_SPEECH_SAMPLE_OFFSET_SECONDS}"
)
frame_manifest_rows = []
saved_frame_paths = []
failed_frame_rows = []
for saved_idx, sample in enumerate(sampling_plan):
timestamp = float(sample["frame_time"])
frame_path = face_frame_dir / f"face_frame_{saved_idx:06d}.jpg"
ok, backend, stderr_text = extract_frame_at_timestamp(video_path, timestamp, frame_path)
if ok:
saved_frame_paths.append(frame_path)
frame_manifest_rows.append({
"saved_frame_index": saved_idx,
"sample_plan_index": int(sample.get("sample_plan_index", saved_idx)),
"frame_time": float(timestamp),
"sampling_source": sample.get("sampling_source"),
"source_segment_index": sample.get("source_segment_index"),
"source_start": sample.get("source_start"),
"source_end": sample.get("source_end"),
"frame_path": str(frame_path),
"extract_backend": backend,
"file_size": frame_path.stat().st_size,
})
else:
failed_frame_rows.append({
"requested_frame_index": saved_idx,
"sample_plan_index": int(sample.get("sample_plan_index", saved_idx)),
"frame_time": float(timestamp),
"sampling_source": sample.get("sampling_source"),
"source_segment_index": sample.get("source_segment_index"),
"source_start": sample.get("source_start"),
"source_end": sample.get("source_end"),
"frame_path": str(frame_path),
"stderr": stderr_text[-1000:] if stderr_text else "",
})
if saved_idx % 25 == 0:
log(f"Frame extraction progress: {saved_idx + 1}/{len(sampling_plan)} | saved={len(saved_frame_paths)} | failed={len(failed_frame_rows)}")
frame_manifest_df = pd.DataFrame(frame_manifest_rows)
frame_manifest_df.to_csv(face_frame_manifest_csv_path, index=False, encoding="utf-8-sig")
if failed_frame_rows:
failed_frame_df = pd.DataFrame(failed_frame_rows)
failed_frames_csv_path = base / "result_macron_trump_face_failed_frame_extractions.csv"
failed_frame_df.to_csv(failed_frames_csv_path, index=False, encoding="utf-8-sig")
if not saved_frame_paths:
raise RuntimeError(f"No frames were saved to {face_frame_dir}.")
log(f"Saved {len(saved_frame_paths)} sampled frames to {face_frame_dir}")
log("Running DeepFace extraction, representation, and matching on sampled frames")
face_match_rows = []
reference_person_keys = list(reference_face_embeddings.keys())
reference_embedding_matrix = [reference_face_embeddings[key] for key in reference_person_keys]
frames_with_faces = 0
total_detected_faces = 0
total_encoded_faces = 0
total_strict_matches = 0
total_usable_candidates = 0
backend_usage_counts = {}
represent_backend_usage_counts = {}
for manifest_row in frame_manifest_df.itertuples(index=False):
saved_frame_index = int(manifest_row.saved_frame_index)
frame_time = float(manifest_row.frame_time)
frame_path = Path(manifest_row.frame_path)
resized_frame_path = face_frame_dir / f"face_frame_resized_{saved_frame_index:06d}.jpg"
frame_for_deepface = resize_image_file_for_deepface(frame_path, resized_frame_path, FACE_RESIZE_MAX_WIDTH)
faces, detector_backend, error_log = deepface_extract_faces(frame_for_deepface, DEEPFACE_DETECTOR_BACKENDS, DEEPFACE_ALIGN)
if detector_backend is not None:
backend_usage_counts[detector_backend] = backend_usage_counts.get(detector_backend, 0) + 1
if faces:
frames_with_faces += 1
total_detected_faces += len(faces)
if not faces:
face_match_rows.append({
"saved_frame_index": saved_frame_index, "frame_time": frame_time, "frame_path": str(frame_path),
"frame_for_deepface": str(frame_for_deepface), "face_index": None, "deepface_model": DEEPFACE_MODEL_NAME,
"detector_backend": detector_backend, "represent_backend": None, "crop_path": None, "facial_area": "{}",
"face_confidence": None, "best_candidate_person_key": None, "best_candidate_speaker_id": None,
"best_candidate_speaker_name": None, "best_candidate_distance": None, "best_strict_person_key": None,
"best_strict_speaker_id": None, "best_strict_speaker_name": None, "best_strict_distance": None,
"is_strict_match": False, "is_usable_candidate": False, "all_distances": "{}",
"failure_reason": "deepface_no_face_detected", "deepface_errors": json.dumps(error_log, ensure_ascii=False),
})
continue
for face_idx, face_obj in enumerate(faces):
facial_area = face_obj.get("facial_area", {}) or {}
crop_path = None
if facial_area:
crop_path_candidate = face_crop_dir / f"face_crop_frame_{saved_frame_index:06d}_face_{face_idx:02d}.jpg"
crop_path = crop_with_margin(frame_for_deepface, facial_area, crop_path_candidate, FACE_CROP_EXPAND_MARGIN)
if crop_path is None or not Path(crop_path).exists():
face_match_rows.append({
"saved_frame_index": saved_frame_index, "frame_time": frame_time, "frame_path": str(frame_path),
"frame_for_deepface": str(frame_for_deepface), "face_index": face_idx, "deepface_model": DEEPFACE_MODEL_NAME,
"detector_backend": detector_backend, "represent_backend": None, "crop_path": None,
"facial_area": json.dumps(facial_area, ensure_ascii=False),
"face_confidence": face_obj.get("confidence", face_obj.get("face_confidence")),
"best_candidate_person_key": None, "best_candidate_speaker_id": None, "best_candidate_speaker_name": None,
"best_candidate_distance": None, "best_strict_person_key": None, "best_strict_speaker_id": None,
"best_strict_speaker_name": None, "best_strict_distance": None, "is_strict_match": False,
"is_usable_candidate": False, "all_distances": "{}", "failure_reason": "face_detected_but_crop_failed",
"deepface_errors": json.dumps(error_log, ensure_ascii=False),
})
continue
embedding, represent_backend, represent_errors = represent_crop_with_deepface(crop_path)
if represent_backend is not None:
represent_backend_usage_counts[represent_backend] = represent_backend_usage_counts.get(represent_backend, 0) + 1
if embedding is None:
face_match_rows.append({
"saved_frame_index": saved_frame_index, "frame_time": frame_time, "frame_path": str(frame_path),
"frame_for_deepface": str(frame_for_deepface), "face_index": face_idx, "deepface_model": DEEPFACE_MODEL_NAME,
"detector_backend": detector_backend, "represent_backend": represent_backend, "crop_path": str(crop_path),
"facial_area": json.dumps(facial_area, ensure_ascii=False),
"face_confidence": face_obj.get("confidence", face_obj.get("face_confidence")),
"best_candidate_person_key": None, "best_candidate_speaker_id": None, "best_candidate_speaker_name": None,
"best_candidate_distance": None, "best_strict_person_key": None, "best_strict_speaker_id": None,
"best_strict_speaker_name": None, "best_strict_distance": None, "is_strict_match": False,
"is_usable_candidate": False, "all_distances": "{}", "failure_reason": "face_detected_but_representation_failed",
"deepface_errors": json.dumps(represent_errors, ensure_ascii=False),
})
continue
total_encoded_faces += 1
distances = [cosine_distance(embedding, ref_embedding) for ref_embedding in reference_embedding_matrix]
best_index = int(np.argmin(distances))
best_person_key = reference_person_keys[best_index]
best_distance = float(distances[best_index])
is_strict_match = bool(best_distance <= FACE_DISTANCE_THRESHOLD)
is_usable_candidate = bool(best_distance <= FACE_CANDIDATE_MAX_DISTANCE)
if is_strict_match:
total_strict_matches += 1
if is_usable_candidate:
total_usable_candidates += 1
candidate_speaker_id = reference_faces[best_person_key]["speaker_id"]
candidate_speaker_name = reference_faces[best_person_key]["speaker_name"]
if is_strict_match:
strict_person_key = best_person_key
strict_speaker_id = candidate_speaker_id
strict_speaker_name = candidate_speaker_name
strict_distance = best_distance
else:
strict_person_key = None
strict_speaker_id = None
strict_speaker_name = None
strict_distance = None
all_distances = {reference_faces[key]["speaker_name"]: float(distances[i]) for i, key in enumerate(reference_person_keys)}
face_match_rows.append({
"saved_frame_index": saved_frame_index, "frame_time": frame_time, "frame_path": str(frame_path),
"frame_for_deepface": str(frame_for_deepface), "face_index": face_idx, "deepface_model": DEEPFACE_MODEL_NAME,
"detector_backend": detector_backend, "represent_backend": represent_backend, "crop_path": str(crop_path),
"facial_area": json.dumps(facial_area, ensure_ascii=False),
"face_confidence": face_obj.get("confidence", face_obj.get("face_confidence")),
"best_candidate_person_key": best_person_key, "best_candidate_speaker_id": candidate_speaker_id,
"best_candidate_speaker_name": candidate_speaker_name, "best_candidate_distance": best_distance,
"best_strict_person_key": strict_person_key, "best_strict_speaker_id": strict_speaker_id,
"best_strict_speaker_name": strict_speaker_name, "best_strict_distance": strict_distance,
"is_strict_match": is_strict_match, "is_usable_candidate": is_usable_candidate,
"all_distances": json.dumps(all_distances, ensure_ascii=False),
"failure_reason": None, "deepface_errors": json.dumps(error_log, ensure_ascii=False),
})
face_matches_df = pd.DataFrame(face_match_rows)
log("Estimating mouth openness for detected face crops")
mouth_columns = [
"mouth_open_backend",
"mouth_open_state",
"mouth_open_is_open",
"mouth_open_is_closed",
"mouth_open_speech_cue",
"mouth_open_strict_is_open",
"mouth_open_score",
"mouth_open_threshold",
"mouth_open_state_threshold",
"mouth_open_strict_threshold",
"mouth_open_state_confidence",
"mouth_open_state_rule",
"mouth_open_vertical_distance",
"mouth_open_horizontal_distance",
"mouth_open_landmarks",
"mouth_open_debug_image_path",
"mouth_open_error",
]
for col in mouth_columns:
if col not in face_matches_df.columns:
face_matches_df[col] = None
if not face_matches_df.empty:
for idx, match_row in face_matches_df.iterrows():
mouth_result = estimate_mouth_open_for_match_row(match_row)
for key, value in mouth_result.items():
face_matches_df.at[idx, key] = value
mouth_open_observation_count_total = int((face_matches_df["mouth_open_is_open"] == True).sum()) if "mouth_open_is_open" in face_matches_df.columns else 0
mouth_closed_observation_count_total = int((face_matches_df["mouth_open_is_closed"] == True).sum()) if "mouth_open_is_closed" in face_matches_df.columns else 0
mouth_unknown_observation_count_total = int((face_matches_df["mouth_open_state"].astype(str) == "unknown").sum()) if "mouth_open_state" in face_matches_df.columns else 0
log(f"Per-face mouth states: open={mouth_open_observation_count_total}, closed={mouth_closed_observation_count_total}, unknown={mouth_unknown_observation_count_total}")
# Save an explicit per-face, per-frame mouth-state diagnostic table. This table
# reports open/closed/unknown for every detected face in every sampled frame.
mouth_debug_columns = [
"saved_frame_index", "frame_time", "frame_path", "frame_for_deepface",
"face_index", "crop_path", "facial_area", "face_confidence",
"best_candidate_speaker_id", "best_candidate_speaker_name", "best_candidate_distance",
"is_strict_match", "is_usable_candidate",
"mouth_open_state", "mouth_open_is_open", "mouth_open_is_closed",
"mouth_open_speech_cue", "mouth_open_strict_is_open",
"mouth_open_score", "mouth_open_threshold", "mouth_open_state_threshold",
"mouth_open_strict_threshold", "mouth_open_state_confidence",
"mouth_open_backend", "mouth_open_vertical_distance",
"mouth_open_horizontal_distance", "mouth_open_state_rule",
"mouth_open_debug_image_path", "mouth_open_error", "failure_reason",
]
mouth_debug_columns = [col for col in mouth_debug_columns if col in face_matches_df.columns]
try:
mouth_debug_df = face_matches_df[face_matches_df["face_index"].notna()].copy()
if not mouth_debug_df.empty:
sort_cols = [col for col in ["frame_time", "saved_frame_index", "face_index"] if col in mouth_debug_df.columns]
if sort_cols:
mouth_debug_df = mouth_debug_df.sort_values(sort_cols)
mouth_debug_df[mouth_debug_columns].to_csv(MOUTH_OPEN_DEBUG_PER_FACE_CSV_PATH, index=False, encoding="utf-8-sig")
log(f"Wrote per-face mouth-state debug CSV: {MOUTH_OPEN_DEBUG_PER_FACE_CSV_PATH}")
except Exception as exc:
log(f"Could not write per-face mouth-state debug CSV: {repr(exc)}")
face_matches_df.to_csv(face_match_csv_path, index=False, encoding="utf-8-sig")
log(f"Wrote frame-level DeepFace matches: {face_match_csv_path}")
log(f"Frames saved: {len(saved_frame_paths)} | frames_with_faces={frames_with_faces} | encoded_faces={total_encoded_faces} | strict={total_strict_matches} | usable={total_usable_candidates}")
log("Aggregating DeepFace evidence by transcript segment")
# candidate_faces_df contains faces that DeepFace could match to one of the
# supplied reference identities closely enough to be useful. This is the table
# used for identity evidence.
candidate_faces_df = face_matches_df[
face_matches_df["best_candidate_speaker_id"].notna()
& (
(face_matches_df["is_strict_match"] == True)
| (USE_BEST_FACE_CANDIDATE_EVEN_IF_NOT_STRICT & (face_matches_df["is_usable_candidate"] == True))
)
].copy()
# all_detected_faces_df keeps all detected/analyzed faces, including faces that
# were detected but could not be represented or did not pass the usable-candidate
# distance threshold. This lets the final CSV report both "all detected faces"
# and "all matched faces" for each transcript segment.
all_detected_faces_df = face_matches_df[
face_matches_df["face_index"].notna()
].copy()
def json_ready_value(value):
"""Convert NumPy/pandas values to JSON-safe Python scalars."""
if value is None:
return None
try:
if pd.isna(value):
return None
except Exception:
pass
try:
if isinstance(value, np.generic):
return value.item()
except Exception:
pass
if isinstance(value, (np.ndarray, list, tuple)):
return [json_ready_value(v) for v in value]
return value
def dataframe_records_for_json(df, columns):
"""Return selected DataFrame columns as JSON-safe records."""
available_columns = [col for col in columns if col in df.columns]
records = []
for record in df[available_columns].to_dict(orient="records"):
records.append({
key: json_ready_value(value)
for key, value in record.items()
})
return records
def summarize_all_face_matches(grouped_df):
"""
Human-readable summary for the final transcript column.
Example:
Emmanuel Macron (usable=4, strict=3, min=0.241); Donald J. Trump (...)
"""
if grouped_df.empty:
return ""
parts = []
for row in grouped_df.itertuples(index=False):
name = getattr(row, "best_candidate_speaker_name")
usable = int(getattr(row, "face_candidate_count"))
strict = int(getattr(row, "face_strict_match_count"))
min_distance = getattr(row, "face_min_distance")
if min_distance is None or pd.isna(min_distance):
min_text = "NA"
else:
min_text = f"{float(min_distance):.3f}"
parts.append(
f"{name} (usable={usable}, strict={strict}, min_distance={min_text})"
)
return "; ".join(parts)
def make_count_by_speaker(grouped_df):
"""JSON-friendly speaker-count summary keyed by speaker name."""
if grouped_df.empty:
return {}
out = {}
for row in grouped_df.itertuples(index=False):
speaker_name = str(getattr(row, "best_candidate_speaker_name"))
out[speaker_name] = {
"speaker_id": getattr(row, "best_candidate_speaker_id"),
"usable_candidate_count": int(getattr(row, "face_candidate_count")),
"strict_match_count": int(getattr(row, "face_strict_match_count")),
"mean_distance": json_ready_value(getattr(row, "face_mean_distance")),
"min_distance": json_ready_value(getattr(row, "face_min_distance")),
}
return out
def compute_mouth_open_segment_summary(segment_faces):
"""
Aggregate mouth-open evidence for one transcript segment.
Important change for speech videos:
- MOUTH_OPEN_THRESHOLD remains the strict mauckc threshold, usually 0.79.
- MOUTH_SPEAKING_CUE_THRESHOLD is a softer threshold used for ordinary
speaking-mouth movement.
- If neither threshold is reached, the optional best-scored cue reports the
highest matched/scored face so you can tune thresholds instead of seeing
only "No mouth-open cue detected".
"""
default = {
"mouth_open_speaker_ids": "[]",
"mouth_open_speaker_names": "No mouth-open cue detected",
"mouth_open_count_by_speaker": "{}",
"mouth_open_score_by_speaker": "{}",
"mouth_open_observation_count": 0,
"mouth_open_strict_observation_count": 0,
"mouth_open_cue_observation_count": 0,
"mouth_open_closed_observation_count": 0,
"mouth_open_scored_observation_count": 0,
"mouth_open_max_score": 0.0,
"mouth_open_all_detail": "[]",
"mouth_open_detection_mode": "none",
"visual_active_speaker_id": "none",
"visual_active_speaker_name": "No visual active-speaker cue",
"visual_active_speaker_confidence": 0.0,
}
if segment_faces.empty or "mouth_open_score" not in segment_faces.columns:
return default
valid = segment_faces[segment_faces["mouth_open_score"].notna()].copy()
if valid.empty:
return default
valid["mouth_open_score"] = pd.to_numeric(valid["mouth_open_score"], errors="coerce")
valid = valid[valid["mouth_open_score"].notna()].copy()
if valid.empty:
return default
# Prefer matched identities. If none are available, we still keep diagnostics
# in mouth_open_all_detail, but do not invent a visual active speaker name.
matched_valid = valid[valid["best_candidate_speaker_id"].notna()].copy()
evidence_pool = matched_valid if not matched_valid.empty else valid.copy()
# Keep all scored mouth observations in the detail field, including closed mouths.
mouth_detail_columns = [
"saved_frame_index", "frame_time", "frame_path", "face_index", "crop_path",
"best_candidate_speaker_id", "best_candidate_speaker_name",
"best_candidate_distance", "is_strict_match", "is_usable_candidate",
"mouth_open_backend", "mouth_open_state", "mouth_open_is_open",
"mouth_open_is_closed", "mouth_open_speech_cue", "mouth_open_strict_is_open",
"mouth_open_score", "mouth_open_threshold", "mouth_open_state_threshold",
"mouth_open_strict_threshold", "mouth_open_state_confidence", "mouth_open_state_rule",
"mouth_open_vertical_distance",
"mouth_open_horizontal_distance", "mouth_open_debug_image_path", "mouth_open_error",
]
sort_cols = [col for col in ["frame_time", "saved_frame_index", "face_index"] if col in valid.columns]
valid_for_detail = valid.sort_values(sort_cols) if sort_cols else valid
all_mouth_detail = dataframe_records_for_json(valid_for_detail, mouth_detail_columns)
strict_faces = evidence_pool[evidence_pool.get("mouth_open_strict_is_open", evidence_pool["mouth_open_is_open"]) == True].copy()
open_state_faces = evidence_pool[evidence_pool["mouth_open_is_open"] == True].copy()
closed_state_faces = evidence_pool[evidence_pool.get("mouth_open_is_closed", False) == True].copy()
cue_faces = evidence_pool[evidence_pool["mouth_open_score"] >= float(MOUTH_SPEAKING_CUE_THRESHOLD)].copy()
selected_faces = None
detection_mode = "none"
if not strict_faces.empty:
selected_faces = strict_faces
detection_mode = "strict_open_mouth"
elif not open_state_faces.empty:
selected_faces = open_state_faces
detection_mode = "per_face_open_state"
elif not cue_faces.empty:
selected_faces = cue_faces
detection_mode = "speech_mouth_cue"
elif MOUTH_OPEN_REPORT_BEST_SCORED_FACE:
best_pool = evidence_pool.sort_values("mouth_open_score", ascending=False).head(1).copy()
if not best_pool.empty and float(best_pool.iloc[0]["mouth_open_score"]) >= float(MOUTH_OPEN_MIN_SCORE_FOR_BEST_CUE):
selected_faces = best_pool
detection_mode = "best_scored_diagnostic_cue"
if selected_faces is None or selected_faces.empty:
default["mouth_open_max_score"] = float(valid["mouth_open_score"].max())
default["mouth_open_scored_observation_count"] = int(len(valid))
default["mouth_open_all_detail"] = json.dumps(all_mouth_detail, ensure_ascii=False)
return default
# If selected faces are unmatched, report the evidence but avoid inventing names.
selected_matched = selected_faces[selected_faces["best_candidate_speaker_id"].notna()].copy()
if selected_matched.empty:
default["mouth_open_max_score"] = float(valid["mouth_open_score"].max())
default["mouth_open_scored_observation_count"] = int(len(valid))
default["mouth_open_detection_mode"] = detection_mode
default["mouth_open_all_detail"] = json.dumps(all_mouth_detail, ensure_ascii=False)
return default
grouped = (
selected_matched
.groupby(["best_candidate_speaker_id", "best_candidate_speaker_name"], dropna=True)
.agg(
mouth_open_count=("mouth_open_score", "size"),
mouth_open_mean_score=("mouth_open_score", "mean"),
mouth_open_max_score=("mouth_open_score", "max"),
mouth_open_min_distance=("best_candidate_distance", "min"),
)
.reset_index()
.sort_values(
["mouth_open_count", "mouth_open_max_score", "mouth_open_mean_score", "mouth_open_min_distance"],
ascending=[False, False, False, True],
)
)
best = grouped.iloc[0]
count_by_speaker = {}
score_by_speaker = {}
summary_parts = []
for speaker_row in grouped.itertuples(index=False):
speaker_name = str(getattr(speaker_row, "best_candidate_speaker_name"))
count_by_speaker[speaker_name] = {
"speaker_id": getattr(speaker_row, "best_candidate_speaker_id"),
"mouth_cue_count": int(getattr(speaker_row, "mouth_open_count")),
"detection_mode": detection_mode,
}
score_by_speaker[speaker_name] = {
"speaker_id": getattr(speaker_row, "best_candidate_speaker_id"),
"mean_score": json_ready_value(getattr(speaker_row, "mouth_open_mean_score")),
"max_score": json_ready_value(getattr(speaker_row, "mouth_open_max_score")),
"strict_threshold": float(MOUTH_OPEN_THRESHOLD),
"speaking_cue_threshold": float(MOUTH_SPEAKING_CUE_THRESHOLD),
"min_face_distance": json_ready_value(getattr(speaker_row, "mouth_open_min_distance")),
"detection_mode": detection_mode,
}
summary_parts.append(
f"{speaker_name} ({detection_mode}, n={int(getattr(speaker_row, 'mouth_open_count'))}, "
f"max_score={float(getattr(speaker_row, 'mouth_open_max_score')):.3f})"
)
max_score = float(best["mouth_open_max_score"])
if detection_mode == "strict_open_mouth":
denom = max(float(MOUTH_OPEN_THRESHOLD), 1e-6)
elif detection_mode in {"speech_mouth_cue", "per_face_open_state"}:
denom = max(float(MOUTH_SPEAKING_CUE_THRESHOLD), 1e-6)
else:
denom = max(float(MOUTH_OPEN_MIN_SCORE_FOR_BEST_CUE), 1e-6)
confidence = float(min(1.0, max_score / denom))
return {
"mouth_open_speaker_ids": json.dumps([str(x) for x in grouped["best_candidate_speaker_id"].dropna().tolist()], ensure_ascii=False),
"mouth_open_speaker_names": "; ".join(summary_parts),
"mouth_open_count_by_speaker": json.dumps(count_by_speaker, ensure_ascii=False),
"mouth_open_score_by_speaker": json.dumps(score_by_speaker, ensure_ascii=False),
"mouth_open_observation_count": int(len(selected_matched)),
"mouth_open_strict_observation_count": int(len(strict_faces)),
"mouth_open_cue_observation_count": int(len(cue_faces)),
"mouth_open_closed_observation_count": int(len(closed_state_faces)),
"mouth_open_scored_observation_count": int(len(valid)),
"mouth_open_max_score": max_score,
"mouth_open_all_detail": json.dumps(all_mouth_detail, ensure_ascii=False),
"mouth_open_detection_mode": detection_mode,
"visual_active_speaker_id": best["best_candidate_speaker_id"],
"visual_active_speaker_name": best["best_candidate_speaker_name"],
"visual_active_speaker_confidence": confidence,
}
face_detail_columns = [
"saved_frame_index",
"frame_time",
"frame_path",
"face_index",
"crop_path",
"detector_backend",
"represent_backend",
"facial_area",
"face_confidence",
"best_candidate_person_key",
"best_candidate_speaker_id",
"best_candidate_speaker_name",
"best_candidate_distance",
"best_strict_person_key",
"best_strict_speaker_id",
"best_strict_speaker_name",
"best_strict_distance",
"is_strict_match",
"is_usable_candidate",
"all_distances",
"mouth_open_backend",
"mouth_open_state",
"mouth_open_is_open",
"mouth_open_is_closed",
"mouth_open_speech_cue",
"mouth_open_strict_is_open",
"mouth_open_score",
"mouth_open_threshold",
"mouth_open_state_threshold",
"mouth_open_strict_threshold",
"mouth_open_state_confidence",
"mouth_open_state_rule",
"mouth_open_vertical_distance",
"mouth_open_horizontal_distance",
"mouth_open_error",
"failure_reason",
]
segment_face_rows = []
for row in df_named.itertuples(index=False):
segment_index = int(row.segment_index)
start = float(row.start)
end = float(row.end)
# Frames/faces analyzed during this transcript segment, regardless of whether
# the detected face matched a reference person closely enough.
segment_all_detected_faces = all_detected_faces_df[
(all_detected_faces_df["frame_time"] >= start)
& (all_detected_faces_df["frame_time"] <= end)
].copy()
# Usable matched faces during this transcript segment.
segment_faces = candidate_faces_df[
(candidate_faces_df["frame_time"] >= start)
& (candidate_faces_df["frame_time"] <= end)
].copy()
analyzed_frame_count = int(
face_matches_df[
(face_matches_df["frame_time"] >= start)
& (face_matches_df["frame_time"] <= end)
]["saved_frame_index"].nunique()
)
frames_with_detected_faces = int(
segment_all_detected_faces["saved_frame_index"].nunique()
) if not segment_all_detected_faces.empty else 0
face_all_detected_count = int(len(segment_all_detected_faces))
face_all_usable_candidate_count = int(len(segment_faces))
face_all_strict_match_count = int(segment_faces["is_strict_match"].sum()) if not segment_faces.empty else 0
all_matches_detail = dataframe_records_for_json(
segment_faces.sort_values(
["frame_time", "saved_frame_index", "face_index", "best_candidate_distance"],
ascending=[True, True, True, True],
),
face_detail_columns,
)
mouth_summary = compute_mouth_open_segment_summary(segment_faces)
if segment_faces.empty:
segment_face_rows.append({
"segment_index": segment_index,
"face_best_speaker_id": None,
"face_best_speaker_name": None,
"face_match_count": 0,
"face_strict_match_count": 0,
"face_candidate_count": 0,
"face_mean_distance": None,
"face_min_distance": None,
"face_evidence_detail": "{}",
"face_all_speaker_ids": "[]",
"face_all_speaker_names": "No matched faces in segment",
"face_all_match_count_by_speaker": "{}",
"face_all_matches_detail": json.dumps(all_matches_detail, ensure_ascii=False),
"face_all_detected_count": face_all_detected_count,
"face_all_usable_candidate_count": face_all_usable_candidate_count,
"face_all_strict_match_count": face_all_strict_match_count,
"face_analyzed_frame_count": analyzed_frame_count,
"face_frames_with_detected_faces": frames_with_detected_faces,
**mouth_summary,
})
continue
grouped = (
segment_faces
.groupby(["best_candidate_speaker_id", "best_candidate_speaker_name"], dropna=True)
.agg(
face_candidate_count=("best_candidate_speaker_id", "size"),
face_strict_match_count=("is_strict_match", "sum"),
face_mean_distance=("best_candidate_distance", "mean"),
face_min_distance=("best_candidate_distance", "min"),
)
.reset_index()
.sort_values(
["face_strict_match_count", "face_candidate_count", "face_mean_distance", "face_min_distance"],
ascending=[False, False, True, True],
)
)
best = grouped.iloc[0]
all_speaker_ids = [
str(value)
for value in grouped["best_candidate_speaker_id"].dropna().tolist()
]
all_speaker_names = [
str(value)
for value in grouped["best_candidate_speaker_name"].dropna().tolist()
]
count_by_speaker = make_count_by_speaker(grouped)
all_faces_summary = summarize_all_face_matches(grouped)
segment_face_rows.append({
"segment_index": segment_index,
"face_best_speaker_id": best["best_candidate_speaker_id"],
"face_best_speaker_name": best["best_candidate_speaker_name"],
"face_match_count": int(best["face_candidate_count"]),
"face_strict_match_count": int(best["face_strict_match_count"]),
"face_candidate_count": int(best["face_candidate_count"]),
"face_mean_distance": float(best["face_mean_distance"]),
"face_min_distance": float(best["face_min_distance"]),
# Existing column: compact grouped speaker-level summary.
"face_evidence_detail": json.dumps(
dataframe_records_for_json(grouped, list(grouped.columns)),
ensure_ascii=False,
),
# New columns: all usable matched faces and all identities observed in the
# frames that fall inside the transcript segment.
"face_all_speaker_ids": json.dumps(all_speaker_ids, ensure_ascii=False),
"face_all_speaker_names": all_faces_summary,
"face_all_match_count_by_speaker": json.dumps(count_by_speaker, ensure_ascii=False),
"face_all_matches_detail": json.dumps(all_matches_detail, ensure_ascii=False),
"face_all_detected_count": face_all_detected_count,
"face_all_usable_candidate_count": face_all_usable_candidate_count,
"face_all_strict_match_count": face_all_strict_match_count,
"face_analyzed_frame_count": analyzed_frame_count,
"face_frames_with_detected_faces": frames_with_detected_faces,
**mouth_summary,
})
face_evidence_df = pd.DataFrame(segment_face_rows)
face_evidence_df.to_csv(face_evidence_csv_path, index=False, encoding="utf-8-sig")
log(f"Wrote segment-level DeepFace evidence: {face_evidence_csv_path}")
face_cols_to_drop = [
"face_best_speaker_id", "face_best_speaker_name", "face_match_count",
"face_strict_match_count", "face_candidate_count", "face_mean_distance",
"face_min_distance", "face_evidence_detail",
"face_all_speaker_ids", "face_all_speaker_names", "face_all_match_count_by_speaker",
"face_all_matches_detail", "face_all_detected_count", "face_all_usable_candidate_count",
"face_all_strict_match_count", "face_analyzed_frame_count", "face_frames_with_detected_faces",
"mouth_open_speaker_ids", "mouth_open_speaker_names", "mouth_open_count_by_speaker",
"mouth_open_score_by_speaker", "mouth_open_observation_count",
"mouth_open_strict_observation_count", "mouth_open_cue_observation_count",
"mouth_open_closed_observation_count", "mouth_open_scored_observation_count", "mouth_open_max_score",
"mouth_open_all_detail", "mouth_open_detection_mode",
"visual_active_speaker_id", "visual_active_speaker_name",
"visual_active_speaker_confidence",
"voice_face_agree",
"speaker_id_multimodal", "speaker_name_multimodal", "multimodal_identity_source",
"speaker_id_final", "speaker_name_final",
]
df_named = df_named.drop(columns=[col for col in face_cols_to_drop if col in df_named.columns], errors="ignore")
df_named = df_named.merge(face_evidence_df, on="segment_index", how="left")
df_named["voice_face_agree"] = (
df_named["speaker_id"].notna()
& df_named["face_best_speaker_id"].notna()
& (df_named["speaker_id"] == df_named["face_best_speaker_id"])
)
df_named["speaker_id_multimodal"] = df_named["speaker_id"]
df_named["speaker_name_multimodal"] = df_named["speaker_name"]
df_named["multimodal_identity_source"] = "voice"
if USE_FACE_WHEN_VOICE_UNKNOWN:
mask_voice_unknown = df_named["speaker_id"].apply(speaker_is_unknown)
mask_face_available = df_named["face_best_speaker_id"].notna()
fill_mask = mask_voice_unknown & mask_face_available
df_named.loc[fill_mask, "speaker_id_multimodal"] = df_named.loc[fill_mask, "face_best_speaker_id"]
df_named.loc[fill_mask, "speaker_name_multimodal"] = df_named.loc[fill_mask, "face_best_speaker_name"]
df_named.loc[fill_mask, "multimodal_identity_source"] = "face_when_voice_unknown"
if USE_FACE_TO_OVERRIDE_VOICE:
mask_face_available = df_named["face_best_speaker_id"].notna()
df_named.loc[mask_face_available, "speaker_id_multimodal"] = df_named.loc[mask_face_available, "face_best_speaker_id"]
df_named.loc[mask_face_available, "speaker_name_multimodal"] = df_named.loc[mask_face_available, "face_best_speaker_name"]
df_named.loc[mask_face_available, "multimodal_identity_source"] = "face_override"
df_named["speaker_id_final"] = df_named["speaker_id_multimodal"]
df_named["speaker_name_final"] = df_named["speaker_name_multimodal"]
df_named.to_csv(voice_face_csv_output_path, index=False, encoding="utf-8-sig")
face_matching_metadata = {
"method": "deepface_subprocess_timestamp_ffmpeg_extract_faces_represent_crops",
"deepface_model_name": DEEPFACE_MODEL_NAME,
"deepface_detector_backends": DEEPFACE_DETECTOR_BACKENDS,
"deepface_align": DEEPFACE_ALIGN,
"deepface_normalization": DEEPFACE_NORMALIZATION,
"face_sampling_strategy": FACE_SAMPLING_STRATEGY,
"face_sample_fps": FACE_SAMPLE_FPS,
"face_global_sample_fps": FACE_GLOBAL_SAMPLE_FPS,
"face_speech_sample_fps": FACE_SPEECH_SAMPLE_FPS,
"face_speech_turn_pad_seconds": FACE_SPEECH_TURN_PAD_SECONDS,
"face_speech_sample_offset_seconds": FACE_SPEECH_SAMPLE_OFFSET_SECONDS,
"face_sample_min_gap_seconds": FACE_SAMPLE_MIN_GAP_SECONDS,
"face_max_sampled_frames": FACE_MAX_SAMPLED_FRAMES,
"face_sample_only_turns_with_text": FACE_SAMPLE_ONLY_TURNS_WITH_TEXT,
"face_distance_threshold": FACE_DISTANCE_THRESHOLD,
"face_candidate_max_distance": FACE_CANDIDATE_MAX_DISTANCE,
"face_resize_max_width": FACE_RESIZE_MAX_WIDTH,
"face_crop_expand_margin": FACE_CROP_EXPAND_MARGIN,
"use_best_face_candidate_even_if_not_strict": USE_BEST_FACE_CANDIDATE_EVEN_IF_NOT_STRICT,
"mouth_open_detection_enabled": MOUTH_OPEN_DETECTION_ENABLED,
"mouth_open_backend": MOUTH_OPEN_BACKEND,
"mouth_open_threshold": MOUTH_OPEN_THRESHOLD,
"mouth_open_state_threshold_dlib": MOUTH_OPEN_STATE_THRESHOLD_DLIB,
"mouth_open_state_threshold_mediapipe": MOUTH_OPEN_STATE_THRESHOLD_MEDIAPIPE,
"mouth_open_state_threshold_face_recognition": MOUTH_OPEN_STATE_THRESHOLD_FACE_RECOGNITION,
"mouth_open_rect_expand_margin": MOUTH_OPEN_RECT_EXPAND_MARGIN,
"mouth_open_min_detection_confidence": MOUTH_OPEN_MIN_DETECTION_CONFIDENCE,
"mouth_open_observation_count_total": int(mouth_open_observation_count_total),
"mouth_closed_observation_count_total": int(mouth_closed_observation_count_total),
"mouth_unknown_observation_count_total": int(mouth_unknown_observation_count_total),
"use_face_to_override_voice": USE_FACE_TO_OVERRIDE_VOICE,
"use_face_when_voice_unknown": USE_FACE_WHEN_VOICE_UNKNOWN,
"video_duration": float(video_duration),
"frames_requested": len(sampling_plan),
"frames_saved": len(saved_frame_paths),
"frames_with_faces": int(frames_with_faces),
"total_detected_faces": int(total_detected_faces),
"total_encoded_faces": int(total_encoded_faces),
"total_strict_matches": int(total_strict_matches),
"total_usable_candidates": int(total_usable_candidates),
"detector_backend_usage": backend_usage_counts,
"representation_backend_usage": represent_backend_usage_counts,
"reference_faces": {key: {"speaker_id": value["speaker_id"], "speaker_name": value["speaker_name"], "image_path": str(value["image_path"])} for key, value in reference_faces.items()},
"reference_backend_df": reference_backend_df.to_dict(orient="records"),
"frame_manifest_csv": str(face_frame_manifest_csv_path),
"frame_level_face_matches_csv": str(face_match_csv_path),
"segment_level_face_evidence_csv": str(face_evidence_csv_path),
"mouth_open_per_face_debug_csv": str(MOUTH_OPEN_DEBUG_PER_FACE_CSV_PATH),
"face_frame_dir": str(face_frame_dir),
"face_crop_dir": str(face_crop_dir),
}
deepface_metadata_json_path.write_text(json.dumps(face_matching_metadata, ensure_ascii=False, indent=2), encoding="utf-8")
with voice_face_json_output_path.open("w", encoding="utf-8") as f:
json.dump({"video_path": str(video_path), "audio_path": str(audio_path), "face_matching": face_matching_metadata, "final_segments_voice_face_identified": df_named.to_dict(orient="records")}, f, ensure_ascii=False, indent=2)
log(f"Wrote voice + DeepFace CSV: {voice_face_csv_output_path}")
log(f"Wrote voice + DeepFace JSON: {voice_face_json_output_path}")
log(f"Wrote DeepFace metadata JSON: {deepface_metadata_json_path}")
log("DeepFace subprocess complete")
'''
deepface_runner_script_path.write_text(deepface_runner_code, encoding="utf-8")
checkpoint(f"Wrote DeepFace subprocess runner: {deepface_runner_script_path}")
deepface_python_for_subprocess = Path(deepface_config["python_executable"])
if not deepface_python_for_subprocess.exists():
raise FileNotFoundError(
"DeepFace subprocess Python executable does not exist:\n"
f"{deepface_python_for_subprocess}"
)
checkpoint(f"Running DeepFace in subprocess with timeout={DEEPFACE_SUBPROCESS_TIMEOUT_SECONDS}s")
try:
result = subprocess.run(
[
str(deepface_python_for_subprocess),
str(deepface_runner_script_path),
str(deepface_config_path),
],
capture_output=True,
text=True,
timeout=DEEPFACE_SUBPROCESS_TIMEOUT_SECONDS,
env=make_deepface_subprocess_env(),
)
deepface_stdout_log_path = base / "deepface_subprocess_stdout.log"
deepface_stderr_log_path = base / "deepface_subprocess_stderr.log"
deepface_stdout_log_path.write_text(result.stdout or "", encoding="utf-8")
deepface_stderr_log_path.write_text(result.stderr or "", encoding="utf-8")
print("\n" + "=" * 80)
print("DEEPFACE SUBPROCESS STDOUT")
print("=" * 80)
print(result.stdout)
print("\n" + "=" * 80)
print("DEEPFACE SUBPROCESS STDERR")
print("=" * 80)
print(result.stderr)
if result.returncode != 0:
stderr_tail = "\n".join((result.stderr or "").splitlines()[-40:])
stdout_tail = "\n".join((result.stdout or "").splitlines()[-20:])
raise RuntimeError(
"DeepFace subprocess failed or crashed with return code "
f"{result.returncode}. The notebook kernel survived.\n\n"
f"Full stdout log: {deepface_stdout_log_path}\n"
f"Full stderr log: {deepface_stderr_log_path}\n\n"
"STDOUT tail:\n"
f"{stdout_tail}\n\n"
"STDERR tail:\n"
f"{stderr_tail}"
)
except subprocess.TimeoutExpired as exc:
print("\n" + "=" * 80)
print("DEEPFACE SUBPROCESS TIMED OUT")
print("=" * 80)
print("STDOUT so far:")
print(exc.stdout or "")
print("STDERR so far:")
print(exc.stderr or "")
raise RuntimeError(
"DeepFace subprocess timed out. Reduce FACE_SAMPLE_FPS, use SFace/opencv, "
"or increase DEEPFACE_SUBPROCESS_TIMEOUT_SECONDS."
)
if not voice_face_csv_output_path.exists():
raise FileNotFoundError(
f"DeepFace subprocess completed but did not create: {voice_face_csv_output_path}"
)
df_named = pd.read_csv(voice_face_csv_output_path, encoding="utf-8-sig", keep_default_na=False)
df_named = clean_mouth_open_output_columns(df_named)
if deepface_metadata_json_path.exists():
face_matching_metadata = json.loads(deepface_metadata_json_path.read_text(encoding="utf-8"))
else:
face_matching_metadata = None
checkpoint("Loaded DeepFace-enhanced transcript back into notebook")
deepface_display_columns = [
"segment_index", "start", "end", "speaker_cluster", "speaker_id", "speaker_name",
"face_best_speaker_id", "face_best_speaker_name",
"face_all_speaker_names", "face_all_detected_count",
"face_all_usable_candidate_count", "face_all_strict_match_count",
"face_analyzed_frame_count", "face_frames_with_detected_faces",
"mouth_open_speaker_names", "visual_active_speaker_name",
"visual_active_speaker_confidence", "mouth_open_detection_mode",
"mouth_open_observation_count", "mouth_open_closed_observation_count",
"mouth_open_scored_observation_count", "mouth_open_max_score",
"face_match_count", "face_strict_match_count", "face_mean_distance", "face_min_distance",
"voice_face_agree", "speaker_id_final", "speaker_name_final",
"multimodal_identity_source", "asr_language", "text",
]
deepface_display_columns = [col for col in deepface_display_columns if col in df_named.columns]
display(df_named[deepface_display_columns])
# Explicit per-face, per-frame mouth-state preview. This shows every detected
# face independently, including whether the mouth was classified as open,
# closed, or unknown. Inspect this table first when final segment-level
# visual cues do not look right.
try:
mouth_debug_csv_path = None
if isinstance(face_matching_metadata, dict):
mouth_debug_csv_path = face_matching_metadata.get("mouth_open_per_face_debug_csv")
if mouth_debug_csv_path and Path(mouth_debug_csv_path).exists():
mouth_debug_preview_df = pd.read_csv(
mouth_debug_csv_path,
encoding="utf-8-sig",
keep_default_na=False,
)
preview_cols = [
"saved_frame_index", "frame_time", "face_index",
"best_candidate_speaker_name", "best_candidate_distance",
"mouth_open_state", "mouth_open_score", "mouth_open_threshold",
"mouth_open_state_confidence", "mouth_open_backend",
"mouth_open_error", "crop_path",
]
preview_cols = [col for col in preview_cols if col in mouth_debug_preview_df.columns]
checkpoint("Per-face mouth open/closed debug preview")
display(mouth_debug_preview_df[preview_cols].head(100))
if "mouth_open_state" in mouth_debug_preview_df.columns:
print("Per-face mouth-state counts:", flush=True)
display(mouth_debug_preview_df["mouth_open_state"].value_counts(dropna=False).to_frame("n_faces"))
except Exception as exc:
print("Could not display per-face mouth-state preview:", repr(exc), flush=True)
checkpoint("Cell 18B complete")
# --- Merged former notebook cell 21 ---
# =============================================================================
# CELL 19 — SAVE FINAL CSV AND JSON
# =============================================================================
checkpoint("Cell 19 started: saving final CSV and JSON")
df_named = clean_mouth_open_output_columns(df_named)
df_named.to_csv(
named_csv_output_path,
index=False,
encoding="utf-8-sig",
)
final_payload = {
"video_path": str(video_path),
"audio_path": str(audio_path),
"asr_backend": "faster_whisper",
"asr_model": FASTER_WHISPER_MODEL,
"asr_mode": "turn_level_language_detection",
"diarization_backend": loaded_pyannote_model,
"num_speakers": NUM_SPEAKERS,
"text_language_policy": "preserve_original_asr_language_no_translation",
"voice_identification": {
"method": "speechbrain_ecapa_tdnn_cosine_similarity",
"model": "speechbrain/spkrec-ecapa-voxceleb",
"force_best_match": FORCE_BEST_MATCH,
"enforce_one_to_one_assignment": ENFORCE_ONE_TO_ONE_ASSIGNMENT,
"voice_assignment_threshold": VOICE_ASSIGNMENT_THRESHOLD,
"voice_assignment_margin": VOICE_ASSIGNMENT_MARGIN,
"reference_people": {
key: {
"speaker_id": value["speaker_id"],
"speaker_name": value["speaker_name"],
"voice_sample": str(value["voice_sample"]),
}
for key, value in reference_people.items()
},
"cluster_audio_paths": {
key: str(value)
for key, value in cluster_audio_paths.items()
},
"similarity_matrix": similarity_df.to_dict(orient="records"),
"cluster_assignments": assignment_df.to_dict(orient="records"),
"cluster_to_person": cluster_to_person,
},
"face_matching": face_matching_metadata if "face_matching_metadata" in globals() else None,
"final_identity_columns": {
"primary_voice_speaker_id": "speaker_id",
"primary_voice_speaker_name": "speaker_name",
"multimodal_speaker_id": "speaker_id_final",
"multimodal_speaker_name": "speaker_name_final",
},
"asr_segments": asr_segments,
"diarization_segments": diarization_segments,
"merged_turns": merged_turns,
"final_segments_voice_identified": df_named.to_dict(orient="records"),
}
with named_json_output_path.open("w", encoding="utf-8") as f:
json.dump(final_payload, f, ensure_ascii=False, indent=2)
print("Wrote voice-identified CSV:", named_csv_output_path, flush=True)
print("Wrote voice-identified JSON:", named_json_output_path, flush=True)
checkpoint("Cell 19 complete")
# --- Merged former notebook cell 21 ---
# =============================================================================
# CELL 20 — VERIFY FINAL OUTPUT
# =============================================================================
checkpoint("Cell 20 started: verifying final output")
verify_df = pd.read_csv(
named_csv_output_path,
encoding="utf-8-sig",
keep_default_na=False,
)
verify_df = clean_mouth_open_output_columns(verify_df)
verify_display_columns = [
"segment_index",
"start",
"end",
"speaker_id",
"speaker_name",
"speaker_id_final",
"speaker_name_final",
"face_best_speaker_id",
"face_best_speaker_name",
"face_all_speaker_names",
"face_all_detected_count",
"face_all_usable_candidate_count",
"face_all_strict_match_count",
"mouth_open_speaker_names",
"visual_active_speaker_name",
"visual_active_speaker_confidence",
"mouth_open_observation_count",
"mouth_open_max_score",
"voice_face_agree",
"asr_language",
"text",
"text_original_language",
]
verify_display_columns = [
col for col in verify_display_columns
if col in verify_df.columns
]
display(verify_df[verify_display_columns])
print("\nLanguage counts in final output:", flush=True)
display(verify_df["asr_language"].value_counts(dropna=False).to_frame("n_segments"))
print("\nFinal transcript:", flush=True)
for row in verify_df.itertuples(index=False):
print("=" * 80, flush=True)
print(f"{row.start:.2f}–{row.end:.2f}", flush=True)
print("Voice speaker ID:", row.speaker_id, flush=True)
print("Voice speaker name:", row.speaker_name, flush=True)
if hasattr(row, "speaker_id_final"):
print("Final speaker ID:", row.speaker_id_final, flush=True)
if hasattr(row, "speaker_name_final"):
print("Final speaker name:", row.speaker_name_final, flush=True)
if hasattr(row, "face_best_speaker_name"):
print("Best face evidence:", row.face_best_speaker_name, flush=True)
if hasattr(row, "face_all_speaker_names"):
print("All face evidence:", row.face_all_speaker_names, flush=True)
if hasattr(row, "visual_active_speaker_name"):
print(
"Visual active speaker cue:",
safe_display_value(row.visual_active_speaker_name, NO_VISUAL_ACTIVE_SPEAKER_LABEL),
flush=True,
)
if hasattr(row, "mouth_open_speaker_names"):
print(
"Mouth-open evidence:",
safe_display_value(row.mouth_open_speaker_names, NO_MOUTH_OPEN_CUE_LABEL),
flush=True,
)
print("ASR language:", row.asr_language, flush=True)
print("Text:", row.text, flush=True)
checkpoint("Debug notebook complete")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file whospoke-0.4.0.tar.gz.
File metadata
- Download URL: whospoke-0.4.0.tar.gz
- Upload date:
- Size: 170.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8bf9d563a10021ebffbc3c06dcd78c96f1fc587cbfd012e7acde09818e1fcbfd
|
|
| MD5 |
b924cc55451f0e2e2507f521ed1d83eb
|
|
| BLAKE2b-256 |
4ad3664f74a9848c7eaf77b6d577deb7ff734ec8a991fbe1e7970e91abecd5c6
|
File details
Details for the file whospoke-0.4.0-py3-none-any.whl.
File metadata
- Download URL: whospoke-0.4.0-py3-none-any.whl
- Upload date:
- Size: 82.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0d016c91bde13efcea0f598c3715a9b3612464b94847bd7f6fb323faa2977d48
|
|
| MD5 |
b4c4c7e691a1839840a6c2de2a10d5ca
|
|
| BLAKE2b-256 |
a16b2ed09437b2a159285240f114f0c6dd6719001a82f1dcfbb6e721909b8251
|