magic-wand_4/bot_io.py

403 lines
14 KiB
Python
Raw Permalink Normal View History

2022-09-26 19:06:52 +03:00
# Bot I/O v1.82
# 15/11/2021
2021-06-14 17:05:26 +03:00
# https://t.me/ssleg © 2020 2021
import logging
import sqlite3
from asyncio import sleep
from datetime import datetime, timedelta
from os import path
from telethon import TelegramClient, errors
from bot_io_classes import TgBotUsers, OutMessagesQueue
from tg_utils import GlobalFlag
debug = GlobalFlag()
io_write = GlobalFlag()
client: TelegramClient
msk_hour = timedelta(hours=3)
sys_wait = 0.045
2022-09-26 19:06:52 +03:00
manager_run = GlobalFlag()
2021-06-14 17:05:26 +03:00
work_queue = OutMessagesQueue()
tg_flood_wait = GlobalFlag()
last_send_time: datetime
bot_users: TgBotUsers
admins_ids = []
con: sqlite3.Connection
cursor: sqlite3.Cursor
class IncomingMessagesTimeBuffer:
"""класс буфера времени входящих сообщений пользователей"""
__slots__ = ['__buf', '__size', '__banned_to', '__ban_count', '__banned', '__user_id']
@staticmethod
def __sort_key(element):
return element[0]
@staticmethod
def __is_now_banned(timestamp):
now = datetime.now()
delta = now - timestamp
if delta.total_seconds() > 0:
return False
else:
return True
def __init__(self, size, user_id):
self.__buf = []
self.__size = size
self.__user_id = user_id
2022-09-26 19:06:52 +03:00
cursor.execute('select ban_count, ban_to_date from ban_list where user_id=?', (user_id,))
2021-06-14 17:05:26 +03:00
row = cursor.fetchone()
if row is not None:
self.__banned_to = datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')
self.__ban_count = row[0]
self.__banned = self.__is_now_banned(self.__banned_to)
else:
self.__banned_to = datetime(year=2020, month=1, day=1)
self.__ban_count = 0
self.__banned = False
for i in range(0, size):
self.__buf.append((0, 0))
def __is_doubled(self, mess_id):
i = 0
while i < self.__size:
if self.__buf[i][0] == mess_id:
levent = 'обнаружен дубликат сообщения #' + str(mess_id) + ', глубина - ' + str(i)
logging.warning(levent)
return True
i += 1
return False
def __is_valid_timing(self, mess_date):
i = self.__size - 1
sec = 0
2022-09-26 19:06:52 +03:00
# while i > self.__size - 4:
while i > 5:
2021-06-14 17:05:26 +03:00
prev_date = self.__buf[i][1]
if prev_date != 0:
tmp = mess_date - prev_date
2022-09-26 19:06:52 +03:00
'''if tmp.seconds < 1:
2021-06-14 17:05:26 +03:00
levent = 'обнаружено слишком частое обращение: ' + str(tmp.seconds) + ' сек.'
logging.warning(levent)
return False
2022-09-26 19:06:52 +03:00
else:'''
sec += tmp.seconds
mess_date = prev_date
2021-06-14 17:05:26 +03:00
else:
return True
i -= 1
if sec >= 5:
return True
else:
2022-09-26 19:06:52 +03:00
levent = 'обнаружены 15 сообщений за ' + str(sec) + ' сек.'
2021-06-14 17:05:26 +03:00
logging.warning(levent)
return False
def store_mess(self, mess_id, mess_date):
if not self.__is_doubled(mess_id):
if self.__banned:
status = self.__is_now_banned(self.__banned_to)
if status:
return 1
else:
self.__banned = False
if not self.__is_valid_timing(mess_date):
if self.__user_id not in admins_ids:
now = datetime.now()
if self.__ban_count > 0:
delta = timedelta(days=30)
rez = 4
else:
delta = timedelta(minutes=30)
rez = 3
self.__banned_to = now + delta
self.__banned = True
self.__ban_count += 1
if self.__ban_count == 1:
entry = (self.__user_id, self.__ban_count, str(self.__banned_to)[0:19])
2022-09-26 19:06:52 +03:00
cursor.execute('insert into ban_list (user_id, ban_count, ban_to_date) values (?,?,?)', entry)
2021-06-14 17:05:26 +03:00
else:
entry = (self.__ban_count, str(self.__banned_to)[0:19], self.__user_id)
2022-09-26 19:06:52 +03:00
cursor.execute('update ban_list set ban_count=?, ban_to_date=? where user_id=?', entry)
2021-06-14 17:05:26 +03:00
con.commit()
return rez
self.__buf.append((mess_id, mess_date))
self.__buf.sort(key=self.__sort_key)
self.__buf.pop(0)
return 0
else:
return 2
2022-09-26 19:06:52 +03:00
# TODO Дополнить логирование кнопок
2021-06-14 17:05:26 +03:00
class BotIncomingMessagesOrder:
"""класс порядка сообщений в диалогах"""
__slots__ = ['__users', '__orders']
def __init__(self):
self.__users = {}
self.__orders = []
def new_mess(self, message):
mess_id = message.id
mess_date = message.date + msk_hour
user_id = message.peer_id.user_id
txt = message.text
2022-09-26 19:06:52 +03:00
index = self.__users.get(user_id)
if index is None:
index = len(self.__orders)
self.__users[user_id] = index
2021-06-14 17:05:26 +03:00
self.__orders.append(IncomingMessagesTimeBuffer(20, user_id))
if debug:
levent = 'открыт новый буфер, id - ' + str(user_id)
logging.info(levent)
2022-09-26 19:06:52 +03:00
order_result = self.__orders[index].store_mess(mess_id, mess_date)
2021-06-14 17:05:26 +03:00
if debug or io_write:
if order_result not in [1, 2]:
entry = (mess_id, str(mess_date)[0:19], user_id, txt)
cursor.execute('insert into messages (mess_id, mess_date, from_id, mess_txt) values (?,?,?,?)', entry)
con.commit()
return order_result
# менеджер отложенной отправки сообщений
async def queue_manager():
2022-09-26 19:06:52 +03:00
if not manager_run:
manager_run.set_true()
2021-06-14 17:05:26 +03:00
levent = 'менеджер отложенных сообщений стартовал.'
logging.info(levent)
now = datetime.now()
delta = now - last_send_time
if delta.total_seconds() < 0:
2022-09-26 19:06:52 +03:00
t_sec = delta.total_seconds()
seconds = abs(t_sec // 1 + 1)
2021-06-14 17:05:26 +03:00
levent = 'ожидание разрешения на отправку - ' + str(seconds) + ' сек.'
logging.info(levent)
await sleep(seconds)
tg_flood_wait.set_false()
mess_count = 0
mess_success = 0
while work_queue.queue_empty() is not True:
if not tg_flood_wait:
entry = work_queue.get_next_message()
user_id = entry[0]
message = entry[1]
file_name = entry[2]
2022-09-26 19:06:52 +03:00
buttons = entry[3]
contact_add = entry[4]
2021-06-14 17:05:26 +03:00
await sleep(sys_wait)
if debug:
levent = 'попытка отправки сообщения для user_id = ' + str(user_id)
logging.info(levent)
2022-09-26 19:06:52 +03:00
result_flag = await send_reply_message(user_id, message, file_name, buttons, log_parameter=True,
contact_add=contact_add, from_queue=True)
2021-06-14 17:05:26 +03:00
work_queue.set_sending_result(result_flag)
if result_flag:
mess_success += 1
mess_count += 1
else:
break
2022-09-26 19:06:52 +03:00
manager_run.set_false()
2021-06-14 17:05:26 +03:00
levent = 'менеджер отложенных сообщений закончил. попыток - ' + str(mess_count) + ', отправлено - ' + str(
mess_success)
logging.info(levent)
if tg_flood_wait:
client.loop.create_task(queue_manager())
2022-09-26 19:06:52 +03:00
def message_to_queue(user_id, mess, file_name, buttons, contact_add):
work_queue.add_message(user_id, mess, file_name, buttons, contact_add)
if not manager_run:
2021-06-14 17:05:26 +03:00
client.loop.create_task(queue_manager())
2022-09-26 19:06:52 +03:00
# TODO Сделать поддержку форвардов
2021-06-14 17:05:26 +03:00
# отправка сообщений пользователю с соблюдением требований тг
2022-09-26 19:06:52 +03:00
async def send_reply_message(user_id, message, file_name=None, buttons=None, log_parameter=False, contact_add=True,
from_queue=False):
2021-06-14 17:05:26 +03:00
global last_send_time
now = datetime.now()
delta = now - last_send_time
if not from_queue:
2022-09-26 19:06:52 +03:00
find_flag, index = work_queue.is_user_in_queue(user_id)
2021-06-14 17:05:26 +03:00
if find_flag:
2022-09-26 19:06:52 +03:00
message_to_queue(user_id, message, file_name, buttons, contact_add)
2021-06-14 17:05:26 +03:00
levent = 'ответ поставлен в очередь. user_id = ' + str(user_id)
logging.warning(levent)
return False
if bot_users.is_bot_user(user_id):
timestamp = bot_users.get_user_mess_timestamp(user_id)
user_delta = now - timestamp
if user_delta.total_seconds() < 1:
if not from_queue:
2022-09-26 19:06:52 +03:00
message_to_queue(user_id, message, file_name, buttons, contact_add)
2021-06-14 17:05:26 +03:00
levent = 'ответ поставлен в очередь, дельта пользователя - ' + str(
user_delta.total_seconds()) + ' сек. user_id = ' + str(user_id)
logging.warning(levent)
return False
if delta.total_seconds() > 0.04:
try:
2022-09-26 19:06:52 +03:00
await client.send_message(user_id, message, file=file_name, buttons=buttons)
2021-06-14 17:05:26 +03:00
last_send_time = datetime.now()
if log_parameter or debug:
levent = 'ответ отправлен, user_id = ' + str(user_id)
logging.info(levent)
if debug or io_write:
if len(message) > 100:
message = message[0:100]
entry = (str(last_send_time)[0:19], user_id, message)
cursor.execute('insert into messages ( mess_date, to_id, mess_txt) values (?,?,?)', entry)
con.commit()
if contact_add:
if not bot_users.is_bot_user(user_id):
user_entity = await client.get_entity(user_id)
bot_users.new_user_store(user_entity)
else:
bot_users.update_user_mess_timestamp(user_id)
return True
except errors.FloodWaitError as e:
seconds = e.seconds
2022-09-26 19:06:52 +03:00
levent = 'сработал телеграм flood_wait, время ожидания - ' + str(seconds) + ' сек.'
2021-06-14 17:05:26 +03:00
logging.warning(levent)
tg_flood_wait.set_true()
last_send_time = now + timedelta(seconds=seconds + 1)
if not from_queue:
2022-09-26 19:06:52 +03:00
message_to_queue(user_id, message, file_name, buttons, contact_add)
2021-06-14 17:05:26 +03:00
return False
2022-09-26 19:06:52 +03:00
except errors.UserIsBlockedError as e:
levent = 'пользователь заблокировал бота (user_id = ' + str(user_id) + '): ' + str(e)
2021-06-14 17:05:26 +03:00
if debug:
logging.warning(levent)
2022-09-26 19:06:52 +03:00
if from_queue:
return True
return False
except Exception as e:
levent = 'что-то пошло не так при отправке (user_id = ' + str(user_id) + '): ' + str(e)
logging.error(levent)
if from_queue:
return True
2021-06-14 17:05:26 +03:00
return False
else:
if not from_queue:
2022-09-26 19:06:52 +03:00
message_to_queue(user_id, message, file_name, buttons, contact_add)
2021-06-14 17:05:26 +03:00
levent = 'ответ поставлен в очередь, дельта - ' + str(delta.total_seconds()) + ' сек. user_id = ' + str(
user_id)
logging.warning(levent)
return False
# отправка сообщений всем пользователям бота
async def send_message_to_all(mess):
users_list = bot_users.get_users_list()
count = 0
success_count = 0
for user_id in users_list:
result_flag = await send_reply_message(user_id, mess)
await sleep(sys_wait)
if result_flag:
success_count += 1
count += 1
return count, success_count
# инициализация
async def init(cli, debug_mode, adm_ids, io_write_mode):
global client
global last_send_time
global con
global cursor
global bot_users
global admins_ids
admins_ids = adm_ids
client = cli
if debug_mode:
debug.set_true()
if io_write_mode:
io_write.set_true()
if not path.exists('io.sqlite'):
con = sqlite3.connect('io.sqlite')
cursor = con.cursor()
cursor.executescript('''
CREATE TABLE messages
(
mess_id int,
mess_date text,
from_id int,
to_id int,
mess_txt text
);
CREATE TABLE contacts
(
user_id int,
first_name text,
last_name text,
account_name text
);
2022-09-26 19:06:52 +03:00
CREATE TABLE ban_list
2021-06-14 17:05:26 +03:00
(
user_id int,
ban_count int,
ban_to_date text
);
''')
con.commit()
else:
con = sqlite3.connect('io.sqlite')
cursor = con.cursor()
bot_users = TgBotUsers(con, cursor)
last_send_time = datetime.now()
levent = 'bot I/O запущен.'
logging.info(levent)
# завершение работы
async def terminate():
2022-09-26 19:06:52 +03:00
if manager_run:
2021-06-14 17:05:26 +03:00
levent = 'bot I/O, ожидание отправки всех сообщений.'
logging.info(levent)
count = 6
while count > 0:
await sleep(5)
count -= 1
2022-09-26 19:06:52 +03:00
if not manager_run:
2021-06-14 17:05:26 +03:00
break
con.close()
levent = 'bot I/O остановлен.'
logging.info(levent)