init commit
This commit is contained in:
commit
ca02980c1b
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
*.gpc
|
||||
*.csv
|
||||
.venv
|
8
.idea/.gitignore
generated
vendored
Normal file
8
.idea/.gitignore
generated
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
@ -0,0 +1,6 @@
|
||||
<component name="InspectionProjectProfileManager">
|
||||
<settings>
|
||||
<option name="USE_PROJECT_PROFILE" value="false" />
|
||||
<version value="1.0" />
|
||||
</settings>
|
||||
</component>
|
7
.idea/misc.xml
generated
Normal file
7
.idea/misc.xml
generated
Normal file
@ -0,0 +1,7 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="Black">
|
||||
<option name="sdkName" value="Python 3.13 (wise)" />
|
||||
</component>
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13 (wise)" project-jdk-type="Python SDK" />
|
||||
</project>
|
8
.idea/modules.xml
generated
Normal file
8
.idea/modules.xml
generated
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/wise.iml" filepath="$PROJECT_DIR$/.idea/wise.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
14
.idea/wise.iml
generated
Normal file
14
.idea/wise.iml
generated
Normal file
@ -0,0 +1,14 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="PYTHON_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$">
|
||||
<excludeFolder url="file://$MODULE_DIR$/.venv" />
|
||||
</content>
|
||||
<orderEntry type="jdk" jdkName="Python 3.13 (wise)" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
<component name="PyDocumentationSettings">
|
||||
<option name="format" value="PLAIN" />
|
||||
<option name="myDocStringFormat" value="Plain" />
|
||||
</component>
|
||||
</module>
|
BIN
__pycache__/gpc.cpython-313.pyc
Normal file
BIN
__pycache__/gpc.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/utils.cpython-313.pyc
Normal file
BIN
__pycache__/utils.cpython-313.pyc
Normal file
Binary file not shown.
222
allegro.py
Normal file
222
allegro.py
Normal file
@ -0,0 +1,222 @@
|
||||
import csv
|
||||
import chardet
|
||||
from datetime import datetime
|
||||
from gpc import Data, Header, TransactionCode, CURRENCIES_GPC
|
||||
from utils import extract_order_number, extract_numbers, parse_date
|
||||
import mysql.connector
|
||||
import re
|
||||
|
||||
def load_allegro_orders():
|
||||
|
||||
# Define your connection parameters.
|
||||
conn = mysql.connector.connect(
|
||||
host="afrodite.my-devbox.cloud",
|
||||
port=3307,
|
||||
user="vat_dev",
|
||||
password="i!532&hchjJrGmTSV37i7xPfL&",
|
||||
database="remotecont1"
|
||||
)
|
||||
|
||||
# Use a cursor that returns rows as dictionaries.
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
# Your SQL query.
|
||||
query = """
|
||||
SELECT o.* \
|
||||
FROM rcd_orders o
|
||||
WHERE o.allegro_order_id IS NOT NULL
|
||||
"""
|
||||
|
||||
cursor.execute(query)
|
||||
allegro_orders = cursor.fetchall() # results is now a list of dictionaries
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return allegro_orders
|
||||
|
||||
def search_allegro_order(allegro_id):
|
||||
global allegro_orders
|
||||
for order in allegro_orders:
|
||||
if order["allegro_login"] == allegro_id:
|
||||
return order["orders_id"]
|
||||
return 0
|
||||
|
||||
def convert_csv_to_gpc(csv_file_path, gpc_file_path, account_number, currency, mapping):
|
||||
|
||||
gpc_lines = {
|
||||
"payu": [],
|
||||
"przelewy24": [],
|
||||
"allegro finance": []
|
||||
}
|
||||
|
||||
account_numbers = {
|
||||
"payu": 4113600111,
|
||||
"przelewy24": 4113600222,
|
||||
"allegro finance": 4113600333
|
||||
}
|
||||
|
||||
# Create and add the header
|
||||
header = Header(
|
||||
account=4113600111,
|
||||
account_name=currency.ljust(20),
|
||||
old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
|
||||
old_balance=78449,
|
||||
old_sign='+',
|
||||
new_balance=6215449,
|
||||
new_sign='+',
|
||||
turnover_debet=6585600,
|
||||
turnover_debet_sign='0',
|
||||
turnover_credit=127226,
|
||||
turnover_credit_sign='0',
|
||||
transaction_list_no=3,
|
||||
date=datetime.now()
|
||||
)
|
||||
gpc_lines["payu"].append(header.to_string())
|
||||
# Create and add the header
|
||||
header = Header(
|
||||
account=4113600222,
|
||||
account_name=currency.ljust(20),
|
||||
old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
|
||||
old_balance=78449,
|
||||
old_sign='+',
|
||||
new_balance=6215449,
|
||||
new_sign='+',
|
||||
turnover_debet=6585600,
|
||||
turnover_debet_sign='0',
|
||||
turnover_credit=127226,
|
||||
turnover_credit_sign='0',
|
||||
transaction_list_no=3,
|
||||
date=datetime.now()
|
||||
)
|
||||
gpc_lines["przelewy24"].append(header.to_string())
|
||||
|
||||
# Create and add the header
|
||||
header = Header(
|
||||
account=4113600333,
|
||||
account_name=currency.ljust(20),
|
||||
old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
|
||||
old_balance=78449,
|
||||
old_sign='+',
|
||||
new_balance=6215449,
|
||||
new_sign='+',
|
||||
turnover_debet=6585600,
|
||||
turnover_debet_sign='0',
|
||||
turnover_credit=127226,
|
||||
turnover_credit_sign='0',
|
||||
transaction_list_no=3,
|
||||
date=datetime.now()
|
||||
)
|
||||
gpc_lines["allegro finance"].append(header.to_string())
|
||||
|
||||
|
||||
|
||||
|
||||
if mapping['forced_encoding'] is not None:
|
||||
detected_encoding = mapping['forced_encoding']
|
||||
print(f"Forced encoding: {detected_encoding}")
|
||||
else:
|
||||
with open(csv_file_path, 'rb') as f:
|
||||
rawdata = f.read(1024) # Read a small part of the file
|
||||
result = chardet.detect(rawdata)
|
||||
detected_encoding = result['encoding']
|
||||
print(f"Detected encoding: {detected_encoding}")
|
||||
|
||||
|
||||
with open(csv_file_path, mode='r', encoding=detected_encoding) as csv_file:
|
||||
reader = csv.DictReader(csv_file, delimiter=mapping['delimiter']) if mapping['is_dict_mapping'] else csv.reader(csv_file, delimiter=mapping['delimiter'])
|
||||
|
||||
if not mapping['is_dict_mapping']:
|
||||
next(reader)
|
||||
|
||||
for row in reader:
|
||||
source_name = row[mapping['source_name']]
|
||||
reference = search_allegro_order(source_name.split(";")[0].strip()) if ";" in row[mapping['source_name']] else 0
|
||||
transaction_id = reference
|
||||
direction = row[mapping['direction']].lower() if mapping['direction'] is not None else None
|
||||
payer_account = extract_numbers(row[mapping['payer_account']].replace("Rachunek nadawcy: ", "").replace(" ", ""))[:16].ljust(16) if mapping['payer_number_exists'] else 0
|
||||
if payer_account != 0:
|
||||
if payer_account.strip() == "":
|
||||
payer_account = 0
|
||||
|
||||
value_str = row[mapping['source_amount']]
|
||||
match = re.search(r'[-+]?\d*\.?\d+', value_str)
|
||||
if match:
|
||||
source_amount = float(match.group(0)) * 100
|
||||
else:
|
||||
source_amount = 0.0
|
||||
|
||||
created_on = parse_date(row[mapping['created_on']])
|
||||
|
||||
# Determine transaction type
|
||||
if(direction is None):
|
||||
if source_amount > 0:
|
||||
transaction_code = TransactionCode.CREDIT
|
||||
else:
|
||||
transaction_code = TransactionCode.DEBET
|
||||
else:
|
||||
if direction == "out":
|
||||
transaction_code = TransactionCode.DEBET
|
||||
else:
|
||||
transaction_code = TransactionCode.CREDIT
|
||||
|
||||
# Convert currency
|
||||
currency_code = CURRENCIES_GPC.get("CZK", "0000")
|
||||
|
||||
# Create GPC Data object
|
||||
gpc_data = Data(
|
||||
account=account_numbers[row[mapping['wallet_source']].lower()],
|
||||
payer_account=0,
|
||||
no=reference,
|
||||
balance=source_amount,
|
||||
code=transaction_code,
|
||||
variable=reference,
|
||||
constant_symbol=0,
|
||||
bank_code=0,
|
||||
specific_symbol=0,
|
||||
client_name= "VAT VATu" if row[mapping['payment_type']] == "wypłata środków" else "",
|
||||
currency=currency_code,
|
||||
date=created_on
|
||||
)
|
||||
|
||||
gpc_lines[row[mapping['wallet_source']].lower()].append(gpc_data.to_string())
|
||||
|
||||
with open("allegro_payu.gpc", mode='w', encoding='utf-8') as gpc_file:
|
||||
gpc_file.writelines(gpc_lines["payu"])
|
||||
|
||||
with open("allegro_przelewy.gpc", mode='w', encoding='utf-8') as gpc_file:
|
||||
gpc_file.writelines(gpc_lines["przelewy24"])
|
||||
|
||||
with open("allegro_finance.gpc", mode='w', encoding='utf-8') as gpc_file:
|
||||
gpc_file.writelines(gpc_lines["allegro finance"])
|
||||
|
||||
print(f"GPC file successfully created: allegro_payu.gpc")
|
||||
print(f"GPC file successfully created: allegro_przelewy.gpc")
|
||||
print(f"GPC file successfully created: allegro_finance.gpc")
|
||||
|
||||
# Example mappings
|
||||
|
||||
mapping_allegro = {
|
||||
'transaction_id': 2,
|
||||
'reference': None,
|
||||
'direction': None,
|
||||
'source_name': 5,
|
||||
'source_amount': 8,
|
||||
'source_currency': 4,
|
||||
'payer_account': None,
|
||||
'created_on': 0,
|
||||
'payment_type': 3,
|
||||
'wallet_source': 4,
|
||||
'acc_sum': 9,
|
||||
'delimiter': ",",
|
||||
'is_dict_mapping': False,
|
||||
'use_transaction_id': False,
|
||||
'payer_number_exists': False,
|
||||
'use_source_name': False,
|
||||
'forced_encoding': None
|
||||
}
|
||||
|
||||
|
||||
allegro_orders = load_allegro_orders()
|
||||
|
||||
|
||||
# Example usage:
|
||||
convert_csv_to_gpc("allegro_payments.csv", "allegro_payments.gpc", account_number=3214724742, currency="PLN", mapping=mapping_allegro)
|
76
gpc.py
Normal file
76
gpc.py
Normal file
@ -0,0 +1,76 @@
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class RecordType(str, Enum):
|
||||
HEADER = "074"
|
||||
ITEM = "075"
|
||||
|
||||
class TransactionCode(int, Enum):
|
||||
DEBET = 1 # 1 = položka debet
|
||||
CREDIT = 2 # 2 = položka kredit
|
||||
STORNO_DEBET = 4 # 4 = storno položky debet
|
||||
STORNO_CREDIT = 5 # 5 = storno položky kredit
|
||||
|
||||
CURRENCIES_GPC = {
|
||||
"AUD": "0036", "CAD": "0124", "CNY": "0156", "CZK": "0203", "DKK": "0208",
|
||||
"EUR": "0978", "GBP": "0826", "HRK": "0191", "HUF": "0348", "CHF": "0756",
|
||||
"JPY": "0392", "NOK": "0578", "PLN": "0985", "RON": "0946", "RUB": "0643",
|
||||
"SEK": "0752", "TRY": "0949", "USD": "0840"
|
||||
}
|
||||
|
||||
class BaseRecord:
|
||||
def __init__(self, record_type: RecordType, account: int):
|
||||
self.record_type = record_type
|
||||
self.account = account
|
||||
|
||||
class Header(BaseRecord):
|
||||
def __init__(self, account: int, account_name: str, old_date: datetime, old_balance: int,
|
||||
old_sign: str, new_balance: int, new_sign: str, turnover_debet: int,
|
||||
turnover_debet_sign: str, turnover_credit: int, turnover_credit_sign: str,
|
||||
transaction_list_no: int, date: datetime):
|
||||
super().__init__(RecordType.HEADER, account)
|
||||
self.account_name = account_name[:20].ljust(20)
|
||||
self.old_date = old_date.strftime("%d%m%y")
|
||||
self.old_balance = old_balance
|
||||
self.old_sign = old_sign
|
||||
self.new_balance = new_balance
|
||||
self.new_sign = new_sign
|
||||
self.turnover_debet = turnover_debet
|
||||
self.turnover_debet_sign = turnover_debet_sign
|
||||
self.turnover_credit = turnover_credit
|
||||
self.turnover_credit_sign = turnover_credit_sign
|
||||
self.transaction_list_no = transaction_list_no
|
||||
self.date = date.strftime("%d%m%y")
|
||||
|
||||
|
||||
def to_string(self):
|
||||
return (f"074{str(self.account)[:16].zfill(16)}{str(self.account_name)[:20].zfill(20)}"
|
||||
f"{self.old_date}{self.old_balance:014}{self.old_sign}"
|
||||
f"{self.new_balance:014}{self.new_sign}"
|
||||
f"{int(self.turnover_debet):014}{self.turnover_debet_sign}"
|
||||
f"{int(self.turnover_credit):014}{self.turnover_credit_sign}"
|
||||
f"{self.transaction_list_no:03}{self.date}{' '.ljust(14)}\r\n")
|
||||
|
||||
class Data(BaseRecord):
|
||||
def __init__(self, account: int, payer_account: int, no: int, balance: float, code: TransactionCode,
|
||||
variable: int, constant_symbol: int, bank_code: int, specific_symbol: int,
|
||||
client_name: str, currency: str, date: datetime):
|
||||
super().__init__(RecordType.ITEM, account)
|
||||
self.payer_account = payer_account
|
||||
self.no = no
|
||||
self.balance = abs(balance)
|
||||
self.code = code
|
||||
self.variable = variable
|
||||
self.constant_symbol = constant_symbol
|
||||
self.bank_code = bank_code
|
||||
self.specific_symbol = specific_symbol
|
||||
self.client_name = client_name
|
||||
self.currency = currency
|
||||
self.date = date.strftime("%d%m%y")
|
||||
|
||||
def to_string(self):
|
||||
return (f"{self.record_type.value:03}{str(self.account)[:16].zfill(16)}{str(self.payer_account).rjust(16, '0')[:16]}"
|
||||
f"{str(self.no)[:13].zfill(13)}{int(self.balance):012}{self.code.value:1}{self.variable:010}"
|
||||
f"{self.constant_symbol:010}{self.specific_symbol:010}"
|
||||
f"{'0'*6}{str(self.client_name)[:20].ljust(20)}{'0'}{self.currency}{self.date}\r\n")
|
199
paynl_aviza.py
Normal file
199
paynl_aviza.py
Normal file
@ -0,0 +1,199 @@
|
||||
import csv
|
||||
import pandas as pd
|
||||
import chardet
|
||||
from datetime import datetime
|
||||
from gpc import Data, Header, TransactionCode, CURRENCIES_GPC
|
||||
from utils import extract_order_number, extract_numbers, parse_date
|
||||
|
||||
|
||||
def load_bank_transactions(csv_file):
|
||||
"""
|
||||
Loads the bank transactions CSV file into a DataFrame and returns it.
|
||||
|
||||
:param csv_file: Path to the bank transactions CSV file.
|
||||
:return: A pandas DataFrame containing the transactions.
|
||||
"""
|
||||
df = pd.read_csv(csv_file, delimiter=';', dtype=str)
|
||||
|
||||
# Ensure the required column exists
|
||||
if 'Zpráva pro příjemce' not in df.columns:
|
||||
raise ValueError("The CSV file does not contain the required column 'Zpráva pro příjemce'.")
|
||||
|
||||
return df
|
||||
|
||||
def search_bank_transaction(df, search_string):
|
||||
"""
|
||||
Searches for a given string in the 'Zpráva pro příjemce' column of the loaded DataFrame.
|
||||
|
||||
:param df: Pandas DataFrame containing bank transactions.
|
||||
:param search_string: String to search for in the 'Zpráva pro příjemce' column.
|
||||
:return: The first matching row as a dictionary or None if not found.
|
||||
"""
|
||||
matching_row = df[df['Zpráva pro příjemce'].str.contains(search_string, na=False, case=False)]
|
||||
|
||||
return matching_row.iloc[0].to_dict() if not matching_row.empty else None
|
||||
|
||||
|
||||
def convert_csv_to_gpc(csv_file_path, gpc_file_path, account_number, currency, mapping):
|
||||
gpc_lines = []
|
||||
global transactions_df
|
||||
|
||||
|
||||
if mapping['forced_encoding'] is not None:
|
||||
detected_encoding = mapping['forced_encoding']
|
||||
print(f"Forced encoding: {detected_encoding}")
|
||||
else:
|
||||
with open(csv_file_path, 'rb') as f:
|
||||
rawdata = f.read(1024) # Read a small part of the file
|
||||
result = chardet.detect(rawdata)
|
||||
detected_encoding = result['encoding']
|
||||
print(f"Detected encoding: {detected_encoding}")
|
||||
|
||||
|
||||
with open(csv_file_path, mode='r', encoding=detected_encoding) as csv_file:
|
||||
reader = csv.DictReader(csv_file, delimiter=mapping['delimiter']) if mapping['is_dict_mapping'] else csv.reader(csv_file, delimiter=mapping['delimiter'])
|
||||
|
||||
if not mapping['is_dict_mapping']:
|
||||
next(reader)
|
||||
|
||||
total_payout = 0.0
|
||||
first = True
|
||||
clearing_id = ""
|
||||
for row in reader:
|
||||
|
||||
if first:
|
||||
clearing_id = row[mapping['CLEARING_ID']]
|
||||
first = False
|
||||
reference = extract_order_number(row[mapping['reference']])
|
||||
transaction_id = extract_numbers(row[mapping['transaction_id']]) if mapping['use_transaction_id'] else reference
|
||||
direction = row[mapping['direction']].lower() if mapping['direction'] is not None else None
|
||||
source_name = row[mapping['source_name']].replace("Nazwa nadawcy: ", "")[:20].ljust(20) if mapping['use_source_name'] else ""
|
||||
payer_account = extract_numbers(row[mapping['payer_account']].replace("Rachunek nadawcy: ", "").replace(" ", ""))[:16].ljust(16) if mapping['payer_number_exists'] else 0
|
||||
if payer_account != 0:
|
||||
if payer_account.strip() == "":
|
||||
payer_account = 0
|
||||
source_amount = float(row[mapping['source_amount']].replace(',', '.')) * 100 # Convert to cents
|
||||
created_on = parse_date(row[mapping['created_on']])
|
||||
|
||||
# Determine transaction type
|
||||
if(direction is None):
|
||||
if source_amount > 0:
|
||||
transaction_code = TransactionCode.CREDIT
|
||||
else:
|
||||
transaction_code = TransactionCode.DEBET
|
||||
else:
|
||||
if direction == "out":
|
||||
transaction_code = TransactionCode.DEBET
|
||||
else:
|
||||
transaction_code = TransactionCode.CREDIT
|
||||
|
||||
# Convert currency
|
||||
currency_code = CURRENCIES_GPC.get("CZK", "0000")
|
||||
|
||||
# Create GPC Data object
|
||||
gpc_data = Data(
|
||||
account=account_number,
|
||||
payer_account=payer_account,
|
||||
no=transaction_id,
|
||||
balance=source_amount,
|
||||
code=transaction_code,
|
||||
variable=int(reference) if reference.isdigit() else 0,
|
||||
constant_symbol=0,
|
||||
bank_code=0,
|
||||
specific_symbol=0,
|
||||
client_name=source_name,
|
||||
currency=currency_code,
|
||||
date=created_on
|
||||
)
|
||||
|
||||
gpc_lines.append(gpc_data.to_string())
|
||||
total_payout += source_amount
|
||||
|
||||
# add fees
|
||||
# fees_data = Data(
|
||||
# account=account_number,
|
||||
# payer_account=0,
|
||||
# no=0,
|
||||
# balance=total_fees,
|
||||
# code=transaction_code,
|
||||
# variable=0,
|
||||
# constant_symbol=0,
|
||||
# bank_code=0,
|
||||
# specific_symbol=0,
|
||||
# client_name="",
|
||||
# currency=CURRENCIES_GPC.get("CZK", "0000"),
|
||||
# date=created_on
|
||||
# )
|
||||
#
|
||||
# gpc_lines.append(gpc_data.to_string())
|
||||
|
||||
corresponding_transaction = search_bank_transaction(transactions_df, clearing_id)
|
||||
|
||||
# vyuctovani row
|
||||
payout_data = Data(
|
||||
account=account_number,
|
||||
payer_account=0,
|
||||
no=0,
|
||||
balance=total_payout,
|
||||
code=TransactionCode.DEBET,
|
||||
variable=666111222,
|
||||
# variable=corresponding_transaction['Zpráva pro příjemce'].split(',')[-1].strip(),
|
||||
constant_symbol=0,
|
||||
bank_code=0,
|
||||
specific_symbol=0,
|
||||
client_name="",
|
||||
currency=CURRENCIES_GPC.get("CZK", "0000"),
|
||||
date=parse_date(corresponding_transaction['Datum'])
|
||||
)
|
||||
|
||||
gpc_lines.append(payout_data.to_string())
|
||||
|
||||
# Create and add the header
|
||||
header = Header(
|
||||
account=account_number,
|
||||
account_name=currency.ljust(20),
|
||||
old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
|
||||
old_balance=0,
|
||||
old_sign='+',
|
||||
new_balance=0,
|
||||
new_sign='+',
|
||||
turnover_debet=total_payout,
|
||||
turnover_debet_sign='0',
|
||||
turnover_credit=total_payout,
|
||||
turnover_credit_sign='0',
|
||||
transaction_list_no=3,
|
||||
date=datetime.now()
|
||||
)
|
||||
gpc_lines.insert(0, header.to_string())
|
||||
|
||||
with open(gpc_file_path, mode='w', encoding='utf-8') as gpc_file:
|
||||
gpc_file.writelines(gpc_lines)
|
||||
|
||||
print(f"GPC file successfully created: {gpc_file_path}")
|
||||
|
||||
# Example mappings
|
||||
mapping_paynl = {
|
||||
'transaction_id': 'PAYMENT_SESSION_ID',
|
||||
'reference': 'EXTRA_1',
|
||||
'direction': None,
|
||||
'source_name': 'CONSUMER_NAME',
|
||||
'source_amount': 'TURNOVER_TOTAL',
|
||||
'source_currency': '',
|
||||
'payer_account': 'PAYMENT_SESSION_ID',
|
||||
'created_on': 'TRANSACTION_DATE',
|
||||
'CLEARING_ID': 'CLEARING_ID',
|
||||
'fees': 'COSTS',
|
||||
'delimiter': ";",
|
||||
'is_dict_mapping': True,
|
||||
'use_transaction_id': True,
|
||||
'payer_number_exists': False,
|
||||
'use_source_name': False,
|
||||
'forced_encoding': None
|
||||
}
|
||||
|
||||
|
||||
transactions_df = load_bank_transactions("bank_statement.csv")
|
||||
# Example usage:
|
||||
convert_csv_to_gpc("Specification clearing 2024-05-10.csv", "avizo_paynl_test.gpc", account_number=2801379531, currency="EUR", mapping=mapping_paynl)
|
||||
# convert_csv_to_gpc("pko_input.csv", "pko_output.gpc", account_number=95102013900000630206821286, currency="PLN", mapping=mapping_pko)
|
||||
# convert_csv_to_gpc("sparkasse_input.csv", "sparkasse_output.gpc", account_number=95850503000221267034, currency="EUR", mapping=mapping_sparkasse)
|
3
requirements.txt
Normal file
3
requirements.txt
Normal file
@ -0,0 +1,3 @@
|
||||
chardet~=5.2.0
|
||||
mysql-connector-python~=9.2.0
|
||||
pandas~=2.2.3
|
33
utils.py
Normal file
33
utils.py
Normal file
@ -0,0 +1,33 @@
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
def extract_numbers(text: str) -> str:
|
||||
"""Extracts only digits from the given string."""
|
||||
return ''.join(re.findall(r'\d+', text))
|
||||
|
||||
def extract_order_number(reference: str) -> str:
|
||||
"""Extracts a 6-digit order number from the reference field.
|
||||
|
||||
If no 6-digit number is found, returns "0".
|
||||
"""
|
||||
match = re.search(r'\b\d{6}\b', reference)
|
||||
return match.group(0) if match else "0"
|
||||
|
||||
def parse_date(date_str):
|
||||
"""Tries multiple date formats to handle different CSV structures."""
|
||||
date_formats = [
|
||||
"%Y-%m-%d %H:%M:%S", # 2025-02-06 10:35:44
|
||||
"%d.%m.%Y %H:%M", # 31.12.2024 21:17
|
||||
"%Y-%m-%d", # 2025-02-06
|
||||
"%d-%m-%Y", # 06-02-2025
|
||||
"%d.%m.%Y", # 06.02.2025
|
||||
"%d.%m.%y" # 06.02.25
|
||||
]
|
||||
|
||||
for fmt in date_formats:
|
||||
try:
|
||||
return datetime.strptime(date_str, fmt)
|
||||
except ValueError:
|
||||
continue # Try the next format
|
||||
|
||||
raise ValueError(f"Unsupported date format: {date_str}") # Raise error if none match
|
153
wise.py
Normal file
153
wise.py
Normal file
@ -0,0 +1,153 @@
|
||||
import csv
|
||||
import chardet
|
||||
from datetime import datetime
|
||||
from gpc import Data, Header, TransactionCode, CURRENCIES_GPC
|
||||
from utils import extract_order_number, extract_numbers, parse_date
|
||||
|
||||
def convert_csv_to_gpc(csv_file_path, gpc_file_path, account_number, currency, mapping):
|
||||
gpc_lines = []
|
||||
|
||||
# Create and add the header
|
||||
header = Header(
|
||||
account=account_number,
|
||||
account_name=currency.ljust(20),
|
||||
old_date=datetime.strptime("01-03-20", "%d-%m-%y"),
|
||||
old_balance=78449,
|
||||
old_sign='+',
|
||||
new_balance=6215449,
|
||||
new_sign='+',
|
||||
turnover_debet=6585600,
|
||||
turnover_debet_sign='0',
|
||||
turnover_credit=127226,
|
||||
turnover_credit_sign='0',
|
||||
transaction_list_no=3,
|
||||
date=datetime.now()
|
||||
)
|
||||
gpc_lines.append(header.to_string())
|
||||
|
||||
|
||||
|
||||
|
||||
if mapping['forced_encoding'] is not None:
|
||||
detected_encoding = mapping['forced_encoding']
|
||||
print(f"Forced encoding: {detected_encoding}")
|
||||
else:
|
||||
with open(csv_file_path, 'rb') as f:
|
||||
rawdata = f.read(1024) # Read a small part of the file
|
||||
result = chardet.detect(rawdata)
|
||||
detected_encoding = result['encoding']
|
||||
print(f"Detected encoding: {detected_encoding}")
|
||||
|
||||
|
||||
with open(csv_file_path, mode='r', encoding=detected_encoding) as csv_file:
|
||||
reader = csv.DictReader(csv_file, delimiter=mapping['delimiter']) if mapping['is_dict_mapping'] else csv.reader(csv_file, delimiter=mapping['delimiter'])
|
||||
|
||||
if not mapping['is_dict_mapping']:
|
||||
next(reader)
|
||||
|
||||
for row in reader:
|
||||
reference = extract_order_number(row[mapping['reference']])
|
||||
transaction_id = extract_numbers(row[mapping['transaction_id']]) if mapping['use_transaction_id'] else reference
|
||||
direction = row[mapping['direction']].lower() if mapping['direction'] is not None else None
|
||||
source_name = row[mapping['source_name']].replace("Nazwa nadawcy: ", "")[:20].ljust(20) if mapping['use_source_name'] else ""
|
||||
payer_account = extract_numbers(row[mapping['payer_account']].replace("Rachunek nadawcy: ", "").replace(" ", ""))[:16].ljust(16) if mapping['payer_number_exists'] else 0
|
||||
if payer_account != 0:
|
||||
if payer_account.strip() == "":
|
||||
payer_account = 0
|
||||
source_amount = float(row[mapping['source_amount']].replace(',', '.')) * 100 # Convert to cents
|
||||
created_on = parse_date(row[mapping['created_on']])
|
||||
|
||||
# Determine transaction type
|
||||
if(direction is None):
|
||||
if source_amount > 0:
|
||||
transaction_code = TransactionCode.CREDIT
|
||||
else:
|
||||
transaction_code = TransactionCode.DEBET
|
||||
else:
|
||||
if direction == "out":
|
||||
transaction_code = TransactionCode.DEBET
|
||||
else:
|
||||
transaction_code = TransactionCode.CREDIT
|
||||
|
||||
# Convert currency
|
||||
currency_code = CURRENCIES_GPC.get("CZK", "0000")
|
||||
|
||||
# Create GPC Data object
|
||||
gpc_data = Data(
|
||||
account=account_number,
|
||||
payer_account=payer_account,
|
||||
no=transaction_id,
|
||||
balance=source_amount,
|
||||
code=transaction_code,
|
||||
variable=int(reference) if reference.isdigit() else 0,
|
||||
constant_symbol=0,
|
||||
bank_code=0,
|
||||
specific_symbol=0,
|
||||
client_name=source_name,
|
||||
currency=currency_code,
|
||||
date=created_on
|
||||
)
|
||||
|
||||
gpc_lines.append(gpc_data.to_string())
|
||||
|
||||
with open(gpc_file_path, mode='w', encoding='utf-8') as gpc_file:
|
||||
gpc_file.writelines(gpc_lines)
|
||||
|
||||
print(f"GPC file successfully created: {gpc_file_path}")
|
||||
|
||||
# Example mappings
|
||||
mapping_wise = {
|
||||
'transaction_id': 'ID',
|
||||
'reference': 'Reference',
|
||||
'direction': 'Direction',
|
||||
'source_name': 'Source name',
|
||||
'source_amount': 'Source amount (after fees)',
|
||||
'source_currency': 'Source currency',
|
||||
'payer_account': 'Source currency',
|
||||
'created_on': 'Created on',
|
||||
'delimiter': ",",
|
||||
'is_dict_mapping': True,
|
||||
'use_transaction_id': False,
|
||||
'payer_number_exists': False,
|
||||
'use_source_name': False,
|
||||
'forced_encoding': None
|
||||
}
|
||||
|
||||
mapping_pko = {
|
||||
'transaction_id': 2,
|
||||
'reference': 9,
|
||||
'direction': None,
|
||||
'source_name': 7,
|
||||
'source_amount': 3,
|
||||
'source_currency': 4,
|
||||
'payer_account': 6,
|
||||
'created_on': 1,
|
||||
'delimiter': ",",
|
||||
'is_dict_mapping': False,
|
||||
'use_transaction_id': False,
|
||||
'payer_number_exists': True,
|
||||
'use_source_name': False,
|
||||
'forced_encoding': 'iso-8859-2'
|
||||
}
|
||||
|
||||
mapping_sparkasse = {
|
||||
'transaction_id': 7,
|
||||
'reference': 4,
|
||||
'direction': None,
|
||||
'source_name': 11,
|
||||
'source_amount': 14,
|
||||
'source_currency': 15,
|
||||
'payer_account': 12,
|
||||
'created_on': 1,
|
||||
'delimiter': ";",
|
||||
'is_dict_mapping': False,
|
||||
'use_transaction_id': False,
|
||||
'payer_number_exists': True,
|
||||
'use_source_name': False,
|
||||
'forced_encoding': None
|
||||
}
|
||||
|
||||
# Example usage:
|
||||
convert_csv_to_gpc("leden-2025-huf.csv", "wise-huf-2025.gpc", account_number=330005602964780100, currency="HUF", mapping=mapping_wise)
|
||||
# convert_csv_to_gpc("pko_input.csv", "pko_output.gpc", account_number=95102013900000630206821286, currency="PLN", mapping=mapping_pko)
|
||||
# convert_csv_to_gpc("sparkasse_input.csv", "sparkasse_output.gpc", account_number=95850503000221267034, currency="EUR", mapping=mapping_sparkasse)
|
Loading…
Reference in New Issue
Block a user