"""Download the audio track of recent Twitch VODs for a set of channels.

Configuration is read from environment variables:

* ``CHANNELS_JSON`` - JSON array of channel objects; each object must have
  a ``name`` and a ``language`` key.
* ``TWITCH_CLIENT_ID`` / ``TWITCH_CLIENT_SECRET`` - Twitch application
  credentials used for the client-credentials OAuth flow.
* ``TIMEDELTA_DAYS`` - how many days back to look for VODs (default ``3``;
  ``0`` means "today only").
* ``TIMEDELTA_DAYS_EXACT`` - when true, only the single day exactly
  ``TIMEDELTA_DAYS`` ago is searched.
"""

import json
import os
import subprocess
from datetime import date, datetime, time, timedelta
from typing import Optional
from zoneinfo import ZoneInfo

import requests

channels_str = os.environ.get("CHANNELS_JSON", "[]")
try:
    channels = json.loads(channels_str)
except json.JSONDecodeError as err:
    # Chain the original error so the offending position stays visible.
    raise ValueError(
        "Invalid JSON in CHANNELS_JSON environment variable"
    ) from err

# ---------------------------
# Configuration
# ---------------------------
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "")
TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "")
TIMEDELTA_DAYS = int(os.environ.get("TIMEDELTA_DAYS", "3"))
TIMEDELTA_DAYS_EXACT = os.environ.get(
    "TIMEDELTA_DAYS_EXACT", "false"
).lower() in ("true", "1", "yes")

# Seconds to wait on any Twitch HTTP call. requests has no default timeout,
# so without this a stalled connection would block the script indefinitely.
REQUEST_TIMEOUT = 30


# ---------------------------
# Twitch API Helper Functions
# ---------------------------
def _auth_headers(token: str) -> dict:
    """Return the headers sent with every authenticated Helix API request."""
    return {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}",
    }


def _search_window(today: date, days: int, exact: bool) -> tuple:
    """Return the inclusive ``(start_date, end_date)`` range to search.

    * ``days == 0``: today only.
    * ``exact``: only the day exactly ``days`` ago.
    * otherwise: from ``days`` ago up to and including yesterday.
    """
    if days == 0:
        return today, today
    start_date = today - timedelta(days=days)
    if exact:
        return start_date, start_date
    return start_date, today - timedelta(days=1)


def get_access_token() -> str:
    """Obtain an app access token via the client-credentials grant.

    Raises:
        requests.RequestException: on network failure, timeout, or a
            non-2xx response.
        KeyError: if the response body has no ``access_token`` field.
    """
    response = requests.post(
        "https://id.twitch.tv/oauth2/token",
        data={
            "client_id": TWITCH_CLIENT_ID,
            "client_secret": TWITCH_CLIENT_SECRET,
            "grant_type": "client_credentials",
        },
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def get_channel_id(channel_name: str, token: str) -> Optional[str]:
    """Look up the Twitch user id for ``channel_name``.

    Returns:
        The id string, or ``None`` (after printing a notice) when the API
        returns no matching user.

    Raises:
        requests.RequestException: on network failure, timeout, or a
            non-2xx response.
    """
    response = requests.get(
        "https://api.twitch.tv/helix/users",
        headers=_auth_headers(token),
        # Passing the login via ``params`` lets requests URL-encode it.
        params={"login": channel_name},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    users = response.json().get("data")
    if not users:
        print("Channel not found.")
        return None
    return users[0]["id"]


def get_vods(channel_id: str, token: str) -> list:
    """Return the channel's archive VODs published inside the search window.

    The window is derived from ``TIMEDELTA_DAYS`` / ``TIMEDELTA_DAYS_EXACT``
    and evaluated in the Europe/Prague time zone. Only a single page of at
    most 100 archive videos is requested and filtered.

    Raises:
        requests.RequestException: on network failure, timeout, or a
            non-2xx response.
    """
    prague_tz = ZoneInfo("Europe/Prague")
    start_date, end_date = _search_window(
        datetime.now(prague_tz).date(), TIMEDELTA_DAYS, TIMEDELTA_DAYS_EXACT
    )
    # Inclusive bounds: first instant of start_date to last of end_date.
    start_time = datetime.combine(start_date, time.min, tzinfo=prague_tz)
    end_time = datetime.combine(end_date, time.max, tzinfo=prague_tz)

    response = requests.get(
        "https://api.twitch.tv/helix/videos",
        headers=_auth_headers(token),
        params={"user_id": channel_id, "type": "archive", "first": 100},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()

    vods = []
    for vod in response.json().get("data", []):
        # datetime.fromisoformat() before Python 3.11 rejects a trailing
        # "Z", so spell the UTC offset out explicitly.
        published_at = datetime.fromisoformat(
            vod["published_at"].replace("Z", "+00:00")
        )
        if start_time <= published_at.astimezone(prague_tz) <= end_time:
            vods.append(vod)
    return vods


# ---------------------------
# VOD Processing Functions
# ---------------------------
def download_vod(vod_url, output_filename):
    """Download the full VOD video with yt-dlp unless the file exists.

    Raises:
        subprocess.CalledProcessError: if yt-dlp exits with a non-zero code.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    command = [
        "yt-dlp", "--cookies", "cookies.txt", "-o", output_filename, vod_url
    ]
    subprocess.run(command, check=True)
    print(f"Downloaded VOD to {output_filename}")


def extract_audio(video_file, audio_file):
    """Extract an MP3 audio track from ``video_file`` with ffmpeg.

    Does nothing (besides printing a notice) when ``audio_file`` exists.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero code.
    """
    if os.path.exists(audio_file):
        print(f"{audio_file} already exists. Skipping audio extraction.")
        return
    # ``-y`` is a global option, so it goes before the input rather than
    # trailing after the output file.
    command = [
        "ffmpeg", "-y", "-i", video_file, "-vn", "-acodec", "mp3", audio_file
    ]
    subprocess.run(command, check=True)
    print(f"Extracted audio to {audio_file}")


def download_vod_audio(vod_url, output_filename):
    """Download a VOD and convert it to MP3 in one yt-dlp invocation.

    Does nothing (besides printing a notice) when ``output_filename``
    already exists.

    Raises:
        subprocess.CalledProcessError: if yt-dlp exits with a non-zero code.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    command = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "-f", "worst",
        "--extract-audio",
        "--audio-format", "mp3",
        "-o", output_filename,
        vod_url,
    ]
    subprocess.run(command, check=True)
    print(f"Downloaded audio from VOD to {output_filename}")


def _channel_dirs(channel_name: str) -> dict:
    """Return the per-channel output directories, keyed by purpose."""
    return {
        "vods": os.path.join("vods", channel_name),
        "audio": os.path.join("audio", channel_name),
        "transcripts": os.path.join("transcripts", channel_name),
        "chat": os.path.join("chat", channel_name),
        "clips_transcript": os.path.join("clips", channel_name, "from_vod"),
        "clips_chat": os.path.join("clips", channel_name, "from_chat"),
    }


def _process_channel(channel: dict, token: str) -> None:
    """Create the channel's directories and fetch audio for its VODs."""
    print(f"Channel Name: {channel['name']}, Language: {channel['language']}")
    channel_name = channel["name"]

    base_dirs = _channel_dirs(channel_name)
    # Create directories if they do not exist.
    for path in base_dirs.values():
        os.makedirs(path, exist_ok=True)

    channel_id = get_channel_id(channel_name, token)
    if not channel_id:
        return

    vods = get_vods(channel_id, token)
    if not vods:
        print("No VODs found.")
        return

    for vod in vods:
        try:
            vod_url = vod["url"]
            vod_id = vod["id"]
            audio_filename = os.path.join(
                base_dirs["audio"], f"vod_{vod_id}.mp3"
            )
            print(f"\nProcessing VOD: {vod_url}")
            # Audio is fetched directly. The two-step alternative is
            # download_vod() into base_dirs["vods"]/vod_<id>.mp4 followed
            # by extract_audio(), which also keeps the video file.
            download_vod_audio(vod_url, audio_filename)
        except Exception as exc:
            # Best effort: one failing VOD must not stop the others, but
            # say why it was skipped. KeyboardInterrupt and SystemExit are
            # not caught here, so the run can still be aborted.
            print(f"Failed to process VOD {vod.get('id', '<unknown>')}: {exc}")


def main():
    """Fetch a token, then process every configured channel in turn.

    A failure in one channel is reported and does not prevent the remaining
    channels from being processed. Failure to obtain the access token is
    not caught and aborts the run.
    """
    print("Obtaining access token...")
    token = get_access_token()
    print("Access token obtained.")

    for channel in channels:
        try:
            _process_channel(channel, token)
        except Exception as exc:
            # Best effort per channel; report instead of silently skipping.
            print(f"Skipping channel {channel!r}: {exc}")


if __name__ == "__main__":
    main()