mail_loader/main.py
2025-07-18 14:54:03 +02:00

215 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""
download_emails.py
- Uses Gmail API to fetch all messages delivered to a single support address.
- Pulls configured Gmail signatures (in any language) via Settings API.
- Parses out headers, body, and matches against signatures or falls back to regex or manual name mapping.
- Tally per-day sent vs. received counts and per-author sent email counts.
- Writes each month's emails into JSON files under OUTPUT_DIR/YYYY/MM/emails.json.
- Writes overall stats into OUTPUT_DIR as JSON files.
- Outputs progress logs to stdout for Docker visibility.
- (Commented-out) Stubs for loading the daily/author stats into MariaDB.
"""
import os
import json
import base64
import re
from datetime import datetime
from collections import defaultdict
import logging
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# Configure logging for progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
# Gmail API scope
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
# Configuration from environment
SUPPORT_ADDRESS = os.environ.get('SUPPORT_ADDRESS', 'remote.control.world1@gmail.com')
TOKEN_PATH = os.environ.get('TOKEN_PATH', 'tokens/test.json')
OUTPUT_DIR = os.environ.get('OUTPUT_DIR', 'data')
# Aliases considered as sent
SENDER_ALIASES = [SUPPORT_ADDRESS.lower(), 'orders@remote-control-world.eu']
# Manual name mapping if signatures not found
NAME_PATTERNS = {
'Josefina': ['Josefína Bartková'],
'Ondra': ['Kateřina Kulhánková', 'Kateřina Kulhánkova', 'Ondrej', 'Ondřej'],
'Honza': ['Jan Klus'],
'Halina': ['Halina Kutláková'],
'Helena': ['Helena Urbášková'],
'AdamH': ['Adam Holuša'],
'AdamK': ['Adam Kulhánek'],
'KubaS': ['Jakub Sopuch'],
'LukasZ': ['Lukáš Zdražila'],
'Breta': ['Břetislav Střaslička'],
'TerkaP': ['Tereza Pěchulová'],
'Marketa': ['Marketa', 'Markéta'],
}
def get_gmail_service(token_path: str):
"""
Load OAuth client credentials and per-account token.
"""
creds = None
if os.path.exists(token_path):
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
if not creds or not creds.valid:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
with open(token_path, 'w') as f:
f.write(creds.to_json())
return build('gmail', 'v1', credentials=creds)
def extract_body(payload) -> str:
"""
Recursively find text/plain payload.
"""
if payload.get('mimeType') == 'text/plain' and payload.get('body', {}).get('data'):
data = payload['body']['data']
return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
for part in payload.get('parts', []):
text = extract_body(part)
if text:
return text
return ''
def load_signatures(service):
"""
Fetch configured Gmail signatures.
"""
sigs = []
resp = service.users().settings().sendAs().list(userId='me').execute()
for entry in resp.get('sendAs', []):
html = entry.get('signature') or ''
text = re.sub(r'<[^>]+>', '', html).strip()
if text:
sigs.append({
'email': entry['sendAsEmail'],
'name': entry.get('displayName') or entry['sendAsEmail'],
'signature': text
})
logger.info(f"Loaded {len(sigs)} configured signatures from Gmail settings")
return sigs
def extract_author(body: str, signatures: list) -> str:
"""
Identify author by:
1) Matching configured signature blocks
2) Manual name mapping
3) Regex fallback for common sign-offs
"""
# 1) Signature blocks
for s in signatures:
sig = s.get('signature')
if sig and sig in body:
return s['name']
# 2) Manual name patterns
for name, patterns in NAME_PATTERNS.items():
for pat in patterns:
if pat in body:
return name
# 3) Regex fallback
match = re.search(
r'(?im)(?:Podpis|S pozdravem|Díky|Thanks|Regards|Best regards|Sincerely)[\s,]*\r?\n([^\r\n]{2,})',
body
)
if match:
return match.group(1).strip()
return None
def main():
os.makedirs(OUTPUT_DIR, exist_ok=True)
service = get_gmail_service(TOKEN_PATH)
signatures = load_signatures(service)
monthly_emails = defaultdict(list)
daily_counts = defaultdict(lambda: {'sent': 0, 'received': 0})
author_counts = defaultdict(int)
logger.info(f"Starting fetch for mailbox: {SUPPORT_ADDRESS}")
next_page_token = None
page_count = 0
total_msgs = 0
while True:
page_count += 1
resp = service.users().messages().list(
userId='me', q='after:2025-03-01', pageToken=next_page_token, maxResults=500
).execute()
messages = resp.get('messages', [])
count = len(messages)
total_msgs += count
logger.info(f"Page {page_count}: fetched {count} messages (total so far: {total_msgs})")
if not messages:
break
for meta in messages:
msg = service.users().messages().get(
userId='me', id=meta['id'], format='full'
).execute()
headers = {h['name']: h['value'] for h in msg['payload'].get('headers', [])}
body = extract_body(msg['payload'])
author = extract_author(body, signatures)
dt = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
year, month, day = dt.strftime('%Y'), dt.strftime('%m'), dt.strftime('%Y-%m-%d')
from_hdr = headers.get('From', '').lower()
is_sent = any(alias in from_hdr for alias in SENDER_ALIASES)
if is_sent:
daily_counts[day]['sent'] += 1
if author:
author_counts[author] += 1
else:
daily_counts[day]['received'] += 1
monthly_emails[(year, month)].append({
'id': msg['id'], 'threadId': msg.get('threadId'), 'source': SUPPORT_ADDRESS,
'from': headers.get('From'), 'to': headers.get('To'), 'date': headers.get('Date'),
'subject': headers.get('Subject'), 'internalDate': msg.get('internalDate'),
'body': body, 'author': author, 'sent': is_sent
})
next_page_token = resp.get('nextPageToken')
if not next_page_token:
break
logger.info(f"Fetch complete: total messages retrieved: {total_msgs}")
# Write monthly files
for (yr, mo), emails in monthly_emails.items():
path = os.path.join(OUTPUT_DIR, yr, mo)
os.makedirs(path, exist_ok=True)
with open(os.path.join(path, 'emails.json'), 'w', encoding='utf-8') as f:
json.dump(emails, f, ensure_ascii=False, indent=2)
logger.info(f"Wrote {len(emails)} emails to {yr}/{mo}/emails.json")
# Write stats
with open(os.path.join(OUTPUT_DIR, 'daily_counts.json'), 'w') as f:
json.dump(daily_counts, f, indent=2)
with open(os.path.join(OUTPUT_DIR, 'author_counts.json'), 'w') as f:
json.dump(author_counts, f, indent=2)
logger.info(f"Processed {total_msgs} messages into {len(monthly_emails)} month folders under {OUTPUT_DIR}")
# MariaDB stubs commented out
if __name__ == '__main__':
main()