mastoposter-oss_images/mastoposter/sources.py

69 lines
2.5 KiB
Python
Raw Normal View History

"""
mastoposter - configurable reposter from Mastodon-compatible Fediverse servers
Copyright (C) 2022-2023 hatkidchan <hatkidchan@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
"""
2023-03-07 10:26:45 +03:00
from asyncio import exceptions, sleep
2022-08-24 08:09:41 +03:00
from json import loads
2022-11-02 20:11:38 +03:00
from logging import getLogger
2022-08-24 08:09:41 +03:00
from typing import AsyncGenerator
from urllib.parse import urlencode
2022-08-24 08:28:18 +03:00
from mastoposter.types import Status
2022-08-24 08:09:41 +03:00
2022-11-02 20:11:38 +03:00
logger = getLogger("sources")
2022-08-24 08:09:41 +03:00
async def websocket_source(
2024-06-11 21:01:24 +03:00
url: str, reconnect: bool = False, reconnect_delay: float = 1.0,
connect_timeout: float = 60.0, **params
) -> AsyncGenerator[Status, None]:
2022-08-24 08:09:41 +03:00
from websockets.client import connect
from websockets.exceptions import WebSocketException
2022-08-24 08:09:41 +03:00
param_dict = {"stream": "list", **params}
public_param_dict = param_dict.copy()
public_param_dict["access_token"] = 'SCRUBBED'
public_url = f"{url}?" + urlencode(public_param_dict)
url = f"{url}?" + urlencode(param_dict)
while True:
try:
logger.info("attempting to connect to %s", public_url)
2024-06-11 21:01:24 +03:00
async with connect(url, open_timeout=connect_timeout) as ws:
logger.info("Connected to WebSocket")
while (msg := await ws.recv()) is not None:
event = loads(msg)
logger.debug("data: %r", event)
if "error" in event:
raise Exception(event["error"])
if event["event"] == "update":
yield Status.from_dict(loads(event["payload"]))
2022-11-02 20:11:38 +03:00
else:
logger.warn("unknown event type %r", event["event"])
except (
WebSocketException,
TimeoutError,
exceptions.TimeoutError,
2023-03-19 20:09:43 +03:00
ConnectionError,
2022-11-02 20:11:38 +03:00
) as e:
if not reconnect:
raise
2022-11-02 20:11:38 +03:00
else:
logger.warn("%r caught, reconnecting", e)
2023-03-07 10:26:45 +03:00
await sleep(reconnect_delay)
2022-11-02 20:11:38 +03:00
else:
logger.info(
"WebSocket closed connection without any errors, "
"but we're not done yet"
)
2023-03-07 10:26:45 +03:00
await sleep(reconnect_delay)