#!/usr/bin/env python3
"""
download_emails

- Uses the Gmail API to fetch all messages delivered to a single support address.
- Pulls the configured Gmail signatures (in any language) via the Settings API.
- Parses out headers and body, then identifies the author by matching configured
  signatures, falling back to a manual name mapping or a regex on common sign-offs.
- Tallies per-day sent vs. received counts and per-author sent email counts.
- Writes each month's emails into JSON files under OUTPUT_DIR/YYYY/MM/emails.json.
- Writes overall stats into OUTPUT_DIR as JSON files.
- Outputs progress logs to stdout for Docker visibility.
- (Commented-out) Stubs for loading the daily/author stats into MariaDB.
"""

import os
import json
import base64
import re
from datetime import datetime
from collections import defaultdict
from typing import Optional
import logging
import sys

from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# Configure logging for progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Gmail API scope
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Configuration from environment
SUPPORT_ADDRESS = os.environ.get('SUPPORT_ADDRESS', 'remote.control.world1@gmail.com')
TOKEN_PATH = os.environ.get('TOKEN_PATH', 'tokens/test.json')
OUTPUT_DIR = os.environ.get('OUTPUT_DIR', 'data')

# Aliases considered as sent
SENDER_ALIASES = [SUPPORT_ADDRESS.lower(), 'orders@remote-control-world.eu']

# Manual name mapping if signatures are not found
NAME_PATTERNS = {
    'Josefina': ['Josefína Bartková'],
    'Ondra': ['Kateřina Kulhánková', 'Kateřina Kulhánkova', 'Ondrej', 'Ondřej'],
    'Honza': ['Jan Klus'],
    'Halina': ['Halina Kutláková'],
    'Helena': ['Helena Urbášková'],
    'AdamH': ['Adam Holuša'],
    'AdamK': ['Adam Kulhánek'],
    'KubaS': ['Jakub Sopuch'],
    'LukasZ': ['Lukáš Zdražila'],
    'Breta': ['Břetislav Střaslička'],
    'TerkaP': ['Tereza Pěchulová'],
    'Marketa': ['Marketa', 'Markéta'],
}


def get_gmail_service(token_path: str):
    """
    Load OAuth credentials and handle refreshing.
    Performs interactive auth only when no valid token/refresh token is available.
    """
    creds = None
    # Load existing tokens
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    # Refresh if expired
    if creds and creds.expired and creds.refresh_token:
        logger.info("Refreshing access token using refresh token...")
        creds.refresh(Request())
    # If no valid credentials, do the full auth flow
    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        auth_url, _ = flow.authorization_url(access_type='offline', prompt='consent')
        logger.warning("Please open this URL in your browser:\n%s", auth_url)
        sys.stdout.write("Enter the authorization code here: ")
        sys.stdout.flush()
        code = sys.stdin.readline().strip()
        flow.fetch_token(code=code)
        creds = flow.credentials
        # Save for next time; make sure the token directory exists first
        os.makedirs(os.path.dirname(token_path) or '.', exist_ok=True)
        with open(token_path, 'w') as token_file:
            token_file.write(creds.to_json())
        logger.info("Saved new token to %s", token_path)
    # Build the Gmail service client
    return build('gmail', 'v1', credentials=creds)


def extract_body(payload) -> str:
    """
    Recursively find the first text/plain part and decode it.
""" if payload.get('mimeType') == 'text/plain' and payload.get('body', {}).get('data'): data = payload['body']['data'] return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore') for part in payload.get('parts', []): text = extract_body(part) if text: return text return '' def load_signatures(service): """ Fetch configured Gmail signatures. """ sigs = [] resp = service.users().settings().sendAs().list(userId='me').execute() for entry in resp.get('sendAs', []): html = entry.get('signature') or '' text = re.sub(r'<[^>]+>', '', html).strip() if text: sigs.append({ 'email': entry['sendAsEmail'], 'name': entry.get('displayName') or entry['sendAsEmail'], 'signature': text }) logger.info(f"Loaded {len(sigs)} configured signatures from Gmail settings") return sigs def extract_author(body: str, signatures: list) -> str: """ Identify author by: 1) Matching configured signature blocks 2) Manual name mapping 3) Regex fallback for common sign-offs """ # 1) Signature blocks for s in signatures: sig = s.get('signature') if sig and sig in body: return s['name'] # 2) Manual name patterns for name, patterns in NAME_PATTERNS.items(): for pat in patterns: if pat in body: return name # 3) Regex fallback match = re.search( r'(?im)(?:Podpis|S pozdravem|Díky|Thanks|Regards|Best regards|Sincerely)[\s,]*\r?\n([^\r\n]{2,})', body ) if match: return match.group(1).strip() return None def main(): os.makedirs(OUTPUT_DIR, exist_ok=True) service = get_gmail_service(TOKEN_PATH) signatures = load_signatures(service) monthly_emails = defaultdict(list) daily_counts = defaultdict(lambda: {'sent': 0, 'received': 0}) author_counts = defaultdict(int) logger.info(f"Starting fetch for mailbox: {SUPPORT_ADDRESS}") next_page_token = None page_count = 0 total_msgs = 0 while True: page_count += 1 resp = service.users().messages().list( userId='me', q='before:2025-03-01', pageToken=next_page_token, maxResults=500 ).execute() messages = resp.get('messages', []) count = len(messages) total_msgs += count logger.info(f"Page {page_count}: fetched {count} messages (total so far: {total_msgs})") if not messages: break for meta in messages: msg = service.users().messages().get( userId='me', id=meta['id'], format='full' ).execute() headers = {h['name']: h['value'] for h in msg['payload'].get('headers', [])} body = extract_body(msg['payload']) author = extract_author(body, signatures) dt = datetime.fromtimestamp(int(msg['internalDate']) / 1000) year, month, day = dt.strftime('%Y'), dt.strftime('%m'), dt.strftime('%Y-%m-%d') from_hdr = headers.get('From', '').lower() is_sent = any(alias in from_hdr for alias in SENDER_ALIASES) if is_sent: daily_counts[day]['sent'] += 1 if author: author_counts[author] += 1 else: daily_counts[day]['received'] += 1 monthly_emails[(year, month)].append({ 'id': msg['id'], 'threadId': msg.get('threadId'), 'source': SUPPORT_ADDRESS, 'from': headers.get('From'), 'to': headers.get('To'), 'date': headers.get('Date'), 'subject': headers.get('Subject'), 'internalDate': msg.get('internalDate'), 'body': body, 'author': author, 'sent': is_sent }) next_page_token = resp.get('nextPageToken') if not next_page_token: break logger.info(f"Fetch complete: total messages retrieved: {total_msgs}") # Write monthly files for (yr, mo), emails in monthly_emails.items(): path = os.path.join(OUTPUT_DIR, yr, mo) os.makedirs(path, exist_ok=True) with open(os.path.join(path, 'emails.json'), 'w', encoding='utf-8') as f: json.dump(emails, f, ensure_ascii=False, indent=2) logger.info(f"Wrote {len(emails)} emails to 
    # Write stats
    with open(os.path.join(OUTPUT_DIR, 'daily_counts.json'), 'w') as f:
        json.dump(daily_counts, f, indent=2)
    with open(os.path.join(OUTPUT_DIR, 'author_counts.json'), 'w') as f:
        json.dump(author_counts, f, indent=2)

    logger.info(f"Processed {total_msgs} messages into {len(monthly_emails)} month folders under {OUTPUT_DIR}")

    # MariaDB stubs commented out (see the sketch after the entry point below)


if __name__ == '__main__':
    main()
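
# --- MariaDB loading stubs (commented out) -----------------------------------
# The module docstring mentions commented-out stubs for loading the daily/author
# stats into MariaDB, but none shipped in this file; the block below is only a
# hedged sketch. It assumes the `mariadb` connector and hypothetical tables
# daily_counts(day, sent, received) and author_counts(author, sent); the table
# names, columns, and DB_* environment variables are illustrative, not the
# project's actual schema.
#
# import mariadb
#
# def load_stats_into_mariadb(daily_counts: dict, author_counts: dict) -> None:
#     conn = mariadb.connect(
#         host=os.environ.get('DB_HOST', 'localhost'),
#         user=os.environ.get('DB_USER', 'emails'),
#         password=os.environ.get('DB_PASSWORD', ''),
#         database=os.environ.get('DB_NAME', 'email_stats'),
#     )
#     cur = conn.cursor()
#     # Upsert one row per day and one row per author
#     for day, counts in daily_counts.items():
#         cur.execute(
#             "REPLACE INTO daily_counts (day, sent, received) VALUES (?, ?, ?)",
#             (day, counts['sent'], counts['received']),
#         )
#     for author, sent in author_counts.items():
#         cur.execute(
#             "REPLACE INTO author_counts (author, sent) VALUES (?, ?)",
#             (author, sent),
#         )
#     conn.commit()
#     conn.close()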