""" Runners to help run WebAssembly code on various interpreters """ from typing import Any, Callable, Dict, Iterable, Optional, TextIO import ctypes import io import pywasm.binary import wasm3 import wasmer import wasmtime from phasm.compiler import phasm_compile from phasm.parser import phasm_parse from phasm.type3.entry import phasm_type3 from phasm import ourlang from phasm import wasm class RunnerBase: """ Base class """ phasm_code: str phasm_ast: ourlang.Module wasm_ast: wasm.Module wasm_asm: str wasm_bin: bytes def __init__(self, phasm_code: str) -> None: self.phasm_code = phasm_code def dump_phasm_code(self, textio: TextIO) -> None: """ Dumps the input Phasm code for debugging """ _dump_code(textio, self.phasm_code) def parse(self) -> None: """ Parses the Phasm code into an AST """ self.phasm_ast = phasm_parse(self.phasm_code) phasm_type3(self.phasm_ast, verbose=True) def compile_ast(self) -> None: """ Compiles the Phasm AST into an WebAssembly AST """ self.wasm_ast = phasm_compile(self.phasm_ast) def compile_wat(self) -> None: """ Compiles the WebAssembly AST into WebAssembly Assembly code """ self.wasm_asm = self.wasm_ast.to_wat() def dump_wasm_wat(self, textio: TextIO) -> None: """ Dumps the intermediate WebAssembly Assembly code for debugging """ _dump_code(textio, self.wasm_asm) def compile_wasm(self) -> None: """ Compiles the WebAssembly AST into WebAssembly Binary """ self.wasm_bin = wasmer.wat2wasm(self.wasm_asm) def interpreter_setup(self) -> None: """ Sets up the interpreter """ raise NotImplementedError def interpreter_load(self, imports: Optional[Dict[str, Callable[[Any], Any]]] = None) -> None: """ Loads the code into the interpreter """ raise NotImplementedError def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None: """ Writes into the interpreters memory """ raise NotImplementedError def interpreter_read_memory(self, offset: int, length: int) -> bytes: """ Reads from the interpreters memory """ raise NotImplementedError def interpreter_dump_memory(self, textio: TextIO) -> None: """ Dumps the interpreters memory for debugging """ raise NotImplementedError def call(self, function: str, *args: Any) -> Any: """ Calls the given function with the given arguments, returning the result """ raise NotImplementedError class RunnerPywasm(RunnerBase): """ Implements a runner for pywasm See https://pypi.org/project/pywasm/ """ module: pywasm.binary.Module runtime: pywasm.Runtime def interpreter_setup(self) -> None: # Nothing to set up pass def interpreter_load(self, imports: Optional[Dict[str, Callable[[Any], Any]]] = None) -> None: if imports is not None: raise NotImplementedError bytesio = io.BytesIO(self.wasm_bin) self.module = pywasm.binary.Module.from_reader(bytesio) self.runtime = pywasm.Runtime(self.module, {}, None) def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None: for idx, byt in enumerate(data): self.runtime.store.memory_list[0].data[offset + idx] = byt def interpreter_read_memory(self, offset: int, length: int) -> bytes: return self.runtime.store.memory_list[0].data[offset:length] def interpreter_dump_memory(self, textio: TextIO) -> None: _dump_memory(textio, self.runtime.store.memory_list[0].data) def call(self, function: str, *args: Any) -> Any: return self.runtime.exec(function, [*args]) class RunnerPywasm3(RunnerBase): """ Implements a runner for pywasm3 See https://pypi.org/project/pywasm3/ """ env: wasm3.Environment rtime: wasm3.Runtime mod: wasm3.Module def interpreter_setup(self) -> None: self.env = wasm3.Environment() self.rtime = self.env.new_runtime(1024 * 1024) def interpreter_load(self, imports: Optional[Dict[str, Callable[[Any], Any]]] = None) -> None: if imports is not None: raise NotImplementedError self.mod = self.env.parse_module(self.wasm_bin) self.rtime.load(self.mod) def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None: memory = self.rtime.get_memory(0) for idx, byt in enumerate(data): memory[offset + idx] = byt def interpreter_read_memory(self, offset: int, length: int) -> bytes: memory = self.rtime.get_memory(0) return memory[offset:length].tobytes() def interpreter_dump_memory(self, textio: TextIO) -> None: _dump_memory(textio, self.rtime.get_memory(0)) def call(self, function: str, *args: Any) -> Any: return self.rtime.find_function(function)(*args) class RunnerWasmtime(RunnerBase): """ Implements a runner for wasmtime See https://pypi.org/project/wasmtime/ """ store: wasmtime.Store module: wasmtime.Module instance: wasmtime.Instance def interpreter_setup(self) -> None: self.store = wasmtime.Store() def interpreter_load(self, imports: Optional[Dict[str, Callable[[Any], Any]]] = None) -> None: if imports is not None: raise NotImplementedError self.module = wasmtime.Module(self.store.engine, self.wasm_bin) self.instance = wasmtime.Instance(self.store, self.module, []) def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None: exports = self.instance.exports(self.store) memory = exports['memory'] assert isinstance(memory, wasmtime.Memory) # type hint data_ptr = memory.data_ptr(self.store) data_len = memory.data_len(self.store) idx = offset for byt in data: assert idx < data_len data_ptr[idx] = ctypes.c_ubyte(byt) idx += 1 def interpreter_read_memory(self, offset: int, length: int) -> bytes: exports = self.instance.exports(self.store) memory = exports['memory'] assert isinstance(memory, wasmtime.Memory) # type hint data_ptr = memory.data_ptr(self.store) data_len = memory.data_len(self.store) raw = ctypes.string_at(data_ptr, data_len) return raw[offset:length] def interpreter_dump_memory(self, textio: TextIO) -> None: exports = self.instance.exports(self.store) memory = exports['memory'] assert isinstance(memory, wasmtime.Memory) # type hint data_ptr = memory.data_ptr(self.store) data_len = memory.data_len(self.store) _dump_memory(textio, ctypes.string_at(data_ptr, data_len)) def call(self, function: str, *args: Any) -> Any: exports = self.instance.exports(self.store) func = exports[function] assert isinstance(func, wasmtime.Func) return func(self.store, *args) class RunnerWasmer(RunnerBase): """ Implements a runner for wasmer See https://pypi.org/project/wasmer/ """ # pylint: disable=E1101 store: wasmer.Store module: wasmer.Module instance: wasmer.Instance def interpreter_setup(self) -> None: self.store = wasmer.Store() def interpreter_load(self, imports: Optional[Dict[str, Callable[[Any], Any]]] = None) -> None: import_object = wasmer.ImportObject() if imports: import_object.register('imports', { k: wasmer.Function(self.store, v) for k, v in (imports or {}).items() }) self.module = wasmer.Module(self.store, self.wasm_bin) self.instance = wasmer.Instance(self.module, import_object) def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None: exports = self.instance.exports memory = getattr(exports, 'memory') assert isinstance(memory, wasmer.Memory) view = memory.uint8_view(offset) for idx, byt in enumerate(data): view[idx] = byt def interpreter_read_memory(self, offset: int, length: int) -> bytes: exports = self.instance.exports memory = getattr(exports, 'memory') assert isinstance(memory, wasmer.Memory) view = memory.uint8_view(offset) return bytes(view[offset:length]) def interpreter_dump_memory(self, textio: TextIO) -> None: exports = self.instance.exports memory = getattr(exports, 'memory') assert isinstance(memory, wasmer.Memory) view = memory.uint8_view() _dump_memory(textio, view) # type: ignore def call(self, function: str, *args: Any) -> Any: exports = self.instance.exports func = getattr(exports, function) return func(*args) def _dump_memory(textio: TextIO, mem: bytes) -> None: line_width = 16 prev_line = None skip = False for idx in range(0, len(mem), line_width): line = '' for idx2 in range(0, line_width): line += f'{mem[idx + idx2]:02X}' if idx2 % 2 == 1: line += ' ' if prev_line == line: if not skip: textio.write('**\n') skip = True else: textio.write(f'{idx:08x} {line}\n') prev_line = line def _dump_code(textio: TextIO, text: str) -> None: line_list = text.split('\n') line_no_width = len(str(len(line_list))) for line_no, line_txt in enumerate(line_list): textio.write('{} {}\n'.format( str(line_no + 1).zfill(line_no_width), line_txt, ))