204 lines
6.9 KiB
Python
204 lines
6.9 KiB
Python
import os
|
|
import requests
|
|
import mariadb
|
|
from datetime import datetime, time, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
import re
|
|
|
|
# ---------------------------
|
|
# Configuration
|
|
# ---------------------------
|
|
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "a0fuj6tm5ct79clvim9816orphqkov")
|
|
TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "h7whj3yspxgj1909sgcafx6iz1p1es")
|
|
TIMEDELTA_DAYS = int(os.environ.get("TIMEDELTA_DAYS", "11"))
|
|
TIMEDELTA_DAYS_EXACT = os.environ.get("TIMEDELTA_DAYS_EXACT", "false").lower() in ("true", "1", "yes")
|
|
|
|
# ---------------------------
|
|
# Twitch API Helper Functions
|
|
# ---------------------------
|
|
def get_access_token():
|
|
url = "https://id.twitch.tv/oauth2/token"
|
|
payload = {
|
|
"client_id": TWITCH_CLIENT_ID,
|
|
"client_secret": TWITCH_CLIENT_SECRET,
|
|
"grant_type": "client_credentials"
|
|
}
|
|
response = requests.post(url, data=payload)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["access_token"]
|
|
|
|
def get_channel_id(channel_name, token):
|
|
headers = {
|
|
"Client-ID": TWITCH_CLIENT_ID,
|
|
"Authorization": f"Bearer {token}"
|
|
}
|
|
url = f"https://api.twitch.tv/helix/users?login={channel_name}"
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if data.get("data"):
|
|
return data["data"][0]["id"]
|
|
else:
|
|
print(f"Channel {channel_name} not found on Twitch.")
|
|
return None
|
|
|
|
def get_vods(channel_id, token):
|
|
headers = {
|
|
"Client-ID": TWITCH_CLIENT_ID,
|
|
"Authorization": f"Bearer {token}"
|
|
}
|
|
prague_tz = ZoneInfo("Europe/Prague")
|
|
today_prague = datetime.now(prague_tz).date()
|
|
|
|
if TIMEDELTA_DAYS == 0:
|
|
start_date = today_prague
|
|
end_date = today_prague
|
|
else:
|
|
if TIMEDELTA_DAYS_EXACT:
|
|
start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
|
|
end_date = start_date
|
|
else:
|
|
start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
|
|
end_date = today_prague - timedelta(days=1)
|
|
|
|
start_time = datetime.combine(start_date, time.min).replace(tzinfo=prague_tz)
|
|
end_time = datetime.combine(end_date, time.max).replace(tzinfo=prague_tz)
|
|
|
|
url = f"https://api.twitch.tv/helix/videos?user_id={channel_id}&type=archive&first=100"
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
vods = []
|
|
for vod in response.json().get("data", []):
|
|
published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
|
|
published_at_prague = published_at.astimezone(prague_tz)
|
|
if start_time <= published_at_prague <= end_time:
|
|
vods.append(vod)
|
|
return vods
|
|
|
|
# ---------------------------
|
|
# Database Interaction Functions
|
|
# ---------------------------
|
|
def get_channels_from_db(db):
|
|
"""
|
|
Loads channels from the database.
|
|
Each channel row contains id, channel_name, twitch_id, and language.
|
|
"""
|
|
cursor = db.cursor()
|
|
query = "SELECT id, channel_name, twitch_name, youtube_name, twitch_id, youtube_id, language FROM channels where fetching_enabled = 1"
|
|
cursor.execute(query)
|
|
columns = [desc[0] for desc in cursor.description]
|
|
channels = [dict(zip(columns, row)) for row in cursor.fetchall()]
|
|
cursor.close()
|
|
return channels
|
|
|
|
def update_channel_twitch_id(db, channel_id, twitch_user_id):
|
|
cursor = db.cursor()
|
|
query = "UPDATE channels SET twitch_id = %s, updated_at = NOW() WHERE id = %s"
|
|
cursor.execute(query, (twitch_user_id, channel_id))
|
|
db.commit()
|
|
cursor.close()
|
|
|
|
|
|
def parse_duration(duration_str):
|
|
"""
|
|
Parses a duration string (e.g. "5h56m4s") and returns the total number of seconds.
|
|
"""
|
|
pattern = re.compile(r'(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?')
|
|
match = pattern.fullmatch(duration_str)
|
|
if not match:
|
|
return 0
|
|
hours = int(match.group("hours") or 0)
|
|
minutes = int(match.group("minutes") or 0)
|
|
seconds = int(match.group("seconds") or 0)
|
|
return hours * 3600 + minutes * 60 + seconds
|
|
|
|
def store_vod_in_db(db, local_channel_id, vod):
|
|
"""
|
|
Inserts a new VOD into the videos table if it doesn't already exist.
|
|
"""
|
|
cursor = db.cursor()
|
|
query = "SELECT id FROM videos WHERE external_id = %s"
|
|
cursor.execute(query, (vod["id"],))
|
|
if cursor.fetchone():
|
|
print(f"VOD {vod['id']} already exists in the database.")
|
|
cursor.close()
|
|
return False
|
|
external_date = None
|
|
if "published_at" in vod:
|
|
external_date = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
|
|
query = """
|
|
INSERT INTO videos
|
|
(channel_id, external_id, external_date, name, url, length)
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
"""
|
|
cursor.execute(query, (
|
|
local_channel_id,
|
|
vod["id"],
|
|
external_date,
|
|
vod["title"],
|
|
vod["url"],
|
|
parse_duration(vod["duration"])
|
|
))
|
|
db.commit()
|
|
print(f"Inserted VOD {vod['id']} into the database.")
|
|
cursor.close()
|
|
return True
|
|
|
|
# ---------------------------
|
|
# Main Functionality
|
|
# ---------------------------
|
|
def main():
|
|
print("Obtaining access token...")
|
|
token = get_access_token()
|
|
print("Access token obtained.")
|
|
|
|
try:
|
|
db = mariadb.connect(
|
|
host=os.environ.get("DB_HOST", "192.168.0.187"),
|
|
user=os.environ.get("DB_USER", "t0is"),
|
|
password=os.environ.get("DB_PASS", "Silenceisgolden555"),
|
|
database=os.environ.get("DB_NAME", "transcriptor"),
|
|
port=int(os.environ.get("DB_PORT", 3306))
|
|
)
|
|
except mariadb.Error as err:
|
|
print(f"Error connecting to MariaDB: {err}")
|
|
return
|
|
|
|
channels = get_channels_from_db(db)
|
|
if not channels:
|
|
print("No channels found in the database.")
|
|
db.close()
|
|
return
|
|
|
|
for channel in channels:
|
|
channel_name = channel["channel_name"]
|
|
print(f"\nProcessing Channel: {channel_name} (Language: {channel['language']})")
|
|
twitch_user_id = channel["twitch_id"]
|
|
if not twitch_user_id:
|
|
# If the Twitch ID is missing, retrieve it and update the DB.
|
|
twitch_user_id = get_channel_id(channel["twitch_name"], token)
|
|
if twitch_user_id:
|
|
update_channel_twitch_id(db, channel["id"], twitch_user_id)
|
|
else:
|
|
print(f"Skipping channel {channel_name} due to missing Twitch ID.")
|
|
continue
|
|
|
|
# Get VODs for the channel from Twitch.
|
|
vods = get_vods(twitch_user_id, token)
|
|
if not vods:
|
|
print("No VODs found.")
|
|
continue
|
|
|
|
for vod in vods:
|
|
try:
|
|
if not store_vod_in_db(db, channel["id"], vod):
|
|
# VOD already exists, skip it.
|
|
continue
|
|
except Exception as e:
|
|
print(f"Error storing VOD {vod.get('id')} for channel {channel['channel_name']}: {e}")
|
|
|
|
db.close()
|
|
|
|
if __name__ == "__main__":
|
|
main() |