magic-wand_4/magic.py

319 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# Magic wand v4.27
# 27/06/2023
# https://t.me/ssleg © 2020 2023
import logging
import signal
from asyncio import sleep
from configparser import ConfigParser
from datetime import datetime
from hashlib import md5
from logging.handlers import RotatingFileHandler
from os import path
from requests import post
from telethon import TelegramClient, events
import bot_io
import main_module
# import module_two
from tg_utils import get_tg_client, get_bot_key, GlobalFlag, GlobalCounter
# загрузка конфигурации из ini файла
bot_config = ConfigParser()
bot_config.read('magic.ini')
debug = bot_config.getboolean('bot_config', 'debug')
auth_from_db = bot_config.getboolean('bot_config', 'auth_from_db')
bot_stats = bot_config.getboolean('bot_config', 'bot_stats')
io_write = bot_config.getboolean('bot_config', 'io_write')
logfile_name = bot_config.get('bot_config', 'logfile_name')
admins_ids_str = bot_config.get('bot_admins', 'admins_ids')
admins_ids_array = []
if admins_ids_str.find(';') > -1:
admins_ids_str = admins_ids_str.replace(' ', '')
str_split = admins_ids_str.split(';')
for value in str_split:
if value.isdigit():
admins_ids_array.append(int(value))
else:
if admins_ids_str.isdigit():
admins_ids_array.append(int(admins_ids_str))
api_id = bot_config.getint('bot_telegram_account', 'api_id')
api_hash = bot_config.get('bot_telegram_account', 'api_hash')
session_name = bot_config.get('bot_telegram_account', 'session_name')
bot_token = bot_config.get('bot_telegram_account', 'bot_token')
account = bot_config.get('bot_db_account', 'account')
bot_account = bot_config.get('bot_db_account', 'bot_account')
# глобальные переменные
sigterm_flag = GlobalFlag()
log_size = GlobalCounter()
last_string = GlobalCounter()
error_count = GlobalCounter()
started_time: datetime
next_stat = GlobalCounter(3600)
# инициализация лога и параметров подключения
log_file = RotatingFileHandler(logfile_name, 'a', maxBytes=524288, backupCount=10, encoding='utf=8')
# noinspection SpellCheckingInspection
log_file.setFormatter(logging.Formatter('%(levelname)s %(module)-13s [%(asctime)s] %(message)s'))
# noinspection PyArgumentList
logging.basicConfig(level=logging.INFO, handlers=[log_file])
sigterm_flag.set_true()
if len(admins_ids_array) == 0:
l_event = 'ни одного админа не указано, бот неуправляем.'
logging.warning(l_event)
admins_ids = ()
else:
admins_ids = tuple(admins_ids_array)
if auth_from_db:
client = get_tg_client(account)
bot_key = get_bot_key(bot_account)
else:
client = TelegramClient(session_name, api_id, api_hash)
bot_key = bot_token
# возвращает строку с версией приложения
def get_my_version():
my_name = path.basename(__file__)
file = open(my_name)
version = ''
for line in file:
line = line[0:len(line) - 1]
if len(line) > 0:
if line[0] == '#':
offset = line.find(' v')
if offset > -1:
version = line[offset + 1:len(line)]
break
file.close()
return version
# записывает начальные параметры лог-файла
def init_log_var():
log_size.set(path.getsize(logfile_name))
file = open(logfile_name, 'r', encoding='utf-8')
all_file = file.readlines()
file.close()
last_string.set(len(all_file))
levent = 'начальные параметры этого лог-файла: ' + str(log_size) + ' байт, ' + str(last_string) + ' строк.'
logging.info(levent)
# основная инициализация бота и модулей
async def init():
my_version = get_my_version()
if auth_from_db:
account_id = account
else:
account_id = str(api_id)
levent = 'Magic wand ' + my_version + ' (' + account_id + '), запуск модулей.'
logging.info(levent)
init_log_var()
await bot_io.init(client, debug, admins_ids, io_write)
await main_module.init(client, debug, admins_ids)
# await module_two.init(client)
levent = 'инициализация завершена.'
logging.info(levent)
# возвращает строку со временем аптайм
def get_uptime():
now = datetime.now()
delta = now - started_time
days = delta.days
secs = delta.seconds
hours = round(secs // 3600)
minutes = round(secs // 60 - hours * 60)
seconds = round(secs - hours * 3600 - minutes * 60)
message = 'Аптайм - ' + str(days) + ' дней, ' + str(hours) + ' час. ' + str(minutes) + ' мин. ' + str(
seconds) + ' сек.\n\n'
return message
# возвращает количество секунд с момента старта
def get_up_seconds():
now = datetime.now()
delta = now - started_time
return delta.days * 86400 + delta.seconds
# обработчик событий, отвечающий за команды админа (статистика работы, рассылки)
async def admins_command(event):
from_id = event.message.peer_id.user_id
command = event.message.text
if command == '-mw':
message = get_uptime()
message += main_module.status()
message += 'пользователей - ' + str(len(bot_io.bot_users.get_users_list())) + '\n'
message += 'ошибок - ' + str(error_count)
await bot_io.send_reply_message(from_id, message)
levent = 'запрос статистики выполнен успешно, админ: ' + str(from_id)
logging.info(levent)
if command.find('-send') == 0:
mess = command[6:len(command)]
if mess != '':
levent = 'админ ' + str(from_id) + ' создал рассылку.'
logging.info(levent)
count, success_count = await bot_io.send_message_to_all(mess)
levent = 'массовая рассылка сообщения выполнена, пользователей: ' + str(
count) + ', доставлено сообщений: ' + str(success_count)
logging.info(levent)
await bot_io.send_reply_message(from_id, 'рассылка выполнена для ' + str(
count) + ' пользователей, доставлено сообщений: ' + str(success_count))
else:
await bot_io.send_reply_message(from_id, 'сообщения для рассылки нет.')
# загрузка статистики работы на сервер
def stat_upload():
request_headers = {
'Accept': 'application/json, text/plain, */*',
'Content-Type': 'application/json; charset=utf-8'
}
# noinspection HttpUrlsUsage
stat_upload_url = 'https://api.ssleg.tech/v1/stat_up'
hash_md5 = md5(bot_key.encode())
request_json = {'protocol_version': '1.1', 'application': 'Magic Wand', 'app_version': get_my_version(),
'uptime': get_up_seconds(), 'errors': error_count.get(), 'fingerprint': hash_md5.hexdigest()}
try:
response = post(stat_upload_url, headers=request_headers, json=request_json, timeout=5)
response_code = response.status_code
if response_code == 200:
if debug:
levent = 'статистика отправлена успешно.'
logging.info(levent)
next_stat.increment(86400)
else:
levent = 'ошибка на сервере статистики, код ' + str(response_code)
logging.warning(levent)
next_stat.increment(300)
except Exception as e:
levent = 'ошибка в http запросе: ' + str(e)
logging.error(levent)
next_stat.increment(300)
# рассылка админам бота уведомления об ошибке
async def send_notices(mess):
error_count.increment()
for admin in admins_ids:
await sleep(0.5)
await bot_io.send_reply_message(admin, mess)
levent = 'уведомление об ошибке отправлено админу: ' + str(admin)
logging.info(levent)
# чтение изменений лог-файла и поиск ошибок в нем
def log_reader():
file = open(logfile_name, 'r', encoding='utf-8')
read_count = 0
work_count = last_string.get()
i = 0
flag = False
errmsg = ''
last_read = ''
while i < 1:
string = file.readline()
if string != '':
read_count += 1
if read_count > work_count:
last_string.increment()
if flag:
if len(string) > 1:
if string.find('[20') > -1:
errmsg += '\n' + last_read
client.loop.create_task(send_notices(errmsg))
flag = False
else:
last_read = string
if string.find('ERROR') == 0:
errmsg = string
flag = True
else:
if flag:
errmsg += '\n' + last_read
client.loop.create_task(send_notices(errmsg))
i = 1
file.close()
# наблюдатель за лог-файлом, проверяет изменения раз в минуту
def log_watcher():
if sigterm_flag:
filesize = path.getsize(logfile_name)
if filesize > log_size:
log_size.set(filesize)
log_reader()
if filesize < log_size:
log_size.clear()
last_string.clear()
if bot_stats:
if get_up_seconds() > next_stat:
stat_upload()
client.loop.call_later(60, log_watcher)
# завершает работу модулей
async def terminate():
levent = 'остановка началась...'
logging.info(levent)
await main_module.terminate()
await bot_io.terminate()
await client.disconnect()
levent = 'бот остановлен.'
logging.info(levent)
# обработчик сигнала sigterm, возникающего при перезапуске системы или бота.
# вызывает процедуру корректного завершения бота и модулей
# noinspection PyUnusedLocal
def sigterm_call(signum, frame):
if sigterm_flag:
sigterm_flag.set_false()
levent = 'получен SIGTERM ' + str(signum)
logging.info(levent)
client.loop.create_task(terminate())
# старт telethon и основной инициализации
if __name__ == '__main__':
signal.signal(signal.SIGTERM, sigterm_call)
client.start(bot_token=bot_key)
client.loop.run_until_complete(init())
started_time = datetime.now()
client.add_event_handler(admins_command, events.NewMessage(chats=admins_ids, incoming=True))
client.loop.call_later(60, log_watcher)
client.run_until_disconnected()