phasm/tests/integration/helpers.py
Johan B.W. de Vries 89ad648f34 Moved rendering to codestyle, parsing to parser
Also, removed name argument when parsing, wasn't used.
2022-07-09 14:04:40 +02:00

262 lines
6.8 KiB
Python

import io
import os
import subprocess
import sys
from tempfile import NamedTemporaryFile
import pywasm
import wasm3
import wasmer
import wasmer_compiler_cranelift
import wasmtime
from phasm.codestyle import phasm_render
from phasm.compiler import phasm_compile
from phasm.parser import phasm_parse
DASHES = '-' * 16
def wat2wasm(code_wat):
path = os.environ.get('WAT2WASM', 'wat2wasm')
with NamedTemporaryFile('w+t') as input_fp:
input_fp.write(code_wat)
input_fp.flush()
with NamedTemporaryFile('w+b') as output_fp:
subprocess.run(
[
path,
input_fp.name,
'-o',
output_fp.name,
],
check=True,
)
output_fp.seek(0)
return output_fp.read()
class SuiteResult:
def __init__(self):
self.log_int32_list = []
self.returned_value = None
def callback_log_int32(self, store, value):
del store # auto passed by pywasm
self.log_int32_list.append(value)
def make_imports(self):
return {
'console': {
'logInt32': self.callback_log_int32,
}
}
class Suite:
"""
WebAssembly test suite
"""
def __init__(self, code_py):
self.code_py = code_py
def run_code(self, *args, runtime='pywasm3', imports=None):
"""
Compiles the given python code into wasm and
then runs it
Returned is an object with the results set
"""
phasm_module = phasm_parse(self.code_py)
# Check if code formatting works
assert self.code_py == '\n' + phasm_render(phasm_module) # \n for formatting in tests
# Compile
wasm_module = phasm_compile(phasm_module)
# Render as WebAssembly text
code_wat = wasm_module.to_wat()
sys.stderr.write(f'{DASHES} Assembly {DASHES}\n')
_write_numbered_lines(code_wat)
# Compile to assembly code
code_wasm = wat2wasm(code_wat)
# Run assembly code
if 'pywasm' == runtime:
return _run_pywasm(code_wasm, args)
if 'pywasm3' == runtime:
return _run_pywasm3(code_wasm, args)
if 'wasmtime' == runtime:
return _run_wasmtime(code_wasm, args)
if 'wasmer' == runtime:
return _run_wasmer(code_wasm, args, imports)
raise Exception(f'Invalid runtime: {runtime}')
def _run_pywasm(code_wasm, args):
# https://pypi.org/project/pywasm/
result = SuiteResult()
module = pywasm.binary.Module.from_reader(io.BytesIO(code_wasm))
runtime = pywasm.Runtime(module, result.make_imports(), {})
def set_byte(idx, byt):
runtime.store.mems[0].data[idx] = byt
args = _convert_bytes_arguments(
args,
lambda x: runtime.exec('___new_reference___', [x]),
set_byte
)
sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
_dump_memory(runtime.store.mems[0].data)
result.returned_value = runtime.exec('testEntry', args)
sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
_dump_memory(runtime.store.mems[0].data)
return result
def _run_pywasm3(code_wasm, args):
# https://pypi.org/project/pywasm3/
result = SuiteResult()
env = wasm3.Environment()
mod = env.parse_module(code_wasm)
rtime = env.new_runtime(1024 * 1024)
rtime.load(mod)
def set_byte(idx, byt):
rtime.get_memory(0)[idx] = byt
args = _convert_bytes_arguments(
args,
rtime.find_function('___new_reference___'),
set_byte
)
sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
_dump_memory(rtime.get_memory(0))
result.returned_value = rtime.find_function('testEntry')(*args)
sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
_dump_memory(rtime.get_memory(0))
return result
def _run_wasmtime(code_wasm, args):
# https://pypi.org/project/wasmtime/
result = SuiteResult()
store = wasmtime.Store()
module = wasmtime.Module(store.engine, code_wasm)
instance = wasmtime.Instance(store, module, [])
sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
sys.stderr.write('<Not available on wasmtime>\n')
result.returned_value = instance.exports(store)['testEntry'](store, *args)
sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
sys.stderr.write('<Not available on wasmtime>\n')
return result
def _run_wasmer(code_wasm, args, imports):
# https://pypi.org/project/wasmer/
result = SuiteResult()
store = wasmer.Store(wasmer.engine.JIT(wasmer_compiler_cranelift.Compiler))
import_object = wasmer.ImportObject()
import_object.register('imports', {
k: wasmer.Function(store, v)
for k, v in (imports or {}).items()
})
# Let's compile the module to be able to execute it!
module = wasmer.Module(store, code_wasm)
# Now the module is compiled, we can instantiate it.
instance = wasmer.Instance(module, import_object)
sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
sys.stderr.write('<Not available on wasmer>\n')
# Call the exported `sum` function.
result.returned_value = instance.exports.testEntry(*args)
sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
sys.stderr.write('<Not available on wasmer>\n')
return result
def _convert_bytes_arguments(args, new_reference, set_byte):
result = []
for arg in args:
if not isinstance(arg, bytes):
result.append(arg)
continue
# TODO: Implement and use the bytes constructor function
offset = new_reference(len(arg) + 4)
result.append(offset)
# Store the length prefix
for idx, byt in enumerate(len(arg).to_bytes(4, byteorder='little')):
set_byte(offset + idx, byt)
# Store the actual bytes
for idx, byt in enumerate(arg):
set_byte(offset + 4 + idx, byt)
return result
def _dump_memory(mem):
line_width = 16
prev_line = None
skip = False
for idx in range(0, len(mem), line_width):
line = ''
for idx2 in range(0, line_width):
line += f'{mem[idx + idx2]:02X}'
if idx2 % 2 == 1:
line += ' '
if prev_line == line:
if not skip:
sys.stderr.write('**\n')
skip = True
else:
sys.stderr.write(f'{idx:08x} {line}\n')
prev_line = line
def _write_numbered_lines(text: str) -> None:
line_list = text.split('\n')
line_no_width = len(str(len(line_list)))
for line_no, line_txt in enumerate(line_list):
sys.stderr.write('{} {}\n'.format(
str(line_no + 1).zfill(line_no_width),
line_txt,
))