# gpc-generator/aviza/paynl_single.py
# 2025-02-27 12:12:43 +01:00
#
# 139 lines
# 5.9 KiB
# Python

import csv
import io
import os
import shutil
import sys
import tempfile
import zipfile
from datetime import datetime

import pandas as pd

from aviza.helpers import write_output_csv_to_zip

# Bank statement DataFrame shared across this module: assigned by
# extract_and_process_zip_paynl_single() and read by search_bank_transaction().
# Stays None until a bank statement has been loaded.
transactions_df = None
def load_bank_transactions(csv_file):
    """
    Read a semicolon-delimited bank statement CSV into a DataFrame.

    Every column is read as ``str`` so that values are kept exactly as they
    appear in the file.

    :param csv_file: Path to the bank transactions CSV file.
    :return: A pandas DataFrame with one row per CSV record.
    :raises ValueError: If the 'Zpráva pro příjemce' column is missing.
    """
    statement = pd.read_csv(csv_file, delimiter=';', dtype=str)

    # Transactions are later looked up by this column, so reject a file
    # without it right away instead of failing during the search.
    has_message_column = 'Zpráva pro příjemce' in statement.columns
    if has_message_column:
        return statement

    raise ValueError("The CSV file does not contain the required column 'Zpráva pro příjemce'.")
def search_bank_transaction(search_string):
    """
    Find the first loaded bank transaction whose message contains a string.

    The lookup runs against the module-level ``transactions_df`` and matches
    ``search_string`` as a literal, case-insensitive substring of the
    'Zpráva pro příjemce' column. Rows with a missing message never match.

    :param search_string: Text to look for in the 'Zpráva pro příjemce' column.
        ``None`` or NaN yields no match.
    :return: The first matching row as a dictionary, or None if not found.
    """
    # A missing identifier cannot match anything; returning None here avoids
    # an opaque TypeError from the string matcher.
    if search_string is None or pd.isna(search_string):
        return None

    messages = transactions_df['Zpráva pro příjemce']
    # regex=False: the search string is an identifier, not a pattern, so
    # characters such as '+', '(' or '.' must be matched literally.
    hits = transactions_df[
        messages.str.contains(str(search_string), na=False, case=False, regex=False)
    ]
    if hits.empty:
        return None
    return hits.iloc[0].to_dict()
def _find_clearing_csv(folder):
    """Return the path of the first 'Specification clearing*.csv' in *folder*, or None."""
    # Sorted so the choice is deterministic when several files match.
    for name in sorted(os.listdir(folder)):
        if name.startswith("Specification clearing") and name.endswith(".csv"):
            return os.path.join(folder, name)
    return None


def extract_and_process_zip_paynl_single(zip_file_path, bank_statement_file_path, output_file):
    """
    Extract a clearing ZIP, transform its CSV file(s) and write the output.

    The bank statement is loaded into the module-level ``transactions_df``
    first, because ``transform_csv`` looks up the matching bank transaction
    there. Two archive layouts are handled:

    * a top-level "Transactions" folder holding a
      "Specification clearing*.csv" file, or
    * otherwise, any number of nested "Specification clearing*.zip" files,
      each expected to contain such a "Transactions" folder.

    Extraction happens in a uniquely named temporary directory created next
    to the ZIP file; it is removed even when processing fails.

    :param zip_file_path: Path to the outer ZIP archive.
    :param bank_statement_file_path: Path to the bank statement CSV file.
    :param output_file: Passed through to ``write_output_csv_to_zip``.
    :return: Whatever ``write_output_csv_to_zip`` returns (according to the
        original inline comment, an in-memory ZIP of the output CSV files —
        confirm against aviza.helpers).
    """
    global transactions_df
    transactions_df = load_bank_transactions(bank_statement_file_path)

    all_transformed_data = []
    base_dir = os.path.dirname(zip_file_path) or os.curdir

    # A unique temporary directory prevents leftovers of an earlier, crashed
    # run from being mistaken for the content of this archive.
    with tempfile.TemporaryDirectory(prefix="extracted_temp_", dir=base_dir) as extract_folder:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_folder)

        transactions_folder = os.path.join(extract_folder, "Transactions")
        if os.path.isdir(transactions_folder):
            csv_path = _find_clearing_csv(transactions_folder)
            if csv_path:
                all_transformed_data.append(transform_csv(csv_path))
        else:
            # Collect the nested archives before extracting anything, so the
            # directory walk never sees files produced by the extraction.
            inner_zip_paths = []
            for root, dirs, files in os.walk(extract_folder):
                dirs.sort()  # deterministic traversal order
                inner_zip_paths.extend(
                    os.path.join(root, name)
                    for name in sorted(files)
                    if name.startswith("Specification clearing") and name.endswith(".zip")
                )

            for inner_zip_path in inner_zip_paths:
                with tempfile.TemporaryDirectory(
                    prefix="inner_extracted_", dir=extract_folder
                ) as inner_extract_folder:
                    with zipfile.ZipFile(inner_zip_path, 'r') as inner_zip:
                        inner_zip.extractall(inner_extract_folder)

                    inner_transactions_folder = os.path.join(inner_extract_folder, "Transactions")
                    if os.path.isdir(inner_transactions_folder):
                        inner_csv_path = _find_clearing_csv(inner_transactions_folder)
                        if inner_csv_path:
                            all_transformed_data.append(transform_csv(inner_csv_path))

    print(f"Processed and cleaned up: {zip_file_path}")

    return write_output_csv_to_zip(output_file, all_transformed_data)
def transform_csv(input_file):
    """
    Transform one clearing specification CSV into a list of output rows.

    Each input record becomes one row whose operation type is "t" for a
    non-negative amount and "c" for a negative one, followed by a running
    balance. A closing "w" row is appended that carries the negated total
    and details of the bank transaction found via ``search_bank_transaction``
    for the file's first CLEARING_ID.

    Columns are addressed by position: 12 is the amount (decimal comma), 15
    is the value used as "VS" in the description; positions 1, 4 and 6 are
    copied through unchanged.

    :param input_file: Path to the semicolon-delimited clearing CSV file.
    :return: List of rows (lists), the data rows followed by the closing row.
    :raises ValueError: If the file has fewer than 16 columns, has no data
        rows, lacks a usable CLEARING_ID, or no bank transaction matches it.
    """
    amount_col = 12
    highest_col_used = 15

    df = pd.read_csv(input_file, delimiter=";", dtype=str)

    # Validate up front so a malformed file yields a clear message instead of
    # an IndexError/TypeError somewhere in the middle of processing.
    if df.shape[1] <= highest_col_used:
        raise ValueError(
            f"The CSV file has {df.shape[1]} columns; at least {highest_col_used + 1} are required."
        )
    if df.empty:
        raise ValueError("The CSV file does not contain any data rows.")
    if 'CLEARING_ID' not in df.columns:
        raise ValueError("The CSV file does not contain the required column 'CLEARING_ID'.")
    clearing_id = df['CLEARING_ID'].iloc[0]
    if pd.isna(clearing_id):
        raise ValueError("The first row of the CSV file has an empty 'CLEARING_ID'.")

    # Keep the parsed amounts in their own numeric Series instead of writing
    # floats back into the str-typed frame. Unparseable values count as 0.
    amounts = pd.to_numeric(
        df.iloc[:, amount_col].str.replace(',', '.', regex=False), errors='coerce'
    ).fillna(0)
    balances = amounts.cumsum().astype(float).round(2)

    transformed_data = []
    total_rows = len(df)
    records = df.itertuples(index=False, name=None)
    for position, (record, amount, balance) in enumerate(
        zip(records, amounts.tolist(), balances.tolist()), start=1
    ):
        typ_operace = "t" if amount >= 0 else "c"
        transformed_data.append([
            typ_operace, record[1], record[15], amount, balance, "TRUE", record[15],
            f"Dobirka za FA s VS {record[15]}", "", "", "", "",
            record[6], record[6], record[4], 0, "EUR",
        ])

        # Progress is based on the row position, not on the index label.
        progress = position / total_rows * 100
        sys.stdout.write(f"\rProcessing: {progress:.2f}%")
        sys.stdout.flush()

    total_sum = round(float(amounts.sum()), 2)

    corresponding_transaction = search_bank_transaction(clearing_id)
    if corresponding_transaction is None:
        raise ValueError(
            f"No bank transaction found whose 'Zpráva pro příjemce' contains clearing ID {clearing_id!r}."
        )

    booking_date = datetime.strptime(
        corresponding_transaction['Datum'], "%d.%m.%Y"
    ).strftime("%Y-%m-%d")
    # The reference is the text after the last comma of the bank message.
    reference = corresponding_transaction['Zpráva pro příjemce'].split(',')[-1].strip()
    # "/2010" is a fixed suffix appended to the account number
    # (presumably a bank code — confirm).
    account = corresponding_transaction['Číslo účtu'] + "/2010"

    final_row = [
        "w", booking_date, reference, -total_sum, 0, "TRUE", "", "Vyrovnání zůstatku",
        account, "", "", "", "", "", "NEWLINE BREAK", "", "EUR",
    ]
    transformed_data.append(final_row)
    return transformed_data
# extract_and_process_zip(zip_folder, output_csv)