170 lines
7.6 KiB
Python
170 lines
7.6 KiB
Python
import pandas as pd
|
||
from schema.pydantic import settings, Translit
|
||
import os
|
||
import re
|
||
|
||
class BaseHandler:
|
||
|
||
def __init__(self, file_path:str):
|
||
|
||
self.file_name = file_path
|
||
self.file_path = os.path.join(settings.INPUTDIR, file_path)
|
||
|
||
def struct(self):
|
||
|
||
try:
|
||
return pd.ExcelFile(self.file_path)
|
||
except Exception as e:
|
||
raise Exception(f"⚠️ Ошибка при получении структуры {self.file_path}: {e}")
|
||
|
||
def read(self, xls, sheet_name):
|
||
|
||
try:
|
||
return pd.read_excel(xls, sheet_name)
|
||
except Exception as e:
|
||
raise Exception(f"⚠️ Ошибка при чтении файла {self.file_path}: {e}")
|
||
|
||
|
||
class Handler(BaseHandler):
|
||
|
||
def __init__(self,file_path:str, columns:dict, sheet_name:str):
|
||
|
||
super().__init__(file_path)
|
||
|
||
xls = self.struct()
|
||
|
||
if sheet_name not in xls.sheet_names:
|
||
raise Exception(f'⚠️ Лист {sheet_name} не найден')
|
||
|
||
self.df = self.read(xls, sheet_name=sheet_name)
|
||
self.df = self.df.iloc[:, list(columns.values())]
|
||
self.df.columns = list(columns.keys())
|
||
|
||
|
||
def normalization(self): #Нормализация
|
||
|
||
self.df['Артикул'] = self.df['Артикул'].replace(Translit.TRANSLIT, regex=True)
|
||
|
||
self.df['Артикул'] = self.df['Артикул'].astype(str).str.upper().str.extract(f'({settings.PATTERN})')
|
||
self.df.dropna(subset=["Артикул"], inplace=True) # Удаляем пустые строки
|
||
|
||
def normalize(arti):
|
||
if re.search(r'\d{3,}$', arti):
|
||
return re.sub(r'(\d)\d{2,}$', r'\1', arti)
|
||
else:
|
||
return re.sub(r'\d{1,2}$', '', arti)
|
||
|
||
self.df['Артикул'] = self.df['Артикул'].apply(normalize)
|
||
|
||
|
||
def grouping(self): #группировка
|
||
|
||
agg_dict = {col: "sum" for col in self.df.columns if col != "Артикул" and col != "Наименование"} # По умолчанию суммируем все кроме Артикула и наименования
|
||
|
||
if "Наименование" in self.df.columns:
|
||
agg_dict["Наименование"] = lambda x: "\n".join(sorted(set(x))) # Set и Сортировка по алфавиту
|
||
|
||
self.df= self.df.groupby("Артикул", as_index=False).agg(agg_dict) # Группировка по артикулу
|
||
|
||
self.df = self.df.sort_values(ascending=False,by="Прибыль")
|
||
|
||
|
||
def cast_types(self):
|
||
|
||
self.df["Все_удержания_магазина"] = abs(self.df["Все_удержания_магазина"])
|
||
if "Логистика" in self.df.columns:
|
||
self.df["Логистика"] = abs(self.df["Логистика"])
|
||
if "Эквайринг" in self.df.columns:
|
||
self.df["Эквайринг"] = abs(self.df["Эквайринг"])
|
||
self.df = self.df.convert_dtypes()
|
||
|
||
|
||
def calculations(self): #Исчисляемые колонки
|
||
self.df["Все удержания в %"] = self.df.apply(
|
||
lambda row: (row["Все_удержания_магазина"] / row["Выручка"] * 100) if abs(row["Выручка"]) > 0.01 else 100, #Возможно все удержания в таком случае 100%
|
||
axis=1
|
||
)
|
||
if "Выкупы" in self.df.columns and "Не_выкупы" in self.df.columns:
|
||
self.df["Всего заказано"] = self.df.apply(
|
||
lambda row: row["Выкупы"]+row["Не_выкупы"],
|
||
axis=1
|
||
)
|
||
self.df["Процент выкупа"] = self.df.apply(
|
||
lambda row: ((row["Выкупы"] - row["Возвраты"]) / row["Всего заказано"] * 100)
|
||
if row["Всего заказано"] != 0 else 0,
|
||
axis=1
|
||
)
|
||
if "Логистика" in self.df.columns:
|
||
self.df["Логистика_на_ед"] = self.df.apply(
|
||
lambda row: ((row["Логистика"]/(row["Выкупы"]-row["Возвраты"]))) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0,
|
||
axis=1
|
||
)
|
||
self.df["Маржинальность"] = self.df.apply(
|
||
lambda row: (row["Прибыль"] / row["Выручка"] * 100) if abs(row["Выручка"]) > 0.01 else 0,
|
||
axis=1
|
||
)
|
||
if "Себестоимость" in self.df.columns:
|
||
if "Утилизация" in self.df.columns:
|
||
self.df["Себестоимость"] = self.df.apply(
|
||
lambda row: ((row["Себестоимость"])/(row["Выкупы"]-row["Возвраты"]+row["Утилизация"])) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0,
|
||
axis=1
|
||
)
|
||
else:
|
||
self.df["Себестоимость"] = self.df.apply(
|
||
lambda row: ((row["Себестоимость"])/(row["Выкупы"]-row["Возвраты"])) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0,
|
||
axis=1
|
||
)
|
||
else:
|
||
self.df["Себестоимость"] = self.df.apply(
|
||
lambda row: ((row["Выручка"] - row["Налог_в_руб"] - row["Прибыль"] - row["Все_удержания_магазина"])/(row["Выкупы"]-row["Возвраты"])) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0,
|
||
axis=1
|
||
)
|
||
|
||
|
||
self.df=self.df.round(2)
|
||
self.df["Себестоимость"]=self.df["Себестоимость"].round()
|
||
|
||
|
||
def styling(self):
|
||
|
||
def multi_style(val):
|
||
|
||
if val < 0:
|
||
return "background-color: red"
|
||
elif val > 0:
|
||
return "background-color: green; color: white"
|
||
return ""
|
||
|
||
def align_center(val):
|
||
return 'text-align: center'
|
||
|
||
|
||
self.df = self.df.style.map(multi_style, subset=["Маржинальность","Прибыль"])
|
||
self.df=self.df.map(align_center)
|
||
|
||
|
||
def reorder_columns(self):
|
||
if "ozon" in self.file_name:
|
||
self.df = self.df[["Артикул","Наименование","Выручка", "Выкупы","Не_выкупы","Процент выкупа","Возвраты", "Всего заказано","Все_удержания_магазина","Все удержания в %",
|
||
"Утилизация", "Эквайринг", "Логистика", "Логистика_на_ед", "Налог_в_руб", "Прибыль", "Маржинальность", "Себестоимость"
|
||
]]
|
||
elif "wb" in self.file_name:
|
||
pass
|
||
else:
|
||
self.df = self.df[["Артикул","Платеж_покупателя","Выручка","Выкупы", "Возвраты", "Налог_в_руб", "Все_удержания_магазина", "Все удержания в %", "Прибыль","Маржинальность", "Себестоимость"]]
|
||
|
||
|
||
def saving(self):
|
||
|
||
self.df.to_excel(f"output/Товары_по_месяцам_{self.file_name}", engine="openpyxl", index=False)
|
||
|
||
|
||
def get_articles_with_sales(self):
|
||
|
||
self.normalization()
|
||
self.grouping()
|
||
self.cast_types()
|
||
self.calculations()
|
||
self.reorder_columns()
|
||
self.styling()
|
||
self.saving() |