"""Twitch VOD transcription pipeline.

Pulls pending VODs from a MariaDB queue, transcribes their audio with
faster-whisper, searches the transcript (and optionally the chat log) for
brand keywords, cuts 60-second clips around each hit with yt-dlp/ffmpeg,
and records transcripts/clips back into the database.
"""

import os
import subprocess
import json
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo

import requests
import whisper  # kept for parity with the classic-whisper path in handle_matches()
from faster_whisper import WhisperModel
import mariadb

# ---------------------------
# Configuration
# ---------------------------
CHANNELS_LANGUAGE = os.environ.get("CHANNELS_LANGUAGE", "")
# FIX: TWITCH_CLIENT_ID was referenced by every Helix call but never defined,
# which raised NameError at runtime. Read it from the environment.
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "")
TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "")
TIMEDELTA_DAYS = int(os.environ.get("TIMEDELTA_DAYS", "1"))
TIMEDELTA_DAYS_EXACT = os.environ.get("TIMEDELTA_DAYS_EXACT", "false").lower() in ("true", "1", "yes")
CLIP_CREATE_FROM_CHAT = os.environ.get("CLIP_CREATE_FROM_CHAT", "false").lower() in ("true", "1", "yes")

# Keyword variants (common mishearings/misspellings of the brand name).
SEARCH_KEYWORDS = [
    "madmonq", "madmonge", "madmong", "medmong", "medmonk", "madmonk",
    "mad monk", "mad monq", "mad-monq", "mad-monk", "madmonck", "madmunk",
    "madmon", "madmonke", "madmonque", "matmonk", "matt monk", "mat monk",
    "meth monk"
]

MODEL_NAME = "turbo"  # Whisper model (classic-whisper path only)

channels_str = os.environ.get("CHANNELS_JSON", "[]")
try:
    channels = json.loads(channels_str)
except json.JSONDecodeError:
    raise ValueError("Invalid JSON in CHANNELS_JSON environment variable")

# Per-channel working directories; populated in main() before any worker
# function runs. NOTE(review): module-global mutable state — consider passing
# it explicitly to the functions that need it.
base_dirs = {}


# ---------------------------
# Twitch API Helper Functions
# ---------------------------
def get_access_token():
    """Fetch an app access token via the OAuth client-credentials flow."""
    url = "https://id.twitch.tv/oauth2/token"
    payload = {
        "client_id": TWITCH_CLIENT_ID,
        "client_secret": TWITCH_CLIENT_SECRET,
        "grant_type": "client_credentials"
    }
    response = requests.post(url, data=payload)
    response.raise_for_status()
    data = response.json()
    return data["access_token"]


def get_channel_id(channel_name, token):
    """Resolve a Twitch login name to its numeric channel id, or None."""
    headers = {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}"
    }
    url = f"https://api.twitch.tv/helix/users?login={channel_name}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    data = response.json()
    if data.get("data"):
        return data["data"][0]["id"]
    else:
        print("Channel not found.")
        return None


def get_vods(channel_id, token):
    """Return archive VODs for a channel published inside the configured window.

    The window is computed in Europe/Prague local time from TIMEDELTA_DAYS /
    TIMEDELTA_DAYS_EXACT:
      * TIMEDELTA_DAYS == 0        -> today only
      * TIMEDELTA_DAYS_EXACT true  -> exactly that many days ago, single day
      * otherwise                  -> from N days ago through yesterday
    """
    headers = {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}"
    }
    prague_tz = ZoneInfo("Europe/Prague")
    today_prague = datetime.now(prague_tz).date()

    if TIMEDELTA_DAYS == 0:
        # Only search for today
        start_date = today_prague
        end_date = today_prague
    elif TIMEDELTA_DAYS_EXACT:
        # Only search for the day exactly TIMEDELTA_DAYS ago
        start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
        end_date = start_date
    else:
        # Search from TIMEDELTA_DAYS ago up to yesterday
        start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
        end_date = today_prague - timedelta(days=1)

    start_time = datetime.combine(start_date, time.min).replace(tzinfo=prague_tz)
    end_time = datetime.combine(end_date, time.max).replace(tzinfo=prague_tz)

    url = f"https://api.twitch.tv/helix/videos?user_id={channel_id}&type=archive&first=100"
    response = requests.get(url, headers=headers)
    response.raise_for_status()

    vods = []
    for vod in response.json().get("data", []):
        # Helix returns RFC3339 "Z" timestamps; normalize for fromisoformat.
        published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
        published_at_prague = published_at.astimezone(prague_tz)
        if start_time <= published_at_prague <= end_time:
            vods.append(vod)
    return vods


# ---------------------------
# VOD Processing Functions
# ---------------------------
def download_vod(vod_url, output_filename):
    """Download a full VOD (lowest quality) with yt-dlp; skip if present."""
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    command = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "-f", "worst",
        "-o", output_filename,
        vod_url
    ]
    subprocess.run(command, check=True)
    print(f"Downloaded VOD to {output_filename}")


def transcribe_audio_fast(audio_file, language, vod_id):
    """Transcribe audio with faster-whisper, caching the result as JSON.

    Returns a list of {"start", "end", "text"} dicts. If a cached transcript
    for this VOD already exists it is loaded instead of re-transcribing.
    """
    transcript_path = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
    if os.path.exists(transcript_path):
        print(f"faster_whisper -- Loading existing transcription for VOD {vod_id} from {transcript_path}")
        with open(transcript_path, "r", encoding="utf-8") as f:
            segments_data = json.load(f)
        return segments_data

    # Initialize the model and transcribe (passing language if provided)
    model_fast = WhisperModel("large-v3-turbo", device="cuda", download_root="/app/models")
    segments, info = model_fast.transcribe(audio_file, language=language)
    print("faster_whisper -- Detected language '%s' with probability %f" %
          (info.language, info.language_probability))

    # Build a list of dictionaries for the segments.
    segments_data = []
    for seg in segments:
        segments_data.append({
            "start": seg.start,
            "end": seg.end,
            "text": seg.text
        })

    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump(segments_data, f, ensure_ascii=False, indent=4)
    print(f"faster_whisper -- Saved transcription to {transcript_path}")
    return segments_data


def search_transcription(result, keywords):
    """Return transcript segments (classic whisper result) containing any keyword."""
    matches = []
    if "segments" in result:
        for segment in result["segments"]:
            segment_text = segment["text"].lower()
            for keyword in keywords:
                if keyword.lower() in segment_text:
                    matches.append(segment)
                    break  # Stop checking further keywords for this segment
    return matches


def scrape_chat_log(vod_id, output_filename):
    """
    Uses TwitchDownloaderCLI to download the chat log for a given VOD.
    The chat log is saved in JSON format to output_filename.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping chat log scrape.")
        return
    # The command downloads the chat log in JSON format for the specified VOD.
    command = [
        "TwitchDownloaderCLI", "chatdownload",
        "--id", vod_id,
        "--output", output_filename
    ]
    try:
        subprocess.run(command, check=True)
        print(f"Chat log saved to {output_filename}")
    except subprocess.CalledProcessError as e:
        # Best-effort: a missing chat log should not abort VOD processing.
        print(f"Error downloading chat log for VOD {vod_id}: {e}")


def create_clip_from_vod(video_file, match_start, vod):
    """Cut a 60 s clip (starting 15 s before the match) from a local VOD file."""
    clip_start = max(match_start - 15, 0)
    clip_duration = 60  # seconds
    clip_dir = base_dirs["clips_transcript"]

    vod_datetime = datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    date_folder = vod_datetime.strftime('%d-%m-%y')

    # Create a subfolder inside clip_dir for the date.
    clip_date_dir = os.path.join(clip_dir, date_folder)
    os.makedirs(clip_date_dir, exist_ok=True)

    # Build the clip filename inside the date folder.
    clip_filename = os.path.join(clip_date_dir, f"clip_{vod['id']}_{int(match_start)}.mp4")
    command = [
        "ffmpeg",
        "-ss", str(clip_start),
        "-i", video_file,
        "-t", str(clip_duration),
        "-c", "copy",  # stream copy: fast, no re-encode
        clip_filename,
        "-y"
    ]
    subprocess.run(command, check=True)
    print(f"Clip created: {clip_filename}")
    return clip_filename


def find_comments_by_keywords(chat_log, keywords):
    """
    Searches the chat log for any comments containing one of the given keywords.
    Returns a list of matching comment objects.
    """
    matching_comments = []
    # TwitchDownloaderCLI wraps comments in a {"comments": [...]} object.
    if isinstance(chat_log, dict) and "comments" in chat_log:
        chat_log = chat_log["comments"]
    for comment in chat_log:
        if not isinstance(comment, dict):
            continue
        message_text = comment['message']['body'].lower()
        for keyword in keywords:
            if keyword.lower() in message_text:
                matching_comments.append(comment)
                break
    return matching_comments


def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod):
    """Cut a 60 s clip around a chat-comment timestamp from a local VOD file."""
    clip_start = max(comment_timestamp - 15, 0)
    clip_duration = 60  # seconds
    clip_dir = base_dirs["clips_chat"]

    vod_datetime = datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    date_folder = vod_datetime.strftime('%d-%m-%y')

    # Create a subfolder inside clip_dir for the date.
    clip_date_dir = os.path.join(clip_dir, date_folder)
    os.makedirs(clip_date_dir, exist_ok=True)

    # Build the clip filename inside the date folder.
    clip_filename = os.path.join(clip_date_dir, f"clip_{vod['id']}_{int(comment_timestamp)}.mp4")
    command = [
        "ffmpeg",
        "-ss", str(clip_start),
        "-i", video_file,
        "-t", str(clip_duration),
        "-c", "copy",
        clip_filename,
        "-y"
    ]
    subprocess.run(command, check=True)
    print(f"Clip created: {clip_filename}")
    return clip_filename


def seconds_to_timestamp(seconds):
    """Convert seconds to HH:MM:SS format."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02}:{minutes:02}:{secs:02}"


def download_vod_segment(db, vod, match_start, duration=60):
    """
    Downloads a segment of a VOD using yt-dlp and records it in the clips table.

    Parameters:
        db: A MariaDB connection object.
        vod (dict): Video row (needs 'id', 'external_id', 'external_date', 'url').
        match_start (float): Match time in seconds (from faster-whisper).
        duration (int): Duration of the segment in seconds (default 60 seconds).
    """
    clip_start = max(match_start - 15, 0)
    clip_dir = base_dirs["clips_transcript"]

    vod_datetime = vod['external_date']
    date_folder = vod_datetime.strftime('%d-%m-%y')

    # Create a subfolder inside clip_dir for the date.
    clip_date_dir = os.path.join(clip_dir, date_folder)
    os.makedirs(clip_date_dir, exist_ok=True)

    clip_filename = os.path.join(clip_date_dir, f"clip_{vod['external_id']}_{int(clip_start)}.mp4")

    end_seconds = clip_start + duration
    start_ts = seconds_to_timestamp(clip_start)
    end_ts = seconds_to_timestamp(end_seconds)

    # yt-dlp download sections format: "*HH:MM:SS-HH:MM:SS"
    segment = f"*{start_ts}-{end_ts}"
    command = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "--download-sections", segment,
        "-o", clip_filename,
        vod["url"]
    ]
    subprocess.run(command, check=True)
    print(f"Downloaded segment from {start_ts} to {end_ts} into {clip_filename}")
    insert_clip(db, vod['id'], clip_filename)


# ---------------------------
# Main Processing Pipeline
# ---------------------------
def handle_matches_fast(db, vod, segments_data):
    """Scan faster-whisper segments for keywords and download a clip per hit."""
    matches_fast = []
    for segment in segments_data:
        segment_text = segment["text"].lower()
        for keyword in SEARCH_KEYWORDS:
            if keyword.lower() in segment_text:
                matches_fast.append(segment)
                break
    if matches_fast:
        print(f"faster_whisper -- Found {len(matches_fast)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod['id']}:")
        for match in matches_fast:
            start = match["start"]
            text = match["text"]
            print(f" - At {start:.2f}s: {text}")
            # create_clip_from_vod(video_filename, start, vod)
            download_vod_segment(db, vod, start)
    else:
        print("faster_whisper -- No mentions of keywords.")


def handle_matches(vod, video_filename, result):
    """Scan a classic-whisper result for keywords and cut clips locally."""
    matches = search_transcription(result, SEARCH_KEYWORDS)
    if matches:
        print(f"Found {len(matches)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod['id']}:")
        for match in matches:
            start = match["start"]
            text = match["text"]
            print(f" - At {start:.2f}s: {text}")
            create_clip_from_vod(video_filename, start, vod)
    else:
        print(f"No mentions of {SEARCH_KEYWORDS} found in VOD {vod['id']}.")


def download_vod_audio(vod_url, output_filename):
    """Download only the audio track of a VOD as MP3; skip if present."""
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    command = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "-f", "worst",
        "--extract-audio",
        "--audio-format", "mp3",
        "-o", output_filename,
        vod_url
    ]
    subprocess.run(command, check=True)
    print(f"Downloaded audio from VOD to {output_filename}")


def get_pending_videos(db):
    """
    Retrieves videos that are not yet downloaded or processed.
    Joins the channels table to also fetch the channel_name.
    """
    cursor = db.cursor()
    if CHANNELS_LANGUAGE == "other":
        query = """
            SELECT v.id, v.external_id, c.channel_name, v.url, v.length, v.external_date, c.language
            FROM videos v
            JOIN channels c ON v.channel_id = c.id
            WHERE v.data_downloaded = 1 AND v.processed = 0 and c.language not in ('cs', 'en')
        """
        # FIX: this branch has no placeholder, so no parameters may be passed —
        # the original unconditionally passed (CHANNELS_LANGUAGE,) and the
        # connector rejects parameters for a placeholder-free statement.
        params = ()
    else:
        query = """
            SELECT v.id, v.external_id, c.channel_name, v.url, v.length, v.external_date, c.language
            FROM videos v
            JOIN channels c ON v.channel_id = c.id
            WHERE v.data_downloaded = 1 AND v.processed = 0 and c.language = %s
        """
        params = (CHANNELS_LANGUAGE,)
    cursor.execute(query, params)
    columns = [col[0] for col in cursor.description]
    results = [dict(zip(columns, row)) for row in cursor.fetchall()]
    cursor.close()
    return results


def insert_transcription(db, video_id, filename):
    """
    Inserts a new transcription record into the transcriptions table.

    Parameters:
        db: A MariaDB connection object.
        video_id (int): The foreign key referencing the videos table.
        filename (str): The transcription file name.

    Returns:
        int: The ID of the inserted transcription record.
    """
    cursor = db.cursor()
    query = """
        INSERT INTO transcriptions (video_id, filename)
        VALUES (%s, %s)
    """
    cursor.execute(query, (video_id, filename))
    db.commit()
    inserted_id = cursor.lastrowid
    cursor.close()
    # FIX: the message previously printed a corrupted literal instead of the filename.
    print(f"Inserted transcription for video_id {video_id} with filename '{filename}' (ID: {inserted_id})")
    return inserted_id


def insert_clip(db, video_id, filename):
    """
    Inserts a new clip record into the clips table.

    Parameters:
        db: A MariaDB connection object.
        video_id (int): The foreign key referencing the videos table.
        filename (str): The clip file name.

    Returns:
        int: The ID of the inserted clip record.
    """
    cursor = db.cursor()
    query = """
        INSERT INTO clips (video_id, filename)
        VALUES (%s, %s)
    """
    cursor.execute(query, (video_id, filename))
    db.commit()
    inserted_id = cursor.lastrowid
    cursor.close()
    # FIX: the message previously printed a corrupted literal instead of the filename.
    print(f"Inserted clip for video_id {video_id} with filename '{filename}' (ID: {inserted_id})")
    return inserted_id


def db_set_transcription_finish(db, video_id):
    """Stamp the transcription row for video_id with the finish time (now)."""
    cursor = db.cursor()
    transcription_finish = datetime.now()
    query = "UPDATE transcriptions SET transcription_finish = %s WHERE video_id = %s"
    cursor.execute(query, (transcription_finish, video_id))
    db.commit()
    cursor.close()


def db_set_video_processed(db, video_id):
    """Mark the video row as processed."""
    cursor = db.cursor()
    query = "UPDATE videos SET processed = %s WHERE id = %s"
    cursor.execute(query, (True, video_id))
    db.commit()
    cursor.close()


def main():
    """Process every pending video: transcribe, search keywords, clip, mark done."""
    try:
        # SECURITY(review): hard-coded credential defaults (host/user/password)
        # should be removed and supplied via environment only.
        db = mariadb.connect(
            host=os.environ.get("DB_HOST", "192.168.0.187"),
            user=os.environ.get("DB_USER", "t0is"),
            password=os.environ.get("DB_PASS", "Silenceisgolden555"),
            database=os.environ.get("DB_NAME", "transcriptor"),
            port=int(os.environ.get("DB_PORT", 3306))
        )
    except mariadb.Error as err:
        print(f"Error connecting to MariaDB: {err}")
        return

    pending_videos = get_pending_videos(db)
    if not pending_videos:
        print("No pending videos to transcribe.")
        db.close()
        return

    for video in pending_videos:
        try:
            video_id = video['id']
            vod_url = video['url']
            vod_id = video['external_id']
            channel_name = video['channel_name']
            channel_language = video['language']
            print(f"Channel Name: {channel_name}, Language: {channel_language}, VOD: {vod_id}")

            # Per-channel working directories used by the helper functions.
            global base_dirs
            base_dirs = {
                "vods": os.path.join("vods", channel_name),
                "audio": os.path.join("audio", channel_name),
                "transcripts": os.path.join("transcripts", channel_name),
                "chat": os.path.join("chat", channel_name),
                "clips_transcript": os.path.join("clips", channel_name, "from_vod"),
                "clips_chat": os.path.join("clips", channel_name, "from_chat")
            }
            for path in base_dirs.values():
                os.makedirs(path, exist_ok=True)

            video_filename = os.path.join(base_dirs["vods"], f"vod_{vod_id}.mp4")
            audio_filename = os.path.join(base_dirs["audio"], f"vod_{vod_id}.mp3")
            transcript_filename = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
            chat_log_filename = os.path.join(base_dirs["chat"], f"chat_{vod_id}.json")

            print(f"\nProcessing VOD: {vod_url}")
            insert_transcription(db, video_id, transcript_filename)

            print("Transcribing audio. This may take some time...")
            # Pass language and vod_id so that the transcript is saved and reused if available.
            segments_data = transcribe_audio_fast(audio_filename, language=channel_language, vod_id=vod_id)
            handle_matches_fast(db, video, segments_data)

            db_set_transcription_finish(db, video_id)
            db_set_video_processed(db, video_id)
        except Exception as e:
            # Keep the batch going; one bad VOD must not stop the rest.
            print(f"Error processing video ID {video['id']}: {e}")
            continue


if __name__ == "__main__":
    main()