From 854bb859ecf627da1ee9040b2e33fd42e5ae9dad Mon Sep 17 00:00:00 2001 From: hkc Date: Tue, 7 Mar 2023 10:26:45 +0300 Subject: [PATCH] Try, try again --- config.ini | 6 + mastoposter/__main__.py | 6 +- mastoposter/integrations/discord/__init__.py | 11 +- mastoposter/integrations/telegram.py | 109 +++++++++++-------- mastoposter/sources.py | 6 +- 5 files changed, 86 insertions(+), 52 deletions(-) diff --git a/config.ini b/config.ini index 5188dba..f383747 100644 --- a/config.ini +++ b/config.ini @@ -1,3 +1,8 @@ +[DEFAULT] +# Number of retries in case request fails. Applies globally +# Can be changed on per-module basis +http-retries = 5 + [main] # This is a list of output modules. Each module should be defined in section, # named "module/MODULENAME". Space-separated list of strings. @@ -31,6 +36,7 @@ list = 1 # That option exists because it's not really a big deal when crossposter runs # as a service and restarts automatically by the service manager. auto-reconnect = yes +reconnect-delay = 1.0 # Example Telegram integration. You can use it as a template [module/telegram] diff --git a/mastoposter/__main__.py b/mastoposter/__main__.py index 5accd80..2ef3a6b 100644 --- a/mastoposter/__main__.py +++ b/mastoposter/__main__.py @@ -15,7 +15,7 @@ from mastoposter.integrations import FilteredIntegration from mastoposter.sources import websocket_source from typing import AsyncGenerator, Callable, List from mastoposter.types import Account, Status -from httpx import Client +from httpx import Client, HTTPTransport from mastoposter.utils import normalize_config @@ -89,13 +89,14 @@ def main(config_path: str): normalize_config(conf) modules: List[FilteredIntegration] = load_integrations_from(conf) + retries: int = conf["main"].getint("http-retries", 5) logger.info("Loaded %d integrations", len(modules)) user_id: str = conf["main"]["user"] if user_id == "auto": logger.info("config.main.user is set to auto, getting user ID") - with Client() as c: + with Client(transport=HTTPTransport(retries=retries)) as c: rq = c.get( VERIFY_CREDS_TEMPLATE.format(**conf["main"]), params={"access_token": conf["main"]["token"]}, @@ -114,6 +115,7 @@ def main(config_path: str): user_id, url=url, reconnect=conf["main"].getboolean("auto_reconnect", False), + reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0), list=conf["main"]["list"], access_token=conf["main"]["token"], ) diff --git a/mastoposter/integrations/discord/__init__.py b/mastoposter/integrations/discord/__init__.py index dd88b66..359a25f 100644 --- a/mastoposter/integrations/discord/__init__.py +++ b/mastoposter/integrations/discord/__init__.py @@ -1,7 +1,7 @@ from configparser import SectionProxy from logging import getLogger from typing import List, Optional -from httpx import AsyncClient +from httpx import AsyncClient, AsyncHTTPTransport from zlib import crc32 from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.discord.types import ( @@ -15,12 +15,13 @@ logger = getLogger("integrations.discord") class DiscordIntegration(BaseIntegration): - def __init__(self, webhook: str): + def __init__(self, webhook: str, retries: int = 5): self.webhook = webhook + self.retries = retries @classmethod def from_section(cls, section: SectionProxy) -> "DiscordIntegration": - return cls(section["webhook"]) + return cls(section["webhook"], section.getint("retries", 5)) async def execute_webhook( self, @@ -29,7 +30,9 @@ class DiscordIntegration(BaseIntegration): avatar_url: Optional[str] = None, embeds: Optional[List[DiscordEmbed]] = None, ) -> dict: - async with AsyncClient() as c: + async with AsyncClient( + transport=AsyncHTTPTransport(retries=self.retries) + ) as c: json = { "content": content, "username": username, diff --git a/mastoposter/integrations/telegram.py b/mastoposter/integrations/telegram.py index 8f1dc7c..f002de8 100644 --- a/mastoposter/integrations/telegram.py +++ b/mastoposter/integrations/telegram.py @@ -2,7 +2,7 @@ from configparser import SectionProxy from dataclasses import dataclass from logging import getLogger from typing import Any, List, Mapping, Optional -from httpx import AsyncClient +from httpx import AsyncClient, AsyncHTTPTransport from jinja2 import Template from mastoposter.integrations.base import BaseIntegration from mastoposter.types import Attachment, Poll, Status @@ -71,10 +71,12 @@ class TelegramIntegration(BaseIntegration): chat_id: str, template: Optional[Template] = None, silent: bool = True, + retries: int = 5, ): self.token = token self.chat_id = chat_id self.silent = silent + self.retries = retries if template is None: self.template = Template(emojize(DEFAULT_TEMPLATE)) @@ -86,29 +88,34 @@ class TelegramIntegration(BaseIntegration): return cls( token=section["token"], chat_id=section["chat"], - silent=section.getboolean("silent", True), template=Template( emojize(section.get("template", DEFAULT_TEMPLATE)) ), + silent=section.getboolean("silent", True), + retries=section.getint("http_retries", 5), ) - async def _tg_request(self, method: str, **kwargs) -> TGResponse: + async def _tg_request( + self, client: AsyncClient, method: str, **kwargs + ) -> TGResponse: url = API_URL.format(self.token, method) - async with AsyncClient() as client: - logger.debug("TG request: %s(%r)", method, kwargs) - response = TGResponse.from_dict( - (await client.post(url, json=kwargs)).json(), kwargs - ) - if not response.ok: - logger.error("TG error: %r", response.error) - logger.error("parameters: %r", kwargs) - else: - logger.debug("Result: %r", response.result) - return response + logger.debug("TG request: %s(%r)", method, kwargs) + response = TGResponse.from_dict( + (await client.post(url, json=kwargs)).json(), kwargs + ) + if not response.ok: + logger.error("TG error: %r", response.error) + logger.error("parameters: %r", kwargs) + else: + logger.debug("Result: %r", response.result) + return response - async def _post_plaintext(self, text: str) -> TGResponse: + async def _post_plaintext( + self, client: AsyncClient, text: str + ) -> TGResponse: logger.debug("Sending HTML message: %r", text) return await self._tg_request( + client, "sendMessage", parse_mode="HTML", disable_notification=self.silent, @@ -118,16 +125,21 @@ class TelegramIntegration(BaseIntegration): ) async def _post_media( - self, text: str, media: Attachment, spoiler: bool = False + self, + client: AsyncClient, + text: str, + media: Attachment, + spoiler: bool = False, ) -> TGResponse: # Just to be safe if media.type not in MEDIA_MAPPING: logger.warning( "Media %r has unknown type, falling back to plaintext", media ) - return await self._post_plaintext(text) + return await self._post_plaintext(client, text) return await self._tg_request( + client, "send%s" % MEDIA_MAPPING[media.type].title(), parse_mode="HTML", disable_notification=self.silent, @@ -143,7 +155,11 @@ class TelegramIntegration(BaseIntegration): ) async def _post_mediagroup( - self, text: str, media: List[Attachment], spoiler: bool = False + self, + client: AsyncClient, + text: str, + media: List[Attachment], + spoiler: bool = False, ) -> TGResponse: logger.debug("Sendind media group: %r (text=%r)", media, text) media_list: List[dict] = [] @@ -179,6 +195,7 @@ class TelegramIntegration(BaseIntegration): ) return await self._tg_request( + client, "sendMediaGroup", disable_notification=self.silent, disable_web_page_preview=True, @@ -187,10 +204,11 @@ class TelegramIntegration(BaseIntegration): ) async def _post_poll( - self, poll: Poll, reply_to: Optional[str] = None + self, client: AsyncClient, poll: Poll, reply_to: Optional[str] = None ) -> TGResponse: logger.debug("Sending poll: %r", poll) return await self._tg_request( + client, "sendPoll", disable_notification=self.silent, disable_web_page_preview=True, @@ -209,33 +227,36 @@ class TelegramIntegration(BaseIntegration): ids = [] - if not source.media_attachments: - if (res := await self._post_plaintext(text)).ok: - if res.result: + async with AsyncClient( + transport=AsyncHTTPTransport(retries=self.retries) + ) as client: + if not source.media_attachments: + if (res := await self._post_plaintext(client, text)).ok: + if res.result: + ids.append(res.result["message_id"]) + + elif len(source.media_attachments) == 1: + if ( + res := await self._post_media( + client, text, source.media_attachments[0], has_spoiler + ) + ).ok and res.result is not None: + ids.append(res.result["message_id"]) + else: + if ( + res := await self._post_mediagroup( + client, text, source.media_attachments, has_spoiler + ) + ).ok and res.result is not None: ids.append(res.result["message_id"]) - elif len(source.media_attachments) == 1: - if ( - res := await self._post_media( - text, source.media_attachments[0], has_spoiler - ) - ).ok and res.result is not None: - ids.append(res.result["message_id"]) - else: - if ( - res := await self._post_mediagroup( - text, source.media_attachments, has_spoiler - ) - ).ok and res.result is not None: - ids.append(res.result["message_id"]) - - if source.poll: - if ( - res := await self._post_poll( - source.poll, reply_to=ids[0] if ids else None - ) - ).ok and res.result: - ids.append(res.result["message_id"]) + if source.poll: + if ( + res := await self._post_poll( + client, source.poll, reply_to=ids[0] if ids else None + ) + ).ok and res.result: + ids.append(res.result["message_id"]) return str.join(",", map(str, ids)) diff --git a/mastoposter/sources.py b/mastoposter/sources.py index 5865d46..6334297 100644 --- a/mastoposter/sources.py +++ b/mastoposter/sources.py @@ -1,4 +1,4 @@ -from asyncio import exceptions +from asyncio import exceptions, sleep from json import loads from logging import getLogger from typing import AsyncGenerator @@ -10,7 +10,7 @@ logger = getLogger("sources") async def websocket_source( - url: str, reconnect: bool = False, **params + url: str, reconnect: bool = False, reconnect_delay: float = 1.0, **params ) -> AsyncGenerator[Status, None]: from websockets.client import connect from websockets.exceptions import WebSocketException @@ -37,8 +37,10 @@ async def websocket_source( raise else: logger.warn("%r caught, reconnecting", e) + await sleep(reconnect_delay) else: logger.info( "WebSocket closed connection without any errors, " "but we're not done yet" ) + await sleep(reconnect_delay)