import asyncio import logging import random import sqlite3 from datetime import datetime, timedelta from telethon import TelegramClient, events from telethon.tl.types import PeerChannel from bot.config import ( API_ID, API_HASH, LOG_GROUP_ID, INITIAL_SCAN_LIMIT, COMMENT_DELAY_MIN, COMMENT_DELAY_MAX ) from bot.db import ( init_db, save_comment, get_comment, get_comments_for_post, update_comment_status, update_comment_sent, get_active_sessions, update_stats, get_pending_comments, get_target_groups, is_target_group, DB_PATH ) from bot.ollama import generate_comment from bot.session_manager import session_manager logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('worker') class CommentWorker: """Воркер для отправки комментариев от имени пользователей""" def __init__(self): self.clients: dict[str, TelegramClient] = {} self.log_client: TelegramClient = None # Клиент для отправки логов self.running = False async def start(self): """Запуск воркера""" logger.info("Запуск Comment Worker...") # Инициализация БД init_db() # Подключение сессий self.clients = await session_manager.create_all_clients() logger.info(f"Подключено сессий: {len(self.clients)}") if not self.clients: logger.warning("Нет активных сессий. Добавьте файлы в sessions/") return # Создаём клиента для отправки логов (используем первую сессию) if self.clients: first_session = list(self.clients.keys())[0] self.log_client = self.clients[first_session] logger.info(f"Лог-клиент: {first_session}") self.running = True # Отправляем уведомление о запуске await self.send_log_message("🚀 Worker запущен") # Запускаем задачи параллельно await asyncio.gather( self.listen_for_new_posts(), self.monitor_approved_comments() ) async def send_log_message(self, message: str): """Отправка сообщения в лог-группу""" if not self.log_client or not LOG_GROUP_ID: return try: await self.log_client.send_message( int(LOG_GROUP_ID), message, parse_mode='HTML' ) except Exception as e: logger.debug(f"Не удалось отправить лог: {e}") async def stop(self): """Остановка воркера""" logger.info("Остановка Comment Worker...") self.running = False await session_manager.disconnect_all() async def scan_previous_messages(self): """Сканирование предыдущих сообщений""" logger.info(f"Сканирование {INITIAL_SCAN_LIMIT} сообщений...") # Получаем список целевых групп из БД target_groups = get_target_groups() if not target_groups: logger.warning("Нет групп в БД! Добавьте группу через бота: /add_group ID") return logger.info(f"Мониторинг групп: {[g['group_id'] for g in target_groups]}") for group in target_groups: group_id = group['group_id'] for session_file, client in self.clients.items(): try: target_group = await client.get_entity(PeerChannel(abs(int(group_id)))) messages = await client.get_messages(target_group, limit=INITIAL_SCAN_LIMIT) logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений из группы {group_id}") for message in reversed(messages): if not self.running: break await self.process_message(message, session_file, client, group_id) await asyncio.sleep(0.5) except Exception as e: logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}") async def process_message(self, message, session_file: str, client: TelegramClient, group_id: int = None, comments_group_id: int = None): """Обработка сообщения""" try: if not message.text or len(message.text) < 10: return if not message.replies or not message.replies.comments: logger.info(f"Сообщение {message.id} не поддерживает комментарии") return msg_comments_group_id = message.replies.channel_id if not msg_comments_group_id: msg_comments_group_id = comments_group_id if not msg_comments_group_id: logger.error(f"Не удалось получить ID группы комментариев для {message.id}") return existing = get_comment(message.id, msg_comments_group_id, session_file) if existing: return from telethon.tl.functions.channels import GetFullChannelRequest channel_id = None try: comments_group = await client.get_entity(PeerChannel(abs(int(msg_comments_group_id)))) channel_full = await client(GetFullChannelRequest(comments_group)) if channel_full.full_chat.linked_chat_id: channel_id = abs(int(channel_full.full_chat.linked_chat_id)) logger.info(f"Linked chat для {msg_comments_group_id}: {channel_id}") except Exception as e: logger.warning(f"Не удалось получить linked_chat: {e}") channel_id = group_id logger.info(f"Генерация комментария для сообщения {message.id} ({session_file})") comment_text = await generate_comment(message.text) if not comment_text: logger.error(f"Не удалось сгенерировать комментарий для {message.id}") return save_comment( message.id, msg_comments_group_id, comment_text, session_file, message.text, channel_id ) update_stats( datetime.now().strftime('%Y-%m-%d'), session_file, 'generated' ) logger.info(f"Комментарий сохранён: message_id={message.id}, session={session_file}") except Exception as e: logger.error(f"Ошибка обработки сообщения: {e}") async def listen_for_new_posts(self): """Прослушивание новых постов с авто-обновлением списка групп""" logger.info("Запуск прослушивания новых постов...") last_groups_check = 0 group_handlers = {} scanned_groups = set() while self.running: current_time = datetime.now().timestamp() # Проверяем новые группы каждые 10 секунд if current_time - last_groups_check > 10: target_groups = get_target_groups() new_group_ids = [str(g['group_id']) for g in target_groups] # Добавляем обработчики для новых групп for group_id in new_group_ids: if group_id not in group_handlers: logger.info(f"➕ Добавлена группа для мониторинга: {group_id}") # Сканируем последние 10 сообщений новой группы await self.scan_group(group_id, limit=10) scanned_groups.add(group_id) # Регистрируем обработчик для каждой сессии for session_file, client in self.clients.items(): # Проверяем что клиент подключён if not client.is_connected(): logger.warning(f"Клиент {session_file} не подключён") continue # Регистрируем обработчик @client.on(events.NewMessage(chats=group_id)) async def handle_new_post(event): message = event.message logger.info(f"📬 Новый пост в группе {group_id}: {message.id}") # Проверяем что это не бот if message.from_id and message.from_id.user_id == (await client.get_me()).id: return # Отправляем уведомление в лог-группу post_preview = message.text[:100] + "..." if len(message.text or "") > 100 else (message.text or "Без текста") log_message = ( f"📬 Новый пост\n\n" f"📢 Группа: {group_id}\n" f"🔗 Пост: {message.id}\n" f"📄 Текст: {post_preview}" ) await self.send_log_message(log_message) await self.process_message(message, session_file, client, group_id) group_handlers[group_id] = True logger.info(f"✅ Обработчик зарегистрирован для {group_id}") # Удаляем обработчики для удалённых групп for group_id in list(group_handlers.keys()): if group_id not in new_group_ids: logger.info(f"❌ Группа {group_id} удалена из мониторинга") del group_handlers[group_id] scanned_groups.discard(group_id) last_groups_check = current_time await asyncio.sleep(5) async def scan_group(self, group_id: int | str, limit: int = 10): """Сканирование последней сообщений из группы/канала""" logger.info(f"Сканирование {limit} сообщений из группы {group_id}...") for session_file, client in self.clients.items(): try: target_channel = None comments_group_id = None try: target_channel = await client.get_entity(group_id) except: async for dialog in client.iter_dialogs(): dialog_id = dialog.id dialog_username = getattr(dialog.entity, 'username', None) if hasattr(dialog, 'entity') and dialog.entity else None if str(dialog_id) == str(group_id) or dialog_username == str(group_id).lstrip('@'): target_channel = dialog.entity break if not target_channel: logger.error(f"Не удалось найти группу/канал {group_id}") continue group_name = getattr(target_channel, 'title', None) group_username = getattr(target_channel, 'username', None) from telethon.tl.functions.channels import GetFullChannelRequest try: channel_full = await client(GetFullChannelRequest(target_channel)) if hasattr(channel_full.full_chat, 'linked_chat_id') and channel_full.full_chat.linked_chat_id: comments_group_id = channel_full.full_chat.linked_chat_id logger.info(f"Канал {group_id} имеет группу комментариев: {comments_group_id}") try: comments_group = await client.get_entity(comments_group_id) if getattr(comments_group, 'username', None): await session_manager.join_channel(session_file, comments_group_id) logger.info(f"Вступил в группу комментариев: {comments_group_id}") except Exception as e: logger.warning(f"Не удалось вступить в группу комментариев: {e}") except Exception as e: logger.warning(f"Не удалось получить linked_chat: {e}") from bot.db import add_target_group add_target_group(group_id, group_name, group_username, 'channel', comments_group_id) logger.info(f"Обновлена информация: {group_name}, comments_group={comments_group_id}") messages = await client.get_messages(target_channel, limit=limit) logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений") for message in reversed(messages): if not self.running: break await self.process_message(message, session_file, client, group_id, comments_group_id) await asyncio.sleep(0.5) except Exception as e: logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}") async def monitor_approved_comments(self): """Мониторинг одобренных комментариев""" logger.info("Запуск мониторинга одобренных комментариев...") while self.running: try: conn = None try: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT * FROM comments WHERE status = 'approved' AND sent_message_id IS NULL ORDER BY created_at LIMIT 10 ''') comments = [dict(row) for row in cursor.fetchall()] finally: if conn: conn.close() if comments: logger.info(f"Найдено {len(comments)} одобренных комментариев для отправки") for comment in comments: if not self.running: break await self.send_comment(comment) await asyncio.sleep(2) else: logger.debug("Нет комментариев для отправки") await asyncio.sleep(10) except Exception as e: logger.error(f"Ошибка мониторинга: {e}") await asyncio.sleep(10) async def send_comment(self, comment: dict): """Отправка комментария""" session_file = comment.get('session_file') if not session_file: logger.error(f"Нет session_file для комментария {comment['id']}") return client = self.clients.get(session_file) if not client: client = await session_manager.get_client(session_file) if not client: logger.error(f"Не удалось подключить сессию {session_file}") update_comment_status(comment['id'], 'rejected') return try: comments_group_id = abs(int(comment['chat_id'])) message_id_in_channel = comment['message_id'] channel_id = comment.get('channel_id') logger.info(f"Отправка комментария {comment['id']}: message_id={message_id_in_channel}, comments_group={comments_group_id}, channel={channel_id}") # Вступаем в группу комментариев ПЕРЕД тем как получать сущность try: await session_manager.join_channel(session_file, comments_group_id) logger.info(f"✅ Вступил в группу комментариев: {comments_group_id}") except Exception as e: logger.debug(f"Уже в группе: {e}") # Теперь получаем сущность группы комментариев comments_group = None for try_id in [comments_group_id, abs(int(comments_group_id)), -abs(int(comments_group_id))]: try: comments_group = await client.get_entity(PeerChannel(abs(int(try_id)))) logger.info(f"Группа комментариев: {comments_group.title} (ID: {try_id})") break except Exception as e: logger.debug(f"Не удалось получить сущность {try_id}: {e}") continue if not comments_group: logger.error(f"Не удалось найти группу комментариев {comments_group_id}") return from telethon.tl.functions.channels import GetFullChannelRequest channel_full = await client(GetFullChannelRequest(comments_group)) linked_chat_id = channel_full.full_chat.linked_chat_id if not linked_chat_id: logger.error("Не удалось найти связанный канал") return logger.info(f"Linked chat ID: {linked_chat_id}") # Получаем сообщение в КАНАЛЕ target_message_in_channel = None for try_id in [linked_chat_id, abs(int(linked_chat_id)), -abs(int(linked_chat_id))]: try: target_message_in_channel = await client.get_messages( PeerChannel(abs(int(try_id))), ids=message_id_in_channel ) if target_message_in_channel: logger.info(f"Найдено сообщение в канале: {target_message_in_channel.id}") break except Exception as e: logger.debug(f"Не удалось получить сообщение {try_id}/{message_id_in_channel}: {e}") continue if not target_message_in_channel: logger.error(f"Сообщение {message_id_in_channel} не найдено в канале {linked_chat_id}") return # ОТПРАВЛЯЕМ В КАНАЛ с comment_to= (как в старом проекте) # Telegram автоматически направит комментарий в группу комментариев delay = random.uniform(COMMENT_DELAY_MIN, COMMENT_DELAY_MAX) logger.info(f"Задержка {delay:.1f}с...") await asyncio.sleep(delay) sent_message = await client.send_message( PeerChannel(abs(int(linked_chat_id))), # КАНАЛ comment['comment_text'], comment_to=target_message_in_channel.id # ID в КАНАЛЕ ) logger.info(f"✅ Отправлен в канал как comment_to {target_message_in_channel.id} -> {sent_message.id}") logger.info(f"Комментарий отправлен: {comment['id']} -> message {sent_message.id}") update_comment_sent(comment['id'], sent_message.id) update_stats(datetime.now().strftime('%Y-%m-%d'), session_file, 'sent') except Exception as e: logger.error(f"Ошибка отправки комментария: {e}") update_comment_status(comment['id'], 'pending') async def main(): """Точка входа""" worker = CommentWorker() try: await worker.start() except KeyboardInterrupt: await worker.stop() except Exception as e: logger.error(f"Критическая ошибка: {e}") await worker.stop() if __name__ == '__main__': asyncio.run(main())