Docka and better explanation of removal

This commit is contained in:
Casey 2024-02-02 22:06:42 +03:00
parent e137933115
commit 4a207ad544
Signed by: hkc
GPG Key ID: F0F6CFE11CDB0960
6 changed files with 113 additions and 12 deletions

11
Dockerfile Normal file
View File

@ -0,0 +1,11 @@
# Container image that runs the bot as the `nfuck` Python package.
FROM python:3.10-alpine
WORKDIR /app
# Copy only the requirements list first so the dependency layer below is
# rebuilt only when requirements.txt changes.
COPY requirements.txt /app/requirements.txt
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r /app/requirements.txt && rm /app/requirements.txt
# NOTE(review): this copies the whole build context, so requirements.txt
# presumably reappears in /app unless a .dockerignore excludes it - confirm.
COPY . /app
CMD ["python3", "-m", "nfuck"]

24
logging.json Normal file
View File

@ -0,0 +1,24 @@
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(asctime)s %(levelname)s %(module)s L%(lineno)d: %(message)s"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"formatter": "simple",
"stream": "ext://sys.stdout"
}
},
"loggers": {
"root": {
"level": "DEBUG",
"handlers": [
"stdout"
]
}
}
}

View File

@ -3,44 +3,89 @@ from aiogram.types import Message
from aiogram.filters import Command from aiogram.filters import Command
from httpx import AsyncClient from httpx import AsyncClient
from nfuck.link_verifier import explain_verification, get_random_useragent, verify_link from nfuck.link_verifier import (
explain_verification,
get_random_useragent,
verify_link,
)
from nfuck.utils import sanitize_link
dp = Dispatcher() dp = Dispatcher()
# @dp.message(Command("dd"))
# async def on_dd(message: Message):
# if message.reply_to_message:
# await message.reply_to_message.delete()
@dp.message(Command("check")) @dp.message(Command("check"))
async def on_check(message: Message): async def on_check(message: Message):
results = [] results = []
for entity in message.entities or []: for entity in message.entities or []:
if entity.type in ("text_link", "url") and message.text: if entity.type in ("text_link", "url") and message.text:
if entity.type == "url": if entity.type == "url":
entity.url = message.text[entity.offset : entity.offset + entity.length] entity.url = message.text[
entity.offset : entity.offset + entity.length
]
if not entity.url: if not entity.url:
continue continue
if not entity.url.startswith("http"):
entity.url = "https://" + entity.url
async with AsyncClient( async with AsyncClient(
headers={"User-Agent": get_random_useragent()} headers={"User-Agent": get_random_useragent()}
) as client: ) as client:
data = (await client.get(entity.url)).text data = (await client.get(entity.url)).text
total_score = 0 total_score = 0
results.append(f"<b>{entity.url}</b>") results.append(f"<b>{sanitize_link(entity.url)}</b>")
for score, explanation, match in explain_verification(data): for score, explanation, match in explain_verification(data):
results.append(f"{match.span()}: {explanation}") results.append(f"{match.span()}: {explanation}")
total_score += score total_score += score
results.append(f"<b>Total score: {total_score}</b>") results.append(f"<b>Total score: {total_score}</b>")
results.append("") results.append("")
await message.reply(str.join("\n", results), parse_mode="html") if results:
await message.reply(
str.join("\n", results),
parse_mode="html",
disable_web_page_preview=True,
)
else:
await message.reply(":shrug:")
@dp.message() @dp.message()
async def on_message(message: Message): async def on_message(message: Message):
detected_links: list[tuple[str, float]] = []
for entity in message.entities or []: for entity in message.entities or []:
if entity.type in ("text_link", "url") and message.text: if entity.type in ("text_link", "url") and message.text:
if entity.type == "url": if entity.type == "url":
entity.url = message.text[entity.offset : entity.offset + entity.length] entity.url = message.text[
entity.offset : entity.offset + entity.length
]
if not entity.url: if not entity.url:
continue continue
confidence = await verify_link(entity.url) confidence = await verify_link(entity.url)
if confidence > 0.75: if confidence > 0.9:
await message.reply(f"Holy smokes, another one (~{confidence*100:.0f}% sure)") detected_links.append((entity.url, confidence))
await message.delete() if detected_links:
if message.from_user:
await message.reply(
str.join(
"\n",
[
f"Found {len(detected_links)} links:",
str.join(
"\n",
[
f"{i}. {sanitize_link(url)} with confidence {confidence:.2f}"
for i, (url, confidence) in enumerate(
detected_links, 1
)
],
),
f"Sender: {message.from_user.full_name} #{message.from_user.id} (@{message.from_user.username})",
],
),
parse_mode="html",
)
await message.delete()

View File

@ -2,6 +2,12 @@ from aiogram import Bot
from aiosqlite import connect as asqlite from aiosqlite import connect as asqlite
from os import environ from os import environ
from nfuck import dp from nfuck import dp
import logging.config
from json import load as load_json
with open("logging.json", "r") as f_in:
logging.config.dictConfig(load_json(f_in))
async def main(): async def main():
bot = Bot(environ["TG_BOT_TOKEN"]) bot = Bot(environ["TG_BOT_TOKEN"])
@ -9,7 +15,8 @@ async def main():
await dp.start_polling(bot, db=db) await dp.start_polling(bot, db=db)
await db.close() await db.close()
if __name__ == "__main__": if __name__ == "__main__":
from asyncio import run from asyncio import run
run(main())
run(main())

View File

@ -1,6 +1,10 @@
from httpx import AsyncClient from httpx import AsyncClient
from re import Match, Pattern, compile as regexp, IGNORECASE from re import Match, Pattern, compile as regexp, IGNORECASE
from random import choice from random import choice
from logging import DEBUG, getLogger
logger = getLogger("nfuck.link_verifier")
logger.setLevel(DEBUG)
USER_AGENT = [ USER_AGENT = [
"Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0" "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
@ -19,6 +23,7 @@ REGEX_PATTERNS: list[tuple[float, Pattern, str]] = [
MAX_SCORE = sum(t[0] for t in REGEX_PATTERNS) MAX_SCORE = sum(t[0] for t in REGEX_PATTERNS)
def explain_verification(content: str) -> list[tuple[float, str, Match]]: def explain_verification(content: str) -> list[tuple[float, str, Match]]:
result: list[tuple[float, str, Match]] = [] result: list[tuple[float, str, Match]] = []
for score, regex, explanation in REGEX_PATTERNS: for score, regex, explanation in REGEX_PATTERNS:
@ -26,15 +31,22 @@ def explain_verification(content: str) -> list[tuple[float, str, Match]]:
result.append((score, explanation, match)) result.append((score, explanation, match))
return result return result
def get_random_useragent() -> str: def get_random_useragent() -> str:
return choice(USER_AGENT) return choice(USER_AGENT)
async def verify_link(url: str) -> float: async def verify_link(url: str) -> float:
total_score = 0 total_score = 0
logger.info("Verifying link %s", url)
if not url.startswith("http"):
url = "https://" + url
async with AsyncClient( async with AsyncClient(
headers={"User-Agent": get_random_useragent()} headers={"User-Agent": get_random_useragent()}
) as client: ) as client:
data = await client.get(url) data = await client.get(url)
for score, _, _ in explain_verification(data.text): for score, explanation, match in explain_verification(data.text):
logger.debug("%s: %s at %d", url, explanation, match.start())
total_score += score total_score += score
logger.info("Score for %r: %f", url, total_score)
return total_score / MAX_SCORE return total_score / MAX_SCORE

2
nfuck/utils.py Normal file
View File

@ -0,0 +1,2 @@
def sanitize_link(url: str) -> str:
    """Return *url* with its scheme separator and dots bracketed.

    Every ``://`` is rewritten as ``[://]`` and every ``.`` as ``[dot]``;
    all other characters are passed through unchanged.
    """
    # Bracket the scheme separator first, then spell out the dots.
    scheme_bracketed = "[://]".join(url.split("://"))
    return "[dot]".join(scheme_bracketed.split("."))