phasm/tests/integration/helpers.py
Johan B.W. de Vries f8d107f4fa Replaces did_construct with a proper router
By annotating types with the constructor application
that was used to create them.

Later on we can use the router to replace compiler's
INSTANCES or for user defined types.
2025-05-10 16:49:10 +02:00

381 lines
12 KiB
Python

import struct
import sys
from typing import Any, Generator, Iterable, List, TextIO, Union
from phasm import compiler, prelude
from phasm.codestyle import phasm_render
from phasm.runtime import (
calculate_alloc_size,
calculate_alloc_size_static_array,
calculate_alloc_size_struct,
calculate_alloc_size_tuple,
)
from phasm.type3 import types as type3types
from . import runners
DASHES = '-' * 16
class SuiteResult:
def __init__(self) -> None:
self.returned_value = None
RUNNER_CLASS_MAP = {
'wasmtime': runners.RunnerWasmtime,
}
class Suite:
"""
WebAssembly test suite
"""
def __init__(self, code_py: str) -> None:
self.code_py = code_py
def run_code(self, *args: Any, runtime: str = 'wasmtime', func_name: str = 'testEntry', imports: runners.Imports = None, do_format_check: bool = True) -> Any:
"""
Compiles the given python code into wasm and
then runs it
Returned is an object with the results set
"""
class_ = RUNNER_CLASS_MAP[runtime]
runner = class_(self.code_py)
write_header(sys.stderr, 'Phasm')
runner.dump_phasm_code(sys.stderr)
runner.parse()
runner.compile_ast()
runner.compile_wat()
write_header(sys.stderr, 'Assembly')
runner.dump_wasm_wat(sys.stderr)
runner.interpreter_setup()
runner.interpreter_load(imports)
# Check if code formatting works
if do_format_check:
assert self.code_py == '\n' + phasm_render(runner.phasm_ast) # \n for formatting in tests
func_args = [x.type3 for x in runner.phasm_ast.functions[func_name].posonlyargs]
if len(func_args) != len(args):
raise RuntimeError(f'Invalid number of args for {func_name}')
wasm_args: List[Union[float, int]] = []
if args:
write_header(sys.stderr, 'Memory (pre alloc)')
runner.interpreter_dump_memory(sys.stderr)
for arg, arg_typ in zip(args, func_args, strict=True):
if arg_typ in (prelude.u8, prelude.u32, prelude.u64, ):
assert isinstance(arg, int)
wasm_args.append(arg)
continue
if arg_typ in (prelude.i8, prelude.i32, prelude.i64, ):
assert isinstance(arg, int)
wasm_args.append(arg)
continue
if arg_typ in (prelude.f32, prelude.f64, ):
assert isinstance(arg, float)
wasm_args.append(arg)
continue
try:
adr = ALLOCATE_MEMORY_STORED_ROUTER((runner, arg), arg_typ)
wasm_args.append(adr)
except type3types.NoRouteForTypeException:
raise NotImplementedError(arg_typ, arg)
write_header(sys.stderr, 'Memory (pre run)')
runner.interpreter_dump_memory(sys.stderr)
result = SuiteResult()
result.returned_value = runner.call(func_name, *wasm_args)
result.returned_value = _load_memory_stored_returned_value(
runner,
func_name,
result.returned_value,
)
write_header(sys.stderr, 'Memory (post run)')
runner.interpreter_dump_memory(sys.stderr)
return result
def write_header(textio: TextIO, msg: str) -> None:
textio.write(f'{DASHES} {msg.ljust(16)} {DASHES}\n')
WRITE_LOOKUP_MAP = {
'u8': compiler.module_data_u8,
'u32': compiler.module_data_u32,
'u64': compiler.module_data_u64,
'i8': compiler.module_data_i8,
'i32': compiler.module_data_i32,
'i64': compiler.module_data_i64,
'f32': compiler.module_data_f32,
'f64': compiler.module_data_f64,
}
def _write_memory_stored_value(
runner: runners.RunnerBase,
adr: int,
val_typ: type3types.Type3,
val: Any,
) -> int:
try:
adr2 = ALLOCATE_MEMORY_STORED_ROUTER((runner, val), val_typ)
runner.interpreter_write_memory(adr, compiler.module_data_u32(adr2))
return 4
except type3types.NoRouteForTypeException:
to_write = WRITE_LOOKUP_MAP[val_typ.name](val)
runner.interpreter_write_memory(adr, to_write)
return len(to_write)
def _allocate_memory_stored_bytes(attrs: tuple[runners.RunnerBase, bytes]) -> int:
runner, val = attrs
assert isinstance(val, bytes)
adr = runner.call('stdlib.types.__alloc_bytes__', len(val))
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
runner.interpreter_write_memory(adr + 4, val)
return adr
def _allocate_memory_stored_static_array(attrs: tuple[runners.RunnerBase, Any], sa_args: tuple[type3types.Type3, type3types.IntType3]) -> int:
runner, val = attrs
assert isinstance(val, tuple)
sa_type, sa_len = sa_args
alloc_size = calculate_alloc_size_static_array(False, sa_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
tuple_len = sa_len.value
assert tuple_len == len(val)
offset = adr
for val_el_val in val:
offset += _write_memory_stored_value(runner, offset, sa_type, val_el_val)
return adr
def _allocate_memory_stored_struct(attrs: tuple[runners.RunnerBase, Any], st_args: tuple[tuple[str, type3types.Type3], ...]) -> int:
runner, val = attrs
assert isinstance(val, dict)
alloc_size = calculate_alloc_size_struct(False, st_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
offset = adr
for val_el_name, val_el_typ in st_args:
assert val_el_name in val, f'Missing key value {val_el_name}'
val_el_val = val.pop(val_el_name)
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
assert not val, f'Additional values: {list(val)!r}'
return adr
def _allocate_memory_stored_tuple(attrs: tuple[runners.RunnerBase, Any], tp_args: tuple[type3types.Type3, ...]) -> int:
runner, val = attrs
assert isinstance(val, tuple)
alloc_size = calculate_alloc_size_tuple(False, tp_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
assert len(val) == len(tp_args)
offset = adr
for val_el_val, val_el_typ in zip(val, tp_args, strict=True):
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
return adr
ALLOCATE_MEMORY_STORED_ROUTER = type3types.TypeApplicationRouter[tuple[runners.RunnerBase, Any], Any]()
ALLOCATE_MEMORY_STORED_ROUTER.add_n(prelude.bytes_, _allocate_memory_stored_bytes)
ALLOCATE_MEMORY_STORED_ROUTER.add(prelude.static_array, _allocate_memory_stored_static_array)
ALLOCATE_MEMORY_STORED_ROUTER.add(prelude.struct, _allocate_memory_stored_struct)
ALLOCATE_MEMORY_STORED_ROUTER.add(prelude.tuple_, _allocate_memory_stored_tuple)
def _load_memory_stored_returned_value(
runner: runners.RunnerBase,
func_name: str,
wasm_value: Any,
) -> Any:
ret_type3 = runner.phasm_ast.functions[func_name].returns_type3
if ret_type3 is prelude.none:
return None
if ret_type3 is prelude.bool_:
assert isinstance(wasm_value, int), wasm_value
return 0 != wasm_value
if ret_type3 in (prelude.i8, prelude.i32, prelude.i64):
assert isinstance(wasm_value, int), wasm_value
if ret_type3 is prelude.i8:
# Values are actually i32
# Have to reinterpret to load proper value
data = struct.pack('<i', wasm_value)
wasm_value, = struct.unpack('<bxxx', data)
return wasm_value
if ret_type3 in (prelude.u8, prelude.u32, prelude.u64):
assert isinstance(wasm_value, int), wasm_value
if wasm_value < 0:
# WASM does not support unsigned values through its interface
# Cast and then reinterpret
letter = {
'u32': 'i',
'u64': 'q',
}[ret_type3.name]
data = struct.pack(f'<{letter}', wasm_value)
wasm_value, = struct.unpack(f'<{letter.upper()}', data)
return wasm_value
if ret_type3 in (prelude.f32, prelude.f64, ):
assert isinstance(wasm_value, float), wasm_value
return wasm_value
assert isinstance(wasm_value, int), wasm_value
return LOAD_FROM_ADDRESS_ROUTER((runner, wasm_value), ret_type3)
def _unpack(runner: runners.RunnerBase, typ: type3types.Type3, inp: bytes) -> Any:
if typ is prelude.u8:
# See compiler.py, LOAD_STORE_TYPE_MAP and module_data_u8
assert len(inp) == 4
return struct.unpack('<I', inp)[0]
if typ is prelude.u32:
assert len(inp) == 4
return struct.unpack('<I', inp)[0]
if typ is prelude.u64:
assert len(inp) == 8
return struct.unpack('<Q', inp)[0]
if typ is prelude.i8:
# See compiler.py, LOAD_STORE_TYPE_MAP and module_data_i8
assert len(inp) == 4
return struct.unpack('<i', inp)[0]
if typ is prelude.i32:
assert len(inp) == 4
return struct.unpack('<i', inp)[0]
if typ is prelude.i64:
assert len(inp) == 8
return struct.unpack('<q', inp)[0]
if typ is prelude.f32:
assert len(inp) == 4
return struct.unpack('<f', inp)[0]
if typ is prelude.f64:
assert len(inp) == 8
return struct.unpack('<d', inp)[0]
if (prelude.InternalPassAsPointer, (typ, )) in prelude.PRELUDE_TYPE_CLASS_INSTANCES_EXISTING:
# Note: For applied types, inp should contain a 4 byte pointer
assert len(inp) == 4
adr = struct.unpack('<I', inp)[0]
return LOAD_FROM_ADDRESS_ROUTER((runner, adr), typ)
raise NotImplementedError(typ, inp)
def _load_bytes_from_address(attrs: tuple[runners.RunnerBase, int]) -> bytes:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} bytes\n')
read_bytes = runner.interpreter_read_memory(adr, 4)
bytes_len, = struct.unpack('<I', read_bytes)
adr += 4
return runner.interpreter_read_memory(adr, bytes_len)
def _split_read_bytes(all_bytes: bytes, split_sizes: Iterable[int]) -> Generator[bytes, None, None]:
offset = 0
for size in split_sizes:
yield all_bytes[offset:offset + size]
offset += size
def _load_static_array_from_address(attrs: tuple[runners.RunnerBase, int], sa_args: tuple[type3types.Type3, type3types.IntType3]) -> Any:
runner, adr = attrs
sub_typ, len_typ = sa_args
sys.stderr.write(f'Reading 0x{adr:08x} {sub_typ:s} {len_typ:s}\n')
sa_len = len_typ.value
arg_size_1 = calculate_alloc_size(sub_typ, is_member=True)
arg_sizes = [arg_size_1 for _ in range(sa_len)] # _split_read_bytes requires one arg per value
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, sub_typ, arg_bytes)
for arg_bytes in _split_read_bytes(read_bytes, arg_sizes)
)
def _load_struct_from_address(attrs: tuple[runners.RunnerBase, int], st_args: tuple[tuple[str, type3types.Type3], ...]) -> dict[str, Any]:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} struct {list(st_args)}\n')
arg_sizes = [
calculate_alloc_size(x, is_member=True)
for _, x in st_args
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return {
arg_name: _unpack(runner, arg_typ, arg_bytes)
for (arg_name, arg_typ, ), arg_bytes in zip(st_args, _split_read_bytes(read_bytes, arg_sizes), strict=True)
}
def _load_tuple_from_address(attrs: tuple[runners.RunnerBase, int], tp_args: tuple[type3types.Type3, ...]) -> Any:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} tuple {len(tp_args)}\n')
arg_sizes = [
calculate_alloc_size(x, is_member=True)
for x in tp_args
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, arg_typ, arg_bytes)
for arg_typ, arg_bytes in zip(tp_args, _split_read_bytes(read_bytes, arg_sizes), strict=True)
)
LOAD_FROM_ADDRESS_ROUTER = type3types.TypeApplicationRouter[tuple[runners.RunnerBase, int], Any]()
LOAD_FROM_ADDRESS_ROUTER.add_n(prelude.bytes_, _load_bytes_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(prelude.static_array, _load_static_array_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(prelude.struct, _load_struct_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(prelude.tuple_, _load_tuple_from_address)