magic-wand_4/tg_utils.py

326 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Tg utils v2.22
# 14/06/2021
# https://t.me/ssleg © 2020 2021
import logging
from datetime import datetime
import psycopg2
from telethon import TelegramClient
class PgServer:
    """Thin wrapper around a psycopg2 connection and a single cursor.

    When no connection is supplied the object opens its own connection and
    is then responsible for closing it and for reconnecting after a failed
    statement.  When an existing connection is supplied it is only borrowed:
    it is never closed or re-opened here.
    """

    __slots__ = ['__con', '__control_flag', '__cursor', '__db_name']

    def __init__(self, dbc=None, db_name='mydb'):
        """Attach to ``dbc`` or, if it is None, connect to ``db_name``.

        :param dbc: an already opened connection to borrow, or None.
        :param db_name: database name used only when ``dbc`` is None.
        """
        if dbc is None:
            # The flag and the name are stored before connecting so that the
            # object is in a consistent state even if connect() raises.
            self.__db_name = db_name
            self.__control_flag = True
            self.__con = self.__connect()
        else:
            self.__con = dbc
            self.__control_flag = False
            self.__db_name = ''
        self.__cursor = self.__con.cursor()

    def __connect(self):
        """Open a new connection to the database named at construction.

        NOTE(review): the credentials are hard-coded; this helper is the
        single place to change them.
        """
        return psycopg2.connect(database=self.__db_name,
                                user='postgres',
                                password='1234',
                                host='127.0.0.1',
                                port='5432')

    def commit(self):
        """Commit the current transaction on the underlying connection."""
        self.__con.commit()

    def rollback(self):
        """Roll back the current transaction on the underlying connection."""
        self.__con.rollback()

    def exec(self, sql, req_data=None, return_type=None, retry_count=0):
        """Execute ``sql`` and return its result.

        :param sql: statement text.
        :param req_data: optional parameters passed to ``cursor.execute``.
        :param return_type: None -> return True on success;
            0 -> return ``fetchall()``; any other value -> ``fetchone()``.
        :param retry_count: internal counter of reconnect attempts so far.
        :return: see ``return_type``; False when the statement failed.

        Any exception is logged.  If this object owns its connection, the
        connection is re-opened and the statement retried, at most three
        times; a borrowed connection is never retried.
        """
        try:
            if req_data is not None:
                self.__cursor.execute(sql, req_data)
            else:
                self.__cursor.execute(sql)
            if return_type is None:
                return True
            if return_type == 0:
                return self.__cursor.fetchall()
            return self.__cursor.fetchone()
        except Exception as e:
            levent = 'postgres error: ' + str(e)
            logging.error(levent)
            if self.__control_flag and retry_count < 3:
                # Owned connection: drop it, reconnect and try again.
                self.__con.close()
                self.__con = self.__connect()
                self.__cursor = self.__con.cursor()
                return self.exec(sql, req_data, return_type, retry_count=retry_count + 1)
            return False

    def __del__(self):
        """Release the cursor; close an owned connection, roll back a borrowed one."""
        try:
            con = self.__con
            owns_connection = self.__control_flag
        except AttributeError:
            # __init__ failed before a connection was stored: nothing to release.
            return
        try:
            cursor = self.__cursor
        except AttributeError:
            # The connection was obtained but creating the cursor failed.
            cursor = None
        if cursor is not None:
            cursor.close()
        if owns_connection:
            con.close()
        else:
            con.rollback()
class PgLock:
    """Named lock flag stored in the ``locks`` table of the database.

    The lock is a row in ``locks`` identified by ``lock_name`` whose boolean
    ``status`` column is switched by :meth:`lock` and :meth:`unlock`.
    """

    __slots__ = ['__srv', '__lock_name', '__debug']

    def __init__(self, dbc=None, lck='default_lock', debug=False):
        """Bind to the lock row named ``lck``.

        :param dbc: existing connection handed to ``PgServer``, or None to
            let ``PgServer`` open its own.
        :param lck: value of ``lock_name`` identifying the row.
        :param debug: when true, log every lock/unlock at INFO level.
        """
        self.__srv = PgServer(dbc)
        self.__lock_name = lck
        self.__debug = debug

    def lock(self):
        """Set the lock's status to True and commit."""
        self.__srv.exec('update locks set status=True where lock_name=%s', (self.__lock_name,))
        self.__srv.commit()
        if self.__debug:
            levent = 'locked #' + self.__lock_name
            logging.info(levent)

    def unlock(self):
        """Set the lock's status to False and commit."""
        self.__srv.exec('update locks set status=False where lock_name=%s', (self.__lock_name,))
        self.__srv.commit()
        if self.__debug:
            levent = 'unlocked #' + self.__lock_name
            logging.info(levent)

    def lock_status(self):
        """Return the stored status of the lock.

        :return: the ``status`` value of the row; False when the row does
            not exist or the query failed.
        """
        row = self.__srv.exec('select status from locks where lock_name=%s', (self.__lock_name,), return_type=1)
        # End the transaction opened by the select.
        self.__srv.rollback()
        # exec() gives None when no row matched and False when the query
        # itself failed; neither can be subscripted.
        if row is None or row is False:
            return False
        return row[0]
class GlobalCounter:
    """General-purpose non-negative integer counter.

    Every mutating operation accepts only plain ``int`` values greater than
    zero (``bool`` is rejected because the exact type is checked); anything
    else leaves the counter untouched.
    """

    __slots__ = ['__count']

    @staticmethod
    def __is_valid_arg(arg):
        """Return True only for a plain ``int`` strictly greater than zero."""
        return type(arg) is int and arg > 0

    def __init__(self, init_count=0):
        """Start at ``init_count`` if it is a valid argument, otherwise at 0."""
        if GlobalCounter.__is_valid_arg(init_count):
            self.__count = init_count
        else:
            self.__count = 0

    def increment(self, step=1):
        """Add ``step``; return True on success, False if ``step`` is invalid."""
        if GlobalCounter.__is_valid_arg(step):
            self.__count += step
            return True
        return False

    def decrement(self, step=1):
        """Subtract ``step`` unless invalid or the result would be negative.

        :return: True when the counter was changed, False otherwise.
        """
        if GlobalCounter.__is_valid_arg(step) and self.__count - step >= 0:
            self.__count -= step
            return True
        return False

    def get(self):
        """Return the current value."""
        return self.__count

    def set(self, value):
        """Replace the value; return False (no change) if ``value`` is invalid.

        Zero is not a valid argument here; use :meth:`clear` to reset.
        """
        if GlobalCounter.__is_valid_arg(value):
            self.__count = value
            return True
        return False

    def clear(self):
        """Reset the counter to zero."""
        self.__count = 0

    def __isub__(self, other):
        """``counter -= n``: same rules as :meth:`decrement`."""
        self.decrement(other)
        # In-place operators must return the object, otherwise the name on
        # the left-hand side is rebound to None.
        return self

    def __iadd__(self, other):
        """``counter += n``: same rules as :meth:`increment`."""
        self.increment(other)
        return self

    def __eq__(self, other):
        return self.__count == other

    def __ge__(self, other):
        return self.__count >= other

    def __gt__(self, other):
        return self.__count > other

    def __le__(self, other):
        return self.__count <= other

    def __lt__(self, other):
        # Strictly less than (the previous code compared with <=).
        return self.__count < other

    def __str__(self):
        return str(self.__count)
class GlobalFlag:
    """General-purpose boolean flag; a new flag starts out False."""

    __slots__ = ['__flag']

    def __init__(self):
        self.__flag = False

    def __bool__(self):
        """Truth value of the object is the flag itself."""
        return self.__flag

    def __str__(self):
        """Text form of the flag: ``'True'`` or ``'False'``."""
        return str(self.__flag)

    def get(self):
        """Return the current state."""
        return self.__flag

    def set_true(self):
        """Raise the flag."""
        self.__flag = True

    def set_false(self):
        """Lower the flag."""
        self.__flag = False

    def switch(self):
        """Invert the flag."""
        self.__flag = not self.__flag
def t_stamp():
    """Return the current local time formatted as ``DD/MM YYYY HH:MM:SS``."""
    return datetime.now().strftime('%d/%m %Y %H:%M:%S')
def t_stamp_sh():
    """Return the current local time in the short form ``HH:MM``."""
    return datetime.now().strftime('%H:%M')
# lowercase a tg channel name, with a validity check
def set_ch_name_lower(teststr):
    """Lower-case a channel name, accepting only ``A-Z a-z 0-9 _``.

    :param teststr: channel name to convert.
    :return: the name with ASCII upper-case letters lowered, or False as
        soon as any other character is met (a warning with the position,
        code point, character and full input is logged).
    """
    pieces = []
    for pos, symbol in enumerate(teststr):
        point = ord(symbol)
        if 65 <= point <= 90:
            # ASCII upper-case letter: shift into the lower-case range.
            pieces.append(chr(point + 32))
        elif point == 95 or 48 <= point <= 57 or 97 <= point <= 122:
            # Underscore, digit or lower-case letter: kept unchanged.
            pieces.append(symbol)
        else:
            levent = 'ошибка конвертации канала: ' + str(pos) + ' ' + str(point) + ' ' + symbol + ' ' + teststr
            logging.warning(levent)
            return False
    return ''.join(pieces)
# build a human-readable user name
def set_name_printable(first, last, account=None, phone=None, ad=''):
    """Assemble a display name from the available user fields.

    :param first: first name; always included (converted with ``str``).
    :param last: last name, skipped when None.
    :param account: account name, shown as ``(<ad><account>)`` when not None.
    :param phone: phone number, shown with a leading ``+`` when not None.
    :param ad: prefix placed before the account inside the parentheses.
    :return: the present parts separated by single spaces.
    """
    parts = [str(first)]
    if last is not None:
        parts.append(str(last))
    if account is not None:
        parts.append('(' + ad + str(account) + ')')
    if phone is not None:
        parts.append('+' + str(phone))
    return ' '.join(parts)
# format a number with thousands separators
def set_int_printable(integer, razd=' '):
    """Format a number with ``razd`` as the thousands separator.

    :param integer: number to format.
    :param razd: separator that replaces the default comma.
    :return: the formatted string, e.g. ``'1 234 567'``.
    """
    return format(integer, ',').replace(',', razd)
# postgres server version
def get_server_version(dbc=None):
    """Return the ``VERSION`` setting read from ``pg_config``.

    :param dbc: existing connection handed to ``PgServer``, or None.
    :return: first column of the matching row.
    :raises TypeError: when the query fails or matches no row, because the
        result (False or None) is subscripted without a check.
    """
    server = PgServer(dbc)
    record = server.exec("select setting from pg_config where name='VERSION'", return_type=1)
    return record[0]
# create a client object with parameters from the db
def get_tg_client(name, dbc=None, cust_name=None):
    """Build a ``TelegramClient`` from credentials stored in ``parameters``.

    The rows named ``<name>_api_id`` and ``<name>_api_hash`` supply the API
    id (converted to ``int``) and the API hash.

    :param name: prefix of the parameter rows; also the first argument
        given to ``TelegramClient`` unless ``cust_name`` is set.
    :param dbc: existing connection handed to ``PgServer``, or None.
    :param cust_name: optional replacement for the first
        ``TelegramClient`` argument.
    :return: the constructed client.
    :raises TypeError: when either lookup fails or finds no row, because
        the result (False or None) is subscripted without a check.
    """
    server = PgServer(dbc)
    id_row = server.exec('select value from parameters where name=%s', (name + '_api_id',), return_type=1)
    api_id = int(id_row[0])
    hash_row = server.exec('select value from parameters where name=%s', (name + '_api_hash',), return_type=1)
    api_hash = hash_row[0]
    session_name = name if cust_name is None else cust_name
    return TelegramClient(session_name, api_id, api_hash)
# fetch the bot key from the db
def get_bot_key(name, dbc=None):
    """Return the value of the ``<name>_key`` row of ``parameters``.

    :param name: prefix of the parameter row to read.
    :param dbc: existing connection handed to ``PgServer``, or None.
    :return: first column of the matching row.
    :raises TypeError: when the lookup fails or finds no row, because the
        result (False or None) is subscripted without a check.
    """
    server = PgServer(dbc)
    record = server.exec('select value from parameters where name=%s', (name + '_key',), return_type=1)
    return record[0]