import os
import struct
import sys
from typing import Any, Callable, Generator, Iterable, List, TextIO, Union

from phasm import compiler
from phasm.build import builtins
from phasm.codestyle import phasm_render
from phasm.type3 import types as type3types
from phasm.type3.routers import NoRouteForTypeException, TypeApplicationRouter
from phasm.wasm import (
    WasmTypeFloat32,
    WasmTypeFloat64,
    WasmTypeInt32,
    WasmTypeInt64,
    WasmTypeNone,
)

from . import runners

# Separator used by write_header() on both sides of the section title.
DASHES = '-' * 16


class InvalidArgumentException(Exception):
    """
    Raised when a Python value handed to the suite does not have the
    shape required by the phasm type it is supposed to be stored as.
    """


class SuiteResult:
    """
    Holds the outcome of a single Suite.run_code() call
    """
    def __init__(self) -> None:
        # Filled in by Suite.run_code() with the converted return value.
        self.returned_value: Any = None


# Maps the `runtime` argument of Suite.run_code() to the runner class to use.
RUNNER_CLASS_MAP = {
    'wasmtime': runners.RunnerWasmtime,
}

# Phasm source prepended to the code under test when tracing is enabled.
# It declares the two imported functions that run_code() binds to the
# helpers created by make_int_trace().
TRACE_CODE_PREFIX = """
@imported
def trace_adr(adr: i32) -> i32:
    pass

@imported
def trace_i32(x: i32) -> i32:
    pass
"""


def make_int_trace(prefix: str) -> Callable[[int], int]:
    """
    Builds a trace helper that logs an integer to stderr

    The returned callable writes `<prefix> <value>` to stderr, flushes,
    and returns the value unchanged.
    """
    def trace_helper(adr: int) -> int:
        sys.stderr.write(f'{prefix} {adr}\n')
        sys.stderr.flush()
        return adr

    return trace_helper


class Suite:
    """
    WebAssembly test suite
    """
    def __init__(self, code_py: str) -> None:
        self.code_py = code_py

    def run_code(
            self,
            *args: Any,
            runtime: str = 'wasmtime',
            func_name: str = 'testEntry',
            imports: runners.Imports = None,
            do_format_check: bool = True,
            verbose: bool | None = None,
            with_traces: bool = False,
            ) -> Any:
        """
        Compiles the given python code into wasm and
        then runs it

        Positional arguments are passed on to the function `func_name`.
        Integer and float arguments are passed as is; any other argument
        is first stored in the interpreter's memory through
        ALLOCATE_MEMORY_STORED_ROUTER and its address is passed instead.

        When `verbose` is None it is taken from the VERBOSE environment
        variable. When `with_traces` is set, TRACE_CODE_PREFIX is
        prepended to the code and the trace helpers are added to the
        imports; `do_format_check` must be False in that case.

        Raises RuntimeError when the number of arguments does not match
        the function, and NotImplementedError when an argument's type
        cannot be stored in memory.

        Returned is an object with the results set
        """
        if verbose is None:
            verbose = bool(os.environ.get('VERBOSE'))

        code_prefix = ''

        if with_traces:
            # The prefix is compiled along with self.code_py, while the
            # format check below compares against self.code_py alone.
            assert do_format_check is False

            # Work on a copy so the caller's mapping is left untouched.
            imports = dict(imports) if imports is not None else {}
            imports.update({
                'trace_adr': make_int_trace('ADR'),
                'trace_i32': make_int_trace('i32'),
            })

            code_prefix = TRACE_CODE_PREFIX

        class_ = RUNNER_CLASS_MAP[runtime]

        runner = class_(code_prefix + self.code_py)

        if verbose:
            write_header(sys.stderr, 'Phasm')
            runner.dump_phasm_code(sys.stderr)

        runner.parse(verbose=verbose)
        runner.compile_ast()
        runner.optimise_wasm_ast()
        runner.compile_wat()

        if verbose:
            write_header(sys.stderr, 'Assembly')
            runner.dump_wasm_wat(sys.stderr)

        runner.interpreter_setup()
        runner.interpreter_load(imports)

        # Check if code formatting works
        if do_format_check:
            assert self.code_py == '\n' + phasm_render(runner.phasm_ast)  # \n for formatting in tests

        func_args = [x.type3 for x in runner.phasm_ast.functions[func_name].posonlyargs]
        if len(func_args) != len(args):
            raise RuntimeError(f'Invalid number of args for {func_name}')

        wasm_args: List[Union[float, int]] = []
        if args and verbose:
            write_header(sys.stderr, 'Memory (pre alloc)')
            runner.interpreter_dump_memory(sys.stderr)

        for arg, arg_typ in zip(args, func_args, strict=True):
            arg_typ_info = runner.phasm_ast.build.type_info_map.get(arg_typ.name)

            if arg_typ_info and (arg_typ_info.wasm_type is WasmTypeInt32 or arg_typ_info.wasm_type is WasmTypeInt64):
                assert isinstance(arg, int)
                wasm_args.append(arg)
                continue

            if arg_typ_info and (arg_typ_info.wasm_type is WasmTypeFloat32 or arg_typ_info.wasm_type is WasmTypeFloat64):
                assert isinstance(arg, float)
                wasm_args.append(arg)
                continue

            # Anything else is stored in memory and passed by address.
            try:
                adr = ALLOCATE_MEMORY_STORED_ROUTER((runner, arg), arg_typ)
            except NoRouteForTypeException as exc:
                raise NotImplementedError(arg_typ, arg) from exc
            wasm_args.append(adr)

        if verbose:
            write_header(sys.stderr, 'Memory (pre run)')
            runner.interpreter_dump_memory(sys.stderr)

        result = SuiteResult()
        result.returned_value = runner.call(func_name, *wasm_args)

        result.returned_value = _load_memory_stored_returned_value(
            runner,
            func_name,
            result.returned_value,
        )

        if verbose:
            write_header(sys.stderr, 'Memory (post run)')
            runner.interpreter_dump_memory(sys.stderr)

        return result


def write_header(textio: TextIO, msg: str) -> None:
    """
    Writes a section header line to the given stream

    The message is left-justified to 16 characters and surrounded by
    DASHES.
    """
    textio.write(f'{DASHES} {msg.ljust(16)} {DASHES}\n')


def _write_memory_stored_value(
        runner: runners.RunnerBase,
        adr: int,
        val_typ: type3types.Type3,
        val: Any,
        ) -> int:
    """
    Writes a single member value at the given address

    If ALLOCATE_MEMORY_STORED_ROUTER has a route for the type, the value
    is allocated separately and a 'ptr' to it is written at `adr`.
    Otherwise the value is encoded with compiler.module_data_primitive
    and written directly.

    Returns the number of bytes the member occupies at `adr`, so callers
    can advance their write offset.
    """
    # Only the router lookup is guarded; a failure while writing must not
    # be mistaken for "this type has no route".
    try:
        adr2 = ALLOCATE_MEMORY_STORED_ROUTER((runner, val), val_typ)
    except NoRouteForTypeException:
        val_typ_info = runner.phasm_ast.build.type_info_map[val_typ.name]
        to_write = compiler.module_data_primitive(val_typ_info, val)
        runner.interpreter_write_memory(adr, to_write)
        return len(to_write)

    ptr_type_info = runner.phasm_ast.build.type_info_map['ptr']
    to_write = compiler.module_data_primitive(ptr_type_info, adr2)
    runner.interpreter_write_memory(adr, to_write)
    return runner.phasm_ast.build.type_info_constructed.alloc_size


def _allocate_memory_stored_bytes(attrs: tuple[runners.RunnerBase, bytes]) -> int:
    """
    Allocates a bytes value in the interpreter's memory

    Calls 'stdlib.types.__alloc_bytes__' with the length and writes the
    payload at offset 4 of the returned address.
    NOTE(review): presumably the first 4 bytes hold the length and are
    set by the allocator - confirm against the stdlib.

    Returns the address of the allocation.
    """
    runner, val = attrs

    assert isinstance(val, bytes)

    adr = runner.call('stdlib.types.__alloc_bytes__', len(val))
    assert isinstance(adr, int)

    sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
    runner.interpreter_write_memory(adr + 4, val)
    return adr


def _allocate_memory_stored_dynamic_array(attrs: tuple[runners.RunnerBase, Any], da_args: tuple[type3types.Type3]) -> int:
    """
    Allocates a dynamic array in the interpreter's memory

    An array of 'u8' is delegated to _allocate_memory_stored_bytes.
    Any other array must be given as a tuple; it is written as a u32
    length followed by each member.

    Raises InvalidArgumentException when the value is not a tuple.
    Returns the address of the allocation.
    """
    runner, val = attrs

    da_type, = da_args

    if da_type.name == 'u8':
        return _allocate_memory_stored_bytes(attrs)

    if not isinstance(val, tuple):
        raise InvalidArgumentException(f'Expected tuple; got {val!r} instead')

    # 4 bytes for the length prefix, then one member slot per element.
    alloc_size = 4 + len(val) * runner.phasm_ast.build.calculate_alloc_size(da_type, True)
    adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
    assert isinstance(adr, int)  # Type int
    sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')

    offset = adr
    offset += _write_memory_stored_value(runner, offset, runner.phasm_ast.build.types['u32'], len(val))
    for val_el_val in val:
        offset += _write_memory_stored_value(runner, offset, da_type, val_el_val)
    return adr


def _allocate_memory_stored_static_array(attrs: tuple[runners.RunnerBase, Any], sa_args: tuple[type3types.Type3, type3types.IntType3]) -> int:
    """
    Allocates a static array in the interpreter's memory

    The value must be a tuple whose length equals the array's declared
    length; the members are written back to back.

    Raises InvalidArgumentException when the value is not a tuple or
    has the wrong length.
    Returns the address of the allocation.
    """
    runner, val = attrs

    sa_type, sa_len = sa_args

    if not isinstance(val, tuple) or sa_len.value != len(val):
        raise InvalidArgumentException(f'Expected tuple of length {sa_len.value}; got {val!r} instead')

    alloc_size = runner.phasm_ast.build.calculate_alloc_size_static_array(sa_args)
    adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
    assert isinstance(adr, int)  # Type int
    sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')

    offset = adr
    for val_el_val in val:
        offset += _write_memory_stored_value(runner, offset, sa_type, val_el_val)
    return adr


def _allocate_memory_stored_struct(attrs: tuple[runners.RunnerBase, Any], st_args: tuple[tuple[str, type3types.Type3], ...]) -> int:
    """
    Allocates a struct in the interpreter's memory

    The value must be a dict with exactly one entry per struct member;
    the members are written in the order given by `st_args`. The dict
    passed in is not modified.

    Returns the address of the allocation.
    """
    runner, val = attrs

    assert isinstance(val, dict)

    alloc_size = runner.phasm_ast.build.calculate_alloc_size_struct(st_args)
    adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
    assert isinstance(adr, int)
    sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')

    # Consume a copy, so leftovers can be detected without emptying the
    # caller's dict.
    remaining = dict(val)

    offset = adr
    for val_el_name, val_el_typ in st_args:
        assert val_el_name in remaining, f'Missing key value {val_el_name}'
        val_el_val = remaining.pop(val_el_name)
        offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)

    assert not remaining, f'Additional values: {list(remaining)!r}'

    return adr


def _allocate_memory_stored_tuple(attrs: tuple[runners.RunnerBase, Any], tp_args: tuple[type3types.Type3, ...]) -> int:
    """
    Allocates a tuple in the interpreter's memory

    The value must be a tuple with one element per member type; the
    members are written back to back.

    Returns the address of the allocation.
    """
    runner, val = attrs

    assert isinstance(val, tuple)
    # Validate before allocating, so invalid input does not claim memory.
    assert len(val) == len(tp_args)

    alloc_size = runner.phasm_ast.build.calculate_alloc_size_tuple(tp_args)
    adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
    assert isinstance(adr, int)
    sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')

    offset = adr
    for val_el_val, val_el_typ in zip(val, tp_args, strict=True):
        offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
    return adr


# Dispatches on the applied type to the function that stores a Python value
# in the interpreter's memory; each route returns the address of the value.
ALLOCATE_MEMORY_STORED_ROUTER = TypeApplicationRouter[tuple[runners.RunnerBase, Any], Any]()
# ALLOCATE_MEMORY_STORED_ROUTER.add_n(prelude.bytes_, _allocate_memory_stored_bytes)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.dynamic_array, _allocate_memory_stored_dynamic_array)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.static_array, _allocate_memory_stored_static_array)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.struct, _allocate_memory_stored_struct)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.tuple_, _allocate_memory_stored_tuple) def _load_memory_stored_returned_value( runner: runners.RunnerBase, func_name: str, wasm_value: Any, ) -> Any: ret_type3 = runner.phasm_ast.functions[func_name].returns_type3 if ret_type3.name in runner.phasm_ast.build.type_info_map: type_info = runner.phasm_ast.build.type_info_map[ret_type3.name] if type_info.wasm_type is WasmTypeNone: return None if type_info.wasm_type is WasmTypeInt32 or type_info.wasm_type is WasmTypeInt64: assert isinstance(wasm_value, int), wasm_value if type_info.signed is None: # Must be bool type return 0 != wasm_value if not type_info.signed and wasm_value < 0: data = wasm_value.to_bytes(type_info.alloc_size, 'big', signed=True) wasm_value = int.from_bytes(data, 'big', signed=False) return wasm_value if type_info.wasm_type is WasmTypeFloat32 or type_info.wasm_type is WasmTypeFloat64: assert isinstance(wasm_value, float), wasm_value return wasm_value assert isinstance(wasm_value, int), wasm_value return LOAD_FROM_ADDRESS_ROUTER((runner, wasm_value), ret_type3) def _unpack(runner: runners.RunnerBase, typ: type3types.Type3, inp: bytes) -> Any: typ_info = runner.phasm_ast.build.type_info_map.get(typ.name) if typ_info is None: assert len(inp) == 4 # Note: For applied types, inp should contain a 4 byte pointer adr = struct.unpack(' bytes: runner, adr = attrs sys.stderr.write(f'Reading 0x{adr:08x} bytes\n') read_bytes = runner.interpreter_read_memory(adr, 4) bytes_len, = struct.unpack(' Generator[bytes, None, None]: offset = 0 for size in split_sizes: yield all_bytes[offset:offset + size] offset += size def _load_dynamic_array_from_address(attrs: tuple[runners.RunnerBase, int], da_args: tuple[type3types.Type3]) -> Any: runner, adr = attrs da_type, = da_args if da_type.name == 'u8': return _load_bytes_from_address(attrs) sys.stderr.write(f'Reading 0x{adr:08x} {da_type:s}[...]\n') read_bytes = runner.interpreter_read_memory(adr, 4) array_len, = struct.unpack(' Any: 
runner, adr = attrs sub_typ, len_typ = sa_args sys.stderr.write(f'Reading 0x{adr:08x} {sub_typ:s} {len_typ:s}\n') sa_len = len_typ.value arg_size_1 = runner.phasm_ast.build.calculate_alloc_size(sub_typ, is_member=True) arg_sizes = [arg_size_1 for _ in range(sa_len)] # _split_read_bytes requires one arg per value read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes)) return tuple( _unpack(runner, sub_typ, arg_bytes) for arg_bytes in _split_read_bytes(read_bytes, arg_sizes) ) def _load_struct_from_address(attrs: tuple[runners.RunnerBase, int], st_args: tuple[tuple[str, type3types.Type3], ...]) -> dict[str, Any]: runner, adr = attrs sys.stderr.write(f'Reading 0x{adr:08x} struct {list(st_args)}\n') arg_sizes = [ runner.phasm_ast.build.calculate_alloc_size(x, is_member=True) for _, x in st_args ] read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes)) return { arg_name: _unpack(runner, arg_typ, arg_bytes) for (arg_name, arg_typ, ), arg_bytes in zip(st_args, _split_read_bytes(read_bytes, arg_sizes), strict=True) } def _load_tuple_from_address(attrs: tuple[runners.RunnerBase, int], tp_args: tuple[type3types.Type3, ...]) -> Any: runner, adr = attrs sys.stderr.write(f'Reading 0x{adr:08x} tuple {len(tp_args)}\n') arg_sizes = [ runner.phasm_ast.build.calculate_alloc_size(x, is_member=True) for x in tp_args ] read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes)) return tuple( _unpack(runner, arg_typ, arg_bytes) for arg_typ, arg_bytes in zip(tp_args, _split_read_bytes(read_bytes, arg_sizes), strict=True) ) LOAD_FROM_ADDRESS_ROUTER = TypeApplicationRouter[tuple[runners.RunnerBase, int], Any]() LOAD_FROM_ADDRESS_ROUTER.add(builtins.dynamic_array, _load_dynamic_array_from_address) LOAD_FROM_ADDRESS_ROUTER.add(builtins.static_array, _load_static_array_from_address) LOAD_FROM_ADDRESS_ROUTER.add(builtins.struct, _load_struct_from_address) LOAD_FROM_ADDRESS_ROUTER.add(builtins.tuple_, _load_tuple_from_address)