diff --git a/.gitignore b/.gitignore index 6f9578f..7e7acb0 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ config-*.ini venv # :3 -tmp.py +tmp*.py +test-data diff --git a/config.ini b/config.ini index df70619..279b0e4 100644 --- a/config.ini +++ b/config.ini @@ -1,66 +1,134 @@ [main] -; This is a list of output modules. Each module should be defined in section, -; named "module/MODULENAME". Space-separated list of strings. +# This is a list of output modules. Each module should be defined in section, +# named "module/MODULENAME". Space-separated list of strings. modules = telegram -; Mastodon instance to grab posts from +# Mastodon instance to grab posts from instance = mastodon.example.org -; Mastodon user token. -; Required permissions: read:statuses read:lists -; You can get your token by creating application in -; ${instance}/settings/applications +# Mastodon user token. +# Required permissions: read:statuses read:lists +# You can get your token by creating application in +# ${instance}/settings/applications token = blahblah -; Mastodon user ID. Used to filter out posts. Unfortunately, I can't find a way -; to get it using token itself. GARGROOOOOOON!!!!! -; Anyways, you could navigate to your profile ${instance}/@${username} and -; look for your profile picture link. For example, for me it's -; https://mastodon.astrr.ru/system/accounts/avatars/107/914/495/779/447/227/original/9651ac2f47cb2993.jpg -; that part between "avarars" and "original" is the user ID. Grab it, remove -; all of the slashes and you should be left with, for example, this: +# Mastodon user ID. Used to filter out posts. Unfortunately, I can't find a way +# to get it using token itself. GARGROOOOOOON!!!!! +# Anyways, you could navigate to your profile ${instance}/@${username} and +# look for your profile picture link. For example, for me it's +# https://mastodon.astrr.ru/system/accounts/avatars/107/914/495/779/447/227/original/9651ac2f47cb2993.jpg +# that part between "avarars" and "original" is the user ID. Grab it, remove +# all of the slashes and you should be left with, for example, this: user = 107914495779447227 -; Mastodon user list ID. AGAIN, UNFORTUNATELY, there is no way to reliably use -; streaming API to get all of your posts. Using home timeline is unreliable and -; does not always include boosts, same with public:local -; So, create a list, add yourself here, and put its ID here (it should be in -; address bar while you have that list open) +# Mastodon user list ID. AGAIN, UNFORTUNATELY, there is no way to reliably use +# streaming API to get all of your posts. Using home timeline is unreliable and +# does not always include boosts, same with public:local +# So, create a list, add yourself here, and put its ID here (it should be in +# address bar while you have that list open) list = 1 -; Should we automatically reconnect to the streaming socket? -; That option exists because it's not really a big deal when crossposter runs -; as a service and restarts automatically by the service manager. +# Should we automatically reconnect to the streaming socket? +# That option exists because it's not really a big deal when crossposter runs +# as a service and restarts automatically by the service manager. auto-reconnect = yes -; Example Telegram integration. You can use it as a template +# Example Telegram integration. You can use it as a template [module/telegram] - -; For Telegram it should be "telegram". Obviously type = telegram -; Telegram Bot API token. There's plenty of guides how to obtain one. -; https://core.telegram.org/bots#3-how-do-i-create-a-bot +# Telegram Bot API token. There's plenty of guides how to obtain one. +# https://core.telegram.org/bots#3-how-do-i-create-a-bot token = 12345:blahblah -; Telegram channel/chat ID or name. Also can be just a regular user. -; You can use @showjsonbot to obtain your channel ID, or just use its -; username, if it is public +# Telegram channel/chat ID or name. Also can be just a regular user. +# You can use @showjsonbot to obtain your channel ID, or just use its +# username, if it is public chat = @username -; Should we show link to post as a link after post content? +# Should we show link to post as a link after post content? show-post-link = yes -; Should we show link to original author before post content? +# Should we show link to original author before post content? show-boost-from = yes -; Should we make posts silent? -; https://core.telegram.org/bots/api#sendmessage `disable_notification` +# Should we make posts silent? +# https://core.telegram.org/bots/api#sendmessage `disable_notification` silent = true -; Discord integration +# Discord integration [module/discord] type = discord -; Webhook URL with the `?wait=true` +# Webhook URL with the `?wait=true` webhook = url + + +;# Boost filter. Only boosts will be matched by that one +;[filter/boost] +;type = boost +;# List of sources. If empty, boost from any account will be allowed +;list = @MaidsBot@* + +;# Mention filter. If anyone from that list is mentioned in the post, +;# it will be triggered. Useful in negation mode to ignore some people +;[filter/mention] +;type = mention +;# Space-separated list of mentions. +;# @[name] means specific local user +;# @[name]@[instance] means specific remote user +;# @[name]@* means specific user on any remote instance +;# @*@[instance] means any remote user on specific instance +;# @*@* means any remote user +;# @* __should__ mean any local user, but we're using `glob` to test for it and +;# it just means "any user" for now. This will be changed to more consistent +;# behavior +;list = @name @name@instance @*@instance @name@* @*@* + +;# Media filter. Only posts with some specific media content are triggered +;[filter/media] +;type = media +;# space-separated list of media types to be checked +;valid-media = image video gifv audio unknown +;# mode of the filter itself +;# "include" means "there should be at least one media of any type listed" +;# "exclude" means "there shouldn't be anything from that list" +;# "only" allows only media from the list to be sent +;mode = include + +;# Text content filter +;[filter/content] +;type = content +;# Mode of the filter. +;# "regexp" requires "regexp" property and should contain... A RegExp +;# "hashtag" should contain space-separated list of tags +;mode = regexp +;# Regular expression pattern to be matched +;regexp = ^x-no-repost +;# List of tags +; tags = maids artspam + +;# Spoiler text filter +;# Will be matched if spoiler matches some regexp +;# (use ^.+$ to check for any spoiler) +;[filter/spoiler] +;type = spoiler +;regexp = ^CW: + +;# Visibility filter. +;# Only posts with specific visibility will be matched +;[filter/visibility] +;type = visibility +;# Space-separated list of visibilities +;# NOTE: `direct` visibility is always ignored even before filters are ran +;options = public + +;# Combined filter +;# Basically a way to combine multiple filters using some operation +;[filter/combined] +;type = combined +;# List of filters inside of itself +;filters = spoiler boost +;# Operator to be used here +;# Options: "all", "any" or "single" +;operator = any diff --git a/mastoposter/__init__.py b/mastoposter/__init__.py index f6d3f8e..a096cb5 100644 --- a/mastoposter/__init__.py +++ b/mastoposter/__init__.py @@ -1,33 +1,54 @@ from asyncio import gather from configparser import ConfigParser -from typing import List, Optional +from typing import Dict, List, Optional +from mastoposter.filters import run_filters +from mastoposter.filters.base import BaseFilter, FilterInstance -from mastoposter.integrations.base import BaseIntegration -from mastoposter.integrations import DiscordIntegration, TelegramIntegration +from mastoposter.integrations import ( + DiscordIntegration, + FilteredIntegration, + TelegramIntegration, +) from mastoposter.types import Status -def load_integrations_from(config: ConfigParser) -> List[BaseIntegration]: - modules: List[BaseIntegration] = [] +def load_integrations_from(config: ConfigParser) -> List[FilteredIntegration]: + modules: List[FilteredIntegration] = [] for module_name in config.get("main", "modules").split(): - module = config[f"module/{module_name}"] - if module["type"] == "telegram": + mod = config[f"module/{module_name}"] + + filters: Dict[str, FilterInstance] = {} + for filter_name in mod.get("filters", "").split(): + filter_basename = filter_name.lstrip("~!") + + filters[filter_basename] = BaseFilter.new_instance( + filter_name, config[f"filter/{filter_basename}"] + ) + + for finst in list(filters.values()): + finst.filter.post_init(filters, config) + + if mod["type"] == "telegram": modules.append( - TelegramIntegration( - token=module["token"], - chat_id=module["chat"], - show_post_link=module.getboolean("show_post_link", fallback=True), - show_boost_from=module.getboolean("show_boost_from", fallback=True), + FilteredIntegration( + TelegramIntegration(mod), list(filters.values()) + ) + ) + elif mod["type"] == "discord": + modules.append( + FilteredIntegration( + DiscordIntegration(mod), list(filters.values()) ) ) - elif module["type"] == "discord": - modules.append(DiscordIntegration(webhook=module["webhook"])) else: - raise ValueError("Invalid module type %r" % module["type"]) + raise ValueError("Invalid module type %r" % mod["type"]) return modules async def execute_integrations( - status: Status, sinks: List[BaseIntegration] + status: Status, sinks: List[FilteredIntegration] ) -> List[Optional[str]]: - return await gather(*[sink.post(status) for sink in sinks], return_exceptions=True) + return await gather( + *[sink[0](status) for sink in sinks if run_filters(sink[1], status)], + return_exceptions=True, + ) diff --git a/mastoposter/__main__.py b/mastoposter/__main__.py index 4174dce..15eb5cb 100644 --- a/mastoposter/__main__.py +++ b/mastoposter/__main__.py @@ -2,15 +2,15 @@ from asyncio import run from configparser import ConfigParser from mastoposter import execute_integrations, load_integrations_from +from mastoposter.integrations import FilteredIntegration from mastoposter.sources import websocket_source from typing import AsyncGenerator, Callable, List -from mastoposter.integrations.base import BaseIntegration from mastoposter.types import Status async def listen( source: Callable[..., AsyncGenerator[Status, None]], - drains: List[BaseIntegration], + drains: List[FilteredIntegration], user: str, /, **kwargs, @@ -48,7 +48,7 @@ def main(config_path: str): for k in _remove: del conf[section][k] - modules = load_integrations_from(conf) + modules: List[FilteredIntegration] = load_integrations_from(conf) url = "wss://{}/api/v1/streaming".format(conf["main"]["instance"]) run( @@ -57,7 +57,7 @@ def main(config_path: str): modules, conf["main"]["user"], url=url, - reconnect=conf["main"].getboolean("auto_reconnect", fallback=False), + reconnect=conf["main"].getboolean("auto_reconnect", False), list=conf["main"]["list"], access_token=conf["main"]["token"], ) diff --git a/mastoposter/filters/__init__.py b/mastoposter/filters/__init__.py new file mode 100644 index 0000000..45d14d0 --- /dev/null +++ b/mastoposter/filters/__init__.py @@ -0,0 +1,17 @@ +from typing import List + +from mastoposter.types import Status +from .base import FilterInstance +from mastoposter.filters.boost import BoostFilter # NOQA +from mastoposter.filters.combined import CombinedFilter # NOQA +from mastoposter.filters.mention import MentionFilter # NOQA +from mastoposter.filters.media import MediaFilter # NOQA +from mastoposter.filters.text import TextFilter # NOQA +from mastoposter.filters.spoiler import SpoilerFilter # NOQA +from mastoposter.filters.visibility import VisibilityFilter # NOQA + + +def run_filters(filters: List[FilterInstance], status: Status) -> bool: + if not filters: + return True + return all((fil.filter(status) ^ fil.inverse for fil in filters)) diff --git a/mastoposter/filters/base.py b/mastoposter/filters/base.py new file mode 100644 index 0000000..9513790 --- /dev/null +++ b/mastoposter/filters/base.py @@ -0,0 +1,61 @@ +from abc import ABC, abstractmethod +from configparser import ConfigParser, SectionProxy +from typing import ClassVar, Dict, NamedTuple, Type +from mastoposter.types import Status +from re import Pattern, compile as regexp + +UNUSED = lambda *_: None # NOQA + + +class FilterInstance(NamedTuple): + inverse: bool + filter: "BaseFilter" + + def __repr__(self): + if self.inverse: + return f"~{self.filter!r}" + return repr(self.filter) + + +class BaseFilter(ABC): + FILTER_REGISTRY: ClassVar[Dict[str, Type["BaseFilter"]]] = {} + FILTER_NAME_REGEX: ClassVar[Pattern] = regexp(r"^([a-z_]+)$") + + filter_name: ClassVar[str] = "_base" + + def __init__(self, section: SectionProxy): + UNUSED(section) + + def __init_subclass__(cls, filter_name: str, **kwargs): + super().__init_subclass__(**kwargs) + if not cls.FILTER_NAME_REGEX.match(filter_name): + raise ValueError(f"invalid {filter_name=!r}") + if filter_name in cls.FILTER_REGISTRY: + raise KeyError(f"{filter_name=!r} is already registered") + cls.FILTER_REGISTRY[filter_name] = cls + setattr(cls, "filter_name", filter_name) + + @abstractmethod + def __call__(self, status: Status) -> bool: + raise NotImplementedError + + def post_init( + self, filters: Dict[str, FilterInstance], config: ConfigParser + ): + UNUSED(filters, config) + + def __repr__(self): + return f"Filter:{self.filter_name}()" + + @classmethod + def load_filter(cls, name: str, section: SectionProxy) -> "BaseFilter": + if name not in cls.FILTER_REGISTRY: + raise KeyError(f"no filter with name {name!r} was found") + return cls.FILTER_REGISTRY[name](section) + + @classmethod + def new_instance(cls, name: str, section: SectionProxy) -> FilterInstance: + return FilterInstance( + inverse=name[:1] in "~!", + filter=cls.load_filter(section["type"], section), + ) diff --git a/mastoposter/filters/boost.py b/mastoposter/filters/boost.py new file mode 100644 index 0000000..687b60b --- /dev/null +++ b/mastoposter/filters/boost.py @@ -0,0 +1,31 @@ +from configparser import SectionProxy +from fnmatch import fnmatch +from mastoposter.filters.base import BaseFilter +from mastoposter.types import Status + + +class BoostFilter(BaseFilter, filter_name="boost"): + def __init__(self, section: SectionProxy): + super().__init__(section) + self.list = section.get("list", "").split() + + @classmethod + def check_account(cls, acct: str, mask: str) -> bool: + return fnmatch(acct, mask) + + def __call__(self, status: Status) -> bool: + if status.reblog is None: + return False + if not self.list: + return True + return any( + [ + self.check_account("@" + status.reblog.account.acct, mask) + for mask in self.list + ] + ) + + def __repr__(self): + if not self.list: + return "Filter:boost(any)" + return f"Filter:boost(from={self.list!r})" diff --git a/mastoposter/filters/combined.py b/mastoposter/filters/combined.py new file mode 100644 index 0000000..1fe32bd --- /dev/null +++ b/mastoposter/filters/combined.py @@ -0,0 +1,38 @@ +from configparser import ConfigParser, SectionProxy +from typing import Callable, ClassVar, Dict, List, Sequence +from mastoposter.filters.base import BaseFilter, FilterInstance +from mastoposter.types import Status + + +class CombinedFilter(BaseFilter, filter_name="combined"): + OPERATORS: ClassVar[Dict[str, Callable[[Sequence[bool]], bool]]] = { + "all": lambda d: all(d), + "any": lambda d: any(d), + "single": lambda d: sum(d) == 1, + } + + def __init__(self, section: SectionProxy): + self.filter_names = section.get("filters", "").split() + self.operator = self.OPERATORS[section.get("operator", "and")] + self._operator_name = section.get("operator", "and") + self.filters: List[FilterInstance] = [] + + def post_init( + self, filters: Dict[str, FilterInstance], config: ConfigParser + ): + super().post_init(filters, config) + self.filters = [ + self.new_instance(name, config["filter/" + name.lstrip("~!")]) + for name in self.filter_names + ] + + def __call__(self, post: Status) -> bool: + if self.OPERATORS[self._operator_name] is not self.operator: + self._operator_name = str(self.operator) + return self.operator([f[1](post) ^ f[0] for f in self.filters]) + + def __repr__(self): + return ( + f"Filter:combined(op={self._operator_name}, " + f"filters={self.filters!r})" + ) diff --git a/mastoposter/filters/media.py b/mastoposter/filters/media.py new file mode 100644 index 0000000..39c6a39 --- /dev/null +++ b/mastoposter/filters/media.py @@ -0,0 +1,35 @@ +from configparser import SectionProxy +from typing import Set +from mastoposter.filters.base import BaseFilter +from mastoposter.types import Status + + +class MediaFilter(BaseFilter, filter_name="media"): + def __init__(self, section: SectionProxy): + super().__init__(section) + self.valid_media: Set[str] = set(section.get("valid_media").split()) + self.mode = section.get("mode", "include") + if self.mode not in ("include", "exclude", "only"): + raise ValueError(f"{self.mode=} is not valid") + + def __call__(self, status: Status) -> bool: + if not status.media_attachments: + return False + + types: Set[str] = {a.type for a in status.media_attachments} + + if self.mode == "include": + return len(types & self.valid_media) > 0 + elif self.mode == "exclude": + return len(types & self.valid_media) == 0 + elif self.mode == "only": + return len((types ^ self.valid_media) & types) == 0 + raise ValueError(f"{self.mode=} is not valid") + + def __repr__(self): + return str.format( + "Filter:{name}(mode={mode}, media={media})", + name=self.filter_name, + mode=self.mode, + media=self.valid_media, + ) diff --git a/mastoposter/filters/mention.py b/mastoposter/filters/mention.py new file mode 100644 index 0000000..8ba70e4 --- /dev/null +++ b/mastoposter/filters/mention.py @@ -0,0 +1,36 @@ +from configparser import SectionProxy +from re import Pattern, compile as regexp +from typing import ClassVar +from fnmatch import fnmatch +from mastoposter.filters.base import BaseFilter +from mastoposter.types import Status + + +class MentionFilter(BaseFilter, filter_name="mention"): + MENTION_REGEX: ClassVar[Pattern] = regexp(r"@([^@]+)(@([^@]+))?") + + def __init__(self, section: SectionProxy): + super().__init__(section) + self.list = section.get("list", "").split() + + @classmethod + def check_account(cls, acct: str, mask: str) -> bool: + return fnmatch("@" + acct, mask) + + def __call__(self, status: Status) -> bool: + if not self.list and status.mentions: + return True + return any( + ( + any( + self.check_account(mention.acct, mask) + for mask in self.list + ) + for mention in status.mentions + ) + ) + + def __repr__(self): + return str.format( + "Filter:{name}({list!r})", name=self.filter_name, list=self.list + ) diff --git a/mastoposter/filters/spoiler.py b/mastoposter/filters/spoiler.py new file mode 100644 index 0000000..6e23fcd --- /dev/null +++ b/mastoposter/filters/spoiler.py @@ -0,0 +1,20 @@ +from configparser import SectionProxy +from re import Pattern, compile as regexp +from mastoposter.filters.base import BaseFilter +from mastoposter.types import Status + + +class SpoilerFilter(BaseFilter, filter_name="spoiler"): + def __init__(self, section: SectionProxy): + super().__init__(section) + self.regexp: Pattern = regexp(section.get("regexp", "^.*$")) + + def __call__(self, status: Status) -> bool: + return self.regexp.match(status.spoiler_text) is not None + + def __repr__(self): + return str.format( + "Filter:{name}({regex!r})", + name=self.filter_name, + regex=self.regexp.pattern, + ) diff --git a/mastoposter/filters/text.py b/mastoposter/filters/text.py new file mode 100644 index 0000000..eb7738f --- /dev/null +++ b/mastoposter/filters/text.py @@ -0,0 +1,65 @@ +from configparser import SectionProxy +from re import Pattern, compile as regexp +from typing import Optional, Set + +from bs4 import BeautifulSoup, PageElement, Tag +from mastoposter.filters.base import BaseFilter +from mastoposter.types import Status + + +class TextFilter(BaseFilter, filter_name="content"): + def __init__(self, section: SectionProxy): + super().__init__(section) + self.mode = section["mode"] + self.tags: Set[str] = set() + self.regexp: Optional[Pattern] = None + + if self.mode == "regexp": + self.regexp = regexp(section["regexp"]) + elif self.mode in ("hashtag", "tag"): + self.tags = set(map(str.lower, section["tags"].split())) + else: + raise ValueError(f"Invalid filter mode {self.mode}") + + @classmethod + def node_to_text(cls, el: PageElement) -> str: + if isinstance(el, Tag): + if el.name == "br": + return "\n" + elif el.name == "p": + return ( + str.join("", map(cls.node_to_text, el.children)) + "\n\n" + ) + return str.join("", map(cls.node_to_text, el.children)) + return str(el) + + @classmethod + def html_to_plain(cls, html: str) -> str: + soup = BeautifulSoup(html, "lxml") + return cls.node_to_text(soup).rstrip() + + def __call__(self, status: Status) -> bool: + source = status.reblog or status + if self.regexp is not None: + return ( + self.regexp.match(self.html_to_plain(source.content)) + is not None + ) + elif self.tags: + return len(self.tags & {t.name.lower() for t in source.tags}) > 0 + else: + raise ValueError("Neither regexp or tags were set. Why?") + + def __repr__(self): + if self.regexp is not None: + return str.format( + "Filter:{name}(regexp={regex!r})", + name=self.filter_name, + regex=self.regexp.pattern, + ) + elif self.tags: + return str.format( + "Filter:{name}(tags={tags!r})", + name=self.filter_name, + tags=self.tags, + ) diff --git a/mastoposter/filters/visibility.py b/mastoposter/filters/visibility.py new file mode 100644 index 0000000..70fda56 --- /dev/null +++ b/mastoposter/filters/visibility.py @@ -0,0 +1,15 @@ +from configparser import SectionProxy +from mastoposter.filters.base import BaseFilter +from mastoposter.types import Status + + +class VisibilityFilter(BaseFilter, filter_name="visibility"): + def __init__(self, section: SectionProxy): + super().__init__(section) + self.options = set(section["options"].split()) + + def __call__(self, status: Status) -> bool: + return status.visibility in self.options + + def __repr__(self): + return str.format("Filter:{}({})", self.filter_name, self.options) diff --git a/mastoposter/integrations/__init__.py b/mastoposter/integrations/__init__.py index 0c5a5ea..37294b4 100644 --- a/mastoposter/integrations/__init__.py +++ b/mastoposter/integrations/__init__.py @@ -1,2 +1,11 @@ -from .telegram import TelegramIntegration -from .discord import DiscordIntegration +from typing import List, NamedTuple +from mastoposter.filters.base import FilterInstance + +from mastoposter.integrations.base import BaseIntegration +from .telegram import TelegramIntegration # NOQA +from .discord import DiscordIntegration # NOQA + + +class FilteredIntegration(NamedTuple): + sink: BaseIntegration + filters: List[FilterInstance] diff --git a/mastoposter/integrations/base.py b/mastoposter/integrations/base.py index e53fd4a..1b6765c 100644 --- a/mastoposter/integrations/base.py +++ b/mastoposter/integrations/base.py @@ -1,13 +1,14 @@ from abc import ABC, abstractmethod +from configparser import SectionProxy from typing import Optional from mastoposter.types import Status class BaseIntegration(ABC): - def __init__(self): + def __init__(self, section: SectionProxy): pass @abstractmethod - async def post(self, status: Status) -> Optional[str]: - raise NotImplemented + async def __call__(self, status: Status) -> Optional[str]: + raise NotImplementedError diff --git a/mastoposter/integrations/discord/__init__.py b/mastoposter/integrations/discord/__init__.py index 4fe9228..03b990a 100644 --- a/mastoposter/integrations/discord/__init__.py +++ b/mastoposter/integrations/discord/__init__.py @@ -1,3 +1,4 @@ +from configparser import SectionProxy from typing import List, Optional from bs4 import BeautifulSoup, PageElement, Tag from httpx import AsyncClient @@ -12,8 +13,8 @@ from mastoposter.types import Status class DiscordIntegration(BaseIntegration): - def __init__(self, webhook: str): - self.webhook = webhook + def __init__(self, section: SectionProxy): + self.webhook = section.get("webhook", "") @staticmethod def md_escape(text: str) -> str: @@ -33,11 +34,15 @@ class DiscordIntegration(BaseIntegration): if isinstance(el, Tag): if el.name == "a": return "[%s](%s)" % ( - cls.md_escape(str.join("", map(cls.node_to_text, el.children))), + cls.md_escape( + str.join("", map(cls.node_to_text, el.children)) + ), el.attrs["href"], ) elif el.name == "p": - return str.join("", map(cls.node_to_text, el.children)) + "\n\n" + return ( + str.join("", map(cls.node_to_text, el.children)) + "\n\n" + ) elif el.name == "br": return "\n" return str.join("", map(cls.node_to_text, el.children)) @@ -66,16 +71,20 @@ class DiscordIntegration(BaseIntegration): ) ).json() - async def post(self, status: Status) -> Optional[str]: + async def __call__(self, status: Status) -> Optional[str]: source = status.reblog or status embeds: List[DiscordEmbed] = [] - text = self.node_to_text(BeautifulSoup(source.content, features="lxml")) + text = self.node_to_text( + BeautifulSoup(source.content, features="lxml") + ) if source.spoiler_text: text = f"{source.spoiler_text}\n||{text}||" if status.reblog is not None: - title = f"@{status.account.acct} boosted from @{source.account.acct}" + title = ( + f"@{status.account.acct} boosted from @{source.account.acct}" + ) else: title = f"@{status.account.acct} posted" diff --git a/mastoposter/integrations/discord/types.py b/mastoposter/integrations/discord/types.py index 0bb91a3..2452666 100644 --- a/mastoposter/integrations/discord/types.py +++ b/mastoposter/integrations/discord/types.py @@ -68,7 +68,9 @@ class DiscordEmbed: "title": self.title, "description": self.description, "url": self.url, - "timestamp": _f(datetime.isoformat, self.timestamp, "T", "seconds"), + "timestamp": _f( + datetime.isoformat, self.timestamp, "T", "seconds" + ), "color": self.color, "footer": _f(asdict, self.footer), "image": _f(asdict, self.image), diff --git a/mastoposter/integrations/telegram.py b/mastoposter/integrations/telegram.py index f53fc0f..cd952d4 100644 --- a/mastoposter/integrations/telegram.py +++ b/mastoposter/integrations/telegram.py @@ -1,6 +1,7 @@ +from configparser import SectionProxy from dataclasses import dataclass from html import escape -from typing import Any, List, Mapping, Optional, Union +from typing import Any, List, Mapping, Optional from bs4 import BeautifulSoup, Tag, PageElement from httpx import AsyncClient from mastoposter.integrations.base import BaseIntegration @@ -16,7 +17,12 @@ class TGResponse: @classmethod def from_dict(cls, data: dict, params: dict) -> "TGResponse": - return cls(data["ok"], params, data.get("result"), data.get("description")) + return cls( + ok=data["ok"], + params=params, + result=data.get("result"), + error=data.get("description"), + ) class TelegramIntegration(BaseIntegration): @@ -36,19 +42,12 @@ class TelegramIntegration(BaseIntegration): "unknown": "document", } - def __init__( - self, - token: str, - chat_id: Union[str, int], - show_post_link: bool = True, - show_boost_from: bool = True, - silent: bool = True, - ): - self.token = token - self.chat_id = chat_id - self.show_post_link = show_post_link - self.show_boost_from = show_boost_from - self.silent = silent + def __init__(self, sect: SectionProxy): + self.token = sect.get("token", "") + self.chat_id = sect.get("chat", "") + self.show_post_link = sect.getboolean("show_post_link", True) + self.show_boost_from = sect.getboolean("show_boost_from", True) + self.silent = sect.getboolean("silent", True) async def _tg_request(self, method: str, **kwargs) -> TGResponse: url = self.API_URL.format(self.token, method) @@ -82,7 +81,9 @@ class TelegramIntegration(BaseIntegration): **{self.MEDIA_MAPPING[media.type]: media.url}, ) - async def _post_mediagroup(self, text: str, media: List[Attachment]) -> TGResponse: + async def _post_mediagroup( + self, text: str, media: List[Attachment] + ) -> TGResponse: media_list: List[dict] = [] allowed_medias = {"image", "gifv", "video", "audio", "unknown"} for attachment in media: @@ -136,15 +137,19 @@ class TelegramIntegration(BaseIntegration): str.join("", map(cls.node_to_text, el.children)), ) elif el.name == "p": - return str.join("", map(cls.node_to_text, el.children)) + "\n\n" + return ( + str.join("", map(cls.node_to_text, el.children)) + "\n\n" + ) elif el.name == "br": return "\n" return str.join("", map(cls.node_to_text, el.children)) return escape(str(el)) - async def post(self, status: Status) -> Optional[str]: + async def __call__(self, status: Status) -> Optional[str]: source = status.reblog or status - text = self.node_to_text(BeautifulSoup(source.content, features="lxml")) + text = self.node_to_text( + BeautifulSoup(source.content, features="lxml") + ) text = text.rstrip() if source.spoiler_text: @@ -173,12 +178,16 @@ class TelegramIntegration(BaseIntegration): elif len(source.media_attachments) == 1: if ( - res := await self._post_media(text, source.media_attachments[0]) + res := await self._post_media( + text, source.media_attachments[0] + ) ).ok and res.result is not None: ids.append(res.result["message_id"]) else: if ( - res := await self._post_mediagroup(text, source.media_attachments) + res := await self._post_mediagroup( + text, source.media_attachments + ) ).ok and res.result is not None: ids.append(res.result["message_id"]) @@ -203,5 +212,5 @@ class TelegramIntegration(BaseIntegration): chat=self.chat_id, show_post_link=self.show_post_link, show_boost_from=self.show_boost_from, - silent=self.silent + silent=self.silent, ) diff --git a/mastoposter/sources.py b/mastoposter/sources.py index 24d2424..67e3edb 100644 --- a/mastoposter/sources.py +++ b/mastoposter/sources.py @@ -15,7 +15,7 @@ async def websocket_source( while True: try: async with connect(url) as ws: - while (msg := await ws.recv()) != None: + while (msg := await ws.recv()) is not None: event = loads(msg) if "error" in event: raise Exception(event["error"]) diff --git a/mastoposter/types.py b/mastoposter/types.py index fb0da46..ecffd2f 100644 --- a/mastoposter/types.py +++ b/mastoposter/types.py @@ -1,6 +1,25 @@ from dataclasses import dataclass, field from datetime import datetime -from typing import Optional, List, Literal +from typing import Any, Callable, Optional, List, Literal, TypeVar + + +def _date(val: str) -> datetime: + return datetime.fromisoformat(val.rstrip("Z")) + + +T = TypeVar("T") + + +def _fnil(fn: Callable[[Any], T], val: Optional[Any]) -> Optional[T]: + return None if val is None else fn(val) + + +def _date_or_none(val: Optional[str]) -> Optional[datetime]: + return _fnil(_date, val) + + +def _int_or_none(val: Optional[str]) -> Optional[int]: + return _fnil(int, val) @dataclass @@ -14,11 +33,7 @@ class Field: return cls( name=data["name"], value=data["value"], - verified_at=( - datetime.fromisoformat(data["verified_at"].rstrip("Z")) - if data.get("verified_at") is not None - else None - ), + verified_at=_date_or_none(data.get("verified_at")), ) @@ -74,17 +89,13 @@ class Account: header_static=data["header_static"], locked=data["locked"], emojis=list(map(Emoji.from_dict, data["emojis"])), - discoverable=data["discoverable"], - created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")), - last_status_at=datetime.fromisoformat(data["last_status_at"].rstrip("Z")), + discoverable=data.get("discoverable", False), + created_at=_date(data["created_at"]), + last_status_at=_date(data["last_status_at"]), statuses_count=data["statuses_count"], followers_count=data["followers_count"], following_count=data["following_count"], - moved=( - Account.from_dict(data["moved"]) - if data.get("moved") is not None - else None - ), + moved=_fnil(Account.from_dict, data.get("moved")), fields=list(map(Field.from_dict, data.get("fields", []))), bot=bool(data.get("bot")), ) @@ -228,19 +239,11 @@ class Poll: def from_dict(cls, data: dict) -> "Poll": return cls( id=data["id"], - expires_at=( - datetime.fromisoformat(data["expires_at"].rstrip("Z")) - if data.get("expires_at") is not None - else None - ), + expires_at=_date_or_none(data.get("expires_at")), expired=data["expired"], multiple=data["multiple"], votes_count=data["votes_count"], - voters_count=( - int(data["voters_count"]) - if data.get("voters_count") is not None - else None - ), + voters_count=_int_or_none(data.get("voters_count")), options=[cls.PollOption(**opt) for opt in data["options"]], ) @@ -259,6 +262,8 @@ class Status: reblogs_count: int favourites_count: int replies_count: int + mentions: List[Mention] + tags: List[Tag] application: Optional[Application] = None url: Optional[str] = None in_reply_to_id: Optional[str] = None @@ -274,7 +279,7 @@ class Status: return cls( id=data["id"], uri=data["uri"], - created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")), + created_at=_date(data["created_at"]), account=Account.from_dict(data["account"]), content=data["content"], visibility=data["visibility"], @@ -283,28 +288,20 @@ class Status: media_attachments=list( map(Attachment.from_dict, data["media_attachments"]) ), - application=( - Application.from_dict(data["application"]) - if data.get("application") is not None - else None - ), + application=_fnil(Application.from_dict, data.get("application")), reblogs_count=data["reblogs_count"], favourites_count=data["favourites_count"], replies_count=data["replies_count"], url=data.get("url"), in_reply_to_id=data.get("in_reply_to_id"), in_reply_to_account_id=data.get("in_reply_to_account_id"), - reblog=( - Status.from_dict(data["reblog"]) - if data.get("reblog") is not None - else None - ), - poll=( - Poll.from_dict(data["poll"]) if data.get("poll") is not None else None - ), + reblog=_fnil(Status.from_dict, data.get("reblog")), + poll=_fnil(Poll.from_dict, data.get("poll")), card=data.get("card"), language=data.get("language"), text=data.get("text"), + mentions=[Mention.from_dict(m) for m in data.get("mentions", [])], + tags=[Tag.from_dict(m) for m in data.get("tags", [])], ) @property