script edits

This commit is contained in:
t0is 2025-02-20 22:09:50 +01:00
parent 828bb60302
commit fb23986fdb

204
main.py
View File

@ -4,29 +4,35 @@ import requests
import whisper import whisper
from datetime import datetime, time, timedelta from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
import json import json
# --------------------------- # ---------------------------
# Configuration # Configuration
# --------------------------- # ---------------------------
# Make sure these environment variables are set: TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "")
# TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "")
TWITCH_CLIENT_ID='a0fuj6tm5ct79clvim9816orphqkov'
TWITCH_CLIENT_SECRET='h7whj3yspxgj1909sgcafx6iz1p1es'
# CHANNEL_NAME = "kuruhs" # e.g. "examplechannel"
CHANNEL_NAME = os.environ.get("CHANNEL_NAME", "madmonq") CHANNEL_NAME = os.environ.get("CHANNEL_NAME", "madmonq")
CHANNEL_LANGUAGE = os.environ.get("CHANNEL_LANGUAGE", "en") CHANNEL_LANGUAGE = os.environ.get("CHANNEL_LANGUAGE", "en")
SEARCH_KEYWORDS = ["madmonq", 'madmonge', 'madmong', 'medmong', 'medmonk', 'madmonk'] # keyword to search in the transcript SEARCH_KEYWORDS = ["madmonq", "madmonge", "madmong", "medmong", "medmonk", "madmonk"]
MODEL_NAME = "turbo" # Whisper model (e.g., "tiny", "base", "small", etc.) MODEL_NAME = "turbo" # Whisper model
# Define base directories for each file category under a folder named after the channel.
# Per-category output folders, each namespaced under the channel's name
# (e.g. "vods/<channel>", "clips/<channel>").
base_dirs = {
    category: os.path.join(category, CHANNEL_NAME)
    for category in ("vods", "audio", "transcripts", "chat", "clips")
}

# Make sure every output folder exists before any processing starts.
for directory in base_dirs.values():
    os.makedirs(directory, exist_ok=True)
# --------------------------- # ---------------------------
# Twitch API Helper Functions # Twitch API Helper Functions
# --------------------------- # ---------------------------
def get_access_token(): def get_access_token():
"""
Uses the client credentials flow to obtain an OAuth token.
"""
url = "https://id.twitch.tv/oauth2/token" url = "https://id.twitch.tv/oauth2/token"
payload = { payload = {
"client_id": TWITCH_CLIENT_ID, "client_id": TWITCH_CLIENT_ID,
@ -53,79 +59,67 @@ def get_channel_id(channel_name, token):
print("Channel not found.") print("Channel not found.")
return None return None
def get_vods_from_yesterday(channel_id, token, days_back=0):
    """Return archived VODs for a channel published on a given Prague day.

    NOTE: despite the name, the previous hard-coded ``timedelta(days=0)``
    selected *today* in Prague, not yesterday. That constant is now the
    ``days_back`` parameter (default 0 preserves existing behavior); pass
    ``days_back=1`` to actually get yesterday's VODs.

    Args:
        channel_id: Twitch user id of the channel.
        token: OAuth bearer token for the Helix API.
        days_back: How many days before today (Europe/Prague) to select.

    Returns:
        List of VOD dicts (Helix "videos" payload) whose ``published_at``
        falls within the selected Prague calendar day.

    Raises:
        requests.HTTPError: If the Helix request fails.
    """
    headers = {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}"
    }
    prague_tz = ZoneInfo("Europe/Prague")
    # Compute the target calendar day in Prague local time.
    today_prague = datetime.now(prague_tz).date()
    target_day = today_prague - timedelta(days=days_back)
    # Full-day window [00:00:00, 23:59:59.999999] as aware datetimes.
    start_time = datetime.combine(target_day, time.min).replace(tzinfo=prague_tz)
    end_time = datetime.combine(target_day, time.max).replace(tzinfo=prague_tz)
    # Fetch up to 100 archived VODs (single page; no pagination here).
    url = f"https://api.twitch.tv/helix/videos?user_id={channel_id}&type=archive&first=100"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    vods = []
    for vod in response.json().get("data", []):
        # Twitch timestamps are UTC with a trailing "Z"; convert to aware UTC,
        # then to Prague time for the day-window comparison.
        published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
        published_at_prague = published_at.astimezone(prague_tz)
        if start_time <= published_at_prague <= end_time:
            vods.append(vod)
    return vods
# --------------------------- # ---------------------------
# VOD Processing Functions # VOD Processing Functions
# --------------------------- # ---------------------------
def download_vod(vod_url, output_filename):
    """Download a Twitch VOD to ``output_filename`` using yt-dlp.

    Skips the download entirely when the target file already exists, so the
    pipeline can be re-run without re-fetching large videos.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    # Argument-list form (shell=False) avoids shell-injection via the URL.
    subprocess.run(["yt-dlp", "-o", output_filename, vod_url], check=True)
    print(f"Downloaded VOD to {output_filename}")
def extract_audio(video_file, audio_file):
    """Extract the audio track of ``video_file`` into ``audio_file`` (MP3).

    No-op when the audio file already exists, allowing cheap re-runs.
    """
    if os.path.exists(audio_file):
        print(f"{audio_file} already exists. Skipping audio extraction.")
        return
    # -vn drops the video stream; -y overwrites any stale partial output.
    subprocess.run(
        ["ffmpeg", "-i", video_file, "-vn", "-acodec", "mp3", audio_file, "-y"],
        check=True,
    )
    print(f"Extracted audio to {audio_file}")
def transcribe_audio(audio_file, model_name):
    """Transcribe ``audio_file`` with a Whisper model.

    Fix: removed the pointless ``global CHANNEL_LANGUAGE`` statement — the
    name is only *read* here, never assigned, so the ``global`` declaration
    had no effect.

    Args:
        audio_file: Path to the audio file to transcribe.
        model_name: Whisper model name (e.g. "tiny", "base", "turbo").

    Returns:
        The Whisper transcription result dict (includes "segments").
    """
    # Models are cached under /app/models (container layout — TODO confirm
    # this path exists outside the Docker image).
    model = whisper.load_model(model_name, download_root="/app/models")
    return model.transcribe(audio_file, language=CHANNEL_LANGUAGE)
def search_transcription(result, keywords):
    """Return transcript segments whose text contains any keyword.

    Matching is case-insensitive; each segment appears at most once in the
    result even if several keywords match it. ``result`` is a Whisper
    transcription dict; segments are returned unchanged.
    """
    if "segments" not in result:
        return []
    lowered = [kw.lower() for kw in keywords]
    return [
        segment
        for segment in result["segments"]
        if any(kw in segment["text"].lower() for kw in lowered)
    ]
def scrape_chat_log(vod_id, output_filename):
    """Download the full chat log for a VOD and save it as JSON.

    Pages through the comments endpoint via its ``_next`` cursor until
    exhausted, then writes the accumulated comments to ``output_filename``.
    Skips work entirely if the output file already exists.

    NOTE(review): this targets Twitch's legacy v5 comments endpoint, which
    Twitch has retired — confirm it still responds before relying on it.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping chat log scrape.")
        return
    headers = {
        "Client-ID": TWITCH_CLIENT_ID,
        "Accept": "application/vnd.twitchtv.v5+json",
    }
    url = f"https://api.twitch.tv/v5/videos/{vod_id}/comments"
    comments = []
    cursor = None
    while True:
        params = {"cursor": cursor} if cursor else {}
        response = requests.get(url, headers=headers, params=params)
        if response.status_code != 200:
            # Best-effort: keep whatever pages were fetched so far.
            print(f"Error fetching chat comments for VOD {vod_id}: {response.text}")
            break
        payload = response.json()
        comments.extend(payload.get("comments", []))
        cursor = payload.get("_next")
        if not cursor:
            break
    with open(output_filename, "w", encoding="utf-8") as f:
        json.dump(comments, f, ensure_ascii=False, indent=4)
    print(f"Chat log saved to {output_filename}")
def create_clip_from_vod(video_file, match_start, vod_id):
    """Cut a 60-second clip from ``video_file`` around ``match_start`` (seconds).

    The clip begins 15 seconds before ``match_start`` (clamped to 0) to give
    viewing context, and is written under the channel's clips directory.

    Returns:
        The path of the clip file that was written.
    """
    start_at = max(match_start - 15, 0)
    duration = 60  # seconds
    out_dir = base_dirs["clips"]
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, f"clip_{vod_id}_{int(match_start)}.mp4")
    # Stream copy (-c copy) avoids a slow re-encode; -y overwrites leftovers.
    subprocess.run(
        [
            "ffmpeg",
            "-ss", str(start_at),
            "-i", video_file,
            "-t", str(duration),
            "-c", "copy",
            out_path,
            "-y",
        ],
        check=True,
    )
    print(f"Clip created: {out_path}")
    return out_path
def find_comments_by_keywords(chat_log, keywords):
    """Return chat comments whose message body contains any keyword.

    Matching is case-insensitive. Entries that are not dicts, or whose
    "message" field is not a dict, are skipped silently — the chat log is
    externally-sourced JSON and may be malformed.
    """
    lowered_keywords = [kw.lower() for kw in keywords]
    hits = []
    for entry in chat_log:
        if not isinstance(entry, dict):
            continue
        msg = entry.get("message", {})
        if not isinstance(msg, dict):
            continue
        body = msg.get("body", "").lower()
        if any(kw in body for kw in lowered_keywords):
            hits.append(entry)
    return hits
def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod_id): def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod_id):
"""
Extract a 1-minute clip from the VOD starting 15 seconds before the comment timestamp.
"""
# Start the clip 15 seconds before the comment timestamp (if possible)
clip_start = max(comment_timestamp - 15, 0) clip_start = max(comment_timestamp - 15, 0)
clip_duration = 60 # seconds clip_duration = 60 # seconds
clip_filename = f"clip_{vod_id}_{int(comment_timestamp)}.mp4" clip_dir = base_dirs["clips"]
os.makedirs(clip_dir, exist_ok=True)
clip_filename = os.path.join(clip_dir, f"clip_{vod_id}_{int(comment_timestamp)}.mp4")
command = [ command = [
"ffmpeg", "ffmpeg",
"-ss", str(clip_start), # Start time for the clip "-ss", str(clip_start),
"-i", video_file, # Input video file "-i", video_file,
"-t", str(clip_duration), # Duration of the clip "-t", str(clip_duration),
"-c", "copy", # Copy streams without re-encoding "-c", "copy",
clip_filename, clip_filename,
"-y" # Overwrite if exists "-y"
] ]
subprocess.run(command, check=True) subprocess.run(command, check=True)
print(f"Clip created: {clip_filename}") print(f"Clip created: {clip_filename}")
@ -226,17 +202,14 @@ def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod_id):
# Main Processing Pipeline # Main Processing Pipeline
# --------------------------- # ---------------------------
def main(): def main():
# Step 0: Get Twitch access token using client credentials
print("Obtaining access token...") print("Obtaining access token...")
token = get_access_token() token = get_access_token()
print("Access token obtained.") print("Access token obtained.")
# Step 1: Get channel ID
channel_id = get_channel_id(CHANNEL_NAME, token) channel_id = get_channel_id(CHANNEL_NAME, token)
if not channel_id: if not channel_id:
return return
# Step 2: Get yesterday's VODs
vods = get_vods_from_yesterday(channel_id, token) vods = get_vods_from_yesterday(channel_id, token)
if not vods: if not vods:
print("No VODs from yesterday found.") print("No VODs from yesterday found.")
@ -245,60 +218,55 @@ def main():
for vod in vods: for vod in vods:
vod_url = vod["url"] vod_url = vod["url"]
vod_id = vod["id"] vod_id = vod["id"]
video_filename = f"vod_{vod_id}.mp4"
# video_filename = "vod_2382031096.mp4" # Define file paths in the respective directories
audio_filename = f"vod_{vod_id}.mp3" video_filename = os.path.join(base_dirs["vods"], f"vod_{vod_id}.mp4")
# audio_filename = "vod_2382031096.mp3" audio_filename = os.path.join(base_dirs["audio"], f"vod_{vod_id}.mp3")
transcript_filename = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
chat_log_filename = os.path.join(base_dirs["chat"], f"chat_{vod_id}.json")
print(f"\nProcessing VOD: {vod_url}") print(f"\nProcessing VOD: {vod_url}")
# Download the VOD
download_vod(vod_url, video_filename) download_vod(vod_url, video_filename)
# Extract the audio track
extract_audio(video_filename, audio_filename) extract_audio(video_filename, audio_filename)
# Transcribe using Whisper (this may take a while for long audio files)
# print("Transcribing audio. This may take some time...")
# result = transcribe_audio(audio_filename, MODEL_NAME)
# # Search for the keyword in the transcription
# matches = search_transcription(result, SEARCH_KEYWORDS)
# Check if transcript already exists; if yes, load it, otherwise transcribe and save.
if os.path.exists(transcript_filename):
print(f"{transcript_filename} already exists. Skipping transcription.")
with open(transcript_filename, "r", encoding="utf-8") as f:
result = json.load(f)
else:
print("Transcribing audio. This may take some time...")
result = transcribe_audio(audio_filename, MODEL_NAME)
with open(transcript_filename, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=4)
print(f"Transcript saved to {transcript_filename}")
print("Transcribing audio. This may take some time...")
result = transcribe_audio(audio_filename, MODEL_NAME)
chat_log_filename = f"chat_{vod_id}.json"
print("Scraping chat log...")
scrape_chat_log(vod_id, chat_log_filename) scrape_chat_log(vod_id, chat_log_filename)
transcripts_dir = os.path.join("transcripts", CHANNEL_NAME) # Search transcript for keywords
os.makedirs(transcripts_dir, exist_ok=True)
transcript_filename = os.path.join(transcripts_dir, f"transcript_{vod_id}.json")
with open(transcript_filename, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=4)
print(f"Transcript saved to {transcript_filename}")
# Search for the keyword in the transcription
matches = search_transcription(result, SEARCH_KEYWORDS) matches = search_transcription(result, SEARCH_KEYWORDS)
if matches: if matches:
print(f"Found {len(matches)} mention(s) of '{SEARCH_KEYWORDS}' in VOD {vod_id}:") print(f"Found {len(matches)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod_id}:")
for match in matches: for match in matches:
start = match["start"] start = match["start"]
end = match["end"]
text = match["text"] text = match["text"]
print(f" - At {start:.2f}s to {end:.2f}s: {text}") print(f" - At {start:.2f}s: {text}")
create_clip_from_vod(video_filename, start, vod_id) create_clip_from_vod(video_filename, start, vod_id)
else: else:
print(f"No mentions of '{SEARCH_KEYWORDS}' found in VOD {vod_id}.") print(f"No mentions of {SEARCH_KEYWORDS} found in VOD {vod_id}.")
# keyword = "your_keyword_here" # Load chat log from file
matches = find_comments_by_keyword(chat_log_filename, "Madmonq") try:
with open(chat_log_filename, "r", encoding="utf-8") as f:
chat_log = json.load(f)
except Exception as e:
print(f"Error loading chat log: {e}")
chat_log = []
if matches: # Search chat log using an array of keywords (using the same keywords as for transcript)
for comment in matches: comment_matches = find_comments_by_keywords(chat_log, SEARCH_KEYWORDS)
# Use the content_offset_seconds from the comment as the timestamp. if comment_matches:
for comment in comment_matches:
timestamp = comment.get("content_offset_seconds") timestamp = comment.get("content_offset_seconds")
print(f"Found a matching comment at {timestamp} seconds.") print(f"Found a matching comment at {timestamp} seconds.")
create_clip_from_comment_timestamp(video_filename, timestamp, vod_id) create_clip_from_comment_timestamp(video_filename, timestamp, vod_id)