forked from hkc/mastoposter
Compare commits
10 Commits
master
...
manual-inv
Author | SHA1 | Date |
---|---|---|
Vftdan | a22eac0c6f | |
Vftdan | 3076544e38 | |
Vftdan | 2e3cd69f6a | |
Vftdan | 834a5a7fe2 | |
Vftdan | d999ca14c4 | |
Casey | e7e2a13c7d | |
Casey | b824d088ae | |
Vftdan | 37c1052b39 | |
Vftdan | fac1e46859 | |
Vftdan | 3e82b5c979 |
|
@ -42,6 +42,10 @@ list = 1
|
|||
auto-reconnect = yes
|
||||
reconnect-delay = 1.0
|
||||
|
||||
# Change websocket connection opening timeout.
|
||||
# It may be useful when initial server connection may take a long time.
|
||||
connect-timeout = 60.0
|
||||
|
||||
# Number of retries in case request fails. Applies globally
|
||||
# Can be changed on per-module basis
|
||||
http-retries = 5
|
||||
|
|
|
@ -37,7 +37,7 @@ from mastoposter import (
|
|||
__description__,
|
||||
)
|
||||
from mastoposter.integrations import FilteredIntegration
|
||||
from mastoposter.sources import websocket_source
|
||||
from mastoposter.sources import websocket_source, single_status_source
|
||||
from mastoposter.types import Account, Status
|
||||
from mastoposter.utils import normalize_config
|
||||
|
||||
|
@ -64,8 +64,8 @@ async def listen(
|
|||
source: Callable[..., AsyncGenerator[Status, None]],
|
||||
drains: List[FilteredIntegration],
|
||||
user: str,
|
||||
replies_to_other_accounts_should_not_be_skipped: bool = False,
|
||||
/,
|
||||
replies_to_other_accounts_should_not_be_skipped: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
logger.info("Starting listening...")
|
||||
|
@ -110,6 +110,14 @@ def main():
|
|||
"config", nargs="?", default=getenv("MASTOPOSTER_CONFIG_FILE")
|
||||
)
|
||||
parser.add_argument("-v", action="version", version=__version__)
|
||||
parser.add_argument(
|
||||
"--single-status", nargs="?", type=str,
|
||||
help="process single status and exit"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-skip-replies", action="store_true",
|
||||
help="override replies_to_other_accounts_should_not_be_skipped to true"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.config:
|
||||
|
@ -142,21 +150,38 @@ def main():
|
|||
"wss://{}/api/v1/streaming".format(conf["main"]["instance"]),
|
||||
)
|
||||
|
||||
replies_to_other_accounts_should_not_be_skipped = conf[
|
||||
"main"
|
||||
].getboolean(
|
||||
"replies_to_other_accounts_should_not_be_skipped", False
|
||||
)
|
||||
|
||||
if args.no_skip_replies:
|
||||
replies_to_other_accounts_should_not_be_skipped = True
|
||||
|
||||
source = websocket_source
|
||||
source_params = dict(
|
||||
url=url,
|
||||
replies_to_other_accounts_should_not_be_skipped=(
|
||||
replies_to_other_accounts_should_not_be_skipped),
|
||||
reconnect=conf["main"].getboolean("auto_reconnect", False),
|
||||
reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0),
|
||||
connect_timeout=conf["main"].getfloat("connect_timeout", 60.0),
|
||||
list=conf["main"]["list"],
|
||||
access_token=conf["main"]["token"],
|
||||
)
|
||||
|
||||
if args.single_status:
|
||||
source = single_status_source
|
||||
source_params["status_url"] = args.single_status
|
||||
source_params["retries"] = retries
|
||||
|
||||
run(
|
||||
listen(
|
||||
websocket_source,
|
||||
source,
|
||||
modules,
|
||||
user_id,
|
||||
url=url,
|
||||
replies_to_other_accounts_should_not_be_skipped=conf[
|
||||
"main"
|
||||
].getboolean(
|
||||
"replies_to_other_accounts_should_not_be_skipped", False
|
||||
),
|
||||
reconnect=conf["main"].getboolean("auto_reconnect", False),
|
||||
reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0),
|
||||
list=conf["main"]["list"],
|
||||
access_token=conf["main"]["token"],
|
||||
**source_params
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
@ -16,23 +16,30 @@ GNU General Public License for more details.
|
|||
from asyncio import exceptions, sleep
|
||||
from json import loads
|
||||
from logging import getLogger
|
||||
from typing import AsyncGenerator
|
||||
from urllib.parse import urlencode
|
||||
from typing import AsyncGenerator, List
|
||||
from urllib.parse import urlencode, urlparse
|
||||
from mastoposter.types import Status
|
||||
|
||||
logger = getLogger("sources")
|
||||
|
||||
|
||||
async def websocket_source(
|
||||
url: str, reconnect: bool = False, reconnect_delay: float = 1.0, **params
|
||||
url: str, reconnect: bool = False, reconnect_delay: float = 1.0,
|
||||
connect_timeout: float = 60.0, **params
|
||||
) -> AsyncGenerator[Status, None]:
|
||||
from websockets.client import connect
|
||||
from websockets.exceptions import WebSocketException
|
||||
|
||||
url = f"{url}?" + urlencode({"stream": "list", **params})
|
||||
param_dict = {"stream": "list", **params}
|
||||
public_param_dict = param_dict.copy()
|
||||
public_param_dict["access_token"] = 'SCRUBBED'
|
||||
public_url = f"{url}?" + urlencode(public_param_dict)
|
||||
url = f"{url}?" + urlencode(param_dict)
|
||||
while True:
|
||||
try:
|
||||
async with connect(url) as ws:
|
||||
logger.info("attempting to connect to %s", public_url)
|
||||
async with connect(url, open_timeout=connect_timeout) as ws:
|
||||
logger.info("Connected to WebSocket")
|
||||
while (msg := await ws.recv()) is not None:
|
||||
event = loads(msg)
|
||||
logger.debug("data: %r", event)
|
||||
|
@ -59,3 +66,63 @@ async def websocket_source(
|
|||
"but we're not done yet"
|
||||
)
|
||||
await sleep(reconnect_delay)
|
||||
|
||||
|
||||
async def single_status_source(
|
||||
status_url: str, url: str = None, access_token: str = None,
|
||||
retries: int = 5, **kwargs
|
||||
) -> AsyncGenerator[Status, None]:
|
||||
# TODO: catch exceptions
|
||||
from httpx import Client, HTTPTransport
|
||||
|
||||
user_authority = urlparse(url).netloc if url is not None else None
|
||||
try:
|
||||
status_url = \
|
||||
f"https://{user_authority}/api/v1/statuses/{int(status_url)}"
|
||||
except ValueError:
|
||||
pass
|
||||
parsed_status_url = urlparse(status_url)
|
||||
|
||||
with Client(transport=HTTPTransport(retries=retries)) as c:
|
||||
status: Status
|
||||
if parsed_status_url.path.startswith("/api/v1/statuses/"):
|
||||
if parsed_status_url.netloc != user_authority:
|
||||
access_token = None
|
||||
|
||||
# headers = {}
|
||||
# if access_token is not None:
|
||||
# headers['Authorization'] = 'Bearer ' + access_token
|
||||
|
||||
params = {}
|
||||
if access_token is not None:
|
||||
params['access_token'] = access_token
|
||||
|
||||
rq = c.get(
|
||||
status_url,
|
||||
params=params,
|
||||
)
|
||||
status = Status.from_dict(rq.json())
|
||||
else:
|
||||
search_instance = user_authority if user_authority is not None \
|
||||
else parsed_status_url.netloc
|
||||
|
||||
if search_instance != user_authority:
|
||||
access_token = None
|
||||
|
||||
params = {}
|
||||
if access_token is not None:
|
||||
params["access_token"] = access_token
|
||||
|
||||
params["q"] = status_url
|
||||
params["resolve"] = "true"
|
||||
rq = c.get(
|
||||
f"https://{search_instance}/api/v2/search",
|
||||
params=params,
|
||||
)
|
||||
statuses: List[dict] = rq.json().get("statuses", [])
|
||||
if len(statuses) < 1:
|
||||
logger.error("Instance %s hasn't found status %r",
|
||||
search_instance, status_url)
|
||||
return
|
||||
status = Status.from_dict(statuses[0])
|
||||
yield status
|
||||
|
|
Loading…
Reference in New Issue