magic-wand_4/bot_io.py

403 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Bot I/O v1.82
# 15/11/2021
# https://t.me/ssleg © 2020 2021
import logging
import sqlite3
from asyncio import sleep
from datetime import datetime, timedelta
from os import path
from telethon import TelegramClient, errors
from bot_io_classes import TgBotUsers, OutMessagesQueue
from tg_utils import GlobalFlag
debug = GlobalFlag()
io_write = GlobalFlag()
client: TelegramClient
msk_hour = timedelta(hours=3)
sys_wait = 0.045
manager_run = GlobalFlag()
work_queue = OutMessagesQueue()
tg_flood_wait = GlobalFlag()
last_send_time: datetime
bot_users: TgBotUsers
admins_ids = []
con: sqlite3.Connection
cursor: sqlite3.Cursor
class IncomingMessagesTimeBuffer:
"""класс буфера времени входящих сообщений пользователей"""
__slots__ = ['__buf', '__size', '__banned_to', '__ban_count', '__banned', '__user_id']
@staticmethod
def __sort_key(element):
return element[0]
@staticmethod
def __is_now_banned(timestamp):
now = datetime.now()
delta = now - timestamp
if delta.total_seconds() > 0:
return False
else:
return True
def __init__(self, size, user_id):
self.__buf = []
self.__size = size
self.__user_id = user_id
cursor.execute('select ban_count, ban_to_date from ban_list where user_id=?', (user_id,))
row = cursor.fetchone()
if row is not None:
self.__banned_to = datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')
self.__ban_count = row[0]
self.__banned = self.__is_now_banned(self.__banned_to)
else:
self.__banned_to = datetime(year=2020, month=1, day=1)
self.__ban_count = 0
self.__banned = False
for i in range(0, size):
self.__buf.append((0, 0))
def __is_doubled(self, mess_id):
i = 0
while i < self.__size:
if self.__buf[i][0] == mess_id:
levent = 'обнаружен дубликат сообщения #' + str(mess_id) + ', глубина - ' + str(i)
logging.warning(levent)
return True
i += 1
return False
def __is_valid_timing(self, mess_date):
i = self.__size - 1
sec = 0
# while i > self.__size - 4:
while i > 5:
prev_date = self.__buf[i][1]
if prev_date != 0:
tmp = mess_date - prev_date
'''if tmp.seconds < 1:
levent = 'обнаружено слишком частое обращение: ' + str(tmp.seconds) + ' сек.'
logging.warning(levent)
return False
else:'''
sec += tmp.seconds
mess_date = prev_date
else:
return True
i -= 1
if sec >= 5:
return True
else:
levent = 'обнаружены 15 сообщений за ' + str(sec) + ' сек.'
logging.warning(levent)
return False
def store_mess(self, mess_id, mess_date):
if not self.__is_doubled(mess_id):
if self.__banned:
status = self.__is_now_banned(self.__banned_to)
if status:
return 1
else:
self.__banned = False
if not self.__is_valid_timing(mess_date):
if self.__user_id not in admins_ids:
now = datetime.now()
if self.__ban_count > 0:
delta = timedelta(days=30)
rez = 4
else:
delta = timedelta(minutes=30)
rez = 3
self.__banned_to = now + delta
self.__banned = True
self.__ban_count += 1
if self.__ban_count == 1:
entry = (self.__user_id, self.__ban_count, str(self.__banned_to)[0:19])
cursor.execute('insert into ban_list (user_id, ban_count, ban_to_date) values (?,?,?)', entry)
else:
entry = (self.__ban_count, str(self.__banned_to)[0:19], self.__user_id)
cursor.execute('update ban_list set ban_count=?, ban_to_date=? where user_id=?', entry)
con.commit()
return rez
self.__buf.append((mess_id, mess_date))
self.__buf.sort(key=self.__sort_key)
self.__buf.pop(0)
return 0
else:
return 2
# TODO Дополнить логирование кнопок
class BotIncomingMessagesOrder:
"""класс порядка сообщений в диалогах"""
__slots__ = ['__users', '__orders']
def __init__(self):
self.__users = {}
self.__orders = []
def new_mess(self, message):
mess_id = message.id
mess_date = message.date + msk_hour
user_id = message.peer_id.user_id
txt = message.text
index = self.__users.get(user_id)
if index is None:
index = len(self.__orders)
self.__users[user_id] = index
self.__orders.append(IncomingMessagesTimeBuffer(20, user_id))
if debug:
levent = 'открыт новый буфер, id - ' + str(user_id)
logging.info(levent)
order_result = self.__orders[index].store_mess(mess_id, mess_date)
if debug or io_write:
if order_result not in [1, 2]:
entry = (mess_id, str(mess_date)[0:19], user_id, txt)
cursor.execute('insert into messages (mess_id, mess_date, from_id, mess_txt) values (?,?,?,?)', entry)
con.commit()
return order_result
# менеджер отложенной отправки сообщений
async def queue_manager():
if not manager_run:
manager_run.set_true()
levent = 'менеджер отложенных сообщений стартовал.'
logging.info(levent)
now = datetime.now()
delta = now - last_send_time
if delta.total_seconds() < 0:
t_sec = delta.total_seconds()
seconds = abs(t_sec // 1 + 1)
levent = 'ожидание разрешения на отправку - ' + str(seconds) + ' сек.'
logging.info(levent)
await sleep(seconds)
tg_flood_wait.set_false()
mess_count = 0
mess_success = 0
while work_queue.queue_empty() is not True:
if not tg_flood_wait:
entry = work_queue.get_next_message()
user_id = entry[0]
message = entry[1]
file_name = entry[2]
buttons = entry[3]
contact_add = entry[4]
await sleep(sys_wait)
if debug:
levent = 'попытка отправки сообщения для user_id = ' + str(user_id)
logging.info(levent)
result_flag = await send_reply_message(user_id, message, file_name, buttons, log_parameter=True,
contact_add=contact_add, from_queue=True)
work_queue.set_sending_result(result_flag)
if result_flag:
mess_success += 1
mess_count += 1
else:
break
manager_run.set_false()
levent = 'менеджер отложенных сообщений закончил. попыток - ' + str(mess_count) + ', отправлено - ' + str(
mess_success)
logging.info(levent)
if tg_flood_wait:
client.loop.create_task(queue_manager())
def message_to_queue(user_id, mess, file_name, buttons, contact_add):
work_queue.add_message(user_id, mess, file_name, buttons, contact_add)
if not manager_run:
client.loop.create_task(queue_manager())
# TODO Сделать поддержку форвардов
# отправка сообщений пользователю с соблюдением требований тг
async def send_reply_message(user_id, message, file_name=None, buttons=None, log_parameter=False, contact_add=True,
from_queue=False):
global last_send_time
now = datetime.now()
delta = now - last_send_time
if not from_queue:
find_flag, index = work_queue.is_user_in_queue(user_id)
if find_flag:
message_to_queue(user_id, message, file_name, buttons, contact_add)
levent = 'ответ поставлен в очередь. user_id = ' + str(user_id)
logging.warning(levent)
return False
if bot_users.is_bot_user(user_id):
timestamp = bot_users.get_user_mess_timestamp(user_id)
user_delta = now - timestamp
if user_delta.total_seconds() < 1:
if not from_queue:
message_to_queue(user_id, message, file_name, buttons, contact_add)
levent = 'ответ поставлен в очередь, дельта пользователя - ' + str(
user_delta.total_seconds()) + ' сек. user_id = ' + str(user_id)
logging.warning(levent)
return False
if delta.total_seconds() > 0.04:
try:
await client.send_message(user_id, message, file=file_name, buttons=buttons)
last_send_time = datetime.now()
if log_parameter or debug:
levent = 'ответ отправлен, user_id = ' + str(user_id)
logging.info(levent)
if debug or io_write:
if len(message) > 100:
message = message[0:100]
entry = (str(last_send_time)[0:19], user_id, message)
cursor.execute('insert into messages ( mess_date, to_id, mess_txt) values (?,?,?)', entry)
con.commit()
if contact_add:
if not bot_users.is_bot_user(user_id):
user_entity = await client.get_entity(user_id)
bot_users.new_user_store(user_entity)
else:
bot_users.update_user_mess_timestamp(user_id)
return True
except errors.FloodWaitError as e:
seconds = e.seconds
levent = 'сработал телеграм flood_wait, время ожидания - ' + str(seconds) + ' сек.'
logging.warning(levent)
tg_flood_wait.set_true()
last_send_time = now + timedelta(seconds=seconds + 1)
if not from_queue:
message_to_queue(user_id, message, file_name, buttons, contact_add)
return False
except errors.UserIsBlockedError as e:
levent = 'пользователь заблокировал бота (user_id = ' + str(user_id) + '): ' + str(e)
if debug:
logging.warning(levent)
if from_queue:
return True
return False
except Exception as e:
levent = 'что-то пошло не так при отправке (user_id = ' + str(user_id) + '): ' + str(e)
logging.error(levent)
if from_queue:
return True
return False
else:
if not from_queue:
message_to_queue(user_id, message, file_name, buttons, contact_add)
levent = 'ответ поставлен в очередь, дельта - ' + str(delta.total_seconds()) + ' сек. user_id = ' + str(
user_id)
logging.warning(levent)
return False
# отправка сообщений всем пользователям бота
async def send_message_to_all(mess):
users_list = bot_users.get_users_list()
count = 0
success_count = 0
for user_id in users_list:
result_flag = await send_reply_message(user_id, mess)
await sleep(sys_wait)
if result_flag:
success_count += 1
count += 1
return count, success_count
# инициализация
async def init(cli, debug_mode, adm_ids, io_write_mode):
global client
global last_send_time
global con
global cursor
global bot_users
global admins_ids
admins_ids = adm_ids
client = cli
if debug_mode:
debug.set_true()
if io_write_mode:
io_write.set_true()
if not path.exists('io.sqlite'):
con = sqlite3.connect('io.sqlite')
cursor = con.cursor()
cursor.executescript('''
CREATE TABLE messages
(
mess_id int,
mess_date text,
from_id int,
to_id int,
mess_txt text
);
CREATE TABLE contacts
(
user_id int,
first_name text,
last_name text,
account_name text
);
CREATE TABLE ban_list
(
user_id int,
ban_count int,
ban_to_date text
);
''')
con.commit()
else:
con = sqlite3.connect('io.sqlite')
cursor = con.cursor()
bot_users = TgBotUsers(con, cursor)
last_send_time = datetime.now()
levent = 'bot I/O запущен.'
logging.info(levent)
# завершение работы
async def terminate():
if manager_run:
levent = 'bot I/O, ожидание отправки всех сообщений.'
logging.info(levent)
count = 6
while count > 0:
await sleep(5)
count -= 1
if not manager_run:
break
con.close()
levent = 'bot I/O остановлен.'
logging.info(levent)