from typing import Dict, List, Optional, Tuple import sys import threading import time from queue import Empty, Queue from phasmplatform.common import valuetype from phasmplatform.common.config import from_toml as config_from_toml from phasmplatform.common.method import Method, MethodArgument, MethodCall from phasmplatform.common.router import MethodCallRouterInterface from phasmplatform.common.service import Service, ServiceDiscoveryInterface from phasmplatform.common.value import NoneValue from phasmplatform.common.state import from_toml as state_from_toml from .runners.base import RunnerInterface from .runners.wasmtime import WasmTimeRunner def runner_thread(runner: RunnerInterface, queue: Queue[MethodCall]) -> None: while True: try: call = queue.get(block=True, timeout=1) except Empty: break runner.do_call(call) def make_prelude() -> Service: methods: List[Method] = [] methods.append(Method('log_bytes', [ MethodArgument('data', valuetype.bytes) ], valuetype.none)) return Service('prelude', methods) class LocalhostRunner(RunnerInterface): def do_call(self, call: MethodCall) -> None: if call.method.name == 'on_module_loaded': print('LocalhostRunner loaded') call.on_success(NoneValue) return if call.method.name == 'log_bytes': print('LOG-BYTES:', repr(call.args[0].data)) call.on_success(NoneValue) return raise NotImplementedError(call) class LocalhostServiceDiscovery(ServiceDiscoveryInterface): services: Dict[str, Tuple[Service, Queue[MethodCall]]] def __init__(self) -> None: self.services = {} def register_service(self, service: Service, queue: Queue[MethodCall]) -> None: self.services[service.name] = (service, queue, ) def find_service(self, name: str) -> Optional[Service]: parts = self.services.get(name, None) if parts is None: return None return parts[0] class LocalhostMethodCallRouter(MethodCallRouterInterface): def __init__(self, service_discovery: LocalhostServiceDiscovery) -> None: self.service_discovery = service_discovery def send_call(self, service: Service, call: MethodCall) -> None: assert service.name in self.service_discovery.services queue = self.service_discovery.services[service.name][1] print('send_call', service, call, queue) queue.put(call) def main() -> int: with open('config.toml', 'rb') as fil: config = config_from_toml(fil) del config with open('./examples/echoserver.toml', 'rb') as fil: state = state_from_toml(fil) print(state) localhost_queue: Queue[MethodCall] = Queue() echo_client_queue: Queue[MethodCall] = Queue() echo_server_queue: Queue[MethodCall] = Queue() service_discovery = LocalhostServiceDiscovery() method_call_router = LocalhostMethodCallRouter(service_discovery) localhost = LocalhostRunner() service_discovery.register_service(make_prelude(), localhost_queue) with open('./examples/echoserver.wasm', 'rb') as fil: echo_server = WasmTimeRunner(service_discovery, method_call_router, fil.read()) service_discovery.register_service(Service('echoserver', [ Method('echo', [ MethodArgument('msg', valuetype.bytes) ], valuetype.bytes) ]), echo_server_queue) with open('./examples/echoclient.wasm', 'rb') as fil: echo_client = WasmTimeRunner(service_discovery, method_call_router, fil.read()) # service_discovery.register_service(echo_client, echo_client_queue) # service_discovery.register_service(echo_server, echo_server_queue) on_module_loaded = MethodCall( Method('on_module_loaded', [], valuetype.none), [], lambda x: None, lambda x: None, # TODO: Check for MethodNotFoundError, otherwise report it ) localhost_queue.put(on_module_loaded) echo_client_queue.put(on_module_loaded) echo_server_queue.put(on_module_loaded) localhost_thread = threading.Thread(target=runner_thread, args=(localhost, localhost_queue)) echo_client_thread = threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue)) echo_server_thread = threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue)) localhost_thread.start() echo_client_thread.start() echo_server_thread.start() time.sleep(3) return 0 if __name__ == '__main__': sys.exit(main())