from typing import Any, Callable, List, Union

import ctypes
import struct

import wasmtime

from phasmplatform.common.router import BaseRouter
from phasmplatform.common.value import BaseValue, BytesValue, UntypedValue

from .base import BaseRunner


class WasmTimeRunner(BaseRunner):
    """Runner that instantiates a WebAssembly binary with wasmtime and calls its exports."""

    __slots__ = ('store', 'module', 'instance', 'exports')

    def __init__(self, router: BaseRouter, wasm_bin: bytes) -> None:
        """
        Compile and instantiate ``wasm_bin`` in a fresh wasmtime store.

        The module is given exactly one import: a host function taking four
        i32 arguments and returning nothing, backed by ``self.post_message``.
        """
        super().__init__(router)

        self.store = wasmtime.Store()

        post_message_bind = wasmtime.Func(self.store, wasmtime.FuncType([
            wasmtime.ValType.i32(),
            wasmtime.ValType.i32(),
            wasmtime.ValType.i32(),
            wasmtime.ValType.i32(),
        ], []), self.post_message)

        self.module = wasmtime.Module(self.store.engine, wasm_bin)
        self.instance = wasmtime.Instance(self.store, self.module, [
            post_message_bind,
        ])
        self.exports = self.instance.exports(self.store)

    def alloc_bytes(self, data: bytes) -> int:
        """
        Allocate a bytes object inside the guest and copy ``data`` into it.

        The guest export ``stdlib.types.__alloc_bytes__`` is called with
        ``len(data)``; the payload is written starting 4 bytes past the
        pointer it returns (presumably the header holds the length - confirm
        against the guest's stdlib).

        Returns the pointer returned by the guest allocator.

        Raises IndexError when the payload would not fit inside the guest's
        linear memory.
        """
        memory = self.exports['memory']
        assert isinstance(memory, wasmtime.Memory)  # type hint

        alloc_bytes = self.exports['stdlib.types.__alloc_bytes__']
        assert isinstance(alloc_bytes, wasmtime.Func)

        ptr = alloc_bytes(self.store, len(data))
        assert isinstance(ptr, int)  # type hint

        if not data:
            # Nothing to copy; the guest allocator already set up the object.
            return ptr

        # Read the base pointer and size only AFTER the guest allocator ran:
        # if the allocation grew linear memory, values read earlier may be stale.
        data_ptr = memory.data_ptr(self.store)
        data_len = memory.data_len(self.store)

        start = ptr + 4  # Skip the header from __alloc_bytes__
        end = start + len(data)
        if start < 0 or end > data_len:
            # Checked up front (and not with assert, which -O strips) so that
            # nothing is ever written outside the guest's memory.
            raise IndexError(
                f'bytes payload [{start}, {end}) does not fit in guest memory of {data_len} bytes'
            )

        base_addr = ctypes.addressof(data_ptr.contents)
        ctypes.memmove(base_addr + start, data, len(data))

        return ptr

    def read_bytes(self, ptr: int) -> bytes:
        """
        Read the bytes object located at ``ptr`` in the guest's linear memory.

        Takes a snapshot of the entire linear memory and slices the object
        out of it.
        """
        memory = self.exports['memory']
        assert isinstance(memory, wasmtime.Memory)  # type hint

        data_ptr = memory.data_ptr(self.store)
        data_len = memory.data_len(self.store)

        raw = ctypes.string_at(data_ptr, data_len)

        # NOTE(review): the rest of this method was damaged in the reviewed
        # copy of the file. It is reconstructed here as a 4-byte little-endian
        # unsigned length header followed by the payload, mirroring the 4-byte
        # header that alloc_bytes skips - confirm against version control.
        length, = struct.unpack('<I', raw[ptr:ptr + 4])
        return raw[ptr + 4:ptr + 4 + length]

    # NOTE(review): this signature line was damaged in the reviewed copy; the
    # method name is taken from its call site in do_call and the parameter
    # name from the body - confirm the annotation against version control.
    def convert_value(self, val: BaseValue[Any]) -> Union[int, float]:
        """
        Convert a platform value into something that can be passed to a wasm function.

        Only BytesValue is supported: its data is copied into the guest and
        the resulting pointer is returned. Anything else raises
        NotImplementedError.
        """
        if isinstance(val, BytesValue):
            return self.alloc_bytes(val.data)

        raise NotImplementedError(val)

    def do_call(self, method_name: str, args: List[BaseValue[Any]], callback: Callable[[BaseValue[Any]], None]) -> None:
        """
        Call the export named ``method_name`` with ``args`` and hand the result to ``callback``.

        Every argument is converted with convert_value first; the raw result
        of the call is wrapped in an UntypedValue.
        """
        method = self.exports[method_name]
        assert isinstance(method, wasmtime.Func)

        act_args = [self.convert_value(x) for x in args]

        result = method(self.store, *act_args)

        callback(UntypedValue(result))  # TODO: This returns a bytes pointer, but we can't detect that in advance

    def handle_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None:
        """
        Copy the four message parts into the guest and call its ``handle_message`` export.

        The export receives the four guest pointers in the order
        namespace, topic, kind, body.
        """
        namespace_ptr = self.alloc_bytes(namespace)
        topic_ptr = self.alloc_bytes(topic)
        kind_ptr = self.alloc_bytes(kind)
        body_ptr = self.alloc_bytes(body)

        handle_message = self.exports['handle_message']
        assert isinstance(handle_message, wasmtime.Func)

        handle_message(self.store, namespace_ptr, topic_ptr, kind_ptr, body_ptr)