diff --git a/app_main.py b/app_main.py index 90980d1..8d8b6f5 100644 --- a/app_main.py +++ b/app_main.py @@ -1,6 +1,9 @@ from flask import Flask, request, send_file, render_template, redirect import io import os + +from aviza.paynl_single import extract_and_process_zip_paynl_single +from aviza.gls_single import extract_and_process_zip_gls_single from csv2gpc import convert_csv_to_gpc, mapping_sparkasse, mapping_pko, mapping_wise from allegro import convert_csv_to_gpc_allegro, mapping_allegro import tempfile @@ -49,6 +52,16 @@ def index(): file.save(tmp_input.name) tmp_file_path = tmp_input.name + try: + file_bank_statement = request.files["file_bank_statement"] + + suffix = os.path.splitext(file_bank_statement.filename)[1] + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_input: + file_bank_statement.save(tmp_input.name) + bank_statement_file_path = tmp_input.name + except: + print("no bank_statement") + day_of_year = int(datetime.date.today().strftime("%j")) out_filename = f"{option}_vypis_{day_of_year}.gpc" mimetype = "text/plain" @@ -80,6 +93,25 @@ def index(): as_attachment=True, mimetype="application/zip" ) + elif option == "paynl_single": + + result_data = extract_and_process_zip_paynl_single(tmp_file_path, bank_statement_file_path, f"{option}_aviza_{day_of_year}.csv") + return send_file( + result_data, + download_name=f"paynl_single_{day_of_year}.zip", + as_attachment=True, + mimetype="application/zip" + ) + + elif option == "gls_single_huf": + + result_data = extract_and_process_zip_gls_single(tmp_file_path, bank_statement_file_path, f"{option}_aviza_{day_of_year}.csv", "HUF") + return send_file( + result_data, + download_name=f"gls_single_huf_{day_of_year}.zip", + as_attachment=True, + mimetype="application/zip" + ) diff --git a/aviza/gls_single.py b/aviza/gls_single.py new file mode 100644 index 0000000..d7fe63d --- /dev/null +++ b/aviza/gls_single.py @@ -0,0 +1,124 @@ +import pandas as pd +import sys +import os +import zipfile +from datetime import datetime +from aviza.helpers import write_output_csv_to_zip + +GLOBAL_CURRENCY = None +transactions_df = None + + +def load_bank_transactions(csv_file): + """ + Loads the bank transactions CSV file into a DataFrame and returns it. + + :param csv_file: Path to the bank transactions CSV file. + :return: A pandas DataFrame containing the transactions. + """ + global GLOBAL_CURRENCY + df = pd.read_csv(csv_file, delimiter=',', dtype=str) + + if GLOBAL_CURRENCY == "HUF": + if 'Reference' not in df.columns: + raise ValueError("The CSV file does not contain the required column 'Reference'.") + + return df + +def search_bank_transaction(search_string): + """ + Searches for a given string in the 'Zpráva pro příjemce' column of the loaded DataFrame. + + :param search_string: String to search for in the 'Zpráva pro příjemce' column. + :return: The first matching row as a dictionary or None if not found. + """ + + global GLOBAL_CURRENCY, transactions_df + + if GLOBAL_CURRENCY == "HUF": + row_title = 'Reference' + matching_row = transactions_df[transactions_df[row_title].str.contains(search_string, na=False, case=False)] + + return matching_row.iloc[0].to_dict() if not matching_row.empty else None + + +def extract_and_process_zip_gls_single(zip_file_path, bank_statement_file_path, output_file, currency): + + global GLOBAL_CURRENCY, transactions_df + all_transformed_data = [] + + transactions_df = load_bank_transactions(bank_statement_file_path) + GLOBAL_CURRENCY = currency + + base_dir = os.path.dirname(zip_file_path) + extract_folder = os.path.join(base_dir, "extracted_temp") + os.makedirs(extract_folder, exist_ok=True) + + # Extract the zip file. + with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: + zip_ref.extractall(extract_folder) + + # Search for the folder named 'GLOBAL_CURRENCY' within the extracted contents. + global_currency_folder = None + for root, dirs, files in os.walk(os.path.join(extract_folder, 'Aviza GLS')): + if os.path.basename(root) == GLOBAL_CURRENCY: + global_currency_folder = root + break + + if global_currency_folder: + # Process every CSV file found in the GLOBAL_CURRENCY folder. + for filename in os.listdir(global_currency_folder): + if filename.endswith(".xlsx"): + csv_path = os.path.join(global_currency_folder, filename) + transformed_data = transform_csv(csv_path) + all_transformed_data.append(transformed_data) + else: + print(f"'GLOBAL_CURRENCY' folder not found in zip file") + + # Clean up extracted files. + for root, dirs, files in os.walk(extract_folder, topdown=False): + for file in files: + os.remove(os.path.join(root, file)) + for dir in dirs: + os.rmdir(os.path.join(root, dir)) + os.rmdir(extract_folder) + + print(f"Processed and cleaned up") + + # Write all collected transformed data to the output file. + return write_output_csv_to_zip(output_file, all_transformed_data) + +def transform_csv(input_file): + global transactions_df, GLOBAL_CURRENCY + df = pd.read_excel(input_file, skiprows=7, dtype=str) + + payment_date = datetime.strptime(input_file.split("_" + GLOBAL_CURRENCY + "_")[1].split("_")[0], "%Y%m%d").strftime("%Y.%m.%d") + + df.iloc[:, 4] = pd.to_numeric(df.iloc[:, 4].str.replace(',', '.'), errors='coerce').fillna(0) + cumsum = 0.00 + transformed_data = [] + total_rows = len(df) + for index, row in df.iterrows(): + amount = row.iloc[4] + typ_operace = "t" if amount >= 0 else "c" + cumsum += amount + transformed_row = [ + typ_operace, row.iloc[3], row.iloc[0], amount, cumsum, "TRUE", row.iloc[2], + f"Dobirka za FA s VS {row.iloc[2]}", "", "", "", "", "", "", row.iloc[1], 0, GLOBAL_CURRENCY + ] + transformed_data.append(transformed_row) + + progress = (index + 1) / total_rows * 100 + sys.stdout.write(f"\rProcessing: {progress:.2f}%") + sys.stdout.flush() + + if index == total_rows - 2: + break + + + total_sum = cumsum + corresponding_transaction = search_bank_transaction(payment_date) + final_row = ["w", datetime.strptime(corresponding_transaction['Created on'], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d"), corresponding_transaction['ID'].split('-')[-1].strip(), -total_sum, 0, "TRUE", "", "Vyrovnání zůstatku", "12600016-16965466-28438156", "", "", "", "", "", "NEWLINE BREAK", "", GLOBAL_CURRENCY] + transformed_data.append(final_row) + + return transformed_data diff --git a/aviza/helpers.py b/aviza/helpers.py new file mode 100644 index 0000000..43fbf8b --- /dev/null +++ b/aviza/helpers.py @@ -0,0 +1,31 @@ +import csv +import zipfile +import io + +def write_output_csv_to_zip(output_file, transformed_data): + output_headers = [ + "Typ operace", "Datum", "ID transakce", "Částka", "Zůstatek na účtu", "Změnit zůstatek na účtu", + "ID objednávky", "Popis", "Popis2/Číslo účtu", "Město", "PSČ", "Telefon", "E-mail", + "Emailová adresa", "Jméno a příjmení", "Provize", "Měna" + ] + + # Create an in-memory ZIP archive + mem_zip = io.BytesIO() + with zipfile.ZipFile(mem_zip, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: + for idx, file_data in enumerate(transformed_data): + # Create a filename, e.g. "0_output_file.csv" + filename = f"{idx}_{output_file}" + # Create an in-memory text stream for CSV output + csv_buffer = io.StringIO() + writer = csv.writer(csv_buffer, delimiter=';') + # Write header and file rows + writer.writerow(output_headers) + writer.writerows(file_data) + # Get CSV content as a string, encode it to bytes + csv_content = csv_buffer.getvalue().encode("utf-8") + csv_buffer.close() + # Write the CSV file into the ZIP archive + zf.writestr(filename, csv_content) + + mem_zip.seek(0) # Reset pointer to the beginning of the ZIP archive + return mem_zip \ No newline at end of file diff --git a/aviza/paynl_single.py b/aviza/paynl_single.py new file mode 100644 index 0000000..b70155e --- /dev/null +++ b/aviza/paynl_single.py @@ -0,0 +1,138 @@ +import pandas as pd +import csv +import sys +import os +import io +import zipfile +import shutil +from datetime import datetime +from aviza.helpers import write_output_csv_to_zip + +transactions_df = None + + + +def load_bank_transactions(csv_file): + """ + Loads the bank transactions CSV file into a DataFrame and returns it. + + :param csv_file: Path to the bank transactions CSV file. + :return: A pandas DataFrame containing the transactions. + """ + df = pd.read_csv(csv_file, delimiter=';', dtype=str) + + # Ensure the required column exists + if 'Zpráva pro příjemce' not in df.columns: + raise ValueError("The CSV file does not contain the required column 'Zpráva pro příjemce'.") + + return df + +def search_bank_transaction(search_string): + global transactions_df + """ + Searches for a given string in the 'Zpráva pro příjemce' column of the loaded DataFrame. + + :param df: Pandas DataFrame containing bank transactions. + :param search_string: String to search for in the 'Zpráva pro příjemce' column. + :return: The first matching row as a dictionary or None if not found. + """ + matching_row = transactions_df[transactions_df['Zpráva pro příjemce'].str.contains(search_string, na=False, case=False)] + + return matching_row.iloc[0].to_dict() if not matching_row.empty else None + + +def extract_and_process_zip_paynl_single(zip_file_path, bank_statement_file_path, output_file): + global transactions_df + transactions_df = load_bank_transactions(bank_statement_file_path) + all_transformed_data = [] + + # Create a temporary folder for extraction + base_dir = os.path.dirname(zip_file_path) + extract_folder = os.path.join(base_dir, "extracted_temp") + os.makedirs(extract_folder, exist_ok=True) + + # Extract the provided outer zip file + with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: + zip_ref.extractall(extract_folder) + + # Check if a top-level "Transactions" folder exists + transactions_folder = os.path.join(extract_folder, "Transactions") + if os.path.exists(transactions_folder): + # Look for a CSV file inside the Transactions folder + csv_filename = next( + (f for f in os.listdir(transactions_folder) + if f.startswith("Specification clearing") and f.endswith(".csv")), + None + ) + if csv_filename: + csv_path = os.path.join(transactions_folder, csv_filename) + transformed_data = transform_csv(csv_path) + all_transformed_data.append(transformed_data) + else: + # Otherwise, look for inner zip files named "Specification clearing*.zip" + for root, dirs, files in os.walk(extract_folder): + for file in files: + if file.startswith("Specification clearing") and file.endswith(".zip"): + inner_zip_path = os.path.join(root, file) + # Create a temporary folder for the inner zip extraction + inner_extract_folder = os.path.join(root, "inner_extracted") + os.makedirs(inner_extract_folder, exist_ok=True) + with zipfile.ZipFile(inner_zip_path, 'r') as inner_zip: + inner_zip.extractall(inner_extract_folder) + + # Expect the inner zip to contain a "Transactions" folder with the CSV + inner_transactions_folder = os.path.join(inner_extract_folder, "Transactions") + if os.path.exists(inner_transactions_folder): + inner_csv_filename = next( + (f for f in os.listdir(inner_transactions_folder) + if f.startswith("Specification clearing") and f.endswith(".csv")), + None + ) + if inner_csv_filename: + inner_csv_path = os.path.join(inner_transactions_folder, inner_csv_filename) + transformed_data = transform_csv(inner_csv_path) + all_transformed_data.append(transformed_data) + # Clean up inner extraction folder + shutil.rmtree(inner_extract_folder) + + # Clean up the outer extraction folder + shutil.rmtree(extract_folder) + print(f"Processed and cleaned up: {zip_file_path}") + + # Return a ZIP archive (in memory) containing all the output CSV files. + return write_output_csv_to_zip(output_file, all_transformed_data) + +def transform_csv(input_file): + global transactions_df + df = pd.read_csv(input_file, delimiter=";", dtype=str) + df.iloc[:, 12] = pd.to_numeric(df.iloc[:, 12].str.replace(',', '.'), errors='coerce').fillna(0) + df['Zůstatek na účtu'] = df.iloc[:, 12].cumsum().astype(float).round(2) + transformed_data = [] + total_rows = len(df) + clearing_id = df['CLEARING_ID'].iloc[0] if 'CLEARING_ID' in df.columns else None + + for index, row in df.iterrows(): + amount = row.iloc[12] + typ_operace = "t" if amount >= 0 else "c" + + transformed_row = [ + typ_operace, row.iloc[1], row.iloc[15], amount, row['Zůstatek na účtu'], "TRUE", row.iloc[15], + f"Dobirka za FA s VS {row.iloc[15]}", "", "", "", "", row.iloc[6], row.iloc[6], row.iloc[4], 0, "EUR" + ] + transformed_data.append(transformed_row) + + progress = (index + 1) / total_rows * 100 + sys.stdout.write(f"\rProcessing: {progress:.2f}%") + sys.stdout.flush() + + total_sum = round(df.iloc[:, 12].sum(), 2) + corresponding_transaction = search_bank_transaction(clearing_id) + final_row = ["w", datetime.strptime(corresponding_transaction['Datum'], "%d.%m.%Y").strftime("%Y-%m-%d"), corresponding_transaction['Zpráva pro příjemce'].split(',')[-1].strip(), -total_sum, 0, "TRUE", "", "Vyrovnání zůstatku", corresponding_transaction['Číslo účtu'] + "/2010", "", "", "", "", "", "NEWLINE BREAK", "", "EUR"] + transformed_data.append(final_row) + + return transformed_data + + + +# extract_and_process_zip(zip_folder, output_csv) + diff --git a/requirements.txt b/requirements.txt index c1f49c9..bd56126 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ chardet~=5.2.0 mysql-connector-python~=9.2.0 pandas~=2.2.3 flask -gunicorn \ No newline at end of file +gunicorn +openpyxl \ No newline at end of file diff --git a/templates/index.html b/templates/index.html index 6105db4..f46c7c3 100644 --- a/templates/index.html +++ b/templates/index.html @@ -46,10 +46,32 @@ - -

Aviza

+ +

Aviza - parovani primo na jednu platbu

- + +
+

Paynl .csv

+

Pouzijte .zip se vsemi zip soubory z exportu

+

.csv aviza pro EUR ucet

+
+
+

GLS HU.csv

+

Pouzijte .zip se vsemi zip soubory z exportu (z disku zazipovat slozku Aviza GLS)

+

.csv aviza pro HUF dobirky

+
+ + +
+ +

Aviza - automaticke prirazeni (podporuje import vice souboru najednou)

+
+ + + +
@@ -72,7 +94,7 @@