import pandas as pd from schema.pydantic import settings, Translit import os class BaseHandler: def __init__(self, file_path:str): self.file_name = file_path self.file_path = os.path.join(settings.INPUTDIR, file_path) def struct(self): try: return pd.ExcelFile(self.file_path) except Exception as e: raise Exception(f"⚠️ Ошибка при получении структуры {self.file_path}: {e}") def read(self, xls, sheet_name): try: return pd.read_excel(xls, sheet_name) except Exception as e: raise Exception(f"⚠️ Ошибка при чтении файла {self.file_path}: {e}") class Handler(BaseHandler): def __init__(self,file_path:str, columns:dict, sheet_name:str): super().__init__(file_path) xls = self.struct() if sheet_name not in xls.sheet_names: raise Exception(f'⚠️ Лист {sheet_name} не найден') self.df = self.read(xls, sheet_name=sheet_name) self.df = self.df.iloc[:, list(columns.values())] self.df.columns = list(columns.keys()) def normalization(self): #Нормализация self.df['Артикул'] = self.df['Артикул'].replace(Translit.TRANSLIT, regex=True) def grouping(self): #группировка self.df['Артикул'] = self.df['Артикул'].astype(str).str.upper().str.extract(f'({settings.PATTERN})') # Отбираем Артикула по регексу self.df.dropna(subset=["Артикул"], inplace=True) # Удаляем пустые строки agg_dict = {col: "sum" for col in self.df.columns if col != "Артикул" and col != "Наименование"} # По умолчанию суммируем все кроме Артикула и наименования if "Наименование" in self.df.columns: agg_dict["Наименование"] = lambda x: "\n".join(sorted(set(x))) # Set и Сортировка по алфавиту self.df= self.df.groupby("Артикул", as_index=False).agg(agg_dict) # Группировка по артикулу self.df = self.df.sort_values(ascending=False,by="Прибыль") def cast_types(self): self.df["Все_удержания_магазина"] = abs(self.df["Все_удержания_магазина"]) if "Логистика" in self.df.columns: self.df["Логистика"] = abs(self.df["Логистика"]) if "Эквайринг" in self.df.columns: self.df["Эквайринг"] = abs(self.df["Эквайринг"]) self.df = self.df.convert_dtypes() def calculations(self): #Исчисляемые колонки self.df["Все удержания в %"] = self.df.apply( lambda row: (row["Все_удержания_магазина"] / row["Выручка"] * 100) if row["Выручка"] != 0 else 100, #Возможно все удержания в таком случае 100% axis=1 ) if "Выкупы" in self.df.columns and "Не_выкупы" in self.df.columns: self.df["Всего заказано"] = self.df.apply( lambda row: row["Выкупы"]+row["Не_выкупы"], axis=1 ) self.df["Процент выкупа"] = self.df.apply( lambda row: ((row["Выкупы"] - row["Возвраты"]) / row["Всего заказано"] * 100) if row["Всего заказано"] != 0 else 0, axis=1 ) if "Логистика" in self.df.columns: self.df["Логистика_на_ед"] = self.df.apply( lambda row: ((row["Логистика"]/(row["Выкупы"]-row["Возвраты"]))) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0, axis=1 ) self.df["Маржинальность"] = self.df.apply( lambda row: (row["Прибыль"] / row["Выручка"] * 100) if row["Выручка"] != 0 else 0, axis=1 ) if "Себестоимость" in self.df.columns: if "Утилизация" in self.df.columns: self.df["Себестоимость"] = self.df.apply( lambda row: ((row["Себестоимость"])/(row["Выкупы"]-row["Возвраты"]+row["Утилизация"])) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0, axis=1 ) else: self.df["Себестоимость"] = self.df.apply( lambda row: ((row["Себестоимость"])/(row["Выкупы"]-row["Возвраты"])) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0, axis=1 ) else: self.df["Себестоимость"] = self.df.apply( lambda row: ((row["Выручка"] - row["Налог_в_руб"] - row["Прибыль"] - row["Все_удержания_магазина"])/(row["Выкупы"]-row["Возвраты"])) if (row["Выкупы"] - row["Возвраты"]) != 0 else 0, axis=1 ) self.df=self.df.round(2) self.df["Себестоимость"]=self.df["Себестоимость"].round() def styling(self): def multi_style(val): if val < 0: return "background-color: red" elif val > 0: return "background-color: green; color: white" return "" def align_center(val): return 'text-align: center' self.df = self.df.style.map(multi_style, subset=["Маржинальность","Прибыль"]) self.df=self.df.map(align_center) def reorder_columns(self): if "ozon" in self.file_name: self.df = self.df[["Артикул","Наименование","Выручка", "Выкупы","Не_выкупы","Процент выкупа","Возвраты", "Всего заказано","Все_удержания_магазина","Все удержания в %", "Утилизация", "Эквайринг", "Логистика", "Логистика_на_ед", "Налог_в_руб", "Прибыль", "Маржинальность", "Себестоимость" ]] elif "wb" in self.file_name: pass else: pass def saving(self): self.df.to_excel(f"output/Товары_по_месяцам_{self.file_name}", engine="openpyxl", index=False) def get_articles_with_sales(self): self.normalization() self.grouping() self.cast_types() self.calculations() self.reorder_columns() self.styling() self.saving()