diff --git a/mastoposter/integrations/discord/__init__.py b/mastoposter/integrations/discord/__init__.py
index 03b990a..38b96bb 100644
--- a/mastoposter/integrations/discord/__init__.py
+++ b/mastoposter/integrations/discord/__init__.py
@@ -1,6 +1,5 @@
from configparser import SectionProxy
from typing import List, Optional
-from bs4 import BeautifulSoup, PageElement, Tag
from httpx import AsyncClient
from zlib import crc32
from mastoposter.integrations.base import BaseIntegration
@@ -16,38 +15,6 @@ class DiscordIntegration(BaseIntegration):
def __init__(self, section: SectionProxy):
self.webhook = section.get("webhook", "")
- @staticmethod
- def md_escape(text: str) -> str:
- return (
- text.replace("\\", "\\\\")
- .replace("*", "\\*")
- .replace("[", "\\[")
- .replace("]", "\\]")
- .replace("_", "\\_")
- .replace("~", "\\~")
- .replace("|", "\\|")
- .replace("`", "\\`")
- )
-
- @classmethod
- def node_to_text(cls, el: PageElement) -> str:
- if isinstance(el, Tag):
- if el.name == "a":
- return "[%s](%s)" % (
- cls.md_escape(
- str.join("", map(cls.node_to_text, el.children))
- ),
- el.attrs["href"],
- )
- elif el.name == "p":
- return (
- str.join("", map(cls.node_to_text, el.children)) + "\n\n"
- )
- elif el.name == "br":
- return "\n"
- return str.join("", map(cls.node_to_text, el.children))
- return cls.md_escape(str(el))
-
async def execute_webhook(
self,
content: Optional[str] = None,
@@ -75,9 +42,7 @@ class DiscordIntegration(BaseIntegration):
source = status.reblog or status
embeds: List[DiscordEmbed] = []
- text = self.node_to_text(
- BeautifulSoup(source.content, features="lxml")
- )
+ text = source.content_markdown
if source.spoiler_text:
text = f"{source.spoiler_text}\n||{text}||"
diff --git a/mastoposter/integrations/telegram.py b/mastoposter/integrations/telegram.py
index cd952d4..7e7b879 100644
--- a/mastoposter/integrations/telegram.py
+++ b/mastoposter/integrations/telegram.py
@@ -1,11 +1,11 @@
from configparser import SectionProxy
from dataclasses import dataclass
-from html import escape
from typing import Any, List, Mapping, Optional
-from bs4 import BeautifulSoup, Tag, PageElement
from httpx import AsyncClient
+from jinja2 import Template
from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Attachment, Poll, Status
+from emoji import emojize
@dataclass
@@ -25,32 +25,44 @@ class TGResponse:
)
-class TelegramIntegration(BaseIntegration):
- API_URL: str = "https://api.telegram.org/bot{}/{}"
- MEDIA_COMPATIBILITY: Mapping[str, set] = {
- "image": {"image", "video"},
- "video": {"image", "video"},
- "gifv": {"gifv"},
- "audio": {"audio"},
- "unknown": {"unknown"},
- }
- MEDIA_MAPPING: Mapping[str, str] = {
- "image": "photo",
- "video": "video",
- "gifv": "animation",
- "audio": "audio",
- "unknown": "document",
- }
+API_URL: str = "https://api.telegram.org/bot{}/{}"
+MEDIA_COMPATIBILITY: Mapping[str, set] = {
+ "image": {"image", "video"},
+ "video": {"image", "video"},
+ "gifv": {"gifv"},
+ "audio": {"audio"},
+ "unknown": {"unknown"},
+}
+MEDIA_MAPPING: Mapping[str, str] = {
+ "image": "photo",
+ "video": "video",
+ "gifv": "animation",
+ "audio": "audio",
+ "unknown": "document",
+}
+DEFAULT_TEMPLATE: str = """\
+{% if status.reblog %}\
+Boost from \
+{{status.reblog.account.name}}\
+{% endif %}\
+{% if status.spoiler_text %}{{status.spoiler_text}}
+{% endif %}{{ status.content_flathtml }}\
+{% if status.spoiler_text %}{% endif %}
+Link to post"""
+
+
+class TelegramIntegration(BaseIntegration):
def __init__(self, sect: SectionProxy):
self.token = sect.get("token", "")
self.chat_id = sect.get("chat", "")
self.show_post_link = sect.getboolean("show_post_link", True)
self.show_boost_from = sect.getboolean("show_boost_from", True)
self.silent = sect.getboolean("silent", True)
+ self.template = Template(sect.get("template", DEFAULT_TEMPLATE))
async def _tg_request(self, method: str, **kwargs) -> TGResponse:
- url = self.API_URL.format(self.token, method)
+ url = API_URL.format(self.token, method)
async with AsyncClient() as client:
return TGResponse.from_dict(
(await client.post(url, json=kwargs)).json(), kwargs
@@ -68,17 +80,17 @@ class TelegramIntegration(BaseIntegration):
async def _post_media(self, text: str, media: Attachment) -> TGResponse:
# Just to be safe
- if media.type not in self.MEDIA_MAPPING:
+ if media.type not in MEDIA_MAPPING:
return await self._post_plaintext(text)
return await self._tg_request(
- "send%s" % self.MEDIA_MAPPING[media.type].title(),
+ "send%s" % MEDIA_MAPPING[media.type].title(),
parse_mode="HTML",
disable_notification=self.silent,
disable_web_page_preview=True,
chat_id=self.chat_id,
caption=text,
- **{self.MEDIA_MAPPING[media.type]: media.url},
+ **{MEDIA_MAPPING[media.type]: media.url},
)
async def _post_mediagroup(
@@ -89,12 +101,12 @@ class TelegramIntegration(BaseIntegration):
for attachment in media:
if attachment.type not in allowed_medias:
continue
- if attachment.type not in self.MEDIA_COMPATIBILITY:
+ if attachment.type not in MEDIA_COMPATIBILITY:
continue
- allowed_medias &= self.MEDIA_COMPATIBILITY[attachment.type]
+ allowed_medias &= MEDIA_COMPATIBILITY[attachment.type]
media_list.append(
{
- "type": self.MEDIA_MAPPING[attachment.type],
+ "type": MEDIA_MAPPING[attachment.type],
"media": attachment.url,
}
)
@@ -128,46 +140,10 @@ class TelegramIntegration(BaseIntegration):
options=[opt.title for opt in poll.options],
)
- @classmethod
- def node_to_text(cls, el: PageElement) -> str:
- if isinstance(el, Tag):
- if el.name == "a":
- return '{}'.format(
- escape(el.attrs["href"]),
- str.join("", map(cls.node_to_text, el.children)),
- )
- elif el.name == "p":
- return (
- str.join("", map(cls.node_to_text, el.children)) + "\n\n"
- )
- elif el.name == "br":
- return "\n"
- return str.join("", map(cls.node_to_text, el.children))
- return escape(str(el))
-
async def __call__(self, status: Status) -> Optional[str]:
source = status.reblog or status
- text = self.node_to_text(
- BeautifulSoup(source.content, features="lxml")
- )
- text = text.rstrip()
- if source.spoiler_text:
- text = "Spoiler: {cw}\n{text}".format(
- cw=source.spoiler_text, text=text
- )
-
- if self.show_post_link:
- text += '\n\nLink to post' % status.link
-
- if status.reblog and self.show_boost_from:
- text = (
- 'Boosted post from {}\n'.format(
- source.account.url,
- source.account.display_name or source.account.username,
- )
- + text
- )
+ text = emojize(self.template.render({"status": status}))
ids = []
diff --git a/mastoposter/types.py b/mastoposter/types.py
index ecffd2f..8010285 100644
--- a/mastoposter/types.py
+++ b/mastoposter/types.py
@@ -2,6 +2,10 @@ from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Optional, List, Literal, TypeVar
+from bs4 import BeautifulSoup
+
+from mastoposter.utils import node_to_html, node_to_markdown, node_to_plaintext
+
def _date(val: str) -> datetime:
return datetime.fromisoformat(val.rstrip("Z"))
@@ -100,6 +104,10 @@ class Account:
bot=bool(data.get("bot")),
)
+ @property
+ def name(self) -> str:
+ return self.display_name or self.username
+
@dataclass
class AttachmentMetaImage:
@@ -307,3 +315,21 @@ class Status:
@property
def link(self) -> str:
return self.account.url + "/" + str(self.id)
+
+ @property
+ def content_flathtml(self) -> str:
+ return node_to_html(
+ BeautifulSoup(self.content, features="lxml")
+ ).rstrip()
+
+ @property
+ def content_markdown(self) -> str:
+ return node_to_markdown(
+ BeautifulSoup(self.content, features="lxml")
+ ).rstrip()
+
+ @property
+ def content_plaintext(self) -> str:
+ return node_to_plaintext(
+ BeautifulSoup(self.content, features="lxml")
+ ).rstrip()
diff --git a/mastoposter/utils.py b/mastoposter/utils.py
new file mode 100644
index 0000000..7e234a2
--- /dev/null
+++ b/mastoposter/utils.py
@@ -0,0 +1,60 @@
+from html import escape
+from bs4.element import Tag, PageElement
+
+
+def md_escape(text: str) -> str:
+ return (
+ text.replace("\\", "\\\\")
+ .replace("*", "\\*")
+ .replace("[", "\\[")
+ .replace("]", "\\]")
+ .replace("_", "\\_")
+ .replace("~", "\\~")
+ .replace("|", "\\|")
+ .replace("`", "\\`")
+ )
+
+
+def node_to_html(el: PageElement) -> str:
+ if isinstance(el, Tag):
+ if el.name == "a":
+ return '{}'.format(
+ escape(el.attrs["href"]),
+ str.join("", map(node_to_html, el.children)),
+ )
+ elif el.name == "p":
+ return str.join("", map(node_to_html, el.children)) + "\n\n"
+ elif el.name == "br":
+ return "\n"
+ return str.join("", map(node_to_html, el.children))
+ return escape(str(el))
+
+
+def node_to_markdown(el: PageElement) -> str:
+ if isinstance(el, Tag):
+ if el.name == "a":
+ return "[%s](%s)" % (
+ md_escape(str.join("", map(node_to_markdown, el.children))),
+ el.attrs["href"],
+ )
+ elif el.name == "p":
+ return str.join("", map(node_to_markdown, el.children)) + "\n\n"
+ elif el.name == "br":
+ return "\n"
+ return str.join("", map(node_to_markdown, el.children))
+ return md_escape(str(el))
+
+
+def node_to_plaintext(el: PageElement) -> str:
+ if isinstance(el, Tag):
+ if el.name == "a":
+ return "%s (%s)" % (
+ str.join("", map(node_to_plaintext, el.children)),
+ el.attrs["href"],
+ )
+ elif el.name == "p":
+ return str.join("", map(node_to_plaintext, el.children)) + "\n\n"
+ elif el.name == "br":
+ return "\n"
+ return str.join("", map(node_to_plaintext, el.children))
+ return str(el)
diff --git a/requirements.txt b/requirements.txt
index d9d2f61..985a2f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,11 +2,14 @@ anyio==3.6.1
beautifulsoup4==4.11.1
bs4==0.0.1
certifi==2022.6.15
+emoji==2.0.0
h11==0.12.0
httpcore==0.15.0
httpx==0.23.0
idna==3.3
+Jinja2==3.1.2
lxml==4.9.1
+MarkupSafe==2.1.1
rfc3986==1.5.0
sniffio==1.2.0
soupsieve==2.3.2.post1