125 lines
4.6 KiB
Python
125 lines
4.6 KiB
Python
import os
import shutil
import sys
import zipfile
from datetime import datetime

import pandas as pd

from aviza.helpers import write_output_csv_to_zip
|
|
|
|
# Currency code of the run in progress. Assigned by
# extract_and_process_zip_gls_single and read by the other functions in this
# module; None until the first run starts.
GLOBAL_CURRENCY = None

# Bank statement DataFrame shared between the functions in this module.
# Assigned by extract_and_process_zip_gls_single; None until then.
transactions_df = None
|
|
|
|
|
|
def load_bank_transactions(csv_file):
    """
    Reads the bank transactions CSV file and hands back its content as a DataFrame.

    Every column is read as text (``dtype=str``). While the module-level
    GLOBAL_CURRENCY equals "HUF", the file must provide a 'Reference' column.

    :param csv_file: Path to the bank transactions CSV file.
    :return: A pandas DataFrame containing the transactions.
    :raises ValueError: If the currency is "HUF" and the 'Reference' column is missing.
    """
    transactions = pd.read_csv(csv_file, delimiter=',', dtype=str)

    # Only HUF statements are validated; other currencies pass through as-is.
    reference_required = GLOBAL_CURRENCY == "HUF"
    if reference_required and 'Reference' not in transactions.columns:
        raise ValueError("The CSV file does not contain the required column 'Reference'.")

    return transactions
|
|
|
|
def search_bank_transaction(search_string):
    """
    Searches the loaded bank transactions for the first row whose reference text contains a string.

    The column that is searched depends on the module-level GLOBAL_CURRENCY;
    only "HUF" (column 'Reference') is configured. Matching is
    case-insensitive and rows with a missing value never match.

    NOTE(review): ``search_string`` is passed to ``Series.str.contains`` with
    its default settings, so it is treated as a regular expression (a '.'
    matches any character). Confirm whether callers rely on that.

    :param search_string: String (regex pattern) to look for in the reference column.
    :return: The first matching row as a dictionary or None if not found.
    :raises ValueError: If no search column is configured for GLOBAL_CURRENCY.
    :raises RuntimeError: If the bank transactions have not been loaded yet.
    """
    # Currency -> statement column holding the text to search.
    search_columns = {"HUF": "Reference"}

    row_title = search_columns.get(GLOBAL_CURRENCY)
    if row_title is None:
        raise ValueError(
            f"No bank statement search column is configured for currency {GLOBAL_CURRENCY!r}."
        )
    if transactions_df is None:
        raise RuntimeError("Bank transactions have not been loaded.")

    is_match = transactions_df[row_title].str.contains(search_string, na=False, case=False)
    matching_rows = transactions_df[is_match]

    return matching_rows.iloc[0].to_dict() if not matching_rows.empty else None
|
|
|
|
|
|
def extract_and_process_zip_gls_single(zip_file_path, bank_statement_file_path, output_file, currency):
    """
    Transforms the GLS workbooks for one currency from a zip archive and writes the result.

    The archive is unpacked into an ``extracted_temp`` folder next to the zip
    file. Below its 'Aviza GLS' directory, the first folder named exactly like
    ``currency`` is located, every ``.xlsx`` file directly inside it is passed
    to ``transform_csv``, and the collected results are handed to
    ``write_output_csv_to_zip``. The temporary folder is removed afterwards,
    also when processing fails.

    Side effects: assigns the module-level GLOBAL_CURRENCY and transactions_df.

    :param zip_file_path: Path to the zip archive containing the 'Aviza GLS' folder.
    :param bank_statement_file_path: Path to the bank transactions CSV file.
    :param output_file: Output target, passed through to ``write_output_csv_to_zip``.
    :param currency: Currency code; also the name of the folder to process.
    :return: Whatever ``write_output_csv_to_zip`` returns.
    """
    global GLOBAL_CURRENCY, transactions_df

    # The currency has to be known before the statement is loaded, because the
    # loader's column validation depends on it.
    GLOBAL_CURRENCY = currency
    transactions_df = load_bank_transactions(bank_statement_file_path)

    all_transformed_data = []

    base_dir = os.path.dirname(zip_file_path)
    extract_folder = os.path.join(base_dir, "extracted_temp")
    os.makedirs(extract_folder, exist_ok=True)

    try:
        # Extract the zip file.
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_folder)

        # Find the first folder below 'Aviza GLS' named after the currency.
        global_currency_folder = None
        for root, _dirs, _files in os.walk(os.path.join(extract_folder, 'Aviza GLS')):
            if os.path.basename(root) == GLOBAL_CURRENCY:
                global_currency_folder = root
                break

        if global_currency_folder:
            # Process every Excel workbook found directly in the currency folder.
            for filename in os.listdir(global_currency_folder):
                if filename.endswith(".xlsx"):
                    workbook_path = os.path.join(global_currency_folder, filename)
                    all_transformed_data.append(transform_csv(workbook_path))
        else:
            print(f"'{GLOBAL_CURRENCY}' folder not found in zip file")
    finally:
        # Always remove the extracted files, even if a workbook failed to process.
        shutil.rmtree(extract_folder)

    print("Processed and cleaned up")

    # Write all collected transformed data to the output file.
    return write_output_csv_to_zip(output_file, all_transformed_data)
|
|
|
|
def _payment_date_from_filename(input_file, currency):
    """Return the payment date encoded in the file name, formatted as ``YYYY.MM.DD``.

    The date is the ``YYYYMMDD`` token that directly follows ``_<currency>_``
    in the file's base name. Directory names are ignored so that a folder
    containing the same marker cannot be mistaken for it.

    :raises ValueError: If the marker is missing or the token is not a valid date.
    """
    marker = f"_{currency}_"
    file_name = os.path.basename(input_file)
    _, found, tail = file_name.partition(marker)
    if not found:
        raise ValueError(f"File name '{file_name}' does not contain '{marker}'.")
    date_token = tail.split("_")[0]
    return datetime.strptime(date_token, "%Y%m%d").strftime("%Y.%m.%d")


def transform_csv(input_file):
    """
    Converts one GLS Excel workbook into a list of output rows.

    The first 7 rows of the sheet are skipped and all cells are read as text.
    The fifth column is parsed as the amount (decimal comma accepted,
    unparsable or empty cells count as 0). One output row is produced per
    sheet row, carrying the running total of the amounts, followed by a
    closing "w" row that offsets the total and references the bank transaction
    found for the payment date taken from the file name.

    The last row of the sheet is never emitted.
    NOTE(review): presumably it is a totals/footer line - confirm against a
    sample workbook.

    :param input_file: Path to the ``.xlsx`` file; its name must contain ``_<GLOBAL_CURRENCY>_<YYYYMMDD>``.
    :return: List of rows (each a list of 17 values).
    :raises ValueError: If the file name carries no payment date or no bank
        transaction matches that date.
    """
    sheet = pd.read_excel(input_file, skiprows=7, dtype=str)

    payment_date = _payment_date_from_filename(input_file, GLOBAL_CURRENCY)

    # Keep the parsed amounts separate instead of writing numbers back into a
    # text-typed column.
    amounts = pd.to_numeric(
        sheet.iloc[:, 4].str.replace(',', '.', regex=False), errors='coerce'
    ).fillna(0)

    running_total = 0.00
    transformed_data = []
    total_rows = len(sheet)

    data_rows = sheet.iloc[:-1].itertuples(index=False, name=None)
    for position, (row, amount) in enumerate(zip(data_rows, amounts)):
        typ_operace = "t" if amount >= 0 else "c"
        running_total += amount
        transformed_data.append([
            typ_operace, row[3], row[0], amount, running_total, "TRUE", row[2],
            f"Dobirka za FA s VS {row[2]}", "", "", "", "", "", "", row[1], 0, GLOBAL_CURRENCY,
        ])

        # Percentage is relative to all sheet rows, including the skipped last one.
        progress = (position + 1) / total_rows * 100
        sys.stdout.write(f"\rProcessing: {progress:.2f}%")
        sys.stdout.flush()

    corresponding_transaction = search_bank_transaction(payment_date)
    if corresponding_transaction is None:
        raise ValueError(
            f"No bank transaction found for payment date {payment_date} (file '{input_file}')."
        )

    created_on = datetime.strptime(
        corresponding_transaction['Created on'], "%Y-%m-%d %H:%M:%S"
    ).strftime("%Y-%m-%d")
    # Only the part of the ID after its last '-' is used.
    transaction_id = corresponding_transaction['ID'].split('-')[-1].strip()

    final_row = ["w", created_on, transaction_id, -running_total, 0, "TRUE", "", "Vyrovnání zůstatku", "12600016-16965466-28438156", "", "", "", "", "", "NEWLINE BREAK", "", GLOBAL_CURRENCY]
    transformed_data.append(final_row)

    return transformed_data
|