1
0
Fork 0

Merge pull request #16 from hatkidchan/formatting

Implemented custom formatting
This commit is contained in:
Casey 2022-08-31 18:43:05 +03:00 committed by GitHub
commit fd661b6a99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 148 additions and 116 deletions

View File

@ -46,12 +46,6 @@ token = 12345:blahblah
# username, if it is public # username, if it is public
chat = @username chat = @username
# Should we show link to post as a link after post content?
show-post-link = yes
# Should we show link to original author before post content?
show-boost-from = yes
# Should we make posts silent? # Should we make posts silent?
# https://core.telegram.org/bots/api#sendmessage `disable_notification` # https://core.telegram.org/bots/api#sendmessage `disable_notification`
silent = true silent = true
@ -63,6 +57,15 @@ type = discord
# Webhook URL with the `?wait=true` # Webhook URL with the `?wait=true`
webhook = url webhook = url
# Jinja2 template string for the post. Only the Telegram integration uses it.
# The value below is the default template; omitting this property entirely
# should produce the same result (probably).
# Note the 4 spaces on the otherwise empty line; I think they are required.
template = {% if status.reblog %}Boost from <a href="{{status.reblog.account.url}}">{{status.reblog.account.name}}</a>
{% endif %}{% if status.reblog_or_status.spoiler_text %}{{status.reblog_or_status.spoiler_text}}
<tg-spoiler>{% endif %}{{ status.reblog_or_status.content_flathtml }}{% if status.reblog_or_status.spoiler_text %}</tg-spoiler>{% endif %}
<a href="{{status.link}}">Link to post</a>
;# Boost filter. Only boosts will be matched by that one ;# Boost filter. Only boosts will be matched by that one
;[filter/boost] ;[filter/boost]

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from asyncio import run from asyncio import run
from configparser import ConfigParser from configparser import ConfigParser, ExtendedInterpolation
from mastoposter import execute_integrations, load_integrations_from from mastoposter import execute_integrations, load_integrations_from
from mastoposter.integrations import FilteredIntegration from mastoposter.integrations import FilteredIntegration
from mastoposter.sources import websocket_source from mastoposter.sources import websocket_source
@ -34,7 +34,7 @@ async def listen(
def main(config_path: str): def main(config_path: str):
conf = ConfigParser() conf = ConfigParser(interpolation=ExtendedInterpolation())
conf.read(config_path) conf.read(config_path)
for section in conf.sections(): for section in conf.sections():

View File

@ -1,6 +1,5 @@
from configparser import SectionProxy from configparser import SectionProxy
from typing import List, Optional from typing import List, Optional
from bs4 import BeautifulSoup, PageElement, Tag
from httpx import AsyncClient from httpx import AsyncClient
from zlib import crc32 from zlib import crc32
from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.base import BaseIntegration
@ -16,38 +15,6 @@ class DiscordIntegration(BaseIntegration):
def __init__(self, section: SectionProxy): def __init__(self, section: SectionProxy):
self.webhook = section.get("webhook", "") self.webhook = section.get("webhook", "")
@staticmethod
def md_escape(text: str) -> str:
return (
text.replace("\\", "\\\\")
.replace("*", "\\*")
.replace("[", "\\[")
.replace("]", "\\]")
.replace("_", "\\_")
.replace("~", "\\~")
.replace("|", "\\|")
.replace("`", "\\`")
)
@classmethod
def node_to_text(cls, el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return "[%s](%s)" % (
cls.md_escape(
str.join("", map(cls.node_to_text, el.children))
),
el.attrs["href"],
)
elif el.name == "p":
return (
str.join("", map(cls.node_to_text, el.children)) + "\n\n"
)
elif el.name == "br":
return "\n"
return str.join("", map(cls.node_to_text, el.children))
return cls.md_escape(str(el))
async def execute_webhook( async def execute_webhook(
self, self,
content: Optional[str] = None, content: Optional[str] = None,
@ -75,9 +42,7 @@ class DiscordIntegration(BaseIntegration):
source = status.reblog or status source = status.reblog or status
embeds: List[DiscordEmbed] = [] embeds: List[DiscordEmbed] = []
text = self.node_to_text( text = source.content_markdown
BeautifulSoup(source.content, features="lxml")
)
if source.spoiler_text: if source.spoiler_text:
text = f"{source.spoiler_text}\n||{text}||" text = f"{source.spoiler_text}\n||{text}||"

View File

@ -1,11 +1,11 @@
from configparser import SectionProxy from configparser import SectionProxy
from dataclasses import dataclass from dataclasses import dataclass
from html import escape
from typing import Any, List, Mapping, Optional from typing import Any, List, Mapping, Optional
from bs4 import BeautifulSoup, Tag, PageElement
from httpx import AsyncClient from httpx import AsyncClient
from jinja2 import Template
from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Attachment, Poll, Status from mastoposter.types import Attachment, Poll, Status
from emoji import emojize
@dataclass @dataclass
@ -25,7 +25,6 @@ class TGResponse:
) )
class TelegramIntegration(BaseIntegration):
API_URL: str = "https://api.telegram.org/bot{}/{}" API_URL: str = "https://api.telegram.org/bot{}/{}"
MEDIA_COMPATIBILITY: Mapping[str, set] = { MEDIA_COMPATIBILITY: Mapping[str, set] = {
"image": {"image", "video"}, "image": {"image", "video"},
@ -41,16 +40,30 @@ class TelegramIntegration(BaseIntegration):
"audio": "audio", "audio": "audio",
"unknown": "document", "unknown": "document",
} }
# Default Jinja2 template for a Telegram post; used when the config section
# has no `template` option. The rendered text is sent with parse_mode="HTML".
#
# A trailing backslash joins a line with the next one. The "Boost from" line
# deliberately ends WITHOUT one so that the header sits on its own line
# instead of being glued to the post body.
#
# Values that come straight from the remote server (account name, spoiler
# text, URLs) go through Jinja2's `e` filter so that characters such as `<`
# or `&` cannot break the HTML markup. `content_flathtml` is already escaped
# HTML and must not be escaped a second time.
DEFAULT_TEMPLATE: str = """\
{% if status.reblog %}\
Boost from <a href="{{status.reblog.account.url|e}}">\
{{status.reblog.account.name|e}}</a>
{% endif %}\
{% if status.reblog_or_status.spoiler_text %}\
{{status.reblog_or_status.spoiler_text|e}}
<tg-spoiler>{% endif %}{{ status.reblog_or_status.content_flathtml }}\
{% if status.reblog_or_status.spoiler_text %}</tg-spoiler>{% endif %}
<a href="{{status.link|e}}">Link to post</a>"""
class TelegramIntegration(BaseIntegration):
def __init__(self, sect: SectionProxy): def __init__(self, sect: SectionProxy):
self.token = sect.get("token", "") self.token = sect.get("token", "")
self.chat_id = sect.get("chat", "") self.chat_id = sect.get("chat", "")
self.show_post_link = sect.getboolean("show_post_link", True)
self.show_boost_from = sect.getboolean("show_boost_from", True)
self.silent = sect.getboolean("silent", True) self.silent = sect.getboolean("silent", True)
self.template: Template = Template(
emojize(sect.get("template", DEFAULT_TEMPLATE))
)
async def _tg_request(self, method: str, **kwargs) -> TGResponse: async def _tg_request(self, method: str, **kwargs) -> TGResponse:
url = self.API_URL.format(self.token, method) url = API_URL.format(self.token, method)
async with AsyncClient() as client: async with AsyncClient() as client:
return TGResponse.from_dict( return TGResponse.from_dict(
(await client.post(url, json=kwargs)).json(), kwargs (await client.post(url, json=kwargs)).json(), kwargs
@ -68,17 +81,17 @@ class TelegramIntegration(BaseIntegration):
async def _post_media(self, text: str, media: Attachment) -> TGResponse: async def _post_media(self, text: str, media: Attachment) -> TGResponse:
# Just to be safe # Just to be safe
if media.type not in self.MEDIA_MAPPING: if media.type not in MEDIA_MAPPING:
return await self._post_plaintext(text) return await self._post_plaintext(text)
return await self._tg_request( return await self._tg_request(
"send%s" % self.MEDIA_MAPPING[media.type].title(), "send%s" % MEDIA_MAPPING[media.type].title(),
parse_mode="HTML", parse_mode="HTML",
disable_notification=self.silent, disable_notification=self.silent,
disable_web_page_preview=True, disable_web_page_preview=True,
chat_id=self.chat_id, chat_id=self.chat_id,
caption=text, caption=text,
**{self.MEDIA_MAPPING[media.type]: media.url}, **{MEDIA_MAPPING[media.type]: media.url},
) )
async def _post_mediagroup( async def _post_mediagroup(
@ -89,12 +102,12 @@ class TelegramIntegration(BaseIntegration):
for attachment in media: for attachment in media:
if attachment.type not in allowed_medias: if attachment.type not in allowed_medias:
continue continue
if attachment.type not in self.MEDIA_COMPATIBILITY: if attachment.type not in MEDIA_COMPATIBILITY:
continue continue
allowed_medias &= self.MEDIA_COMPATIBILITY[attachment.type] allowed_medias &= MEDIA_COMPATIBILITY[attachment.type]
media_list.append( media_list.append(
{ {
"type": self.MEDIA_MAPPING[attachment.type], "type": MEDIA_MAPPING[attachment.type],
"media": attachment.url, "media": attachment.url,
} }
) )
@ -128,46 +141,10 @@ class TelegramIntegration(BaseIntegration):
options=[opt.title for opt in poll.options], options=[opt.title for opt in poll.options],
) )
@classmethod
def node_to_text(cls, el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return '<a href="{}">{}</a>'.format(
escape(el.attrs["href"]),
str.join("", map(cls.node_to_text, el.children)),
)
elif el.name == "p":
return (
str.join("", map(cls.node_to_text, el.children)) + "\n\n"
)
elif el.name == "br":
return "\n"
return str.join("", map(cls.node_to_text, el.children))
return escape(str(el))
async def __call__(self, status: Status) -> Optional[str]: async def __call__(self, status: Status) -> Optional[str]:
source = status.reblog or status source = status.reblog or status
text = self.node_to_text(
BeautifulSoup(source.content, features="lxml")
)
text = text.rstrip()
if source.spoiler_text: text = self.template.render({"status": status})
text = "Spoiler: {cw}\n<tg-spoiler>{text}</tg-spoiler>".format(
cw=source.spoiler_text, text=text
)
if self.show_post_link:
text += '\n\n<a href="%s">Link to post</a>' % status.link
if status.reblog and self.show_boost_from:
text = (
'Boosted post from <a href="{}">{}</a>\n'.format(
source.account.url,
source.account.display_name or source.account.username,
)
+ text
)
ids = [] ids = []
@ -205,12 +182,6 @@ class TelegramIntegration(BaseIntegration):
return ( return (
"<TelegramIntegration " "<TelegramIntegration "
"chat_id={chat!r} " "chat_id={chat!r} "
"show_post_link={show_post_link!r} " "template={template!r} "
"show_boost_from={show_boost_from!r} "
"silent={silent!r}>" "silent={silent!r}>"
).format( ).format(chat=self.chat_id, silent=self.silent, template=self.template)
chat=self.chat_id,
show_post_link=self.show_post_link,
show_boost_from=self.show_boost_from,
silent=self.silent,
)

View File

@ -2,6 +2,10 @@ from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import Any, Callable, Optional, List, Literal, TypeVar from typing import Any, Callable, Optional, List, Literal, TypeVar
from bs4 import BeautifulSoup
from mastoposter.utils import node_to_html, node_to_markdown, node_to_plaintext
def _date(val: str) -> datetime: def _date(val: str) -> datetime:
return datetime.fromisoformat(val.rstrip("Z")) return datetime.fromisoformat(val.rstrip("Z"))
@ -100,6 +104,10 @@ class Account:
bot=bool(data.get("bot")), bot=bool(data.get("bot")),
) )
@property
def name(self) -> str:
    """Name to show for this account.

    Returns ``display_name`` when it is truthy, otherwise ``username``.
    """
    if self.display_name:
        return self.display_name
    return self.username
@dataclass @dataclass
class AttachmentMetaImage: class AttachmentMetaImage:
@ -304,6 +312,28 @@ class Status:
tags=[Tag.from_dict(m) for m in data.get("tags", [])], tags=[Tag.from_dict(m) for m in data.get("tags", [])],
) )
@property
def reblog_or_status(self) -> "Status":
    """Return the boosted status when ``reblog`` is set, else this status."""
    boosted = self.reblog
    return boosted if boosted else self
@property @property
def link(self) -> str: def link(self) -> str:
return self.account.url + "/" + str(self.id) return self.account.url + "/" + str(self.id)
@property
def content_flathtml(self) -> str:
    """``content`` parsed with lxml and rendered through ``node_to_html``.

    Trailing whitespace is stripped from the result.
    """
    soup = BeautifulSoup(self.content, features="lxml")
    rendered = node_to_html(soup)
    return rendered.rstrip()
@property
def content_markdown(self) -> str:
    """``content`` parsed with lxml and rendered through ``node_to_markdown``.

    Trailing whitespace is stripped from the result.
    """
    soup = BeautifulSoup(self.content, features="lxml")
    rendered = node_to_markdown(soup)
    return rendered.rstrip()
@property
def content_plaintext(self) -> str:
    """``content`` parsed with lxml and rendered through ``node_to_plaintext``.

    Trailing whitespace is stripped from the result.
    """
    soup = BeautifulSoup(self.content, features="lxml")
    rendered = node_to_plaintext(soup)
    return rendered.rstrip()

60
mastoposter/utils.py Normal file
View File

@ -0,0 +1,60 @@
from html import escape
from bs4.element import Tag, PageElement
def md_escape(text: str) -> str:
    """Backslash-escape Markdown control characters in ``text``.

    The escaped characters are the backslash itself, ``*``, ``[``, ``]``,
    ``_``, ``~``, ``|`` and the backtick. Every other character is returned
    unchanged.
    """
    specials = "\\*[]_~|`"
    # One translation pass: each special character maps to itself prefixed
    # with a backslash, so freshly added backslashes are never re-escaped.
    table = str.maketrans({ch: "\\" + ch for ch in specials})
    return text.translate(table)
def node_to_html(el: PageElement) -> str:
    """Flatten a parsed HTML node into text that keeps only ``<a>`` links.

    Rules applied recursively:

    * non-tag nodes are HTML-escaped;
    * ``<a>`` becomes ``<a href="...">inner</a>`` with an escaped ``href``;
      an anchor without an ``href`` is reduced to its inner text instead of
      raising ``KeyError``;
    * ``<p>`` becomes its inner text followed by a blank line;
    * ``<br>`` becomes a newline;
    * any other tag is replaced by its flattened children.

    :param el: node to convert (a ``Tag`` or any other ``PageElement``).
    :return: the flattened markup as a string.
    """
    if not isinstance(el, Tag):
        # Text-like node: escape it so it cannot be read as markup.
        return escape(str(el))
    if el.name == "br":
        return "\n"

    inner = "".join(map(node_to_html, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # No link target, so emit the text only.
            return inner
        return '<a href="{}">{}</a>'.format(escape(href), inner)
    if el.name == "p":
        return inner + "\n\n"
    return inner
def node_to_markdown(el: PageElement) -> str:
    """Flatten a parsed HTML node into Markdown text.

    Rules applied recursively:

    * non-tag nodes are escaped with ``md_escape``;
    * ``<a>`` becomes ``[inner](href)``; an anchor without an ``href`` is
      reduced to its inner text instead of raising ``KeyError``;
    * ``<p>`` becomes its inner text followed by a blank line;
    * ``<br>`` becomes a newline;
    * any other tag is replaced by its flattened children.

    :param el: node to convert (a ``Tag`` or any other ``PageElement``).
    :return: the Markdown representation as a string.
    """
    if not isinstance(el, Tag):
        return md_escape(str(el))
    if el.name == "br":
        return "\n"

    # Text nodes were already escaped by the recursive calls. Escaping the
    # joined link text again would double every backslash ("foo_bar" would
    # come out as "foo\\\_bar"), so `inner` is used as-is below.
    inner = "".join(map(node_to_markdown, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # No link target, so emit the text only.
            return inner
        return "[%s](%s)" % (inner, href)
    if el.name == "p":
        return inner + "\n\n"
    return inner
def node_to_plaintext(el: PageElement) -> str:
    """Flatten a parsed HTML node into plain text.

    Rules applied recursively:

    * non-tag nodes are returned as their string value, unescaped;
    * ``<a>`` becomes ``inner (href)``; an anchor without an ``href`` is
      reduced to its inner text instead of raising ``KeyError``;
    * ``<p>`` becomes its inner text followed by a blank line;
    * ``<br>`` becomes a newline;
    * any other tag is replaced by its flattened children.

    :param el: node to convert (a ``Tag`` or any other ``PageElement``).
    :return: the plain-text representation as a string.
    """
    if not isinstance(el, Tag):
        return str(el)
    if el.name == "br":
        return "\n"

    inner = "".join(map(node_to_plaintext, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # No link target, so emit the text only.
            return inner
        return "%s (%s)" % (inner, href)
    if el.name == "p":
        return inner + "\n\n"
    return inner

View File

@ -2,11 +2,14 @@ anyio==3.6.1
beautifulsoup4==4.11.1 beautifulsoup4==4.11.1
bs4==0.0.1 bs4==0.0.1
certifi==2022.6.15 certifi==2022.6.15
emoji==2.0.0
h11==0.12.0 h11==0.12.0
httpcore==0.15.0 httpcore==0.15.0
httpx==0.23.0 httpx==0.23.0
idna==3.3 idna==3.3
Jinja2==3.1.2
lxml==4.9.1 lxml==4.9.1
MarkupSafe==2.1.1
rfc3986==1.5.0 rfc3986==1.5.0
sniffio==1.2.0 sniffio==1.2.0
soupsieve==2.3.2.post1 soupsieve==2.3.2.post1