181 lines
5.1 KiB
Python
181 lines
5.1 KiB
Python
import logging
|
|
from telegram import Update
|
|
from telegram.constants import ChatType
|
|
from telegram.error import BadRequest
|
|
from telegram.ext import (
|
|
ApplicationBuilder,
|
|
ContextTypes,
|
|
CommandHandler,
|
|
MessageHandler,
|
|
filters,
|
|
)
|
|
|
|
from . import config
|
|
from .db import Channel
|
|
|
|
from .ai import detect_ear_scritches
|
|
|
|
# Configure the root logger once at import time: INFO and above, with each
# record rendered as "timestamp - logger name - level - message".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
|
|
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a fixed greeting to the chat the update came from."""
    chat = update.effective_chat
    greeting = "I'm a bot, please talk to me!"
    await context.bot.send_message(chat_id=chat.id, text=greeting)
|
|
|
|
|
|
def remove_duplicate_letters(s: str) -> str:
    """Collapse every run of identical consecutive characters into one.

    Only adjacent repeats are dropped; a character that shows up again
    after a different one is kept, so ``"abab"`` is returned unchanged.
    The comparison is exact (case-sensitive).
    """
    kept = []
    previous = None  # no character seen yet, so the first one is always kept
    for ch in s:
        if ch != previous:
            kept.append(ch)
        previous = ch
    return "".join(kept)
|
|
|
|
|
|
async def message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a text message: run topic commands and react to bot mentions.

    The command is the first space-separated token of the message; an
    optional ``@botname`` suffix on it is ignored. Recognised commands:

    * ``/topic`` - reply with the stored topic; ``/topic <text>`` replaces it.
    * ``/append <text>`` - put ``<text>`` in front of the topic entries and
      drop entries from the end until the joined topic fits in
      ``config.MAX_LENGTH``.
    * ``/quote`` - (as a reply) append the replied-to message as a quotation.
    * ``/popfirst`` / ``/poplast`` - remove the first / last topic entry.

    Any other message is logged; if it mentions the bot and
    ``detect_ear_scritches`` returns a truthy value for its text, the bot
    answers with a fixed reply.

    Messages outside groups and supergroups only get a refusal reply.
    """
    log = logging.getLogger(__name__)

    msg = update.message
    if msg is None:
        return
    if msg.chat.type not in (ChatType.GROUP, ChatType.SUPERGROUP):
        await msg.reply_text("can't work not in groups")
        # Stop here: the refusal above would be misleading if the command
        # were executed anyway.
        return
    if msg.text is None:
        return

    # Mention detection works on a lower-cased message with repeated letters
    # collapsed, so every candidate name is normalised the same way.
    # Falsy names are skipped: an empty string would match every message.
    deduplicated_message = remove_duplicate_letters(msg.text.lower())
    possible_names = [context.bot.username, "kerfus", "kerfuś"]
    mention = any(
        remove_duplicate_letters(name.lower()) in deduplicated_message
        for name in possible_names
        if name
    )

    # Load the stored state for this chat, or start a fresh (unsaved) record.
    channel_id = str(msg.chat.id)
    channel = Channel.select().where(Channel.channel_id == channel_id).first()
    if channel is None:
        channel = Channel(channel_id=channel_id)

    # Split "<cmd> <args>" on the first space only.
    cmd, _, args = msg.text.strip().partition(" ")
    if cmd == "":
        return
    cmd = cmd.split("@")[0].strip().lower()
    args = args.strip()

    async def edit_message():
        """Rewrite the existing topic message with the current topic."""
        await context.bot.edit_message_text(
            chat_id=channel_id,
            message_id=channel.topic_message_id,
            text=channel.topic,
        )

    async def send_message():
        """Post the topic as a new message and remember its id."""
        sent = await context.bot.send_message(chat_id=channel_id, text=channel.topic)
        channel.topic_message_id = sent.message_id

    async def unpin_message(message_id):
        """Best-effort unpin of the given message."""
        try:
            # Pass message_id explicitly so that only the topic message is
            # unpinned, not whatever else happens to be pinned in the chat.
            await context.bot.unpin_chat_message(
                chat_id=channel_id, message_id=message_id
            )
        except BadRequest as exc:
            # Deliberately non-fatal; presumably missing rights or a message
            # that is no longer pinned.
            log.warning("Could not unpin message %s: %s", message_id, exc)

    async def pin_message(message_id):
        """Best-effort pin of the given message."""
        try:
            await context.bot.pin_chat_message(
                chat_id=channel_id, message_id=message_id
            )
        except BadRequest as exc:
            # Deliberately non-fatal, same as unpinning.
            log.warning("Could not pin message %s: %s", message_id, exc)

    async def topic_update(new_topic):
        """Store *new_topic*, publish it as the pinned message and save."""
        channel.topic = new_topic
        if channel.topic_message_id is None:
            await send_message()
        else:
            await unpin_message(channel.topic_message_id)
            try:
                await edit_message()
            except BadRequest:
                # The old message could not be edited; post a new one instead.
                await send_message()

        await pin_message(channel.topic_message_id)

        log.info("Changed topic to: %s", new_topic)
        channel.save()

    sep = "┃"

    # The topic is stored as one string; entries are separated by `sep`.
    current_topic = []
    if channel.topic is not None and len(channel.topic.strip()) != 0:
        current_topic = channel.topic.split(sep)

    async def append(args):
        """Insert *args* as the first topic entry, trimming from the end."""
        if len(args) > config.MAX_LENGTH:
            await msg.reply_text("this won't fit in the topic")
            return

        current_topic.insert(0, args)
        while len(sep.join(current_topic)) > config.MAX_LENGTH:
            current_topic.pop()

        await topic_update(sep.join(current_topic))

    if cmd == "/topic":
        if args == "":
            if channel.topic is not None:
                await msg.reply_text(channel.topic)
            return
        await topic_update(args)
        # Return like every other command, so a handled command never falls
        # through to the mention reply below.
        return

    if cmd == "/append":
        await append(args)
        return

    if cmd == "/quote":
        m = msg.reply_to_message
        if m is None:
            await msg.reply_text("reply to a message with the command to quote it")
            return
        if not m.text:
            # Without this check the topic would get the literal text "None".
            await msg.reply_text("can't quote a message without text")
            return

        # from_user can be None, so guard it before reading the username.
        author = m.from_user.username if m.from_user is not None else None
        attribution = f" ~@{author}" if author is not None else ""
        await append(f'"{m.text}"{attribution}')
        return

    if cmd == "/popfirst":
        if len(current_topic) <= 1:
            await msg.reply_text("can't pop the last existing item")
            return
        current_topic.pop(0)
        await topic_update(sep.join(current_topic))
        return

    if cmd == "/poplast":
        if len(current_topic) <= 1:
            await msg.reply_text("can't pop the last existing item")
            return
        current_topic.pop()
        await topic_update(sep.join(current_topic))
        return

    # Not a known command: record the raw update for debugging.
    log.info("%s %s", update, type(update))

    if mention and detect_ear_scritches(msg.text):
        await msg.reply_text("Miau! Nie dotykaj moich uszu!")
|
|
|
|
|
|
# Build the handlers first, then the application that dispatches to them.
start_handler = CommandHandler("start", start)
message_handler = MessageHandler(filters.TEXT, message)

application = ApplicationBuilder().token(config.TG_TOKEN).build()

# Registration order is kept as-is: the /start handler goes in before the
# general text handler.
# NOTE(review): presumably this keeps /start from reaching `message`, since
# filters.TEXT also matches commands - confirm against the PTB handler docs.
application.add_handler(start_handler)
application.add_handler(message_handler)

# Start long polling.
application.run_polling()
|