diff --git a/phasmplatform/common/method.py b/phasmplatform/common/method.py index 8d4bbc8..3e786ca 100644 --- a/phasmplatform/common/method.py +++ b/phasmplatform/common/method.py @@ -1,6 +1,7 @@ -from typing import Any, List +from typing import Any, Callable, List +from .value import Value from .valuetype import ValueType @@ -32,3 +33,32 @@ class Method: self.name = name self.args = args self.return_type = return_type + + +class MethodCallError: + pass + + +class MethodNotFoundError(MethodCallError): + pass + + +class MethodCall: + __slots__ = ('method', 'args', 'on_success', 'on_error', ) + + method: Method + args: List[Value] + on_success: Callable[[Value], None] + on_error: Callable[[MethodCallError], None] + + def __init__( + self, + method: Method, + args: List[Value], + on_success: Callable[[Value], None], + on_error: Callable[[MethodCallError], None], + ) -> None: + self.method = method + self.args = args + self.on_success = on_success + self.on_error = on_error diff --git a/phasmplatform/worker/__main__.py b/phasmplatform/worker/__main__.py index 5dc30ae..bc4cf34 100644 --- a/phasmplatform/worker/__main__.py +++ b/phasmplatform/worker/__main__.py @@ -1,26 +1,25 @@ import sys +import threading +import time +from queue import Empty, Queue from phasmplatform.common import valuetype from phasmplatform.common.config import from_toml -from phasmplatform.common.method import Method, MethodArgument +from phasmplatform.common.method import Method, MethodCall from phasmplatform.common.router import StdOutRouter -from phasmplatform.common.value import Value from .runners.base import RunnerInterface from .runners.wasmtime import WasmTimeRunner -def somefunc(runner: RunnerInterface) -> None: - inp = Value(valuetype.bytes, b'Hello, world!') - print('inp', inp) +def runner_thread(runner: RunnerInterface, queue: Queue[MethodCall]) -> None: + while True: + try: + call = queue.get(block=True, timeout=1) + except Empty: + break - def on_respond(out: Value) -> None: - print('out', out) - assert out == inp - - echo = Method('echo', [MethodArgument('msg', valuetype.bytes)], valuetype.bytes) - - runner.do_call(echo, [inp], on_respond) + runner.do_call(call) def main() -> int: @@ -31,17 +30,32 @@ def main() -> int: stdout_router = StdOutRouter() - with open('/home/johan/projects/idea/phasm/examples/platform.wasm', 'rb') as fil: - foo = WasmTimeRunner(stdout_router, fil.read()) + with open('/home/johan/projects/idea/phasm/examples/echoclient.wasm', 'rb') as fil: + echo_client = WasmTimeRunner(stdout_router, fil.read()) - # namespace = b'test-namespace' - # topic = b'test-topic' - # kind = b'test-kind' - # body = b'test-body' + with open('/home/johan/projects/idea/phasm/examples/echoserver.wasm', 'rb') as fil: + echo_server = WasmTimeRunner(stdout_router, fil.read()) - # foo.handle_message(namespace, topic, kind, body) + on_module_loaded = MethodCall( + Method('on_module_loaded', [], valuetype.none), + [], + lambda x: None, + lambda x: None, # TODO: Check for MethodNotFoundError, otherwise report it + ) - somefunc(foo) + echo_client_queue: Queue[MethodCall] = Queue() + echo_client_queue.put(on_module_loaded) + + echo_server_queue: Queue[MethodCall] = Queue() + echo_server_queue.put(on_module_loaded) + + echo_client_thread = threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue)) + echo_server_thread = threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue)) + + echo_client_thread.start() + echo_server_thread.start() + + time.sleep(3) return 0 diff --git a/phasmplatform/worker/runners/base.py b/phasmplatform/worker/runners/base.py index dd20aa9..23e4816 100644 --- a/phasmplatform/worker/runners/base.py +++ b/phasmplatform/worker/runners/base.py @@ -1,14 +1,16 @@ -from typing import Callable, List, TextIO +from typing import TextIO, Union +from phasmplatform.common import valuetype from phasmplatform.common.router import BaseRouter -from phasmplatform.common.method import Method +from phasmplatform.common.method import MethodCall from phasmplatform.common.value import Value +from phasmplatform.common.valuetype import ValueType class RunnerInterface: __slots__ = ('router', ) - def do_call(self, method: Method, args: List[Value], on_result: Callable[[Value], None]) -> None: + def do_call(self, call: MethodCall) -> None: raise NotImplementedError @@ -32,16 +34,26 @@ class BaseRunner(RunnerInterface): """ raise NotImplementedError - def handle_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None: - raise NotImplementedError + def value_to_wasm(self, val: Value) -> Union[None, int, float]: + if val.value_type is valuetype.bytes: + assert isinstance(val.data, bytes) # type hint + return self.alloc_bytes(val.data) - def post_message(self, namespace_ptr: int, topic_ptr: int, kind_ptr: int, body_ptr: int) -> None: - namespace = self.read_bytes(namespace_ptr) - topic = self.read_bytes(topic_ptr) - kind = self.read_bytes(kind_ptr) - body = self.read_bytes(body_ptr) + raise NotImplementedError(val) - self.router.post_message(namespace, topic, kind, body) + def value_from_wasm(self, value_type: ValueType, val: Union[None, int, float]) -> Value: + if value_type is valuetype.none: + assert val is None # type hint + return Value(value_type, None) + + if value_type is valuetype.bytes: + assert isinstance(val, int) # type hint + return Value(value_type, self.read_bytes(val)) + + raise NotImplementedError(value_type, val) + + def log_bytes(self, msg_ptr: int) -> None: + print('LOG: ' + self.read_bytes(msg_ptr).decode()) def dump_memory(textio: TextIO, mem: bytes) -> None: diff --git a/phasmplatform/worker/runners/wasmtime.py b/phasmplatform/worker/runners/wasmtime.py index e3d1a41..3732a68 100644 --- a/phasmplatform/worker/runners/wasmtime.py +++ b/phasmplatform/worker/runners/wasmtime.py @@ -1,15 +1,12 @@ -from typing import Callable, List, Union +from typing import List import ctypes import struct import wasmtime -from phasmplatform.common import valuetype -from phasmplatform.common.method import Method +from phasmplatform.common.method import MethodCall, MethodNotFoundError from phasmplatform.common.router import BaseRouter -from phasmplatform.common.value import Value -from phasmplatform.common.valuetype import ValueType from .base import BaseRunner @@ -21,17 +18,25 @@ class WasmTimeRunner(BaseRunner): self.store = wasmtime.Store() - post_message_bind = wasmtime.Func(self.store, wasmtime.FuncType([ - wasmtime.ValType.i32(), - wasmtime.ValType.i32(), - wasmtime.ValType.i32(), - wasmtime.ValType.i32(), - ], []), self.post_message) + possible_imports = { + 'prelude': { + 'log_bytes': wasmtime.Func(self.store, wasmtime.FuncType([wasmtime.ValType.i32()], []), self.log_bytes), + } + } self.module = wasmtime.Module(self.store.engine, wasm_bin) - self.instance = wasmtime.Instance(self.store, self.module, [ - post_message_bind, - ]) + + imports: List[wasmtime.Func] = [] + for imprt in self.module.imports: + if imprt.module not in possible_imports: + raise Exception('Service not found') + + if imprt.name not in possible_imports[imprt.module]: + raise Exception('Method not found in service') + + imports.append(possible_imports[imprt.module][imprt.name]) + + self.instance = wasmtime.Instance(self.store, self.module, imports) self.exports = self.instance.exports(self.store) @@ -69,46 +74,16 @@ class WasmTimeRunner(BaseRunner): return raw[ptr + 4:ptr + 4 + length] - def value_to_wasm(self, val: Value) -> Union[None, int, float]: - if val.value_type is valuetype.bytes: - assert isinstance(val.data, bytes) # type hint - return self.alloc_bytes(val.data) + def do_call(self, call: MethodCall) -> None: + try: + wasm_method = self.exports[call.method.name] + except KeyError: + call.on_error(MethodNotFoundError()) + return - raise NotImplementedError(val) - - def value_from_wasm(self, value_type: ValueType, val: Union[None, int, float]) -> Value: - if value_type is valuetype.bytes: - assert isinstance(val, int) # typ hint - return Value(valuetype.bytes, self.read_bytes(val)) - - raise NotImplementedError(value_type, val) - - def do_call(self, method: Method, args: List[Value], on_result: Callable[[Value], None]) -> None: - wasm_method = self.exports[method.name] assert isinstance(wasm_method, wasmtime.Func) - act_args = [self.value_to_wasm(x) for x in args] + act_args = [self.value_to_wasm(x) for x in call.args] result = wasm_method(self.store, *act_args) assert result is None or isinstance(result, (int, float, )) # type hint - on_result(self.value_from_wasm(method.return_type, result)) - - # callback(UntypedValue(result)) # TODO: This returns a bytes pointer, but we can't detect that in advance - - # def do_call(self, method_name: str, args: List[BaseValue[Any]], callback: Callable[[BaseValue[Any]], None]) -> None: - # method = self.exports[method_name] - # assert isinstance(method, wasmtime.Func) - - # act_args = [self.convert_value(x) for x in args] - # result = method(self.store, *act_args) - - # callback(UntypedValue(result)) # TODO: This returns a bytes pointer, but we can't detect that in advance - - def handle_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None: - namespace_ptr = self.alloc_bytes(namespace) - topic_ptr = self.alloc_bytes(topic) - kind_ptr = self.alloc_bytes(kind) - body_ptr = self.alloc_bytes(body) - - handle_message = self.exports['handle_message'] - assert isinstance(handle_message, wasmtime.Func) - handle_message(self.store, namespace_ptr, topic_ptr, kind_ptr, body_ptr) + call.on_success(self.value_from_wasm(call.method.return_type, result))