From 13dc426fc57aa0fc6bffdef2a8f1248a594c7df7 Mon Sep 17 00:00:00 2001 From: "Johan B.W. de Vries" Date: Tue, 11 Apr 2023 11:35:08 +0200 Subject: [PATCH] It lives \o/ --- phasmplatform/common/exceptions.py | 6 +- phasmplatform/common/method.py | 9 ++ phasmplatform/common/router.py | 13 ++- phasmplatform/common/service.py | 25 +++++ phasmplatform/common/value.py | 7 +- phasmplatform/common/valuetype.py | 3 + phasmplatform/worker/__main__.py | 92 +++++++++++++++-- phasmplatform/worker/runners/base.py | 29 ++++-- phasmplatform/worker/runners/wasmtime.py | 124 +++++++++++++++++++---- 9 files changed, 264 insertions(+), 44 deletions(-) create mode 100644 phasmplatform/common/service.py diff --git a/phasmplatform/common/exceptions.py b/phasmplatform/common/exceptions.py index 5da196a..e92b502 100644 --- a/phasmplatform/common/exceptions.py +++ b/phasmplatform/common/exceptions.py @@ -6,5 +6,9 @@ class PhashPlatformRuntimeError(PhasmPlatformError): pass -class PhashPlatformNonIntMainReturnError(PhashPlatformRuntimeError): +class PhashPlatformServiceNotFound(PhashPlatformRuntimeError): + pass + + +class PhashPlatformServiceMethodNotFound(PhashPlatformRuntimeError): pass diff --git a/phasmplatform/common/method.py b/phasmplatform/common/method.py index 3e786ca..eb7fd3f 100644 --- a/phasmplatform/common/method.py +++ b/phasmplatform/common/method.py @@ -21,6 +21,9 @@ class MethodArgument: return self.name == other.name and self.value_type == other.value_type + def __repr__(self) -> str: + return f'MethodArgument({repr(self.name)}, {repr(self.value_type)})' + class Method: __slots__ = ('name', 'args', 'return_type', ) @@ -34,6 +37,9 @@ class Method: self.args = args self.return_type = return_type + def __repr__(self) -> str: + return f'Method({repr(self.name)}, {repr(self.args)}, {repr(self.return_type)})' + class MethodCallError: pass @@ -62,3 +68,6 @@ class MethodCall: self.args = args self.on_success = on_success self.on_error = on_error + + def __repr__(self) -> str: + return f'MethodCall({repr(self.method)}, {repr(self.args)}, {repr(self.on_success)}, {repr(self.on_error)})' diff --git a/phasmplatform/common/router.py b/phasmplatform/common/router.py index 5d2580d..ecb4968 100644 --- a/phasmplatform/common/router.py +++ b/phasmplatform/common/router.py @@ -1,8 +1,7 @@ -class BaseRouter: - def post_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None: +from .method import MethodCall +from .service import Service + + +class MethodCallRouterInterface: + def send_call(self, service: Service, call: MethodCall) -> None: raise NotImplementedError - - -class StdOutRouter(BaseRouter): - def post_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None: - print(f'ns={namespace.decode()},t={topic.decode()},k={kind.decode()} {body.decode()}') diff --git a/phasmplatform/common/service.py b/phasmplatform/common/service.py new file mode 100644 index 0000000..f0a0b3a --- /dev/null +++ b/phasmplatform/common/service.py @@ -0,0 +1,25 @@ +from typing import Dict, Optional, List + +from .method import Method + + +class Service: + __slots__ = ('name', 'methods', ) + + name: str + methods: Dict[str, Method] + + def __init__(self, name: str, methods: List[Method]) -> None: + self.name = name + self.methods = { + x.name: x + for x in methods + } + + def __repr__(self) -> str: + return f'Service({repr(self.name)}, {repr(list(self.methods.values()))})' + + +class ServiceDiscoveryInterface: + def find_service(self, name: str) -> Optional[Service]: + raise NotImplementedError diff --git a/phasmplatform/common/value.py b/phasmplatform/common/value.py index 61a59c9..28c7547 100644 --- a/phasmplatform/common/value.py +++ b/phasmplatform/common/value.py @@ -1,6 +1,6 @@ from typing import Any, Union -from .valuetype import ValueType +from .valuetype import ValueType, none ValueData = Union[None, bytes] @@ -19,4 +19,7 @@ class Value: return self.value_type is other.value_type and self.data == other.data def __repr__(self) -> str: - return f'Value(valuetype.{self.value_type.name}, {repr(self.data)})' + return f'Value({repr(self.value_type)}, {repr(self.data)})' + + +NoneValue = Value(none, None) diff --git a/phasmplatform/common/valuetype.py b/phasmplatform/common/valuetype.py index 36f544e..9e8e090 100644 --- a/phasmplatform/common/valuetype.py +++ b/phasmplatform/common/valuetype.py @@ -15,6 +15,9 @@ class ValueType: return self is other + def __repr__(self) -> str: + return f'valuetype.{self.name}' + bytes = ValueType('bytes') diff --git a/phasmplatform/worker/__main__.py b/phasmplatform/worker/__main__.py index bc4cf34..de93cd4 100644 --- a/phasmplatform/worker/__main__.py +++ b/phasmplatform/worker/__main__.py @@ -1,3 +1,5 @@ +from typing import Dict, List, Optional, Tuple + import sys import threading import time @@ -5,8 +7,10 @@ from queue import Empty, Queue from phasmplatform.common import valuetype from phasmplatform.common.config import from_toml -from phasmplatform.common.method import Method, MethodCall -from phasmplatform.common.router import StdOutRouter +from phasmplatform.common.method import Method, MethodArgument, MethodCall +from phasmplatform.common.router import MethodCallRouterInterface +from phasmplatform.common.service import Service, ServiceDiscoveryInterface +from phasmplatform.common.value import NoneValue from .runners.base import RunnerInterface from .runners.wasmtime import WasmTimeRunner @@ -19,22 +23,92 @@ def runner_thread(runner: RunnerInterface, queue: Queue[MethodCall]) -> None: except Empty: break + print('rt call', runner, queue, call) runner.do_call(call) +def make_prelude() -> Service: + methods: List[Method] = [] + + methods.append(Method('log_bytes', [ + MethodArgument('data', valuetype.bytes) + ], valuetype.none)) + + return Service('prelude', methods) + + +class LocalhostRunner(RunnerInterface): + def do_call(self, call: MethodCall) -> None: + if call.method.name == 'on_module_loaded': + print('LocalhostRunner loaded') + call.on_success(NoneValue) + return + + if call.method.name == 'log_bytes': + print('LOG-BYTES:', repr(call.args[0].data)) + call.on_success(NoneValue) + return + + raise NotImplementedError(call) + + +class LocalhostServiceDiscovery(ServiceDiscoveryInterface): + services: Dict[str, Tuple[Service, Queue[MethodCall]]] + + def __init__(self) -> None: + self.services = {} + + def register_service(self, service: Service, queue: Queue[MethodCall]) -> None: + self.services[service.name] = (service, queue, ) + + def find_service(self, name: str) -> Optional[Service]: + parts = self.services.get(name, None) + if parts is None: + return None + + return parts[0] + + +class LocalhostMethodCallRouter(MethodCallRouterInterface): + def __init__(self, service_discovery: LocalhostServiceDiscovery) -> None: + self.service_discovery = service_discovery + + def send_call(self, service: Service, call: MethodCall) -> None: + assert service.name in self.service_discovery.services + queue = self.service_discovery.services[service.name][1] + print('send_call', service, call, queue) + queue.put(call) + + def main() -> int: with open('config.toml', 'rb') as fil: config = from_toml(fil) del config - stdout_router = StdOutRouter() + localhost_queue: Queue[MethodCall] = Queue() + echo_client_queue: Queue[MethodCall] = Queue() + echo_server_queue: Queue[MethodCall] = Queue() - with open('/home/johan/projects/idea/phasm/examples/echoclient.wasm', 'rb') as fil: - echo_client = WasmTimeRunner(stdout_router, fil.read()) + service_discovery = LocalhostServiceDiscovery() + method_call_router = LocalhostMethodCallRouter(service_discovery) + + localhost = LocalhostRunner() + service_discovery.register_service(make_prelude(), localhost_queue) with open('/home/johan/projects/idea/phasm/examples/echoserver.wasm', 'rb') as fil: - echo_server = WasmTimeRunner(stdout_router, fil.read()) + echo_server = WasmTimeRunner(service_discovery, method_call_router, fil.read()) + service_discovery.register_service(Service('echoserver', [ + Method('echo', [ + MethodArgument('msg', valuetype.bytes) + ], valuetype.bytes) + ]), echo_server_queue) + + with open('/home/johan/projects/idea/phasm/examples/echoclient.wasm', 'rb') as fil: + echo_client = WasmTimeRunner(service_discovery, method_call_router, fil.read()) + + # service_discovery.register_service(echo_client, echo_client_queue) + # service_discovery.register_service(echo_server, echo_server_queue) on_module_loaded = MethodCall( Method('on_module_loaded', [], valuetype.none), @@ -43,15 +117,15 @@ def main() -> int: lambda x: None, # TODO: Check for MethodNotFoundError, otherwise report it ) - echo_client_queue: Queue[MethodCall] = Queue() + localhost_queue.put(on_module_loaded) echo_client_queue.put(on_module_loaded) - - echo_server_queue: Queue[MethodCall] = Queue() echo_server_queue.put(on_module_loaded) + localhost_thread = threading.Thread(target=runner_thread, args=(localhost, localhost_queue)) echo_client_thread = threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue)) echo_server_thread = threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue)) + localhost_thread.start() echo_client_thread.start() echo_server_thread.start() diff --git a/phasmplatform/worker/runners/base.py b/phasmplatform/worker/runners/base.py index 23e4816..a78c442 100644 --- a/phasmplatform/worker/runners/base.py +++ b/phasmplatform/worker/runners/base.py @@ -1,26 +1,37 @@ from typing import TextIO, Union from phasmplatform.common import valuetype -from phasmplatform.common.router import BaseRouter from phasmplatform.common.method import MethodCall +from phasmplatform.common.router import MethodCallRouterInterface +from phasmplatform.common.service import ServiceDiscoveryInterface from phasmplatform.common.value import Value from phasmplatform.common.valuetype import ValueType +WasmValue = Union[None, int, float] + + class RunnerInterface: __slots__ = ('router', ) def do_call(self, call: MethodCall) -> None: + """ + Executes the call on the current container + + This method is responsible for calling the on_success or on_error method. + """ raise NotImplementedError class BaseRunner(RunnerInterface): - __slots__ = ('router', ) + __slots__ = ('service_discovery', 'method_call_router', ) - router: BaseRouter + service_discovery: ServiceDiscoveryInterface + method_call_router: MethodCallRouterInterface - def __init__(self, router: BaseRouter) -> None: - self.router = router + def __init__(self, service_discovery: ServiceDiscoveryInterface, method_call_router: MethodCallRouterInterface) -> None: + self.service_discovery = service_discovery + self.method_call_router = method_call_router def alloc_bytes(self, data: bytes) -> int: """ @@ -34,14 +45,18 @@ class BaseRunner(RunnerInterface): """ raise NotImplementedError - def value_to_wasm(self, val: Value) -> Union[None, int, float]: + def value_to_wasm(self, val: Value) -> WasmValue: + if val.value_type is valuetype.none: + assert val.data is None # type hint + return None + if val.value_type is valuetype.bytes: assert isinstance(val.data, bytes) # type hint return self.alloc_bytes(val.data) raise NotImplementedError(val) - def value_from_wasm(self, value_type: ValueType, val: Union[None, int, float]) -> Value: + def value_from_wasm(self, value_type: ValueType, val: WasmValue) -> Value: if value_type is valuetype.none: assert val is None # type hint return Value(value_type, None) diff --git a/phasmplatform/worker/runners/wasmtime.py b/phasmplatform/worker/runners/wasmtime.py index 3732a68..7180020 100644 --- a/phasmplatform/worker/runners/wasmtime.py +++ b/phasmplatform/worker/runners/wasmtime.py @@ -1,40 +1,66 @@ -from typing import List +from typing import Any, List import ctypes +import functools import struct +from queue import Empty, Queue import wasmtime -from phasmplatform.common.method import MethodCall, MethodNotFoundError -from phasmplatform.common.router import BaseRouter -from .base import BaseRunner +from phasmplatform.common import valuetype +from phasmplatform.common.exceptions import PhashPlatformServiceNotFound, PhashPlatformServiceMethodNotFound +from phasmplatform.common.method import Method, MethodCall, MethodCallError, MethodNotFoundError +from phasmplatform.common.router import MethodCallRouterInterface +from phasmplatform.common.service import Service, ServiceDiscoveryInterface +from phasmplatform.common.value import Value +from phasmplatform.common.valuetype import ValueType + +from .base import BaseRunner, WasmValue class WasmTimeRunner(BaseRunner): __slots__ = ('store', 'module', 'instance', 'exports') - def __init__(self, router: BaseRouter, wasm_bin: bytes) -> None: - super().__init__(router) + def __init__( + self, + service_discovery: ServiceDiscoveryInterface, + method_call_router: MethodCallRouterInterface, + wasm_bin: bytes, + ) -> None: + super().__init__(service_discovery, method_call_router) self.store = wasmtime.Store() - - possible_imports = { - 'prelude': { - 'log_bytes': wasmtime.Func(self.store, wasmtime.FuncType([wasmtime.ValType.i32()], []), self.log_bytes), - } - } - self.module = wasmtime.Module(self.store.engine, wasm_bin) + from typing import Any + + def dump_args(*args: Any, **kwargs: Any) -> None: + print('args', args) + print('kwargs', kwargs) + imports: List[wasmtime.Func] = [] for imprt in self.module.imports: - if imprt.module not in possible_imports: - raise Exception('Service not found') + service = service_discovery.find_service(imprt.module) + if service is None: + raise PhashPlatformServiceNotFound( + f'Dependent service "{imprt.module}" not found; could not provide "{imprt.name}"' + ) - if imprt.name not in possible_imports[imprt.module]: - raise Exception('Method not found in service') + assert imprt.name is not None # type hint - imports.append(possible_imports[imprt.module][imprt.name]) + method = service.methods.get(imprt.name) + if method is None: + raise PhashPlatformServiceMethodNotFound( + f'Dependent service "{imprt.module}" found, but it does not provide "{imprt.name}"' + ) + + func = wasmtime.Func( + self.store, + build_func_type(method), + functools.partial(self.send_service_call, service, method) + ) + + imports.append(func) self.instance = wasmtime.Instance(self.store, self.module, imports) @@ -87,3 +113,65 @@ class WasmTimeRunner(BaseRunner): result = wasm_method(self.store, *act_args) assert result is None or isinstance(result, (int, float, )) # type hint call.on_success(self.value_from_wasm(call.method.return_type, result)) + + def send_service_call(self, service: Service, method: Method, *args: Any) -> WasmValue: + assert len(method.args) == len(args) # type hint + + call_args = [ + self.value_from_wasm(x.value_type, y) + for x, y in zip(method.args, args) + ] + + queue: Queue[Value] = Queue(maxsize=1) + + def on_success(val: Value) -> None: + print('hi mom') + queue.put(val) + + def on_error(err: MethodCallError) -> None: + print('Error while calling', service, method, args) + + print('on_success', on_success) + call = MethodCall(method, call_args, on_success, on_error) + + print( + 'send_service_call', + 'from-service=?', 'from-method=?', # TODO + f'to-service={service.name}', f'to-method={method.name}', + *args, + ) + self.method_call_router.send_call(service, call) + + try: + value = queue.get(block=True, timeout=10) + except Empty: + print( + 'send_service_call', + 'from-service=?', 'from-method=?', # TODO + f'to-service={service.name}', f'to-method={method.name}', + 'TIMEOUT', + ) + raise Exception() # TODO + + return self.value_to_wasm(value) + + +def build_func_type(method: Method) -> wasmtime.FuncType: + if method.return_type is valuetype.none: + returns = [] + else: + returns = [build_wasm_type(method.return_type)] + + args = [] + for arg in method.args: + assert arg.value_type is not valuetype.none # type hint + args.append(build_wasm_type(arg.value_type)) + + return wasmtime.FuncType(args, returns) + + +def build_wasm_type(value_type: ValueType) -> wasmtime.ValType: + if value_type is valuetype.bytes: + return wasmtime.ValType.i32() # Bytes are passed as pointer + + raise NotImplementedError