phasm/tests/integration/helpers.py
Johan B.W. de Vries 84e7c42ea4 Implements u16 / i16 support
Keep in mind that WebAssembly is u32 native by default,
some operations may be more expensive than you expect
them to be.
2025-05-25 15:31:23 +02:00

486 lines
16 KiB
Python

import os
import struct
import sys
from typing import Any, Callable, Generator, Iterable, List, TextIO, Union
from phasm import compiler, prelude
from phasm.codestyle import phasm_render
from phasm.runtime import (
calculate_alloc_size,
calculate_alloc_size_static_array,
calculate_alloc_size_struct,
calculate_alloc_size_tuple,
)
from phasm.stdlib.types import TYPE_INFO_CONSTRUCTED, TYPE_INFO_MAP
from phasm.type3 import types as type3types
from phasm.type3.routers import NoRouteForTypeException, TypeApplicationRouter
from . import runners
DASHES = '-' * 16
class InvalidArgumentException(Exception):
pass
class SuiteResult:
def __init__(self) -> None:
self.returned_value = None
RUNNER_CLASS_MAP = {
'wasmtime': runners.RunnerWasmtime,
}
TRACE_CODE_PREFIX = """
@imported
def trace_adr(adr: i32) -> i32:
pass
@imported
def trace_i32(x: i32) -> i32:
pass
"""
def make_int_trace(prefix: str) -> Callable[[int], int]:
def trace_helper(adr: int) -> int:
sys.stderr.write(f'{prefix} {adr}\n')
sys.stderr.flush()
return adr
return trace_helper
class Suite:
"""
WebAssembly test suite
"""
def __init__(self, code_py: str) -> None:
self.code_py = code_py
def run_code(
self,
*args: Any,
runtime: str = 'wasmtime',
func_name: str = 'testEntry',
imports: runners.Imports = None,
do_format_check: bool = True,
verbose: bool | None = None,
with_traces: bool = False,
) -> Any:
"""
Compiles the given python code into wasm and
then runs it
Returned is an object with the results set
"""
if verbose is None:
verbose = bool(os.environ.get('VERBOSE'))
code_prefix = ''
if with_traces:
assert do_format_check is False
if imports is None:
imports = {}
imports.update({
'trace_adr': make_int_trace('ADR'),
'trace_i32': make_int_trace('i32'),
})
code_prefix = TRACE_CODE_PREFIX
class_ = RUNNER_CLASS_MAP[runtime]
runner = class_(code_prefix + self.code_py)
if verbose:
write_header(sys.stderr, 'Phasm')
runner.dump_phasm_code(sys.stderr)
runner.parse(verbose=verbose)
runner.compile_ast()
runner.compile_wat()
if verbose:
write_header(sys.stderr, 'Assembly')
runner.dump_wasm_wat(sys.stderr)
runner.interpreter_setup()
runner.interpreter_load(imports)
# Check if code formatting works
if do_format_check:
assert self.code_py == '\n' + phasm_render(runner.phasm_ast) # \n for formatting in tests
func_args = [x.type3 for x in runner.phasm_ast.functions[func_name].posonlyargs]
if len(func_args) != len(args):
raise RuntimeError(f'Invalid number of args for {func_name}')
wasm_args: List[Union[float, int]] = []
if args:
if verbose:
write_header(sys.stderr, 'Memory (pre alloc)')
runner.interpreter_dump_memory(sys.stderr)
for arg, arg_typ in zip(args, func_args, strict=True):
if arg_typ in (prelude.u8, prelude.u16, prelude.u32, prelude.u64, ):
assert isinstance(arg, int)
wasm_args.append(arg)
continue
if arg_typ in (prelude.i8, prelude.i16, prelude.i32, prelude.i64, ):
assert isinstance(arg, int)
wasm_args.append(arg)
continue
if arg_typ in (prelude.f32, prelude.f64, ):
assert isinstance(arg, float)
wasm_args.append(arg)
continue
try:
adr = ALLOCATE_MEMORY_STORED_ROUTER((runner, arg), arg_typ)
wasm_args.append(adr)
except NoRouteForTypeException:
raise NotImplementedError(arg_typ, arg)
if verbose:
write_header(sys.stderr, 'Memory (pre run)')
runner.interpreter_dump_memory(sys.stderr)
result = SuiteResult()
result.returned_value = runner.call(func_name, *wasm_args)
result.returned_value = _load_memory_stored_returned_value(
runner,
func_name,
result.returned_value,
)
if verbose:
write_header(sys.stderr, 'Memory (post run)')
runner.interpreter_dump_memory(sys.stderr)
return result
def write_header(textio: TextIO, msg: str) -> None:
textio.write(f'{DASHES} {msg.ljust(16)} {DASHES}\n')
WRITE_LOOKUP_MAP = {
'u8': compiler.module_data_u8,
'u16': compiler.module_data_u16,
'u32': compiler.module_data_u32,
'u64': compiler.module_data_u64,
'i8': compiler.module_data_i8,
'i16': compiler.module_data_i16,
'i32': compiler.module_data_i32,
'i64': compiler.module_data_i64,
'f32': compiler.module_data_f32,
'f64': compiler.module_data_f64,
}
def _write_memory_stored_value(
runner: runners.RunnerBase,
adr: int,
val_typ: type3types.Type3,
val: Any,
) -> int:
try:
adr2 = ALLOCATE_MEMORY_STORED_ROUTER((runner, val), val_typ)
runner.interpreter_write_memory(adr, compiler.module_data_u32(adr2))
return TYPE_INFO_CONSTRUCTED.alloc_size
except NoRouteForTypeException:
to_write = WRITE_LOOKUP_MAP[val_typ.name](val)
runner.interpreter_write_memory(adr, to_write)
return len(to_write)
def _allocate_memory_stored_bytes(attrs: tuple[runners.RunnerBase, bytes]) -> int:
runner, val = attrs
assert isinstance(val, bytes)
adr = runner.call('stdlib.types.__alloc_bytes__', len(val))
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
runner.interpreter_write_memory(adr + 4, val)
return adr
def _allocate_memory_stored_dynamic_array(attrs: tuple[runners.RunnerBase, Any], da_args: tuple[type3types.Type3]) -> int:
runner, val = attrs
da_type, = da_args
if not isinstance(val, tuple):
raise InvalidArgumentException(f'Expected tuple; got {val!r} instead')
alloc_size = 4 + len(val) * calculate_alloc_size(da_type, True)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int) # Type int
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
offset = adr
offset += _write_memory_stored_value(runner, offset, prelude.u32, len(val))
for val_el_val in val:
offset += _write_memory_stored_value(runner, offset, da_type, val_el_val)
return adr
def _allocate_memory_stored_static_array(attrs: tuple[runners.RunnerBase, Any], sa_args: tuple[type3types.Type3, type3types.IntType3]) -> int:
runner, val = attrs
sa_type, sa_len = sa_args
if not isinstance(val, tuple):
raise InvalidArgumentException(f'Expected tuple of length {sa_len.value}; got {val!r} instead')
if sa_len.value != len(val):
raise InvalidArgumentException(f'Expected tuple of length {sa_len.value}; got {val!r} instead')
alloc_size = calculate_alloc_size_static_array(False, sa_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int) # Type int
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
offset = adr
for val_el_val in val:
offset += _write_memory_stored_value(runner, offset, sa_type, val_el_val)
return adr
def _allocate_memory_stored_struct(attrs: tuple[runners.RunnerBase, Any], st_args: tuple[tuple[str, type3types.Type3], ...]) -> int:
runner, val = attrs
assert isinstance(val, dict)
alloc_size = calculate_alloc_size_struct(False, st_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
offset = adr
for val_el_name, val_el_typ in st_args:
assert val_el_name in val, f'Missing key value {val_el_name}'
val_el_val = val.pop(val_el_name)
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
assert not val, f'Additional values: {list(val)!r}'
return adr
def _allocate_memory_stored_tuple(attrs: tuple[runners.RunnerBase, Any], tp_args: tuple[type3types.Type3, ...]) -> int:
runner, val = attrs
assert isinstance(val, tuple)
alloc_size = calculate_alloc_size_tuple(False, tp_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
assert len(val) == len(tp_args)
offset = adr
for val_el_val, val_el_typ in zip(val, tp_args, strict=True):
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
return adr
ALLOCATE_MEMORY_STORED_ROUTER = TypeApplicationRouter[tuple[runners.RunnerBase, Any], Any]()
ALLOCATE_MEMORY_STORED_ROUTER.add_n(prelude.bytes_, _allocate_memory_stored_bytes)
ALLOCATE_MEMORY_STORED_ROUTER.add(prelude.dynamic_array, _allocate_memory_stored_dynamic_array)
ALLOCATE_MEMORY_STORED_ROUTER.add(prelude.static_array, _allocate_memory_stored_static_array)
ALLOCATE_MEMORY_STORED_ROUTER.add(prelude.struct, _allocate_memory_stored_struct)
ALLOCATE_MEMORY_STORED_ROUTER.add(prelude.tuple_, _allocate_memory_stored_tuple)
def _load_memory_stored_returned_value(
runner: runners.RunnerBase,
func_name: str,
wasm_value: Any,
) -> Any:
ret_type3 = runner.phasm_ast.functions[func_name].returns_type3
if ret_type3 is prelude.none:
return None
if ret_type3 is prelude.bool_:
assert isinstance(wasm_value, int), wasm_value
return 0 != wasm_value
if ret_type3 in (prelude.i8, prelude.i16, prelude.i32, prelude.i64):
assert isinstance(wasm_value, int), wasm_value
if ret_type3 is prelude.i8:
# Values are actually i32
# Have to reinterpret to load proper value
data = struct.pack('<i', wasm_value)
wasm_value, = struct.unpack('<bxxx', data)
if ret_type3 is prelude.i16:
# Values are actually i32
# Have to reinterpret to load proper value
data = struct.pack('<i', wasm_value)
wasm_value, = struct.unpack('<hxx', data)
return wasm_value
if ret_type3 in (prelude.u8, prelude.u16, prelude.u32, prelude.u64):
assert isinstance(wasm_value, int), wasm_value
if wasm_value < 0:
# WASM does not support unsigned values through its interface
# Cast and then reinterpret
letter = {
'u32': 'i',
'u64': 'q',
}[ret_type3.name]
data = struct.pack(f'<{letter}', wasm_value)
wasm_value, = struct.unpack(f'<{letter.upper()}', data)
return wasm_value
if ret_type3 in (prelude.f32, prelude.f64, ):
assert isinstance(wasm_value, float), wasm_value
return wasm_value
assert isinstance(wasm_value, int), wasm_value
return LOAD_FROM_ADDRESS_ROUTER((runner, wasm_value), ret_type3)
def _unpack(runner: runners.RunnerBase, typ: type3types.Type3, inp: bytes) -> Any:
typ_info = TYPE_INFO_MAP.get(typ.name, TYPE_INFO_CONSTRUCTED)
assert len(inp) == typ_info.alloc_size
if typ is prelude.u8:
return struct.unpack('<B', inp)[0]
if typ is prelude.u16:
return struct.unpack('<H', inp)[0]
if typ is prelude.u32:
return struct.unpack('<I', inp)[0]
if typ is prelude.u64:
return struct.unpack('<Q', inp)[0]
if typ is prelude.i8:
return struct.unpack('<b', inp)[0]
if typ is prelude.i16:
return struct.unpack('<h', inp)[0]
if typ is prelude.i32:
return struct.unpack('<i', inp)[0]
if typ is prelude.i64:
return struct.unpack('<q', inp)[0]
if typ is prelude.f32:
return struct.unpack('<f', inp)[0]
if typ is prelude.f64:
return struct.unpack('<d', inp)[0]
if typ_info is TYPE_INFO_CONSTRUCTED:
# Note: For applied types, inp should contain a 4 byte pointer
adr = struct.unpack('<I', inp)[0]
return LOAD_FROM_ADDRESS_ROUTER((runner, adr), typ)
raise NotImplementedError(typ, inp)
def _load_bytes_from_address(attrs: tuple[runners.RunnerBase, int]) -> bytes:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} bytes\n')
read_bytes = runner.interpreter_read_memory(adr, 4)
bytes_len, = struct.unpack('<I', read_bytes)
adr += 4
return runner.interpreter_read_memory(adr, bytes_len)
def _split_read_bytes(all_bytes: bytes, split_sizes: Iterable[int]) -> Generator[bytes, None, None]:
offset = 0
for size in split_sizes:
yield all_bytes[offset:offset + size]
offset += size
def _load_dynamic_array_from_address(attrs: tuple[runners.RunnerBase, int], da_args: tuple[type3types.Type3]) -> Any:
runner, adr = attrs
da_type, = da_args
sys.stderr.write(f'Reading 0x{adr:08x} {da_type:s}[...]\n')
read_bytes = runner.interpreter_read_memory(adr, 4)
array_len, = struct.unpack('<I', read_bytes)
adr += 4
arg_size_1 = calculate_alloc_size(da_type, is_member=True)
arg_sizes = [arg_size_1 for _ in range(array_len)] # _split_read_bytes requires one arg per value
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, da_type, arg_bytes)
for arg_bytes in _split_read_bytes(read_bytes, arg_sizes)
)
def _load_static_array_from_address(attrs: tuple[runners.RunnerBase, int], sa_args: tuple[type3types.Type3, type3types.IntType3]) -> Any:
runner, adr = attrs
sub_typ, len_typ = sa_args
sys.stderr.write(f'Reading 0x{adr:08x} {sub_typ:s} {len_typ:s}\n')
sa_len = len_typ.value
arg_size_1 = calculate_alloc_size(sub_typ, is_member=True)
arg_sizes = [arg_size_1 for _ in range(sa_len)] # _split_read_bytes requires one arg per value
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, sub_typ, arg_bytes)
for arg_bytes in _split_read_bytes(read_bytes, arg_sizes)
)
def _load_struct_from_address(attrs: tuple[runners.RunnerBase, int], st_args: tuple[tuple[str, type3types.Type3], ...]) -> dict[str, Any]:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} struct {list(st_args)}\n')
arg_sizes = [
calculate_alloc_size(x, is_member=True)
for _, x in st_args
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return {
arg_name: _unpack(runner, arg_typ, arg_bytes)
for (arg_name, arg_typ, ), arg_bytes in zip(st_args, _split_read_bytes(read_bytes, arg_sizes), strict=True)
}
def _load_tuple_from_address(attrs: tuple[runners.RunnerBase, int], tp_args: tuple[type3types.Type3, ...]) -> Any:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} tuple {len(tp_args)}\n')
arg_sizes = [
calculate_alloc_size(x, is_member=True)
for x in tp_args
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, arg_typ, arg_bytes)
for arg_typ, arg_bytes in zip(tp_args, _split_read_bytes(read_bytes, arg_sizes), strict=True)
)
LOAD_FROM_ADDRESS_ROUTER = TypeApplicationRouter[tuple[runners.RunnerBase, int], Any]()
LOAD_FROM_ADDRESS_ROUTER.add_n(prelude.bytes_, _load_bytes_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(prelude.dynamic_array, _load_dynamic_array_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(prelude.static_array, _load_static_array_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(prelude.struct, _load_struct_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(prelude.tuple_, _load_tuple_from_address)