519 lines
18 KiB
Python
519 lines
18 KiB
Python
import os
|
|
import subprocess
|
|
import requests
|
|
import whisper
|
|
from faster_whisper import WhisperModel
|
|
from datetime import datetime, time, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
import json
|
|
import mariadb
|
|
|
|
# ---------------------------
# Configuration
# ---------------------------
# Language filter for channel selection; "other" selects channels whose
# language is neither 'cs' nor 'en' (see get_pending_videos).
CHANNELS_LANGUAGE = os.environ.get("CHANNELS_LANGUAGE", "")

# Twitch application credentials. TWITCH_CLIENT_ID was previously missing,
# which made get_access_token()/get_channel_id()/get_vods() fail with a
# NameError the first time they were called.
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "")
TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "")

# VOD search window: how many days back to look, and whether to look at
# exactly that day only (see get_vods for the interpretation).
TIMEDELTA_DAYS = int(os.environ.get("TIMEDELTA_DAYS", "1"))
TIMEDELTA_DAYS_EXACT = os.environ.get("TIMEDELTA_DAYS_EXACT", "false").lower() in ("true", "1", "yes")
CLIP_CREATE_FROM_CHAT = os.environ.get("CLIP_CREATE_FROM_CHAT", "false").lower() in ("true", "1", "yes")

# Keyword variants (common misspellings / mishearings of the brand name)
# searched for in transcripts and chat logs.
SEARCH_KEYWORDS = [
    "madmonq",
    "madmonge",
    "madmong",
    "medmong",
    "medmonk",
    "madmonk",
    "mad monk",
    "mad monq",
    "mad-monq",
    "mad-monk",
    "madmonck",
    "madmunk",
    "madmon",
    "madmonke",
    "madmonque",
    "matmonk",
    "matt monk",
    "mat monk",
    "meth monk"
]

MODEL_NAME = "turbo"  # Whisper model

# Channel list arrives as a JSON array in the environment.
channels_str = os.environ.get("CHANNELS_JSON", "[]")
try:
    channels = json.loads(channels_str)
except json.JSONDecodeError as exc:
    # Chain the original parse error so the traceback shows what was wrong.
    raise ValueError("Invalid JSON in CHANNELS_JSON environment variable") from exc
|
|
|
|
# ---------------------------
|
|
# Twitch API Helper Functions
|
|
# ---------------------------
|
|
def get_access_token():
    """Fetch an app access token from Twitch (client-credentials flow).

    Raises requests.HTTPError if the token endpoint rejects the request.
    """
    response = requests.post(
        "https://id.twitch.tv/oauth2/token",
        data={
            "client_id": TWITCH_CLIENT_ID,
            "client_secret": TWITCH_CLIENT_SECRET,
            "grant_type": "client_credentials",
        },
    )
    response.raise_for_status()
    return response.json()["access_token"]
|
|
|
|
def get_channel_id(channel_name, token):
    """Resolve a Twitch login name to its numeric user id.

    Returns the id as a string, or None (after printing a notice) when the
    channel does not exist.
    """
    response = requests.get(
        f"https://api.twitch.tv/helix/users?login={channel_name}",
        headers={
            "Client-ID": TWITCH_CLIENT_ID,
            "Authorization": f"Bearer {token}",
        },
    )
    response.raise_for_status()
    users = response.json().get("data")
    if not users:
        print("Channel not found.")
        return None
    return users[0]["id"]
|
|
|
|
def get_vods(channel_id, token):
    """Return the channel's archived VODs published inside the search window.

    The window is derived from TIMEDELTA_DAYS / TIMEDELTA_DAYS_EXACT and is
    expressed in Europe/Prague wall-clock days. Publish timestamps from the
    Twitch API (UTC) are converted to Prague time before filtering.

    NOTE(review): only the first 100 archives are fetched (no pagination) —
    very active channels could have matching VODs beyond that page.
    """
    headers = {
        "Client-ID": TWITCH_CLIENT_ID,
        "Authorization": f"Bearer {token}"
    }
    prague_tz = ZoneInfo("Europe/Prague")
    today_prague = datetime.now(prague_tz).date()

    # Define the search range based on TIMEDELTA_DAYS and TIMEDELTA_DAYS_EXACT
    if TIMEDELTA_DAYS == 0:
        # Only search for today
        start_date = today_prague
        end_date = today_prague
    else:
        if TIMEDELTA_DAYS_EXACT:
            # Only search for the day exactly TIMEDELTA_DAYS ago
            start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
            end_date = start_date
        else:
            # Search from TIMEDELTA_DAYS ago up to yesterday
            start_date = today_prague - timedelta(days=TIMEDELTA_DAYS)
            end_date = today_prague - timedelta(days=1)

    # Whole-day bounds: midnight to 23:59:59.999999 local time.
    # .replace(tzinfo=...) attaches the zone without converting the
    # wall-clock value, which is the intent here.
    start_time = datetime.combine(start_date, time.min).replace(tzinfo=prague_tz)
    end_time = datetime.combine(end_date, time.max).replace(tzinfo=prague_tz)

    url = f"https://api.twitch.tv/helix/videos?user_id={channel_id}&type=archive&first=100"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    vods = []
    for vod in response.json().get("data", []):
        # Twitch returns RFC3339 UTC timestamps with a trailing "Z", which
        # datetime.fromisoformat (pre-3.11) cannot parse directly.
        published_at = datetime.fromisoformat(vod["published_at"].replace("Z", "+00:00"))
        published_at_prague = published_at.astimezone(prague_tz)
        if start_time <= published_at_prague <= end_time:
            vods.append(vod)
    return vods
|
|
|
|
# ---------------------------
|
|
# VOD Processing Functions
|
|
# ---------------------------
|
|
def download_vod(vod_url, output_filename):
    """Download a full VOD with yt-dlp at lowest quality, skipping if the
    output file already exists."""
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    ytdlp_args = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "-f", "worst",
        "-o", output_filename,
        vod_url,
    ]
    subprocess.run(ytdlp_args, check=True)
    print(f"Downloaded VOD to {output_filename}")
|
|
|
|
def extract_audio(video_file, audio_file):
    """Extract the audio track of video_file to mp3 via ffmpeg, skipping if
    the target already exists."""
    if os.path.exists(audio_file):
        print(f"{audio_file} already exists. Skipping audio extraction.")
        return
    ffmpeg_args = ["ffmpeg", "-i", video_file, "-vn", "-acodec", "mp3", audio_file, "-y"]
    subprocess.run(ffmpeg_args, check=True)
    print(f"Extracted audio to {audio_file}")
|
|
|
|
def transcribe_audio(audio_file, model_name):
    """Transcribe audio_file with an openai-whisper model.

    Parameters:
        audio_file (str): Path to the audio to transcribe.
        model_name (str): Whisper model name (e.g. MODEL_NAME).

    Returns:
        dict: The whisper result (includes "segments" consumed by
        search_transcription).

    Fix: the original referenced CHANNEL_LANGUAGE, which is never defined
    anywhere in the module — the configuration constant is
    CHANNELS_LANGUAGE — so every call raised NameError.
    """
    model = whisper.load_model(model_name, download_root="/app/models")
    # An empty CHANNELS_LANGUAGE means "not configured"; pass None so
    # whisper auto-detects instead of receiving an invalid "" language code.
    return model.transcribe(audio_file, language=CHANNELS_LANGUAGE or None)
|
|
|
|
def transcribe_audio_fast(audio_file, language, vod_id):
    """Transcribe audio_file with faster-whisper, caching the result as JSON.

    The transcript is stored as transcript_<vod_id>.json under the
    module-global base_dirs["transcripts"] (base_dirs is set in main()),
    and reused on subsequent runs.

    Parameters:
        audio_file (str): Path to the audio file.
        language (str | None): Language hint; None lets the model detect it.
        vod_id: External VOD id used in the transcript filename.

    Returns:
        list[dict]: Segments as {"start": float, "end": float, "text": str}.

    NOTE(review): device="cuda" is hard-coded — requires a GPU-enabled
    CTranslate2 build; confirm the deployment environment.
    """
    transcript_path = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
    # Reuse a previous run's transcript to skip the expensive model pass.
    if os.path.exists(transcript_path):
        print(f"faster_whisper -- Loading existing transcription for VOD {vod_id} from {transcript_path}")
        with open(transcript_path, "r", encoding="utf-8") as f:
            segments_data = json.load(f)
        return segments_data

    # Initialize the model and transcribe (passing language if provided)
    model_fast = WhisperModel("large-v3-turbo", device="cuda", download_root="/app/models")
    segments, info = model_fast.transcribe(audio_file, language=language)
    print("faster_whisper -- Detected language '%s' with probability %f" % (info.language, info.language_probability))

    # Build a list of dictionaries for the segments.
    # (Iterating `segments` here is what actually runs the transcription —
    # faster-whisper yields segments lazily.)
    segments_data = []
    for seg in segments:
        segments_data.append({
            "start": seg.start,
            "end": seg.end,
            "text": seg.text
        })

    # Persist the transcript so re-runs hit the cache branch above.
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump(segments_data, f, ensure_ascii=False, indent=4)
    print(f"faster_whisper -- Saved transcription to {transcript_path}")

    return segments_data
|
|
|
|
def search_transcription(result, keywords):
    """Return transcript segments whose text contains any of *keywords*.

    Matching is case-insensitive substring search; each segment appears at
    most once in the result even if several keywords match it.
    """
    lowered_keywords = [kw.lower() for kw in keywords]
    hits = []
    if "segments" in result:
        for segment in result["segments"]:
            segment_text = segment["text"].lower()
            if any(kw in segment_text for kw in lowered_keywords):
                hits.append(segment)
    return hits
|
|
|
|
def scrape_chat_log(vod_id, output_filename):
    """Download the chat log of a VOD as JSON using TwitchDownloaderCLI.

    Skips the download when output_filename already exists. CLI failures
    are reported to stdout rather than raised.
    """
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping chat log scrape.")
        return

    # TwitchDownloaderCLI writes JSON when the output extension is .json.
    cli_args = [
        "TwitchDownloaderCLI", "chatdownload",
        "--id", vod_id,
        "--output", output_filename,
    ]
    try:
        subprocess.run(cli_args, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error downloading chat log for VOD {vod_id}: {e}")
    else:
        print(f"Chat log saved to {output_filename}")
|
|
|
|
def create_clip_from_vod(video_file, match_start, vod):
    """Cut a 60-second clip from a local VOD file, starting 15 seconds
    before the transcript match, and return the clip path.

    Clips land in base_dirs["clips_transcript"]/<dd-mm-yy>/ grouped by the
    VOD's creation date.
    """
    start_offset = max(match_start - 15, 0)
    duration_seconds = 60  # seconds

    created = datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    target_dir = os.path.join(base_dirs["clips_transcript"], created.strftime('%d-%m-%y'))
    os.makedirs(target_dir, exist_ok=True)

    clip_filename = os.path.join(target_dir, f"clip_{vod['id']}_{int(match_start)}.mp4")
    ffmpeg_args = [
        "ffmpeg",
        "-ss", str(start_offset),
        "-i", video_file,
        "-t", str(duration_seconds),
        "-c", "copy",
        clip_filename,
        "-y",
    ]
    subprocess.run(ffmpeg_args, check=True)
    print(f"Clip created: {clip_filename}")
    return clip_filename
|
|
|
|
def find_comments_by_keywords(chat_log, keywords):
    """Return chat comments whose message body contains any of *keywords*.

    Accepts either the raw TwitchDownloaderCLI object (with a "comments"
    key) or a plain list of comment dicts. Non-dict entries are skipped;
    matching is case-insensitive and each comment is returned at most once.
    """
    if isinstance(chat_log, dict) and "comments" in chat_log:
        chat_log = chat_log["comments"]

    lowered_keywords = [kw.lower() for kw in keywords]
    matching_comments = []
    for comment in chat_log:
        if not isinstance(comment, dict):
            continue
        body = comment['message']['body'].lower()
        if any(kw in body for kw in lowered_keywords):
            matching_comments.append(comment)
    return matching_comments
|
|
|
|
def create_clip_from_comment_timestamp(video_file, comment_timestamp, vod):
    """Cut a 60-second clip around a chat-comment timestamp from a local
    VOD file and return the clip path.

    Clips land in base_dirs["clips_chat"]/<dd-mm-yy>/ grouped by the VOD's
    creation date.
    """
    start_offset = max(comment_timestamp - 15, 0)
    duration_seconds = 60  # seconds

    created = datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    target_dir = os.path.join(base_dirs["clips_chat"], created.strftime('%d-%m-%y'))
    os.makedirs(target_dir, exist_ok=True)

    clip_filename = os.path.join(target_dir, f"clip_{vod['id']}_{int(comment_timestamp)}.mp4")
    ffmpeg_args = [
        "ffmpeg",
        "-ss", str(start_offset),
        "-i", video_file,
        "-t", str(duration_seconds),
        "-c", "copy",
        clip_filename,
        "-y",
    ]
    subprocess.run(ffmpeg_args, check=True)
    print(f"Clip created: {clip_filename}")
    return clip_filename
|
|
|
|
|
|
def seconds_to_timestamp(seconds):
    """Convert a (possibly fractional) second count to "HH:MM:SS"."""
    total_seconds = int(seconds)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{secs:02}"
|
|
|
|
def download_vod_segment(vod, match_start, duration=60):
    """
    Download a segment of a VOD using yt-dlp, starting 15 seconds before
    the keyword match.

    Parameters:
        vod (dict): DB row with at least "external_id", "external_date"
            (a datetime) and "url".
        match_start (float): Match time in seconds (from faster-whisper).
        duration (int): Duration of the segment in seconds (default 60 seconds).

    The clip is written to base_dirs["clips_transcript"]/<dd-mm-yy>/
    (base_dirs is a module-level global set in main()).
    """

    clip_start = max(match_start - 15, 0)
    clip_dir = base_dirs["clips_transcript"]

    # external_date comes from the database already as a datetime.
    vod_datetime = vod['external_date']
    date_folder = vod_datetime.strftime('%d-%m-%y')

    # Create a subfolder inside clip_dir for the date.
    clip_date_dir = os.path.join(clip_dir, date_folder)
    os.makedirs(clip_date_dir, exist_ok=True)

    clip_filename = os.path.join(clip_date_dir, f"clip_{vod['external_id']}_{int(clip_start)}.mp4")

    end_seconds = clip_start + duration
    start_ts = seconds_to_timestamp(clip_start)
    end_ts = seconds_to_timestamp(end_seconds)

    # yt-dlp download sections format: "*HH:MM:SS-HH:MM:SS"
    segment = f"*{start_ts}-{end_ts}"
    command = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "--download-sections", segment,
        "-o", clip_filename,
        vod["url"]
    ]

    subprocess.run(command, check=True)
    print(f"Downloaded segment from {start_ts} to {end_ts} into {clip_filename}")
|
|
|
|
# ---------------------------
|
|
# Main Processing Pipeline
|
|
# ---------------------------
|
|
def handle_matches_fast(vod, segments_data):
    """Scan faster-whisper segments for keyword mentions and download a
    VOD segment for each hit."""
    lowered_keywords = [kw.lower() for kw in SEARCH_KEYWORDS]
    matches_fast = [
        segment for segment in segments_data
        if any(kw in segment["text"].lower() for kw in lowered_keywords)
    ]

    if not matches_fast:
        print("faster_whisper -- No mentions of keywords.")
        return

    print(f"faster_whisper -- Found {len(matches_fast)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod['id']}:")
    for match in matches_fast:
        start = match["start"]
        text = match["text"]
        print(f" - At {start:.2f}s: {text}")
        # create_clip_from_vod(video_filename, start, vod)
        download_vod_segment(vod, start)
|
|
|
|
def handle_matches(vod, video_filename, result):
    """Search a whisper transcription for keywords and cut a local clip
    for each match."""
    matches = search_transcription(result, SEARCH_KEYWORDS)
    if not matches:
        print(f"No mentions of {SEARCH_KEYWORDS} found in VOD {vod['id']}.")
        return

    print(f"Found {len(matches)} mention(s) of {SEARCH_KEYWORDS} in VOD {vod['id']}:")
    for match in matches:
        start = match["start"]
        text = match["text"]
        print(f" - At {start:.2f}s: {text}")
        create_clip_from_vod(video_filename, start, vod)
|
|
|
|
def download_vod_audio(vod_url, output_filename):
    """Download only the audio of a VOD as mp3 via yt-dlp, skipping if the
    output file already exists."""
    if os.path.exists(output_filename):
        print(f"{output_filename} already exists. Skipping download.")
        return
    ytdlp_args = [
        "yt-dlp",
        "--cookies", "cookies.txt",
        "-f", "worst",
        "--extract-audio",
        "--audio-format", "mp3",
        "-o", output_filename,
        vod_url,
    ]
    subprocess.run(ytdlp_args, check=True)
    print(f"Downloaded audio from VOD to {output_filename}")
|
|
|
|
def get_pending_videos(db):
    """
    Retrieve videos that are downloaded but not yet processed, joined with
    their channel's name and language.

    When CHANNELS_LANGUAGE is "other", channels whose language is neither
    'cs' nor 'en' are selected; otherwise the exact language is matched.

    Returns:
        list[dict]: One dict per row, keyed by column name.

    Fixes two parameter-binding bugs: the original passed
    (CHANNELS_LANGUAGE) — a bare string, not a 1-tuple — as the parameter
    sequence, and passed it even for the "other" query, which contains no
    placeholder.
    """
    cursor = db.cursor()
    base_query = """
        SELECT v.id, v.external_id, c.channel_name, v.url, v.length, v.external_date, c.language
        FROM videos v
        JOIN channels c ON v.channel_id = c.id
        WHERE v.data_downloaded = 1 AND v.processed = 0
    """
    if CHANNELS_LANGUAGE == "other":
        # No parameters: the language filter is baked into the query.
        cursor.execute(base_query + " AND c.language NOT IN ('cs', 'en')")
    else:
        cursor.execute(base_query + " AND c.language = %s", (CHANNELS_LANGUAGE,))
    columns = [col[0] for col in cursor.description]
    results = [dict(zip(columns, row)) for row in cursor.fetchall()]
    cursor.close()
    return results
|
|
|
|
|
|
def insert_transcription(db, video_id, filename):
    """
    Insert a new transcription record into the transcriptions table.

    Parameters:
        db: A MariaDB connection object.
        video_id (int): The foreign key referencing the videos table.
        filename (str): The transcription file name.

    Returns:
        int: The ID of the inserted transcription record.

    Fix: the confirmation message printed the literal text "(unknown)"
    instead of the actual filename.
    """
    cursor = db.cursor()
    query = """
        INSERT INTO transcriptions (video_id, filename)
        VALUES (%s, %s)
    """
    cursor.execute(query, (video_id, filename))
    db.commit()
    inserted_id = cursor.lastrowid
    cursor.close()
    print(f"Inserted transcription for video_id {video_id} with filename '{filename}' (ID: {inserted_id})")
    return inserted_id
|
|
|
|
def db_set_transcription_finish(db, video_id):
    """
    Stamp transcription_finish = now() on a transcriptions row.

    Fix: the previous docstring was copied from the video-update helper and
    described columns (data_downloaded, updated_at) this function never
    touches; also dropped a needless f-string prefix on the static query.

    NOTE(review): the WHERE clause filters on transcriptions.id, but the
    parameter is named video_id — confirm whether callers pass the
    transcription id or whether the filter should be `video_id = %s`.
    """
    cursor = db.cursor()
    transcription_finish = datetime.now()
    query = "UPDATE transcriptions SET transcription_finish = %s WHERE id = %s"
    cursor.execute(query, (transcription_finish, video_id))
    db.commit()
    cursor.close()
|
|
|
|
def db_set_video_processed(db, video_id):
    """
    Mark a videos row as processed (processed = TRUE).

    Fix: the previous docstring was copied from another helper and
    described data_downloaded/updated_at, which this function does not
    touch; also dropped a needless f-string prefix on the static query.
    """
    cursor = db.cursor()
    query = "UPDATE videos SET processed = %s WHERE id = %s"
    cursor.execute(query, (True, video_id))
    db.commit()
    cursor.close()
|
|
|
|
def main():
    """Fetch pending videos from the database, transcribe each with
    faster-whisper, and download a clip for every keyword mention."""

    # NOTE(security/review): real-looking credentials are hard-coded as
    # environment-variable fallbacks below — remove the defaults and
    # require DB_HOST/DB_USER/DB_PASS from the environment instead.
    try:
        db = mariadb.connect(
            host=os.environ.get("DB_HOST", "192.168.0.187"),
            user=os.environ.get("DB_USER", "t0is"),
            password=os.environ.get("DB_PASS", "Silenceisgolden555"),
            database=os.environ.get("DB_NAME", "transcriptor"),
            port=int(os.environ.get("DB_PORT", 3306))
        )
    except mariadb.Error as err:
        print(f"Error connecting to MariaDB: {err}")
        return

    pending_videos = get_pending_videos(db)
    if not pending_videos:
        print("No pending videos to transcribe.")
        db.close()
        return

    for video in pending_videos:
        try:
            video_id = video['id']
            vod_url = video['url']
            vod_id = video['external_id']
            channel_name = video['channel_name']
            channel_language = video['language']
            print(f"Channel Name: {channel_name}, Language: {channel_language}, VOD: {vod_id}")

            # base_dirs is deliberately module-global: helper functions
            # (transcribe_audio_fast, download_vod_segment, the clip
            # creators) read it to locate per-channel working directories.
            # It is rebuilt for each video's channel.
            global base_dirs
            base_dirs = {
                "vods": os.path.join("vods", channel_name),
                "audio": os.path.join("audio", channel_name),
                "transcripts": os.path.join("transcripts", channel_name),
                "chat": os.path.join("chat", channel_name),
                "clips_transcript": os.path.join("clips", channel_name, "from_vod"),
                "clips_chat": os.path.join("clips", channel_name, "from_chat")
            }

            for path in base_dirs.values():
                os.makedirs(path, exist_ok=True)

            video_filename = os.path.join(base_dirs["vods"], f"vod_{vod_id}.mp4")
            audio_filename = os.path.join(base_dirs["audio"], f"vod_{vod_id}.mp3")
            transcript_filename = os.path.join(base_dirs["transcripts"], f"transcript_{vod_id}.json")
            chat_log_filename = os.path.join(base_dirs["chat"], f"chat_{vod_id}.json")

            print(f"\nProcessing VOD: {vod_url}")

            insert_transcription(db, video_id, transcript_filename)

            print("Transcribing audio. This may take some time...")
            # Pass language and vod_id so that the transcript is saved and reused if available.
            segments_data = transcribe_audio_fast(audio_filename, language=channel_language, vod_id=vod_id)

            handle_matches_fast(video, segments_data)

        except Exception as e:
            # Best-effort batch processing: report the failure and move on
            # to the next pending video.
            print(f"Error processing video ID {video['id']}: {e}")
            continue


if __name__ == "__main__":
    main()