from typing import Callable, List, Union

import ctypes
import struct

import wasmtime

from phasmplatform.common import valuetype
from phasmplatform.common.method import Method
from phasmplatform.common.router import BaseRouter
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType

from .base import BaseRunner


class WasmTimeRunner(BaseRunner):
    """
    Runner that executes a WebAssembly module using wasmtime.

    The module is instantiated with a single import, a host function bound
    to ``self.post_message`` taking four i32 arguments and returning
    nothing. ``post_message`` is not defined in this class; presumably it
    is provided by ``BaseRunner`` - TODO confirm.

    Byte strings are exchanged with the guest through its exported
    ``memory``; the payload of a bytes value starts 4 bytes after the
    pointer handed out by ``stdlib.types.__alloc_bytes__``.
    """
    __slots__ = ('store', 'module', 'instance', 'exports')

    def __init__(self, router: BaseRouter, wasm_bin: bytes) -> None:
        """
        Compile and instantiate ``wasm_bin`` in a fresh wasmtime store.

        Args:
            router: Passed on to ``BaseRunner.__init__``.
            wasm_bin: The WebAssembly module to load.
        """
        super().__init__(router)

        self.store = wasmtime.Store()

        # Host function made available to the guest: (i32, i32, i32, i32) -> ()
        post_message_bind = wasmtime.Func(self.store, wasmtime.FuncType([
            wasmtime.ValType.i32(),
            wasmtime.ValType.i32(),
            wasmtime.ValType.i32(),
            wasmtime.ValType.i32(),
        ], []), self.post_message)

        self.module = wasmtime.Module(self.store.engine, wasm_bin)
        self.instance = wasmtime.Instance(self.store, self.module, [
            post_message_bind,
        ])
        self.exports = self.instance.exports(self.store)

    def alloc_bytes(self, data: bytes) -> int:
        """
        Allocate a bytes value inside the guest and copy ``data`` into it.

        Args:
            data: The bytes to copy into guest memory.

        Returns:
            The pointer returned by the guest's
            ``stdlib.types.__alloc_bytes__``; the payload is written
            starting 4 bytes past it.

        Raises:
            AssertionError: If an export has an unexpected type, or the
                payload would not fit inside the guest's memory.
        """
        memory = self.exports['memory']
        assert isinstance(memory, wasmtime.Memory)  # type hint

        alloc_bytes = self.exports['stdlib.types.__alloc_bytes__']
        assert isinstance(alloc_bytes, wasmtime.Func)

        ptr = alloc_bytes(self.store, len(data))
        assert isinstance(ptr, int)  # type hint

        if not data:
            # Nothing to copy; the guest allocation is all there is to do.
            return ptr

        # Query the memory only AFTER the guest allocated: the allocation
        # may grow the linear memory, in which case a base pointer and
        # length obtained earlier would no longer be valid.
        data_ptr = memory.data_ptr(self.store)
        data_len = memory.data_len(self.store)

        start = ptr + 4  # Skip the 4-byte header that precedes the payload
        end = start + len(data)

        # Check the whole range once, before touching memory. Raised
        # explicitly (rather than via `assert`) so that the check is still
        # performed when Python runs with -O.
        if start < 0 or end > data_len:
            raise AssertionError(
                f'bytes payload [{start}, {end}) is outside of wasm memory of size {data_len}'
            )

        ctypes.memmove(ctypes.addressof(data_ptr.contents) + start, data, len(data))

        return ptr

    def read_bytes(self, ptr: int) -> bytes:
        """
        Read a bytes value back out of the guest's memory.

        Args:
            ptr: Pointer to a bytes value inside the guest's memory.

        Returns:
            The payload of the bytes value.
        """
        memory = self.exports['memory']
        assert isinstance(memory, wasmtime.Memory)  # type hint

        data_ptr = memory.data_ptr(self.store)
        data_len = memory.data_len(self.store)

        # Snapshot of the guest's complete linear memory
        raw = ctypes.string_at(data_ptr, data_len)

        # NOTE(review): the remainder of this method was garbled in the
        # source and has been reconstructed. It assumes the 4-byte header
        # is a little-endian unsigned payload length and that the payload
        # directly follows it - confirm against __alloc_bytes__.
        length, = struct.unpack('<I', raw[ptr:ptr + 4])

        return raw[ptr + 4:ptr + 4 + length]

    # NOTE(review): the `def` line below was garbled in the source; the
    # parameter annotation is inferred from the use of val.value_type and
    # val.data - confirm.
    def value_to_wasm(self, val: Value) -> Union[None, int, float]:
        """
        Convert a platform value into something that can be passed to wasm.

        Only bytes values are supported; they are copied into the guest
        and represented by their pointer.

        Raises:
            NotImplementedError: For any other value type.
        """
        if val.value_type is valuetype.bytes:
            assert isinstance(val.data, bytes)  # type hint
            return self.alloc_bytes(val.data)

        raise NotImplementedError(val)

    def value_from_wasm(self, value_type: ValueType, val: Union[None, int, float]) -> Value:
        """
        Convert a raw wasm result into a platform value of ``value_type``.

        Only bytes values are supported; ``val`` is then the pointer to
        read the bytes from.

        Raises:
            NotImplementedError: For any other value type.
        """
        if value_type is valuetype.bytes:
            assert isinstance(val, int)  # type hint
            return Value(valuetype.bytes, self.read_bytes(val))

        raise NotImplementedError(value_type, val)

    def do_call(self, method: Method, args: List[Value], on_result: Callable[[Value], None]) -> None:
        """
        Call the exported function ``method.name`` and report its result.

        Args:
            method: The method to call; its name selects the export and
                its return type decides how the result is converted.
            args: Arguments for the call, converted with ``value_to_wasm``.
            on_result: Called with the converted result.
        """
        wasm_method = self.exports[method.name]
        assert isinstance(wasm_method, wasmtime.Func)

        act_args = [self.value_to_wasm(x) for x in args]
        result = wasm_method(self.store, *act_args)
        assert result is None or isinstance(result, (int, float, ))  # type hint

        on_result(self.value_from_wasm(method.return_type, result))

    def handle_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None:
        """
        Copy the four message parts into the guest and pass their pointers
        to the exported ``handle_message`` function.
        """
        namespace_ptr = self.alloc_bytes(namespace)
        topic_ptr = self.alloc_bytes(topic)
        kind_ptr = self.alloc_bytes(kind)
        body_ptr = self.alloc_bytes(body)

        handle_message = self.exports['handle_message']
        assert isinstance(handle_message, wasmtime.Func)
        handle_message(self.store, namespace_ptr, topic_ptr, kind_ptr, body_ptr)