mail_loader/main.py
2025-07-25 12:27:46 +02:00

229 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import base64
import re
from datetime import datetime
import logging
import sys
from collections import defaultdict
import mariadb
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# Configure logging for progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Gmail API scope
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Runtime configuration, all overridable via environment variables.
token_path = os.environ.get('TOKEN_PATH', 'tokens/token_rcw.json')
output_dir = os.environ.get('OUTPUT_DIR', 'data')
support_address = os.environ.get('SUPPORT_ADDRESS', 'remote.control.world1@gmail.com')

# DB_PORT must parse as an integer; anything else falls back to MariaDB's
# default port 3306 with a warning instead of a hard failure.
raw_port = os.environ.get('DB_PORT')
if raw_port:
    try:
        port = int(raw_port)
    except ValueError:
        logger.warning(f"Invalid DB_PORT={raw_port!r}, defaulting to 3306")
        port = 3306
else:
    port = 3306

# Database configuration from environment
db_config = dict(
    host=os.environ.get('DB_HOST'),
    port=port,
    user=os.environ.get('DB_USER'),
    password=os.environ.get('DB_PASSWORD'),
    database=os.environ.get('DB_NAME'),
)

# Aliases considered as sent, loaded from env (comma-separated)
aliases_env = os.environ.get('SENDER_ALIASES', support_address.lower())
SENDER_ALIASES = [alias.strip().lower() for alias in aliases_env.split(',')]
class User:
    """
    Represents a support user with ID, shortname, and signature patterns.

    Attributes:
        id: Primary key of the user's row in the database.
        shortname: Short identifier; also the key in the USERS map.
        patterns: Substrings matched case-insensitively against email
            bodies to attribute a message to this user.
    """

    def __init__(self, user_id: int, shortname: str, patterns: list):
        self.id = user_id
        self.shortname = shortname
        self.patterns = patterns

    def __repr__(self) -> str:
        # Debug-friendly representation; not relied on by any caller.
        return f"User(id={self.id!r}, shortname={self.shortname!r}, patterns={self.patterns!r})"


# In-memory map of shortname -> User (populated from the users table).
USERS = {}
def get_gmail_service(token_path: str):
    """
    Build an authenticated Gmail v1 API service.

    Loads OAuth credentials from *token_path*, refreshes them with the
    stored refresh token when expired, and performs the interactive
    console auth flow only when no valid/refreshable token exists. In
    both cases the resulting token is written back to *token_path*.

    Args:
        token_path: Path of the JSON authorized-user token file.

    Returns:
        A googleapiclient service object for the Gmail API.
    """
    creds = None
    # Load existing tokens
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)

    # Refresh if expired. Persist the refreshed token too — otherwise the
    # new access token (and any rotated refresh token) is lost and every
    # run has to refresh again.
    if creds and creds.expired and creds.refresh_token:
        logger.info("Refreshing access token using refresh token...")
        creds.refresh(Request())
        with open(token_path, 'w') as token_file:
            token_file.write(creds.to_json())
        logger.info("Saved refreshed token to %s", token_path)

    # If no valid credentials, do full auth flow
    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        auth_url, _ = flow.authorization_url(access_type='offline', prompt='consent')
        logger.warning("Please open this URL in your browser:\n%s", auth_url)
        sys.stdout.write("Enter the authorization code here: ")
        sys.stdout.flush()
        code = sys.stdin.readline().strip()
        flow.fetch_token(code=code)
        creds = flow.credentials
        # Save for next time
        with open(token_path, 'w') as token_file:
            token_file.write(creds.to_json())
        logger.info("Saved new token to %s", token_path)

    # Build service
    return build('gmail', 'v1', credentials=creds)
def extract_body(payload) -> str:
    """
    Depth-first search of a Gmail message payload for the first
    text/plain part; return its decoded text, or '' when none exists.
    """
    body = payload.get('body', {})
    if payload.get('mimeType') == 'text/plain' and body.get('data'):
        raw = base64.urlsafe_b64decode(body['data'])
        return raw.decode('utf-8', errors='ignore')
    # Otherwise recurse into multipart children; first non-empty hit wins.
    return next(
        (text for text in map(extract_body, payload.get('parts', [])) if text),
        '',
    )
def extract_author(body: str) -> User:
    """
    Return the first User (in USERS insertion order) whose signature
    pattern occurs in *body* as a case-insensitive substring, or None.
    """
    haystack = body.lower()
    return next(
        (
            user
            for user in USERS.values()
            if any(pattern.lower() in haystack for pattern in user.patterns)
        ),
        None,
    )
def load_users_from_db(cursor):
    """
    Populate the module-level USERS map from the users table.

    Reads id, shortname and the name_patterns JSON column for every row
    that has patterns. Rows whose patterns are not a valid JSON array are
    skipped with a warning instead of aborting the load.

    Args:
        cursor: An open database cursor.
    """
    cursor.execute(
        "SELECT id, shortname, name_patterns FROM users WHERE name_patterns IS NOT NULL"
    )
    for user_id, shortname, patterns_json in cursor.fetchall():
        # Narrow exception scope: only malformed JSON (or a non-string
        # column value) is an expected, skippable condition.
        try:
            patterns = json.loads(patterns_json)
        except (json.JSONDecodeError, TypeError):
            logger.warning(f"Invalid JSON patterns for user '{shortname}': {patterns_json}")
            continue
        if not isinstance(patterns, list):
            # A bare JSON string/number/object would break the substring
            # matching done in extract_author, so reject it here.
            logger.warning(f"Invalid JSON patterns for user '{shortname}': {patterns_json}")
            continue
        USERS[shortname] = User(user_id, shortname, patterns)
# Upsert for the emails table (MariaDB Connector/Python uses qmark-style
# placeholders). Kept module-level so it is built once.
_INSERT_EMAIL = (
    "INSERT INTO emails "
    "(mail_id, thread_id, subj, source, date_sent, user_id, sent, file_path) "
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?) "
    "ON DUPLICATE KEY UPDATE subj=VALUES(subj), updated_at=NOW(), file_path=VALUES(file_path)"
)


def _connect_db():
    """Open a MariaDB connection from db_config; exit(1) on failure."""
    try:
        conn = mariadb.connect(**db_config)
        logger.info("Connected to MariaDB")
        return conn
    except mariadb.Error as e:
        logger.error(f"Error connecting to MariaDB: {e}")
        sys.exit(1)


def _store_message(cursor, msg, monthly_emails):
    """Upsert one full Gmail message and bucket its JSON record by (year, month)."""
    headers = {h['name']: h['value'] for h in msg['payload'].get('headers', [])}
    body = extract_body(msg['payload'])
    user = extract_author(body)
    # internalDate is epoch milliseconds; buckets use the local year/month.
    dt = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
    year, month = dt.strftime('%Y'), dt.strftime('%m')
    file_path = os.path.join(year, month, 'emails.json')
    # A message counts as "sent" when its From header matches any alias.
    sent_flag = any(alias in headers.get('From', '').lower() for alias in SENDER_ALIASES)
    cursor.execute(_INSERT_EMAIL, (
        msg['id'],
        msg.get('threadId'),
        headers.get('Subject'),
        support_address,
        dt,
        user.id if user else None,
        sent_flag,
        file_path,
    ))
    monthly_emails[(year, month)].append({
        'id': msg['id'], 'threadId': msg.get('threadId'), 'source': support_address,
        'from': headers.get('From'), 'to': headers.get('To'), 'date': headers.get('Date'),
        'subject': headers.get('Subject'), 'internalDate': msg.get('internalDate'),
        'body': body, 'author': user.shortname if user else None, 'sent': sent_flag,
    })


def _write_monthly_files(monthly_emails):
    """Dump each (year, month) bucket to OUTPUT_DIR/YYYY/MM/emails.json."""
    for (yr, mo), emails in monthly_emails.items():
        path = os.path.join(output_dir, yr, mo)
        os.makedirs(path, exist_ok=True)
        with open(os.path.join(path, 'emails.json'), 'w', encoding='utf-8') as f:
            json.dump(emails, f, ensure_ascii=False, indent=2)
        logger.info(f"Wrote {len(emails)} emails to {yr}/{mo}/emails.json")


def main():
    """
    Fetch every message in the support mailbox page by page, upsert the
    metadata into MariaDB and archive the full records into monthly JSON
    files under output_dir.
    """
    os.makedirs(output_dir, exist_ok=True)
    conn = _connect_db()
    try:
        cursor = conn.cursor()
        # Preload users so bodies can be attributed to authors.
        load_users_from_db(cursor)
        logger.info("Loaded users from db")
        logger.info(f"Starting fetch for mailbox: {support_address}")

        service = get_gmail_service(token_path)
        monthly_emails = defaultdict(list)
        next_page_token = None
        page_count = 0
        total_msgs = 0
        while True:
            page_count += 1
            resp = service.users().messages().list(
                userId='me', q='', pageToken=next_page_token, maxResults=500
            ).execute()
            messages = resp.get('messages', [])
            count = len(messages)
            total_msgs += count
            logger.info(f"Page {page_count}: fetched {count} messages (total so far: {total_msgs})")
            if not messages:
                break
            for meta in messages:
                msg = service.users().messages().get(
                    userId='me', id=meta['id'], format='full'
                ).execute()
                _store_message(cursor, msg, monthly_emails)
            # Commit once per page so a mid-run crash loses at most one page.
            conn.commit()
            next_page_token = resp.get('nextPageToken')
            if not next_page_token:
                break

        logger.info(f"Processed and stored {total_msgs} messages")
        _write_monthly_files(monthly_emails)
        cursor.close()
    finally:
        # Release the DB connection even if the Gmail fetch or DB work raises.
        conn.close()
if __name__ == '__main__':
main()