1
0
Fork 0

Added Discord integration

Check `config.ini` for more details.

Also now `private` posts are not forwarded. That may be changed with
filters (coming sometime soon).
This commit is contained in:
Casey 2022-08-26 02:03:06 +03:00
parent 35903c6cb4
commit 60f18c8d22
Signed by: hkc
GPG Key ID: F0F6CFE11CDB0960
6 changed files with 296 additions and 11 deletions

View File

@ -49,7 +49,9 @@ show-post-link = yes
; Should we show link to original author before post content? ; Should we show link to original author before post content?
show-boost-from = yes show-boost-from = yes
# TODO: add discord functionality ; Discord integration
[module/discord] [module/discord]
type = discord type = discord
; Webhook URL with the `?wait=true`
webhook = url webhook = url

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from asyncio import run from asyncio import run
from configparser import ConfigParser from configparser import ConfigParser
from mastoposter.integrations.discord import DiscordIntegration
from mastoposter.integrations.telegram import TelegramIntegration from mastoposter.integrations.telegram import TelegramIntegration
from mastoposter.sources import websocket_source from mastoposter.sources import websocket_source
from typing import AsyncGenerator, Callable, List from typing import Any, AsyncGenerator, Callable, Dict, List
from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Status from mastoposter.types import Status
@ -19,14 +20,18 @@ async def listen(
async for status in source(**kwargs): async for status in source(**kwargs):
if status.account.id != user: if status.account.id != user:
continue continue
print(status)
if status.visibility == "direct": # TODO: add option/filter to handle that
if status.visibility in ("direct", "private"):
continue continue
# TODO: find a better way to handle threads
if ( if (
status.in_reply_to_account_id is not None status.in_reply_to_account_id is not None
and status.in_reply_to_account_id != user and status.in_reply_to_account_id != user
): ):
continue continue
for drain in drains: for drain in drains:
await drain.post(status) await drain.post(status)
@ -35,18 +40,31 @@ def main(config_path: str):
conf = ConfigParser() conf = ConfigParser()
conf.read(config_path) conf.read(config_path)
modules = [] for section in conf.sections():
_remove = set()
for k, v in conf[section].items():
normalized_key = k.replace(" ", "_").replace("-", "_")
if k == normalized_key:
continue
conf[section][normalized_key] = v
_remove.add(k)
for k in _remove:
del conf[section][k]
modules: List[BaseIntegration] = []
for module_name in conf.get("main", "modules").split(): for module_name in conf.get("main", "modules").split():
module = conf[f"module/{module_name}"] module = conf[f"module/{module_name}"]
if module["type"] == "telegram": if module["type"] == "telegram":
modules.append( modules.append(
TelegramIntegration( TelegramIntegration(
token=module.get("token"), token=module["token"],
chat_id=module.get("chat"), chat_id=module["chat"],
show_post_link=module.getboolean("show-post-link", fallback=True), show_post_link=module.getboolean("show_post_link", fallback=True),
show_boost_from=module.getboolean("show-boost-from", fallback=True), show_boost_from=module.getboolean("show_boost_from", fallback=True),
) )
) )
elif module["type"] == "discord":
modules.append(DiscordIntegration(webhook=module["webhook"]))
else: else:
raise ValueError("Invalid module type %r" % module["type"]) raise ValueError("Invalid module type %r" % module["type"])

View File

@ -0,0 +1,116 @@
from json import dumps
from typing import Dict, List, Optional
from bs4 import BeautifulSoup, PageElement, Tag
from httpx import AsyncClient
from zlib import crc32
from mastoposter.integrations.base import BaseIntegration
from mastoposter.integrations.discord.types import (
DiscordEmbed,
DiscordEmbedAuthor,
DiscordEmbedField,
DiscordEmbedImage,
)
from mastoposter.types import Status
class DiscordIntegration(BaseIntegration):
def __init__(self, webhook: str):
self.webhook = webhook
@staticmethod
def md_escape(text: str) -> str:
return (
text.replace("\\", "\\\\")
.replace("*", "\\*")
.replace("[", "\\[")
.replace("]", "\\]")
.replace("_", "\\_")
.replace("~", "\\~")
.replace("|", "\\|")
.replace("`", "\\`")
)
@classmethod
def node_to_text(cls, el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return "[%s](%s)" % (
cls.md_escape(str.join("", map(cls.node_to_text, el.children))),
el.attrs["href"],
)
elif el.name == "p":
return str.join("", map(cls.node_to_text, el.children)) + "\n\n"
elif el.name == "br":
return "\n"
return str.join("", map(cls.node_to_text, el.children))
return cls.md_escape(str(el))
async def execute_webhook(
self,
content: Optional[str] = None,
username: Optional[str] = None,
avatar_url: Optional[str] = None,
embeds: Optional[List[DiscordEmbed]] = None,
) -> dict:
async with AsyncClient() as c:
json = {
"content": content,
"username": username,
"avatar_url": avatar_url,
"embeds": [embed.asdict() for embed in embeds]
if embeds is not None
else [],
}
return (
await c.post(
self.webhook,
json=json,
)
).json()
async def post(self, status: Status) -> str:
source = status.reblog or status
embeds: List[DiscordEmbed] = []
text = self.node_to_text(BeautifulSoup(source.content, features="lxml"))
if source.spoiler_text:
text = f"CW: {source.spoiler_text}\n||{text}||"
if status.reblog is not None:
title = f"{status.account.acct} boosted from {source.account.acct}"
else:
title = f"{status.account.acct} posted"
embeds.append(
DiscordEmbed(
title=title,
description=text,
url=status.link,
timestamp=source.created_at,
author=DiscordEmbedAuthor(
name=source.account.display_name,
url=source.account.url,
icon_url=source.account.avatar_static,
),
color=crc32(source.account.id.encode("utf-8")) & 0xFFFFFF,
)
)
for attachment in source.media_attachments:
if attachment.type == "image":
embeds.append(
DiscordEmbed(
url=status.link,
image=DiscordEmbedImage(
url=attachment.url,
),
)
)
await self.execute_webhook(
username=status.account.acct,
avatar_url=status.account.avatar_static,
embeds=embeds,
)
return ""

View File

@ -0,0 +1,78 @@
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
def _f(func: Callable, v: Optional[Any], *a, **kw) -> Any:
return func(v, *a, **kw) if v is not None else None
__all__ = (
"DiscordEmbed",
"DiscordEmbedFooter",
"DiscordEmbedImage",
"DiscordEmbedThumbnail",
"DiscordEmbedAuthor",
"DiscordEmbedField",
)
@dataclass
class DiscordEmbedFooter:
text: str
icon_url: Optional[str]
@dataclass
class DiscordEmbedImage:
url: str
width: int = 0
height: int = 0
@dataclass
class DiscordEmbedThumbnail:
url: str
@dataclass
class DiscordEmbedAuthor:
name: str
url: Optional[str] = None
icon_url: Optional[str] = None
@dataclass
class DiscordEmbedField:
name: str
value: str
inline: Optional[bool]
@dataclass
class DiscordEmbed:
title: Optional[str] = None
description: Optional[str] = None
url: Optional[str] = None
timestamp: Optional[datetime] = None
color: Optional[int] = None
footer: Optional[DiscordEmbedFooter] = None
image: Optional[DiscordEmbedImage] = None
thumbnail: Optional[DiscordEmbedThumbnail] = None
author: Optional[DiscordEmbedAuthor] = None
fields: Optional[List[DiscordEmbedField]] = None
def asdict(self) -> Dict[str, Any]:
return {
"type": "rich",
"title": self.title,
"description": self.description,
"url": self.url,
"timestamp": _f(datetime.isoformat, self.timestamp, "T", "seconds"),
"color": self.color,
"footer": _f(asdict, self.footer),
"image": _f(asdict, self.image),
"thumbnail": _f(asdict, self.thumbnail),
"author": _f(asdict, self.author),
"fields": _f(lambda v: list(map(asdict, v)), self.fields),
}

View File

@ -16,7 +16,7 @@ class TGResponse:
@classmethod @classmethod
def from_dict(cls, data: dict, params: dict) -> "TGResponse": def from_dict(cls, data: dict, params: dict) -> "TGResponse":
return cls(data["ok"], params, data.get("result"), data.get("error")) return cls(data["ok"], params, data.get("result"), data.get("description"))
class TelegramIntegration(BaseIntegration): class TelegramIntegration(BaseIntegration):
@ -129,6 +129,7 @@ class TelegramIntegration(BaseIntegration):
async def post(self, status: Status) -> str: async def post(self, status: Status) -> str:
source = status.reblog or status source = status.reblog or status
text = self.node_to_text(BeautifulSoup(source.content, features="lxml")) text = self.node_to_text(BeautifulSoup(source.content, features="lxml"))
text = text.rstrip()
if source.spoiler_text: if source.spoiler_text:
text = "Spoiler: {cw}\n<tg-spoiler>{text}</tg-spoiler>".format( text = "Spoiler: {cw}\n<tg-spoiler>{text}</tg-spoiler>".format(
@ -154,8 +155,11 @@ class TelegramIntegration(BaseIntegration):
msg = await self._post_mediagroup(text, source.media_attachments) msg = await self._post_mediagroup(text, source.media_attachments)
if not msg.ok: if not msg.ok:
raise Exception(msg.error, msg.params) # raise Exception(msg.error, msg.params)
return "" # XXX: silently ignore for now
if msg.result:
return msg.result.get("message_id", "")
return "" return ""
def __repr__(self) -> str: def __repr__(self) -> str:

View File

@ -90,6 +90,73 @@ class Account:
) )
@dataclass
class AttachmentMetaImage:
@dataclass
class Vec2F:
x: float
y: float
@dataclass
class AttachmentMetaImageDimensions:
width: int
height: int
size: str
aspect: float
original: AttachmentMetaImageDimensions
small: AttachmentMetaImageDimensions
focus: Vec2F
@classmethod
def from_dict(cls, data: dict) -> "AttachmentMetaImage":
return cls(
**data,
original=cls.AttachmentMetaImageDimensions(**data["original"]),
small=cls.AttachmentMetaImageDimensions(**data["small"]),
focus=cls.Vec2F(**data["focus"])
)
@dataclass
class AttachmentMetaVideo:
@dataclass
class AttachmentMetaVideoOriginal:
width: int
height: int
duration: float
bitrate: int
frame_rate: Optional[str] # XXX Gargron wtf?
@dataclass
class AttachmentMetaVideoSmall:
width: int
height: int
size: str
aspect: float
length: str
duration: float
fps: int
size: str
width: int
height: int
aspect: float
audio_encode: str
audio_bitrate: str # XXX GARGROOOOONNNNNN!!!!!!!
audio_channels: str # XXX I HATE YOU
original: AttachmentMetaVideoOriginal
small: AttachmentMetaVideoSmall
@classmethod
def from_dict(cls, data: dict) -> "AttachmentMetaVideo":
return cls(
**data,
original=cls.AttachmentMetaVideoOriginal(**data["original"]),
small=cls.AttachmentMetaVideoSmall(**data["small"])
)
@dataclass @dataclass
class Attachment: class Attachment:
id: str id: str