commit 7a7227f28de8cd1b682f6d7d33938d6ff22fc506 Author: hkc Date: Wed Aug 24 08:09:41 2022 +0300 Initial commit :DDDDDD diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..32ac8d5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__ +*.py[cow] +config-prod.ini + diff --git a/config.ini b/config.ini new file mode 100644 index 0000000..504a82b --- /dev/null +++ b/config.ini @@ -0,0 +1,18 @@ +[main] +modules = telegram +instance = mastodon.example.org +token = blahblah +user = 12345 +list = 1 + +[module/telegram] +type = telegram +token = 12345:blahblah +chat = @username +show-post-link = yes +show-boost-from = yes + +# TODO: add discord functionality +[module/discord] +type = discord +webhook = url diff --git a/mastoreposter/__init__.py b/mastoreposter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mastoreposter/__main__.py b/mastoreposter/__main__.py new file mode 100644 index 0000000..2578554 --- /dev/null +++ b/mastoreposter/__main__.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +from asyncio import run +from configparser import ConfigParser + +from mastoreposter.integrations.telegram import TelegramIntegration +from mastoreposter.sources import websocket_source +from typing import AsyncGenerator, Callable, List +from mastoreposter.integrations.base import BaseIntegration +from mastoreposter.types import Status + + +async def listen( + source: Callable[..., AsyncGenerator[Status, None]], + drains: List[BaseIntegration], + user: str, + /, + **kwargs, +): + async for status in source(**kwargs): + if status.account.id != user: + continue + print(status) + if status.visibility == "direct": + continue + if ( + status.in_reply_to_account_id is not None + and status.in_reply_to_account_id != user + ): + continue + for drain in drains: + await drain.post(status) + + +def main(config_path: str): + conf = ConfigParser() + conf.read(config_path) + + modules = [] + for module_name in conf.get("main", "modules").split(): + module = conf[f"module/{module_name}"] + if module["type"] == "telegram": + modules.append( + TelegramIntegration( + token=module.get("token"), + chat_id=module.get("chat"), + show_post_link=module.getboolean("show-post-link", fallback=True), + show_boost_from=module.getboolean("show-boost-from", fallback=True), + ) + ) + else: + raise ValueError("Invalid module type %r" % module["type"]) + + url = "wss://{}/api/v1/streaming".format(conf["main"]["instance"]) + run( + listen( + websocket_source, + modules, + conf["main"]["user"], + url=url, + list=conf["main"]["list"], + access_token=conf["main"]["token"], + ) + ) + + +if __name__ == "__main__": + from sys import argv + + main(argv[1]) diff --git a/mastoreposter/integrations/__init__.py b/mastoreposter/integrations/__init__.py new file mode 100644 index 0000000..1512e6f --- /dev/null +++ b/mastoreposter/integrations/__init__.py @@ -0,0 +1 @@ +from .telegram import TelegramIntegration diff --git a/mastoreposter/integrations/base.py b/mastoreposter/integrations/base.py new file mode 100644 index 0000000..9b83c51 --- /dev/null +++ b/mastoreposter/integrations/base.py @@ -0,0 +1,12 @@ +from abc import ABC, abstractmethod + +from mastoreposter.types import Status + + +class BaseIntegration(ABC): + def __init__(self): + pass + + @abstractmethod + async def post(self, status: Status) -> str: + raise NotImplemented diff --git a/mastoreposter/integrations/telegram.py b/mastoreposter/integrations/telegram.py new file mode 100644 index 0000000..9b3ade9 --- /dev/null +++ b/mastoreposter/integrations/telegram.py @@ -0,0 +1,171 @@ +from dataclasses import dataclass +from html import escape +from typing import Any, List, Mapping, Optional, Union +from bs4 import BeautifulSoup, Tag, PageElement +from httpx import AsyncClient +from mastoreposter.integrations.base import BaseIntegration +from mastoreposter.types import Attachment, Status + + +@dataclass +class TGResponse: + ok: bool + params: dict + result: Optional[Any] = None + error: Optional[str] = None + + @classmethod + def from_dict(cls, data: dict, params: dict) -> "TGResponse": + return cls(data["ok"], params, data.get("result"), data.get("error")) + + +class TelegramIntegration(BaseIntegration): + API_URL: str = "https://api.telegram.org/bot{}/{}" + MEDIA_COMPATIBILITY: Mapping[str, set] = { + "image": {"image", "video"}, + "video": {"image", "video"}, + "gifv": {"gifv"}, + "audio": {"audio"}, + "unknown": {"unknown"}, + } + MEDIA_MAPPING: Mapping[str, str] = { + "image": "photo", + "video": "video", + "gifv": "animation", + "audio": "audio", + "unknown": "document", + } + + def __init__( + self, + token: str, + chat_id: Union[str, int], + show_post_link: bool = True, + show_boost_from: bool = True, + ): + self.token = token + self.chat_id = chat_id + self.show_post_link = show_post_link + self.show_boost_from = show_boost_from + + async def _tg_request(self, method: str, **kwargs) -> TGResponse: + url = self.API_URL.format(self.token, method) + async with AsyncClient() as client: + return TGResponse.from_dict( + (await client.post(url, json=kwargs)).json(), kwargs + ) + + async def _post_plaintext(self, text: str) -> TGResponse: + return await self._tg_request( + "sendMessage", + parse_mode="HTML", + disable_notification=True, + disable_web_page_preview=True, + chat_id=self.chat_id, + text=text, + ) + + async def _post_media(self, text: str, media: Attachment) -> TGResponse: + # Just to be safe + if media.type not in self.MEDIA_MAPPING: + return await self._post_plaintext(text) + + return await self._tg_request( + "send%s" % self.MEDIA_MAPPING[media.type].title(), + parse_mode="HTML", + disable_notification=True, + disable_web_page_preview=True, + chat_id=self.chat_id, + caption=text, + **{self.MEDIA_MAPPING[media.type]: media.preview_url}, + ) + + async def _post_mediagroup(self, text: str, media: List[Attachment]) -> TGResponse: + media_list: List[dict] = [] + allowed_medias = {"image", "gifv", "video", "audio", "unknown"} + for attachment in media: + if attachment.type not in allowed_medias: + continue + if attachment.type not in self.MEDIA_COMPATIBILITY: + continue + allowed_medias &= self.MEDIA_COMPATIBILITY[attachment.type] + media_list.append( + { + "type": self.MEDIA_MAPPING[attachment.type], + "media": attachment.url, + } + ) + if len(media_list) == 1: + media_list[0].update( + { + "caption": text, + "parse_mode": "HTML", + } + ) + + return await self._tg_request( + "sendMediaGroup", + disable_notification=True, + disable_web_page_preview=True, + chat_id=self.chat_id, + media=media_list, + ) + + @classmethod + def node_to_text(cls, el: PageElement) -> str: + if isinstance(el, Tag): + if el.name == "a": + return '{}'.format( + escape(el.attrs["href"]), + str.join("", map(cls.node_to_text, el.children)), + ) + elif el.name == "p": + return str.join("", map(cls.node_to_text, el.children)) + "\n\n" + elif el.name == "br": + return "\n" + return str.join("", map(cls.node_to_text, el.children)) + return escape(str(el)) + + async def post(self, status: Status) -> str: + source = status.reblog or status + text = self.node_to_text(BeautifulSoup(source.content, features="lxml")) + + if source.spoiler_text: + text = "Spoiler: {cw}\n{text}".format( + cw=source.spoiler_text, text=text + ) + + if self.show_post_link: + text += '\n\nLink to post' % status.link + + if status.reblog and self.show_boost_from: + text = ( + 'Boosted post from {}'.format( + source.account.url, source.account.display_name + ) + + text + ) + + if not source.media_attachments: + msg = await self._post_plaintext(text) + elif len(source.media_attachments) == 1: + msg = await self._post_media(text, source.media_attachments[0]) + else: + msg = await self._post_mediagroup(text, source.media_attachments) + + if not msg.ok: + raise Exception(msg.error, msg.params) + + return "" + + def __repr__(self) -> str: + return ( + " AsyncGenerator[Status, None]: + from websockets.client import connect + + url = f"{url}?" + urlencode({"stream": "list", **params}) + async with connect(url) as ws: + while (msg := await ws.recv()) != None: + event = loads(msg) + if "error" in event: + raise Exception(event["error"]) + if event["event"] == "update": + yield Status.from_dict(loads(event["payload"])) diff --git a/mastoreposter/types.py b/mastoreposter/types.py new file mode 100644 index 0000000..5e01834 --- /dev/null +++ b/mastoreposter/types.py @@ -0,0 +1,206 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, List, Literal + + +@dataclass +class Field: + name: str + value: str + verified_at: Optional[datetime] = None + + @classmethod + def from_dict(cls, data: dict) -> "Field": + return cls( + name=data["name"], + value=data["value"], + verified_at=( + datetime.fromisoformat(data["verified_at"].rstrip("Z")) + if data.get("verified_at") is not None + else None + ), + ) + + +@dataclass +class Emoji: + shortcode: str + url: str + static_url: str + visible_in_picker: bool + category: Optional[str] = None + + @classmethod + def from_dict(cls, data: dict) -> "Emoji": + return cls(**data) + + +@dataclass +class Account: + id: str + username: str + acct: str + url: str + display_name: str + note: str + avatar: str + avatar_static: str + header: str + header_static: str + locked: bool + emojis: List[Emoji] + discoverable: bool + created_at: datetime + last_status_at: datetime + statuses_count: int + followers_count: int + following_count: int + moved: Optional["Account"] = None + fields: Optional[List[Field]] = None + bot: Optional[bool] = None + + @classmethod + def from_dict(cls, data: dict) -> "Account": + return cls( + id=data["id"], + username=data["username"], + acct=data["acct"], + url=data["url"], + display_name=data["display_name"], + note=data["note"], + avatar=data["avatar"], + avatar_static=data["avatar_static"], + header=data["header"], + header_static=data["header_static"], + locked=data["locked"], + emojis=list(map(Emoji.from_dict, data["emojis"])), + discoverable=data["discoverable"], + created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")), + last_status_at=datetime.fromisoformat(data["last_status_at"].rstrip("Z")), + statuses_count=data["statuses_count"], + followers_count=data["followers_count"], + following_count=data["following_count"], + moved=( + Account.from_dict(data["moved"]) + if data.get("moved") is not None + else None + ), + fields=list(map(Field.from_dict, data.get("fields", []))), + bot=bool(data.get("bot")), + ) + + +@dataclass +class Attachment: + id: str + type: Literal["unknown", "image", "gifv", "video", "audio"] + url: str + preview_url: str + remote_url: Optional[str] = None + preview_remote_url: Optional[str] = None + meta: Optional[dict] = None + description: Optional[str] = None + blurhash: Optional[str] = None + text_url: Optional[str] = None # XXX: DEPRECATED + + @classmethod + def from_dict(cls, data: dict) -> "Attachment": + return cls(**data) + + +@dataclass +class Application: + name: str + website: Optional[str] = None + vapid_key: Optional[str] = None + + @classmethod + def from_dict(cls, data: dict) -> "Application": + return cls(**data) + + +@dataclass +class Mention: + id: str + username: str + acct: str + url: str + + @classmethod + def from_dict(cls, data: dict) -> "Mention": + return cls(**data) + + +@dataclass +class Tag: + name: str + url: str + + @classmethod + def from_dict(cls, data: dict) -> "Tag": + return cls(**data) + + +@dataclass +class Status: + id: str + uri: str + created_at: datetime + account: Account + content: str + visibility: Literal["public", "unlisted", "private", "direct"] + sensitive: bool + spoiler_text: str + media_attachments: List[Attachment] + reblogs_count: int + favourites_count: int + replies_count: int + application: Optional[Application] = None + url: Optional[str] = None + in_reply_to_id: Optional[str] = None + in_reply_to_account_id: Optional[str] = None + reblog: Optional["Status"] = None + poll: Optional[dict] = None + card: Optional[dict] = None + language: Optional[str] = None + text: Optional[str] = None + + @classmethod + def from_dict(cls, data: dict) -> "Status": + return cls( + id=data["id"], + uri=data["uri"], + created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")), + account=Account.from_dict(data["account"]), + content=data["content"], + visibility=data["visibility"], + sensitive=data["sensitive"], + spoiler_text=data["spoiler_text"], + media_attachments=list( + map(Attachment.from_dict, data["media_attachments"]) + ), + application=( + Application.from_dict(data["application"]) + if data.get("application") is not None + else None + ), + reblogs_count=data["reblogs_count"], + favourites_count=data["favourites_count"], + replies_count=data["replies_count"], + url=data.get("url"), + in_reply_to_id=data.get("in_reply_to_id"), + in_reply_to_account_id=data.get("in_reply_to_account_id"), + reblog=( + Status.from_dict(data["reblog"]) + if data.get("reblog") is not None + else None + ), + poll=data.get("poll"), + card=data.get("card"), + language=data.get("language"), + text=data.get("text"), + ) + + @property + def link(self) -> str: + return self.account.url + "/" + str(self.id)