magic-wand_4/tg_utils.py

502 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Tg utils v2.50
# 10/10/2021
# https://t.me/ssleg © 2020 2021
import logging
from datetime import datetime
from time import sleep
import psycopg2
import psycopg2.extensions
from telethon import TelegramClient
# TODO: add executemany and executescript wrappers
class PgServer:
    """PostgreSQL server connection wrapper built on psycopg2.

    One connection and one cursor are opened on construction and reused for
    every statement. ``exec`` logs failures and returns ``False`` instead of
    raising. When the server reports a shutdown-class error the connection is
    re-established (up to three attempts, one second apart) and the statement
    is executed again.
    """

    __slots__ = ['__con', '__cursor', '__db_name', '__db_host', '__retry_count', '__version', '__connected']

    def __init__(self, db_name='tg_bots', db_host='127.0.0.1'):
        """Connect to database *db_name* on host *db_host*.

        A failed connection does not raise; check ``is_connected()``.
        """
        # Declared only; both are assigned in __connect().
        self.__con: psycopg2.extensions.connection
        self.__cursor: psycopg2.extensions.cursor
        self.__db_name = db_name
        self.__db_host = db_host
        self.__retry_count = 0
        self.__connected = False
        self.__version = ''
        self.__establish_connect(first_connect=True)

    def __establish_connect(self, first_connect=False):
        """Try to (re)connect until success or until three attempts have failed.

        On the first successful connect the server version string is read
        from the ``pg_config`` view and cached.
        """
        self.__connected = False
        # __retry_count is reset only by a successful __connect().
        while not self.__connected and self.__retry_count < 3:
            self.__connected = self.__connect(first_connect=first_connect)
            if not self.__connected:
                self.__retry_count += 1
                sleep(1)

        if not self.__connected:
            if first_connect:
                levent = 'postgresql, подключиться к базе не удалось.'
            else:
                levent = 'postgresql, восстановить соединение не удалось.'
            logging.error(levent)
            return

        if not first_connect:
            levent = 'postgresql, соединение восстановлено.'
            logging.info(levent)
            return

        sql = "select setting from pg_config where name='VERSION'"
        row = self.exec(sql, return_type=1)
        if row:
            self.__version = row[0]
        else:
            # exec() returns False on error and fetchone() may return None;
            # keep the empty version instead of failing the constructor and
            # clear any aborted transaction so the connection stays usable.
            self.rollback()

    def __connect(self, first_connect=False):
        """Open the connection and cursor; return True on success, False on error.

        When reconnecting, the previous cursor and connection are closed first.
        """
        try:
            if not first_connect:
                self.__cursor.close()
                self.__con.close()
            # NOTE(review): user, password and port are hard-coded here;
            # consider moving the credentials to configuration.
            self.__con = psycopg2.connect(database=self.__db_name,
                                          user='postgres',
                                          password='1234',
                                          host=self.__db_host,
                                          port='5432')
            self.__cursor = self.__con.cursor()
            self.__retry_count = 0
            return True
        except Exception as e:
            if first_connect:
                levent = 'postgresql, ошибка подключения к бд: ' + str(e)
            else:
                levent = 'postgresql, ошибка при восстановлении подключения: ' + str(e)
            logging.warning(levent)
            return False

    def commit(self):
        """Commit the current transaction; return False when not connected."""
        if self.__connected:
            self.__con.commit()
            return True
        # Explicit False (not None) so the result matches rollback().
        return False

    def rollback(self):
        """Roll back the current transaction; return False when not connected."""
        if self.__connected:
            self.__con.rollback()
            return True
        return False

    def exec(self, sql, req_data=None, return_type=None):
        """Execute *sql*, optionally with parameters *req_data*.

        Returns:
            ``True`` when ``return_type`` is None and the statement succeeded,
            ``fetchall()`` when ``return_type == 0``,
            ``fetchone()`` for any other ``return_type``,
            ``False`` when not connected or when any error occurred.
        """
        if not self.__connected:
            return False

        try:
            if req_data is not None:
                self.__cursor.execute(sql, req_data)
            else:
                self.__cursor.execute(sql)

            if return_type is None:
                return True
            if return_type == 0:
                return self.__cursor.fetchall()
            return self.__cursor.fetchone()

        except psycopg2.IntegrityError as e:
            levent = 'postgresql, ошибка целостности данных: ' + str(e.pgerror)
            logging.error(levent)
            return False

        except psycopg2.OperationalError as e:
            # SQLSTATE class 57 codes: admin_shutdown, crash_shutdown,
            # cannot_connect_now - the server dropped the connection.
            disconnect_codes = ['57P01', '57P02', '57P03']
            if e.pgcode in disconnect_codes:
                levent = 'postgresql отвалился, ошибка: ' + str(e.pgerror)
                logging.warning(levent)
                self.__establish_connect()
                if not self.__connected:
                    return False
                # Connection is back: run the same statement again.
                return self.exec(sql, req_data, return_type)
            levent = 'postgresql, операционная ошибка: ' + str(e)
            logging.error(levent)
            return False

        except Exception as e:
            levent = 'postgresql, ошибка: ' + str(e)
            logging.error(levent)
            return False

    def is_connected(self):
        """Return True while the wrapper considers the connection alive."""
        return self.__connected

    def get_version(self):
        """Return the cached server version string ('' if it was never read)."""
        return self.__version

    def __str__(self):
        return self.__version

    def __del__(self):
        # Release the cursor and connection when the wrapper is collected.
        if self.__connected:
            self.__cursor.close()
            self.__con.close()
class PgLock:
    """Named lock flag stored in the ``locks`` database table.

    Each instance opens its own ``PgServer`` connection and reads or writes
    the ``status`` column of the row whose ``lock_name`` matches.
    """

    __slots__ = ['__srv', '__cursor', '__lock_name', '__debug']

    def __init__(self, lock_name='default', debug=False):
        """Bind to lock *lock_name*; with *debug* set, log every lock/unlock."""
        self.__srv = PgServer()
        self.__lock_name = lock_name
        self.__debug = debug

    def lock(self):
        """Set the lock's status to True and commit."""
        self.__srv.exec('update locks set status=True where lock_name=%s', (self.__lock_name,))
        self.__srv.commit()
        if self.__debug:
            levent = 'locked #' + self.__lock_name
            logging.info(levent)

    def unlock(self):
        """Set the lock's status to False and commit."""
        self.__srv.exec('update locks set status=False where lock_name=%s', (self.__lock_name,))
        self.__srv.commit()
        if self.__debug:
            levent = 'unlocked #' + self.__lock_name
            logging.info(levent)

    def lock_status(self):
        """Return the stored status, or False when it cannot be read.

        False is returned both when no row exists for this lock name and
        when the query itself failed.
        """
        row = self.__srv.exec('select status from locks where lock_name=%s', (self.__lock_name,), return_type=1)
        # End the transaction opened by the select.
        self.__srv.rollback()
        # exec() yields None for "no row" but False for "query failed";
        # neither can be indexed, so test truthiness rather than `is not None`.
        if not row:
            return False
        return row[0]
class GlobalCounter:
    """General-purpose shared counter holding a non-negative integer.

    Steps must be plain ``int`` values greater than zero (``bool`` is
    rejected). Invalid arguments are ignored: the named methods report this
    by returning ``False``, the in-place operators leave the value unchanged.
    """

    __slots__ = ['__count']

    @staticmethod
    def __is_valid_arg(arg):
        """Return True when *arg* is exactly an ``int`` and greater than zero."""
        return type(arg) is int and arg > 0

    def __init__(self, init_count=0):
        """Start at *init_count*, or at 0 when it is not a positive int."""
        self.__count = init_count if GlobalCounter.__is_valid_arg(init_count) else 0

    def increment(self, step=1):
        """Add *step*; return True on success, False for an invalid step."""
        if GlobalCounter.__is_valid_arg(step):
            self.__count += step
            return True
        return False

    def decrement(self, step=1):
        """Subtract *step* unless that would go below zero; return success."""
        if GlobalCounter.__is_valid_arg(step) and self.__count - step >= 0:
            self.__count -= step
            return True
        return False

    def get(self):
        """Return the current value."""
        return self.__count

    def set(self, value):
        """Replace the value with a non-negative int; return success.

        Zero is accepted: it is the value ``__init__`` defaults to and the
        value ``clear()`` produces, so rejecting it here was inconsistent.
        """
        if type(value) is int and value >= 0:
            self.__count = value
            return True
        return False

    def clear(self):
        """Reset the counter to zero."""
        self.__count = 0

    def __isub__(self, other):
        # Same floor as decrement(): never let the counter go negative.
        if GlobalCounter.__is_valid_arg(other) and self.__count - other >= 0:
            self.__count -= other
        return self

    def __iadd__(self, other):
        if GlobalCounter.__is_valid_arg(other):
            self.__count += other
        return self

    # Comparisons delegate to the stored int and always yield a bool.
    def __eq__(self, other):
        return bool(self.__count == other)

    def __ne__(self, other):
        return bool(self.__count != other)

    def __ge__(self, other):
        return bool(self.__count >= other)

    def __gt__(self, other):
        return bool(self.__count > other)

    def __le__(self, other):
        return bool(self.__count <= other)

    def __lt__(self, other):
        return bool(self.__count < other)

    def __str__(self):
        return str(self.__count)
class GlobalFlag:
    """General-purpose shared boolean flag; starts out False."""

    __slots__ = ['__flag']

    def __init__(self):
        self.__flag = False

    def set_true(self):
        """Raise the flag."""
        self.__flag = True

    def set_false(self):
        """Lower the flag."""
        self.__flag = False

    def switch(self):
        """Invert the flag."""
        self.__flag = not self.__flag

    def get(self):
        """Return the current state."""
        return self.__flag

    def __bool__(self):
        return self.__flag

    def __str__(self):
        return str(self.__flag)
class GlobalFloat:
    """General-purpose shared numeric variable stored as a ``float``.

    Only plain ``int`` and ``float`` operands are accepted (``bool`` is
    rejected). ``set`` reports an invalid argument by returning ``False``;
    the in-place operators ignore it and leave the value unchanged.
    """

    __slots__ = ['__value']

    @staticmethod
    def __is_valid_arg(arg):
        """Return True when *arg* is exactly an ``int`` or a ``float``."""
        return type(arg) is int or type(arg) is float

    def __init__(self, init_value=0):
        """Start at ``float(init_value)``, or at 0.0 when it is not a number."""
        if GlobalFloat.__is_valid_arg(init_value):
            self.__value = float(init_value)
        else:
            # Keep the stored type a float on the fallback path as well.
            self.__value = 0.0

    def get(self):
        """Return the current value."""
        return self.__value

    def set(self, value):
        """Replace the value; return True on success, False for a non-number."""
        if GlobalFloat.__is_valid_arg(value):
            self.__value = float(value)
            return True
        return False

    def __isub__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            self.__value -= other
        return self

    def __iadd__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            self.__value += other
        return self

    # Binary operators return a plain number. For an unsupported operand they
    # return NotImplemented so Python can try the reflected method or raise
    # TypeError, instead of silently evaluating to None.
    def __radd__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return other + self.__value
        return NotImplemented

    def __rsub__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return other - self.__value
        return NotImplemented

    def __add__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return self.__value + other
        return NotImplemented

    def __sub__(self, other):
        if GlobalFloat.__is_valid_arg(other):
            return self.__value - other
        return NotImplemented

    # Comparisons delegate to the stored float and always yield a bool.
    def __eq__(self, other):
        return bool(self.__value == other)

    def __ne__(self, other):
        return bool(self.__value != other)

    def __ge__(self, other):
        return bool(self.__value >= other)

    def __gt__(self, other):
        return bool(self.__value > other)

    def __le__(self, other):
        return bool(self.__value <= other)

    def __lt__(self, other):
        return bool(self.__value < other)

    def __str__(self):
        # Printed with two decimal places of precision.
        return str(round(self.__value, 2))
class ErrorClass:
    """Result object carrying a success flag and a numeric error code.

    A fresh instance is truthy with error code 0. ``set_error`` makes it
    falsy and records the code when the code is acceptable.
    """

    __slots__ = ['__success', '__error_code']

    def __init__(self):
        self.__success = True
        self.__error_code = 0

    def set_error(self, error_code: int):
        """Mark the result as failed and store *error_code*.

        The code is stored only when it is a plain ``int`` in the range
        1..999; any other argument still marks the failure but leaves the
        previously stored code unchanged.
        """
        self.__success = False
        # The type check must come first: comparing a non-number with 0
        # would raise TypeError before the type was ever inspected.
        if type(error_code) is int and 0 < error_code < 1000:
            self.__error_code = error_code

    def get_error_code(self):
        """Return the stored error code (0 when none was stored)."""
        return self.__error_code

    def __bool__(self):
        return self.__success

    def __str__(self):
        if self.__success:
            return 'No error'
        return 'Error code: ' + str(self.__error_code)
def t_stamp():
    """Return the current time formatted as ``DD/MM YYYY HH:MM:SS``."""
    return datetime.now().strftime('%d/%m %Y %H:%M:%S')
def t_stamp_sh():
    """Return the current time in short form, ``HH:MM``."""
    return datetime.now().strftime('%H:%M')
# lowercase a tg channel name, validating it on the way
def set_ch_name_lower(test_str):
    """Return *test_str* lower-cased, or False if it has an invalid character.

    Only ASCII letters, ASCII digits and the underscore are accepted. The
    first offending character is logged as a warning together with its
    position and code point, and False is returned. An empty string yields
    an empty string.
    """
    converted = []
    for pos, char in enumerate(test_str):
        if char == '_' or '0' <= char <= '9' or 'a' <= char <= 'z':
            converted.append(char)
        elif 'A' <= char <= 'Z':
            # ASCII upper-case only; the same shift the +32 code arithmetic did.
            converted.append(char.lower())
        else:
            levent = 'ошибка конвертации канала: ' + str(pos) + ' ' + str(ord(char)) + ' ' + char + ' ' + test_str
            logging.warning(levent)
            return False
    return ''.join(converted)
# assemble a user's name into a readable string
def set_name_printable(first, last, account=None, phone=None, ad=''):
    """Build a display name from the available user fields.

    The parts are joined with single spaces in this order: ``first``,
    ``last``, ``(<ad><account>)``, ``+<phone>``. ``first`` is always
    included (via ``str()``); every other part is skipped when it is None.
    ``ad`` is a prefix placed in front of the account inside the brackets.
    """
    parts = [str(first)]
    if last is not None:
        parts.append(str(last))
    if account is not None:
        parts.append('(' + ad + str(account) + ')')
    if phone is not None:
        parts.append('+' + str(phone))
    return ' '.join(parts)
# render a number with thousands separators
def set_int_printable(integer, separator=' '):
    """Format *integer* with *separator* between groups of thousands."""
    grouped = format(integer, ',')
    return grouped.replace(',', separator)
# create a client object using parameters stored in the db
def get_tg_client(name, custom_name=None):
    """Create a ``TelegramClient`` from credentials stored in the ``logins`` table.

    The rows named ``<name>_api_id`` and ``<name>_api_hash`` supply the API
    id (converted to int) and the API hash. The client's first argument is
    *custom_name* when given, otherwise *name*.

    Raises:
        LookupError: a required row is missing or the query failed.
    """
    srv = PgServer()

    n_api = name + '_api_id'
    row = srv.exec('select value from logins where name=%s', (n_api,), return_type=1)
    # exec() gives None for "no row" and False for "query failed"; indexing
    # either would fail with an uninformative TypeError.
    if not row:
        raise LookupError('login parameter not available: ' + n_api)
    api_id = int(row[0])

    n_hash = name + '_api_hash'
    row = srv.exec('select value from logins where name=%s', (n_hash,), return_type=1)
    if not row:
        raise LookupError('login parameter not available: ' + n_hash)
    api_hash = row[0]

    session_name = name if custom_name is None else custom_name
    return TelegramClient(session_name, api_id, api_hash)
# fetch a bot key from the db
def get_bot_key(name):
    """Return the value of the ``<name>_key`` row of the ``logins`` table.

    Raises:
        LookupError: the row is missing or the query failed.
    """
    srv = PgServer()
    bot = name + '_key'
    row = srv.exec('select value from logins where name=%s', (bot,), return_type=1)
    # exec() gives None for "no row" and False for "query failed"; indexing
    # either would fail with an uninformative TypeError.
    if not row:
        raise LookupError('login parameter not available: ' + bot)
    return row[0]