forked from hkc/mastoposter
The Beginning Of Filters
Also Flake8 cleanup and other stuff
This commit is contained in:
parent
6a9750b33b
commit
ae8a1ddf34
|
@ -10,24 +10,18 @@ from mastoposter.types import Status
|
|||
def load_integrations_from(config: ConfigParser) -> List[BaseIntegration]:
|
||||
modules: List[BaseIntegration] = []
|
||||
for module_name in config.get("main", "modules").split():
|
||||
module = config[f"module/{module_name}"]
|
||||
if module["type"] == "telegram":
|
||||
modules.append(
|
||||
TelegramIntegration(
|
||||
token=module["token"],
|
||||
chat_id=module["chat"],
|
||||
show_post_link=module.getboolean("show_post_link", fallback=True),
|
||||
show_boost_from=module.getboolean("show_boost_from", fallback=True),
|
||||
)
|
||||
)
|
||||
elif module["type"] == "discord":
|
||||
modules.append(DiscordIntegration(webhook=module["webhook"]))
|
||||
mod = config[f"module/{module_name}"]
|
||||
if mod["type"] == "telegram":
|
||||
modules.append(TelegramIntegration(mod))
|
||||
elif mod["type"] == "discord":
|
||||
modules.append(DiscordIntegration(mod))
|
||||
else:
|
||||
raise ValueError("Invalid module type %r" % module["type"])
|
||||
raise ValueError("Invalid module type %r" % mod["type"])
|
||||
return modules
|
||||
|
||||
|
||||
async def execute_integrations(
|
||||
status: Status, sinks: List[BaseIntegration]
|
||||
) -> List[Optional[str]]:
|
||||
return await gather(*[sink.post(status) for sink in sinks], return_exceptions=True)
|
||||
coros = [sink.post(status) for sink in sinks]
|
||||
return await gather(*coros, return_exceptions=True)
|
||||
|
|
|
@ -57,7 +57,7 @@ def main(config_path: str):
|
|||
modules,
|
||||
conf["main"]["user"],
|
||||
url=url,
|
||||
reconnect=conf["main"].getboolean("auto_reconnect", fallback=False),
|
||||
reconnect=conf["main"].getboolean("auto_reconnect", False),
|
||||
list=conf["main"]["list"],
|
||||
access_token=conf["main"]["token"],
|
||||
)
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
from .base import BaseFilter # NOQA
|
||||
from mastoposter.filters.boost_filter import BoostFilter # NOQA
|
|
@ -0,0 +1,24 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import ClassVar, Dict, Type
|
||||
from mastoposter.types import Status
|
||||
from re import Pattern, compile as regexp
|
||||
|
||||
|
||||
class BaseFilter(ABC):
|
||||
FILTER_REGISTRY: ClassVar[Dict[str, Type["BaseFilter"]]] = {}
|
||||
FILTER_NAME_REGEX: Pattern = regexp(r"^([a-z_]+)$")
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def __call__(self, status: Status) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def __init_subclass__(cls, filter_name: str, **kwargs):
|
||||
super().__init_subclass__(**kwargs)
|
||||
if not cls.FILTER_NAME_REGEX.match(filter_name):
|
||||
raise ValueError(f"invalid {filter_name=!r}")
|
||||
if filter_name in cls.FILTER_REGISTRY:
|
||||
raise KeyError(f"{filter_name=!r} is already registered")
|
||||
cls.FILTER_REGISTRY[filter_name] = cls
|
|
@ -0,0 +1,7 @@
|
|||
from mastoposter.filters.base import BaseFilter
|
||||
from mastoposter.types import Status
|
||||
|
||||
|
||||
class BoostFilter(BaseFilter, filter_name="boost"):
|
||||
def __call__(self, status: Status) -> bool:
|
||||
return status.reblog is not None
|
|
@ -1,2 +1,2 @@
|
|||
from .telegram import TelegramIntegration
|
||||
from .discord import DiscordIntegration
|
||||
from .telegram import TelegramIntegration # NOQA
|
||||
from .discord import DiscordIntegration # NOQA
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from configparser import SectionProxy
|
||||
from typing import Optional
|
||||
|
||||
from mastoposter.types import Status
|
||||
|
||||
|
||||
class BaseIntegration(ABC):
|
||||
def __init__(self):
|
||||
def __init__(self, section: SectionProxy):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def post(self, status: Status) -> Optional[str]:
|
||||
raise NotImplemented
|
||||
raise NotImplementedError
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from configparser import SectionProxy
|
||||
from typing import List, Optional
|
||||
from bs4 import BeautifulSoup, PageElement, Tag
|
||||
from httpx import AsyncClient
|
||||
|
@ -12,8 +13,8 @@ from mastoposter.types import Status
|
|||
|
||||
|
||||
class DiscordIntegration(BaseIntegration):
|
||||
def __init__(self, webhook: str):
|
||||
self.webhook = webhook
|
||||
def __init__(self, section: SectionProxy):
|
||||
self.webhook = section.get("webhook", "")
|
||||
|
||||
@staticmethod
|
||||
def md_escape(text: str) -> str:
|
||||
|
@ -33,11 +34,15 @@ class DiscordIntegration(BaseIntegration):
|
|||
if isinstance(el, Tag):
|
||||
if el.name == "a":
|
||||
return "[%s](%s)" % (
|
||||
cls.md_escape(str.join("", map(cls.node_to_text, el.children))),
|
||||
cls.md_escape(
|
||||
str.join("", map(cls.node_to_text, el.children))
|
||||
),
|
||||
el.attrs["href"],
|
||||
)
|
||||
elif el.name == "p":
|
||||
return str.join("", map(cls.node_to_text, el.children)) + "\n\n"
|
||||
return (
|
||||
str.join("", map(cls.node_to_text, el.children)) + "\n\n"
|
||||
)
|
||||
elif el.name == "br":
|
||||
return "\n"
|
||||
return str.join("", map(cls.node_to_text, el.children))
|
||||
|
@ -70,12 +75,16 @@ class DiscordIntegration(BaseIntegration):
|
|||
source = status.reblog or status
|
||||
embeds: List[DiscordEmbed] = []
|
||||
|
||||
text = self.node_to_text(BeautifulSoup(source.content, features="lxml"))
|
||||
text = self.node_to_text(
|
||||
BeautifulSoup(source.content, features="lxml")
|
||||
)
|
||||
if source.spoiler_text:
|
||||
text = f"{source.spoiler_text}\n||{text}||"
|
||||
|
||||
if status.reblog is not None:
|
||||
title = f"@{status.account.acct} boosted from @{source.account.acct}"
|
||||
title = (
|
||||
f"@{status.account.acct} boosted from @{source.account.acct}"
|
||||
)
|
||||
else:
|
||||
title = f"@{status.account.acct} posted"
|
||||
|
||||
|
|
|
@ -68,7 +68,9 @@ class DiscordEmbed:
|
|||
"title": self.title,
|
||||
"description": self.description,
|
||||
"url": self.url,
|
||||
"timestamp": _f(datetime.isoformat, self.timestamp, "T", "seconds"),
|
||||
"timestamp": _f(
|
||||
datetime.isoformat, self.timestamp, "T", "seconds"
|
||||
),
|
||||
"color": self.color,
|
||||
"footer": _f(asdict, self.footer),
|
||||
"image": _f(asdict, self.image),
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
from configparser import SectionProxy
|
||||
from dataclasses import dataclass
|
||||
from html import escape
|
||||
from typing import Any, List, Mapping, Optional, Union
|
||||
from typing import Any, List, Mapping, Optional
|
||||
from bs4 import BeautifulSoup, Tag, PageElement
|
||||
from httpx import AsyncClient
|
||||
from mastoposter.integrations.base import BaseIntegration
|
||||
|
@ -16,7 +17,12 @@ class TGResponse:
|
|||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict, params: dict) -> "TGResponse":
|
||||
return cls(data["ok"], params, data.get("result"), data.get("description"))
|
||||
return cls(
|
||||
ok=data["ok"],
|
||||
params=params,
|
||||
result=data.get("result"),
|
||||
error=data.get("description"),
|
||||
)
|
||||
|
||||
|
||||
class TelegramIntegration(BaseIntegration):
|
||||
|
@ -36,19 +42,12 @@ class TelegramIntegration(BaseIntegration):
|
|||
"unknown": "document",
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
token: str,
|
||||
chat_id: Union[str, int],
|
||||
show_post_link: bool = True,
|
||||
show_boost_from: bool = True,
|
||||
silent: bool = True,
|
||||
):
|
||||
self.token = token
|
||||
self.chat_id = chat_id
|
||||
self.show_post_link = show_post_link
|
||||
self.show_boost_from = show_boost_from
|
||||
self.silent = silent
|
||||
def __init__(self, sect: SectionProxy):
|
||||
self.token = sect.get("token", "")
|
||||
self.chat_id = sect.get("chat", "")
|
||||
self.show_post_link = sect.getboolean("show_post_link", True)
|
||||
self.show_boost_from = sect.getboolean("show_boost_from", True)
|
||||
self.silent = sect.getboolean("silent", True)
|
||||
|
||||
async def _tg_request(self, method: str, **kwargs) -> TGResponse:
|
||||
url = self.API_URL.format(self.token, method)
|
||||
|
@ -82,7 +81,9 @@ class TelegramIntegration(BaseIntegration):
|
|||
**{self.MEDIA_MAPPING[media.type]: media.url},
|
||||
)
|
||||
|
||||
async def _post_mediagroup(self, text: str, media: List[Attachment]) -> TGResponse:
|
||||
async def _post_mediagroup(
|
||||
self, text: str, media: List[Attachment]
|
||||
) -> TGResponse:
|
||||
media_list: List[dict] = []
|
||||
allowed_medias = {"image", "gifv", "video", "audio", "unknown"}
|
||||
for attachment in media:
|
||||
|
@ -136,7 +137,9 @@ class TelegramIntegration(BaseIntegration):
|
|||
str.join("", map(cls.node_to_text, el.children)),
|
||||
)
|
||||
elif el.name == "p":
|
||||
return str.join("", map(cls.node_to_text, el.children)) + "\n\n"
|
||||
return (
|
||||
str.join("", map(cls.node_to_text, el.children)) + "\n\n"
|
||||
)
|
||||
elif el.name == "br":
|
||||
return "\n"
|
||||
return str.join("", map(cls.node_to_text, el.children))
|
||||
|
@ -144,7 +147,9 @@ class TelegramIntegration(BaseIntegration):
|
|||
|
||||
async def post(self, status: Status) -> Optional[str]:
|
||||
source = status.reblog or status
|
||||
text = self.node_to_text(BeautifulSoup(source.content, features="lxml"))
|
||||
text = self.node_to_text(
|
||||
BeautifulSoup(source.content, features="lxml")
|
||||
)
|
||||
text = text.rstrip()
|
||||
|
||||
if source.spoiler_text:
|
||||
|
@ -173,12 +178,16 @@ class TelegramIntegration(BaseIntegration):
|
|||
|
||||
elif len(source.media_attachments) == 1:
|
||||
if (
|
||||
res := await self._post_media(text, source.media_attachments[0])
|
||||
res := await self._post_media(
|
||||
text, source.media_attachments[0]
|
||||
)
|
||||
).ok and res.result is not None:
|
||||
ids.append(res.result["message_id"])
|
||||
else:
|
||||
if (
|
||||
res := await self._post_mediagroup(text, source.media_attachments)
|
||||
res := await self._post_mediagroup(
|
||||
text, source.media_attachments
|
||||
)
|
||||
).ok and res.result is not None:
|
||||
ids.append(res.result["message_id"])
|
||||
|
||||
|
@ -203,5 +212,5 @@ class TelegramIntegration(BaseIntegration):
|
|||
chat=self.chat_id,
|
||||
show_post_link=self.show_post_link,
|
||||
show_boost_from=self.show_boost_from,
|
||||
silent=self.silent
|
||||
silent=self.silent,
|
||||
)
|
||||
|
|
|
@ -15,7 +15,7 @@ async def websocket_source(
|
|||
while True:
|
||||
try:
|
||||
async with connect(url) as ws:
|
||||
while (msg := await ws.recv()) != None:
|
||||
while (msg := await ws.recv()) is not None:
|
||||
event = loads(msg)
|
||||
if "error" in event:
|
||||
raise Exception(event["error"])
|
||||
|
|
|
@ -1,6 +1,25 @@
|
|||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Optional, List, Literal
|
||||
from typing import Any, Callable, Optional, List, Literal, TypeVar
|
||||
|
||||
|
||||
def _date(val: str) -> datetime:
|
||||
return datetime.fromisoformat(val.rstrip("Z"))
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def _fnil(fn: Callable[[Any], T], val: Optional[Any]) -> Optional[T]:
|
||||
return None if val is None else fn(val)
|
||||
|
||||
|
||||
def _date_or_none(val: Optional[str]) -> Optional[datetime]:
|
||||
return _fnil(_date, val)
|
||||
|
||||
|
||||
def _int_or_none(val: Optional[str]) -> Optional[int]:
|
||||
return _fnil(int, val)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -14,11 +33,7 @@ class Field:
|
|||
return cls(
|
||||
name=data["name"],
|
||||
value=data["value"],
|
||||
verified_at=(
|
||||
datetime.fromisoformat(data["verified_at"].rstrip("Z"))
|
||||
if data.get("verified_at") is not None
|
||||
else None
|
||||
),
|
||||
verified_at=_date_or_none(data.get("verified_at")),
|
||||
)
|
||||
|
||||
|
||||
|
@ -75,16 +90,12 @@ class Account:
|
|||
locked=data["locked"],
|
||||
emojis=list(map(Emoji.from_dict, data["emojis"])),
|
||||
discoverable=data["discoverable"],
|
||||
created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")),
|
||||
last_status_at=datetime.fromisoformat(data["last_status_at"].rstrip("Z")),
|
||||
created_at=_date(data["created_at"]),
|
||||
last_status_at=_date(data["last_status_at"]),
|
||||
statuses_count=data["statuses_count"],
|
||||
followers_count=data["followers_count"],
|
||||
following_count=data["following_count"],
|
||||
moved=(
|
||||
Account.from_dict(data["moved"])
|
||||
if data.get("moved") is not None
|
||||
else None
|
||||
),
|
||||
moved=_fnil(Account.from_dict, data.get("moved")),
|
||||
fields=list(map(Field.from_dict, data.get("fields", []))),
|
||||
bot=bool(data.get("bot")),
|
||||
)
|
||||
|
@ -228,19 +239,11 @@ class Poll:
|
|||
def from_dict(cls, data: dict) -> "Poll":
|
||||
return cls(
|
||||
id=data["id"],
|
||||
expires_at=(
|
||||
datetime.fromisoformat(data["expires_at"].rstrip("Z"))
|
||||
if data.get("expires_at") is not None
|
||||
else None
|
||||
),
|
||||
expires_at=_date_or_none(data.get("expires_at")),
|
||||
expired=data["expired"],
|
||||
multiple=data["multiple"],
|
||||
votes_count=data["votes_count"],
|
||||
voters_count=(
|
||||
int(data["voters_count"])
|
||||
if data.get("voters_count") is not None
|
||||
else None
|
||||
),
|
||||
voters_count=_int_or_none(data.get("voters_count")),
|
||||
options=[cls.PollOption(**opt) for opt in data["options"]],
|
||||
)
|
||||
|
||||
|
@ -274,7 +277,7 @@ class Status:
|
|||
return cls(
|
||||
id=data["id"],
|
||||
uri=data["uri"],
|
||||
created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")),
|
||||
created_at=_date(data["created_at"]),
|
||||
account=Account.from_dict(data["account"]),
|
||||
content=data["content"],
|
||||
visibility=data["visibility"],
|
||||
|
@ -283,25 +286,15 @@ class Status:
|
|||
media_attachments=list(
|
||||
map(Attachment.from_dict, data["media_attachments"])
|
||||
),
|
||||
application=(
|
||||
Application.from_dict(data["application"])
|
||||
if data.get("application") is not None
|
||||
else None
|
||||
),
|
||||
application=_fnil(Application.from_dict, data.get("application")),
|
||||
reblogs_count=data["reblogs_count"],
|
||||
favourites_count=data["favourites_count"],
|
||||
replies_count=data["replies_count"],
|
||||
url=data.get("url"),
|
||||
in_reply_to_id=data.get("in_reply_to_id"),
|
||||
in_reply_to_account_id=data.get("in_reply_to_account_id"),
|
||||
reblog=(
|
||||
Status.from_dict(data["reblog"])
|
||||
if data.get("reblog") is not None
|
||||
else None
|
||||
),
|
||||
poll=(
|
||||
Poll.from_dict(data["poll"]) if data.get("poll") is not None else None
|
||||
),
|
||||
reblog=_fnil(Status.from_dict, data.get("reblog")),
|
||||
poll=_fnil(Poll.from_dict, data.get("poll")),
|
||||
card=data.get("card"),
|
||||
language=data.get("language"),
|
||||
text=data.get("text"),
|
||||
|
|
Loading…
Reference in New Issue