import io
import os
import subprocess
import sys
from tempfile import NamedTemporaryFile

import pywasm
import wasm3
import wasmer
import wasmer_compiler_cranelift
import wasmtime

from phasm.codestyle import phasm_render
from phasm.compiler import phasm_compile
from phasm.parser import phasm_parse

# Decoration placed on both sides of every section header written to stderr
DASHES = '-' * 16


def wat2wasm(code_wat: str) -> bytes:
    """
    Converts WebAssembly text into a WebAssembly binary

    The conversion is done by an external executable. Its path is read
    from the WAT2WASM environment variable and defaults to `wat2wasm`.

    Raises subprocess.CalledProcessError when the executable exits with
    a non-zero status.
    """
    path = os.environ.get('WAT2WASM', 'wat2wasm')

    with NamedTemporaryFile('w+t') as input_fp:
        input_fp.write(code_wat)
        # The external tool reads the file by name, so the text must have
        # left our write buffer before it is started
        input_fp.flush()

        with NamedTemporaryFile('w+b') as output_fp:
            subprocess.run(
                [
                    path,
                    input_fp.name,
                    '-o',
                    output_fp.name,
                ],
                check=True,
            )

            # The tool wrote the file by name; read it back from the start
            output_fp.seek(0)

            return output_fp.read()


class SuiteResult:
    """
    Holds what a single run of the test module produced
    """
    def __init__(self):
        # Values received through callback_log_int32, in call order
        self.log_int32_list = []
        # Value returned by the call to the module's testEntry function
        self.returned_value = None

    def callback_log_int32(self, store, value):
        """
        Import callback; remembers the logged value
        """
        del store # auto passed by pywasm

        self.log_int32_list.append(value)

    def make_imports(self):
        """
        Returns the import mapping that exposes callback_log_int32
        as console.logInt32
        """
        return {
            'console': {
                'logInt32': self.callback_log_int32,
            }
        }


class Suite:
    """
    WebAssembly test suite
    """
    def __init__(self, code_py):
        self.code_py = code_py

    def run_code(self, *args, runtime='pywasm3', imports=None):
        """
        Compiles the given python code into wasm and
        then runs it

        The positional arguments are passed on to the module's testEntry
        function. `runtime` selects the engine: 'pywasm', 'pywasm3',
        'wasmtime' or 'wasmer'. `imports` is only used by the 'wasmer'
        runtime.

        Returned is an object with the results set

        Raises ValueError when `runtime` is not one of the known names.
        """
        phasm_module = phasm_parse(self.code_py)

        # Check if code formatting works
        assert self.code_py == '\n' + phasm_render(phasm_module) # \n for formatting in tests

        # Compile
        wasm_module = phasm_compile(phasm_module)

        # Render as WebAssembly text
        code_wat = wasm_module.to_wat()

        sys.stderr.write(f'{DASHES} Assembly {DASHES}\n')

        _write_numbered_lines(code_wat)

        # Convert the WebAssembly text to a WebAssembly binary
        code_wasm = wat2wasm(code_wat)

        # Run the binary on the requested runtime
        if runtime == 'pywasm':
            return _run_pywasm(code_wasm, args)
        if runtime == 'pywasm3':
            return _run_pywasm3(code_wasm, args)
        if runtime == 'wasmtime':
            return _run_wasmtime(code_wasm, args)
        if runtime == 'wasmer':
            return _run_wasmer(code_wasm, args, imports)

        # ValueError is a subclass of Exception, so callers catching
        # Exception keep working
        raise ValueError(f'Invalid runtime: {runtime}')


def _run_pywasm(code_wasm, args):
    """
    Runs testEntry from the given wasm binary using pywasm

    Memory 0 is dumped to stderr before and after the call.
    """
    # https://pypi.org/project/pywasm/
    result = SuiteResult()

    module = pywasm.binary.Module.from_reader(io.BytesIO(code_wasm))

    runtime = pywasm.Runtime(module, result.make_imports(), {})

    def set_byte(idx, byt):
        runtime.store.mems[0].data[idx] = byt

    # bytes arguments are copied into the module's memory first
    args = _convert_bytes_arguments(
        args,
        lambda x: runtime.exec('___new_reference___', [x]),
        set_byte
    )

    sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
    _dump_memory(runtime.store.mems[0].data)

    result.returned_value = runtime.exec('testEntry', args)

    sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
    _dump_memory(runtime.store.mems[0].data)

    return result


def _run_pywasm3(code_wasm, args):
    """
    Runs testEntry from the given wasm binary using pywasm3

    Memory 0 is dumped to stderr before and after the call.
    """
    # https://pypi.org/project/pywasm3/
    result = SuiteResult()

    env = wasm3.Environment()

    mod = env.parse_module(code_wasm)

    rtime = env.new_runtime(1024 * 1024)
    rtime.load(mod)

    def set_byte(idx, byt):
        rtime.get_memory(0)[idx] = byt

    # bytes arguments are copied into the module's memory first
    args = _convert_bytes_arguments(
        args,
        rtime.find_function('___new_reference___'),
        set_byte
    )

    sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
    _dump_memory(rtime.get_memory(0))

    result.returned_value = rtime.find_function('testEntry')(*args)

    sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
    _dump_memory(rtime.get_memory(0))

    return result


def _run_wasmtime(code_wasm, args):
    """
    Runs testEntry from the given wasm binary using wasmtime

    The module is instantiated without imports and the arguments are
    passed as given. Memory is not dumped for this runtime; only the
    section headers followed by an empty line are written to stderr.
    """
    # https://pypi.org/project/wasmtime/
    result = SuiteResult()

    store = wasmtime.Store()

    module = wasmtime.Module(store.engine, code_wasm)

    instance = wasmtime.Instance(store, module, [])

    sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
    sys.stderr.write('\n')

    result.returned_value = instance.exports(store)['testEntry'](store, *args)

    sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
    sys.stderr.write('\n')

    return result


def _run_wasmer(code_wasm, args, imports):
    """
    Runs testEntry from the given wasm binary using wasmer

    `imports` maps names to Python callables; they are registered under
    the 'imports' namespace. None is treated as no imports. Memory is
    not dumped for this runtime; only the section headers followed by an
    empty line are written to stderr.
    """
    # https://pypi.org/project/wasmer/
    result = SuiteResult()

    store = wasmer.Store(wasmer.engine.JIT(wasmer_compiler_cranelift.Compiler))

    import_object = wasmer.ImportObject()
    import_object.register('imports', {
        k: wasmer.Function(store, v)
        for k, v in (imports or {}).items()
    })

    # Let's compile the module to be able to execute it!
    module = wasmer.Module(store, code_wasm)

    # Now the module is compiled, we can instantiate it.
    instance = wasmer.Instance(module, import_object)

    sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
    sys.stderr.write('\n')

    # Call the exported `testEntry` function.
    result.returned_value = instance.exports.testEntry(*args)

    sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
    sys.stderr.write('\n')

    return result


def _convert_bytes_arguments(args, new_reference, set_byte):
    """
    Replaces every bytes argument by an offset into the module's memory

    For each bytes argument, `new_reference` is called with the number
    of bytes needed (the length of the value plus a 4 byte prefix) and
    must return the offset to write at. The prefix holds the length of
    the value as a little endian integer; the value itself follows.
    Every byte is written through `set_byte(offset, byte)`.

    Arguments that are not bytes are passed through unchanged.

    Returns a new list; `args` itself is not modified.
    """
    result = []
    for arg in args:
        if not isinstance(arg, bytes):
            result.append(arg)
            continue

        # TODO: Implement and use the bytes constructor function
        offset = new_reference(len(arg) + 4)
        result.append(offset)

        # Store the length prefix
        for idx, byt in enumerate(len(arg).to_bytes(4, byteorder='little')):
            set_byte(offset + idx, byt)

        # Store the actual bytes
        for idx, byt in enumerate(arg):
            set_byte(offset + 4 + idx, byt)

    return result


def _dump_memory(mem):
    """
    Writes a hex dump of the given memory to stderr

    Each line shows 16 bytes, grouped per two, prefixed with the offset
    of the first byte. Every run of lines identical to the line before
    it is replaced by a single `**` line.

    `mem` only needs to support len() and integer indexing.
    """
    line_width = 16
    mem_size = len(mem)

    prev_line = None
    skip = False
    for idx in range(0, mem_size, line_width):
        # Clamp the last row so a size that is not a multiple of
        # line_width does not read past the end
        row_end = min(idx + line_width, mem_size)

        line = ''.join(
            f'{mem[pos]:02X} ' if (pos - idx) % 2 == 1 else f'{mem[pos]:02X}'
            for pos in range(idx, row_end)
        )

        if prev_line == line:
            if not skip:
                sys.stderr.write('**\n')
                skip = True
        else:
            sys.stderr.write(f'{idx:08x} {line}\n')
            # A different line ends the run of repeats; without this
            # reset, later runs would be dropped without a marker
            skip = False

        prev_line = line


def _write_numbered_lines(text: str) -> None:
    """
    Writes the text to stderr with every line prefixed by its line number

    Numbering starts at 1 and is zero padded so all prefixes are as wide
    as the highest line number.
    """
    line_list = text.split('\n')
    line_no_width = len(str(len(line_list)))

    for line_no, line_txt in enumerate(line_list, start=1):
        sys.stderr.write('{} {}\n'.format(
            str(line_no).zfill(line_no_width),
            line_txt,
        ))