Files
batch-bot/bot/session_manager.py
bilal a18ad30961 Initial commit: Batch Bot - Telegram Comment Bot
0.0.1
Features:
- Multi-account support via session files
- AI comments generation via Ollama (local LLM)
- Telegram bot for moderation (approve/reject/regenerate)
- Docker support (controller + worker)
- Auto-join public groups
- Comment regeneration on group re-add
- Statistics tracking

Tech stack:
- Python 3.11
- Telethon 1.34 (Telegram user client)
- Aiogram 3.4 (Telegram bot framework)
- SQLite (Database)
- Docker & Docker Compose
- Ollama (Local LLM)
2026-02-24 04:40:07 +03:00

263 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
from pathlib import Path
from telethon import TelegramClient
from telethon.sessions import StringSession
from bot.config import API_ID, API_HASH, SESSIONS_DIR
from bot.db import save_session
logger = logging.getLogger('session_manager')
class SessionManager:
"""Менеджер сессий Telegram"""
def __init__(self):
self.sessions_dir = SESSIONS_DIR
self.sessions_dir.mkdir(exist_ok=True)
self.clients: dict[str, TelegramClient] = {}
self.session_info: dict[str, dict] = {}
def get_session_files(self) -> list[str]:
"""Получение списка файлов сессий"""
session_files = list(self.sessions_dir.glob("*.session"))
return [f.name for f in session_files]
def load_session_string(self, session_file: str) -> str | None:
"""Загрузка строки сессии из файла"""
try:
session_path = self.sessions_dir / session_file
if not session_path.exists():
logger.error(f"Файл сессии не найден: {session_file}")
return None
session_string = session_path.read_text(encoding='utf-8').strip()
if not session_string:
logger.error(f"Файл сессии пуст: {session_file}")
return None
return session_string
except Exception as e:
logger.error(f"Ошибка при загрузке сессии {session_file}: {e}")
return None
async def create_client(self, session_file: str) -> TelegramClient | None:
"""Создание клиента Telegram для сессии"""
try:
session_string = self.load_session_string(session_file)
if not session_string:
return None
# Проверяем, не подключён ли уже клиент
if session_file in self.clients:
if self.clients[session_file].is_connected():
logger.info(f"Клиент для {session_file} уже подключён")
return self.clients[session_file]
# Создаём новую сессию из строки
session = StringSession(session_string)
client = TelegramClient(
session,
API_ID,
API_HASH,
device_model="comment_bot",
system_version="Linux",
app_version="1.0",
lang_code="ru"
)
await client.connect()
if not await client.is_user_authorized():
logger.warning(f"Сессия {session_file} не авторизована")
await client.disconnect()
return None
# Получаем информацию о пользователе
me = await client.get_me()
user_info = {
'user_id': me.id,
'username': me.username or '',
'first_name': me.first_name or '',
'last_name': me.last_name or '',
'phone': me.phone or ''
}
# Сохраняем информацию в БД
save_session(session_file, user_info)
# Сохраняем клиента и информацию
self.clients[session_file] = client
self.session_info[session_file] = user_info
logger.info(f"Клиент подключён: {session_file} (@{user_info['username'] or 'no_username'})")
return client
except Exception as e:
logger.error(f"Ошибка при создании клиента для {session_file}: {e}")
return None
async def create_all_clients(self) -> dict[str, TelegramClient]:
"""Создание клиентов для всех сессий"""
session_files = self.get_session_files()
logger.info(f"Найдено сессий: {len(session_files)}")
tasks = [self.create_client(sf) for sf in session_files]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Фильтруем успешные результаты
for session_file, result in zip(session_files, results):
if isinstance(result, Exception):
logger.error(f"Ошибка при подключении {session_file}: {result}")
return {sf: client for sf, client in zip(session_files, results)
if isinstance(client, TelegramClient)}
async def get_client(self, session_file: str) -> TelegramClient | None:
"""Получение клиента для сессии"""
if session_file in self.clients and self.clients[session_file].is_connected():
return self.clients[session_file]
return await self.create_client(session_file)
async def disconnect_client(self, session_file: str):
"""Отключение клиента"""
if session_file in self.clients:
try:
await self.clients[session_file].disconnect()
logger.info(f"Клиент {session_file} отключён")
except Exception as e:
logger.error(f"Ошибка при отключении клиента {session_file}: {e}")
finally:
del self.clients[session_file]
if session_file in self.session_info:
del self.session_info[session_file]
async def disconnect_all(self):
"""Отключение всех клиентов"""
tasks = [self.disconnect_client(sf) for sf in list(self.clients.keys())]
await asyncio.gather(*tasks)
logger.info("Все клиенты отключены")
def get_session_info(self, session_file: str) -> dict | None:
"""Получение информации о сессии"""
return self.session_info.get(session_file)
def get_all_info(self) -> list[dict]:
"""Получение информации о всех сессиях"""
return [
{
'session_file': sf,
**self.session_info.get(sf, {})
}
for sf in self.get_session_files()
]
async def join_channel(self, session_file: str, channel_id: int | str) -> tuple[bool, str]:
"""Вступление сессии в канал/группу (по ID или username)"""
try:
client = await self.get_client(session_file)
if not client:
return False, "Не удалось подключить сессию"
# Пытаемся получить сущность по ID или username
entity = await client.get_entity(channel_id)
# Проверяем, участник ли уже
from telethon.tl.functions.channels import GetParticipantRequest
try:
await client(GetParticipantRequest(entity, await client.get_me()))
return True, "Уже участник"
except:
# Не участник, вступаем
pass
# Вступаем в канал/группу
from telethon.tl.functions.channels import JoinChannelRequest
if hasattr(entity, 'megagroup') or hasattr(entity, 'broadcast'):
# Канал или супергруппа
await client(JoinChannelRequest(entity))
else:
# Обычная группа
from telethon.tl.functions.messages import AddChatUserRequest
await client(AddChatUserRequest(entity.id, 0, fwd_limit=0))
return True, "Вступил успешно"
except Exception as e:
error_msg = str(e)
if "PRIVATE" in error_msg.upper() or "CHANNEL_PRIVATE" in error_msg:
return False, "Частный канал/группа"
elif "INVITE" in error_msg.upper():
return False, "Нужно приглашение"
elif "Could not find" in error_msg:
return False, "Группа не найдена"
else:
return False, f"Ошибка: {error_msg[:50]}"
async def join_all_sessions(self, channel_id: int | str) -> dict[str, str]:
"""Все сессии вступают в канал/группу (по ID или username)"""
results = {}
for session_file in self.get_session_files():
success, message = await self.join_channel(session_file, channel_id)
results[session_file] = f"{'' if success else ''} {message}"
return results
async def leave_channel(self, session_file: str, channel_id: int | str) -> tuple[bool, str]:
"""Выход сессии из канала/группы"""
try:
client = await self.get_client(session_file)
if not client:
return False, "Не удалось подключить сессию"
# Пробуем разные форматы ID
entity = None
for try_id in [channel_id, str(channel_id), str(channel_id).lstrip('-')]:
try:
entity = await client.get_entity(try_id)
break
except:
continue
if not entity:
# Если не нашли, пробуем получить из диалогов
async for dialog in client.iter_dialogs():
if str(dialog.id) == str(channel_id) or str(dialog.id).lstrip('-') == str(channel_id).lstrip('-'):
entity = dialog.entity
break
if not entity:
return False, "Группа не найдена в диалогах"
# Выходим из канала/группы
from telethon.tl.functions.channels import LeaveChannelRequest
await client(LeaveChannelRequest(entity))
return True, "Вышел успешно"
except Exception as e:
error_msg = str(e)
if "PRIVATE" in error_msg.upper():
return False, "Частный канал/группа"
elif "BOT" in error_msg.upper() or "ADMIN" in error_msg.upper():
return False, "Нельзя выйти (бот/админ)"
elif "Cannot find" in error_msg or "not found" in error_msg.lower():
return True, "Уже не в группе" # Считаем успешным - уже вышел
else:
return False, f"Ошибка: {error_msg[:50]}"
async def leave_all_sessions(self, channel_id: int | str) -> dict[str, str]:
"""Все сессии выходят из канала/группы"""
results = {}
for session_file in self.get_session_files():
success, message = await self.leave_channel(session_file, channel_id)
results[session_file] = f"{'' if success else ''} {message}"
return results
# Глобальный экземпляр
session_manager = SessionManager()