diff --git a/config.ini b/config.ini
index f5c74a3..35dd7ab 100644
--- a/config.ini
+++ b/config.ini
@@ -43,16 +43,20 @@ token = 12345:blahblah
# username, if it is public
chat = @username
-# Should we show link to post as a link after post content?
-show-post-link = yes
-
-# Should we show link to original author before post content?
-show-boost-from = yes
-
# Should we make posts silent?
# https://core.telegram.org/bots/api#sendmessage `disable_notification`
silent = true
+# Jinja2 template string for the post. Works only in Telegram.
+# This is the default template, not specifying that property at all will result
+# in this string (probably)
+# Pay attention to 4 spaces in the empty line, I think it's required
+template = {% if status.reblog %}Boost from {{status.reblog.account.name}}
+ {% endif %}{% if status.reblog_or_status.spoiler_text %}{{status.reblog_or_status.spoiler_text}}
+ {% endif %}{{ status.reblog_or_status.content_flathtml }}{% if status.reblog_or_status.spoiler_text %}{% endif %}
+
+ Link to post
+
# Discord integration
[module/discord]
type = discord
@@ -60,7 +64,6 @@ type = discord
# Webhook URL with the `?wait=true`
webhook = url
-
;# Boost filter. Only boosts will be matched by that one
;[filter/boost]
;type = boost
diff --git a/mastoposter/__main__.py b/mastoposter/__main__.py
index 3ba7de0..d7ef77a 100644
--- a/mastoposter/__main__.py
+++ b/mastoposter/__main__.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from asyncio import run
-from configparser import ConfigParser
+from configparser import ConfigParser, ExtendedInterpolation
from mastoposter import execute_integrations, load_integrations_from
from mastoposter.integrations import FilteredIntegration
from mastoposter.sources import websocket_source
@@ -25,7 +25,7 @@ async def listen(
continue
# TODO: add option/filter to handle that
- if status.visibility in ("direct", "private"):
+ if status.visibility in ("direct",):
continue
# TODO: find a better way to handle threads
@@ -39,7 +39,7 @@ async def listen(
def main(config_path: str):
- conf = ConfigParser()
+ conf = ConfigParser(interpolation=ExtendedInterpolation())
conf.read(config_path)
for section in conf.sections():
diff --git a/mastoposter/integrations/discord/__init__.py b/mastoposter/integrations/discord/__init__.py
index 03b990a..38b96bb 100644
--- a/mastoposter/integrations/discord/__init__.py
+++ b/mastoposter/integrations/discord/__init__.py
@@ -1,6 +1,5 @@
from configparser import SectionProxy
from typing import List, Optional
-from bs4 import BeautifulSoup, PageElement, Tag
from httpx import AsyncClient
from zlib import crc32
from mastoposter.integrations.base import BaseIntegration
@@ -16,38 +15,6 @@ class DiscordIntegration(BaseIntegration):
def __init__(self, section: SectionProxy):
self.webhook = section.get("webhook", "")
- @staticmethod
- def md_escape(text: str) -> str:
- return (
- text.replace("\\", "\\\\")
- .replace("*", "\\*")
- .replace("[", "\\[")
- .replace("]", "\\]")
- .replace("_", "\\_")
- .replace("~", "\\~")
- .replace("|", "\\|")
- .replace("`", "\\`")
- )
-
- @classmethod
- def node_to_text(cls, el: PageElement) -> str:
- if isinstance(el, Tag):
- if el.name == "a":
- return "[%s](%s)" % (
- cls.md_escape(
- str.join("", map(cls.node_to_text, el.children))
- ),
- el.attrs["href"],
- )
- elif el.name == "p":
- return (
- str.join("", map(cls.node_to_text, el.children)) + "\n\n"
- )
- elif el.name == "br":
- return "\n"
- return str.join("", map(cls.node_to_text, el.children))
- return cls.md_escape(str(el))
-
async def execute_webhook(
self,
content: Optional[str] = None,
@@ -75,9 +42,7 @@ class DiscordIntegration(BaseIntegration):
source = status.reblog or status
embeds: List[DiscordEmbed] = []
- text = self.node_to_text(
- BeautifulSoup(source.content, features="lxml")
- )
+ text = source.content_markdown
if source.spoiler_text:
text = f"{source.spoiler_text}\n||{text}||"
diff --git a/mastoposter/integrations/telegram.py b/mastoposter/integrations/telegram.py
index cd952d4..03beba4 100644
--- a/mastoposter/integrations/telegram.py
+++ b/mastoposter/integrations/telegram.py
@@ -1,11 +1,11 @@
from configparser import SectionProxy
from dataclasses import dataclass
-from html import escape
from typing import Any, List, Mapping, Optional
-from bs4 import BeautifulSoup, Tag, PageElement
from httpx import AsyncClient
+from jinja2 import Template
from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Attachment, Poll, Status
+from emoji import emojize
@dataclass
@@ -25,32 +25,45 @@ class TGResponse:
)
-class TelegramIntegration(BaseIntegration):
- API_URL: str = "https://api.telegram.org/bot{}/{}"
- MEDIA_COMPATIBILITY: Mapping[str, set] = {
- "image": {"image", "video"},
- "video": {"image", "video"},
- "gifv": {"gifv"},
- "audio": {"audio"},
- "unknown": {"unknown"},
- }
- MEDIA_MAPPING: Mapping[str, str] = {
- "image": "photo",
- "video": "video",
- "gifv": "animation",
- "audio": "audio",
- "unknown": "document",
- }
+API_URL: str = "https://api.telegram.org/bot{}/{}"
+MEDIA_COMPATIBILITY: Mapping[str, set] = {
+ "image": {"image", "video"},
+ "video": {"image", "video"},
+ "gifv": {"gifv"},
+ "audio": {"audio"},
+ "unknown": {"unknown"},
+}
+MEDIA_MAPPING: Mapping[str, str] = {
+ "image": "photo",
+ "video": "video",
+ "gifv": "animation",
+ "audio": "audio",
+ "unknown": "document",
+}
+DEFAULT_TEMPLATE: str = """\
+{% if status.reblog %}\
+Boost from \
+{{status.reblog.account.name}}\
+{% endif %}\
+{% if status.reblog_or_status.spoiler_text %}\
+{{status.reblog_or_status.spoiler_text}}
+{% endif %}{{ status.reblog_or_status.content_flathtml }}\
+{% if status.reblog_or_status.spoiler_text %}{% endif %}
+Link to post"""
+
+
+class TelegramIntegration(BaseIntegration):
def __init__(self, sect: SectionProxy):
self.token = sect.get("token", "")
self.chat_id = sect.get("chat", "")
- self.show_post_link = sect.getboolean("show_post_link", True)
- self.show_boost_from = sect.getboolean("show_boost_from", True)
self.silent = sect.getboolean("silent", True)
+ self.template: Template = Template(
+ emojize(sect.get("template", DEFAULT_TEMPLATE))
+ )
async def _tg_request(self, method: str, **kwargs) -> TGResponse:
- url = self.API_URL.format(self.token, method)
+ url = API_URL.format(self.token, method)
async with AsyncClient() as client:
return TGResponse.from_dict(
(await client.post(url, json=kwargs)).json(), kwargs
@@ -68,17 +81,17 @@ class TelegramIntegration(BaseIntegration):
async def _post_media(self, text: str, media: Attachment) -> TGResponse:
# Just to be safe
- if media.type not in self.MEDIA_MAPPING:
+ if media.type not in MEDIA_MAPPING:
return await self._post_plaintext(text)
return await self._tg_request(
- "send%s" % self.MEDIA_MAPPING[media.type].title(),
+ "send%s" % MEDIA_MAPPING[media.type].title(),
parse_mode="HTML",
disable_notification=self.silent,
disable_web_page_preview=True,
chat_id=self.chat_id,
caption=text,
- **{self.MEDIA_MAPPING[media.type]: media.url},
+ **{MEDIA_MAPPING[media.type]: media.url},
)
async def _post_mediagroup(
@@ -89,12 +102,12 @@ class TelegramIntegration(BaseIntegration):
for attachment in media:
if attachment.type not in allowed_medias:
continue
- if attachment.type not in self.MEDIA_COMPATIBILITY:
+ if attachment.type not in MEDIA_COMPATIBILITY:
continue
- allowed_medias &= self.MEDIA_COMPATIBILITY[attachment.type]
+ allowed_medias &= MEDIA_COMPATIBILITY[attachment.type]
media_list.append(
{
- "type": self.MEDIA_MAPPING[attachment.type],
+ "type": MEDIA_MAPPING[attachment.type],
"media": attachment.url,
}
)
@@ -128,46 +141,10 @@ class TelegramIntegration(BaseIntegration):
options=[opt.title for opt in poll.options],
)
- @classmethod
- def node_to_text(cls, el: PageElement) -> str:
- if isinstance(el, Tag):
- if el.name == "a":
- return '{}'.format(
- escape(el.attrs["href"]),
- str.join("", map(cls.node_to_text, el.children)),
- )
- elif el.name == "p":
- return (
- str.join("", map(cls.node_to_text, el.children)) + "\n\n"
- )
- elif el.name == "br":
- return "\n"
- return str.join("", map(cls.node_to_text, el.children))
- return escape(str(el))
-
async def __call__(self, status: Status) -> Optional[str]:
source = status.reblog or status
- text = self.node_to_text(
- BeautifulSoup(source.content, features="lxml")
- )
- text = text.rstrip()
- if source.spoiler_text:
- text = "Spoiler: {cw}\n{text}".format(
- cw=source.spoiler_text, text=text
- )
-
- if self.show_post_link:
- text += '\n\nLink to post' % status.link
-
- if status.reblog and self.show_boost_from:
- text = (
- 'Boosted post from {}\n'.format(
- source.account.url,
- source.account.display_name or source.account.username,
- )
- + text
- )
+ text = self.template.render({"status": status})
ids = []
@@ -205,12 +182,6 @@ class TelegramIntegration(BaseIntegration):
return (
""
- ).format(
- chat=self.chat_id,
- show_post_link=self.show_post_link,
- show_boost_from=self.show_boost_from,
- silent=self.silent,
- )
+ ).format(chat=self.chat_id, silent=self.silent, template=self.template)
diff --git a/mastoposter/sources.py b/mastoposter/sources.py
index 67e3edb..e2bdbb9 100644
--- a/mastoposter/sources.py
+++ b/mastoposter/sources.py
@@ -1,3 +1,4 @@
+from asyncio import exceptions
from json import loads
from typing import AsyncGenerator
from urllib.parse import urlencode
@@ -21,6 +22,6 @@ async def websocket_source(
raise Exception(event["error"])
if event["event"] == "update":
yield Status.from_dict(loads(event["payload"]))
- except WebSocketException:
+ except (WebSocketException, TimeoutError, exceptions.TimeoutError):
if not reconnect:
raise
diff --git a/mastoposter/types.py b/mastoposter/types.py
index e209c1c..7f6a279 100644
--- a/mastoposter/types.py
+++ b/mastoposter/types.py
@@ -2,6 +2,10 @@ from dataclasses import dataclass, field, fields
from datetime import datetime
from typing import Any, Callable, Optional, List, Literal, TypeVar
+from bs4 import BeautifulSoup
+
+from mastoposter.utils import node_to_html, node_to_markdown, node_to_plaintext
+
def _date(val: str) -> datetime:
return datetime.fromisoformat(val.rstrip("Z"))
@@ -100,6 +104,10 @@ class Account:
bot=bool(data.get("bot")),
)
+ @property
+ def name(self) -> str:
+ return self.display_name or self.username
+
@dataclass
class AttachmentMetaImage:
@@ -304,6 +312,28 @@ class Status:
tags=[Tag.from_dict(m) for m in data.get("tags", [])],
)
+ @property
+ def reblog_or_status(self) -> "Status":
+ return self.reblog or self
+
@property
def link(self) -> str:
return self.account.url + "/" + str(self.id)
+
+ @property
+ def content_flathtml(self) -> str:
+ return node_to_html(
+ BeautifulSoup(self.content, features="lxml")
+ ).rstrip()
+
+ @property
+ def content_markdown(self) -> str:
+ return node_to_markdown(
+ BeautifulSoup(self.content, features="lxml")
+ ).rstrip()
+
+ @property
+ def content_plaintext(self) -> str:
+ return node_to_plaintext(
+ BeautifulSoup(self.content, features="lxml")
+ ).rstrip()
diff --git a/mastoposter/utils.py b/mastoposter/utils.py
new file mode 100644
index 0000000..7e234a2
--- /dev/null
+++ b/mastoposter/utils.py
@@ -0,0 +1,60 @@
+from html import escape
+from bs4.element import Tag, PageElement
+
+
+def md_escape(text: str) -> str:
+ return (
+ text.replace("\\", "\\\\")
+ .replace("*", "\\*")
+ .replace("[", "\\[")
+ .replace("]", "\\]")
+ .replace("_", "\\_")
+ .replace("~", "\\~")
+ .replace("|", "\\|")
+ .replace("`", "\\`")
+ )
+
+
+def node_to_html(el: PageElement) -> str:
+ if isinstance(el, Tag):
+ if el.name == "a":
+ return '{}'.format(
+ escape(el.attrs["href"]),
+ str.join("", map(node_to_html, el.children)),
+ )
+ elif el.name == "p":
+ return str.join("", map(node_to_html, el.children)) + "\n\n"
+ elif el.name == "br":
+ return "\n"
+ return str.join("", map(node_to_html, el.children))
+ return escape(str(el))
+
+
+def node_to_markdown(el: PageElement) -> str:
+ if isinstance(el, Tag):
+ if el.name == "a":
+ return "[%s](%s)" % (
+ md_escape(str.join("", map(node_to_markdown, el.children))),
+ el.attrs["href"],
+ )
+ elif el.name == "p":
+ return str.join("", map(node_to_markdown, el.children)) + "\n\n"
+ elif el.name == "br":
+ return "\n"
+ return str.join("", map(node_to_markdown, el.children))
+ return md_escape(str(el))
+
+
+def node_to_plaintext(el: PageElement) -> str:
+ if isinstance(el, Tag):
+ if el.name == "a":
+ return "%s (%s)" % (
+ str.join("", map(node_to_plaintext, el.children)),
+ el.attrs["href"],
+ )
+ elif el.name == "p":
+ return str.join("", map(node_to_plaintext, el.children)) + "\n\n"
+ elif el.name == "br":
+ return "\n"
+ return str.join("", map(node_to_plaintext, el.children))
+ return str(el)
diff --git a/requirements.txt b/requirements.txt
index d9d2f61..985a2f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,11 +2,14 @@ anyio==3.6.1
beautifulsoup4==4.11.1
bs4==0.0.1
certifi==2022.6.15
+emoji==2.0.0
h11==0.12.0
httpcore==0.15.0
httpx==0.23.0
idna==3.3
+Jinja2==3.1.2
lxml==4.9.1
+MarkupSafe==2.1.1
rfc3986==1.5.0
sniffio==1.2.0
soupsieve==2.3.2.post1