import os
import json
import base64
import re
from datetime import datetime
import logging
import sys
from collections import defaultdict

import mariadb
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# Configure logging for progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Gmail API scope
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Configuration from environment
token_path = os.environ.get('TOKEN_PATH', 'tokens/token_rcw.json')
output_dir = os.environ.get('OUTPUT_DIR', 'data')
support_address = os.environ.get('SUPPORT_ADDRESS', 'remote.control.world1@gmail.com')

raw_port = os.environ.get('DB_PORT')
try:
    port = int(raw_port) if raw_port else 3306
except ValueError:
    logger.warning(f"Invalid DB_PORT={raw_port!r}, defaulting to 3306")
    port = 3306

# Database configuration from environment
db_config = {
    'host': os.environ.get('DB_HOST'),
    'port': port,
    'user': os.environ.get('DB_USER'),
    'password': os.environ.get('DB_PASSWORD'),
    'database': os.environ.get('DB_NAME')
}

# Aliases considered as sent, loaded from env (comma-separated)
aliases_env = os.environ.get('SENDER_ALIASES', support_address.lower())
SENDER_ALIASES = [alias.strip().lower() for alias in aliases_env.split(',')]


class User:
    """
    Represents a support user with ID, shortname, and signature patterns.
    """
    def __init__(self, user_id: int, shortname: str, patterns: list):
        self.id = user_id
        self.shortname = shortname
        self.patterns = patterns


# In-memory map of shortname -> User
USERS = {}


def get_gmail_service(token_path: str):
    """
    Load OAuth credentials and handle refreshing.
    Performs interactive auth only when no valid token/refresh available.
    """
    creds = None

    # Load existing tokens
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)

    # Refresh if expired
    if creds and creds.expired and creds.refresh_token:
        logger.info("Refreshing access token using refresh token...")
        creds.refresh(Request())

    # If no valid credentials, do full auth flow
    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        auth_url, _ = flow.authorization_url(access_type='offline', prompt='consent')
        logger.warning("Please open this URL in your browser:\n%s", auth_url)
        sys.stdout.write("Enter the authorization code here: ")
        sys.stdout.flush()
        code = sys.stdin.readline().strip()
        flow.fetch_token(code=code)
        creds = flow.credentials

        # Save for next time
        with open(token_path, 'w') as token_file:
            token_file.write(creds.to_json())
        logger.info("Saved new token to %s", token_path)

    # Build service
    return build('gmail', 'v1', credentials=creds)


def extract_body(payload) -> str:
    """
    Recursively find text/plain payload.
    """
    if payload.get('mimeType') == 'text/plain' and payload.get('body', {}).get('data'):
        data = payload['body']['data']
        return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
    for part in payload.get('parts', []):
        text = extract_body(part)
        if text:
            return text
    return ''


def extract_author(body: str) -> User:
    """
    Scan the body for each user's signature patterns and return the matching User, or None.
    """
    lower = body.lower()
    for user in USERS.values():
        for pat in user.patterns:
            if pat.lower() in lower:
                return user
    return None


def load_users_from_db(cursor):
    """
    Populate USERS map by loading id, shortname, name_patterns JSON from the users table.
    """
    cursor.execute(
        "SELECT id, shortname, name_patterns FROM users WHERE name_patterns IS NOT NULL"
    )
    for user_id, shortname, patterns_json in cursor.fetchall():
        try:
            patterns = json.loads(patterns_json)
            USERS[shortname] = User(user_id, shortname, patterns)
        except Exception:
            logger.warning(f"Invalid JSON patterns for user '{shortname}': {patterns_json}")


def main():
    os.makedirs(output_dir, exist_ok=True)

    # Connect to MariaDB
    try:
        conn = mariadb.connect(**db_config)
        cursor = conn.cursor()
        logger.info("Connected to MariaDB")
    except mariadb.Error as e:
        logger.error(f"Error connecting to MariaDB: {e}")
        sys.exit(1)

    # Preload users into USERS
    load_users_from_db(cursor)
    logger.info("Loaded users from db")

    logger.info(f"Starting fetch for mailbox: {support_address}")

    # Prepare upsert, now including file_path
    insert_email = ("INSERT INTO emails "
                    "(mail_id, thread_id, subj, source, date_sent, user_id, sent, file_path) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?) "
                    "ON DUPLICATE KEY UPDATE subj=VALUES(subj), updated_at=NOW(), file_path=VALUES(file_path)")

    service = get_gmail_service(token_path)

    monthly_emails = defaultdict(list)
    next_page_token = None
    page_count = 0
    total_msgs = 0

    while True:
        page_count += 1
        resp = service.users().messages().list(
            userId='me', q='', pageToken=next_page_token, maxResults=500
        ).execute()
        messages = resp.get('messages', [])
        count = len(messages)
        total_msgs += count
        logger.info(f"Page {page_count}: fetched {count} messages (total so far: {total_msgs})")
        if not messages:
            break

        for meta in messages:
            msg = service.users().messages().get(
                userId='me', id=meta['id'], format='full'
            ).execute()
            headers = {h['name']: h['value'] for h in msg['payload'].get('headers', [])}
            body = extract_body(msg['payload'])
            user = extract_author(body)
            dt = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
            year, month = dt.strftime('%Y'), dt.strftime('%m')
            file_path = os.path.join(year, month, 'emails.json')
            sent_flag = any(alias in headers.get('From', '').lower() for alias in SENDER_ALIASES)
            user_id = user.id if user else None
            user_name = user.shortname if user else None

            cursor.execute(insert_email, (
                msg['id'], msg.get('threadId'), headers.get('Subject'),
                support_address, dt, user_id, sent_flag, file_path
            ))

            monthly_emails[(year, month)].append({
                'id': msg['id'],
                'threadId': msg.get('threadId'),
                'source': support_address,
                'from': headers.get('From'),
                'to': headers.get('To'),
                'date': headers.get('Date'),
                'subject': headers.get('Subject'),
                'internalDate': msg.get('internalDate'),
                'body': body,
                'author': user_name,
                'sent': sent_flag
            })

        conn.commit()
        next_page_token = resp.get('nextPageToken')
        if not next_page_token:
            break

    logger.info(f"Processed and stored {total_msgs} messages")

    # Write monthly files
    for (yr, mo), emails in monthly_emails.items():
        path = os.path.join(output_dir, yr, mo)
        os.makedirs(path, exist_ok=True)
        with open(os.path.join(path, 'emails.json'), 'w', encoding='utf-8') as f:
            json.dump(emails, f, ensure_ascii=False, indent=2)
        logger.info(f"Wrote {len(emails)} emails to {yr}/{mo}/emails.json")

    cursor.close()
    conn.close()


if __name__ == '__main__':
    main()