From a8280b031bb8af9351d52f92179d1b64b9d0185f Mon Sep 17 00:00:00 2001
From: t0is
Date: Fri, 21 Mar 2025 12:53:32 +0100
Subject: [PATCH] files added

---
 chat_format.py   |  34 ++++++++
 download_only.py | 206 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 240 insertions(+)
 create mode 100644 chat_format.py
 create mode 100644 download_only.py

diff --git a/chat_format.py b/chat_format.py
new file mode 100644
index 0000000..6491c7d
--- /dev/null
+++ b/chat_format.py
@@ -0,0 +1,34 @@
+"""Format a downloaded Twitch chat log (JSON) as a plain-text transcript."""
+import json
+from datetime import datetime
+
+INPUT_PATH = "chat/madmonq/chat_2397919008.json"
+OUTPUT_PATH = "chat_2397919008_formated.txt"
+
+with open(INPUT_PATH, "r", encoding="utf-8") as f:
+    chat_log = json.load(f)
+
+# Accept both a dict carrying a "comments" entry and a bare sequence of comments.
+if isinstance(chat_log, dict) and "comments" in chat_log:
+    chat_log = chat_log["comments"]
+
+formatted_log = []
+for comment in chat_log:
+    if not isinstance(comment, dict):
+        continue
+
+    try:
+        # Try parsing with fractional seconds; 'Z' is matched literally.
+        dt = datetime.strptime(comment["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
+    except ValueError:
+        # Fallback for timestamps without fractional seconds.
+        dt = datetime.strptime(comment["created_at"], "%Y-%m-%dT%H:%M:%SZ")
+
+    display_name = comment["commenter"]["display_name"]
+    body = comment["message"]["body"]
+    formatted_log.append(f"{dt.strftime('%H:%M:%S')} --> {display_name:30}: {body}")
+
+# Chat messages contain arbitrary Unicode, so write UTF-8 explicitly instead of
+# relying on the platform default encoding.
+with open(OUTPUT_PATH, "w", encoding="utf-8") as out:
+    out.write("\n".join(formatted_log) + "\n")
diff --git a/download_only.py b/download_only.py
new file mode 100644
index 0000000..368dfa1
--- /dev/null
+++ b/download_only.py
@@ -0,0 +1,206 @@
+"""Download the audio of recent Twitch VODs for the channels in CHANNELS_JSON."""
+import json
+import os
+import subprocess
+from datetime import datetime, time, timedelta
+from zoneinfo import ZoneInfo
+
+import requests
+
+channels_str = os.environ.get("CHANNELS_JSON", "[]")
+try:
+    channels = json.loads(channels_str)
+except json.JSONDecodeError as err:
+    raise ValueError("Invalid JSON in CHANNELS_JSON environment variable") from err
+
+
+# ---------------------------
+# Configuration
+# ---------------------------
+TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "")
+TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "")
+TIMEDELTA_DAYS = int(os.environ.get("TIMEDELTA_DAYS", "3"))
+TIMEDELTA_DAYS_EXACT = os.environ.get("TIMEDELTA_DAYS_EXACT", "false").lower() in ("true", "1", "yes")
+# requests has no default timeout; without one a stalled connection blocks forever.
+REQUEST_TIMEOUT = 30
+
+
+# ---------------------------
+# Twitch API Helper Functions
+# ---------------------------
+def _auth_headers(token):
+    """Return the headers sent with every Helix API request."""
+    return {
+        "Client-ID": TWITCH_CLIENT_ID,
+        "Authorization": f"Bearer {token}",
+    }
+
+
+def get_access_token():
+    """Request an app access token using the client-credentials grant.
+
+    Raises requests.HTTPError when Twitch answers with an error status.
+    """
+    url = "https://id.twitch.tv/oauth2/token"
+    payload = {
+        "client_id": TWITCH_CLIENT_ID,
+        "client_secret": TWITCH_CLIENT_SECRET,
+        "grant_type": "client_credentials",
+    }
+    response = requests.post(url, data=payload, timeout=REQUEST_TIMEOUT)
+    response.raise_for_status()
+    return response.json()["access_token"]
+
+
+def get_channel_id(channel_name, token):
+    """Return the user id for *channel_name*, or None when the lookup is empty."""
+    response = requests.get(
+        "https://api.twitch.tv/helix/users",
+        # Passing params lets requests URL-encode the login name.
+        params={"login": channel_name},
+        headers=_auth_headers(token),
+        timeout=REQUEST_TIMEOUT,
+    )
+    response.raise_for_status()
+    data = response.json()
+    if data.get("data"):
+        return data["data"][0]["id"]
+    print("Channel not found.")
+    return None
+
+
+def _search_window(today, tz):
+    """Return the inclusive (start, end) datetimes of the VOD search window.
+
+    TIMEDELTA_DAYS == 0 selects today only. Otherwise the window starts
+    TIMEDELTA_DAYS before *today* and ends on that same day when
+    TIMEDELTA_DAYS_EXACT is set, or yesterday when it is not.
+    """
+    if TIMEDELTA_DAYS == 0:
+        start_date = end_date = today
+    else:
+        start_date = today - timedelta(days=TIMEDELTA_DAYS)
+        end_date = start_date if TIMEDELTA_DAYS_EXACT else today - timedelta(days=1)
+    start_time = datetime.combine(start_date, time.min, tzinfo=tz)
+    end_time = datetime.combine(end_date, time.max, tzinfo=tz)
+    return start_time, end_time
+
+
+def get_vods(channel_id, token):
+    """Return the channel's archive VODs published inside the search window.
+
+    Only the first page (at most 100 videos) of the response is inspected.
+    """
+    prague_tz = ZoneInfo("Europe/Prague")
+    start_time, end_time = _search_window(datetime.now(prague_tz).date(), prague_tz)
+
+    response = requests.get(
+        "https://api.twitch.tv/helix/videos",
+        params={"user_id": channel_id, "type": "archive", "first": 100},
+        headers=_auth_headers(token),
+        timeout=REQUEST_TIMEOUT,
+    )
+    response.raise_for_status()
+    vods = []
+    for vod in response.json().get("data", []):
+        published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
+        # Compare in Prague local time, the zone the window was built in.
+        if start_time <= published_at.astimezone(prague_tz) <= end_time:
+            vods.append(vod)
+    return vods
+
+
+# ---------------------------
+# VOD Processing Functions
+# ---------------------------
+def download_vod(vod_url, output_filename):
+    """Download the full VOD with yt-dlp unless *output_filename* already exists."""
+    if os.path.exists(output_filename):
+        print(f"{output_filename} already exists. Skipping download.")
+        return
+    command = ["yt-dlp", "--cookies", "cookies.txt", "-o", output_filename, vod_url]
+    subprocess.run(command, check=True)
+    print(f"Downloaded VOD to {output_filename}")
+
+
+def extract_audio(video_file, audio_file):
+    """Extract an MP3 track from *video_file* with ffmpeg unless it already exists."""
+    if os.path.exists(audio_file):
+        print(f"{audio_file} already exists. Skipping audio extraction.")
+        return
+    command = ["ffmpeg", "-i", video_file, "-vn", "-acodec", "mp3", audio_file, "-y"]
+    subprocess.run(command, check=True)
+    print(f"Extracted audio to {audio_file}")
+
+
+def download_vod_audio(vod_url, output_filename):
+    """Download only the audio of a VOD as MP3 unless the file already exists."""
+    if os.path.exists(output_filename):
+        print(f"{output_filename} already exists. Skipping download.")
+        return
+    command = [
+        "yt-dlp",
+        "--cookies", "cookies.txt",
+        "-f", "worst",
+        "--extract-audio",
+        "--audio-format", "mp3",
+        "-o", output_filename,
+        vod_url,
+    ]
+    subprocess.run(command, check=True)
+    print(f"Downloaded audio from VOD to {output_filename}")
+
+
+def main():
+    """Download the audio of every matching VOD for each configured channel."""
+    print("Obtaining access token...")
+    token = get_access_token()
+    print("Access token obtained.")
+
+    for channel in channels:
+        try:
+            print(f"Channel Name: {channel['name']}, Language: {channel['language']}")
+
+            channel_name = channel['name']
+
+            base_dirs = {
+                "vods": os.path.join("vods", channel_name),
+                "audio": os.path.join("audio", channel_name),
+                "transcripts": os.path.join("transcripts", channel_name),
+                "chat": os.path.join("chat", channel_name),
+                "clips_transcript": os.path.join("clips", channel_name, "from_vod"),
+                "clips_chat": os.path.join("clips", channel_name, "from_chat"),
+            }
+
+            # Create directories if they do not exist.
+            for path in base_dirs.values():
+                os.makedirs(path, exist_ok=True)
+
+            channel_id = get_channel_id(channel_name, token)
+            if not channel_id:
+                continue
+
+            vods = get_vods(channel_id, token)
+            if not vods:
+                print("No VODs found.")
+                continue
+
+            for vod in vods:
+                try:
+                    vod_url = vod["url"]
+                    audio_filename = os.path.join(base_dirs["audio"], f"vod_{vod['id']}.mp3")
+
+                    print(f"\nProcessing VOD: {vod_url}")
+                    # Audio-only download; download_vod + extract_audio is the
+                    # alternative that keeps the full video.
+                    download_vod_audio(vod_url, audio_filename)
+                except Exception as err:
+                    # Best effort: report the failure and move on to the next VOD.
+                    print(f"Failed to process VOD {vod.get('id')}: {err}")
+        except Exception as err:
+            # Best effort: one failing channel must not stop the remaining ones.
+            print(f"Failed to process channel {channel!r}: {err}")
+
+
+if __name__ == "__main__":
+    main()