Initial commit

master
ssleg 2021-12-21 14:42:38 +03:00
commit 0c05c3f17d
Signed by: anton
GPG Key ID: 50F7E97F96C07ECF
5 changed files with 631 additions and 0 deletions

9
LICENSE Normal file
View File

@ -0,0 +1,9 @@
MIT License
Copyright © 2020-2021 https://t.me/ssleg
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

40
README.MD Normal file
View File

@ -0,0 +1,40 @@
### Telegram chat reader
Программа полностью сливает чаты телеграм в базу данных PostgreSQL.
Для использования нужен развернутый сервер постгрес и телеграм-аккаунт.
Предварительно нужно настроить свой телеграм:
+ зайти на сайт [Telegram]
+ Выбрать раздел API Development Tools
+ Зарегистрировать там свое приложение (название, короткое название и главное, написать пару слов в раздел about)
+ полученные api-id и api-hash записать в chat_reader.toml (тот же ini файл, открывается блокнотом)
Так же в chat_reader.toml нужно внести название и настройки базы данных, в которую будут складываться данные.
**Внимание!** запускать скрипт следует всегда из его папки, telethon там хранит файл session
После настройки, запустите chat_reader.py --check.
В первый и единственный раз надо будет ввести телефон, на который зарегистрирован Telegram аккаунт. А также код подтверждения, который будет прислан Telegram.
После чего, храните файл chat_reader.session, он действует бесконечно, пока вы его не отключите через клиента.
Если все настройки прошли успешно, программа об этом сообщит.
Команды для использования:
--chat имя_чата - добавляет чат в базу и его скачивает. Если чат прикреплен к каналу, введите имя канала вместо чата, он будет обнаружен и скачан.
--all обновление базы данных, программа скачивает свежие сообщения со всех чатов, которые есть в базе.
При запуске без параметров, показывает текущую статистику базы данных: количество сообщений, чатов и занятое ими место.
Все рабочие сообщения выводятся на экран и в логе (по умолчанию, reader.log), лог при каждом запуске перезаписывается.
В бд программа создает три таблицы, для сообщений, юзеров и каналов.
Скачивается текстовая часть чата. Мультимедиа только обозначается в поле media таблицы сообщений.
Благодарности, вопросы и запросы фич складывать здесь или в комментариях к [этому посту].
Лицензия на код и документацию MIT. Вы можете свободно использовать, изменять и продавать код при условии сохранения
информации об авторских правах.
[Telegram]:https://my.telegram.org
[этому посту]:https://t.me/ssleg/511

384
chat_reader.py Executable file
View File

@ -0,0 +1,384 @@
#!/usr/bin/env python3
# Telegram chat reader v1.00
# 21/12/2021
# https://t.me/ssleg © 2021
import logging
from os import path
from sys import argv
import psycopg2
import toml
from telethon import TelegramClient, functions, errors
import reader_module
log_name = ''
database = ''
database_host = ''
database_user = ''
database_pass = ''
database_port = ''
api_id = 0
api_hash = ''
check_mess = '''
Сначала введите свои настройки telegram и базы данных в chat_reader.toml \
и запустите скрипт с ключом --check.
Полный список команд, смотрите --help.
'''
# noinspection SpellCheckingInspection
help_mess = '''
Команды:
--check - проверяет настройки бд и telegram.
--all - скачивает обновления для всех чатов, которые ранее скачивались в бд.
--сhat name - добавляет и скачивает чат @name или чат, подключенный к каналу @name
'''
# версия программы из заголовка
def get_version():
my_name = path.basename(__file__)
file = open(my_name)
version = ''
for line in file:
line = line[0:len(line) - 1]
if len(line) > 0:
if line[0] == '#':
offset = line.find(' v')
if offset > -1:
version = line[offset + 1:len(line)]
break
file.close()
return version
# форматирование чисел с пробелом
def set_num_printable(number):
string = '{:,}'.format(number)
string = string.replace(',', ' ')
return string
# словарь чатов в базе данных
def get_db_chats_dict(curs):
curs.execute('''
select title, channel_id from chat_reader_channels as crc
right join
(select chat_id from chat_reader_mess group by chat_id) as temp
on crc.channel_id=temp.chat_id
''')
chats_dict = {}
for row in curs.fetchall():
chats_dict[row[1]] = row[0]
return chats_dict
# обновление всех чатов
def update_all():
client = TelegramClient('chat_reader', api_id, api_hash)
con = psycopg2.connect(database=database,
user=database_user,
password=database_pass,
host=database_host,
port=database_port)
cursor = con.cursor()
chats_dict = get_db_chats_dict(cursor)
async def update():
await reader_module.init(client, con, cursor)
summary_read = 0
for key in chats_dict:
levent = f'читаем {chats_dict[key]}...'
print(levent)
logging.info(levent)
summary_read += await reader_module.read_chat(key)
print('')
levent = f'Всего прочитано: {set_num_printable(summary_read)}.'
print(levent)
logging.info(levent)
client.start()
client.loop.run_until_complete(update())
client.disconnect()
con.close()
# статистика базы данных
def print_stats():
con = psycopg2.connect(database=database,
user=database_user,
password=database_pass,
host=database_host,
port=database_port)
cursor = con.cursor()
cursor.execute('select count(chat_id), chat_id from chat_reader_mess group by chat_id')
chats_count = 0
messages = 0
for row in cursor.fetchall():
messages += row[0]
chats_count += 1
print(f'В базе {set_num_printable(messages)} сообщений из {chats_count} чатов.')
cursor.execute("select pg_total_relation_size('chat_reader_mess')")
row = cursor.fetchone()
size_of = row[0]
cursor.execute("select pg_total_relation_size('chat_reader_users')")
row = cursor.fetchone()
size_of += row[0]
cursor.execute("select pg_total_relation_size('chat_reader_channels')")
row = cursor.fetchone()
size_of += row[0]
size_mb = round(size_of / 1048576, 2)
print(f'Размер таблиц {set_num_printable(size_mb)} Мб.')
con.close()
# проверка конфигурации и создание таблиц в базе
def check_config():
valid_db = False
try:
con = psycopg2.connect(database=database,
user=database_user,
password=database_pass,
host=database_host,
port=database_port)
valid_db = True
cursor = con.cursor()
cursor.execute('''
create table if not exists chat_reader_users
(
user_id bigint not null
constraint chat_reader_users_pk
primary key,
first_name text,
last_name text,
user_name text,
phone bigint,
is_bot boolean,
is_dead boolean
);
create table if not exists chat_reader_channels
(
channel_id bigint not null
constraint chat_reader_channels_pk
primary key,
title text,
user_name text
);
create table if not exists chat_reader_mess
(
chat_id bigint not null,
message_id integer not null,
user_id bigint
constraint chat_reader_mess_chat_reader_users_user_id_fk
references chat_reader_users,
channel_id bigint
constraint chat_reader_mess_chat_reader_channels_channel_id_fk
references chat_reader_channels,
message_date timestamp(0),
grouped_id bigint,
reply_to integer,
reply_top integer,
fwd_from_channel_id bigint
constraint chat_reader_mess_chat_reader_channels_channel_id_fk_2
references chat_reader_channels,
fwd_from_channel_post integer,
fwd_from_user_id bigint
constraint chat_reader_mess_chat_reader_users_user_id_fk_2
references chat_reader_users,
fwd_from_name text,
message_txt text,
message_media text,
message_action text,
constraint chat_reader_pk
primary key (chat_id, message_id)
);
''')
con.commit()
con.close()
except Exception as e:
print(e)
valid_client = False
try:
client = TelegramClient('chat_reader', api_id, api_hash)
client.start()
print(f'версия telethon {client.__version__}')
valid_client = True
client.disconnect()
except Exception as e:
print(e)
toml_dict = {}
if valid_client and valid_db:
toml_dict['config'] = {'log_name': log_name, 'validated': True}
print('Все настройки корректны, можно скачивать чаты.')
else:
toml_dict['config'] = {'log_name': log_name, 'validated': False}
toml_dict['database'] = {'database': database, 'host': database_host, 'user': database_user,
'password': database_pass, 'port': database_port}
toml_dict['telegram'] = {'api_id': api_id, 'api_hash': api_hash}
file = open('chat_reader.toml', 'w')
file.write(toml.dumps(toml_dict))
file.close()
# добавление нового чата
def add_new(chat_name):
con = psycopg2.connect(database=database,
user=database_user,
password=database_pass,
host=database_host,
port=database_port)
cursor = con.cursor()
client = TelegramClient('chat_reader', api_id, api_hash)
chat_name = chat_name.lower()
async def add_chat():
try:
info = await client(functions.channels.GetFullChannelRequest(chat_name))
index = 0
if len(info.chats) > 1:
if info.chats[0].default_banned_rights is None:
index = 1
chat_id = info.chats[index].id
chat_title = info.chats[index].title
chat_username = info.chats[index].username
chats_dict = get_db_chats_dict(cursor)
if chat_id in chats_dict:
levent = f'Чат {chat_title} уже есть в базе.'
print(levent)
logging.info(levent)
else:
levent = f'Добавлен чат: id - {chat_id}, название - {chat_title}. читаем...'
print(levent)
logging.info(levent)
cursor.execute('select channel_id from chat_reader_channels where channel_id=%s', (chat_id,))
row = cursor.fetchone()
if row is None:
entry = (chat_id, chat_title, chat_username)
cursor.execute('insert into chat_reader_channels (channel_id, title, user_name) values (%s,%s,%s)',
entry)
con.commit()
await reader_module.init(client, con, cursor)
await reader_module.read_chat(chat_id)
except TypeError as e:
if str(e) == 'Cannot cast InputPeerUser to any kind of InputChannel.':
print(f'{chat_name} это имя пользователя, а не канала или чата.')
else:
print(e)
except ValueError as e:
if str(e) == f'No user has "{chat_name}" as username':
print(f'{chat_name} это имя не используется в telegram.')
elif str(e) == f'Cannot find any entity corresponding to "{chat_name}"':
print(f'{chat_name} это имя никогда не использовалось в telegram.')
else:
print(e)
except errors.ChannelPrivateError:
# noinspection SpellCheckingInspection
print(f'Чат/канал {chat_name} помечен как приватный. Возможно, вас там забанили.')
except Exception as e:
print(e)
client.start()
client.loop.run_until_complete(add_chat())
client.disconnect()
con.close()
# точка входа
if __name__ == '__main__':
param_dict = toml.load('chat_reader.toml')
config = param_dict['config']
db = param_dict['database']
tg = param_dict['telegram']
log_name = config['log_name']
database = db['database']
database_host = db['host']
database_user = db['user']
database_pass = db['password']
database_port = db['port']
api_id = tg['api_id']
api_hash = tg['api_hash']
print(f'Telegram chat reader {get_version()}.')
validated = config.get('validated')
if len(argv) == 1:
if validated is None:
print(check_mess)
exit(1)
elif validated is True:
print_stats()
exit(0)
else:
print('Проверьте правильность конфигурации chat_reader.toml и запустите с ключом --check')
exit(1)
if argv[1] == '--help':
print(help_mess)
elif argv[1] == '--check':
check_config()
elif argv[1] == '--all':
if validated is True:
lfile = logging.FileHandler(log_name, 'w', 'utf-8')
# noinspection SpellCheckingInspection
lfile.setFormatter(logging.Formatter('%(levelname)s %(module)-13s [%(asctime)s] %(message)s'))
logging.basicConfig(level=logging.INFO, handlers=[lfile])
update_all()
else:
print(check_mess)
elif argv[1] == '--chat':
if validated is True:
if len(argv) == 3:
lfile = logging.FileHandler(log_name, 'w', 'utf-8')
# noinspection SpellCheckingInspection
lfile.setFormatter(logging.Formatter('%(levelname)s %(module)-13s [%(asctime)s] %(message)s'))
logging.basicConfig(level=logging.INFO, handlers=[lfile])
add_new(argv[2])
else:
print('не указано имя чата/канала.')
else:
print(check_mess)
else:
print('команда не понята.')
print(help_mess)

13
chat_reader.toml Normal file
View File

@ -0,0 +1,13 @@
[config]
log_name = "reader.log"
[database]
database = "chat_reader"
host = "127.0.0.1"
user = "postgres"
password = "1234"
port = "5432"
[telegram]
api_id = 1234567
api_hash = "cxxxxxxcfffffffafgdfgwewer"

185
reader_module.py Normal file
View File

@ -0,0 +1,185 @@
# Chat reader module v1.00
# 21/12/2021
# https://t.me/ssleg © 2021
import logging
from asyncio import sleep
from datetime import datetime
from psycopg2 import extensions
from telethon import TelegramClient, types, errors
user_dict = {}
channel_dict = {}
client: TelegramClient
con: extensions.connection
cursor: extensions.cursor
# обновление информации о пользователях
async def update_user(user_id):
user_info = await client.get_entity(user_id)
user_dict[user_id] = 0
entry = (user_id, user_info.first_name, user_info.last_name, user_info.username, user_info.phone, user_info.bot,
user_info.deleted)
cursor.execute('''insert into chat_reader_users (user_id, first_name, last_name, user_name, phone, is_bot, is_dead)
values (%s, %s, %s, %s, %s, %s, %s)''', entry)
con.commit()
# обновление информации о каналах/чатах
async def update_channel(channel_id):
channel_dict[channel_id] = 0
try:
channel_info = await client.get_entity(channel_id)
entry = (channel_id, channel_info.title, channel_info.username)
cursor.execute('''insert into chat_reader_channels (channel_id, title, user_name)
values (%s, %s, %s)''', entry)
except errors.ChannelPrivateError:
entry = (channel_id, 'PRIVATE_CHANNEL')
cursor.execute('insert into chat_reader_channels (channel_id, title) VALUES (%s,%s)', entry)
except Exception as e:
levent = f'channel error {channel_id}, {e}'
print(levent)
logging.error(levent)
con.commit()
# загрузка в базу limit сообщений
async def read_messages(chat_id, start_id, limit):
read_count = 0
last_read_id = 0
async for message in client.iter_messages(chat_id, reverse=True, limit=limit, min_id=start_id):
read_count += 1
message_id = message.id
last_read_id = message_id
message_date = message.date
message_text = message.text
message_chat_id = message.peer_id.channel_id
message_from = message.from_id
user_id = None
channel_id = None
if type(message_from) == types.PeerChannel:
channel_id = message_from.channel_id
if channel_id not in channel_dict:
await update_channel(channel_id)
elif type(message_from) == types.PeerUser:
user_id = message_from.user_id
if user_id not in user_dict:
await update_user(user_id)
media_type = None
if message.media is not None:
media_type = str(message.media)
reply_to = None
reply_top = None
if message.reply_to is not None:
reply_to = message.reply_to.reply_to_msg_id
reply_top = message.reply_to.reply_to_top_id
if message.reply_to.reply_to_peer_id is not None:
levent = f'message_peer: {message.reply_to}, {message_id}'
print(levent)
logging.warning(levent)
fwd_from_user_id = None
fwd_from_channel_id = None
fwd_from_post = None
fwd_from_name = None
if message.fwd_from is not None:
fwd_from = message.fwd_from.from_id
if type(fwd_from) == types.PeerChannel:
fwd_from_channel_id = message.fwd_from.from_id.channel_id
fwd_from_post = message.fwd_from.channel_post
if fwd_from_channel_id not in channel_dict:
await update_channel(fwd_from_channel_id)
else:
if message.fwd_from.from_id is None:
fwd_from_name = message.fwd_from.from_name
else:
fwd_from_user_id = message.fwd_from.from_id.user_id
if fwd_from_user_id not in user_dict:
await update_user(fwd_from_user_id)
action = None
if message.action is not None:
action = str(message.action)
mess_grouped_id = message.grouped_id
entry = (message_chat_id, message_id, user_id, channel_id, message_date, mess_grouped_id, reply_to, reply_top,
fwd_from_channel_id, fwd_from_post, fwd_from_user_id, fwd_from_name, message_text, media_type, action)
cursor.execute('''insert into chat_reader_mess (chat_id, message_id, user_id, channel_id, message_date,
grouped_id, reply_to, reply_top, fwd_from_channel_id, fwd_from_channel_post, fwd_from_user_id,
fwd_from_name, message_txt, message_media, message_action)
values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', entry)
con.commit()
return read_count, last_read_id
# основной цикл чтения
async def read_chat(chat_id):
cursor.execute('select message_id from chat_reader_mess where chat_id= %s order by message_id desc limit 1',
(chat_id,))
row = cursor.fetchone()
if row is not None:
start_read = row[0]
else:
start_read = 1
summary_read = 0
read_count = 1
start_time = datetime.now()
low_read_count = 0
while read_count > 0:
read_count, start_read = await read_messages(chat_id, start_read, 1000)
summary_read += read_count
levent = f'прочитано сообщений в запросе- {read_count}, последний id - {start_read}. Суммарно - {summary_read}.'
print(levent)
logging.info(levent)
if read_count < 1000:
low_read_count += 1
if low_read_count < 5 and read_count > 0:
await sleep(7)
else:
read_count = 0
end_time = datetime.now()
run_time = end_time - start_time
run_seconds = round(run_time.total_seconds(), 2)
speed = round(summary_read / run_time.total_seconds(), 1)
# noinspection SpellCheckingInspection
levent = f'время закачки - {run_seconds} секунд. Cкорость {speed} сообщений/с.'
print(levent)
logging.info(levent)
return summary_read
# загрузка словарей пользователей и каналов
async def init(tg_client, connection, con_cursor):
global client
global con
global cursor
client = tg_client
con = connection
cursor = con_cursor
cursor.execute('select user_id from chat_reader_users')
for user in cursor.fetchall():
user_dict[user[0]] = 0
cursor.execute('select channel_id from chat_reader_channels')
for channel in cursor.fetchall():
channel_dict[channel[0]] = 0
levent = f'словарь юзеров - {len(user_dict)}, каналов - {len(channel_dict)}.'
print(levent)
logging.info(levent)