mail_loader/main.py

#!/usr/bin/env python3
"""
download_emails
- Uses Gmail API to fetch all messages delivered to a single support address.
- Pulls configured Gmail signatures (in any language) via Settings API.
- Parses out headers and body; identifies the author by matching configured signatures, then a manual name mapping, then a regex fallback.
- Tallies per-day sent vs. received counts and per-author sent email counts.
- Writes each month's emails into JSON files under OUTPUT_DIR/YYYY/MM/emails.json.
- Writes overall stats into OUTPUT_DIR as JSON files.
- Outputs progress logs to stdout for Docker visibility.
- (Commented-out) Stubs for loading the daily/author stats into MariaDB.
"""
import os
import json
import base64
import re
from datetime import datetime
from collections import defaultdict
import logging
import sys
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# Configure logging for progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
# Gmail API scope
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
# Configuration from environment
SUPPORT_ADDRESS = os.environ.get('SUPPORT_ADDRESS', 'remote.control.world1@gmail.com')
TOKEN_PATH = os.environ.get('TOKEN_PATH', 'tokens/test.json')
OUTPUT_DIR = os.environ.get('OUTPUT_DIR', 'data')
# Aliases considered as sent
SENDER_ALIASES = [SUPPORT_ADDRESS.lower(), 'orders@remote-control-world.eu']
# Manual name mapping if signatures not found
NAME_PATTERNS = {
    'Josefina': ['Josefína Bartková'],
    'Ondra': ['Kateřina Kulhánková', 'Kateřina Kulhánkova', 'Ondrej', 'Ondřej'],
    'Honza': ['Jan Klus'],
    'Halina': ['Halina Kutláková'],
    'Helena': ['Helena Urbášková'],
    'AdamH': ['Adam Holuša'],
    'AdamK': ['Adam Kulhánek'],
    'KubaS': ['Jakub Sopuch'],
    'LukasZ': ['Lukáš Zdražila'],
    'Breta': ['Břetislav Střaslička'],
    'TerkaP': ['Tereza Pěchulová'],
    'Marketa': ['Marketa', 'Markéta'],
}
def get_gmail_service(token_path: str):
    """
    Load OAuth credentials and handle refreshing.
    Performs interactive auth only when no valid token/refresh token is available.
    """
    creds = None
    # Load existing tokens
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    # Refresh if expired
    if creds and creds.expired and creds.refresh_token:
        logger.info("Refreshing access token using refresh token...")
        creds.refresh(Request())
    # If no valid credentials, do full auth flow
    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        auth_url, _ = flow.authorization_url(access_type='offline', prompt='consent')
        logger.warning("Please open this URL in your browser:\n%s", auth_url)
        sys.stdout.write("Enter the authorization code here: ")
        sys.stdout.flush()
        code = sys.stdin.readline().strip()
        flow.fetch_token(code=code)
        creds = flow.credentials
        # Save for next time (ensure the token directory exists on first-time auth)
        os.makedirs(os.path.dirname(token_path) or '.', exist_ok=True)
        with open(token_path, 'w') as token_file:
            token_file.write(creds.to_json())
        logger.info("Saved new token to %s", token_path)
    # Build service
    return build('gmail', 'v1', credentials=creds)
def extract_body(payload) -> str:
    """
    Recursively find the first text/plain part and decode it.
    Returns '' when no plain-text part is present.
    """
    if payload.get('mimeType') == 'text/plain' and payload.get('body', {}).get('data'):
        data = payload['body']['data']
        return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
    for part in payload.get('parts', []):
        text = extract_body(part)
        if text:
            return text
    return ''
def load_signatures(service):
    """
    Fetch configured Gmail signatures via the Settings API.
    """
    sigs = []
    resp = service.users().settings().sendAs().list(userId='me').execute()
    for entry in resp.get('sendAs', []):
        html = entry.get('signature') or ''
        # Strip HTML tags to get a plain-text signature for matching against bodies
        text = re.sub(r'<[^>]+>', '', html).strip()
        if text:
            sigs.append({
                'email': entry['sendAsEmail'],
                'name': entry.get('displayName') or entry['sendAsEmail'],
                'signature': text
            })
    logger.info(f"Loaded {len(sigs)} configured signatures from Gmail settings")
    return sigs
def extract_author(body: str, signatures: list) -> str:
    """
    Identify the author by:
    1) Matching configured signature blocks
    2) Manual name mapping
    3) Regex fallback for common sign-offs
    Returns None if no author can be identified.
    """
    # 1) Signature blocks
    for s in signatures:
        sig = s.get('signature')
        if sig and sig in body:
            return s['name']
    # 2) Manual name patterns
    for name, patterns in NAME_PATTERNS.items():
        for pat in patterns:
            if pat in body:
                return name
    # 3) Regex fallback
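    # (Czech sign-offs: 'Podpis' = signature, 'S pozdravem' = with regards, 'Díky' = thanks)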
    match = re.search(
        r'(?im)(?:Podpis|S pozdravem|Díky|Thanks|Regards|Best regards|Sincerely)[\s,]*\r?\n([^\r\n]{2,})',
        body
    )
    if match:
        return match.group(1).strip()
    return None
def main():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    service = get_gmail_service(TOKEN_PATH)
    signatures = load_signatures(service)
    monthly_emails = defaultdict(list)
    daily_counts = defaultdict(lambda: {'sent': 0, 'received': 0})
    author_counts = defaultdict(int)
    logger.info(f"Starting fetch for mailbox: {SUPPORT_ADDRESS}")
    next_page_token = None
    page_count = 0
    total_msgs = 0
    while True:
        page_count += 1
        # NOTE: the query hard-codes a cutoff; only messages before 2025-03-01 are fetched
        resp = service.users().messages().list(
            userId='me', q='before:2025-03-01', pageToken=next_page_token, maxResults=500
        ).execute()
        messages = resp.get('messages', [])
        count = len(messages)
        total_msgs += count
        logger.info(f"Page {page_count}: fetched {count} messages (total so far: {total_msgs})")
        if not messages:
            break
        for meta in messages:
            msg = service.users().messages().get(
                userId='me', id=meta['id'], format='full'
            ).execute()
            headers = {h['name']: h['value'] for h in msg['payload'].get('headers', [])}
            body = extract_body(msg['payload'])
            author = extract_author(body, signatures)
            dt = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
            year, month, day = dt.strftime('%Y'), dt.strftime('%m'), dt.strftime('%Y-%m-%d')
            from_hdr = headers.get('From', '').lower()
            is_sent = any(alias in from_hdr for alias in SENDER_ALIASES)
            if is_sent:
                daily_counts[day]['sent'] += 1
                if author:
                    author_counts[author] += 1
            else:
                daily_counts[day]['received'] += 1
            monthly_emails[(year, month)].append({
                'id': msg['id'], 'threadId': msg.get('threadId'), 'source': SUPPORT_ADDRESS,
                'from': headers.get('From'), 'to': headers.get('To'), 'date': headers.get('Date'),
                'subject': headers.get('Subject'), 'internalDate': msg.get('internalDate'),
                'body': body, 'author': author, 'sent': is_sent
            })
        next_page_token = resp.get('nextPageToken')
        if not next_page_token:
            break
    logger.info(f"Fetch complete: total messages retrieved: {total_msgs}")
    # Write monthly files
    for (yr, mo), emails in monthly_emails.items():
        path = os.path.join(OUTPUT_DIR, yr, mo)
        os.makedirs(path, exist_ok=True)
        with open(os.path.join(path, 'emails.json'), 'w', encoding='utf-8') as f:
            json.dump(emails, f, ensure_ascii=False, indent=2)
        logger.info(f"Wrote {len(emails)} emails to {yr}/{mo}/emails.json")
    # Write stats
    with open(os.path.join(OUTPUT_DIR, 'daily_counts.json'), 'w') as f:
        json.dump(daily_counts, f, indent=2)
    with open(os.path.join(OUTPUT_DIR, 'author_counts.json'), 'w') as f:
        json.dump(author_counts, f, indent=2)
    logger.info(f"Processed {total_msgs} messages into {len(monthly_emails)} month folders under {OUTPUT_DIR}")
# MariaDB stubs commented out
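# A hypothetical sketch (not the original stubs) of how the daily/author stats could be
# loaded into MariaDB; the table names, columns, and connection settings below are
# assumptions, not values from this project:
#
# import mariadb
#
# def load_stats_into_mariadb(daily_counts, author_counts):
#     conn = mariadb.connect(host='localhost', user='mail', password='secret',
#                            database='mail_stats')
#     cur = conn.cursor()
#     cur.executemany(
#         "REPLACE INTO daily_counts (day, sent, received) VALUES (?, ?, ?)",
#         [(day, c['sent'], c['received']) for day, c in daily_counts.items()]
#     )
#     cur.executemany(
#         "REPLACE INTO author_counts (author, sent) VALUES (?, ?)",
#         list(author_counts.items())
#     )
#     conn.commit()
#     conn.close()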
if __name__ == '__main__':
    main()