# Tg utils v2.50
# 10/10/2021
# https://t.me/ssleg © 2020 – 2021

import logging
from datetime import datetime
from time import sleep

import psycopg2
import psycopg2.extensions
from telethon import TelegramClient


# TODO: add executemany and executescript
class PgServer:
    """Thin wrapper around a single psycopg2 connection and cursor.

    The connection is opened in the constructor (up to three attempts).
    ``exec`` transparently re-establishes the connection when the server
    reports one of the shutdown codes 57P01/57P02/57P03. Failures are logged
    and reported through return values instead of being raised.
    """

    __slots__ = ['__con', '__cursor', '__db_name', '__db_host', '__retry_count', '__version', '__connected']

    def __init__(self, db_name='tg_bots', db_host='127.0.0.1'):
        """Store the connection parameters and try to connect immediately.

        Args:
            db_name: name of the database to connect to.
            db_host: host of the PostgreSQL server.
        """
        self.__con: psycopg2.extensions.connection
        self.__cursor: psycopg2.extensions.cursor
        self.__db_name = db_name
        self.__db_host = db_host
        self.__retry_count = 0
        self.__connected = False
        self.__version = ''
        self.__establish_connect(first_connect=True)

    def __establish_connect(self, first_connect=False):
        """Connect (or reconnect), retrying while fewer than three attempts failed.

        On the first successful connect the server version string is read
        from ``pg_config`` and cached.
        """
        self.__connected = False
        while not self.__connected and self.__retry_count < 3:
            self.__connected = self.__connect(first_connect=first_connect)
            if not self.__connected:
                self.__retry_count += 1
                sleep(1)

        if not self.__connected:
            if first_connect:
                logging.error('postgresql, подключиться к базе не удалось.')
            else:
                logging.error('postgresql, восстановить соединение не удалось.')
            return

        if not first_connect:
            logging.info('postgresql, соединение восстановлено.')
            return

        row = self.exec("select setting from pg_config where name='VERSION'", return_type=1)
        # exec() returns False on failure and fetchone() returns None for an
        # empty result; indexing either would raise TypeError and abort the
        # constructor even though the connection itself is usable.
        if row:
            self.__version = row[0]

    def __connect(self, first_connect=False):
        """Open a new connection and cursor; return True on success, False otherwise.

        When reconnecting, the previous cursor and connection are closed first.
        """
        try:
            if not first_connect:
                self.__cursor.close()
                self.__con.close()

            # NOTE(review): user, password and port are hard-coded here;
            # consider moving them to configuration.
            self.__con = psycopg2.connect(database=self.__db_name, user='postgres', password='1234',
                                          host=self.__db_host, port='5432')
            self.__cursor = self.__con.cursor()
            self.__retry_count = 0
            return True

        except Exception as e:
            if first_connect:
                levent = 'postgresql, ошибка подключения к бд: ' + str(e)
            else:
                levent = 'postgresql, ошибка при восстановлении подключения: ' + str(e)
            logging.warning(levent)
            return False

    def commit(self):
        """Commit the current transaction; return True if connected, False otherwise."""
        if self.__connected:
            self.__con.commit()
            return True

        # Previously a bare ``return`` (None); False matches rollback().
        return False

    def rollback(self):
        """Roll back the current transaction; return True if connected, False otherwise."""
        if self.__connected:
            self.__con.rollback()
            return True

        return False

    def exec(self, sql, req_data=None, return_type=None):
        """Execute a statement and optionally fetch its result.

        Args:
            sql: statement text, with psycopg2 placeholders if ``req_data`` is given.
            req_data: parameters for the statement, or None.
            return_type: None to fetch nothing, 0 for ``fetchall()``,
                any other value for ``fetchone()``.

        Returns:
            True when nothing is fetched, the fetched rows / row otherwise
            (``fetchone()`` gives None when there is no row), or False when
            not connected or when the statement failed.
        """
        if not self.__connected:
            return False

        try:
            if req_data is not None:
                self.__cursor.execute(sql, req_data)
            else:
                self.__cursor.execute(sql)

            if return_type is None:
                return True
            if return_type == 0:
                return self.__cursor.fetchall()
            return self.__cursor.fetchone()

        except psycopg2.IntegrityError as e:
            logging.error('postgresql, ошибка целостности данных: ' + str(e.pgerror))
            return False

        except psycopg2.OperationalError as e:
            # Server-side shutdown codes: reconnect and replay the statement.
            if e.pgcode in ('57P01', '57P02', '57P03'):
                logging.warning('postgresql отвалился, ошибка: ' + str(e.pgerror))
                self.__establish_connect()
                if not self.__connected:
                    return False
                return self.exec(sql, req_data, return_type)

            logging.error('postgresql, операционная ошибка: ' + str(e))
            return False

        except Exception as e:
            logging.error('postgresql, ошибка: ' + str(e))
            return False

    def is_connected(self):
        """Return True while the wrapper considers the connection alive."""
        return self.__connected

    def get_version(self):
        """Return the cached server version string ('' if it was never read)."""
        return self.__version

    def __str__(self):
        return self.__version

    def __del__(self):
        # Release the cursor and connection when the wrapper is collected.
        if self.__connected:
            self.__cursor.close()
            self.__con.close()


class PgLock:
    """Named boolean flag stored in the ``locks`` table.

    Each instance opens its own ``PgServer`` connection with default settings.
    """

    __slots__ = ['__srv', '__cursor', '__lock_name', '__debug']

    def __init__(self, lock_name='default', debug=False):
        """Remember the lock name; ``debug`` enables info-level logging of changes."""
        self.__srv = PgServer()
        self.__lock_name = lock_name
        self.__debug = debug

    def lock(self):
        """Set the lock row's status to True and commit."""
        self.__srv.exec('update locks set status=True where lock_name=%s', (self.__lock_name,))
        self.__srv.commit()
        if self.__debug:
            logging.info('locked #' + self.__lock_name)

    def unlock(self):
        """Set the lock row's status to False and commit."""
        self.__srv.exec('update locks set status=False where lock_name=%s', (self.__lock_name,))
        self.__srv.commit()
        if self.__debug:
            logging.info('unlocked #' + self.__lock_name)

    def lock_status(self):
        """Return the stored status, or False when it cannot be read."""
        row = self.__srv.exec('select status from locks where lock_name=%s', (self.__lock_name,), return_type=1)
        # End the transaction opened by the select.
        self.__srv.rollback()
        # exec() gives None for a missing row and False on failure; both
        # must take the fallback (indexing False raised TypeError before).
        if row:
            return row[0]

        return False


class GlobalCounter:
    """Shared mutable counter that only accepts positive ``int`` steps and values."""

    __slots__ = ['__count']

    @staticmethod
    def __is_valid_arg(arg):
        # Exact type check: bool and other int subclasses are rejected.
        return type(arg) is int and arg > 0

    def __init__(self, init_count=0):
        """Start at ``init_count`` if it is a positive int, otherwise at 0."""
        self.__count = init_count if GlobalCounter.__is_valid_arg(init_count) else 0

    def increment(self, step=1):
        """Add ``step``; return True if applied, False for an invalid step."""
        if not GlobalCounter.__is_valid_arg(step):
            return False

        self.__count += step
        return True

    def decrement(self, step=1):
        """Subtract ``step`` unless invalid or the result would be negative."""
        if GlobalCounter.__is_valid_arg(step) and self.__count - step >= 0:
            self.__count -= step
            return True

        return False

    def get(self):
        """Return the current value."""
        return self.__count

    def set(self, value):
        """Replace the value; return False (and change nothing) unless it is a positive int."""
        if not GlobalCounter.__is_valid_arg(value):
            return False

        self.__count = value
        return True

    def clear(self):
        """Reset the counter to 0."""
        self.__count = 0

    def __isub__(self, other):
        # Unlike decrement(), no lower bound is enforced here.
        if GlobalCounter.__is_valid_arg(other):
            self.__count -= other
        return self

    def __iadd__(self, other):
        if GlobalCounter.__is_valid_arg(other):
            self.__count += other
        return self

    def __eq__(self, other):
        return bool(self.__count == other)

    def __ne__(self, other):
        return bool(self.__count != other)

    def __ge__(self, other):
        return bool(self.__count >= other)

    def __gt__(self, other):
        return bool(self.__count > other)

    def __le__(self, other):
        return bool(self.__count <= other)

    def __lt__(self, other):
        return bool(self.__count < other)

    def __str__(self):
        return str(self.__count)


class GlobalFlag:
    """Shared mutable boolean flag, initially False."""

    __slots__ = ['__flag']

    def __init__(self):
        self.__flag = False

    def set_true(self):
        """Raise the flag."""
        self.__flag = True

    def set_false(self):
        """Lower the flag."""
        self.__flag = False

    def switch(self):
        """Invert the flag."""
        self.__flag = not self.__flag

    def get(self):
        """Return the current state."""
        return self.__flag

    def __bool__(self):
        return self.__flag

    def __str__(self):
        return str(self.__flag)


class GlobalFloat:
    """Shared mutable float that accepts only exact ``int`` / ``float`` operands."""

    __slots__ = ['__value']

    @staticmethod
    def __is_valid_arg(arg):
        # Exact type check: bool and numeric subclasses are rejected.
        return type(arg) in (int, float)

    def __init__(self, init_value=0):
        """Start at ``float(init_value)``, or 0.0 when the value is not int/float."""
        if GlobalFloat.__is_valid_arg(init_value):
            self.__value = float(init_value)
        else:
            # Keep the stored value a float on this path too, so that str()
            # and get() behave the same as after a valid initialisation.
            self.__value = 0.0

    def get(self):
        """Return the current value."""
        return self.__value

    def set(self, value):
        """Replace the value; return False (and change nothing) unless it is int/float."""
        if not GlobalFloat.__is_valid_arg(value):
            return False

        self.__value = float(value)
        return True

    def __isub__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            self.__value -= other
        return self

    def __iadd__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            self.__value += other
        return self

    # The four binary operators below yield None for an unsupported operand
    # (including another GlobalFloat); callers must pass plain int/float.

    def __radd__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return other + self.__value
        return None

    def __rsub__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return other - self.__value
        return None

    def __add__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return self.__value + other
        return None

    def __sub__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return self.__value - other
        return None

    def __eq__(self, other):
        return bool(self.__value == other)

    def __ne__(self, other):
        return bool(self.__value != other)

    def __ge__(self, other):
        return bool(self.__value >= other)

    def __gt__(self, other):
        return bool(self.__value > other)

    def __le__(self, other):
        return bool(self.__value <= other)

    def __lt__(self, other):
        return bool(self.__value < other)

    def __str__(self):
        # Printed rounded to two decimals; the stored value is not rounded.
        return str(round(self.__value, 2))


class ErrorClass:
    """Result object: truthy while no error was set, otherwise carries an error code."""

    __slots__ = ['__success', '__error_code']

    def __init__(self):
        self.__success = True
        self.__error_code = 0

    def set_error(self, error_code: int):
        """Mark the result as failed and store ``error_code`` if it is an int in 1..999.

        An out-of-range or non-int code still marks the result as failed but
        leaves the previously stored code untouched.
        """
        self.__success = False
        # Type check first: comparing a non-numeric value against the range
        # bounds would raise TypeError instead of simply being rejected.
        if type(error_code) is int and 0 < error_code < 1000:
            self.__error_code = error_code

    def get_error_code(self):
        """Return the stored error code (0 if none was stored)."""
        return self.__error_code

    def __bool__(self):
        return self.__success

    def __str__(self):
        if self.__success:
            return 'No error'

        return 'Error code: ' + str(self.__error_code)


def t_stamp():
    """Return the current local time formatted as ``dd/mm YYYY HH:MM:SS``."""
    return datetime.now().strftime('%d/%m %Y %H:%M:%S')


def t_stamp_sh():
    """Return the current local time formatted as ``HH:MM``."""
    return datetime.now().strftime('%H:%M')


def set_ch_name_lower(test_str):
    """Lower-case a Telegram channel name, validating its characters.

    Only ASCII letters, digits and the underscore are accepted. Returns the
    lower-cased name, or False (after logging a warning) at the first
    character outside that set.
    """
    converted = []
    for i, char in enumerate(test_str):
        code = ord(char)
        if 64 < code < 91:
            # 'A'..'Z' -> 'a'..'z'
            code += 32
        elif not (code == 95 or 47 < code < 58 or 96 < code < 123):
            levent = 'ошибка конвертации канала: ' + str(i) + ' ' + str(code) + ' ' + char + ' ' + test_str
            logging.warning(levent)
            return False
        converted.append(chr(code))

    return ''.join(converted)


def set_name_printable(first, last, account=None, phone=None, ad=''):
    """Assemble a human-readable user name.

    The result is ``first [last] [(<ad><account>)] [+<phone>]`` joined by
    single spaces; parts whose argument is None are omitted, except ``first``
    which is always included.
    """
    parts = [str(first)]
    if last is not None:
        parts.append(str(last))
    if account is not None:
        parts.append('(' + ad + str(account) + ')')
    if phone is not None:
        parts.append('+' + str(phone))

    return ' '.join(parts)


def set_int_printable(integer, separator=' '):
    """Format a number with ``separator`` between groups of thousands."""
    return '{:,}'.format(integer).replace(',', separator)


def get_tg_client(name, custom_name=None):
    """Create a ``TelegramClient`` using credentials stored in the ``logins`` table.

    Reads the rows ``<name>_api_id`` and ``<name>_api_hash``. The client's
    session name is ``custom_name`` when given, otherwise ``name``.

    Raises:
        TypeError: if either row cannot be read (the query result is then
            None or False and cannot be indexed).
    """
    srv = PgServer()
    query = 'select value from logins where name=%s'

    row = srv.exec(query, (name + '_api_id',), return_type=1)
    api_id = int(row[0])

    row = srv.exec(query, (name + '_api_hash',), return_type=1)
    api_hash = row[0]

    session_name = name if custom_name is None else custom_name
    return TelegramClient(session_name, api_id, api_hash)


def get_bot_key(name):
    """Return the bot key stored in the ``logins`` table under ``<name>_key``.

    Raises:
        TypeError: if the row cannot be read (the query result is then None
            or False and cannot be indexed).
    """
    srv = PgServer()
    row = srv.exec('select value from logins where name=%s', (name + '_key',), return_type=1)
    return row[0]