231 lines
8.2 KiB
Python
231 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
download_emails
|
|
|
|
- Uses Gmail API to fetch all messages delivered to a single support address.
|
|
- Pulls configured Gmail signatures (in any language) via Settings API.
|
|
- Parses out headers and body, then identifies the author by matching configured signatures, falling back to manual name mapping and finally a sign-off regex.
|
|
- Tallies per-day sent vs. received counts and per-author sent email counts.
|
|
- Writes each month's emails into JSON files under OUTPUT_DIR/YYYY/MM/emails.json.
|
|
- Writes overall stats into OUTPUT_DIR as JSON files.
|
|
- Outputs progress logs to stdout for Docker visibility.
|
|
- (Commented-out) Stubs for loading the daily/author stats into MariaDB.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import base64
|
|
import re
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
import logging
|
|
import sys
|
|
|
|
from google.oauth2.credentials import Credentials
|
|
from google.auth.transport.requests import Request
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
from googleapiclient.discovery import build
|
|
|
|
# Configure logging for progress.
# The stream is passed explicitly: logging.basicConfig() writes to stderr by
# default, whereas this script is documented to emit its progress on stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)

# Gmail API scope (read-only)
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Configuration from environment; each value falls back to the default shown.
SUPPORT_ADDRESS = os.environ.get('SUPPORT_ADDRESS', 'remote.control.world1@gmail.com')
TOKEN_PATH = os.environ.get('TOKEN_PATH', 'tokens/test.json')
OUTPUT_DIR = os.environ.get('OUTPUT_DIR', 'data')

# Aliases considered as sent. They are searched for inside the lower-cased
# From header, so every entry must itself be lower-case.
SENDER_ALIASES = [SUPPORT_ADDRESS.lower(), 'orders@remote-control-world.eu']

# Manual name mapping if signatures not found: author label -> spellings that
# are searched for (as plain substrings) in the message body.
NAME_PATTERNS = {
    'Josefina': ['Josefína Bartková'],
    'Ondra': ['Kateřina Kulhánková', 'Kateřina Kulhánkova', 'Ondrej', 'Ondřej'],
    'Honza': ['Jan Klus'],
    'Halina': ['Halina Kutláková'],
    'Helena': ['Helena Urbášková'],
    'AdamH': ['Adam Holuša'],
    'AdamK': ['Adam Kulhánek'],
    'KubaS': ['Jakub Sopuch'],
    'LukasZ': ['Lukáš Zdražila'],
    'Breta': ['Břetislav Střaslička'],
    'TerkaP': ['Tereza Pěchulová'],
    'Marketa': ['Marketa', 'Markéta'],
}
|
|
|
|
|
|
def get_gmail_service(token_path: str):
    """
    Build an authorized Gmail API (v1) client.

    Credentials are obtained in this order:

    1. Load stored credentials from ``token_path`` if that file exists.
    2. If they are expired and carry a refresh token, refresh them. A failed
       refresh is logged and treated like having no credentials at all.
    3. If there are still no valid credentials, run the interactive flow:
       the authorization URL is logged and the code is read from stdin.

    Refreshed or newly issued credentials are written back to ``token_path``
    (its parent directory is created when missing).

    Args:
        token_path: Path of the JSON file used to load and store the token.

    Returns:
        The object returned by ``googleapiclient.discovery.build`` for the
        'gmail' API, version 'v1'.
    """
    # Local import: only the refresh fallback below needs this name.
    from google.auth.exceptions import RefreshError

    creds = None
    needs_save = False

    # Load existing tokens
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)

    # Refresh if expired
    if creds and creds.expired and creds.refresh_token:
        logger.info("Refreshing access token using refresh token...")
        try:
            creds.refresh(Request())
            needs_save = True
        except RefreshError as exc:
            # E.g. a revoked refresh token: fall through to the full flow
            # instead of aborting the whole run.
            logger.warning(
                "Token refresh failed (%s); falling back to interactive authorization",
                exc,
            )
            creds = None

    # If no valid credentials, do full auth flow
    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        # NOTE(review): no redirect_uri is set on the flow before building the
        # URL -- confirm the copy/paste code flow works with the OAuth client
        # configured in credentials.json.
        auth_url, _ = flow.authorization_url(access_type='offline', prompt='consent')
        logger.warning("Please open this URL in your browser:\n%s", auth_url)
        sys.stdout.write("Enter the authorization code here: ")
        sys.stdout.flush()
        code = sys.stdin.readline().strip()
        flow.fetch_token(code=code)
        creds = flow.credentials
        needs_save = True

    # Save for next time
    if needs_save:
        token_dir = os.path.dirname(token_path)
        if token_dir:
            # Without this, a missing directory makes open() fail only after
            # the user has already completed the interactive authorization.
            os.makedirs(token_dir, exist_ok=True)
        with open(token_path, 'w', encoding='utf-8') as token_file:
            token_file.write(creds.to_json())
        logger.info("Saved new token to %s", token_path)

    # Build service
    return build('gmail', 'v1', credentials=creds)
|
|
|
|
|
|
def extract_body(payload) -> str:
    """
    Return the text of the first non-empty text/plain part in a payload.

    The payload is searched depth-first: the part itself is checked, then
    each entry of its 'parts' list in order.

    Args:
        payload: A message-part dict. The keys read are 'mimeType',
            'body' -> 'data' (base64url text) and 'parts'.

    Returns:
        The decoded text (UTF-8, undecodable bytes dropped), or '' when no
        text/plain part with data is found.
    """
    if payload.get('mimeType') == 'text/plain':
        data = payload.get('body', {}).get('data')
        if data:
            # base64url data may arrive without its trailing '=' padding,
            # which urlsafe_b64decode rejects; restore it before decoding.
            padded = data + '=' * (-len(data) % 4)
            return base64.urlsafe_b64decode(padded).decode('utf-8', errors='ignore')

    # 'parts' may be absent or null on leaf parts
    for part in payload.get('parts') or []:
        text = extract_body(part)
        if text:
            return text
    return ''
|
|
|
|
|
|
def load_signatures(service):
    """
    Fetch configured Gmail signatures.

    Lists the send-as entries of the authorized user ('me') and converts each
    non-empty HTML signature to plain text.

    Args:
        service: Gmail API client exposing
            ``users().settings().sendAs().list(...)``.

    Returns:
        A list of dicts with the keys 'email' (the entry's sendAsEmail),
        'name' (its displayName, or the email when that is empty) and
        'signature' (plain text). Entries without signature text are skipped.
    """
    sigs = []
    resp = service.users().settings().sendAs().list(userId='me').execute()
    for entry in resp.get('sendAs', []):
        text = _signature_html_to_text(entry.get('signature') or '')
        if text:
            sigs.append({
                'email': entry['sendAsEmail'],
                'name': entry.get('displayName') or entry['sendAsEmail'],
                'signature': text,
            })
    logger.info("Loaded %d configured signatures from Gmail settings", len(sigs))
    return sigs


def _signature_html_to_text(markup: str) -> str:
    """Convert signature HTML to plain text, keeping one line per visual line."""
    import html  # stdlib; imported locally so the block stays self-contained

    # Line-breaking tags become newlines. Simply deleting them would glue
    # neighbouring lines together ("<div>A</div><div>B</div>" -> "AB"), and
    # such text can never be found in a plain-text message body.
    text = re.sub(r'(?i)<br\b[^>]*>|</?(?:div|p|li|tr|h[1-6])\b[^>]*>', '\n', markup)
    text = re.sub(r'<[^>]+>', '', text)
    # Decode entities (&amp;, &nbsp;, &#225;, ...) and turn the no-break
    # spaces they may yield into ordinary spaces.
    text = html.unescape(text).replace('\xa0', ' ')
    stripped = (line.strip() for line in text.splitlines())
    return '\n'.join(line for line in stripped if line)
|
|
|
|
|
|
def extract_author(body: str, signatures: list) -> "str | None":
    """
    Identify the author of a message body.

    Strategies are tried in this order; the first hit wins:

    1) Matching configured signature blocks (exactly, or ignoring differences
       in whitespace and line endings)
    2) Manual name mapping (NAME_PATTERNS)
    3) Regex fallback for common sign-offs: the line following the sign-off

    Args:
        body: Plain-text message body.
        signatures: Dicts with at least the keys 'signature' and 'name'.

    Returns:
        The matched signature's 'name', the NAME_PATTERNS key, or the stripped
        line after a sign-off; ``None`` when nothing matches or the body is
        empty.
    """
    if not body:
        return None

    # 1) Signature blocks
    # Message bodies commonly use CRLF line endings while stored signature
    # text may not, so besides the exact test both sides are also compared
    # with every whitespace run collapsed to a single space.
    flat_body = ' '.join(body.split())
    for s in signatures:
        sig = s.get('signature')
        if not sig:
            continue
        flat_sig = ' '.join(sig.split())
        # flat_sig must be non-empty: '' is a substring of every string
        if sig in body or (flat_sig and flat_sig in flat_body):
            return s['name']

    # 2) Manual name patterns
    for name, patterns in NAME_PATTERNS.items():
        if any(pat in body for pat in patterns):
            return name

    # 3) Regex fallback
    match = re.search(
        r'(?im)(?:Podpis|S pozdravem|Díky|Thanks|Regards|Best regards|Sincerely)[\s,]*\r?\n([^\r\n]{2,})',
        body
    )
    if match:
        return match.group(1).strip()
    return None
|
|
|
|
|
|
def main():
    """
    Download messages, tally statistics and write the results to OUTPUT_DIR.

    Steps:

    1. Authorize against Gmail and load the configured signatures.
    2. Page through the messages matching the search query (environment
       variable ``GMAIL_QUERY``, default ``before:2025-03-01``), fetch each
       one in 'full' format and turn it into a record.
    3. Count sent vs. received messages per day and sent messages per
       detected author. A message counts as sent when its From header
       contains one of SENDER_ALIASES.
    4. Write ``OUTPUT_DIR/YYYY/MM/emails.json`` for every month seen, plus
       ``daily_counts.json`` and ``author_counts.json`` in OUTPUT_DIR.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    service = get_gmail_service(TOKEN_PATH)
    signatures = load_signatures(service)

    # Previously a hard-coded literal; the default preserves that behaviour.
    query = os.environ.get('GMAIL_QUERY', 'before:2025-03-01')

    monthly_emails = defaultdict(list)
    daily_counts = defaultdict(lambda: {'sent': 0, 'received': 0})
    author_counts = defaultdict(int)

    logger.info("Starting fetch for mailbox: %s", SUPPORT_ADDRESS)
    next_page_token = None
    page_count = 0
    total_msgs = 0

    while True:
        page_count += 1
        resp = service.users().messages().list(
            userId='me', q=query, pageToken=next_page_token, maxResults=500
        ).execute()
        messages = resp.get('messages', [])
        count = len(messages)
        total_msgs += count
        logger.info(
            "Page %d: fetched %d messages (total so far: %d)",
            page_count, count, total_msgs,
        )

        if not messages:
            break

        for meta in messages:
            # The list call is only used for the id; fetch the full message.
            msg = service.users().messages().get(
                userId='me', id=meta['id'], format='full'
            ).execute()
            year, month, day, record = _build_email_record(msg, signatures)

            if record['sent']:
                daily_counts[day]['sent'] += 1
                # Authors are only tallied for messages we sent
                if record['author']:
                    author_counts[record['author']] += 1
            else:
                daily_counts[day]['received'] += 1

            monthly_emails[(year, month)].append(record)

        next_page_token = resp.get('nextPageToken')
        if not next_page_token:
            break

    logger.info("Fetch complete: total messages retrieved: %d", total_msgs)

    # Write monthly files
    for (yr, mo), emails in monthly_emails.items():
        path = os.path.join(OUTPUT_DIR, yr, mo)
        os.makedirs(path, exist_ok=True)
        with open(os.path.join(path, 'emails.json'), 'w', encoding='utf-8') as f:
            json.dump(emails, f, ensure_ascii=False, indent=2)
        logger.info("Wrote %d emails to %s/%s/emails.json", len(emails), yr, mo)

    # Write stats (json.dump escapes non-ASCII here, so the bytes are ASCII)
    with open(os.path.join(OUTPUT_DIR, 'daily_counts.json'), 'w', encoding='utf-8') as f:
        json.dump(daily_counts, f, indent=2)
    with open(os.path.join(OUTPUT_DIR, 'author_counts.json'), 'w', encoding='utf-8') as f:
        json.dump(author_counts, f, indent=2)

    logger.info(
        "Processed %d messages into %d month folders under %s",
        total_msgs, len(monthly_emails), OUTPUT_DIR,
    )

    # MariaDB stubs commented out


def _build_email_record(msg, signatures):
    """Turn one full-format message into ``(year, month, day, record)``."""
    payload = msg['payload']
    # Header names are case-insensitive, so index them lower-cased; a message
    # carrying e.g. "FROM:" or "subject:" is then still picked up.
    headers = {h['name'].lower(): h['value'] for h in payload.get('headers', [])}
    body = extract_body(payload)
    author = extract_author(body, signatures)

    # internalDate is treated as epoch milliseconds. fromtimestamp() without
    # a tz converts to the local timezone of the running process.
    dt = datetime.fromtimestamp(int(msg['internalDate']) / 1000)

    from_hdr = headers.get('from', '').lower()
    is_sent = any(alias in from_hdr for alias in SENDER_ALIASES)

    record = {
        'id': msg['id'], 'threadId': msg.get('threadId'), 'source': SUPPORT_ADDRESS,
        'from': headers.get('from'), 'to': headers.get('to'), 'date': headers.get('date'),
        'subject': headers.get('subject'), 'internalDate': msg.get('internalDate'),
        'body': body, 'author': author, 'sent': is_sent,
    }
    return dt.strftime('%Y'), dt.strftime('%m'), dt.strftime('%Y-%m-%d'), record
|
|
|
|
# Script entry point: run the download only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()