"""Convert an Allegro payments CSV export into per-wallet GPC statement files.

The converter loads Allegro orders from MySQL, tries to match every CSV payment
row to an order (by Allegro login, purchase date and amount) and writes one GPC
file per wallet ("payu", "przelewy24", "allegro finance") into an in-memory ZIP.
"""

import csv
import io
import os
import re
import zipfile
from collections import defaultdict
from datetime import datetime

import chardet
import mysql.connector

from gpc import Data, Header, TransactionCode, CURRENCIES_GPC
from utils import extract_order_number, extract_numbers, parse_date

# Module-level caches, filled by convert_csv_to_gpc_allegro() through
# load_allegro_orders(). They stay None until the first conversion runs.
allegro_orders = None
orders_by_login = None

# NOTE(security): these credentials were hard-coded in the original module.
# They are kept as fallbacks so existing deployments keep working, but each
# value can now be overridden through the environment variable named next to
# it. The password should be rotated and the literal removed once the
# environment is configured.
_DB_SETTINGS = (
    # (connect() keyword, environment variable, fallback value)
    ("host", "ALLEGRO_DB_HOST", "afrodite.my-devbox.cloud"),
    ("port", "ALLEGRO_DB_PORT", "3307"),
    ("user", "ALLEGRO_DB_USER", "vat_dev"),
    ("password", "ALLEGRO_DB_PASSWORD", "i!532&hchjJrGmTSV37i7xPfL&"),
    ("database", "ALLEGRO_DB_NAME", "remotecont1"),
)

_ALLEGRO_ORDERS_QUERY = """
    SELECT o.*
    FROM rcd_orders o
    WHERE o.allegro_order_id IS NOT NULL
"""

# GPC account number used for each wallet name found in the CSV.
_WALLET_ACCOUNTS = {
    "payu": 4113600111,
    "przelewy24": 4113600222,
    "allegro finance": 4113600333,
}

# ZIP member name -> wallet key, in the order the members are written.
_ZIP_MEMBERS = (
    ("allegro_payu.gpc", "payu"),
    ("allegro_przelewy.gpc", "przelewy24"),
    ("allegro_finance.gpc", "allegro finance"),
)

# First (optionally signed) decimal number inside an amount cell.
_AMOUNT_RE = re.compile(r'[-+]?\d*\.?\d+')


def _db_connect_kwargs():
    """Return the keyword arguments for mysql.connector.connect()."""
    kwargs = {
        keyword: os.environ.get(env_name, fallback)
        for keyword, env_name, fallback in _DB_SETTINGS
    }
    kwargs["port"] = int(kwargs["port"])
    return kwargs


def load_allegro_orders():
    """Fetch all orders that have an Allegro order id and group them by login.

    Returns:
        A tuple ``(orders, by_login)`` where ``orders`` is the list of row
        dictionaries returned by the query and ``by_login`` is a
        ``defaultdict(list)`` mapping each ``allegro_login`` to its orders,
        sorted ascending by ``date_purchased``.
    """
    conn = mysql.connector.connect(**_db_connect_kwargs())
    try:
        # dictionary=True makes every fetched row a dict keyed by column name.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(_ALLEGRO_ORDERS_QUERY)
            orders = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Close the connection even when the query fails, so a failed run
        # does not leak a server connection.
        conn.close()

    by_login = defaultdict(list)
    for order in orders:
        by_login[order['allegro_login']].append(order)

    for login_orders in by_login.values():
        login_orders.sort(key=lambda order: order['date_purchased'])

    return orders, by_login


def search_allegro_order(allegro_id):
    """Return ``orders_id`` of the first cached order whose login is *allegro_id*.

    Reads the module-level ``allegro_orders`` cache, which must have been
    loaded beforehand. Returns 0 when no order matches.
    """
    return next(
        (order["orders_id"] for order in allegro_orders
         if order["allegro_login"] == allegro_id),
        0,
    )


def find_first_order_after(allegro_login, payment_date_str, amount):
    """Return the first matching order id for a payment, or 0 if none matches.

    An order of *allegro_login* matches when its ``date_purchased`` is on or
    after *payment_date_str* and its total, converted with ``currency_value``
    and expressed in whole cents, equals *amount* in absolute value.

    Args:
        allegro_login: Login used as key into the ``orders_by_login`` cache.
        payment_date_str: Payment date; it is compared directly with the rows'
            ``date_purchased`` values, so it must be of a comparable type
            (the converter passes the result of ``parse_date``).
        amount: Payment amount in cents (sign is ignored).
    """
    payment_date = payment_date_str
    # Compare whole cents as integers: exact float equality on values such as
    # 1.1 * 100 == 110.00000000000001 would reject an integer 110.
    wanted_cents = abs(round(amount))
    for order in orders_by_login.get(allegro_login, []):
        if not order['date_purchased'] >= payment_date:
            continue
        converted = float(order['order_total']) * float(order['currency_value'])
        if abs(round(round(converted, 2) * 100)) == wanted_cents:
            return order['orders_id']
    return 0


def create_allegro_zip(gpc_lines):
    """Pack the three per-wallet GPC line lists into an in-memory ZIP archive.

    Args:
        gpc_lines: Dict with the keys "payu", "przelewy24" and
            "allegro finance", each holding a list of GPC line strings.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 that contains the ZIP data.
    """
    # Join and encode everything first, so a missing wallet key fails before
    # the archive is started.
    payloads = [
        (member_name, "".join(gpc_lines[wallet]).encode("utf-8"))
        for member_name, wallet in _ZIP_MEMBERS
    ]

    mem_zip = io.BytesIO()
    with zipfile.ZipFile(mem_zip, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        for member_name, payload in payloads:
            zf.writestr(member_name, payload)

    mem_zip.seek(0)  # rewind so callers can read the archive from the start
    return mem_zip


def _build_header(account, currency):
    """Build the GPC header record for one wallet account."""
    # NOTE(review): the date, balances and turnovers below are fixed constants
    # carried over unchanged; they are not derived from the CSV. Presumably
    # placeholders, confirm with the consumer of the GPC files.
    return Header(
        account=account,
        account_name=currency.ljust(20),
        old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
        old_balance=78449,
        old_sign='+',
        new_balance=6215449,
        new_sign='+',
        turnover_debet=6585600,
        turnover_debet_sign='0',
        turnover_credit=127226,
        turnover_credit_sign='0',
        transaction_list_no=3,
        date=datetime.now(),
    )


def _parse_amount_cents(value_str):
    """Return the first number in *value_str* as cents (a float), else 0.0."""
    match = _AMOUNT_RE.search(value_str)
    if not match:
        return 0.0
    # Round to a whole number of cents: float("0.29") * 100 is
    # 28.999999999999996, which would otherwise leak into the GPC record.
    return float(round(float(match.group(0)) * 100))


def _transaction_code(direction, amount):
    """Choose DEBET/CREDIT from the direction column, or the amount's sign."""
    if direction is None:
        return TransactionCode.CREDIT if amount > 0 else TransactionCode.DEBET
    return TransactionCode.DEBET if direction == "out" else TransactionCode.CREDIT


def convert_csv_to_gpc_allegro(csv_file_path, gpc_file_path, account_number, currency, mapping):
    """Convert an Allegro payments CSV into a ZIP of three GPC files.

    Reloads the Allegro orders from the database (refreshing the module-level
    caches), writes one header per wallet and one GPC data record per CSV row
    into the list of the row's wallet.

    Args:
        csv_file_path: Path of the CSV file to read.
        gpc_file_path: Unused; kept for interface compatibility.
        account_number: Unused; kept for interface compatibility.
        currency: Text stored (left-justified to 20 characters) as the
            account name of every header.
        mapping: Column mapping, see ``mapping_allegro``. Values are column
            names when ``is_dict_mapping`` is true, otherwise column indexes.

    Returns:
        The ``io.BytesIO`` produced by ``create_allegro_zip``.

    Raises:
        KeyError: If a row's wallet is not one of the known wallet names.
    """
    global allegro_orders, orders_by_login
    allegro_orders, orders_by_login = load_allegro_orders()

    gpc_lines = {
        wallet: [_build_header(account, currency).to_string()]
        for wallet, account in _WALLET_ACCOUNTS.items()
    }

    if mapping['forced_encoding'] is not None:
        detected_encoding = mapping['forced_encoding']
        print(f"Forced encoding: {detected_encoding}")
    else:
        with open(csv_file_path, 'rb') as f:
            rawdata = f.read(1024)  # a small sample is enough for detection
        detected_encoding = chardet.detect(rawdata)['encoding']
        print(f"Detected encoding: {detected_encoding}")

    # NOTE(review): the GPC currency is looked up for "CZK" regardless of the
    # `currency` argument, exactly as before; confirm this is intended.
    currency_code = CURRENCIES_GPC.get("CZK", "0000")

    # newline='' lets the csv module do its own newline handling, as its
    # documentation requires for files passed to csv.reader.
    with open(csv_file_path, mode='r', encoding=detected_encoding, newline='') as csv_file:
        if mapping['is_dict_mapping']:
            reader = csv.DictReader(csv_file, delimiter=mapping['delimiter'])
        else:
            reader = csv.reader(csv_file, delimiter=mapping['delimiter'])
            next(reader, None)  # skip the header row; tolerate an empty file

        for row in reader:
            source_name = row[mapping['source_name']]
            source_amount = _parse_amount_cents(row[mapping['source_amount']])
            created_on = parse_date(row[mapping['created_on']])

            # The text before the first ';' in the source name is used as the
            # Allegro login; rows without ';' get no order reference.
            if ";" in source_name:
                login = source_name.split(";")[0].strip()
                reference = find_first_order_after(login, created_on, source_amount)
            else:
                reference = 0

            direction = None
            if mapping['direction'] is not None:
                direction = row[mapping['direction']].lower()

            wallet = row[mapping['wallet_source']].lower()

            # NOTE(review): payer_account is always written as 0, as before;
            # the previously computed (and discarded) payer account was removed.
            gpc_data = Data(
                account=_WALLET_ACCOUNTS[wallet],
                payer_account=0,
                no=reference,
                balance=source_amount,
                code=_transaction_code(direction, source_amount),
                variable=reference,
                constant_symbol=0,
                bank_code=0,
                specific_symbol=0,
                client_name="VAT VATu" if row[mapping['payment_type']] == "wypłata środków" else "",
                currency=currency_code,
                date=created_on,
            )
            gpc_lines[wallet].append(gpc_data.to_string())

    return create_allegro_zip(gpc_lines)


# Example mapping for the Allegro export. With 'is_dict_mapping' False the
# numeric values are zero-based column indexes into each CSV row.
mapping_allegro = {
    'transaction_id': 2,
    'reference': None,
    'direction': None,
    'source_name': 5,
    'source_amount': 8,
    'source_currency': 4,
    'payer_account': None,
    'created_on': 0,
    'payment_type': 3,
    'wallet_source': 4,
    'acc_sum': 9,
    'delimiter': ",",
    'is_dict_mapping': False,
    'use_transaction_id': False,
    'payer_number_exists': False,
    'use_source_name': False,
    'forced_encoding': None
}

# Example usage:
# convert_csv_to_gpc_allegro("allegro_payments.csv", "allegro_payments.gpc", account_number=3214724742, currency="PLN",
#                            mapping=mapping_allegro)