transcriptor/main.py
2025-03-20 15:13:12 +01:00

446 lines
16 KiB
Python

import os
import subprocess
import requests
import whisper
from faster_whisper import WhisperModel
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo
import json
# ---------------------------
# Configuration
# ---------------------------
# Twitch app credentials for the OAuth client-credentials flow (env-supplied).
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "")
TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "")
# Channel whose VODs are scanned.
CHANNEL_NAME = os.environ.get("CHANNEL_NAME", "madmonq")
# How many days back to look for VODs (0 = today only; see get_vods).
TIMEDELTA_DAYS = int(os.environ.get("TIMEDELTA_DAYS", "1"))
# If true, search only the single day exactly TIMEDELTA_DAYS ago.
TIMEDELTA_DAYS_EXACT = os.environ.get("TIMEDELTA_DAYS_EXACT", "false").lower() in ("true", "1", "yes")
# If true, also scan the downloaded chat log and clip around matching comments.
CLIP_CREATE_FROM_CHAT = os.environ.get("CLIP_CREATE_FROM_CHAT", "false").lower() in ("true", "1", "yes")
# Language hint passed to the transcription models (default Czech).
CHANNEL_LANGUAGE = os.environ.get("CHANNEL_LANGUAGE", "cs")
# Brand keyword plus common misspellings/mishearings to search transcripts and chat for.
SEARCH_KEYWORDS = [
    "madmonq",
    "madmonge",
    "madmong",
    "medmong",
    "medmonk",
    "madmonk",
    "mad monk",
    "mad monq",
    "mad-monq",
    "mad-monk",
    "madmonck",
    "madmunk",
    "madmon",
    "madmonke",
    "madmonque",
    "matmonk",
    "matt monk",
    "mat monk",
    "meth monk"
]
MODEL_NAME = "turbo"  # Whisper model (NOTE: transcribe_audio_fast hard-codes its own model id)
# Define base directories for each file category under a folder named after the channel.
base_dirs = {
    "vods": os.path.join("vods", CHANNEL_NAME),
    "audio": os.path.join("audio", CHANNEL_NAME),
    "transcripts": os.path.join("transcripts", CHANNEL_NAME),
    "chat": os.path.join("chat", CHANNEL_NAME),
    "clips_transcript": os.path.join("clips", CHANNEL_NAME, "from_vod"),
    "clips_chat": os.path.join("clips", CHANNEL_NAME, "from_chat")
}
# Create directories if they do not exist.
for path in base_dirs.values():
    os.makedirs(path, exist_ok=True)
# ---------------------------
# Twitch API Helper Functions
# ---------------------------
def get_access_token():
    """Fetch a Twitch app access token via the OAuth client-credentials flow.

    Returns:
        str: The bearer token to use for Helix API calls.

    Raises:
        requests.HTTPError: On a non-2xx response from the token endpoint.
    """
    url = "https://id.twitch.tv/oauth2/token"
    payload = {
        "client_id": TWITCH_CLIENT_ID,
        "client_secret": TWITCH_CLIENT_SECRET,
        "grant_type": "client_credentials"
    }
    # Fix: requests has no default timeout — without one a stalled connection
    # would hang the whole pipeline indefinitely.
    response = requests.post(url, data=payload, timeout=30)
    response.raise_for_status()
    return response.json()["access_token"]
def get_channel_id(channel_name, token):
    """Resolve a Twitch login name to its numeric channel id.

    Parameters:
        channel_name (str): Twitch login name (e.g. "madmonq").
        token (str): App access token from get_access_token().

    Returns:
        str | None: The channel id, or None when the login is unknown.
    """
    headers = {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}"
    }
    url = "https://api.twitch.tv/helix/users"
    # Fix: pass the login via params= so it is URL-encoded, and add a timeout
    # so a stalled connection cannot hang forever.
    response = requests.get(url, headers=headers, params={"login": channel_name}, timeout=30)
    response.raise_for_status()
    data = response.json()
    if data.get("data"):
        return data["data"][0]["id"]
    print("Channel not found.")
    return None
def get_vods(channel_id, token):
    """Return archive VODs for a channel published within the configured window.

    The window is computed in Europe/Prague local time from TIMEDELTA_DAYS and
    TIMEDELTA_DAYS_EXACT:
      * TIMEDELTA_DAYS == 0      -> today only
      * TIMEDELTA_DAYS_EXACT set -> only the day exactly TIMEDELTA_DAYS ago
      * otherwise                -> from TIMEDELTA_DAYS ago through yesterday

    NOTE(review): only the first 100 archives are inspected (single Helix page,
    no pagination) — confirm that is enough for very active channels.

    Parameters:
        channel_id (str): Numeric Twitch channel id.
        token (str): App access token.

    Returns:
        list[dict]: Helix video objects whose published_at falls in the window.
    """
    headers = {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}"
    }
    prague_tz = ZoneInfo("Europe/Prague")
    today_prague = datetime.now(prague_tz).date()
    # Define the search range based on TIMEDELTA_DAYS and TIMEDELTA_DAYS_EXACT.
    if TIMEDELTA_DAYS == 0:
        # Only search for today.
        start_date = today_prague
        end_date = today_prague
    elif TIMEDELTA_DAYS_EXACT:
        # Only the single day exactly TIMEDELTA_DAYS ago.
        start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
        end_date = start_date
    else:
        # From TIMEDELTA_DAYS ago up to (and including) yesterday.
        start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
        end_date = today_prague - timedelta(days=1)
    start_time = datetime.combine(start_date, time.min, tzinfo=prague_tz)
    end_time = datetime.combine(end_date, time.max, tzinfo=prague_tz)
    url = f"https://api.twitch.tv/helix/videos?user_id={channel_id}&type=archive&first=100"
    # Fix: add a timeout so a hung connection cannot block the pipeline.
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    vods = []
    for vod in response.json().get("data", []):
        # Helix returns RFC3339 UTC timestamps ending in "Z".
        published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
        if start_time <= published_at.astimezone(prague_tz) <= end_time:
            vods.append(vod)
    return vods
# ---------------------------
# VOD Processing Functions
# ---------------------------
def download_vod(vod_url, output_filename):
    """Download a full VOD with yt-dlp unless the target file already exists.

    Uses cookies.txt for authentication and the lowest-quality ("worst")
    format to keep downloads small.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    cmd = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "-f", "worst",
        "-o", output_filename,
        vod_url,
    ]
    subprocess.run(cmd, check=True)
    print(f"Downloaded VOD to {output_filename}")
def extract_audio(video_file, audio_file):
    """Extract the audio track of video_file to mp3 via ffmpeg (skip if present)."""
    if os.path.exists(audio_file):
        print(f"{audio_file} already exists. Skipping audio extraction.")
        return
    cmd = ["ffmpeg", "-i", video_file, "-vn", "-acodec", "mp3", audio_file, "-y"]
    subprocess.run(cmd, check=True)
    print(f"Extracted audio to {audio_file}")
def transcribe_audio(audio_file, model_name):
    """Transcribe audio_file with openai-whisper; returns the raw result dict.

    Models are cached under /app/models; the language is taken from the
    module-level CHANNEL_LANGUAGE setting.
    """
    whisper_model = whisper.load_model(model_name, download_root="/app/models")
    return whisper_model.transcribe(audio_file, language=CHANNEL_LANGUAGE)
def transcribe_audio_fast(audio_file, model_name, language, vod_id):
    """Transcribe audio with faster-whisper, caching segments to JSON per VOD.

    If transcripts/<channel>/transcript_<vod_id>.json exists it is loaded and
    returned instead of re-transcribing. Otherwise the audio is transcribed on
    CUDA and the segments are persisted for reuse.

    NOTE(review): model_name is currently unused — the model id is hard-coded
    to "large-v3-turbo" below; confirm whether the parameter should drive it.

    Returns:
        list[dict]: Segments as {"start": float, "end": float, "text": str}.
    """
    transcript_path = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
    if os.path.exists(transcript_path):
        print(f"faster_whisper -- Loading existing transcription for VOD {vod_id} from {transcript_path}")
        with open(transcript_path, "r", encoding="utf-8") as f:
            return json.load(f)
    # Initialize the model and transcribe (passing language if provided).
    model_fast = WhisperModel("large-v3-turbo", device="cuda", download_root="/app/models")
    segments, info = model_fast.transcribe(audio_file, language=language)
    print("faster_whisper -- Detected language '%s' with probability %f" % (info.language, info.language_probability))
    # Materialize the segment generator into plain dicts for JSON storage.
    segments_data = [
        {"start": seg.start, "end": seg.end, "text": seg.text}
        for seg in segments
    ]
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump(segments_data, f, ensure_ascii=False, indent=4)
    print(f"faster_whisper -- Saved transcription to {transcript_path}")
    return segments_data
def search_transcription(result, keywords):
    """Return the segments of a whisper result whose text contains any keyword.

    Matching is case-insensitive; each segment appears at most once even when
    several keywords hit it. Returns [] when result has no "segments" key.
    """
    lowered = [kw.lower() for kw in keywords]
    return [
        segment
        for segment in result.get("segments", [])
        if any(kw in segment["text"].lower() for kw in lowered)
    ]
def scrape_chat_log(vod_id, output_filename):
    """Download the chat log for a VOD in JSON form via TwitchDownloaderCLI.

    Skips the download when output_filename already exists. A CLI failure is
    reported but deliberately not re-raised so the pipeline continues.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping chat log scrape.")
        return
    # TwitchDownloaderCLI writes the chat log as JSON for the given VOD id.
    cli_args = [
        "TwitchDownloaderCLI", "chatdownload",
        "--id", vod_id,
        "--output", output_filename,
    ]
    try:
        subprocess.run(cli_args, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error downloading chat log for VOD {vod_id}: {e}")
    else:
        print(f"Chat log saved to {output_filename}")
def create_clip_from_vod(video_file, match_start, vod):
    """Cut a 60 s clip around a transcript match out of a local VOD file.

    The clip begins 15 s before match_start (clamped to 0) and is written
    under clips/<channel>/from_vod/<dd-mm-yy>/ using stream copy (no
    re-encode). Returns the path of the created clip.
    """
    start_at = max(match_start - 15, 0)
    length = 60  # seconds
    recorded = datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    # Group clips into a per-day subfolder named after the VOD's creation date.
    day_dir = os.path.join(base_dirs["clips_transcript"], recorded.strftime('%d-%m-%y'))
    os.makedirs(day_dir, exist_ok=True)
    clip_filename = os.path.join(day_dir, f"clip_{vod['id']}_{int(match_start)}.mp4")
    subprocess.run([
        "ffmpeg",
        "-ss", str(start_at),
        "-i", video_file,
        "-t", str(length),
        "-c", "copy",
        clip_filename,
        "-y",
    ], check=True)
    print(f"Clip created: {clip_filename}")
    return clip_filename
def find_comments_by_keywords(chat_log, keywords):
    """Return chat comments whose message body contains any of the keywords.

    Accepts either the raw TwitchDownloaderCLI dict (with a "comments" key)
    or a plain list of comment dicts. Non-dict entries are skipped and
    matching is case-insensitive.
    """
    if isinstance(chat_log, dict) and "comments" in chat_log:
        chat_log = chat_log["comments"]
    lowered = [kw.lower() for kw in keywords]
    hits = []
    for comment in chat_log:
        if not isinstance(comment, dict):
            continue
        body = comment['message']['body'].lower()
        if any(kw in body for kw in lowered):
            hits.append(comment)
    return hits
def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod):
    """Cut a 60 s clip around a chat-comment timestamp out of a local VOD file.

    Mirrors create_clip_from_vod but writes to clips/<channel>/from_chat/.
    The clip begins 15 s before the comment (clamped to 0) and uses stream
    copy. Returns the path of the created clip.
    """
    start_at = max(comment_timestamp - 15, 0)
    length = 60  # seconds
    recorded = datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    # Group clips into a per-day subfolder named after the VOD's creation date.
    day_dir = os.path.join(base_dirs["clips_chat"], recorded.strftime('%d-%m-%y'))
    os.makedirs(day_dir, exist_ok=True)
    clip_filename = os.path.join(day_dir, f"clip_{vod['id']}_{int(comment_timestamp)}.mp4")
    subprocess.run([
        "ffmpeg",
        "-ss", str(start_at),
        "-i", video_file,
        "-t", str(length),
        "-c", "copy",
        clip_filename,
        "-y",
    ], check=True)
    print(f"Clip created: {clip_filename}")
    return clip_filename
def seconds_to_timestamp(seconds):
    """Convert seconds to HH:MM:SS format."""
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{secs:02}"
def download_vod_segment(vod, match_start, duration=60):
    """
    Downloads a segment of a VOD around a transcript match using yt-dlp.

    Parameters:
        vod (dict): Twitch VOD object; "url", "id" and "created_at" are used.
        match_start (float): Match start time in seconds (from faster-whisper).
        duration (int): Duration of the segment in seconds (default 60).

    The segment starts 15 s before match_start (clamped at 0) and is saved to
    clips/<channel>/from_vod/<dd-mm-yy>/clip_<vod_id>_<start>.mp4.
    NOTE(review): the filename embeds int(clip_start), while create_clip_from_vod
    embeds int(match_start) — the two naming schemes differ by the 15 s lead-in.
    """
    clip_start = max(match_start - 15, 0)
    clip_dir = base_dirs["clips_transcript"]
    vod_datetime = datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    date_folder = vod_datetime.strftime('%d-%m-%y')
    # Create a subfolder inside clip_dir for the date.
    clip_date_dir = os.path.join(clip_dir, date_folder)
    os.makedirs(clip_date_dir, exist_ok=True)
    clip_filename = os.path.join(clip_date_dir, f"clip_{vod['id']}_{int(clip_start)}.mp4")
    end_seconds = clip_start + duration
    start_ts = seconds_to_timestamp(clip_start)
    end_ts = seconds_to_timestamp(end_seconds)
    # yt-dlp download sections format: "*HH:MM:SS-HH:MM:SS"
    segment = f"*{start_ts}-{end_ts}"
    command = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "--download-sections", segment,
        "-o", clip_filename,
        vod["url"]
    ]
    subprocess.run(command, check=True)
    print(f"Downloaded segment from {start_ts} to {end_ts} into {clip_filename}")
# ---------------------------
# Main Processing Pipeline
# ---------------------------
def handle_matches_fast(vod, video_filename, segments_data):
    """Scan faster-whisper segments for keywords and download a clip per hit.

    segments_data is the list of {"start", "end", "text"} dicts produced by
    transcribe_audio_fast; clips are fetched remotely via download_vod_segment
    (the local-file clipping path is retained below, commented out).
    """
    hits = [
        seg for seg in segments_data
        if any(kw.lower() in seg["text"].lower() for kw in SEARCH_KEYWORDS)
    ]
    if not hits:
        print("faster_whisper -- No mentions of keywords.")
        return
    print(f"faster_whisper -- Found {len(hits)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod['id']}:")
    for seg in hits:
        start = seg["start"]
        text = seg["text"]
        print(f" - At {start:.2f}s: {text}")
        # create_clip_from_vod(video_filename, start, vod)
        download_vod_segment(vod, start)
def handle_matches(vod, video_filename, result):
    """Report keyword mentions in a whisper result and cut a local clip per hit."""
    matches = search_transcription(result, SEARCH_KEYWORDS)
    if not matches:
        print(f"No mentions of {SEARCH_KEYWORDS} found in VOD {vod['id']}.")
        return
    print(f"Found {len(matches)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod['id']}:")
    for match in matches:
        start = match["start"]
        text = match["text"]
        print(f" - At {start:.2f}s: {text}")
        create_clip_from_vod(video_filename, start, vod)
def download_vod_audio(vod_url, output_filename):
    """Download only the audio track of a VOD as mp3 via yt-dlp.

    Skips the download when output_filename already exists; uses cookies.txt
    and the lowest-quality format before extracting audio.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    cmd = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "-f", "worst",
        "--extract-audio",
        "--audio-format", "mp3",
        "-o", output_filename,
        vod_url,
    ]
    subprocess.run(cmd, check=True)
    print(f"Downloaded audio from VOD to {output_filename}")
def main():
    """End-to-end pipeline: find recent VODs, transcribe audio, clip keyword hits.

    Requires TWITCH_CLIENT_ID / TWITCH_CLIENT_SECRET in the environment plus
    yt-dlp, ffmpeg and (when CLIP_CREATE_FROM_CHAT is set) TwitchDownloaderCLI
    on PATH.
    """
    print("Obtaining access token...")
    token = get_access_token()
    print("Access token obtained.")
    channel_id = get_channel_id(CHANNEL_NAME, token)
    if not channel_id:
        return
    vods = get_vods(channel_id, token)
    if not vods:
        # NOTE(review): message says "yesterday" but the search window is
        # configurable via TIMEDELTA_DAYS / TIMEDELTA_DAYS_EXACT.
        print("No VODs from yesterday found.")
        return
    for vod in vods:
        vod_url = vod["url"]
        vod_id = vod["id"]
        # Define file paths in the respective directories.
        video_filename = os.path.join(base_dirs["vods"], f"vod_{vod_id}.mp4")
        audio_filename = os.path.join(base_dirs["audio"], f"vod_{vod_id}.mp3")
        transcript_filename = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
        chat_log_filename = os.path.join(base_dirs["chat"], f"chat_{vod_id}.json")
        print(f"\nProcessing VOD: {vod_url}")
        # Full-video download path kept for reference; the pipeline currently
        # downloads audio only.
        # download_vod(vod_url, video_filename)
        # extract_audio(video_filename, audio_filename)
        download_vod_audio(vod_url, audio_filename)
        print("Transcribing audio. This may take some time...")
        # Pass language and vod_id so that the transcript is saved and reused if available.
        segments_data = transcribe_audio_fast(audio_filename, MODEL_NAME, language=CHANNEL_LANGUAGE, vod_id=vod_id)
        if CLIP_CREATE_FROM_CHAT:
            scrape_chat_log(vod_id, chat_log_filename)
        handle_matches_fast(vod, video_filename, segments_data)
        if CLIP_CREATE_FROM_CHAT:
            try:
                with open(chat_log_filename, "r", encoding="utf-8") as f:
                    chat_log = json.load(f)
            except Exception as e:
                # Best-effort: a missing/corrupt chat log only disables chat clips.
                print(f"Error loading chat log: {e}")
                chat_log = []
            # Search chat log using the same keywords as for the transcript.
            comment_matches = find_comments_by_keywords(chat_log, SEARCH_KEYWORDS)
            if comment_matches:
                for comment in comment_matches:
                    # NOTE(review): reads "content_offset_seconds" directly and
                    # will raise KeyError if absent — no fallback is performed.
                    timestamp = comment["content_offset_seconds"]
                    print(f"Found a matching comment at {timestamp} seconds.")
                    # NOTE(review): video_filename is never downloaded in the
                    # current flow (audio only) — confirm ffmpeg input exists.
                    create_clip_from_comment_timestamp(video_filename, timestamp, vod)
            else:
                print("No matching comments found.")


if __name__ == "__main__":
    main()