265 lines
9.6 KiB
Python
265 lines
9.6 KiB
Python
import csv
|
|
import pandas as pd
|
|
import chardet
|
|
import os
|
|
import io
|
|
import zipfile
|
|
import shutil
|
|
from datetime import datetime, date
|
|
from gpc import Data, Header, TransactionCode, CURRENCIES_GPC
|
|
from utils import extract_order_number, extract_numbers, parse_date
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
from aviza.helpers import write_output_gpc_to_zip
|
|
|
|
def load_bank_transactions(csv_file):
    """
    Read a semicolon-delimited bank statement CSV into a DataFrame.

    Every column is read as text (``dtype=str``), so values keep leading
    zeros and their original formatting.

    :param csv_file: Path to the bank transactions CSV file.
    :return: A pandas DataFrame containing the transactions.
    :raises ValueError: If the 'Zpráva pro příjemce' column is missing.
    """
    required_column = 'Zpráva pro příjemce'
    transactions = pd.read_csv(csv_file, delimiter=';', dtype=str)

    # The message column is what transaction lookups search, so fail early
    # when the statement does not provide it.
    if required_column in transactions.columns:
        return transactions

    raise ValueError("The CSV file does not contain the required column 'Zpráva pro příjemce'.")
|
|
|
|
def search_bank_transaction(search_string):
    """
    Find the first loaded bank transaction whose message contains a string.

    The lookup runs against the module-level ``transactions_df`` DataFrame,
    which must have been assigned beforehand
    (``extract_and_process_zip_przelewy24_auto`` does this). The match is a
    case-insensitive, literal substring match on the 'Zpráva pro příjemce'
    column; rows with a missing message never match.

    :param search_string: Text to look for in the 'Zpráva pro příjemce' column.
    :return: The first matching row as a dictionary, or None if not found.
    :raises NameError: If ``transactions_df`` has not been loaded yet.
    """
    messages = transactions_df['Zpráva pro příjemce']

    # regex=False: the search text is an identifier, not a pattern. Without it
    # characters such as '.', '+' or '(' are interpreted as regex syntax and
    # either match unrelated rows or fail to match the literal text.
    mask = messages.str.contains(search_string, na=False, case=False, regex=False)
    matching_rows = transactions_df[mask]

    if matching_rows.empty:
        return None
    return matching_rows.iloc[0].to_dict()
|
|
|
|
|
|
def extract_and_process_zip_przelewy24_auto(zip_file_path, bank_statement_file_path, output_file, account_number, currency, mapping):
    """
    Convert every CSV found inside a ZIP archive to GPC and bundle the results.

    The bank statement is loaded into the module-level ``transactions_df``.
    The archive is extracted into an ``extracted_temp`` folder next to the ZIP,
    each ``*.csv`` file (at any depth) is converted with
    ``convert_csv_to_gpc_przelewy24_auto``, and the temporary folder is removed
    again. Files that fail to convert are reported and skipped.

    :param zip_file_path: Path to the ZIP archive holding the CSV reports.
    :param bank_statement_file_path: Path to the bank statement CSV passed to
        ``load_bank_transactions``.
    :param output_file: Forwarded unchanged to ``write_output_gpc_to_zip``.
    :param account_number: Account number forwarded to the converter.
    :param currency: Currency label forwarded to the converter.
    :param mapping: Column/option mapping forwarded to the converter.
    :return: Whatever ``write_output_gpc_to_zip`` returns for the converted files.
    """
    global transactions_df
    transactions_df = load_bank_transactions(bank_statement_file_path)
    all_transformed_data = []

    # Create a temporary folder for extraction, next to the archive.
    base_dir = os.path.dirname(zip_file_path)
    extract_folder = os.path.join(base_dir, "extracted_temp")
    os.makedirs(extract_folder, exist_ok=True)

    try:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_folder)

        # os.walk yields (dirpath, dirnames, filenames); the file name must be
        # joined with dirpath to obtain a path that can actually be opened.
        for root, dir_names, file_names in os.walk(extract_folder):
            dir_names.sort()  # deterministic traversal order
            for file_name in sorted(file_names):
                if not file_name.endswith(".csv"):
                    continue
                csv_path = os.path.join(root, file_name)
                try:
                    transformed_data = convert_csv_to_gpc_przelewy24_auto(csv_path, account_number, currency, mapping)
                except Exception as exc:
                    # Best effort: one unreadable report must not abort the
                    # whole batch, but the failure should be visible.
                    print(f"Skipping {csv_path}: {exc!r}")
                    continue
                all_transformed_data.append(transformed_data)
    finally:
        # Clean up the extraction folder even when extraction/conversion fails.
        shutil.rmtree(extract_folder)

    print(f"Processed and cleaned up: {zip_file_path}")

    # Hand the converted payloads over for packaging.
    return write_output_gpc_to_zip(output_file, all_transformed_data)
|
|
|
|
|
|
def _resolve_csv_encoding(csv_file_path, mapping):
    """Return the mapping's forced encoding, or chardet's guess from the first 1024 bytes."""
    forced_encoding = mapping['forced_encoding']
    if forced_encoding is not None:
        print(f"Forced encoding: {forced_encoding}")
        return forced_encoding

    with open(csv_file_path, 'rb') as f:
        rawdata = f.read(1024)  # Read a small part of the file
    detected_encoding = chardet.detect(rawdata)['encoding']
    print(f"Detected encoding: {detected_encoding}")
    return detected_encoding


def convert_csv_to_gpc_przelewy24_auto(csv_file_path, account_number, currency, mapping):
    """
    Convert one CSV report into GPC file content.

    The first three lines of the file are skipped, then every remaining row is
    turned into a GPC ``Data`` line. A final settlement ("vyúčtování") line
    carrying the summed amount is appended and a ``Header`` line is placed in
    front.

    :param csv_file_path: Path to the CSV report.
    :param account_number: Account number written to every GPC line and the header.
    :param currency: Text used (left-justified to 20 characters) as the header's
        account name.
    :param mapping: Dict describing the CSV layout. Keys read here:
        ``forced_encoding``, ``delimiter``, ``is_dict_mapping``, ``reference``,
        ``transaction_id``, ``use_transaction_id``, ``direction``,
        ``payer_account``, ``payer_number_exists``, ``created_on`` and
        ``source_amount``. ``source_name`` / ``use_source_name`` are not read:
        the client name written to the GPC lines is fixed.
    :return: The GPC content encoded as windows-1250 bytes.
    :raises StopIteration: If the file has fewer than three lines to skip.
    """
    gpc_lines = []
    detected_encoding = _resolve_csv_encoding(csv_file_path, mapping)

    # newline='' is required by the csv module so that line endings embedded in
    # quoted fields are handled by the reader rather than the text layer.
    with open(csv_file_path, mode='r', encoding=detected_encoding, newline='') as csv_file:
        # Skip the first 3 rows
        for _ in range(3):
            next(csv_file)

        if mapping['is_dict_mapping']:
            reader = csv.DictReader(csv_file, delimiter=mapping['delimiter'])
        else:
            reader = csv.reader(csv_file, delimiter=mapping['delimiter'])
            # Positional mappings have no use for the header row; drop it.
            next(reader)

        # Kept as a float, as before, so Data/Header receive the same value
        # types. NOTE(review): confirm whether gpc would prefer integer cents.
        total_payout = 0.0
        created_on = None

        for row in reader:
            reference = extract_order_number(row[mapping['reference']])
            if mapping['use_transaction_id']:
                transaction_id = extract_numbers(row[mapping['transaction_id']])
            else:
                transaction_id = reference

            if mapping['direction'] is not None:
                direction = row[mapping['direction']].lower()
            else:
                direction = None

            if mapping['payer_number_exists']:
                raw_account = row[mapping['payer_account']].replace("Rachunek nadawcy: ", "").replace(" ", "")
                payer_account = extract_numbers(raw_account)[:16].ljust(16)
                # An account with no digits at all is passed as 0.
                if payer_account.strip() == "":
                    payer_account = 0
            else:
                payer_account = 0

            created_on = parse_date(row[mapping['created_on']])

            # Parse the amount with Decimal (not float), round to 2 decimal
            # places and convert to integer cents.
            source_amount = Decimal(row[mapping['source_amount']].replace(',', '.'))
            source_amount = source_amount.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
            source_amount_cents = int(source_amount * 100)

            # Determine transaction type: from the sign of the amount when the
            # report has no direction column, otherwise "out" means debit.
            if direction is None:
                is_credit = source_amount_cents > 0
            else:
                is_credit = direction != "out"
            transaction_code = TransactionCode.CREDIT if is_credit else TransactionCode.DEBET

            currency_code = "100" + str(transaction_code.value)

            # Create GPC Data object
            gpc_data = Data(
                account=account_number,
                payer_account=payer_account,
                no=transaction_id,
                balance=source_amount_cents,
                code=transaction_code,
                variable=int(reference) if reference.isdigit() else 0,
                constant_symbol=0,
                bank_code=0,
                specific_symbol=0,
                client_name="CG ABCD-EFGH-IJKL" if transaction_code == TransactionCode.CREDIT else "CG refundace",
                currency=currency_code,
                date=created_on
            )

            gpc_lines.append(gpc_data.to_string())
            total_payout += source_amount_cents

    # NOTE: no fee line is emitted and the settlement line is not matched
    # against the bank statement (search_bank_transaction is not called here).

    # Settlement ("vyúčtování") row, dated like the last CSV row.
    payout_data = Data(
        account=account_number,
        payer_account=95102013900000630206821286,
        no=int(total_payout),
        balance=total_payout,
        code=TransactionCode.DEBET,
        variable=int(total_payout),
        constant_symbol=0,
        bank_code=0,
        specific_symbol=0,
        client_name="CG vyúčtování",
        currency="1001",
        date=created_on
    )
    gpc_lines.append(payout_data.to_string())

    # Create and add the header.
    # NOTE(review): both turnovers are set to the net total of all rows; if a
    # report mixes credits and debits the two figures may need to be tracked
    # separately - confirm against the GPC consumer.
    header = Header(
        account=account_number,
        account_name=currency.ljust(20),
        old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
        old_balance=0,
        old_sign='+',
        new_balance=0,
        new_sign='+',
        turnover_debet=total_payout,
        turnover_debet_sign='+',
        turnover_credit=total_payout,
        turnover_credit_sign='+',
        transaction_list_no=int(date.today().strftime("%j")),
        date=datetime.now()
    )
    gpc_lines.insert(0, header.to_string())

    file_content = "".join(gpc_lines)
    print("GPC file content successfully created")
    return file_content.encode("windows-1250")
|
|
|
|
|
|
# Column/option mapping for the przelewy24 "avizo" CSV report, in the shape
# expected by convert_csv_to_gpc_przelewy24_auto.
mapping_przelewy_avizo = {
    # CSV column names
    "transaction_id": "Transaction id",
    "reference": "Title",
    "direction": None,  # no direction column: debit/credit follows the amount's sign
    "source_name": "Payment method name",
    "source_amount": "Amount",
    "source_currency": "",
    "payer_account": "Session id",  # only read when payer_number_exists is True
    "created_on": "Creation date",
    "CLEARING_ID": "CLEARING_ID",
    "fees": "Commission",
    # Parsing options
    "delimiter": ",",
    "is_dict_mapping": True,  # rows are read with csv.DictReader
    "use_transaction_id": True,
    "payer_number_exists": False,
    "use_source_name": False,
    "forced_encoding": "UTF-8",  # skips chardet detection
}
|
|
|
|
|
|
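# Example usage (convert_csv_to_gpc_przelewy24_auto returns the GPC content as windows-1250 bytes):
# with open("avizo_przelewy24_test.gpc", "wb") as out:
#     out.write(convert_csv_to_gpc_przelewy24_auto("../Specification clearing 2024-05-10.csv", account_number=3498710000999117, currency="EUR", mapping=mapping_przelewy_avizo))
#
# NOTE: the two calls below use convert_csv_to_gpc, mapping_pko and mapping_sparkasse, which are not defined in the visible part of this file.
# convert_csv_to_gpc("pko_input.csv", "pko_output.gpc", account_number=95102013900000630206821286, currency="PLN", mapping=mapping_pko)
# convert_csv_to_gpc("sparkasse_input.csv", "sparkasse_output.gpc", account_number=95850503000221267034, currency="EUR", mapping=mapping_sparkasse)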
|