From fb23986fdbeaa6288154f9e14bc3402f5fad5717 Mon Sep 17 00:00:00 2001 From: t0is Date: Thu, 20 Feb 2025 22:09:50 +0100 Subject: [PATCH] script edits --- main.py | 204 ++++++++++++++++++++++++-------------------------------- 1 file changed, 86 insertions(+), 118 deletions(-) diff --git a/main.py b/main.py index 876a84d..55494ec 100644 --- a/main.py +++ b/main.py @@ -4,29 +4,35 @@ import requests import whisper from datetime import datetime, time, timedelta from zoneinfo import ZoneInfo - import json # --------------------------- # Configuration # --------------------------- -# Make sure these environment variables are set: -# TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET -TWITCH_CLIENT_ID='a0fuj6tm5ct79clvim9816orphqkov' -TWITCH_CLIENT_SECRET='h7whj3yspxgj1909sgcafx6iz1p1es' -# CHANNEL_NAME = "kuruhs" # e.g. "examplechannel" +TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "") +TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "") CHANNEL_NAME = os.environ.get("CHANNEL_NAME", "madmonq") CHANNEL_LANGUAGE = os.environ.get("CHANNEL_LANGUAGE", "en") -SEARCH_KEYWORDS = ["madmonq", 'madmonge', 'madmong', 'medmong', 'medmonk', 'madmonk'] # keyword to search in the transcript -MODEL_NAME = "turbo" # Whisper model (e.g., "tiny", "base", "small", etc.) +SEARCH_KEYWORDS = ["madmonq", "madmonge", "madmong", "medmong", "medmonk", "madmonk"] +MODEL_NAME = "turbo" # Whisper model + +# Define base directories for each file category under a folder named after the channel. +base_dirs = { + "vods": os.path.join("vods", CHANNEL_NAME), + "audio": os.path.join("audio", CHANNEL_NAME), + "transcripts": os.path.join("transcripts", CHANNEL_NAME), + "chat": os.path.join("chat", CHANNEL_NAME), + "clips": os.path.join("clips", CHANNEL_NAME) +} + +# Create directories if they do not exist. +for path in base_dirs.values(): + os.makedirs(path, exist_ok=True) # --------------------------- # Twitch API Helper Functions # --------------------------- def get_access_token(): - """ - Uses the client credentials flow to obtain an OAuth token. - """ url = "https://id.twitch.tv/oauth2/token" payload = { "client_id": TWITCH_CLIENT_ID, @@ -53,79 +59,67 @@ def get_channel_id(channel_name, token): print("Channel not found.") return None - def get_vods_from_yesterday(channel_id, token): headers = { "Client-ID": TWITCH_CLIENT_ID, "Authorization": f"Bearer {token}" } - # Define Prague timezone prague_tz = ZoneInfo("Europe/Prague") - - # Get today's date in Prague, then compute yesterday's date today_prague = datetime.now(prague_tz).date() - yesterday = today_prague - timedelta(days=0) - - # Create timezone-aware datetime objects for the entire day in Prague + yesterday = today_prague - timedelta(days=0) # Change days as needed start_time = datetime.combine(yesterday, time.min).replace(tzinfo=prague_tz) end_time = datetime.combine(yesterday, time.max).replace(tzinfo=prague_tz) - # Fetch up to 100 archived VODs for the channel url = f"https://api.twitch.tv/helix/videos?user_id={channel_id}&type=archive&first=100" response = requests.get(url, headers=headers) response.raise_for_status() vods = [] - for vod in response.json().get("data", []): - # Parse the published_at timestamp (Twitch uses UTC) published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00")) - # Convert published_at to Prague time published_at_prague = published_at.astimezone(prague_tz) - if start_time <= published_at_prague <= end_time: vods.append(vod) - return vods # --------------------------- # VOD Processing Functions # --------------------------- def download_vod(vod_url, output_filename): - # Use yt-dlp to download the VOD + if os.path.exists(output_filename): + print(f"{output_filename} already exists. Skipping download.") + return command = ["yt-dlp", "-o", output_filename, vod_url] subprocess.run(command, check=True) print(f"Downloaded VOD to {output_filename}") def extract_audio(video_file, audio_file): - # Use ffmpeg to extract the audio from the video + if os.path.exists(audio_file): + print(f"{audio_file} already exists. Skipping audio extraction.") + return command = ["ffmpeg", "-i", video_file, "-vn", "-acodec", "mp3", audio_file, "-y"] subprocess.run(command, check=True) print(f"Extracted audio to {audio_file}") def transcribe_audio(audio_file, model_name): - global CHANNEL_LANGUAGE model = whisper.load_model(model_name, download_root="/app/models") result = model.transcribe(audio_file, language=CHANNEL_LANGUAGE) return result def search_transcription(result, keywords): matches = [] - # Whisper returns segments with approximate start and end timestamps. if "segments" in result: for segment in result["segments"]: segment_text = segment["text"].lower() - # Check if any keyword is in the segment text for keyword in keywords: if keyword.lower() in segment_text: matches.append(segment) - break # Prevent duplicate entries if more than one keyword matches + break # Stop checking further keywords for this segment return matches def scrape_chat_log(vod_id, output_filename): - """ - Scrapes the entire chat log for a given VOD using Twitch v5 API. - The chat log is saved to output_filename as JSON. - """ + if os.path.exists(output_filename): + print(f"{output_filename} already exists. Skipping chat log scrape.") + return headers = { "Client-ID": TWITCH_CLIENT_ID, "Accept": "application/vnd.twitchtv.v5+json" @@ -133,90 +127,72 @@ def scrape_chat_log(vod_id, output_filename): base_url = f"https://api.twitch.tv/v5/videos/{vod_id}/comments" comments = [] cursor = None - while True: params = {} if cursor: params["cursor"] = cursor - response = requests.get(base_url, headers=headers, params=params) if response.status_code != 200: print(f"Error fetching chat comments for VOD {vod_id}: {response.text}") break - data = response.json() comments.extend(data.get("comments", [])) cursor = data.get("_next") if not cursor: break - with open(output_filename, "w", encoding="utf-8") as f: json.dump(comments, f, ensure_ascii=False, indent=4) - print(f"Chat log saved to {output_filename}") def create_clip_from_vod(video_file, match_start, vod_id): - """ - Extract a 1-minute clip from the video_file. - The clip starts 15 seconds before match_start (or at 0 if match_start < 15). - """ - # Adjust start time to include 15 seconds of context (but not before the beginning) clip_start = max(match_start - 15, 0) clip_duration = 60 # seconds - - clip_dir = os.path.join("clips", CHANNEL_NAME) + clip_dir = base_dirs["clips"] os.makedirs(clip_dir, exist_ok=True) - clip_filename = os.path.join(clip_dir, f"clip_{vod_id}_{int(match_start)}.mp4") - command = [ "ffmpeg", - "-ss", str(clip_start), # Start time for the clip - "-i", video_file, # Input video file - "-t", str(clip_duration), # Duration of the clip - "-c", "copy", # Copy the streams without re-encoding + "-ss", str(clip_start), + "-i", video_file, + "-t", str(clip_duration), + "-c", "copy", clip_filename, - "-y" # Overwrite output file if exists + "-y" ] subprocess.run(command, check=True) print(f"Clip created: {clip_filename}") return clip_filename - -def find_comments_by_keyword(chat_log, keyword): - """ - Given a chat log (list of comments) and a keyword, - return a list of comments that contain the keyword. - Each comment is expected to have a 'content_offset_seconds' field. - """ +def find_comments_by_keywords(chat_log, keywords): matching_comments = [] + # Ensure chat_log is a list of dictionaries. for comment in chat_log: - # Adjust the key access based on the chat log's structure. - # For v5 API, each comment typically has: - # comment["message"]["body"] - text = comment.get("message", {}).get("body", "").lower() - if keyword.lower() in text: - matching_comments.append(comment) + if not isinstance(comment, dict): + continue + message = comment.get("message", {}) + if not isinstance(message, dict): + continue + text = message.get("body", "").lower() + for keyword in keywords: + if keyword.lower() in text: + matching_comments.append(comment) + break return matching_comments - def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod_id): - """ - Extract a 1-minute clip from the VOD starting 15 seconds before the comment timestamp. - """ - # Start the clip 15 seconds before the comment timestamp (if possible) clip_start = max(comment_timestamp - 15, 0) clip_duration = 60 # seconds - clip_filename = f"clip_{vod_id}_{int(comment_timestamp)}.mp4" - + clip_dir = base_dirs["clips"] + os.makedirs(clip_dir, exist_ok=True) + clip_filename = os.path.join(clip_dir, f"clip_{vod_id}_{int(comment_timestamp)}.mp4") command = [ "ffmpeg", - "-ss", str(clip_start), # Start time for the clip - "-i", video_file, # Input video file - "-t", str(clip_duration), # Duration of the clip - "-c", "copy", # Copy streams without re-encoding + "-ss", str(clip_start), + "-i", video_file, + "-t", str(clip_duration), + "-c", "copy", clip_filename, - "-y" # Overwrite if exists + "-y" ] subprocess.run(command, check=True) print(f"Clip created: {clip_filename}") @@ -226,17 +202,14 @@ def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod_id): # Main Processing Pipeline # --------------------------- def main(): - # Step 0: Get Twitch access token using client credentials print("Obtaining access token...") token = get_access_token() print("Access token obtained.") - # Step 1: Get channel ID channel_id = get_channel_id(CHANNEL_NAME, token) if not channel_id: return - # Step 2: Get yesterday's VODs vods = get_vods_from_yesterday(channel_id, token) if not vods: print("No VODs from yesterday found.") @@ -245,60 +218,55 @@ def main(): for vod in vods: vod_url = vod["url"] vod_id = vod["id"] - video_filename = f"vod_{vod_id}.mp4" - # video_filename = "vod_2382031096.mp4" - audio_filename = f"vod_{vod_id}.mp3" - # audio_filename = "vod_2382031096.mp3" + + # Define file paths in the respective directories + video_filename = os.path.join(base_dirs["vods"], f"vod_{vod_id}.mp4") + audio_filename = os.path.join(base_dirs["audio"], f"vod_{vod_id}.mp3") + transcript_filename = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json") + chat_log_filename = os.path.join(base_dirs["chat"], f"chat_{vod_id}.json") print(f"\nProcessing VOD: {vod_url}") - # Download the VOD download_vod(vod_url, video_filename) - # Extract the audio track extract_audio(video_filename, audio_filename) - # Transcribe using Whisper (this may take a while for long audio files) - # print("Transcribing audio. This may take some time...") - # result = transcribe_audio(audio_filename, MODEL_NAME) - # # Search for the keyword in the transcription - # matches = search_transcription(result, SEARCH_KEYWORDS) + # Check if transcript already exists; if yes, load it, otherwise transcribe and save. + if os.path.exists(transcript_filename): + print(f"{transcript_filename} already exists. Skipping transcription.") + with open(transcript_filename, "r", encoding="utf-8") as f: + result = json.load(f) + else: + print("Transcribing audio. This may take some time...") + result = transcribe_audio(audio_filename, MODEL_NAME) + with open(transcript_filename, "w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=4) + print(f"Transcript saved to {transcript_filename}") - - print("Transcribing audio. This may take some time...") - result = transcribe_audio(audio_filename, MODEL_NAME) - - chat_log_filename = f"chat_{vod_id}.json" - print("Scraping chat log...") scrape_chat_log(vod_id, chat_log_filename) - transcripts_dir = os.path.join("transcripts", CHANNEL_NAME) - os.makedirs(transcripts_dir, exist_ok=True) - transcript_filename = os.path.join(transcripts_dir, f"transcript_{vod_id}.json") - - - with open(transcript_filename, "w", encoding="utf-8") as f: - json.dump(result, f, ensure_ascii=False, indent=4) - print(f"Transcript saved to {transcript_filename}") - - # Search for the keyword in the transcription + # Search transcript for keywords matches = search_transcription(result, SEARCH_KEYWORDS) - if matches: - print(f"Found {len(matches)} mention(s) of '{SEARCH_KEYWORDS}' in VOD {vod_id}:") + print(f"Found {len(matches)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod_id}:") for match in matches: start = match["start"] - end = match["end"] text = match["text"] - print(f" - At {start:.2f}s to {end:.2f}s: {text}") + print(f" - At {start:.2f}s: {text}") create_clip_from_vod(video_filename, start, vod_id) else: - print(f"No mentions of '{SEARCH_KEYWORDS}' found in VOD {vod_id}.") + print(f"No mentions of {SEARCH_KEYWORDS} found in VOD {vod_id}.") - # keyword = "your_keyword_here" - matches = find_comments_by_keyword(chat_log_filename, "Madmonq") + # Load chat log from file + try: + with open(chat_log_filename, "r", encoding="utf-8") as f: + chat_log = json.load(f) + except Exception as e: + print(f"Error loading chat log: {e}") + chat_log = [] - if matches: - for comment in matches: - # Use the content_offset_seconds from the comment as the timestamp. + # Search chat log using an array of keywords (using the same keywords as for transcript) + comment_matches = find_comments_by_keywords(chat_log, SEARCH_KEYWORDS) + if comment_matches: + for comment in comment_matches: timestamp = comment.get("content_offset_seconds") print(f"Found a matching comment at {timestamp} seconds.") create_clip_from_comment_timestamp(video_filename, timestamp, vod_id)