import csv
import io
import os
import shutil
import sys
import zipfile
from datetime import datetime

import pandas as pd

from aviza.helpers import write_output_csv_to_zip

# Bank statement loaded by extract_and_process_zip_paynl_single() and read by
# search_bank_transaction(). Stays None until a statement has been loaded.
transactions_df = None

# Bank-statement column that is searched for the clearing id.
_MESSAGE_COLUMN = 'Zpráva pro příjemce'

# File-name prefix shared by the clearing CSV files and the inner ZIP files.
_CLEARING_PREFIX = "Specification clearing"


def load_bank_transactions(csv_file):
    """
    Loads the bank transactions CSV file into a DataFrame and returns it.

    The file is read with ';' as delimiter and every column is kept as text.

    :param csv_file: Path to the bank transactions CSV file.
    :return: A pandas DataFrame containing the transactions.
    :raises ValueError: If the 'Zpráva pro příjemce' column is missing.
    """
    df = pd.read_csv(csv_file, delimiter=';', dtype=str)

    # Ensure the required column exists
    if _MESSAGE_COLUMN not in df.columns:
        raise ValueError("The CSV file does not contain the required column 'Zpráva pro příjemce'.")

    return df


def search_bank_transaction(search_string):
    """
    Searches for a given string in the 'Zpráva pro příjemce' column of the
    module-level ``transactions_df``.

    The match is a case-insensitive, literal substring match; rows whose
    message is missing never match.

    :param search_string: String to search for in the 'Zpráva pro příjemce' column.
    :return: The first matching row as a dictionary or None if not found
        (also None when ``search_string`` is None/NaN).
    :raises RuntimeError: If no bank statement has been loaded yet.
    """
    if transactions_df is None:
        raise RuntimeError(
            "Bank transactions are not loaded; call load_bank_transactions() "
            "and assign the result to transactions_df first."
        )

    # A missing clearing id cannot match anything; str.contains would raise.
    if search_string is None or pd.isna(search_string):
        return None

    # regex=False: the search term is an identifier, not a pattern, so regex
    # metacharacters in it must be matched literally.
    mask = transactions_df[_MESSAGE_COLUMN].str.contains(
        search_string, na=False, case=False, regex=False
    )
    matching_rows = transactions_df[mask]
    if matching_rows.empty:
        return None
    return matching_rows.iloc[0].to_dict()


def _find_clearing_csv(folder):
    """Return the path of a 'Specification clearing*.csv' file in *folder*, or None."""
    csv_filename = next(
        (
            f for f in os.listdir(folder)
            if f.startswith(_CLEARING_PREFIX) and f.endswith(".csv")
        ),
        None,
    )
    return os.path.join(folder, csv_filename) if csv_filename else None


def _transform_inner_zip(inner_zip_path, inner_extract_folder):
    """Extract one inner clearing ZIP, transform its CSV and return the rows (or None)."""
    os.makedirs(inner_extract_folder, exist_ok=True)
    try:
        with zipfile.ZipFile(inner_zip_path, 'r') as inner_zip:
            inner_zip.extractall(inner_extract_folder)

        # Expect the inner zip to contain a "Transactions" folder with the CSV
        inner_transactions_folder = os.path.join(inner_extract_folder, "Transactions")
        if not os.path.exists(inner_transactions_folder):
            return None

        inner_csv_path = _find_clearing_csv(inner_transactions_folder)
        return transform_csv(inner_csv_path) if inner_csv_path else None
    finally:
        # The folder name is reused for every inner zip found in the same
        # directory, so it must be removed even when processing fails.
        shutil.rmtree(inner_extract_folder)


def extract_and_process_zip_paynl_single(zip_file_path, bank_statement_file_path, output_file):
    """
    Extracts a clearing ZIP, transforms every clearing CSV found in it and
    returns the result of ``write_output_csv_to_zip``.

    The archive is extracted into an "extracted_temp" folder next to the ZIP.
    If that folder contains a top-level "Transactions" folder, the
    "Specification clearing*.csv" inside it is transformed. Otherwise every
    "Specification clearing*.zip" found below the extraction folder is
    extracted in turn and its "Transactions/Specification clearing*.csv" is
    transformed. The temporary folders are removed afterwards, also when
    processing fails.

    Side effect: replaces the module-level ``transactions_df`` with the
    contents of ``bank_statement_file_path``.

    :param zip_file_path: Path to the outer ZIP file.
    :param bank_statement_file_path: Path to the bank transactions CSV file.
    :param output_file: Passed through to ``write_output_csv_to_zip``.
    :return: Whatever ``write_output_csv_to_zip(output_file, data)`` returns,
        where ``data`` is a list with one list of rows per processed CSV.
    """
    global transactions_df
    transactions_df = load_bank_transactions(bank_statement_file_path)

    all_transformed_data = []

    # Create a temporary folder for extraction
    base_dir = os.path.dirname(zip_file_path)
    extract_folder = os.path.join(base_dir, "extracted_temp")
    os.makedirs(extract_folder, exist_ok=True)

    try:
        # Extract the provided outer zip file
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_folder)

        # Check if a top-level "Transactions" folder exists
        transactions_folder = os.path.join(extract_folder, "Transactions")
        if os.path.exists(transactions_folder):
            # Look for a CSV file inside the Transactions folder
            csv_path = _find_clearing_csv(transactions_folder)
            if csv_path:
                all_transformed_data.append(transform_csv(csv_path))
        else:
            # Otherwise, look for inner zip files named "Specification clearing*.zip"
            for root, _dirs, files in os.walk(extract_folder):
                for file in files:
                    if not (file.startswith(_CLEARING_PREFIX) and file.endswith(".zip")):
                        continue
                    transformed_data = _transform_inner_zip(
                        os.path.join(root, file),
                        os.path.join(root, "inner_extracted"),
                    )
                    if transformed_data is not None:
                        all_transformed_data.append(transformed_data)
    finally:
        # Clean up the outer extraction folder even on failure, so leftovers
        # from a failed run cannot be picked up by the next one.
        shutil.rmtree(extract_folder)

    print(f"Processed and cleaned up: {zip_file_path}")

    # Hand all transformed rows to the project helper that builds the output.
    return write_output_csv_to_zip(output_file, all_transformed_data)


def transform_csv(input_file):
    """
    Transforms one clearing CSV into a list of output rows.

    The file is read with ';' as delimiter, skipping its first three lines.
    Every data row except the last one produces one output row; a final
    "w" row is appended, built from the bank transaction whose
    'Zpráva pro příjemce' contains the CSV's CLEARING_ID.

    Columns are addressed by position (1, 4, 6, 11, 15) plus the named
    columns 'Commission', 'Zůstatek na účtu' and, if present, 'CLEARING_ID'.
    Column 11 is treated as the amount; non-numeric values count as 0.

    :param input_file: Path to the clearing CSV file.
    :return: List of rows (each a list of 17 values).
    :raises ValueError: If no bank transaction matches the clearing id.
    """
    df = pd.read_csv(input_file, delimiter=";", dtype=str, skiprows=3)

    df.iloc[:, 11] = pd.to_numeric(df.iloc[:, 11], errors='coerce').fillna(0)
    # NOTE(review): this column is computed but never read below - confirm
    # whether it was meant to feed the balance field of the output rows.
    df["Commission_cumsum"] = df["Commission"].astype(float).cumsum().round(2)

    transformed_data = []
    total_rows = len(df)
    # An empty file has no first row to take the clearing id from.
    has_clearing_id = 'CLEARING_ID' in df.columns and total_rows > 0
    clearing_id = df['CLEARING_ID'].iloc[0] if has_clearing_id else None

    # The last data row is skipped here.
    # NOTE(review): presumably it is a summary line - verify against a sample file.
    for index, row in df.iloc[:-1].iterrows():
        amount = row.iloc[11]
        typ_operace = "t" if amount >= 0 else "c"

        transformed_row = [
            typ_operace,
            row.iloc[1],
            row.iloc[15],
            amount,
            row['Zůstatek na účtu'],
            "TRUE",
            row.iloc[15],
            f"Dobirka za FA s VS {row.iloc[15]}",
            "", "", "", "",
            row.iloc[6],
            row.iloc[6],
            row.iloc[4],
            0,
            "EUR"
        ]
        transformed_data.append(transformed_row)

        progress = (index + 1) / total_rows * 100
        sys.stdout.write(f"\rProcessing: {progress:.2f}%")
        sys.stdout.flush()

    # NOTE(review): the sum covers every row, including the last one that the
    # loop above skips - confirm this is intended.
    total_sum = round(df.iloc[:, 11].sum(), 2)

    corresponding_transaction = search_bank_transaction(clearing_id)
    if corresponding_transaction is None:
        raise ValueError(
            f"No bank transaction whose 'Zpráva pro příjemce' contains clearing id "
            f"{clearing_id!r} was found (clearing file: {input_file})."
        )

    final_row = [
        "w",
        datetime.strptime(corresponding_transaction['Datum'], "%d.%m.%Y").strftime("%Y-%m-%d"),
        corresponding_transaction[_MESSAGE_COLUMN].split(',')[-1].strip(),
        -total_sum,
        0,
        "TRUE",
        "",
        "Vyrovnání zůstatku",
        corresponding_transaction['Číslo účtu'] + "/2010",
        "", "", "", "", "",
        "NEWLINE BREAK",
        "",
        "EUR",
    ]
    transformed_data.append(final_row)

    return transformed_data

# extract_and_process_zip(zip_folder, output_csv)