gpc-generator/app_main.py
2025-03-11 11:16:20 +01:00

201 lines
8.2 KiB
Python

from flask import Flask, request, send_file, render_template, redirect
import io
import os
from aviza.paynl_single import extract_and_process_zip_paynl_single
from aviza.gls_single import extract_and_process_zip_gls_single
from csv2gpc import convert_csv_to_gpc, mapping_sparkasse, mapping_pko, mapping_wise
from allegro import convert_csv_to_gpc_allegro, mapping_allegro
from aviza.przelewy24_report_single import extract_and_process_zip_przelewy_report_single
from aviza.paynl_auto import extract_and_process_zip_paynl_auto, mapping_paynl_avizo
from aviza.przelewy24_auto import extract_and_process_zip_przelewy24_auto, mapping_przelewy_avizo
from aviza.packeta_comgate_convert import extract_and_process_zip_packeta2comgate, mapping_payu
from aviza.przelewy24_transactions import extract_and_process_csv_przelewy24_transactions, mapping_przelewy_transactions
import tempfile
import datetime
app = Flask(__name__)
def convert_avizo_na_jednu_platbu(data):
# Replace with your real conversion code
return data
def convert_avizo_full_auto(data):
# Replace with your real conversion code
return data
def convert_allegro(data):
# Replace with your real conversion code
return data
def create_download_response(file_data: bytes, filename: str, mimetype: str = "text/plain"):
"""Wrap the file data in a BytesIO stream and return it as a downloadable response."""
return send_file(
io.BytesIO(file_data),
download_name=filename,
as_attachment=True,
mimetype=mimetype
)
@app.route("/", methods=["GET", "POST"])
def index():
if request.method == "POST":
if "file" not in request.files or request.files["file"].filename == "":
return redirect(request.url)
file = request.files["file"]
option = request.form.get("option")
suffix = os.path.splitext(file.filename)[1]
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_input:
file.save(tmp_input.name)
tmp_file_path = tmp_input.name
try:
file_bank_statement = request.files["file_bank_statement"]
suffix = os.path.splitext(file_bank_statement.filename)[1]
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_input:
file_bank_statement.save(tmp_input.name)
bank_statement_file_path = tmp_input.name
except:
print("no bank_statement")
try:
file_refunds = request.files["file_refunds"]
suffix = os.path.splitext(file_refunds.filename)[1]
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_input:
file_refunds.save(tmp_input.name)
refunds_file_path = tmp_input.name
except:
print("no bank_statement")
day_of_year = int(datetime.date.today().strftime("%j"))
out_filename = f"{option}_vypis_{day_of_year}.gpc"
mimetype = "text/plain"
# Choose the conversion function based on the selected option
if option == "sparkasse":
result_data = convert_csv_to_gpc(tmp_file_path, f"{option}_vypis_{day_of_year}.gpc",
account_number=95850503000221267034,
currency="EUR", mapping=mapping_sparkasse)
elif option == "wise_huf":
result_data = convert_csv_to_gpc(tmp_file_path, f"{option}_vypis_{day_of_year}.gpc",
account_number=330005602964780100,
currency="HUF", mapping=mapping_wise)
elif option == "wise_ron":
result_data = convert_csv_to_gpc(tmp_file_path, f"{option}_vypis_{day_of_year}.gpc",
account_number=5602964780100,
currency="RON", mapping=mapping_wise)
elif option == "pko":
result_data = convert_csv_to_gpc(tmp_file_path, f"{option}_vypis_{day_of_year}.gpc",
account_number=95102013900000630206821286,
currency="PLN", mapping=mapping_pko)
elif option == "allegro":
result_data = convert_csv_to_gpc_allegro(tmp_file_path, f"{option}_vypis_{day_of_year}.gpc",
account_number=3214724742,
currency="PLN", mapping=mapping_allegro)
return send_file(
result_data,
download_name="allegro_files.zip",
as_attachment=True,
mimetype="application/zip"
)
elif option == "paynl_single":
result_data = extract_and_process_zip_paynl_single(tmp_file_path, bank_statement_file_path, f"{option}_aviza_{day_of_year}.csv")
return send_file(
result_data,
download_name=f"paynl_single_{day_of_year}.zip",
as_attachment=True,
mimetype="application/zip"
)
elif option == "gls_single_huf":
result_data = extract_and_process_zip_gls_single(tmp_file_path, bank_statement_file_path, f"{option}_aviza_{day_of_year}.csv", "HUF")
return send_file(
result_data,
download_name=f"gls_single_huf_{day_of_year}.zip",
as_attachment=True,
mimetype="application/zip"
)
elif option == "przelewy24_single":
result_data = extract_and_process_zip_przelewy_report_single(tmp_file_path, f"{option}_aviza_{day_of_year}.csv")
return send_file(
result_data,
download_name=f"przelewy24_single_{day_of_year}.zip",
as_attachment=True,
mimetype="application/zip"
)
elif option == "paynl_auto":
result_data = extract_and_process_zip_paynl_auto(tmp_file_path, bank_statement_file_path, f"{option}_avizo_{day_of_year}.gpc",
account_number=3498710000999125,
currency="EUR", mapping=mapping_paynl_avizo)
return send_file(
result_data,
download_name=f"paynl_auto_{day_of_year}.zip",
as_attachment=True,
mimetype="application/zip"
)
elif option == "przelewy24_auto":
result_data = extract_and_process_zip_przelewy24_auto(tmp_file_path, bank_statement_file_path, f"{option}_avizo_{day_of_year}.gpc",
account_number=3498710000999133,
currency="PLN", mapping=mapping_przelewy_avizo)
return send_file(
result_data,
download_name=f"przelewy24_auto_{day_of_year}.zip",
as_attachment=True,
mimetype="application/zip"
)
elif option == "przelewy24_transactions":
result_data = extract_and_process_csv_przelewy24_transactions(tmp_file_path, bank_statement_file_path, refunds_file_path, f"{option}_avizo_{day_of_year}.gpc",
account_number=3498710000999133,
currency="PLN", mapping=mapping_przelewy_transactions)
return send_file(
result_data,
download_name=f"przelewy24_transactions_{day_of_year}.zip",
as_attachment=True,
mimetype="application/zip"
)
elif option == "packeta2comgate":
result_data = extract_and_process_zip_packeta2comgate(tmp_file_path, f"{option}_avizo_{day_of_year}.gpc", mapping=mapping_payu)
return send_file(
result_data,
download_name=f"packeta_auto_{day_of_year}.zip",
as_attachment=True,
mimetype="application/zip"
)
if isinstance(result_data, str):
result_bytes = result_data.encode("utf-8")
else:
result_bytes = result_data
return create_download_response(result_bytes, out_filename, mimetype)
return render_template("index.html")
if __name__ == '__main__':
app.run(debug=True, port=5002)