import asyncio
import logging
import random
import sqlite3
from datetime import datetime, timedelta
from telethon import TelegramClient, events
from telethon.tl.types import PeerChannel
from bot.config import (
API_ID, API_HASH, LOG_GROUP_ID,
INITIAL_SCAN_LIMIT, COMMENT_DELAY_MIN, COMMENT_DELAY_MAX
)
from bot.db import (
init_db, save_comment, get_comment, get_comments_for_post,
update_comment_status, update_comment_sent, get_active_sessions,
update_stats, get_pending_comments, get_target_groups, is_target_group, DB_PATH
)
from bot.ollama import generate_comment
from bot.session_manager import session_manager
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('worker')
class CommentWorker:
"""Воркер для отправки комментариев от имени пользователей"""
def __init__(self):
self.clients: dict[str, TelegramClient] = {}
self.log_client: TelegramClient = None # Клиент для отправки логов
self.running = False
async def start(self):
"""Запуск воркера"""
logger.info("Запуск Comment Worker...")
# Инициализация БД
init_db()
# Подключение сессий
self.clients = await session_manager.create_all_clients()
logger.info(f"Подключено сессий: {len(self.clients)}")
if not self.clients:
logger.warning("Нет активных сессий. Добавьте файлы в sessions/")
return
# Создаём клиента для отправки логов (используем первую сессию)
if self.clients:
first_session = list(self.clients.keys())[0]
self.log_client = self.clients[first_session]
logger.info(f"Лог-клиент: {first_session}")
self.running = True
# Отправляем уведомление о запуске
await self.send_log_message("🚀 Worker запущен")
# Запускаем задачи параллельно
await asyncio.gather(
self.listen_for_new_posts(),
self.monitor_approved_comments()
)
async def send_log_message(self, message: str):
"""Отправка сообщения в лог-группу"""
if not self.log_client or not LOG_GROUP_ID:
return
try:
await self.log_client.send_message(
int(LOG_GROUP_ID),
message,
parse_mode='HTML'
)
except Exception as e:
logger.debug(f"Не удалось отправить лог: {e}")
async def stop(self):
"""Остановка воркера"""
logger.info("Остановка Comment Worker...")
self.running = False
await session_manager.disconnect_all()
async def scan_previous_messages(self):
"""Сканирование предыдущих сообщений"""
logger.info(f"Сканирование {INITIAL_SCAN_LIMIT} сообщений...")
# Получаем список целевых групп из БД
target_groups = get_target_groups()
if not target_groups:
logger.warning("Нет групп в БД! Добавьте группу через бота: /add_group ID")
return
logger.info(f"Мониторинг групп: {[g['group_id'] for g in target_groups]}")
for group in target_groups:
group_id = group['group_id']
for session_file, client in self.clients.items():
try:
target_group = await client.get_entity(PeerChannel(abs(int(group_id))))
messages = await client.get_messages(target_group, limit=INITIAL_SCAN_LIMIT)
logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений из группы {group_id}")
for message in reversed(messages):
if not self.running:
break
await self.process_message(message, session_file, client, group_id)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}")
async def process_message(self, message, session_file: str, client: TelegramClient, group_id: int = None, comments_group_id: int = None):
"""Обработка сообщения"""
try:
if not message.text or len(message.text) < 10:
return
if not message.replies or not message.replies.comments:
logger.info(f"Сообщение {message.id} не поддерживает комментарии")
return
msg_comments_group_id = message.replies.channel_id
if not msg_comments_group_id:
msg_comments_group_id = comments_group_id
if not msg_comments_group_id:
logger.error(f"Не удалось получить ID группы комментариев для {message.id}")
return
existing = get_comment(message.id, msg_comments_group_id, session_file)
if existing:
return
from telethon.tl.functions.channels import GetFullChannelRequest
channel_id = None
try:
comments_group = await client.get_entity(PeerChannel(abs(int(msg_comments_group_id))))
channel_full = await client(GetFullChannelRequest(comments_group))
if channel_full.full_chat.linked_chat_id:
channel_id = abs(int(channel_full.full_chat.linked_chat_id))
logger.info(f"Linked chat для {msg_comments_group_id}: {channel_id}")
except Exception as e:
logger.warning(f"Не удалось получить linked_chat: {e}")
channel_id = group_id
logger.info(f"Генерация комментария для сообщения {message.id} ({session_file})")
comment_text = await generate_comment(message.text)
if not comment_text:
logger.error(f"Не удалось сгенерировать комментарий для {message.id}")
return
save_comment(
message.id,
msg_comments_group_id,
comment_text,
session_file,
message.text,
channel_id
)
update_stats(
datetime.now().strftime('%Y-%m-%d'),
session_file,
'generated'
)
logger.info(f"Комментарий сохранён: message_id={message.id}, session={session_file}")
except Exception as e:
logger.error(f"Ошибка обработки сообщения: {e}")
async def listen_for_new_posts(self):
"""Прослушивание новых постов с авто-обновлением списка групп"""
logger.info("Запуск прослушивания новых постов...")
last_groups_check = 0
group_handlers = {}
scanned_groups = set()
while self.running:
current_time = datetime.now().timestamp()
# Проверяем новые группы каждые 10 секунд
if current_time - last_groups_check > 10:
target_groups = get_target_groups()
new_group_ids = [str(g['group_id']) for g in target_groups]
# Добавляем обработчики для новых групп
for group_id in new_group_ids:
if group_id not in group_handlers:
logger.info(f"➕ Добавлена группа для мониторинга: {group_id}")
# Сканируем последние 10 сообщений новой группы
await self.scan_group(group_id, limit=10)
scanned_groups.add(group_id)
# Регистрируем обработчик для каждой сессии
for session_file, client in self.clients.items():
# Проверяем что клиент подключён
if not client.is_connected():
logger.warning(f"Клиент {session_file} не подключён")
continue
# Регистрируем обработчик
@client.on(events.NewMessage(chats=group_id))
async def handle_new_post(event):
message = event.message
logger.info(f"📬 Новый пост в группе {group_id}: {message.id}")
# Проверяем что это не бот
if message.from_id and message.from_id.user_id == (await client.get_me()).id:
return
# Отправляем уведомление в лог-группу
post_preview = message.text[:100] + "..." if len(message.text or "") > 100 else (message.text or "Без текста")
log_message = (
f"📬 Новый пост\n\n"
f"📢 Группа: {group_id}\n"
f"🔗 Пост: {message.id}\n"
f"📄 Текст: {post_preview}"
)
await self.send_log_message(log_message)
await self.process_message(message, session_file, client, group_id)
group_handlers[group_id] = True
logger.info(f"✅ Обработчик зарегистрирован для {group_id}")
# Удаляем обработчики для удалённых групп
for group_id in list(group_handlers.keys()):
if group_id not in new_group_ids:
logger.info(f"❌ Группа {group_id} удалена из мониторинга")
del group_handlers[group_id]
scanned_groups.discard(group_id)
last_groups_check = current_time
await asyncio.sleep(5)
async def scan_group(self, group_id: int | str, limit: int = 10):
"""Сканирование последней сообщений из группы/канала"""
logger.info(f"Сканирование {limit} сообщений из группы {group_id}...")
for session_file, client in self.clients.items():
try:
target_channel = None
comments_group_id = None
try:
target_channel = await client.get_entity(group_id)
except:
async for dialog in client.iter_dialogs():
dialog_id = dialog.id
dialog_username = getattr(dialog.entity, 'username', None) if hasattr(dialog, 'entity') and dialog.entity else None
if str(dialog_id) == str(group_id) or dialog_username == str(group_id).lstrip('@'):
target_channel = dialog.entity
break
if not target_channel:
logger.error(f"Не удалось найти группу/канал {group_id}")
continue
group_name = getattr(target_channel, 'title', None)
group_username = getattr(target_channel, 'username', None)
from telethon.tl.functions.channels import GetFullChannelRequest
try:
channel_full = await client(GetFullChannelRequest(target_channel))
if hasattr(channel_full.full_chat, 'linked_chat_id') and channel_full.full_chat.linked_chat_id:
comments_group_id = channel_full.full_chat.linked_chat_id
logger.info(f"Канал {group_id} имеет группу комментариев: {comments_group_id}")
try:
comments_group = await client.get_entity(comments_group_id)
if getattr(comments_group, 'username', None):
await session_manager.join_channel(session_file, comments_group_id)
logger.info(f"Вступил в группу комментариев: {comments_group_id}")
except Exception as e:
logger.warning(f"Не удалось вступить в группу комментариев: {e}")
except Exception as e:
logger.warning(f"Не удалось получить linked_chat: {e}")
from bot.db import add_target_group
add_target_group(group_id, group_name, group_username, 'channel', comments_group_id)
logger.info(f"Обновлена информация: {group_name}, comments_group={comments_group_id}")
messages = await client.get_messages(target_channel, limit=limit)
logger.info(f"Сессия {session_file}: получено {len(messages)} сообщений")
for message in reversed(messages):
if not self.running:
break
await self.process_message(message, session_file, client, group_id, comments_group_id)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Ошибка сканирования для {session_file} в группе {group_id}: {e}")
async def monitor_approved_comments(self):
"""Мониторинг одобренных комментариев"""
logger.info("Запуск мониторинга одобренных комментариев...")
while self.running:
try:
conn = None
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM comments
WHERE status = 'approved' AND sent_message_id IS NULL
ORDER BY created_at
LIMIT 10
''')
comments = [dict(row) for row in cursor.fetchall()]
finally:
if conn:
conn.close()
if comments:
logger.info(f"Найдено {len(comments)} одобренных комментариев для отправки")
for comment in comments:
if not self.running:
break
await self.send_comment(comment)
await asyncio.sleep(2)
else:
logger.debug("Нет комментариев для отправки")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"Ошибка мониторинга: {e}")
await asyncio.sleep(10)
async def send_comment(self, comment: dict):
"""Отправка комментария"""
session_file = comment.get('session_file')
if not session_file:
logger.error(f"Нет session_file для комментария {comment['id']}")
return
client = self.clients.get(session_file)
if not client:
client = await session_manager.get_client(session_file)
if not client:
logger.error(f"Не удалось подключить сессию {session_file}")
update_comment_status(comment['id'], 'rejected')
return
try:
comments_group_id = abs(int(comment['chat_id']))
message_id_in_channel = comment['message_id']
channel_id = comment.get('channel_id')
logger.info(f"Отправка комментария {comment['id']}: message_id={message_id_in_channel}, comments_group={comments_group_id}, channel={channel_id}")
# Вступаем в группу комментариев ПЕРЕД тем как получать сущность
try:
await session_manager.join_channel(session_file, comments_group_id)
logger.info(f"✅ Вступил в группу комментариев: {comments_group_id}")
except Exception as e:
logger.debug(f"Уже в группе: {e}")
# Теперь получаем сущность группы комментариев
comments_group = None
for try_id in [comments_group_id, abs(int(comments_group_id)), -abs(int(comments_group_id))]:
try:
comments_group = await client.get_entity(PeerChannel(abs(int(try_id))))
logger.info(f"Группа комментариев: {comments_group.title} (ID: {try_id})")
break
except Exception as e:
logger.debug(f"Не удалось получить сущность {try_id}: {e}")
continue
if not comments_group:
logger.error(f"Не удалось найти группу комментариев {comments_group_id}")
return
from telethon.tl.functions.channels import GetFullChannelRequest
channel_full = await client(GetFullChannelRequest(comments_group))
linked_chat_id = channel_full.full_chat.linked_chat_id
if not linked_chat_id:
logger.error("Не удалось найти связанный канал")
return
logger.info(f"Linked chat ID: {linked_chat_id}")
# Получаем сообщение в КАНАЛЕ
target_message_in_channel = None
for try_id in [linked_chat_id, abs(int(linked_chat_id)), -abs(int(linked_chat_id))]:
try:
target_message_in_channel = await client.get_messages(
PeerChannel(abs(int(try_id))),
ids=message_id_in_channel
)
if target_message_in_channel:
logger.info(f"Найдено сообщение в канале: {target_message_in_channel.id}")
break
except Exception as e:
logger.debug(f"Не удалось получить сообщение {try_id}/{message_id_in_channel}: {e}")
continue
if not target_message_in_channel:
logger.error(f"Сообщение {message_id_in_channel} не найдено в канале {linked_chat_id}")
return
# ОТПРАВЛЯЕМ В КАНАЛ с comment_to= (как в старом проекте)
# Telegram автоматически направит комментарий в группу комментариев
delay = random.uniform(COMMENT_DELAY_MIN, COMMENT_DELAY_MAX)
logger.info(f"Задержка {delay:.1f}с...")
await asyncio.sleep(delay)
sent_message = await client.send_message(
PeerChannel(abs(int(linked_chat_id))), # КАНАЛ
comment['comment_text'],
comment_to=target_message_in_channel.id # ID в КАНАЛЕ
)
logger.info(f"✅ Отправлен в канал как comment_to {target_message_in_channel.id} -> {sent_message.id}")
logger.info(f"Комментарий отправлен: {comment['id']} -> message {sent_message.id}")
update_comment_sent(comment['id'], sent_message.id)
update_stats(datetime.now().strftime('%Y-%m-%d'), session_file, 'sent')
except Exception as e:
logger.error(f"Ошибка отправки комментария: {e}")
update_comment_status(comment['id'], 'pending')
async def main():
"""Точка входа"""
worker = CommentWorker()
try:
await worker.start()
except KeyboardInterrupt:
await worker.stop()
except Exception as e:
logger.error(f"Критическая ошибка: {e}")
await worker.stop()
if __name__ == '__main__':
asyncio.run(main())