bta-proxy/bta_proxy/proxy.py

40 lines
1.5 KiB
Python

from asyncio.queues import Queue
from asyncio import AbstractEventLoop, get_event_loop
from asyncio.streams import StreamReader, StreamWriter, open_connection
from typing import Optional
from bta_proxy.dpi import inspect_client, inspect_server
class BTAProxy:
def __init__(self, host: str, port: int, loop: Optional[AbstractEventLoop] = None):
self.host = host
self.port = port
self.loop = loop or get_event_loop()
@staticmethod
async def pipe(reader: StreamReader, writer: StreamWriter, queue: Queue):
try:
while not reader.at_eof():
packet = await reader.read(0x400000)
queue.put_nowait(packet)
writer.write(packet)
finally:
queue.put_nowait(None)
writer.close()
async def handle_client(self, cli_reader: StreamReader, cli_writer: StreamWriter):
try:
peername = cli_writer.get_extra_info("peername")
srv_reader, srv_writer = await open_connection(self.host, self.port)
queue_srv: Queue = Queue()
queue_cli: Queue = Queue()
self.loop.create_task(inspect_client(queue_cli, peername))
self.loop.create_task(inspect_server(queue_srv, peername))
self.loop.create_task(self.pipe(cli_reader, srv_writer, queue_cli))
self.loop.create_task(self.pipe(srv_reader, cli_writer, queue_srv))
except Exception as e:
print(f"oopsie whoopsie {e}")