import asyncio import logging import random import sqlite3 from datetime import datetime, timedelta from telethon import TelegramClient, events from telethon.tl.types import PeerChannel from bot.config import ( API_ID, API_HASH, LOG_GROUP_ID, INITIAL_SCAN_LIMIT, COMMENT_DELAY_MIN, COMMENT_DELAY_MAX ) from bot.db import ( init_db, save_comment, get_comment, get_comments_for_post, update_comment_status, update_comment_sent, get_active_sessions, update_stats, get_pending_comments, get_target_groups, is_target_group, DB_PATH ) from bot.ollama import generate_comment from bot.session_manager import session_manager logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('worker') class CommentWorker: """Воркер для отправки комментариев от имени пользователей""" def __init__(self): self.clients: dict[str, TelegramClient] = {} self.running = False async def start(self): """Запуск воркера""" logger.info("Запуск Comment Worker...") # Инициализация БД init_db() # Подключение сессий self.clients = await session_manager.create_all_clients() logger.info(f"Подключено сессий: {len(self.clients)}") if not self.clients: logger.warning("Нет активных сессий. Добавьте файлы в sessions/") return self.running = True # Запускаем задачи параллельно await asyncio.gather( self.listen_for_new_posts(), self.monitor_approved_comments() ) async def stop(self): """Остановка воркера""" logger.info("Остановка Comment Worker...") self.running = False await session_manager.disconnect_all() async def scan_previous_messages(self): """Сканирование предыдущих сообщений""" logger.info(f"Сканирование {INITIAL_SCAN_LIMIT} сообщений...") # Получаем список целевых групп из БД target_groups = get_target_groups() if not target_groups: logger.warning("Нет групп в БД! Добавьте группу через бота: /add_group ID") return logger.info(f"Мониторинг групп: {[g['group_id'] for g in target_groups]}") for group in target_groups: group_id = group['group_id'] for session_file, client in self.clients.items(): try: target_group = await client.get_entity(PeerChannel(abs(int(group_id)))) messages = await client.get_messages(target_group, limit=INITIAL_SCAN_LIMIT) logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений из группы {group_id}") for message in reversed(messages): if not self.running: break await self.process_message(message, session_file, client, group_id) await asyncio.sleep(0.5) except Exception as e: logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}") async def process_message(self, message, session_file: str, client: TelegramClient, group_id: int = None, comments_group_id: int = None): """Обработка сообщения""" try: if not message.text or len(message.text) < 10: return if not message.replies or not message.replies.comments: logger.info(f"Сообщение {message.id} не поддерживает комментарии") return msg_comments_group_id = message.replies.channel_id if not msg_comments_group_id: msg_comments_group_id = comments_group_id if not msg_comments_group_id: logger.error(f"Не удалось получить ID группы комментариев для {message.id}") return existing = get_comment(message.id, msg_comments_group_id, session_file) if existing: return from telethon.tl.functions.channels import GetFullChannelRequest channel_id = None try: comments_group = await client.get_entity(PeerChannel(abs(int(msg_comments_group_id)))) channel_full = await client(GetFullChannelRequest(comments_group)) if channel_full.full_chat.linked_chat_id: channel_id = abs(int(channel_full.full_chat.linked_chat_id)) logger.info(f"Linked chat для {msg_comments_group_id}: {channel_id}") except Exception as e: logger.warning(f"Не удалось получить linked_chat: {e}") channel_id = group_id logger.info(f"Генерация комментария для сообщения {message.id} ({session_file})") comment_text = await generate_comment(message.text) if not comment_text: logger.error(f"Не удалось сгенерировать комментарий для {message.id}") return save_comment( message.id, msg_comments_group_id, comment_text, session_file, message.text, channel_id ) update_stats( datetime.now().strftime('%Y-%m-%d'), session_file, 'generated' ) logger.info(f"Комментарий сохранён: message_id={message.id}, session={session_file}") except Exception as e: logger.error(f"Ошибка обработки сообщения: {e}") async def listen_for_new_posts(self): """Прослушивание новых постов с авто-обновлением списка групп""" logger.info("Запуск прослушивания новых постов...") last_groups_check = 0 group_handlers = {} scanned_groups = set() while self.running: current_time = datetime.now().timestamp() if current_time - last_groups_check > 10: target_groups = get_target_groups() new_group_ids = [str(g['group_id']) for g in target_groups] # Проверяем, есть ли группы которые были удалены и добавлены заново for group_id in new_group_ids: if group_id not in group_handlers: logger.info(f"Добавлена группа для мониторинга: {group_id}") # Сканируем последние 10 сообщений новой группы # Всегда сканируем при добавлении, даже если уже было await self.scan_group(group_id, limit=10) scanned_groups.add(group_id) for session_file, client in self.clients.items(): @client.on(events.NewMessage(chats=group_id)) async def handle_new_post(event): message = event.message logger.info(f"Новый пост в группе {group_id}: {message.id}") await self.process_message(message, session_file, client, group_id) group_handlers[group_id] = True else: # Группа уже обрабатывается, но проверяем не была ли она удалена и добавлена снова # Если была - очищаем scanned_groups для этой группы if group_id in scanned_groups: # Проверяем что группа всё ещё в БД group_exists = any(str(g['group_id']) == group_id for g in target_groups) if not group_exists: scanned_groups.discard(group_id) del group_handlers[group_id] logger.info(f"Группа {group_id} удалена, сброс кэша") # Удаляем обработчики для удалённых групп for group_id in list(group_handlers.keys()): if group_id not in new_group_ids: del group_handlers[group_id] scanned_groups.discard(group_id) logger.info(f"Группа {group_id} удалена из мониторинга") last_groups_check = current_time await asyncio.sleep(5) async def scan_group(self, group_id: int | str, limit: int = 10): """Сканирование последней сообщений из группы/канала""" logger.info(f"Сканирование {limit} сообщений из группы {group_id}...") for session_file, client in self.clients.items(): try: target_channel = None comments_group_id = None try: target_channel = await client.get_entity(group_id) except: async for dialog in client.iter_dialogs(): dialog_id = dialog.id dialog_username = getattr(dialog.entity, 'username', None) if hasattr(dialog, 'entity') and dialog.entity else None if str(dialog_id) == str(group_id) or dialog_username == str(group_id).lstrip('@'): target_channel = dialog.entity break if not target_channel: logger.error(f"Не удалось найти группу/канал {group_id}") continue group_name = getattr(target_channel, 'title', None) group_username = getattr(target_channel, 'username', None) from telethon.tl.functions.channels import GetFullChannelRequest try: channel_full = await client(GetFullChannelRequest(target_channel)) if hasattr(channel_full.full_chat, 'linked_chat_id') and channel_full.full_chat.linked_chat_id: comments_group_id = channel_full.full_chat.linked_chat_id logger.info(f"Канал {group_id} имеет группу комментариев: {comments_group_id}") try: comments_group = await client.get_entity(comments_group_id) if getattr(comments_group, 'username', None): await session_manager.join_channel(session_file, comments_group_id) logger.info(f"Вступил в группу комментариев: {comments_group_id}") except Exception as e: logger.warning(f"Не удалось вступить в группу комментариев: {e}") except Exception as e: logger.warning(f"Не удалось получить linked_chat: {e}") from bot.db import add_target_group add_target_group(group_id, group_name, group_username, 'channel', comments_group_id) logger.info(f"Обновлена информация: {group_name}, comments_group={comments_group_id}") messages = await client.get_messages(target_channel, limit=limit) logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений") for message in reversed(messages): if not self.running: break await self.process_message(message, session_file, client, group_id, comments_group_id) await asyncio.sleep(0.5) except Exception as e: logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}") async def monitor_approved_comments(self): """Мониторинг одобренных комментариев""" logger.info("Запуск мониторинга одобренных комментариев...") while self.running: try: conn = None try: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT * FROM comments WHERE status = 'approved' AND sent_message_id IS NULL ORDER BY created_at LIMIT 10 ''') comments = [dict(row) for row in cursor.fetchall()] finally: if conn: conn.close() if comments: logger.info(f"Найдено {len(comments)} одобренных комментариев для отправки") for comment in comments: if not self.running: break await self.send_comment(comment) await asyncio.sleep(2) else: logger.debug("Нет комментариев для отправки") await asyncio.sleep(10) except Exception as e: logger.error(f"Ошибка мониторинга: {e}") await asyncio.sleep(10) async def send_comment(self, comment: dict): """Отправка комментария""" session_file = comment.get('session_file') if not session_file: logger.error(f"Нет session_file для комментария {comment['id']}") return client = self.clients.get(session_file) if not client: client = await session_manager.get_client(session_file) if not client: logger.error(f"Не удалось подключить сессию {session_file}") update_comment_status(comment['id'], 'rejected') return try: comments_group_id = abs(int(comment['chat_id'])) message_id_in_channel = comment['message_id'] channel_id = comment.get('channel_id') logger.info(f"Отправка комментария {comment['id']}: message_id={message_id_in_channel}, comments_group={comments_group_id}, channel={channel_id}") comments_group = await client.get_entity(PeerChannel(comments_group_id)) logger.info(f"Группа комментариев: {comments_group.title} (ID: {comments_group_id})") from telethon.tl.functions.channels import GetFullChannelRequest channel_full = await client(GetFullChannelRequest(comments_group)) linked_chat_id = channel_full.full_chat.linked_chat_id if not linked_chat_id: logger.error("Не удалось найти связанный канал") return logger.info(f"Linked chat ID: {linked_chat_id}") # Получаем сообщение в КАНАЛЕ target_message_in_channel = await client.get_messages( PeerChannel(abs(int(linked_chat_id))), ids=message_id_in_channel ) if not target_message_in_channel: logger.error(f"Сообщение {message_id_in_channel} не найдено в канале {linked_chat_id}") return logger.info(f"Найдено сообщение в канале: {target_message_in_channel.id}") # Вступаем в группу комментариев (если ещё не вступили) # Это требуется для некоторых каналов try: await session_manager.join_channel(session_file, comments_group_id) logger.info(f"Вступил в группу комментариев: {comments_group_id}") except Exception as e: logger.warning(f"Не удалось вступить в группу комментариев: {e}") # ОТПРАВЛЯЕМ В КАНАЛ с comment_to= (как в старом проекте) # Telegram автоматически направит комментарий в группу комментариев delay = random.uniform(COMMENT_DELAY_MIN, COMMENT_DELAY_MAX) logger.info(f"Задержка {delay:.1f}с...") await asyncio.sleep(delay) sent_message = await client.send_message( PeerChannel(abs(int(linked_chat_id))), # КАНАЛ comment['comment_text'], comment_to=target_message_in_channel.id # ID в КАНАЛЕ ) logger.info(f"✅ Отправлен в канал как comment_to {target_message_in_channel.id} -> {sent_message.id}") logger.info(f"Комментарий отправлен: {comment['id']} -> message {sent_message.id}") update_comment_sent(comment['id'], sent_message.id) update_stats(datetime.now().strftime('%Y-%m-%d'), session_file, 'sent') except Exception as e: logger.error(f"Ошибка отправки комментария: {e}") update_comment_status(comment['id'], 'pending') async def main(): """Точка входа""" worker = CommentWorker() try: await worker.start() except KeyboardInterrupt: await worker.stop() except Exception as e: logger.error(f"Критическая ошибка: {e}") await worker.stop() if __name__ == '__main__': asyncio.run(main())