1
0
Fork 0

Add an ability to manually run mastoposter on single status urls

This commit is contained in:
Vftdan 2024-07-01 15:30:48 +02:00
parent e7e2a13c7d
commit d999ca14c4
Signed by: vftdan
GPG Key ID: 5D49CE378C2B20C7
2 changed files with 85 additions and 15 deletions

View File

@ -37,7 +37,7 @@ from mastoposter import (
__description__,
)
from mastoposter.integrations import FilteredIntegration
from mastoposter.sources import websocket_source
from mastoposter.sources import websocket_source, single_status_source
from mastoposter.types import Account, Status
from mastoposter.utils import normalize_config
@ -110,6 +110,10 @@ def main():
"config", nargs="?", default=getenv("MASTOPOSTER_CONFIG_FILE")
)
parser.add_argument("-v", action="version", version=__version__)
parser.add_argument(
"--single-status", nargs="?", type=str,
help="process single status and exit"
)
args = parser.parse_args()
if not args.config:
@ -142,22 +146,32 @@ def main():
"wss://{}/api/v1/streaming".format(conf["main"]["instance"]),
)
source = websocket_source
source_params = dict(
url=url,
replies_to_other_accounts_should_not_be_skipped=conf[
"main"
].getboolean(
"replies_to_other_accounts_should_not_be_skipped", False
),
reconnect=conf["main"].getboolean("auto_reconnect", False),
reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0),
connect_timeout=conf["main"].getfloat("connect_timeout", 60.0),
list=conf["main"]["list"],
access_token=conf["main"]["token"],
)
if args.single_status:
source = single_status_source
source_params["status_url"] = args.single_status
source_params["retries"] = retries
run(
listen(
websocket_source,
source,
modules,
user_id,
url=url,
replies_to_other_accounts_should_not_be_skipped=conf[
"main"
].getboolean(
"replies_to_other_accounts_should_not_be_skipped", False
),
reconnect=conf["main"].getboolean("auto_reconnect", False),
reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0),
connect_timeout=conf["main"].getfloat("connect_timeout", 60.0),
list=conf["main"]["list"],
access_token=conf["main"]["token"],
**source_params
)
)

View File

@ -16,8 +16,8 @@ GNU General Public License for more details.
from asyncio import exceptions, sleep
from json import loads
from logging import getLogger
from typing import AsyncGenerator
from urllib.parse import urlencode
from typing import AsyncGenerator, List
from urllib.parse import urlencode, urlparse
from mastoposter.types import Status
logger = getLogger("sources")
@ -66,3 +66,59 @@ async def websocket_source(
"but we're not done yet"
)
await sleep(reconnect_delay)
async def single_status_source(
status_url: str, url: str = None, access_token: str = None, retries: int = 5, **kwargs
) -> AsyncGenerator[Status, None]:
# TODO: catch exceptions
from httpx import Client, HTTPTransport
user_authority = urlparse(url).netloc if url is not None else None
try:
status_url = f"https://{user_authority}/api/v1/statuses/{int(status_url)}"
except ValueError:
pass
parsed_status_url = urlparse(status_url)
with Client(transport=HTTPTransport(retries=retries)) as c:
status: Status
if parsed_status_url.path.startswith("/api/v1/statuses/"):
if parsed_status_url.netloc != user_authority:
access_token = None
# headers = {}
# if access_token is not None:
# headers['Authorization'] = 'Bearer ' + access_token
params = {}
if access_token is not None:
params['access_token'] = access_token
rq = c.get(
status_url,
params=params,
)
status = Status.from_dict(rq.json())
else:
search_instance = user_authority if user_authority is not None else parsed_status_url.netloc
if search_instance != user_authority:
access_token = None
params = {}
if access_token is not None:
params["access_token"] = access_token
params["q"] = status_url
rq = c.get(
f"https://{search_instance}/api/v2/search",
params=params,
)
statuses: List[Status] = rq.json().get("statuses", [])
if len(statuses) < 1:
logger.error("Instance %s hasn't found status %r",
search_instance, status_url)
return
status = Status.from_dict(statuses[0])
yield status