"""Fetch recent Twitch VODs for enabled channels and record them in MariaDB.

The script obtains an app access token from Twitch, loads the channels that
have fetching enabled from the ``channels`` table, resolves missing Twitch
user ids, and inserts every archive VOD published inside the configured date
window (evaluated in the Europe/Prague time zone) into the ``videos`` table.
"""

import os
import re
from contextlib import closing
from datetime import datetime, time, timedelta
from typing import Optional
from zoneinfo import ZoneInfo

import mariadb
import requests

# ---------------------------
# Configuration
# ---------------------------
# NOTE(review): the fallback credentials below are hard-coded secrets. They are
# kept only so existing deployments that rely on them keep working; they should
# be rotated and supplied exclusively through the environment.
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "a0fuj6tm5ct79clvim9816orphqkov")
TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "h7whj3yspxgj1909sgcafx6iz1p1es")
TIMEDELTA_DAYS = int(os.environ.get("TIMEDELTA_DAYS", "11"))
TIMEDELTA_DAYS_EXACT = os.environ.get("TIMEDELTA_DAYS_EXACT", "false").lower() in ("true", "1", "yes")

# Upper bound (seconds) for every HTTP call; without it ``requests`` can block
# indefinitely on an unresponsive server.
REQUEST_TIMEOUT_SECONDS = 30

# Matches duration strings such as "5h56m4s"; every component is optional.
_DURATION_PATTERN = re.compile(
    r'(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?'
)


# ---------------------------
# Twitch API Helper Functions
# ---------------------------
def _twitch_headers(token: str) -> dict:
    """Return the headers sent with every Helix API request."""
    return {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}",
    }


def get_access_token() -> str:
    """Request an app access token via the client-credentials grant.

    Returns:
        The ``access_token`` value from the token endpoint's JSON response.

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
    """
    url = "https://id.twitch.tv/oauth2/token"
    payload = {
        "client_id": TWITCH_CLIENT_ID,
        "client_secret": TWITCH_CLIENT_SECRET,
        "grant_type": "client_credentials",
    }
    response = requests.post(url, data=payload, timeout=REQUEST_TIMEOUT_SECONDS)
    response.raise_for_status()
    return response.json()["access_token"]


def get_channel_id(channel_name: str, token: str) -> Optional[str]:
    """Look up the Twitch user id for a login name.

    Args:
        channel_name: Twitch login name to resolve.
        token: bearer token used for the request.

    Returns:
        The ``id`` of the first user in the response, or ``None`` (after
        printing a message) when the response contains no users.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    response = requests.get(
        "https://api.twitch.tv/helix/users",
        headers=_twitch_headers(token),
        # Passing the login through ``params`` lets requests URL-encode it.
        params={"login": channel_name},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    users = response.json().get("data")
    if users:
        return users[0]["id"]
    print(f"Channel {channel_name} not found on Twitch.")
    return None


def get_vods(channel_id, token: str) -> list:
    """Return the channel's archive VODs published inside the configured window.

    The window is computed from calendar days in the Europe/Prague time zone:

    * ``TIMEDELTA_DAYS == 0``: today only.
    * ``TIMEDELTA_DAYS_EXACT``: only the single day ``TIMEDELTA_DAYS`` days ago.
    * otherwise: from ``TIMEDELTA_DAYS`` days ago through yesterday.

    Only one request is made, asking for at most 100 videos.
    NOTE(review): VODs beyond that first page are not inspected - confirm
    whether pagination is needed for very active channels.

    Args:
        channel_id: Twitch user id of the channel.
        token: bearer token used for the request.

    Returns:
        The video dicts from the response whose ``published_at`` falls in the
        window (both bounds inclusive).

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    prague_tz = ZoneInfo("Europe/Prague")
    today_prague = datetime.now(prague_tz).date()

    if TIMEDELTA_DAYS == 0:
        start_date = end_date = today_prague
    elif TIMEDELTA_DAYS_EXACT:
        start_date = end_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
    else:
        start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
        end_date = today_prague - timedelta(days=1)

    start_time = datetime.combine(start_date, time.min).replace(tzinfo=prague_tz)
    end_time = datetime.combine(end_date, time.max).replace(tzinfo=prague_tz)

    response = requests.get(
        "https://api.twitch.tv/helix/videos",
        headers=_twitch_headers(token),
        params={"user_id": channel_id, "type": "archive", "first": 100},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()

    vods = []
    for vod in response.json().get("data", []):
        # A trailing "Z" is rewritten to an explicit offset so that
        # datetime.fromisoformat accepts it on older Python versions.
        published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
        if start_time <= published_at.astimezone(prague_tz) <= end_time:
            vods.append(vod)
    return vods


# ---------------------------
# Database Interaction Functions
# ---------------------------
def get_channels_from_db(db) -> list:
    """Load the channels that have fetching enabled.

    Returns:
        One dict per row, keyed by column name: ``id``, ``channel_name``,
        ``twitch_name``, ``youtube_name``, ``twitch_id``, ``youtube_id`` and
        ``language``.
    """
    query = "SELECT id, channel_name, twitch_name, youtube_name, twitch_id, youtube_id, language FROM channels where fetching_enabled = 1"
    # closing() guarantees the cursor is released even if the query fails.
    with closing(db.cursor()) as cursor:
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]


def update_channel_twitch_id(db, channel_id, twitch_user_id) -> None:
    """Store a resolved Twitch user id on the channel row and commit."""
    query = "UPDATE channels SET twitch_id = %s, updated_at = NOW() WHERE id = %s"
    with closing(db.cursor()) as cursor:
        cursor.execute(query, (twitch_user_id, channel_id))
        db.commit()


def parse_duration(duration_str: str) -> int:
    """Parse a duration string (e.g. "5h56m4s") into a number of seconds.

    Each of the hour, minute and second components is optional. A string that
    does not match the expected format as a whole yields 0.
    """
    match = _DURATION_PATTERN.fullmatch(duration_str)
    if not match:
        return 0
    hours = int(match.group("hours") or 0)
    minutes = int(match.group("minutes") or 0)
    seconds = int(match.group("seconds") or 0)
    return hours * 3600 + minutes * 60 + seconds


def store_vod_in_db(db, local_channel_id, vod) -> bool:
    """Insert a VOD into the videos table unless its external id is present.

    Args:
        db: open database connection.
        local_channel_id: ``channels.id`` the VOD belongs to.
        vod: video dict; ``id``, ``title``, ``url`` and ``duration`` are read,
            and ``published_at`` when present.

    Returns:
        ``True`` if a row was inserted and committed, ``False`` if a row with
        the same ``external_id`` already existed.
    """
    with closing(db.cursor()) as cursor:
        cursor.execute("SELECT id FROM videos WHERE external_id = %s", (vod["id"],))
        if cursor.fetchone():
            print(f"VOD {vod['id']} already exists in the database.")
            return False

        external_date = None
        if "published_at" in vod:
            external_date = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))

        query = """
            INSERT INTO videos (channel_id, external_id, external_date, name, url, length)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        cursor.execute(query, (
            local_channel_id,
            vod["id"],
            external_date,
            vod["title"],
            vod["url"],
            parse_duration(vod["duration"]),
        ))
        db.commit()
        print(f"Inserted VOD {vod['id']} into the database.")
        return True


# ---------------------------
# Main Functionality
# ---------------------------
def main():
    """Fetch VODs for every enabled channel and store the new ones."""
    print("Obtaining access token...")
    token = get_access_token()
    print("Access token obtained.")

    try:
        # NOTE(review): the fallback host/user/password are hard-coded
        # credentials kept for backward compatibility; prefer the DB_*
        # environment variables and rotate the password.
        db = mariadb.connect(
            host=os.environ.get("DB_HOST", "192.168.0.187"),
            user=os.environ.get("DB_USER", "t0is"),
            password=os.environ.get("DB_PASS", "Silenceisgolden555"),
            database=os.environ.get("DB_NAME", "transcriptor"),
            port=int(os.environ.get("DB_PORT", 3306)),
        )
    except mariadb.Error as err:
        print(f"Error connecting to MariaDB: {err}")
        return

    # The finally clause closes the connection on every exit path, including
    # an HTTP error raised while processing a channel.
    try:
        channels = get_channels_from_db(db)
        if not channels:
            print("No channels found in the database.")
            return

        for channel in channels:
            channel_name = channel["channel_name"]
            print(f"\nProcessing Channel: {channel_name} (Language: {channel['language']})")

            twitch_user_id = channel["twitch_id"]
            if not twitch_user_id:
                # If the Twitch ID is missing, retrieve it and update the DB.
                twitch_user_id = get_channel_id(channel["twitch_name"], token)
                if not twitch_user_id:
                    print(f"Skipping channel {channel_name} due to missing Twitch ID.")
                    continue
                update_channel_twitch_id(db, channel["id"], twitch_user_id)

            # Get VODs for the channel from Twitch.
            vods = get_vods(twitch_user_id, token)
            if not vods:
                print("No VODs found.")
                continue

            for vod in vods:
                # Best-effort per VOD: a failure to store one VOD is reported
                # and must not prevent the remaining VODs from being stored.
                try:
                    store_vod_in_db(db, channel["id"], vod)
                except Exception as e:
                    print(f"Error storing VOD {vod.get('id')} for channel {channel['channel_name']}: {e}")
    finally:
        db.close()


if __name__ == "__main__":
    main()