from typing import Any, Generator, Iterable, TextIO

import struct
import sys

from phasm.codestyle import phasm_render
from phasm.type3 import types as type3types
from phasm.runtime import calculate_alloc_size

from . import runners

# Decoration placed on both sides of every section header written by write_header.
DASHES = '-' * 16


class SuiteResult:
    """
    Holder for the outcome of a single Suite.run_code call
    """
    def __init__(self) -> None:
        # Filled in by Suite.run_code with the decoded return value
        self.returned_value: Any = None


# Maps the `runtime` argument of Suite.run_code to the runner class to use.
RUNNER_CLASS_MAP = {
    'pywasm': runners.RunnerPywasm,
    'pywasm3': runners.RunnerPywasm3,
    'wasmtime': runners.RunnerWasmtime,
    'wasmer': runners.RunnerWasmer,
}


class Suite:
    """
    WebAssembly test suite
    """
    def __init__(self, code_py: str) -> None:
        self.code_py = code_py

    def run_code(self, *args: Any, runtime: str = 'pywasm3', func_name: str = 'testEntry', imports: runners.Imports = None) -> Any:
        """
        Compiles the given python code into wasm and
        then runs it

        `args` are passed on to `func_name`. int and float values are
        passed as-is; bytes values are first copied into the runner's
        memory and their address is passed instead. Any other argument
        type raises NotImplementedError.

        Diagnostics (source dumps and memory dumps) are written to
        sys.stderr along the way.

        Returned is an object with the results set
        """
        class_ = RUNNER_CLASS_MAP[runtime]

        runner = class_(self.code_py)

        runner.parse()
        runner.compile_ast()
        runner.compile_wat()
        runner.compile_wasm()
        runner.interpreter_setup()
        runner.interpreter_load(imports)

        write_header(sys.stderr, 'Phasm')
        runner.dump_phasm_code(sys.stderr)
        write_header(sys.stderr, 'Assembly')
        runner.dump_wasm_wat(sys.stderr)

        # Check if code formatting works
        assert self.code_py == '\n' + phasm_render(runner.phasm_ast) # \n for formatting in tests

        wasm_args = []
        if args:
            write_header(sys.stderr, 'Memory (pre alloc)')
            runner.interpreter_dump_memory(sys.stderr)

            for arg in args:
                if isinstance(arg, (int, float, )):
                    wasm_args.append(arg)
                    continue

                if isinstance(arg, bytes):
                    adr = runner.call('stdlib.types.__alloc_bytes__', len(arg))
                    sys.stderr.write(f'Allocation 0x{adr:08x} {repr(arg)}\n')

                    # The payload goes 4 bytes past the allocation address;
                    # _load_bytes_from_address reads a length from those
                    # first 4 bytes.
                    runner.interpreter_write_memory(adr + 4, arg)
                    wasm_args.append(adr)
                    continue

                raise NotImplementedError(arg)

        write_header(sys.stderr, 'Memory (pre run)')
        runner.interpreter_dump_memory(sys.stderr)

        result = SuiteResult()
        result.returned_value = runner.call(func_name, *wasm_args)

        # The raw value may be an address; turn it into a Python value
        # based on the declared return type of the called function.
        result.returned_value = _load_memory_stored_returned_value(
            runner,
            func_name,
            result.returned_value,
        )

        write_header(sys.stderr, 'Memory (post run)')
        runner.interpreter_dump_memory(sys.stderr)

        return result


def write_header(textio: TextIO, msg: str) -> None:
    """
    Writes a section header line for `msg` to `textio`

    The message is left-justified to 16 characters and surrounded by DASHES.
    """
    textio.write(f'{DASHES} {msg.ljust(16)} {DASHES}\n')


def _load_memory_stored_returned_value(
        runner: runners.RunnerBase,
        func_name: str,
        wasm_value: Any,
        ) -> Any:
    """
    Converts the raw value returned by `func_name` into a Python value

    The declared return type is looked up in the runner's phasm AST.
    Numeric values are returned as-is after a type check; for bytes,
    static arrays and tuples the raw value is treated as an address and
    the contents are read from the runner's memory.

    Raises NotImplementedError for return types not handled here.
    """
    ret_type3 = runner.phasm_ast.functions[func_name].returns_type3

    if ret_type3 is type3types.none:
        return None

    if ret_type3 in (type3types.i32, type3types.i64):
        assert isinstance(wasm_value, int), wasm_value
        return wasm_value

    if ret_type3 in (type3types.u8, type3types.u32, type3types.u64):
        assert isinstance(wasm_value, int), wasm_value
        assert 0 <= wasm_value, 'TODO: Extract negative values'
        return wasm_value

    if ret_type3 in (type3types.f32, type3types.f64, ):
        assert isinstance(wasm_value, float), wasm_value
        return wasm_value

    if ret_type3 is type3types.bytes:
        assert isinstance(wasm_value, int), wasm_value

        return _load_bytes_from_address(runner, ret_type3, wasm_value)

    if isinstance(ret_type3, type3types.AppliedType3):
        if ret_type3.base is type3types.static_array:
            assert isinstance(wasm_value, int), wasm_value

            return _load_static_array_from_address(runner, ret_type3, wasm_value)

        if ret_type3.base is type3types.tuple:
            assert isinstance(wasm_value, int), wasm_value

            return _load_tuple_from_address(runner, ret_type3, wasm_value)

    raise NotImplementedError(ret_type3, wasm_value)


def _unpack(runner: runners.RunnerBase, typ: type3types.Type3, inp: bytes) -> Any:
    """
    Decodes the member bytes `inp` into a Python value of type `typ`

    Raises NotImplementedError for types not handled here.
    """
    # NOTE(review): In the reviewed copy of this file everything in this
    # function after the first `struct.unpack('` was lost in extraction.
    # The format strings and all branches after the u8 branch's assert are
    # reconstructed from the types handled by
    # _load_memory_stored_returned_value, assuming little-endian memory
    # and 4 byte pointers - confirm against version control.
    if typ is type3types.u8:
        # See compiler.py, LOAD_STORE_TYPE_MAP and module_data_u8
        assert len(inp) == 4
        return struct.unpack('<I', inp)[0]

    if typ is type3types.u32:
        assert len(inp) == 4
        return struct.unpack('<I', inp)[0]

    if typ is type3types.u64:
        assert len(inp) == 8
        return struct.unpack('<Q', inp)[0]

    if typ is type3types.i32:
        assert len(inp) == 4
        return struct.unpack('<i', inp)[0]

    if typ is type3types.i64:
        assert len(inp) == 8
        return struct.unpack('<q', inp)[0]

    if typ is type3types.f32:
        assert len(inp) == 4
        return struct.unpack('<f', inp)[0]

    if typ is type3types.f64:
        assert len(inp) == 8
        return struct.unpack('<d', inp)[0]

    if typ is type3types.bytes:
        # presumably inp holds a 4 byte pointer to the bytes value - verify
        assert len(inp) == 4
        adr = struct.unpack('<I', inp)[0]

        return _load_bytes_from_address(runner, typ, adr)

    if isinstance(typ, type3types.AppliedType3):
        if typ.base is type3types.static_array:
            # presumably inp holds a 4 byte pointer to the array - verify
            assert len(inp) == 4
            adr = struct.unpack('<I', inp)[0]

            return _load_static_array_from_address(runner, typ, adr)

        if typ.base is type3types.tuple:
            # presumably inp holds a 4 byte pointer to the tuple - verify
            assert len(inp) == 4
            adr = struct.unpack('<I', inp)[0]

            return _load_tuple_from_address(runner, typ, adr)

    raise NotImplementedError(typ, inp)


# NOTE(review): the `def` line of _load_bytes_from_address was lost in
# extraction. The parameter list is reconstructed from the call site in
# _load_memory_stored_returned_value; only `-> bytes` was visible.
def _load_bytes_from_address(runner: runners.RunnerBase, typ: type3types.Type3, adr: int) -> bytes:
    """
    Reads a bytes value from the runner's memory starting at `adr`

    The first 4 bytes at `adr` are read as the length of the value.
    """
    sys.stderr.write(f'Reading 0x{adr:08x}\n')
    read_bytes = runner.interpreter_read_memory(adr, 4)
    # NOTE(review): the format string and the two lines below were lost in
    # extraction; reconstructed to match Suite.run_code, which writes the
    # payload at `adr + 4` - confirm against version control.
    bytes_len, = struct.unpack('<I', read_bytes)

    adr += 4
    return runner.interpreter_read_memory(adr, bytes_len)


# NOTE(review): the `def` line of _split_read_bytes was lost in extraction.
# The parameter names come from the body and the return annotation was
# visible; the parameter annotations are assumed - confirm.
def _split_read_bytes(all_bytes: bytes, split_sizes: Iterable[int]) -> Generator[bytes, None, None]:
    """
    Yields consecutive slices of `all_bytes`, one per entry in `split_sizes`

    Each slice starts where the previous one ended.
    """
    offset = 0
    for size in split_sizes:
        yield all_bytes[offset:offset + size]
        offset += size


def _load_static_array_from_address(runner: runners.RunnerBase, typ: type3types.AppliedType3, adr: int) -> Any:
    """
    Reads a static array from the runner's memory starting at `adr`

    `typ.args` must hold the member type and the array length. Returned
    is a tuple with one decoded value per member.
    """
    assert 2 == len(typ.args)
    sub_typ, len_typ = typ.args

    assert not isinstance(sub_typ, type3types.PlaceholderForType)
    assert isinstance(len_typ, type3types.IntType3)

    sa_len = len_typ.value

    arg_size_1 = calculate_alloc_size(sub_typ, is_member=True)
    arg_sizes = [arg_size_1] * sa_len # _split_read_bytes requires one arg per value

    read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))

    return tuple(
        _unpack(runner, sub_typ, arg_bytes)
        for arg_bytes in _split_read_bytes(read_bytes, arg_sizes)
    )


def _load_tuple_from_address(runner: runners.RunnerBase, typ: type3types.Type3, adr: int) -> Any:
    """
    Reads a tuple from the runner's memory starting at `adr`

    `typ` must be an applied tuple type without placeholder arguments.
    Returned is a tuple with one decoded value per member type.
    """
    assert isinstance(typ, type3types.AppliedType3)
    assert typ.base is type3types.tuple

    typ_list = [
        x
        for x in typ.args
        if not isinstance(x, type3types.PlaceholderForType)
    ]
    # Fails when any placeholder was filtered out above
    assert len(typ_list) == len(typ.args)

    arg_sizes = [
        calculate_alloc_size(x, is_member=True)
        for x in typ_list
    ]

    read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))

    return tuple(
        _unpack(runner, arg_typ, arg_bytes)
        for arg_typ, arg_bytes in zip(typ_list, _split_read_bytes(read_bytes, arg_sizes))
    )