From d999ca14c467b1425e0b5a91294165c53afd3cab Mon Sep 17 00:00:00 2001 From: Vftdan Date: Mon, 1 Jul 2024 15:30:48 +0200 Subject: [PATCH] Add an ability to manually run mastoposter on single status urls --- mastoposter/__main__.py | 40 ++++++++++++++++++--------- mastoposter/sources.py | 60 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 15 deletions(-) diff --git a/mastoposter/__main__.py b/mastoposter/__main__.py index 682e3c1..d21f28f 100644 --- a/mastoposter/__main__.py +++ b/mastoposter/__main__.py @@ -37,7 +37,7 @@ from mastoposter import ( __description__, ) from mastoposter.integrations import FilteredIntegration -from mastoposter.sources import websocket_source +from mastoposter.sources import websocket_source, single_status_source from mastoposter.types import Account, Status from mastoposter.utils import normalize_config @@ -110,6 +110,10 @@ def main(): "config", nargs="?", default=getenv("MASTOPOSTER_CONFIG_FILE") ) parser.add_argument("-v", action="version", version=__version__) + parser.add_argument( + "--single-status", nargs="?", type=str, + help="process single status and exit" + ) args = parser.parse_args() if not args.config: @@ -142,22 +146,32 @@ def main(): "wss://{}/api/v1/streaming".format(conf["main"]["instance"]), ) + source = websocket_source + source_params = dict( + url=url, + replies_to_other_accounts_should_not_be_skipped=conf[ + "main" + ].getboolean( + "replies_to_other_accounts_should_not_be_skipped", False + ), + reconnect=conf["main"].getboolean("auto_reconnect", False), + reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0), + connect_timeout=conf["main"].getfloat("connect_timeout", 60.0), + list=conf["main"]["list"], + access_token=conf["main"]["token"], + ) + + if args.single_status: + source = single_status_source + source_params["status_url"] = args.single_status + source_params["retries"] = retries + run( listen( - websocket_source, + source, modules, user_id, - url=url, - replies_to_other_accounts_should_not_be_skipped=conf[ - "main" - ].getboolean( - "replies_to_other_accounts_should_not_be_skipped", False - ), - reconnect=conf["main"].getboolean("auto_reconnect", False), - reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0), - connect_timeout=conf["main"].getfloat("connect_timeout", 60.0), - list=conf["main"]["list"], - access_token=conf["main"]["token"], + **source_params ) ) diff --git a/mastoposter/sources.py b/mastoposter/sources.py index 39ad42b..f3a23b2 100644 --- a/mastoposter/sources.py +++ b/mastoposter/sources.py @@ -16,8 +16,8 @@ GNU General Public License for more details. from asyncio import exceptions, sleep from json import loads from logging import getLogger -from typing import AsyncGenerator -from urllib.parse import urlencode +from typing import AsyncGenerator, List +from urllib.parse import urlencode, urlparse from mastoposter.types import Status logger = getLogger("sources") @@ -66,3 +66,59 @@ async def websocket_source( "but we're not done yet" ) await sleep(reconnect_delay) + + +async def single_status_source( + status_url: str, url: str = None, access_token: str = None, retries: int = 5, **kwargs +) -> AsyncGenerator[Status, None]: + # TODO: catch exceptions + from httpx import Client, HTTPTransport + + user_authority = urlparse(url).netloc if url is not None else None + try: + status_url = f"https://{user_authority}/api/v1/statuses/{int(status_url)}" + except ValueError: + pass + parsed_status_url = urlparse(status_url) + + with Client(transport=HTTPTransport(retries=retries)) as c: + status: Status + if parsed_status_url.path.startswith("/api/v1/statuses/"): + if parsed_status_url.netloc != user_authority: + access_token = None + + # headers = {} + # if access_token is not None: + # headers['Authorization'] = 'Bearer ' + access_token + + params = {} + if access_token is not None: + params['access_token'] = access_token + + rq = c.get( + status_url, + params=params, + ) + status = Status.from_dict(rq.json()) + else: + search_instance = user_authority if user_authority is not None else parsed_status_url.netloc + + if search_instance != user_authority: + access_token = None + + params = {} + if access_token is not None: + params["access_token"] = access_token + + params["q"] = status_url + rq = c.get( + f"https://{search_instance}/api/v2/search", + params=params, + ) + statuses: List[Status] = rq.json().get("statuses", []) + if len(statuses) < 1: + logger.error("Instance %s hasn't found status %r", + search_instance, status_url) + return + status = Status.from_dict(statuses[0]) + yield status