285 lines
10 KiB
Python
285 lines
10 KiB
Python
import os
|
|
import subprocess
|
|
import requests
|
|
import whisper
|
|
from datetime import datetime, time, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
import json
|
|
|
|
# ---------------------------
|
|
# Configuration
|
|
# ---------------------------
|
|
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "")
|
|
TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "")
|
|
CHANNEL_NAME = os.environ.get("CHANNEL_NAME", "madmonq")
|
|
CHANNEL_LANGUAGE = os.environ.get("CHANNEL_LANGUAGE", "en")
|
|
SEARCH_KEYWORDS = ["madmonq", "madmonge", "madmong", "medmong", "medmonk", "madmonk"]
|
|
MODEL_NAME = "turbo" # Whisper model
|
|
|
|
# Define base directories for each file category under a folder named after the channel.
|
|
base_dirs = {
|
|
"vods": os.path.join("vods", CHANNEL_NAME),
|
|
"audio": os.path.join("audio", CHANNEL_NAME),
|
|
"transcripts": os.path.join("transcripts", CHANNEL_NAME),
|
|
"chat": os.path.join("chat", CHANNEL_NAME),
|
|
"clips": os.path.join("clips", CHANNEL_NAME)
|
|
}
|
|
|
|
# Create directories if they do not exist.
|
|
for path in base_dirs.values():
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
# ---------------------------
|
|
# Twitch API Helper Functions
|
|
# ---------------------------
|
|
def get_access_token():
|
|
url = "https://id.twitch.tv/oauth2/token"
|
|
payload = {
|
|
"client_id": TWITCH_CLIENT_ID,
|
|
"client_secret": TWITCH_CLIENT_SECRET,
|
|
"grant_type": "client_credentials"
|
|
}
|
|
response = requests.post(url, data=payload)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["access_token"]
|
|
|
|
def get_channel_id(channel_name, token):
|
|
headers = {
|
|
"Client-ID": TWITCH_CLIENT_ID,
|
|
"Authorization": f"Bearer {token}"
|
|
}
|
|
url = f"https://api.twitch.tv/helix/users?login={channel_name}"
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if data.get("data"):
|
|
return data["data"][0]["id"]
|
|
else:
|
|
print("Channel not found.")
|
|
return None
|
|
|
|
def get_vods_from_yesterday(channel_id, token):
|
|
headers = {
|
|
"Client-ID": TWITCH_CLIENT_ID,
|
|
"Authorization": f"Bearer {token}"
|
|
}
|
|
prague_tz = ZoneInfo("Europe/Prague")
|
|
today_prague = datetime.now(prague_tz).date()
|
|
yesterday = today_prague - timedelta(days=1) # Change days as needed
|
|
start_time = datetime.combine(yesterday, time.min).replace(tzinfo=prague_tz)
|
|
end_time = datetime.combine(yesterday, time.max).replace(tzinfo=prague_tz)
|
|
|
|
url = f"https://api.twitch.tv/helix/videos?user_id={channel_id}&type=archive&first=100"
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
vods = []
|
|
for vod in response.json().get("data", []):
|
|
published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
|
|
published_at_prague = published_at.astimezone(prague_tz)
|
|
if start_time <= published_at_prague <= end_time:
|
|
vods.append(vod)
|
|
return vods
|
|
|
|
# ---------------------------
|
|
# VOD Processing Functions
|
|
# ---------------------------
|
|
def download_vod(vod_url, output_filename):
|
|
if os.path.exists(output_filename):
|
|
print(f"{output_filename} already exists. Skipping download.")
|
|
return
|
|
command = ["yt-dlp", "-o", output_filename, vod_url]
|
|
subprocess.run(command, check=True)
|
|
print(f"Downloaded VOD to {output_filename}")
|
|
|
|
def extract_audio(video_file, audio_file):
|
|
if os.path.exists(audio_file):
|
|
print(f"{audio_file} already exists. Skipping audio extraction.")
|
|
return
|
|
command = ["ffmpeg", "-i", video_file, "-vn", "-acodec", "mp3", audio_file, "-y"]
|
|
subprocess.run(command, check=True)
|
|
print(f"Extracted audio to {audio_file}")
|
|
|
|
def transcribe_audio(audio_file, model_name):
|
|
model = whisper.load_model(model_name, download_root="/app/models")
|
|
result = model.transcribe(audio_file, language=CHANNEL_LANGUAGE)
|
|
return result
|
|
|
|
def search_transcription(result, keywords):
|
|
matches = []
|
|
if "segments" in result:
|
|
for segment in result["segments"]:
|
|
segment_text = segment["text"].lower()
|
|
for keyword in keywords:
|
|
if keyword.lower() in segment_text:
|
|
matches.append(segment)
|
|
break # Stop checking further keywords for this segment
|
|
return matches
|
|
|
|
|
|
def scrape_chat_log(vod_id, output_filename):
|
|
"""
|
|
Uses TwitchDownloaderCLI to download the chat log for a given VOD.
|
|
The chat log is saved in JSON format to output_filename.
|
|
"""
|
|
if os.path.exists(output_filename):
|
|
print(f"{output_filename} already exists. Skipping chat log scrape.")
|
|
return
|
|
|
|
# Build the TwitchDownloaderCLI command.
|
|
# The command downloads the chat log in JSON format for the specified VOD.
|
|
command = [
|
|
"TwitchDownloaderCLI", "chatdownload",
|
|
"--id", vod_id,
|
|
"--output", output_filename
|
|
]
|
|
|
|
try:
|
|
subprocess.run(command, check=True)
|
|
print(f"Chat log saved to {output_filename}")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error downloading chat log for VOD {vod_id}: {e}")
|
|
|
|
def create_clip_from_vod(video_file, match_start, vod_id):
|
|
clip_start = max(match_start - 15, 0)
|
|
clip_duration = 60 # seconds
|
|
clip_dir = base_dirs["clips"]
|
|
os.makedirs(clip_dir, exist_ok=True)
|
|
clip_filename = os.path.join(clip_dir, f"clip_{vod_id}_{int(match_start)}.mp4")
|
|
command = [
|
|
"ffmpeg",
|
|
"-ss", str(clip_start),
|
|
"-i", video_file,
|
|
"-t", str(clip_duration),
|
|
"-c", "copy",
|
|
clip_filename,
|
|
"-y"
|
|
]
|
|
subprocess.run(command, check=True)
|
|
print(f"Clip created: {clip_filename}")
|
|
return clip_filename
|
|
|
|
def find_comments_by_keywords(chat_log, keywords):
|
|
"""
|
|
Searches the chat log for any comments containing one of the given keywords.
|
|
The chat log can be either:
|
|
- a raw list of comment objects, or
|
|
- an object with a "comments" key containing the list.
|
|
Each comment is expected to have:
|
|
- a "message" key with the comment text (as a string)
|
|
- an "offset" key (or fallback to "content_offset_seconds") for the timestamp.
|
|
Returns a list of matching comment objects.
|
|
"""
|
|
matching_comments = []
|
|
# If the chat log is wrapped in an object, extract the list.
|
|
if isinstance(chat_log, dict) and "comments" in chat_log:
|
|
chat_log = chat_log["comments"]
|
|
|
|
for comment in chat_log:
|
|
if not isinstance(comment, dict):
|
|
continue
|
|
# Get the message text; TwitchDownloaderCLI outputs it as a string in "message"
|
|
message_text = comment['message']['body'].lower()
|
|
for keyword in keywords:
|
|
if keyword.lower() in message_text:
|
|
matching_comments.append(comment)
|
|
break # No need to check further keywords for this comment.
|
|
return matching_comments
|
|
|
|
def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod_id):
|
|
clip_start = max(comment_timestamp - 15, 0)
|
|
clip_duration = 60 # seconds
|
|
clip_dir = base_dirs["clips"]
|
|
os.makedirs(clip_dir, exist_ok=True)
|
|
clip_filename = os.path.join(clip_dir, f"clip_{vod_id}_{int(comment_timestamp)}.mp4")
|
|
command = [
|
|
"ffmpeg",
|
|
"-ss", str(clip_start),
|
|
"-i", video_file,
|
|
"-t", str(clip_duration),
|
|
"-c", "copy",
|
|
clip_filename,
|
|
"-y"
|
|
]
|
|
subprocess.run(command, check=True)
|
|
print(f"Clip created: {clip_filename}")
|
|
return clip_filename
|
|
|
|
# ---------------------------
|
|
# Main Processing Pipeline
|
|
# ---------------------------
|
|
def main():
|
|
print("Obtaining access token...")
|
|
token = get_access_token()
|
|
print("Access token obtained.")
|
|
|
|
channel_id = get_channel_id(CHANNEL_NAME, token)
|
|
if not channel_id:
|
|
return
|
|
|
|
vods = get_vods_from_yesterday(channel_id, token)
|
|
if not vods:
|
|
print("No VODs from yesterday found.")
|
|
return
|
|
|
|
for vod in vods:
|
|
vod_url = vod["url"]
|
|
vod_id = vod["id"]
|
|
|
|
# Define file paths in the respective directories
|
|
video_filename = os.path.join(base_dirs["vods"], f"vod_{vod_id}.mp4")
|
|
audio_filename = os.path.join(base_dirs["audio"], f"vod_{vod_id}.mp3")
|
|
transcript_filename = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
|
|
chat_log_filename = os.path.join(base_dirs["chat"], f"chat_{vod_id}.json")
|
|
|
|
print(f"\nProcessing VOD: {vod_url}")
|
|
download_vod(vod_url, video_filename)
|
|
extract_audio(video_filename, audio_filename)
|
|
|
|
# Check if transcript already exists; if yes, load it, otherwise transcribe and save.
|
|
if os.path.exists(transcript_filename):
|
|
print(f"{transcript_filename} already exists. Skipping transcription.")
|
|
with open(transcript_filename, "r", encoding="utf-8") as f:
|
|
result = json.load(f)
|
|
else:
|
|
print("Transcribing audio. This may take some time...")
|
|
result = transcribe_audio(audio_filename, MODEL_NAME)
|
|
with open(transcript_filename, "w", encoding="utf-8") as f:
|
|
json.dump(result, f, ensure_ascii=False, indent=4)
|
|
print(f"Transcript saved to {transcript_filename}")
|
|
|
|
scrape_chat_log(vod_id, chat_log_filename)
|
|
|
|
# Search transcript for keywords
|
|
matches = search_transcription(result, SEARCH_KEYWORDS)
|
|
if matches:
|
|
print(f"Found {len(matches)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod_id}:")
|
|
for match in matches:
|
|
start = match["start"]
|
|
text = match["text"]
|
|
print(f" - At {start:.2f}s: {text}")
|
|
create_clip_from_vod(video_filename, start, vod_id)
|
|
else:
|
|
print(f"No mentions of {SEARCH_KEYWORDS} found in VOD {vod_id}.")
|
|
|
|
# Load chat log from file
|
|
try:
|
|
with open(chat_log_filename, "r", encoding="utf-8") as f:
|
|
chat_log = json.load(f)
|
|
except Exception as e:
|
|
print(f"Error loading chat log: {e}")
|
|
chat_log = []
|
|
|
|
# Search chat log using an array of keywords (using the same keywords as for transcript)
|
|
comment_matches = find_comments_by_keywords(chat_log, SEARCH_KEYWORDS)
|
|
if comment_matches:
|
|
for comment in comment_matches:
|
|
# Try to get the timestamp from the "offset" field (or fallback to "content_offset_seconds")
|
|
timestamp = comment["content_offset_seconds"]
|
|
print(f"Found a matching comment at {timestamp} seconds.")
|
|
create_clip_from_comment_timestamp(video_filename, timestamp, vod_id)
|
|
else:
|
|
print("No matching comments found.")
|
|
|
|
if __name__ == "__main__":
|
|
main() |