phasm/tests/integration/helpers.py
Johan B.W. de Vries 97b61e3ee1 Test generation framework with typing improvements
Prior to this PR, each type would have its own handwritten
test suite. The end result was that not all types were tested
for all situations.

This PR adds a framework based on a Markdown file, which
generates the basic tests for the types defined in json
files. These are auto generated and updated by the Makefile
before the test suite is run.

Also, a number of unsupported type combinations are now
supported.

Also, we now support negative literals.

Also, allocation calculation fixes for nested types.

Also, the test helpers can now properly import and export
typed variables such as bytes, static arrays and tuples. This
may come in handy when it comes to phasm platform wanting to
route data.

Also, adds better support for i8 type.

Also, started on a runtime.py, since there's quite some code
now that deals with compile time handling of WebAssembly stuff.

Also, minor improvement to the type constrains, namely we
better match 'tuple' literals with static array types.

Also, reduced spam when printing the type analysis results;
constraints that go back on the backlog are now no longer
printed one by one. It now also prints the end results of
the typing analysis.

Also, reorganized the big test_primitives test into type
classes.

Also, replaced pylint with ruff.
2023-11-15 12:52:23 +01:00

448 lines
15 KiB
Python

import struct
import sys
from typing import Any, Generator, Iterable, List, TextIO, Union
from phasm import compiler
from phasm.codestyle import phasm_render
from phasm.runtime import calculate_alloc_size
from phasm.type3 import types as type3types
from . import runners
DASHES = '-' * 16
class SuiteResult:
def __init__(self) -> None:
self.returned_value = None
RUNNER_CLASS_MAP = {
'pywasm': runners.RunnerPywasm,
'pywasm3': runners.RunnerPywasm3,
'wasmtime': runners.RunnerWasmtime,
'wasmer': runners.RunnerWasmer,
}
class Suite:
"""
WebAssembly test suite
"""
def __init__(self, code_py: str) -> None:
self.code_py = code_py
def run_code(self, *args: Any, runtime: str = 'pywasm3', func_name: str = 'testEntry', imports: runners.Imports = None) -> Any:
"""
Compiles the given python code into wasm and
then runs it
Returned is an object with the results set
"""
class_ = RUNNER_CLASS_MAP[runtime]
runner = class_(self.code_py)
write_header(sys.stderr, 'Phasm')
runner.dump_phasm_code(sys.stderr)
runner.parse()
runner.compile_ast()
runner.compile_wat()
write_header(sys.stderr, 'Assembly')
runner.dump_wasm_wat(sys.stderr)
runner.compile_wasm()
runner.interpreter_setup()
runner.interpreter_load(imports)
# Check if code formatting works
assert self.code_py == '\n' + phasm_render(runner.phasm_ast) # \n for formatting in tests
func_args = [x.type3 for x in runner.phasm_ast.functions[func_name].posonlyargs]
if len(func_args) != len(args):
raise RuntimeError(f'Invalid number of args for {func_name}')
wasm_args: List[Union[float, int]] = []
if args:
write_header(sys.stderr, 'Memory (pre alloc)')
runner.interpreter_dump_memory(sys.stderr)
for arg, arg_typ in zip(args, func_args):
assert not isinstance(arg_typ, type3types.PlaceholderForType), \
'Cannot call polymorphic function from outside'
if arg_typ in (type3types.u8, type3types.u32, type3types.u64, ):
assert isinstance(arg, int)
wasm_args.append(arg)
continue
if arg_typ in (type3types.i8, type3types.i32, type3types.i64, ):
assert isinstance(arg, int)
wasm_args.append(arg)
continue
if arg_typ in (type3types.f32, type3types.f64, ):
assert isinstance(arg, float)
wasm_args.append(arg)
continue
if arg_typ is type3types.bytes:
adr = _allocate_memory_stored_value(runner, arg_typ, arg)
wasm_args.append(adr)
continue
if isinstance(arg_typ, type3types.AppliedType3):
if arg_typ.base is type3types.static_array:
adr = _allocate_memory_stored_value(runner, arg_typ, arg)
wasm_args.append(adr)
continue
if arg_typ.base is type3types.tuple:
adr = _allocate_memory_stored_value(runner, arg_typ, arg)
wasm_args.append(adr)
continue
if isinstance(arg_typ, type3types.StructType3):
adr = _allocate_memory_stored_value(runner, arg_typ, arg)
wasm_args.append(adr)
continue
raise NotImplementedError(arg)
write_header(sys.stderr, 'Memory (pre run)')
runner.interpreter_dump_memory(sys.stderr)
result = SuiteResult()
result.returned_value = runner.call(func_name, *wasm_args)
result.returned_value = _load_memory_stored_returned_value(
runner,
func_name,
result.returned_value,
)
write_header(sys.stderr, 'Memory (post run)')
runner.interpreter_dump_memory(sys.stderr)
return result
def write_header(textio: TextIO, msg: str) -> None:
textio.write(f'{DASHES} {msg.ljust(16)} {DASHES}\n')
WRITE_LOOKUP_MAP = {
'u8': compiler.module_data_u8,
'u32': compiler.module_data_u32,
'u64': compiler.module_data_u64,
'i8': compiler.module_data_i8,
'i32': compiler.module_data_i32,
'i64': compiler.module_data_i64,
'f32': compiler.module_data_f32,
'f64': compiler.module_data_f64,
}
def _write_memory_stored_value(
runner: runners.RunnerBase,
adr: int,
val_typ: type3types.Type3,
val: Any,
) -> int:
if val_typ is type3types.bytes:
adr2 = _allocate_memory_stored_value(runner, val_typ, val)
runner.interpreter_write_memory(adr, compiler.module_data_u32(adr2))
return 4
if isinstance(val_typ, type3types.PrimitiveType3):
to_write = WRITE_LOOKUP_MAP[val_typ.name](val)
runner.interpreter_write_memory(adr, to_write)
return len(to_write)
if isinstance(val_typ, type3types.AppliedType3):
if val_typ.base in (type3types.static_array, type3types.tuple, ):
adr2 = _allocate_memory_stored_value(runner, val_typ, val)
runner.interpreter_write_memory(adr, compiler.module_data_u32(adr2))
return 4
if isinstance(val_typ, type3types.StructType3):
adr2 = _allocate_memory_stored_value(runner, val_typ, val)
runner.interpreter_write_memory(adr, compiler.module_data_u32(adr2))
return 4
raise NotImplementedError(val_typ, val)
def _allocate_memory_stored_value(
runner: runners.RunnerBase,
val_typ: type3types.Type3,
val: Any
) -> int:
if val_typ is type3types.bytes:
assert isinstance(val, bytes)
adr = runner.call('stdlib.types.__alloc_bytes__', len(val))
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
runner.interpreter_write_memory(adr + 4, val)
return adr
if isinstance(val_typ, type3types.AppliedType3):
if val_typ.base is type3types.static_array:
assert isinstance(val, tuple)
alloc_size = calculate_alloc_size(val_typ)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
val_el_typ = val_typ.args[0]
assert not isinstance(val_el_typ, type3types.PlaceholderForType)
tuple_len_obj = val_typ.args[1]
assert isinstance(tuple_len_obj, type3types.IntType3)
tuple_len = tuple_len_obj.value
assert tuple_len == len(val)
offset = adr
for val_el_val in val:
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
return adr
if val_typ.base is type3types.tuple:
assert isinstance(val, tuple)
alloc_size = calculate_alloc_size(val_typ)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
assert len(val) == len(val_typ.args)
offset = adr
for val_el_val, val_el_typ in zip(val, val_typ.args):
assert not isinstance(val_el_typ, type3types.PlaceholderForType)
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
return adr
if isinstance(val_typ, type3types.StructType3):
assert isinstance(val, dict)
alloc_size = calculate_alloc_size(val_typ)
adr = runner.call('stdlib.alloc.__alloc__', alloc_size)
assert isinstance(adr, int)
sys.stderr.write(f'Allocation 0x{adr:08x} {repr(val)}\n')
assert list(val.keys()) == list(val_typ.members.keys())
offset = adr
for val_el_name, val_el_typ in val_typ.members.items():
assert not isinstance(val_el_typ, type3types.PlaceholderForType)
val_el_val = val[val_el_name]
offset += _write_memory_stored_value(runner, offset, val_el_typ, val_el_val)
return adr
raise NotImplementedError(val_typ, val)
def _load_memory_stored_returned_value(
runner: runners.RunnerBase,
func_name: str,
wasm_value: Any,
) -> Any:
ret_type3 = runner.phasm_ast.functions[func_name].returns_type3
if ret_type3 is type3types.none:
return None
if ret_type3 in (type3types.i32, type3types.i64):
assert isinstance(wasm_value, int), wasm_value
return wasm_value
if ret_type3 in (type3types.u8, type3types.u32, type3types.u64):
assert isinstance(wasm_value, int), wasm_value
if wasm_value < 0:
# WASM does not support unsigned values through its interface
# Cast and then reinterpret
letter = {
'u32': 'i',
'u64': 'q',
}[ret_type3.name]
data = struct.pack(f'<{letter}', wasm_value)
wasm_value, = struct.unpack(f'<{letter.upper()}', data)
return wasm_value
if ret_type3 in (type3types.f32, type3types.f64, ):
assert isinstance(wasm_value, float), wasm_value
return wasm_value
if ret_type3 is type3types.bytes:
assert isinstance(wasm_value, int), wasm_value
return _load_bytes_from_address(runner, ret_type3, wasm_value)
if isinstance(ret_type3, type3types.AppliedType3):
if ret_type3.base is type3types.static_array:
assert isinstance(wasm_value, int), wasm_value
return _load_static_array_from_address(runner, ret_type3, wasm_value)
if ret_type3.base is type3types.tuple:
assert isinstance(wasm_value, int), wasm_value
return _load_tuple_from_address(runner, ret_type3, wasm_value)
if isinstance(ret_type3, type3types.StructType3):
return _load_struct_from_address(runner, ret_type3, wasm_value)
raise NotImplementedError(ret_type3, wasm_value)
def _unpack(runner: runners.RunnerBase, typ: type3types.Type3, inp: bytes) -> Any:
if typ is type3types.u8:
# See compiler.py, LOAD_STORE_TYPE_MAP and module_data_u8
assert len(inp) == 4
return struct.unpack('<I', inp)[0]
if typ is type3types.u32:
assert len(inp) == 4
return struct.unpack('<I', inp)[0]
if typ is type3types.u64:
assert len(inp) == 8
return struct.unpack('<Q', inp)[0]
if typ is type3types.i8:
# See compiler.py, LOAD_STORE_TYPE_MAP and module_data_i8
assert len(inp) == 4
return struct.unpack('<i', inp)[0]
if typ is type3types.i32:
assert len(inp) == 4
return struct.unpack('<i', inp)[0]
if typ is type3types.i64:
assert len(inp) == 8
return struct.unpack('<q', inp)[0]
if typ is type3types.f32:
assert len(inp) == 4
return struct.unpack('<f', inp)[0]
if typ is type3types.f64:
assert len(inp) == 8
return struct.unpack('<d', inp)[0]
if typ is type3types.bytes:
# Note: For bytes, inp should contain a 4 byte pointer
assert len(inp) == 4
adr = struct.unpack('<I', inp)[0]
return _load_bytes_from_address(runner, typ, adr)
if isinstance(typ, type3types.AppliedType3):
# Note: For applied types, inp should contain a 4 byte pointer
assert len(inp) == 4
adr = struct.unpack('<I', inp)[0]
if typ.base is type3types.static_array:
return _load_static_array_from_address(runner, typ, adr)
if typ.base is type3types.tuple:
return _load_tuple_from_address(runner, typ, adr)
if isinstance(typ, type3types.StructType3):
# Note: For structs, inp should contain a 4 byte pointer
assert len(inp) == 4
adr = struct.unpack('<I', inp)[0]
return _load_struct_from_address(runner, typ, adr)
raise NotImplementedError(typ, inp)
def _load_bytes_from_address(runner: runners.RunnerBase, typ: type3types.Type3, adr: int) -> bytes:
sys.stderr.write(f'Reading 0x{adr:08x} {typ:s}\n')
read_bytes = runner.interpreter_read_memory(adr, 4)
bytes_len, = struct.unpack('<I', read_bytes)
adr += 4
return runner.interpreter_read_memory(adr, bytes_len)
def _split_read_bytes(all_bytes: bytes, split_sizes: Iterable[int]) -> Generator[bytes, None, None]:
offset = 0
for size in split_sizes:
yield all_bytes[offset:offset + size]
offset += size
def _load_static_array_from_address(runner: runners.RunnerBase, typ: type3types.AppliedType3, adr: int) -> Any:
sys.stderr.write(f'Reading 0x{adr:08x} {typ:s}\n')
assert 2 == len(typ.args)
sub_typ, len_typ = typ.args
assert not isinstance(sub_typ, type3types.PlaceholderForType)
assert isinstance(len_typ, type3types.IntType3)
sa_len = len_typ.value
arg_size_1 = calculate_alloc_size(sub_typ, is_member=True)
arg_sizes = [arg_size_1 for _ in range(sa_len)] # _split_read_bytes requires one arg per value
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, sub_typ, arg_bytes)
for arg_bytes in _split_read_bytes(read_bytes, arg_sizes)
)
def _load_tuple_from_address(runner: runners.RunnerBase, typ: type3types.Type3, adr: int) -> Any:
sys.stderr.write(f'Reading 0x{adr:08x} {typ:s}\n')
assert isinstance(typ, type3types.AppliedType3)
assert typ.base is type3types.tuple
typ_list = [
x
for x in typ.args
if not isinstance(x, type3types.PlaceholderForType)
]
assert len(typ_list) == len(typ.args)
arg_sizes = [
calculate_alloc_size(x, is_member=True)
for x in typ_list
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return tuple(
_unpack(runner, arg_typ, arg_bytes)
for arg_typ, arg_bytes in zip(typ_list, _split_read_bytes(read_bytes, arg_sizes))
)
def _load_struct_from_address(runner: runners.RunnerBase, typ: type3types.Type3, adr: int) -> Any:
sys.stderr.write(f'Reading 0x{adr:08x} {typ:s}\n')
assert isinstance(typ, type3types.StructType3)
name_list = list(typ.members)
typ_list = [
x
for x in typ.members.values()
if not isinstance(x, type3types.PlaceholderForType)
]
assert len(typ_list) == len(typ.members)
arg_sizes = [
calculate_alloc_size(x, is_member=True)
for x in typ_list
]
read_bytes = runner.interpreter_read_memory(adr, sum(arg_sizes))
return {
arg_name: _unpack(runner, arg_typ, arg_bytes)
for arg_name, arg_typ, arg_bytes in zip(name_list, typ_list, _split_read_bytes(read_bytes, arg_sizes))
}