403 lines
14 KiB
Python
403 lines
14 KiB
Python
# Bot I/O v1.82
|
||
# 15/11/2021
|
||
# https://t.me/ssleg © 2020 – 2021
|
||
|
||
import logging
|
||
import sqlite3
|
||
from asyncio import sleep
|
||
from datetime import datetime, timedelta
|
||
from os import path
|
||
|
||
from telethon import TelegramClient, errors
|
||
|
||
from bot_io_classes import TgBotUsers, OutMessagesQueue
|
||
from tg_utils import GlobalFlag
|
||
|
||
debug = GlobalFlag()
|
||
io_write = GlobalFlag()
|
||
|
||
client: TelegramClient
|
||
msk_hour = timedelta(hours=3)
|
||
sys_wait = 0.045
|
||
|
||
manager_run = GlobalFlag()
|
||
work_queue = OutMessagesQueue()
|
||
tg_flood_wait = GlobalFlag()
|
||
last_send_time: datetime
|
||
|
||
bot_users: TgBotUsers
|
||
admins_ids = []
|
||
|
||
con: sqlite3.Connection
|
||
cursor: sqlite3.Cursor
|
||
|
||
|
||
class IncomingMessagesTimeBuffer:
|
||
"""класс буфера времени входящих сообщений пользователей"""
|
||
|
||
__slots__ = ['__buf', '__size', '__banned_to', '__ban_count', '__banned', '__user_id']
|
||
|
||
@staticmethod
|
||
def __sort_key(element):
|
||
return element[0]
|
||
|
||
@staticmethod
|
||
def __is_now_banned(timestamp):
|
||
now = datetime.now()
|
||
delta = now - timestamp
|
||
if delta.total_seconds() > 0:
|
||
return False
|
||
else:
|
||
return True
|
||
|
||
def __init__(self, size, user_id):
|
||
self.__buf = []
|
||
self.__size = size
|
||
self.__user_id = user_id
|
||
cursor.execute('select ban_count, ban_to_date from ban_list where user_id=?', (user_id,))
|
||
row = cursor.fetchone()
|
||
if row is not None:
|
||
self.__banned_to = datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')
|
||
self.__ban_count = row[0]
|
||
self.__banned = self.__is_now_banned(self.__banned_to)
|
||
else:
|
||
self.__banned_to = datetime(year=2020, month=1, day=1)
|
||
self.__ban_count = 0
|
||
self.__banned = False
|
||
for i in range(0, size):
|
||
self.__buf.append((0, 0))
|
||
|
||
def __is_doubled(self, mess_id):
|
||
i = 0
|
||
while i < self.__size:
|
||
if self.__buf[i][0] == mess_id:
|
||
levent = 'обнаружен дубликат сообщения #' + str(mess_id) + ', глубина - ' + str(i)
|
||
logging.warning(levent)
|
||
return True
|
||
i += 1
|
||
return False
|
||
|
||
def __is_valid_timing(self, mess_date):
|
||
i = self.__size - 1
|
||
sec = 0
|
||
# while i > self.__size - 4:
|
||
while i > 5:
|
||
prev_date = self.__buf[i][1]
|
||
if prev_date != 0:
|
||
tmp = mess_date - prev_date
|
||
'''if tmp.seconds < 1:
|
||
levent = 'обнаружено слишком частое обращение: ' + str(tmp.seconds) + ' сек.'
|
||
logging.warning(levent)
|
||
return False
|
||
else:'''
|
||
sec += tmp.seconds
|
||
mess_date = prev_date
|
||
else:
|
||
return True
|
||
i -= 1
|
||
|
||
if sec >= 5:
|
||
return True
|
||
else:
|
||
levent = 'обнаружены 15 сообщений за ' + str(sec) + ' сек.'
|
||
logging.warning(levent)
|
||
return False
|
||
|
||
def store_mess(self, mess_id, mess_date):
|
||
if not self.__is_doubled(mess_id):
|
||
if self.__banned:
|
||
status = self.__is_now_banned(self.__banned_to)
|
||
if status:
|
||
return 1
|
||
else:
|
||
self.__banned = False
|
||
|
||
if not self.__is_valid_timing(mess_date):
|
||
if self.__user_id not in admins_ids:
|
||
now = datetime.now()
|
||
if self.__ban_count > 0:
|
||
delta = timedelta(days=30)
|
||
rez = 4
|
||
else:
|
||
delta = timedelta(minutes=30)
|
||
rez = 3
|
||
|
||
self.__banned_to = now + delta
|
||
self.__banned = True
|
||
self.__ban_count += 1
|
||
|
||
if self.__ban_count == 1:
|
||
entry = (self.__user_id, self.__ban_count, str(self.__banned_to)[0:19])
|
||
cursor.execute('insert into ban_list (user_id, ban_count, ban_to_date) values (?,?,?)', entry)
|
||
else:
|
||
entry = (self.__ban_count, str(self.__banned_to)[0:19], self.__user_id)
|
||
cursor.execute('update ban_list set ban_count=?, ban_to_date=? where user_id=?', entry)
|
||
con.commit()
|
||
return rez
|
||
|
||
self.__buf.append((mess_id, mess_date))
|
||
self.__buf.sort(key=self.__sort_key)
|
||
self.__buf.pop(0)
|
||
|
||
return 0
|
||
else:
|
||
return 2
|
||
|
||
|
||
# TODO Дополнить логирование кнопок
|
||
class BotIncomingMessagesOrder:
|
||
"""класс порядка сообщений в диалогах"""
|
||
|
||
__slots__ = ['__users', '__orders']
|
||
|
||
def __init__(self):
|
||
self.__users = {}
|
||
self.__orders = []
|
||
|
||
def new_mess(self, message):
|
||
mess_id = message.id
|
||
mess_date = message.date + msk_hour
|
||
user_id = message.peer_id.user_id
|
||
txt = message.text
|
||
index = self.__users.get(user_id)
|
||
if index is None:
|
||
index = len(self.__orders)
|
||
self.__users[user_id] = index
|
||
self.__orders.append(IncomingMessagesTimeBuffer(20, user_id))
|
||
if debug:
|
||
levent = 'открыт новый буфер, id - ' + str(user_id)
|
||
logging.info(levent)
|
||
|
||
order_result = self.__orders[index].store_mess(mess_id, mess_date)
|
||
if debug or io_write:
|
||
if order_result not in [1, 2]:
|
||
entry = (mess_id, str(mess_date)[0:19], user_id, txt)
|
||
cursor.execute('insert into messages (mess_id, mess_date, from_id, mess_txt) values (?,?,?,?)', entry)
|
||
con.commit()
|
||
|
||
return order_result
|
||
|
||
|
||
# менеджер отложенной отправки сообщений
|
||
async def queue_manager():
|
||
if not manager_run:
|
||
manager_run.set_true()
|
||
levent = 'менеджер отложенных сообщений стартовал.'
|
||
logging.info(levent)
|
||
now = datetime.now()
|
||
delta = now - last_send_time
|
||
if delta.total_seconds() < 0:
|
||
t_sec = delta.total_seconds()
|
||
seconds = abs(t_sec // 1 + 1)
|
||
levent = 'ожидание разрешения на отправку - ' + str(seconds) + ' сек.'
|
||
logging.info(levent)
|
||
await sleep(seconds)
|
||
tg_flood_wait.set_false()
|
||
|
||
mess_count = 0
|
||
mess_success = 0
|
||
while work_queue.queue_empty() is not True:
|
||
if not tg_flood_wait:
|
||
entry = work_queue.get_next_message()
|
||
user_id = entry[0]
|
||
message = entry[1]
|
||
file_name = entry[2]
|
||
buttons = entry[3]
|
||
contact_add = entry[4]
|
||
await sleep(sys_wait)
|
||
if debug:
|
||
levent = 'попытка отправки сообщения для user_id = ' + str(user_id)
|
||
logging.info(levent)
|
||
result_flag = await send_reply_message(user_id, message, file_name, buttons, log_parameter=True,
|
||
contact_add=contact_add, from_queue=True)
|
||
work_queue.set_sending_result(result_flag)
|
||
if result_flag:
|
||
mess_success += 1
|
||
mess_count += 1
|
||
else:
|
||
break
|
||
|
||
manager_run.set_false()
|
||
levent = 'менеджер отложенных сообщений закончил. попыток - ' + str(mess_count) + ', отправлено - ' + str(
|
||
mess_success)
|
||
logging.info(levent)
|
||
if tg_flood_wait:
|
||
client.loop.create_task(queue_manager())
|
||
|
||
|
||
def message_to_queue(user_id, mess, file_name, buttons, contact_add):
|
||
work_queue.add_message(user_id, mess, file_name, buttons, contact_add)
|
||
if not manager_run:
|
||
client.loop.create_task(queue_manager())
|
||
|
||
|
||
# TODO Сделать поддержку форвардов
|
||
# отправка сообщений пользователю с соблюдением требований тг
|
||
async def send_reply_message(user_id, message, file_name=None, buttons=None, log_parameter=False, contact_add=True,
|
||
from_queue=False):
|
||
global last_send_time
|
||
now = datetime.now()
|
||
delta = now - last_send_time
|
||
|
||
if not from_queue:
|
||
find_flag, index = work_queue.is_user_in_queue(user_id)
|
||
if find_flag:
|
||
message_to_queue(user_id, message, file_name, buttons, contact_add)
|
||
levent = 'ответ поставлен в очередь. user_id = ' + str(user_id)
|
||
logging.warning(levent)
|
||
return False
|
||
|
||
if bot_users.is_bot_user(user_id):
|
||
timestamp = bot_users.get_user_mess_timestamp(user_id)
|
||
user_delta = now - timestamp
|
||
if user_delta.total_seconds() < 1:
|
||
if not from_queue:
|
||
message_to_queue(user_id, message, file_name, buttons, contact_add)
|
||
levent = 'ответ поставлен в очередь, дельта пользователя - ' + str(
|
||
user_delta.total_seconds()) + ' сек. user_id = ' + str(user_id)
|
||
logging.warning(levent)
|
||
return False
|
||
|
||
if delta.total_seconds() > 0.04:
|
||
try:
|
||
await client.send_message(user_id, message, file=file_name, buttons=buttons)
|
||
last_send_time = datetime.now()
|
||
|
||
if log_parameter or debug:
|
||
levent = 'ответ отправлен, user_id = ' + str(user_id)
|
||
logging.info(levent)
|
||
|
||
if debug or io_write:
|
||
if len(message) > 100:
|
||
message = message[0:100]
|
||
entry = (str(last_send_time)[0:19], user_id, message)
|
||
cursor.execute('insert into messages ( mess_date, to_id, mess_txt) values (?,?,?)', entry)
|
||
con.commit()
|
||
|
||
if contact_add:
|
||
if not bot_users.is_bot_user(user_id):
|
||
user_entity = await client.get_entity(user_id)
|
||
bot_users.new_user_store(user_entity)
|
||
else:
|
||
bot_users.update_user_mess_timestamp(user_id)
|
||
|
||
return True
|
||
|
||
except errors.FloodWaitError as e:
|
||
seconds = e.seconds
|
||
levent = 'сработал телеграм flood_wait, время ожидания - ' + str(seconds) + ' сек.'
|
||
logging.warning(levent)
|
||
tg_flood_wait.set_true()
|
||
last_send_time = now + timedelta(seconds=seconds + 1)
|
||
if not from_queue:
|
||
message_to_queue(user_id, message, file_name, buttons, contact_add)
|
||
return False
|
||
|
||
except errors.UserIsBlockedError as e:
|
||
levent = 'пользователь заблокировал бота (user_id = ' + str(user_id) + '): ' + str(e)
|
||
if debug:
|
||
logging.warning(levent)
|
||
if from_queue:
|
||
return True
|
||
return False
|
||
|
||
except Exception as e:
|
||
levent = 'что-то пошло не так при отправке (user_id = ' + str(user_id) + '): ' + str(e)
|
||
logging.error(levent)
|
||
if from_queue:
|
||
return True
|
||
return False
|
||
|
||
else:
|
||
if not from_queue:
|
||
message_to_queue(user_id, message, file_name, buttons, contact_add)
|
||
levent = 'ответ поставлен в очередь, дельта - ' + str(delta.total_seconds()) + ' сек. user_id = ' + str(
|
||
user_id)
|
||
logging.warning(levent)
|
||
return False
|
||
|
||
|
||
# отправка сообщений всем пользователям бота
|
||
async def send_message_to_all(mess):
|
||
users_list = bot_users.get_users_list()
|
||
count = 0
|
||
success_count = 0
|
||
for user_id in users_list:
|
||
result_flag = await send_reply_message(user_id, mess)
|
||
await sleep(sys_wait)
|
||
if result_flag:
|
||
success_count += 1
|
||
count += 1
|
||
return count, success_count
|
||
|
||
|
||
# инициализация
|
||
async def init(cli, debug_mode, adm_ids, io_write_mode):
|
||
global client
|
||
global last_send_time
|
||
global con
|
||
global cursor
|
||
global bot_users
|
||
global admins_ids
|
||
|
||
admins_ids = adm_ids
|
||
client = cli
|
||
if debug_mode:
|
||
debug.set_true()
|
||
if io_write_mode:
|
||
io_write.set_true()
|
||
|
||
if not path.exists('io.sqlite'):
|
||
con = sqlite3.connect('io.sqlite')
|
||
cursor = con.cursor()
|
||
cursor.executescript('''
|
||
CREATE TABLE messages
|
||
(
|
||
mess_id int,
|
||
mess_date text,
|
||
from_id int,
|
||
to_id int,
|
||
mess_txt text
|
||
);
|
||
|
||
CREATE TABLE contacts
|
||
(
|
||
user_id int,
|
||
first_name text,
|
||
last_name text,
|
||
account_name text
|
||
);
|
||
|
||
CREATE TABLE ban_list
|
||
(
|
||
user_id int,
|
||
ban_count int,
|
||
ban_to_date text
|
||
);
|
||
''')
|
||
con.commit()
|
||
else:
|
||
con = sqlite3.connect('io.sqlite')
|
||
cursor = con.cursor()
|
||
|
||
bot_users = TgBotUsers(con, cursor)
|
||
last_send_time = datetime.now()
|
||
levent = 'bot I/O запущен.'
|
||
logging.info(levent)
|
||
|
||
|
||
# завершение работы
|
||
async def terminate():
|
||
if manager_run:
|
||
levent = 'bot I/O, ожидание отправки всех сообщений.'
|
||
logging.info(levent)
|
||
count = 6
|
||
while count > 0:
|
||
await sleep(5)
|
||
count -= 1
|
||
if not manager_run:
|
||
break
|
||
con.close()
|
||
levent = 'bot I/O остановлен.'
|
||
logging.info(levent)
|