from typing import Any, Generator, Iterable, List, TextIO, Union import struct import sys from phasm import compiler from phasm.codestyle import phasm_render from phasm.type3 import types as type3types from phasm.runtime import calculate_alloc_size from . import runners DASHES = '-' * 16 class SuiteResult: def __init__(self) -> None: self.returned_value = None RUNNER_CLASS_MAP = { 'pywasm': runners.RunnerPywasm, 'pywasm3': runners.RunnerPywasm3, 'wasmtime': runners.RunnerWasmtime, 'wasmer': runners.RunnerWasmer, } class Suite: """ WebAssembly test suite """ def __init__(self, code_py: str) -> None: self.code_py = code_py def run_code(self, *args: Any, runtime: str = 'pywasm3', func_name: str = 'testEntry', imports: runners.Imports = None) -> Any: """ Compiles the given python code into wasm and then runs it Returned is an object with the results set """ class_ = RUNNER_CLASS_MAP[runtime] runner = class_(self.code_py) write_header(sys.stderr, 'Phasm') runner.dump_phasm_code(sys.stderr) runner.parse() runner.compile_ast() runner.compile_wat() write_header(sys.stderr, 'Assembly') runner.dump_wasm_wat(sys.stderr) runner.compile_wasm() runner.interpreter_setup() runner.interpreter_load(imports) # Check if code formatting works assert self.code_py == '\n' + phasm_render(runner.phasm_ast) # \n for formatting in tests func_args = [x.type3 for x in runner.phasm_ast.functions[func_name].posonlyargs] if len(func_args) != len(args): raise RuntimeError(f'Invalid number of args for {func_name}') wasm_args: List[Union[float, int]] = [] if args: write_header(sys.stderr, 'Memory (pre alloc)') runner.interpreter_dump_memory(sys.stderr) for arg, arg_typ in zip(args, func_args): assert not isinstance(arg_typ, type3types.PlaceholderForType), \ 'Cannot call polymorphic function from outside' if arg_typ in (type3types.u8, type3types.u32, type3types.u64, ): assert isinstance(arg, int) wasm_args.append(arg) continue if arg_typ in (type3types.i8, type3types.i32, type3types.i64, ): assert isinstance(arg, int) wasm_args.append(arg) continue if arg_typ in (type3types.f32, type3types.f64, ): assert isinstance(arg, float) wasm_args.append(arg) continue if arg_typ is type3types.bytes: adr = _allocate_memory_stored_value(runner, arg_typ, arg) wasm_args.append(adr) continue if isinstance(arg_typ, type3types.AppliedType3): if arg_typ.base is type3types.static_array: adr = _allocate_memory_stored_value(runner, arg_typ, arg) wasm_args.append(adr) continue if arg_typ.base is type3types.tuple: adr = _allocate_memory_stored_value(runner, arg_typ, arg) wasm_args.append(adr) continue raise NotImplementedError(arg) write_header(sys.stderr, 'Memory (pre run)') runner.interpreter_dump_memory(sys.stderr) result = SuiteResult() result.returned_value = runner.call(func_name, *wasm_args) result.returned_value = _load_memory_stored_returned_value( runner, func_name, result.returned_value, ) write_header(sys.stderr, 'Memory (post run)') runner.interpreter_dump_memory(sys.stderr) return result def write_header(textio: TextIO, msg: str) -> None: textio.write(f'{DASHES} {msg.ljust(16)} {DASHES}\n') WRITE_LOOKUP_MAP = { 'u8': compiler.module_data_u8, 'u32': compiler.module_data_u32, 'u64': compiler.module_data_u64, 'i8': compiler.module_data_i8, 'i32': compiler.module_data_i32, 'i64': compiler.module_data_i64, 'f32': compiler.module_data_f32, 'f64': compiler.module_data_f64, } def _write_memory_stored_value( runner: runners.RunnerBase, adr: int, val_typ: type3types.Type3, val: Any, ) -> int: if val_typ is type3types.bytes: adr2 = _allocate_memory_stored_value(runner, val_typ, val) runner.interpreter_write_memory(adr, compiler.module_data_u32(adr2)) return 4 if isinstance(val_typ, type3types.PrimitiveType3): to_write = WRITE_LOOKUP_MAP[val_typ.name](val) runner.interpreter_write_memory(adr, to_write) return len(to_write) if isinstance(val_typ, type3types.AppliedType3): if val_typ.base in (type3types.static_array, type3types.tuple, ): adr2 = _allocate_memory_stored_value(runner, val_typ, val) runner.interpreter_write_memory(adr, compiler.module_data_u32(adr2)) return 4 raise NotImplementedError(val_typ, val) def _allocate_memory_stored_value( runner: runners.RunnerBase, val_typ: type3types.Type3, val: Any ) -> int: if val_typ is type3types.bytes: assert isinstance(val, bytes) adr = runner.call('stdlib.types.__alloc_bytes__', len(val)) assert isinstance(adr, int) sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n') runner.interpreter_write_memory(adr + 4, val) return adr if isinstance(val_typ, type3types.AppliedType3): if val_typ.base is type3types.static_array: assert isinstance(val, tuple) alloc_size = calculate_alloc_size(val_typ) adr = runner.call('stdlib.alloc.__alloc__', alloc_size) assert isinstance(adr, int) sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n') val_el_typ = val_typ.args[0] assert not isinstance(val_el_typ, type3types.PlaceholderForType) val_el_alloc_size = calculate_alloc_size(val_el_typ) tuple_len_obj = val_typ.args[1] assert isinstance(tuple_len_obj, type3types.IntType3) tuple_len = tuple_len_obj.value assert tuple_len == len(val) offset = adr for val_el_val in val: offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val) return adr if val_typ.base is type3types.tuple: assert isinstance(val, tuple) alloc_size = calculate_alloc_size(val_typ) adr = runner.call('stdlib.alloc.__alloc__', alloc_size) assert isinstance(adr, int) sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n') assert len(val) == len(val_typ.args) offset = adr for val_el_val, val_el_typ in zip(val, val_typ.args): assert not isinstance(val_el_typ, type3types.PlaceholderForType) offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val) return adr raise NotImplementedError(val_typ, val) def _load_memory_stored_returned_value( runner: runners.RunnerBase, func_name: str, wasm_value: Any, ) -> Any: ret_type3 = runner.phasm_ast.functions[func_name].returns_type3 if ret_type3 is type3types.none: return None if ret_type3 in (type3types.i32, type3types.i64): assert isinstance(wasm_value, int), wasm_value return wasm_value if ret_type3 in (type3types.u8, type3types.u32, type3types.u64): assert isinstance(wasm_value, int), wasm_value if wasm_value < 0: # WASM does not support unsigned values through its interface # Cast and then reinterpret letter = { 'u32': 'i', 'u64': 'q', }[ret_type3.name] data = struct.pack(f'<{letter}', wasm_value) wasm_value, = struct.unpack(f'<{letter.upper()}', data) return wasm_value if ret_type3 in (type3types.f32, type3types.f64, ): assert isinstance(wasm_value, float), wasm_value return wasm_value if ret_type3 is type3types.bytes: assert isinstance(wasm_value, int), wasm_value return _load_bytes_from_address(runner, ret_type3, wasm_value) if isinstance(ret_type3, type3types.AppliedType3): if ret_type3.base is type3types.static_array: assert isinstance(wasm_value, int), wasm_value return _load_static_array_from_address(runner, ret_type3, wasm_value) if ret_type3.base is type3types.tuple: assert isinstance(wasm_value, int), wasm_value return _load_tuple_from_address(runner, ret_type3, wasm_value) raise NotImplementedError(ret_type3, wasm_value) def _unpack(runner: runners.RunnerBase, typ: type3types.Type3, inp: bytes) -> Any: if typ is type3types.u8: # See compiler.py, LOAD_STORE_TYPE_MAP and module_data_u8 assert len(inp) == 4 return struct.unpack(' bytes: sys.stderr.write(f'Reading 0x{adr:08x} {typ:s}\n') read_bytes = runner.interpreter_read_memory(adr, 4) bytes_len, = struct.unpack(' Generator[bytes, None, None]: offset = 0 for size in split_sizes: yield all_bytes[offset:offset + size] offset += size def _load_static_array_from_address(runner: runners.RunnerBase, typ: type3types.AppliedType3, adr: int) -> Any: sys.stderr.write(f'Reading 0x{adr:08x} {typ:s}\n') assert 2 == len(typ.args) sub_typ, len_typ = typ.args assert not isinstance(sub_typ, type3types.PlaceholderForType) assert isinstance(len_typ, type3types.IntType3) sa_len = len_typ.value arg_size_1 = calculate_alloc_size(sub_typ, is_member=True) arg_sizes = [arg_size_1 for _ in range(sa_len)] # _split_read_bytes requires one arg per value read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes)) return tuple( _unpack(runner, sub_typ, arg_bytes) for arg_bytes in _split_read_bytes(read_bytes, arg_sizes) ) def _load_tuple_from_address(runner: runners.RunnerBase, typ: type3types.Type3, adr: int) -> Any: sys.stderr.write(f'Reading 0x{adr:08x} {typ:s}\n') assert isinstance(typ, type3types.AppliedType3) assert typ.base is type3types.tuple typ_list = [ x for x in typ.args if not isinstance(x, type3types.PlaceholderForType) ] assert len(typ_list) == len(typ.args) arg_sizes = [ calculate_alloc_size(x, is_member=True) for x in typ_list ] read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes)) return tuple( _unpack(runner, arg_typ, arg_bytes) for arg_typ, arg_bytes in zip(typ_list, _split_read_bytes(read_bytes, arg_sizes)) )