Files
batch-bot/bot/worker.py
bilal bb27161524 Final release: Multi-session comment bot with filtering
Features:
- Multi-account support (session files)
- AI comments via Ollama
- Telegram bot moderation
- Filter by sessions and groups
- Docker support
- Auto-join groups
- Log notifications
- DB migration script

Bug fixes:
- Fixed comment_to for proper post targeting
- Fixed entity lookup with multiple ID formats
- Fixed callback handlers for filtering
- Added auto-join before entity lookup
2026-02-28 01:44:40 +03:00

462 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
import random
import sqlite3
from datetime import datetime, timedelta
from telethon import TelegramClient, events
from telethon.tl.types import PeerChannel
from bot.config import (
API_ID, API_HASH, LOG_GROUP_ID,
INITIAL_SCAN_LIMIT, COMMENT_DELAY_MIN, COMMENT_DELAY_MAX
)
from bot.db import (
init_db, save_comment, get_comment, get_comments_for_post,
update_comment_status, update_comment_sent, get_active_sessions,
update_stats, get_pending_comments, get_target_groups, is_target_group, DB_PATH
)
from bot.ollama import generate_comment
from bot.session_manager import session_manager
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('worker')
class CommentWorker:
"""Воркер для отправки комментариев от имени пользователей"""
def __init__(self):
self.clients: dict[str, TelegramClient] = {}
self.log_client: TelegramClient = None # Клиент для отправки логов
self.running = False
async def start(self):
"""Запуск воркера"""
logger.info("Запуск Comment Worker...")
# Инициализация БД
init_db()
# Подключение сессий
self.clients = await session_manager.create_all_clients()
logger.info(f"Подключено сессий: {len(self.clients)}")
if not self.clients:
logger.warning("Нет активных сессий. Добавьте файлы в sessions/")
return
# Создаём клиента для отправки логов (используем первую сессию)
if self.clients:
first_session = list(self.clients.keys())[0]
self.log_client = self.clients[first_session]
logger.info(f"Лог-клиент: {first_session}")
self.running = True
# Отправляем уведомление о запуске
await self.send_log_message("🚀 Worker запущен")
# Запускаем задачи параллельно
await asyncio.gather(
self.listen_for_new_posts(),
self.monitor_approved_comments()
)
async def send_log_message(self, message: str):
"""Отправка сообщения в лог-группу"""
if not self.log_client or not LOG_GROUP_ID:
return
try:
await self.log_client.send_message(
int(LOG_GROUP_ID),
message,
parse_mode='HTML'
)
except Exception as e:
logger.debug(f"Не удалось отправить лог: {e}")
async def stop(self):
"""Остановка воркера"""
logger.info("Остановка Comment Worker...")
self.running = False
await session_manager.disconnect_all()
async def scan_previous_messages(self):
"""Сканирование предыдущих сообщений"""
logger.info(f"Сканирование {INITIAL_SCAN_LIMIT} сообщений...")
# Получаем список целевых групп из БД
target_groups = get_target_groups()
if not target_groups:
logger.warning("Нет групп в БД! Добавьте группу через бота: /add_group ID")
return
logger.info(f"Мониторинг групп: {[g['group_id'] for g in target_groups]}")
for group in target_groups:
group_id = group['group_id']
for session_file, client in self.clients.items():
try:
target_group = await client.get_entity(PeerChannel(abs(int(group_id))))
messages = await client.get_messages(target_group, limit=INITIAL_SCAN_LIMIT)
logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений из группы {group_id}")
for message in reversed(messages):
if not self.running:
break
await self.process_message(message, session_file, client, group_id)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}")
async def process_message(self, message, session_file: str, client: TelegramClient, group_id: int = None, comments_group_id: int = None):
"""Обработка сообщения"""
try:
if not message.text or len(message.text) < 10:
return
if not message.replies or not message.replies.comments:
logger.info(f"Сообщение {message.id} не поддерживает комментарии")
return
msg_comments_group_id = message.replies.channel_id
if not msg_comments_group_id:
msg_comments_group_id = comments_group_id
if not msg_comments_group_id:
logger.error(f"Не удалось получить ID группы комментариев для {message.id}")
return
existing = get_comment(message.id, msg_comments_group_id, session_file)
if existing:
return
from telethon.tl.functions.channels import GetFullChannelRequest
channel_id = None
try:
comments_group = await client.get_entity(PeerChannel(abs(int(msg_comments_group_id))))
channel_full = await client(GetFullChannelRequest(comments_group))
if channel_full.full_chat.linked_chat_id:
channel_id = abs(int(channel_full.full_chat.linked_chat_id))
logger.info(f"Linked chat для {msg_comments_group_id}: {channel_id}")
except Exception as e:
logger.warning(f"Не удалось получить linked_chat: {e}")
channel_id = group_id
logger.info(f"Генерация комментария для сообщения {message.id} ({session_file})")
comment_text = await generate_comment(message.text)
if not comment_text:
logger.error(f"Не удалось сгенерировать комментарий для {message.id}")
return
save_comment(
message.id,
msg_comments_group_id,
comment_text,
session_file,
message.text,
channel_id
)
update_stats(
datetime.now().strftime('%Y-%m-%d'),
session_file,
'generated'
)
logger.info(f"Комментарий сохранён: message_id={message.id}, session={session_file}")
except Exception as e:
logger.error(f"Ошибка обработки сообщения: {e}")
async def listen_for_new_posts(self):
"""Прослушивание новых постов с авто-обновлением списка групп"""
logger.info("Запуск прослушивания новых постов...")
last_groups_check = 0
group_handlers = {}
scanned_groups = set()
while self.running:
current_time = datetime.now().timestamp()
# Проверяем новые группы каждые 10 секунд
if current_time - last_groups_check > 10:
target_groups = get_target_groups()
new_group_ids = [str(g['group_id']) for g in target_groups]
# Добавляем обработчики для новых групп
for group_id in new_group_ids:
if group_id not in group_handlers:
logger.info(f" Добавлена группа для мониторинга: {group_id}")
# Сканируем последние 10 сообщений новой группы
await self.scan_group(group_id, limit=10)
scanned_groups.add(group_id)
# Регистрируем обработчик для каждой сессии
for session_file, client in self.clients.items():
# Проверяем что клиент подключён
if not client.is_connected():
logger.warning(f"Клиент {session_file} не подключён")
continue
# Регистрируем обработчик
@client.on(events.NewMessage(chats=group_id))
async def handle_new_post(event):
message = event.message
logger.info(f"📬 Новый пост в группе {group_id}: {message.id}")
# Проверяем что это не бот
if message.from_id and message.from_id.user_id == (await client.get_me()).id:
return
# Отправляем уведомление в лог-группу
post_preview = message.text[:100] + "..." if len(message.text or "") > 100 else (message.text or "Без текста")
log_message = (
f"📬 <b>Новый пост</b>\n\n"
f"📢 Группа: <code>{group_id}</code>\n"
f"🔗 Пост: <a href='https://t.me/c/{str(group_id).lstrip('-')}/{message.id}'>{message.id}</a>\n"
f"📄 Текст: <i>{post_preview}</i>"
)
await self.send_log_message(log_message)
await self.process_message(message, session_file, client, group_id)
group_handlers[group_id] = True
logger.info(f"✅ Обработчик зарегистрирован для {group_id}")
# Удаляем обработчики для удалённых групп
for group_id in list(group_handlers.keys()):
if group_id not in new_group_ids:
logger.info(f"❌ Группа {group_id} удалена из мониторинга")
del group_handlers[group_id]
scanned_groups.discard(group_id)
last_groups_check = current_time
await asyncio.sleep(5)
async def scan_group(self, group_id: int | str, limit: int = 10):
"""Сканирование последней сообщений из группы/канала"""
logger.info(f"Сканирование {limit} сообщений из группы {group_id}...")
for session_file, client in self.clients.items():
try:
target_channel = None
comments_group_id = None
try:
target_channel = await client.get_entity(group_id)
except:
async for dialog in client.iter_dialogs():
dialog_id = dialog.id
dialog_username = getattr(dialog.entity, 'username', None) if hasattr(dialog, 'entity') and dialog.entity else None
if str(dialog_id) == str(group_id) or dialog_username == str(group_id).lstrip('@'):
target_channel = dialog.entity
break
if not target_channel:
logger.error(f"Не удалось найти группу/канал {group_id}")
continue
group_name = getattr(target_channel, 'title', None)
group_username = getattr(target_channel, 'username', None)
from telethon.tl.functions.channels import GetFullChannelRequest
try:
channel_full = await client(GetFullChannelRequest(target_channel))
if hasattr(channel_full.full_chat, 'linked_chat_id') and channel_full.full_chat.linked_chat_id:
comments_group_id = channel_full.full_chat.linked_chat_id
logger.info(f"Канал {group_id} имеет группу комментариев: {comments_group_id}")
try:
comments_group = await client.get_entity(comments_group_id)
if getattr(comments_group, 'username', None):
await session_manager.join_channel(session_file, comments_group_id)
logger.info(f"Вступил в группу комментариев: {comments_group_id}")
except Exception as e:
logger.warning(f"Не удалось вступить в группу комментариев: {e}")
except Exception as e:
logger.warning(f"Не удалось получить linked_chat: {e}")
from bot.db import add_target_group
add_target_group(group_id, group_name, group_username, 'channel', comments_group_id)
logger.info(f"Обновлена информация: {group_name}, comments_group={comments_group_id}")
messages = await client.get_messages(target_channel, limit=limit)
logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений")
for message in reversed(messages):
if not self.running:
break
await self.process_message(message, session_file, client, group_id, comments_group_id)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}")
async def monitor_approved_comments(self):
"""Мониторинг одобренных комментариев"""
logger.info("Запуск мониторинга одобренных комментариев...")
while self.running:
try:
conn = None
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM comments
WHERE status = 'approved' AND sent_message_id IS NULL
ORDER BY created_at
LIMIT 10
''')
comments = [dict(row) for row in cursor.fetchall()]
finally:
if conn:
conn.close()
if comments:
logger.info(f"Найдено {len(comments)} одобренных комментариев для отправки")
for comment in comments:
if not self.running:
break
await self.send_comment(comment)
await asyncio.sleep(2)
else:
logger.debug("Нет комментариев для отправки")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"Ошибка мониторинга: {e}")
await asyncio.sleep(10)
async def send_comment(self, comment: dict):
"""Отправка комментария"""
session_file = comment.get('session_file')
if not session_file:
logger.error(f"Нет session_file для комментария {comment['id']}")
return
client = self.clients.get(session_file)
if not client:
client = await session_manager.get_client(session_file)
if not client:
logger.error(f"Не удалось подключить сессию {session_file}")
update_comment_status(comment['id'], 'rejected')
return
try:
comments_group_id = abs(int(comment['chat_id']))
message_id_in_channel = comment['message_id']
channel_id = comment.get('channel_id')
logger.info(f"Отправка комментария {comment['id']}: message_id={message_id_in_channel}, comments_group={comments_group_id}, channel={channel_id}")
# Вступаем в группу комментариев ПЕРЕД тем как получать сущность
try:
await session_manager.join_channel(session_file, comments_group_id)
logger.info(f"✅ Вступил в группу комментариев: {comments_group_id}")
except Exception as e:
logger.debug(f"Уже в группе: {e}")
# Теперь получаем сущность группы комментариев
comments_group = None
for try_id in [comments_group_id, abs(int(comments_group_id)), -abs(int(comments_group_id))]:
try:
comments_group = await client.get_entity(PeerChannel(abs(int(try_id))))
logger.info(f"Группа комментариев: {comments_group.title} (ID: {try_id})")
break
except Exception as e:
logger.debug(f"Не удалось получить сущность {try_id}: {e}")
continue
if not comments_group:
logger.error(f"Не удалось найти группу комментариев {comments_group_id}")
return
from telethon.tl.functions.channels import GetFullChannelRequest
channel_full = await client(GetFullChannelRequest(comments_group))
linked_chat_id = channel_full.full_chat.linked_chat_id
if not linked_chat_id:
logger.error("Не удалось найти связанный канал")
return
logger.info(f"Linked chat ID: {linked_chat_id}")
# Получаем сообщение в КАНАЛЕ
target_message_in_channel = None
for try_id in [linked_chat_id, abs(int(linked_chat_id)), -abs(int(linked_chat_id))]:
try:
target_message_in_channel = await client.get_messages(
PeerChannel(abs(int(try_id))),
ids=message_id_in_channel
)
if target_message_in_channel:
logger.info(f"Найдено сообщение в канале: {target_message_in_channel.id}")
break
except Exception as e:
logger.debug(f"Не удалось получить сообщение {try_id}/{message_id_in_channel}: {e}")
continue
if not target_message_in_channel:
logger.error(f"Сообщение {message_id_in_channel} не найдено в канале {linked_chat_id}")
return
# ОТПРАВЛЯЕМ В КАНАЛ с comment_to= (как в старом проекте)
# Telegram автоматически направит комментарий в группу комментариев
delay = random.uniform(COMMENT_DELAY_MIN, COMMENT_DELAY_MAX)
logger.info(f"Задержка {delay:.1f}с...")
await asyncio.sleep(delay)
sent_message = await client.send_message(
PeerChannel(abs(int(linked_chat_id))), # КАНАЛ
comment['comment_text'],
comment_to=target_message_in_channel.id # ID в КАНАЛЕ
)
logger.info(f"✅ Отправлен в канал как comment_to {target_message_in_channel.id} -> {sent_message.id}")
logger.info(f"Комментарий отправлен: {comment['id']} -> message {sent_message.id}")
update_comment_sent(comment['id'], sent_message.id)
update_stats(datetime.now().strftime('%Y-%m-%d'), session_file, 'sent')
except Exception as e:
logger.error(f"Ошибка отправки комментария: {e}")
update_comment_status(comment['id'], 'pending')
async def main():
"""Точка входа"""
worker = CommentWorker()
try:
await worker.start()
except KeyboardInterrupt:
await worker.stop()
except Exception as e:
logger.error(f"Критическая ошибка: {e}")
await worker.stop()
if __name__ == '__main__':
asyncio.run(main())