from typing import Callable, Dict, List, Optional, Tuple, Union

import datetime
import functools
import sys
import threading
import time
from queue import Empty, Queue

from phasmplatform.common import valuetype
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
from phasmplatform.common.method import Method, MethodArgument, MethodCall
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import Service, ServiceDiscoveryInterface
from phasmplatform.common.value import Value, NoneValue
from phasmplatform.common.state import from_toml as state_from_toml

from .runners.base import RunnerInterface
from .runners.wasmtime import WasmTimeRunner


class ShuttingDown:
    """
    Marker object put on a runner queue to make `runner_thread` stop.
    """


def runner_thread(runner: RunnerInterface, queue: Queue[Union[MethodCall, ShuttingDown]]) -> None:
    """
    Feed items from `queue` to `runner` until a ShuttingDown marker arrives.

    The queue is polled with a one second timeout; an empty queue simply
    triggers another poll. Every item that is not a ShuttingDown instance
    is handed to `runner.do_call`.
    """
    while True:
        try:
            item = queue.get(block=True, timeout=1)
        except Empty:
            # Nothing to do yet, poll again
            continue

        if isinstance(item, ShuttingDown):
            return

        runner.do_call(item)


def make_prelude() -> Service:
    """
    Build the 'prelude' service, which exposes a single `log_bytes` method.
    """
    log_bytes = Method(
        'log_bytes',
        [
            MethodArgument('data', valuetype.bytes),
        ],
        valuetype.none,
    )

    methods: List[Method] = [log_bytes]

    return Service('prelude', methods)


class LocalhostRunner(RunnerInterface):
    """
    Runner that handles calls directly in this process.

    Supported methods are `on_module_loaded` and `log_bytes`; both print
    to stdout and then report success with NoneValue.
    """

    def do_call(self, call: MethodCall) -> None:
        """
        Execute `call`, dispatching on the name of its method.

        Raises NotImplementedError for any method name other than the
        two supported ones.
        """
        method_name = call.method.name

        if method_name == 'on_module_loaded':
            print('LocalhostRunner loaded')
        elif method_name == 'log_bytes':
            print('LOG-BYTES:', repr(call.args[0].data))
        else:
            raise NotImplementedError(call)

        call.on_success(NoneValue)


class LocalhostServiceDiscovery(ServiceDiscoveryInterface):
    """
    In-memory service registry, keyed by service name.
    """

    # Maps a service name to the service and the queue its calls are put on
    services: Dict[str, Tuple[Service, Queue[Union[MethodCall, ShuttingDown]]]]

    def __init__(self) -> None:
        self.services = {}

    def register_service(self, service: Service, queue: Queue[Union[MethodCall, ShuttingDown]]) -> None:
        """
        Store `service` and its `queue` under the service's name.

        An earlier registration with the same name is overwritten.
        """
        self.services[service.name] = (service, queue)

    def find_service(self, name: str) -> Optional[Service]:
        """
        Return the service registered as `name`, or None when there is none.
        """
        entry = self.services.get(name)
        return None if entry is None else entry[0]
class LocalhostMethodCallRouter(MethodCallRouterInterface):
    """
    Routes method calls to services registered in a LocalhostServiceDiscovery.

    A call is delivered by putting it on the queue that was registered
    together with the target service. A row is inserted into the
    'routing_logs' table through `config.clickhouse_write` when the call
    is sent, and another one when the call reports success.
    """

    def __init__(self, config: WorkerConfig, service_discovery: LocalhostServiceDiscovery) -> None:
        self.config = config
        self.service_discovery = service_discovery

    def send_call(self, service: Service, call: MethodCall) -> None:
        """
        Log `call` and put it on the queue registered for `service`.

        `call.on_success` is replaced by a wrapper that logs the result
        before invoking the original callback.

        Raises AssertionError when `service` is not registered with the
        service discovery.
        """
        self.config.clickhouse_write.insert('routing_logs', [
            [datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name],
        ], ['timestamp', 'to_service', 'to_method'])

        # Wrap the callback so the result is logged before the caller sees it
        call.on_success = functools.partial(self._send_call_on_succes, service, call, call.on_success)

        assert service.name in self.service_discovery.services
        queue = self.service_discovery.services[service.name][1]
        queue.put(call)

    def _send_call_on_succes(self, service: Service, call: MethodCall, orig_on_succes: Callable[[Value], None], value: Value) -> None:
        """
        Log the result of `call`, then pass `value` on to `orig_on_succes`.
        """
        self.config.clickhouse_write.insert('routing_logs', [
            [datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name, repr(value.data)],
        ], ['timestamp', 'to_service', 'to_method', 'result'])

        orig_on_succes(value)


def main() -> int:
    """
    Run the localhost, echo client and echo server runners until interrupted.

    Reads 'config.toml', sets up one queue and one thread per runner,
    queues an `on_module_loaded` call for each of them and then sleeps
    until a KeyboardInterrupt arrives. On the way out every runner is
    sent a ShuttingDown marker and its thread is joined; this also
    happens when an unexpected exception ends the wait, so the runner
    threads are never left polling forever.

    Returns 0 after an interrupt-triggered shutdown.
    """
    with open('config.toml', 'rb') as fil:
        config = config_from_toml(fil)

    with open('./examples/echoserver.toml', 'rb') as fil:
        state = state_from_toml(fil)

    del state

    # TODO: Replace the stuff below with the loading from the example state

    localhost_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue()
    echo_client_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue()
    echo_server_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue()

    service_discovery = LocalhostServiceDiscovery()
    method_call_router = LocalhostMethodCallRouter(config.worker_config, service_discovery)

    localhost = LocalhostRunner()
    service_discovery.register_service(make_prelude(), localhost_queue)

    with open('./examples/echoserver.wasm', 'rb') as fil:
        echo_server = WasmTimeRunner(service_discovery, method_call_router, fil.read())
    service_discovery.register_service(Service('echoserver', [
        Method('echo', [
            MethodArgument('msg', valuetype.bytes)
        ], valuetype.bytes)
    ]), echo_server_queue)

    with open('./examples/echoclient.wasm', 'rb') as fil:
        echo_client = WasmTimeRunner(service_discovery, method_call_router, fil.read())

    # service_discovery.register_service(echo_client, echo_client_queue)
    # service_discovery.register_service(echo_server, echo_server_queue)

    on_module_loaded = MethodCall(
        Method('on_module_loaded', [], valuetype.none),
        [],
        lambda x: None,
        lambda x: None,  # TODO: Check for MethodNotFoundError, otherwise report it
    )

    localhost_queue.put(on_module_loaded)
    echo_client_queue.put(on_module_loaded)
    echo_server_queue.put(on_module_loaded)

    queues = (localhost_queue, echo_client_queue, echo_server_queue)
    threads = [
        threading.Thread(target=runner_thread, args=(localhost, localhost_queue)),
        threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue)),
        threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue)),
    ]

    # Only started threads may be joined, so keep track of them
    started: List[threading.Thread] = []

    try:
        for thread in threads:
            thread.start()
            started.append(thread)

        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Always ask the runners to stop; they are non-daemon threads and
        # would otherwise keep the process alive
        for queue in queues:
            queue.put(ShuttingDown())

        for thread in started:
            thread.join()

    return 0


if __name__ == '__main__':
    sys.exit(main())