phasm/tests/integration/helpers.py
Johan B.W. de Vries 073cd31091 Moves the prelude to runtime
Previously, it was hardcoded at 'compile' time (in as much
Python has that). This would make it more difficult to add
stuff to it. Also, in a lot of places we made assumptions
about prelude instead of checking properly.
2025-05-29 11:45:58 +02:00

489 lines
16 KiB
Python

import os
import struct
import sys
from typing import Any, Callable, Generator, Iterable, List, TextIO, Union
from phasm import compiler
from phasm.build import builtins
from phasm.codestyle import phasm_render
from phasm.type3 import types as type3types
from phasm.type3.routers import NoRouteForTypeException, TypeApplicationRouter
from phasm.wasm import (
WasmTypeFloat32,
WasmTypeFloat64,
WasmTypeInt32,
WasmTypeInt64,
)
from . import runners
DASHES = '-' * 16
class InvalidArgumentException(Exception):
pass
class SuiteResult:
def __init__(self) -> None:
self.returned_value = None
RUNNER_CLASS_MAP = {
'wasmtime': runners.RunnerWasmtime,
}
TRACE_CODE_PREFIX = """
@imported
def trace_adr(adr: i32) -> i32:
pass
@imported
def trace_i32(x: i32) -> i32:
pass
"""
def make_int_trace(prefix: str) -> Callable[[int], int]:
def trace_helper(adr: int) -> int:
sys.stderr.write(f'{prefix} {adr}\n')
sys.stderr.flush()
return adr
return trace_helper
class Suite:
"""
WebAssembly test suite
"""
def __init__(self, code_py: str) -> None:
self.code_py = code_py
def run_code(
self,
*args: Any,
runtime: str = 'wasmtime',
func_name: str = 'testEntry',
imports: runners.Imports = None,
do_format_check: bool = True,
verbose: bool | None = None,
with_traces: bool = False,
) -> Any:
"""
Compiles the given python code into wasm and
then runs it
Returned is an object with the results set
"""
if verbose is None:
verbose = bool(os.environ.get('VERBOSE'))
code_prefix = ''
if with_traces:
assert do_format_check is False
if imports is None:
imports = {}
imports.update({
'trace_adr': make_int_trace('ADR'),
'trace_i32': make_int_trace('i32'),
})
code_prefix = TRACE_CODE_PREFIX
class_ = RUNNER_CLASS_MAP[runtime]
runner = class_(code_prefix + self.code_py)
if verbose:
write_header(sys.stderr, 'Phasm')
runner.dump_phasm_code(sys.stderr)
runner.parse(verbose=verbose)
runner.compile_ast()
runner.optimise_wasm_ast()
runner.compile_wat()
if verbose:
write_header(sys.stderr, 'Assembly')
runner.dump_wasm_wat(sys.stderr)
runner.interpreter_setup()
runner.interpreter_load(imports)
# Check if code formatting works
if do_format_check:
assert self.code_py == '\n' + phasm_render(runner.phasm_ast) # \n for formatting in tests
func_args = [x.type3 for x in runner.phasm_ast.functions[func_name].posonlyargs]
if len(func_args) != len(args):
raise RuntimeError(f'Invalid number of args for {func_name}')
wasm_args: List[Union[float, int]] = []
if args:
if verbose:
write_header(sys.stderr, 'Memory (pre alloc)')
runner.interpreter_dump_memory(sys.stderr)
for arg, arg_typ in zip(args, func_args, strict=True):
arg_typ_info = runner.phasm_ast.build.type_info_map.get(arg_typ.name)
if arg_typ_info and (arg_typ_info.wasm_type is WasmTypeInt32 or arg_typ_info.wasm_type is WasmTypeInt64):
assert isinstance(arg, int)
wasm_args.append(arg)
continue
if arg_typ_info and (arg_typ_info.wasm_type is WasmTypeFloat32 or arg_typ_info.wasm_type is WasmTypeFloat64):
assert isinstance(arg, float)
wasm_args.append(arg)
continue
try:
adr = ALLOCATE_MEMORY_STORED_ROUTER((runner, arg), arg_typ)
wasm_args.append(adr)
except NoRouteForTypeException:
raise NotImplementedError(arg_typ, arg)
if verbose:
write_header(sys.stderr, 'Memory (pre run)')
runner.interpreter_dump_memory(sys.stderr)
result = SuiteResult()
result.returned_value = runner.call(func_name, *wasm_args)
result.returned_value = _load_memory_stored_returned_value(
runner,
func_name,
result.returned_value,
)
if verbose:
write_header(sys.stderr, 'Memory (post run)')
runner.interpreter_dump_memory(sys.stderr)
return result
def write_header(textio: TextIO, msg: str) -> None:
textio.write(f'{DASHES} {msg.ljust(16)} {DASHES}\n')
def _write_memory_stored_value(
runner: runners.RunnerBase,
adr: int,
val_typ: type3types.Type3,
val: Any,
) -> int:
try:
adr2 = ALLOCATE_MEMORY_STORED_ROUTER((runner, val), val_typ)
ptr_type_info = runner.phasm_ast.build.type_info_map['ptr']
to_write = compiler.module_data_primitive(ptr_type_info, adr2)
runner.interpreter_write_memory(adr, to_write)
return runner.phasm_ast.build.type_info_constructed.alloc_size
except NoRouteForTypeException:
val_typ_info = runner.phasm_ast.build.type_info_map[val_typ.name]
to_write = compiler.module_data_primitive(val_typ_info, val)
runner.interpreter_write_memory(adr, to_write)
return len(to_write)
def _allocate_memory_stored_bytes(attrs: tuple[runners.RunnerBase, bytes]) -> int:
runner, val = attrs
assert isinstance(val, bytes)
adr = runner.call('stdlib.types.__alloc_bytes__', len(val))
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
runner.interpreter_write_memory(adr + 4, val)
return adr
def _allocate_memory_stored_dynamic_array(attrs: tuple[runners.RunnerBase, Any], da_args: tuple[type3types.Type3]) -> int:
runner, val = attrs
da_type, = da_args
if da_type.name == 'u8':
return _allocate_memory_stored_bytes(attrs)
if not isinstance(val, tuple):
raise InvalidArgumentException(f'Expected tuple; got {val!r} instead')
alloc_size = 4 + len(val) * runner.phasm_ast.build.calculate_alloc_size(da_type, True)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int) # Type int
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
offset = adr
offset += _write_memory_stored_value(runner, offset, runner.phasm_ast.build.types['u32'], len(val))
for val_el_val in val:
offset += _write_memory_stored_value(runner, offset, da_type, val_el_val)
return adr
def _allocate_memory_stored_static_array(attrs: tuple[runners.RunnerBase, Any], sa_args: tuple[type3types.Type3, type3types.IntType3]) -> int:
runner, val = attrs
sa_type, sa_len = sa_args
if not isinstance(val, tuple):
raise InvalidArgumentException(f'Expected tuple of length {sa_len.value}; got {val!r} instead')
if sa_len.value != len(val):
raise InvalidArgumentException(f'Expected tuple of length {sa_len.value}; got {val!r} instead')
alloc_size = runner.phasm_ast.build.calculate_alloc_size_static_array(sa_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int) # Type int
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
offset = adr
for val_el_val in val:
offset += _write_memory_stored_value(runner, offset, sa_type, val_el_val)
return adr
def _allocate_memory_stored_struct(attrs: tuple[runners.RunnerBase, Any], st_args: tuple[tuple[str, type3types.Type3], ...]) -> int:
runner, val = attrs
assert isinstance(val, dict)
alloc_size = runner.phasm_ast.build.calculate_alloc_size_struct(st_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
offset = adr
for val_el_name, val_el_typ in st_args:
assert val_el_name in val, f'Missing key value {val_el_name}'
val_el_val = val.pop(val_el_name)
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
assert not val, f'Additional values: {list(val)!r}'
return adr
def _allocate_memory_stored_tuple(attrs: tuple[runners.RunnerBase, Any], tp_args: tuple[type3types.Type3, ...]) -> int:
runner, val = attrs
assert isinstance(val, tuple)
alloc_size = runner.phasm_ast.build.calculate_alloc_size_tuple(tp_args)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
assert len(val) == len(tp_args)
offset = adr
for val_el_val, val_el_typ in zip(val, tp_args, strict=True):
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
return adr
ALLOCATE_MEMORY_STORED_ROUTER = TypeApplicationRouter[tuple[runners.RunnerBase, Any], Any]()
# ALLOCATE_MEMORY_STORED_ROUTER.add_n(prelude.bytes_, _allocate_memory_stored_bytes)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.dynamic_array, _allocate_memory_stored_dynamic_array)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.static_array, _allocate_memory_stored_static_array)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.struct, _allocate_memory_stored_struct)
ALLOCATE_MEMORY_STORED_ROUTER.add(builtins.tuple_, _allocate_memory_stored_tuple)
def _load_memory_stored_returned_value(
runner: runners.RunnerBase,
func_name: str,
wasm_value: Any,
) -> Any:
ret_type3 = runner.phasm_ast.functions[func_name].returns_type3
if ret_type3.name in runner.phasm_ast.build.type_info_map:
type_info = runner.phasm_ast.build.type_info_map[ret_type3.name]
# if ret_type3 is prelude.none:
# return None
# if ret_type3 is prelude.bool_:
# assert isinstance(wasm_value, int), wasm_value
# return 0 != wasm_value
# if ret_type3 in (prelude.i8, prelude.i16, prelude.i32, prelude.i64):
# assert isinstance(wasm_value, int), wasm_value
# if ret_type3 is prelude.i8:
# # Values are actually i32
# # Have to reinterpret to load proper value
# data = struct.pack('<i', wasm_value)
# wasm_value, = struct.unpack('<bxxx', data)
# if ret_type3 is prelude.i16:
# # Values are actually i32
# # Have to reinterpret to load proper value
# data = struct.pack('<i', wasm_value)
# wasm_value, = struct.unpack('<hxx', data)
# return wasm_value
# if ret_type3 in (prelude.u8, prelude.u16, prelude.u32, prelude.u64):
# assert isinstance(wasm_value, int), wasm_value
# if wasm_value < 0:
# # WASM does not support unsigned values through its interface
# # Cast and then reinterpret
# letter = {
# 'u32': 'i',
# 'u64': 'q',
# }[ret_type3.name]
# data = struct.pack(f'<{letter}', wasm_value)
# wasm_value, = struct.unpack(f'<{letter.upper()}', data)
# return wasm_value
if type_info.wasm_type is WasmTypeInt32 or type_info.wasm_type is WasmTypeInt64:
assert isinstance(wasm_value, int), wasm_value
return wasm_value
if type_info.wasm_type is WasmTypeFloat32 or type_info.wasm_type is WasmTypeFloat64:
assert isinstance(wasm_value, float), wasm_value
return wasm_value
assert isinstance(wasm_value, int), wasm_value
return LOAD_FROM_ADDRESS_ROUTER((runner, wasm_value), ret_type3)
def _unpack(runner: runners.RunnerBase, typ: type3types.Type3, inp: bytes) -> Any:
typ_info = runner.phasm_ast.build.type_info_map.get(typ.name)
if typ_info is None:
assert len(inp) == 4
# Note: For applied types, inp should contain a 4 byte pointer
adr = struct.unpack('<I', inp)[0]
return LOAD_FROM_ADDRESS_ROUTER((runner, adr), typ)
assert len(inp) == typ_info.alloc_size
if typ.name == 'u8':
return struct.unpack('<B', inp)[0]
if typ.name == 'u16':
return struct.unpack('<H', inp)[0]
if typ.name == 'u32':
return struct.unpack('<I', inp)[0]
if typ.name == 'u64':
return struct.unpack('<Q', inp)[0]
if typ.name == 'i8':
return struct.unpack('<b', inp)[0]
if typ.name == 'i16':
return struct.unpack('<h', inp)[0]
if typ.name == 'i32':
return struct.unpack('<i', inp)[0]
if typ.name == 'i64':
return struct.unpack('<q', inp)[0]
if typ.name == 'f32':
return struct.unpack('<f', inp)[0]
if typ.name == 'f64':
return struct.unpack('<d', inp)[0]
raise NotImplementedError(typ, inp)
def _load_bytes_from_address(attrs: tuple[runners.RunnerBase, int]) -> bytes:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} bytes\n')
read_bytes = runner.interpreter_read_memory(adr, 4)
bytes_len, = struct.unpack('<I', read_bytes)
adr += 4
return runner.interpreter_read_memory(adr, bytes_len)
def _split_read_bytes(all_bytes: bytes, split_sizes: Iterable[int]) -> Generator[bytes, None, None]:
offset = 0
for size in split_sizes:
yield all_bytes[offset:offset + size]
offset += size
def _load_dynamic_array_from_address(attrs: tuple[runners.RunnerBase, int], da_args: tuple[type3types.Type3]) -> Any:
runner, adr = attrs
da_type, = da_args
if da_type.name == 'u8':
return _load_bytes_from_address(attrs)
sys.stderr.write(f'Reading 0x{adr:08x} {da_type:s}[...]\n')
read_bytes = runner.interpreter_read_memory(adr, 4)
array_len, = struct.unpack('<I', read_bytes)
adr += 4
arg_size_1 = runner.phasm_ast.build.calculate_alloc_size(da_type, is_member=True)
arg_sizes = [arg_size_1 for _ in range(array_len)] # _split_read_bytes requires one arg per value
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, da_type, arg_bytes)
for arg_bytes in _split_read_bytes(read_bytes, arg_sizes)
)
def _load_static_array_from_address(attrs: tuple[runners.RunnerBase, int], sa_args: tuple[type3types.Type3, type3types.IntType3]) -> Any:
runner, adr = attrs
sub_typ, len_typ = sa_args
sys.stderr.write(f'Reading 0x{adr:08x} {sub_typ:s} {len_typ:s}\n')
sa_len = len_typ.value
arg_size_1 = runner.phasm_ast.build.calculate_alloc_size(sub_typ, is_member=True)
arg_sizes = [arg_size_1 for _ in range(sa_len)] # _split_read_bytes requires one arg per value
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, sub_typ, arg_bytes)
for arg_bytes in _split_read_bytes(read_bytes, arg_sizes)
)
def _load_struct_from_address(attrs: tuple[runners.RunnerBase, int], st_args: tuple[tuple[str, type3types.Type3], ...]) -> dict[str, Any]:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} struct {list(st_args)}\n')
arg_sizes = [
runner.phasm_ast.build.calculate_alloc_size(x, is_member=True)
for _, x in st_args
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return {
arg_name: _unpack(runner, arg_typ, arg_bytes)
for (arg_name, arg_typ, ), arg_bytes in zip(st_args, _split_read_bytes(read_bytes, arg_sizes), strict=True)
}
def _load_tuple_from_address(attrs: tuple[runners.RunnerBase, int], tp_args: tuple[type3types.Type3, ...]) -> Any:
runner, adr = attrs
sys.stderr.write(f'Reading 0x{adr:08x} tuple {len(tp_args)}\n')
arg_sizes = [
runner.phasm_ast.build.calculate_alloc_size(x, is_member=True)
for x in tp_args
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, arg_typ, arg_bytes)
for arg_typ, arg_bytes in zip(tp_args, _split_read_bytes(read_bytes, arg_sizes), strict=True)
)
LOAD_FROM_ADDRESS_ROUTER = TypeApplicationRouter[tuple[runners.RunnerBase, int], Any]()
LOAD_FROM_ADDRESS_ROUTER.add(builtins.dynamic_array, _load_dynamic_array_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(builtins.static_array, _load_static_array_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(builtins.struct, _load_struct_from_address)
LOAD_FROM_ADDRESS_ROUTER.add(builtins.tuple_, _load_tuple_from_address)