Try, try again

This commit is contained in:
Casey 2023-03-07 10:26:45 +03:00
parent 6d3e00ba4a
commit 854bb859ec
Signed by: hkc
GPG Key ID: F0F6CFE11CDB0960
5 changed files with 86 additions and 52 deletions

View File

@ -1,3 +1,8 @@
[DEFAULT]
# Number of retries in case request fails. Applies globally
# Can be changed on per-module basis
http-retries = 5
[main] [main]
# This is a list of output modules. Each module should be defined in section, # This is a list of output modules. Each module should be defined in section,
# named "module/MODULENAME". Space-separated list of strings. # named "module/MODULENAME". Space-separated list of strings.
@ -31,6 +36,7 @@ list = 1
# That option exists because it's not really a big deal when crossposter runs # That option exists because it's not really a big deal when crossposter runs
# as a service and restarts automatically by the service manager. # as a service and restarts automatically by the service manager.
auto-reconnect = yes auto-reconnect = yes
reconnect-delay = 1.0
# Example Telegram integration. You can use it as a template # Example Telegram integration. You can use it as a template
[module/telegram] [module/telegram]

View File

@ -15,7 +15,7 @@ from mastoposter.integrations import FilteredIntegration
from mastoposter.sources import websocket_source from mastoposter.sources import websocket_source
from typing import AsyncGenerator, Callable, List from typing import AsyncGenerator, Callable, List
from mastoposter.types import Account, Status from mastoposter.types import Account, Status
from httpx import Client from httpx import Client, HTTPTransport
from mastoposter.utils import normalize_config from mastoposter.utils import normalize_config
@ -89,13 +89,14 @@ def main(config_path: str):
normalize_config(conf) normalize_config(conf)
modules: List[FilteredIntegration] = load_integrations_from(conf) modules: List[FilteredIntegration] = load_integrations_from(conf)
retries: int = conf["main"].getint("http-retries", 5)
logger.info("Loaded %d integrations", len(modules)) logger.info("Loaded %d integrations", len(modules))
user_id: str = conf["main"]["user"] user_id: str = conf["main"]["user"]
if user_id == "auto": if user_id == "auto":
logger.info("config.main.user is set to auto, getting user ID") logger.info("config.main.user is set to auto, getting user ID")
with Client() as c: with Client(transport=HTTPTransport(retries=retries)) as c:
rq = c.get( rq = c.get(
VERIFY_CREDS_TEMPLATE.format(**conf["main"]), VERIFY_CREDS_TEMPLATE.format(**conf["main"]),
params={"access_token": conf["main"]["token"]}, params={"access_token": conf["main"]["token"]},
@ -114,6 +115,7 @@ def main(config_path: str):
user_id, user_id,
url=url, url=url,
reconnect=conf["main"].getboolean("auto_reconnect", False), reconnect=conf["main"].getboolean("auto_reconnect", False),
reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0),
list=conf["main"]["list"], list=conf["main"]["list"],
access_token=conf["main"]["token"], access_token=conf["main"]["token"],
) )

View File

@ -1,7 +1,7 @@
from configparser import SectionProxy from configparser import SectionProxy
from logging import getLogger from logging import getLogger
from typing import List, Optional from typing import List, Optional
from httpx import AsyncClient from httpx import AsyncClient, AsyncHTTPTransport
from zlib import crc32 from zlib import crc32
from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.base import BaseIntegration
from mastoposter.integrations.discord.types import ( from mastoposter.integrations.discord.types import (
@ -15,12 +15,13 @@ logger = getLogger("integrations.discord")
class DiscordIntegration(BaseIntegration): class DiscordIntegration(BaseIntegration):
def __init__(self, webhook: str): def __init__(self, webhook: str, retries: int = 5):
self.webhook = webhook self.webhook = webhook
self.retries = retries
@classmethod @classmethod
def from_section(cls, section: SectionProxy) -> "DiscordIntegration": def from_section(cls, section: SectionProxy) -> "DiscordIntegration":
return cls(section["webhook"]) return cls(section["webhook"], section.getint("retries", 5))
async def execute_webhook( async def execute_webhook(
self, self,
@ -29,7 +30,9 @@ class DiscordIntegration(BaseIntegration):
avatar_url: Optional[str] = None, avatar_url: Optional[str] = None,
embeds: Optional[List[DiscordEmbed]] = None, embeds: Optional[List[DiscordEmbed]] = None,
) -> dict: ) -> dict:
async with AsyncClient() as c: async with AsyncClient(
transport=AsyncHTTPTransport(retries=self.retries)
) as c:
json = { json = {
"content": content, "content": content,
"username": username, "username": username,

View File

@ -2,7 +2,7 @@ from configparser import SectionProxy
from dataclasses import dataclass from dataclasses import dataclass
from logging import getLogger from logging import getLogger
from typing import Any, List, Mapping, Optional from typing import Any, List, Mapping, Optional
from httpx import AsyncClient from httpx import AsyncClient, AsyncHTTPTransport
from jinja2 import Template from jinja2 import Template
from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Attachment, Poll, Status from mastoposter.types import Attachment, Poll, Status
@ -71,10 +71,12 @@ class TelegramIntegration(BaseIntegration):
chat_id: str, chat_id: str,
template: Optional[Template] = None, template: Optional[Template] = None,
silent: bool = True, silent: bool = True,
retries: int = 5,
): ):
self.token = token self.token = token
self.chat_id = chat_id self.chat_id = chat_id
self.silent = silent self.silent = silent
self.retries = retries
if template is None: if template is None:
self.template = Template(emojize(DEFAULT_TEMPLATE)) self.template = Template(emojize(DEFAULT_TEMPLATE))
@ -86,29 +88,34 @@ class TelegramIntegration(BaseIntegration):
return cls( return cls(
token=section["token"], token=section["token"],
chat_id=section["chat"], chat_id=section["chat"],
silent=section.getboolean("silent", True),
template=Template( template=Template(
emojize(section.get("template", DEFAULT_TEMPLATE)) emojize(section.get("template", DEFAULT_TEMPLATE))
), ),
silent=section.getboolean("silent", True),
retries=section.getint("http_retries", 5),
) )
async def _tg_request(self, method: str, **kwargs) -> TGResponse: async def _tg_request(
self, client: AsyncClient, method: str, **kwargs
) -> TGResponse:
url = API_URL.format(self.token, method) url = API_URL.format(self.token, method)
async with AsyncClient() as client: logger.debug("TG request: %s(%r)", method, kwargs)
logger.debug("TG request: %s(%r)", method, kwargs) response = TGResponse.from_dict(
response = TGResponse.from_dict( (await client.post(url, json=kwargs)).json(), kwargs
(await client.post(url, json=kwargs)).json(), kwargs )
) if not response.ok:
if not response.ok: logger.error("TG error: %r", response.error)
logger.error("TG error: %r", response.error) logger.error("parameters: %r", kwargs)
logger.error("parameters: %r", kwargs) else:
else: logger.debug("Result: %r", response.result)
logger.debug("Result: %r", response.result) return response
return response
async def _post_plaintext(self, text: str) -> TGResponse: async def _post_plaintext(
self, client: AsyncClient, text: str
) -> TGResponse:
logger.debug("Sending HTML message: %r", text) logger.debug("Sending HTML message: %r", text)
return await self._tg_request( return await self._tg_request(
client,
"sendMessage", "sendMessage",
parse_mode="HTML", parse_mode="HTML",
disable_notification=self.silent, disable_notification=self.silent,
@ -118,16 +125,21 @@ class TelegramIntegration(BaseIntegration):
) )
async def _post_media( async def _post_media(
self, text: str, media: Attachment, spoiler: bool = False self,
client: AsyncClient,
text: str,
media: Attachment,
spoiler: bool = False,
) -> TGResponse: ) -> TGResponse:
# Just to be safe # Just to be safe
if media.type not in MEDIA_MAPPING: if media.type not in MEDIA_MAPPING:
logger.warning( logger.warning(
"Media %r has unknown type, falling back to plaintext", media "Media %r has unknown type, falling back to plaintext", media
) )
return await self._post_plaintext(text) return await self._post_plaintext(client, text)
return await self._tg_request( return await self._tg_request(
client,
"send%s" % MEDIA_MAPPING[media.type].title(), "send%s" % MEDIA_MAPPING[media.type].title(),
parse_mode="HTML", parse_mode="HTML",
disable_notification=self.silent, disable_notification=self.silent,
@ -143,7 +155,11 @@ class TelegramIntegration(BaseIntegration):
) )
async def _post_mediagroup( async def _post_mediagroup(
self, text: str, media: List[Attachment], spoiler: bool = False self,
client: AsyncClient,
text: str,
media: List[Attachment],
spoiler: bool = False,
) -> TGResponse: ) -> TGResponse:
logger.debug("Sendind media group: %r (text=%r)", media, text) logger.debug("Sendind media group: %r (text=%r)", media, text)
media_list: List[dict] = [] media_list: List[dict] = []
@ -179,6 +195,7 @@ class TelegramIntegration(BaseIntegration):
) )
return await self._tg_request( return await self._tg_request(
client,
"sendMediaGroup", "sendMediaGroup",
disable_notification=self.silent, disable_notification=self.silent,
disable_web_page_preview=True, disable_web_page_preview=True,
@ -187,10 +204,11 @@ class TelegramIntegration(BaseIntegration):
) )
async def _post_poll( async def _post_poll(
self, poll: Poll, reply_to: Optional[str] = None self, client: AsyncClient, poll: Poll, reply_to: Optional[str] = None
) -> TGResponse: ) -> TGResponse:
logger.debug("Sending poll: %r", poll) logger.debug("Sending poll: %r", poll)
return await self._tg_request( return await self._tg_request(
client,
"sendPoll", "sendPoll",
disable_notification=self.silent, disable_notification=self.silent,
disable_web_page_preview=True, disable_web_page_preview=True,
@ -209,33 +227,36 @@ class TelegramIntegration(BaseIntegration):
ids = [] ids = []
if not source.media_attachments: async with AsyncClient(
if (res := await self._post_plaintext(text)).ok: transport=AsyncHTTPTransport(retries=self.retries)
if res.result: ) as client:
if not source.media_attachments:
if (res := await self._post_plaintext(client, text)).ok:
if res.result:
ids.append(res.result["message_id"])
elif len(source.media_attachments) == 1:
if (
res := await self._post_media(
client, text, source.media_attachments[0], has_spoiler
)
).ok and res.result is not None:
ids.append(res.result["message_id"])
else:
if (
res := await self._post_mediagroup(
client, text, source.media_attachments, has_spoiler
)
).ok and res.result is not None:
ids.append(res.result["message_id"]) ids.append(res.result["message_id"])
elif len(source.media_attachments) == 1: if source.poll:
if ( if (
res := await self._post_media( res := await self._post_poll(
text, source.media_attachments[0], has_spoiler client, source.poll, reply_to=ids[0] if ids else None
) )
).ok and res.result is not None: ).ok and res.result:
ids.append(res.result["message_id"]) ids.append(res.result["message_id"])
else:
if (
res := await self._post_mediagroup(
text, source.media_attachments, has_spoiler
)
).ok and res.result is not None:
ids.append(res.result["message_id"])
if source.poll:
if (
res := await self._post_poll(
source.poll, reply_to=ids[0] if ids else None
)
).ok and res.result:
ids.append(res.result["message_id"])
return str.join(",", map(str, ids)) return str.join(",", map(str, ids))

View File

@ -1,4 +1,4 @@
from asyncio import exceptions from asyncio import exceptions, sleep
from json import loads from json import loads
from logging import getLogger from logging import getLogger
from typing import AsyncGenerator from typing import AsyncGenerator
@ -10,7 +10,7 @@ logger = getLogger("sources")
async def websocket_source( async def websocket_source(
url: str, reconnect: bool = False, **params url: str, reconnect: bool = False, reconnect_delay: float = 1.0, **params
) -> AsyncGenerator[Status, None]: ) -> AsyncGenerator[Status, None]:
from websockets.client import connect from websockets.client import connect
from websockets.exceptions import WebSocketException from websockets.exceptions import WebSocketException
@ -37,8 +37,10 @@ async def websocket_source(
raise raise
else: else:
logger.warn("%r caught, reconnecting", e) logger.warn("%r caught, reconnecting", e)
await sleep(reconnect_delay)
else: else:
logger.info( logger.info(
"WebSocket closed connection without any errors, " "WebSocket closed connection without any errors, "
"but we're not done yet" "but we're not done yet"
) )
await sleep(reconnect_delay)