# Bot I/O v1.82 # 15/11/2021 # https://t.me/ssleg © 2020 – 2021 import logging import sqlite3 from asyncio import sleep from datetime import datetime, timedelta from os import path from telethon import TelegramClient, errors from bot_io_classes import TgBotUsers, OutMessagesQueue from tg_utils import GlobalFlag debug = GlobalFlag() io_write = GlobalFlag() client: TelegramClient msk_hour = timedelta(hours=3) sys_wait = 0.045 manager_run = GlobalFlag() work_queue = OutMessagesQueue() tg_flood_wait = GlobalFlag() last_send_time: datetime bot_users: TgBotUsers admins_ids = [] con: sqlite3.Connection cursor: sqlite3.Cursor class IncomingMessagesTimeBuffer: """класс буфера времени входящих сообщений пользователей""" __slots__ = ['__buf', '__size', '__banned_to', '__ban_count', '__banned', '__user_id'] @staticmethod def __sort_key(element): return element[0] @staticmethod def __is_now_banned(timestamp): now = datetime.now() delta = now - timestamp if delta.total_seconds() > 0: return False else: return True def __init__(self, size, user_id): self.__buf = [] self.__size = size self.__user_id = user_id cursor.execute('select ban_count, ban_to_date from ban_list where user_id=?', (user_id,)) row = cursor.fetchone() if row is not None: self.__banned_to = datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S') self.__ban_count = row[0] self.__banned = self.__is_now_banned(self.__banned_to) else: self.__banned_to = datetime(year=2020, month=1, day=1) self.__ban_count = 0 self.__banned = False for i in range(0, size): self.__buf.append((0, 0)) def __is_doubled(self, mess_id): i = 0 while i < self.__size: if self.__buf[i][0] == mess_id: levent = 'обнаружен дубликат сообщения #' + str(mess_id) + ', глубина - ' + str(i) logging.warning(levent) return True i += 1 return False def __is_valid_timing(self, mess_date): i = self.__size - 1 sec = 0 # while i > self.__size - 4: while i > 5: prev_date = self.__buf[i][1] if prev_date != 0: tmp = mess_date - prev_date '''if tmp.seconds < 1: levent = 'обнаружено слишком частое обращение: ' + str(tmp.seconds) + ' сек.' logging.warning(levent) return False else:''' sec += tmp.seconds mess_date = prev_date else: return True i -= 1 if sec >= 5: return True else: levent = 'обнаружены 15 сообщений за ' + str(sec) + ' сек.' logging.warning(levent) return False def store_mess(self, mess_id, mess_date): if not self.__is_doubled(mess_id): if self.__banned: status = self.__is_now_banned(self.__banned_to) if status: return 1 else: self.__banned = False if not self.__is_valid_timing(mess_date): if self.__user_id not in admins_ids: now = datetime.now() if self.__ban_count > 0: delta = timedelta(days=30) rez = 4 else: delta = timedelta(minutes=30) rez = 3 self.__banned_to = now + delta self.__banned = True self.__ban_count += 1 if self.__ban_count == 1: entry = (self.__user_id, self.__ban_count, str(self.__banned_to)[0:19]) cursor.execute('insert into ban_list (user_id, ban_count, ban_to_date) values (?,?,?)', entry) else: entry = (self.__ban_count, str(self.__banned_to)[0:19], self.__user_id) cursor.execute('update ban_list set ban_count=?, ban_to_date=? where user_id=?', entry) con.commit() return rez self.__buf.append((mess_id, mess_date)) self.__buf.sort(key=self.__sort_key) self.__buf.pop(0) return 0 else: return 2 # TODO Дополнить логирование кнопок class BotIncomingMessagesOrder: """класс порядка сообщений в диалогах""" __slots__ = ['__users', '__orders'] def __init__(self): self.__users = {} self.__orders = [] def new_mess(self, message): mess_id = message.id mess_date = message.date + msk_hour user_id = message.peer_id.user_id txt = message.text index = self.__users.get(user_id) if index is None: index = len(self.__orders) self.__users[user_id] = index self.__orders.append(IncomingMessagesTimeBuffer(20, user_id)) if debug: levent = 'открыт новый буфер, id - ' + str(user_id) logging.info(levent) order_result = self.__orders[index].store_mess(mess_id, mess_date) if debug or io_write: if order_result not in [1, 2]: entry = (mess_id, str(mess_date)[0:19], user_id, txt) cursor.execute('insert into messages (mess_id, mess_date, from_id, mess_txt) values (?,?,?,?)', entry) con.commit() return order_result # менеджер отложенной отправки сообщений async def queue_manager(): if not manager_run: manager_run.set_true() levent = 'менеджер отложенных сообщений стартовал.' logging.info(levent) now = datetime.now() delta = now - last_send_time if delta.total_seconds() < 0: t_sec = delta.total_seconds() seconds = abs(t_sec // 1 + 1) levent = 'ожидание разрешения на отправку - ' + str(seconds) + ' сек.' logging.info(levent) await sleep(seconds) tg_flood_wait.set_false() mess_count = 0 mess_success = 0 while work_queue.queue_empty() is not True: if not tg_flood_wait: entry = work_queue.get_next_message() user_id = entry[0] message = entry[1] file_name = entry[2] buttons = entry[3] contact_add = entry[4] await sleep(sys_wait) if debug: levent = 'попытка отправки сообщения для user_id = ' + str(user_id) logging.info(levent) result_flag = await send_reply_message(user_id, message, file_name, buttons, log_parameter=True, contact_add=contact_add, from_queue=True) work_queue.set_sending_result(result_flag) if result_flag: mess_success += 1 mess_count += 1 else: break manager_run.set_false() levent = 'менеджер отложенных сообщений закончил. попыток - ' + str(mess_count) + ', отправлено - ' + str( mess_success) logging.info(levent) if tg_flood_wait: client.loop.create_task(queue_manager()) def message_to_queue(user_id, mess, file_name, buttons, contact_add): work_queue.add_message(user_id, mess, file_name, buttons, contact_add) if not manager_run: client.loop.create_task(queue_manager()) # TODO Сделать поддержку форвардов # отправка сообщений пользователю с соблюдением требований тг async def send_reply_message(user_id, message, file_name=None, buttons=None, log_parameter=False, contact_add=True, from_queue=False): global last_send_time now = datetime.now() delta = now - last_send_time if not from_queue: find_flag, index = work_queue.is_user_in_queue(user_id) if find_flag: message_to_queue(user_id, message, file_name, buttons, contact_add) levent = 'ответ поставлен в очередь. user_id = ' + str(user_id) logging.warning(levent) return False if bot_users.is_bot_user(user_id): timestamp = bot_users.get_user_mess_timestamp(user_id) user_delta = now - timestamp if user_delta.total_seconds() < 1: if not from_queue: message_to_queue(user_id, message, file_name, buttons, contact_add) levent = 'ответ поставлен в очередь, дельта пользователя - ' + str( user_delta.total_seconds()) + ' сек. user_id = ' + str(user_id) logging.warning(levent) return False if delta.total_seconds() > 0.04: try: await client.send_message(user_id, message, file=file_name, buttons=buttons) last_send_time = datetime.now() if log_parameter or debug: levent = 'ответ отправлен, user_id = ' + str(user_id) logging.info(levent) if debug or io_write: if len(message) > 100: message = message[0:100] entry = (str(last_send_time)[0:19], user_id, message) cursor.execute('insert into messages ( mess_date, to_id, mess_txt) values (?,?,?)', entry) con.commit() if contact_add: if not bot_users.is_bot_user(user_id): user_entity = await client.get_entity(user_id) bot_users.new_user_store(user_entity) else: bot_users.update_user_mess_timestamp(user_id) return True except errors.FloodWaitError as e: seconds = e.seconds levent = 'сработал телеграм flood_wait, время ожидания - ' + str(seconds) + ' сек.' logging.warning(levent) tg_flood_wait.set_true() last_send_time = now + timedelta(seconds=seconds + 1) if not from_queue: message_to_queue(user_id, message, file_name, buttons, contact_add) return False except errors.UserIsBlockedError as e: levent = 'пользователь заблокировал бота (user_id = ' + str(user_id) + '): ' + str(e) if debug: logging.warning(levent) if from_queue: return True return False except Exception as e: levent = 'что-то пошло не так при отправке (user_id = ' + str(user_id) + '): ' + str(e) logging.error(levent) if from_queue: return True return False else: if not from_queue: message_to_queue(user_id, message, file_name, buttons, contact_add) levent = 'ответ поставлен в очередь, дельта - ' + str(delta.total_seconds()) + ' сек. user_id = ' + str( user_id) logging.warning(levent) return False # отправка сообщений всем пользователям бота async def send_message_to_all(mess): users_list = bot_users.get_users_list() count = 0 success_count = 0 for user_id in users_list: result_flag = await send_reply_message(user_id, mess) await sleep(sys_wait) if result_flag: success_count += 1 count += 1 return count, success_count # инициализация async def init(cli, debug_mode, adm_ids, io_write_mode): global client global last_send_time global con global cursor global bot_users global admins_ids admins_ids = adm_ids client = cli if debug_mode: debug.set_true() if io_write_mode: io_write.set_true() if not path.exists('io.sqlite'): con = sqlite3.connect('io.sqlite') cursor = con.cursor() cursor.executescript(''' CREATE TABLE messages ( mess_id int, mess_date text, from_id int, to_id int, mess_txt text ); CREATE TABLE contacts ( user_id int, first_name text, last_name text, account_name text ); CREATE TABLE ban_list ( user_id int, ban_count int, ban_to_date text ); ''') con.commit() else: con = sqlite3.connect('io.sqlite') cursor = con.cursor() bot_users = TgBotUsers(con, cursor) last_send_time = datetime.now() levent = 'bot I/O запущен.' logging.info(levent) # завершение работы async def terminate(): if manager_run: levent = 'bot I/O, ожидание отправки всех сообщений.' logging.info(levent) count = 6 while count > 0: await sleep(5) count -= 1 if not manager_run: break con.close() levent = 'bot I/O остановлен.' logging.info(levent)