# gpc-generator/allegro.py
# 2025-03-11 14:12:38 +01:00
#
# 275 lines
# 9.3 KiB
# Python

import csv
import chardet
from datetime import datetime, date
from gpc import Data, Header, TransactionCode, CURRENCIES_GPC
from utils import extract_order_number, extract_numbers, parse_date
import mysql.connector
import re
import io
import zipfile
from collections import defaultdict
# Module-level cache of the order data. Both names stay None until
# convert_csv_to_gpc_allegro() fills them from load_allegro_orders().
allegro_orders = orders_by_login = None
def load_allegro_orders():
    """Fetch all Allegro orders from MySQL and group them by Allegro login.

    Selects every row of ``rcd_orders`` whose ``allegro_order_id`` is not
    NULL, using a dictionary cursor so each row is a ``dict``.

    Connection settings default to the values that used to be hard-coded
    here and can be overridden with the environment variables
    ``ALLEGRO_DB_HOST``, ``ALLEGRO_DB_PORT``, ``ALLEGRO_DB_USER``,
    ``ALLEGRO_DB_PASSWORD`` and ``ALLEGRO_DB_NAME``.

    Returns:
        tuple: ``(orders, grouped)`` where ``orders`` is the list of row
        dicts as returned by the cursor and ``grouped`` is a
        ``defaultdict(list)`` mapping each ``allegro_login`` value to its
        orders sorted ascending by ``date_purchased``.

    Raises:
        Whatever ``mysql.connector`` raises when connecting or querying; the
        cursor and connection are closed before the error propagates.
    """
    import os  # local import: only this function reads the environment

    # SECURITY NOTE(review): the fallback credentials below are committed to
    # source control. They are kept only so existing deployments keep working;
    # rotate the password and supply it through ALLEGRO_DB_PASSWORD instead.
    conn = mysql.connector.connect(
        host=os.environ.get("ALLEGRO_DB_HOST", "afrodite.my-devbox.cloud"),
        port=int(os.environ.get("ALLEGRO_DB_PORT", "3307")),
        user=os.environ.get("ALLEGRO_DB_USER", "vat_dev"),
        password=os.environ.get("ALLEGRO_DB_PASSWORD", "i!532&hchjJrGmTSV37i7xPfL&"),
        database=os.environ.get("ALLEGRO_DB_NAME", "remotecont1"),
    )

    # Same statement text as before, spelled without the stray line
    # continuation that was embedded in the triple-quoted literal.
    query = (
        "\n"
        "SELECT o.* FROM rcd_orders o\n"
        "WHERE o.allegro_order_id IS NOT NULL\n"
    )

    try:
        # Dictionary cursor: rows come back as {column_name: value}.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query)
            orders = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()

    # Group orders by the account that placed them.
    grouped = defaultdict(list)
    for order in orders:
        grouped[order['allegro_login']].append(order)

    # Oldest first, so lookups can return the earliest matching order.
    # NOTE(review): assumes date_purchased is never NULL - a None value would
    # make the sort raise TypeError; confirm against the schema.
    for order_list in grouped.values():
        order_list.sort(key=lambda entry: entry['date_purchased'])

    return orders, grouped
def search_allegro_order(allegro_id):
    """Look up an order id in the module-level ``allegro_orders`` cache.

    Returns the ``orders_id`` of the first cached order whose
    ``allegro_login`` equals *allegro_id*, or ``0`` when none matches.

    NOTE(review): the parameter is called ``allegro_id`` but is compared with
    the ``allegro_login`` field - confirm which field callers expect.
    """
    hits = (
        entry["orders_id"]
        for entry in allegro_orders
        if entry["allegro_login"] == allegro_id
    )
    # First hit wins; 0 is the "not found" marker.
    return next(hits, 0)
def find_first_order_after(allegro_login, payment_date_str, amount):
    """Find the earliest order of a login that matches a payment.

    Walks the orders cached in the module-level ``orders_by_login`` for
    *allegro_login*, in their stored order, and returns the ``orders_id`` of
    the first one that satisfies both conditions:

    * ``date_purchased`` is on or after *payment_date_str*;
    * ``order_total * currency_value``, rounded to two decimals and expressed
      in hundredths, equals *amount* in absolute value.

    Args:
        allegro_login: Key into ``orders_by_login``.
        payment_date_str: Lower bound for ``date_purchased``. Despite the
            name it is compared directly, without parsing, so it must be of
            a type comparable with the stored ``date_purchased`` values.
        amount: Payment amount in hundredths of the currency unit; the sign
            is ignored.

    Returns:
        The matching ``orders_id``, or ``0`` when nothing matches or the
        order cache has not been loaded yet.
    """
    # Both sides are floats derived from decimal text, so an exact ``==`` can
    # miss a genuine match (e.g. 1.1 * 100 == 110.00000000000001). The
    # tolerance is far below one hundredth yet far above float noise.
    tolerance = 1e-4

    if not orders_by_login:
        # Cache not loaded (None) or empty: nothing can match.
        return 0

    wanted = abs(amount)
    for order in orders_by_login.get(allegro_login, []):
        # Date check first, so earlier orders are never converted to float.
        if not order['date_purchased'] >= payment_date_str:
            continue
        order_value = float(order['order_total']) * float(order['currency_value'])
        order_cents = abs(round(order_value, 2) * 100)
        if abs(order_cents - wanted) < tolerance:
            return order['orders_id']
    return 0
def create_allegro_zip(gpc_lines):
    """Pack the three per-wallet GPC statements into an in-memory ZIP.

    *gpc_lines* must provide the keys ``"payu"``, ``"przelewy24"`` and
    ``"allegro finance"``, each holding an iterable of strings. The strings
    of each wallet are concatenated, encoded as UTF-8 and stored under a
    fixed file name. Returns an ``io.BytesIO`` positioned at offset 0.
    """
    # (archive member name, key in gpc_lines), in the order they are written.
    members = (
        ("allegro_payu.gpc", "payu"),
        ("allegro_przelewy.gpc", "przelewy24"),
        ("allegro_finance.gpc", "allegro finance"),
    )

    # Encode everything up front so a missing key fails before the archive
    # is started.
    payloads = [
        (member_name, "".join(gpc_lines[wallet]).encode("utf-8"))
        for member_name, wallet in members
    ]

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, mode="w", compression=zipfile.ZIP_DEFLATED) as bundle:
        for member_name, payload in payloads:
            bundle.writestr(member_name, payload)

    # Rewind so the caller can read the archive from the start.
    archive.seek(0)
    return archive
# First signed decimal number found in an amount cell, e.g. "-12.50 PLN".
_ALLEGRO_AMOUNT_RE = re.compile(r'[-+]?\d*\.?\d+')


def _allegro_statement_header(account, currency):
    """Return the GPC header line for one wallet account."""
    # The balances and turnovers are the fixed values this converter has
    # always written; only the account number differs between wallets.
    return Header(
        account=account,
        account_name=currency.ljust(20),
        old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
        old_balance=78449,
        old_sign='+',
        new_balance=6215449,
        new_sign='+',
        turnover_debet=6585600,
        turnover_debet_sign='0',
        turnover_credit=127226,
        turnover_credit_sign='0',
        transaction_list_no=int(date.today().strftime("%j")),
        date=datetime.now()
    ).to_string()


def _allegro_csv_encoding(csv_file_path, forced_encoding):
    """Return the forced encoding, or guess one from the file's first 1024 bytes."""
    if forced_encoding is not None:
        print(f"Forced encoding: {forced_encoding}")
        return forced_encoding
    with open(csv_file_path, 'rb') as raw_file:
        sample = raw_file.read(1024)
    detected = chardet.detect(sample)['encoding']
    print(f"Detected encoding: {detected}")
    return detected


def _allegro_amount_in_hundredths(value_str):
    """Return the first number in *value_str* multiplied by 100, or 0.0 if none."""
    found = _ALLEGRO_AMOUNT_RE.search(value_str)
    return float(found.group(0)) * 100 if found else 0.0


def convert_csv_to_gpc_allegro(csv_file_path, gpc_file_path, account_number, currency, mapping):
    """Convert an Allegro payments CSV into three GPC statements, zipped.

    Reloads the order cache from the database, then writes one GPC statement
    per wallet ("payu", "przelewy24", "allegro finance"): a header line
    followed by one data line for every CSV row of that wallet. Each row is
    matched to a shop order with ``find_first_order_after`` when its
    source-name cell contains a ``;`` (the text before the first ``;`` is
    used as the Allegro login); the matched order id becomes the record
    number and variable symbol, ``0`` otherwise.

    Args:
        csv_file_path: Path of the CSV export to read.
        gpc_file_path: Unused; kept for interface compatibility.
        account_number: Unused; the wallet account numbers are fixed.
        currency: Written, left-justified to 20 characters, as the account
            name of each header.
        mapping: Dict describing the CSV layout. The keys read here are
            ``forced_encoding``, ``delimiter``, ``is_dict_mapping``,
            ``source_name``, ``source_amount``, ``created_on``,
            ``direction``, ``wallet_source`` and ``payment_type``.

    Returns:
        The in-memory ZIP produced by ``create_allegro_zip``.

    Raises:
        KeyError: If a row names a wallet other than the three known ones.
    """
    global allegro_orders, orders_by_login
    allegro_orders, orders_by_login = load_allegro_orders()

    # Lower-cased wallet name from the CSV -> account number of its statement.
    account_numbers = {
        "payu": 4113600111,
        "przelewy24": 4113600222,
        "allegro finance": 4113600333
    }

    # Every statement starts with its own header line.
    gpc_lines = {
        wallet: [_allegro_statement_header(account, currency)]
        for wallet, account in account_numbers.items()
    }

    detected_encoding = _allegro_csv_encoding(csv_file_path, mapping['forced_encoding'])

    # NOTE(review): the data records are always tagged with the "CZK" code,
    # regardless of the ``currency`` argument - confirm this is intended.
    currency_code = CURRENCIES_GPC.get("CZK", "0000")

    direction_column = mapping['direction']

    # newline='' lets the csv module do its own newline handling, as the
    # csv documentation requires for files passed to a reader.
    with open(csv_file_path, mode='r', encoding=detected_encoding, newline='') as csv_file:
        if mapping['is_dict_mapping']:
            reader = csv.DictReader(csv_file, delimiter=mapping['delimiter'])
        else:
            reader = csv.reader(csv_file, delimiter=mapping['delimiter'])
            # Skip the header row; the default keeps an empty file from
            # raising StopIteration.
            next(reader, None)

        for row in reader:
            if not row:
                # Blank line in the CSV: nothing to convert.
                continue

            wallet = row[mapping['wallet_source']].lower()
            account = account_numbers[wallet]

            source_name = row[mapping['source_name']]
            source_amount = _allegro_amount_in_hundredths(row[mapping['source_amount']])
            created_on = parse_date(row[mapping['created_on']])

            # Only rows whose source name contains ";" are matched to an order.
            if ";" in source_name:
                reference = find_first_order_after(
                    source_name.split(";")[0].strip(), created_on, source_amount
                )
            else:
                reference = 0

            # Transaction type: an explicit direction column wins; otherwise
            # the sign of the amount decides.
            if direction_column is None:
                is_debit = not source_amount > 0
            else:
                is_debit = row[direction_column].lower() == "out"
            transaction_code = TransactionCode.DEBET if is_debit else TransactionCode.CREDIT

            # NOTE(review): the payer account is always written as 0; the
            # value formerly parsed from the CSV was never passed on.
            gpc_data = Data(
                account=account,
                payer_account=0,
                no=reference,
                balance=source_amount,
                code=transaction_code,
                variable=reference,
                constant_symbol=0,
                bank_code=0,
                specific_symbol=0,
                client_name="VAT VATu" if row[mapping['payment_type']] == "wypłata środków" else "",
                currency=currency_code,
                date=created_on
            )
            gpc_lines[wallet].append(gpc_data.to_string())

    return create_allegro_zip(gpc_lines)
# Example mapping for the Allegro CSV export. With is_dict_mapping=False the
# numeric values are used as positional column indexes into each CSV row.
mapping_allegro = dict(
    transaction_id=2,
    reference=None,
    direction=None,
    source_name=5,
    source_amount=8,
    source_currency=4,
    payer_account=None,
    created_on=0,
    payment_type=3,
    wallet_source=4,
    acc_sum=9,
    delimiter=",",
    is_dict_mapping=False,
    use_transaction_id=False,
    payer_number_exists=False,
    use_source_name=False,
    forced_encoding=None,
)
# Example usage:
# convert_csv_to_gpc_allegro("allegro_payments.csv", "allegro_payments.gpc", account_number=3214724742, currency="PLN", mapping=mapping_allegro)