phasm/tests/integration/helpers.py
Johan B.W. de Vries 0da309a280 Remove old code
2022-06-19 17:04:20 +02:00

205 lines
5.2 KiB
Python

import io
import os
import subprocess
import sys
from tempfile import NamedTemporaryFile
import pywasm
import wasm3
import wasmer
import wasmer_compiler_cranelift
import wasmtime
from py2wasm.utils import our_process
from py2wasm.compiler import module
DASHES = '-' * 16
def wat2wasm(code_wat):
path = os.environ.get('WAT2WASM', 'wat2wasm')
with NamedTemporaryFile('w+t') as input_fp:
input_fp.write(code_wat)
input_fp.flush()
with NamedTemporaryFile('w+b') as output_fp:
subprocess.run(
[
path,
input_fp.name,
'-o',
output_fp.name,
],
check=True,
)
output_fp.seek(0)
return output_fp.read()
class SuiteResult:
def __init__(self):
self.log_int32_list = []
self.returned_value = None
def callback_log_int32(self, store, value):
del store # auto passed by pywasm
self.log_int32_list.append(value)
def make_imports(self):
return {
'console': {
'logInt32': self.callback_log_int32,
}
}
class Suite:
def __init__(self, code_py, test_name):
self.code_py = code_py
self.test_name = test_name
def run_code(self, *args):
"""
Compiles the given python code into wasm and
then runs it
Returned is an object with the results set
"""
our_module = our_process(self.code_py, self.test_name)
# Check if code formatting works
assert self.code_py == '\n' + our_module.render() # \n for formatting in tests
# Compile
wat_module = module(our_module)
# Render as text
code_wat = wat_module.generate()
sys.stderr.write(f'{DASHES} Assembly {DASHES}\n')
_write_numbered_lines(code_wat)
# Compile to assembly code
code_wasm = wat2wasm(code_wat)
# Run assembly code
return _run_pywasm3(code_wasm, args)
def _run_pywasm(code_wasm, args):
# https://pypi.org/project/pywasm/
result = SuiteResult()
module = pywasm.binary.Module.from_reader(io.BytesIO(code_wasm))
runtime = pywasm.Runtime(module, result.make_imports(), {})
sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
_dump_memory(runtime.store.mems[0].data)
result.returned_value = runtime.exec('testEntry', args)
sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
_dump_memory(runtime.store.mems[0].data)
return result
def _run_pywasm3(code_wasm, args):
# https://pypi.org/project/pywasm3/
result = SuiteResult()
env = wasm3.Environment()
mod = env.parse_module(code_wasm)
rtime = env.new_runtime(1024 * 1024)
rtime.load(mod)
# sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
# _dump_memory(rtime.get_memory(0).tobytes())
result.returned_value = rtime.find_function('testEntry')(*args)
# sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
# _dump_memory(rtime.get_memory(0).tobytes())
return result
def _run_wasmtime(code_wasm, args):
# https://pypi.org/project/wasmtime/
result = SuiteResult()
store = wasmtime.Store()
module = wasmtime.Module(store.engine, code_wasm)
instance = wasmtime.Instance(store, module, [])
sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
sys.stderr.write('<Not available on wasmtime>\n')
result.returned_value = instance.exports(store)['testEntry'](store, *args)
sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
sys.stderr.write('<Not available on wasmtime>\n')
return result
def _run_wasmer(code_wasm, args):
# https://pypi.org/project/wasmer/
result = SuiteResult()
store = wasmer.Store(wasmer.engine.JIT(wasmer_compiler_cranelift.Compiler))
# Let's compile the module to be able to execute it!
module = wasmer.Module(store, code_wasm)
# Now the module is compiled, we can instantiate it.
instance = wasmer.Instance(module)
sys.stderr.write(f'{DASHES} Memory (pre run) {DASHES}\n')
sys.stderr.write('<Not available on wasmer>\n')
# Call the exported `sum` function.
result.returned_value = instance.exports.testEntry(*args)
sys.stderr.write(f'{DASHES} Memory (post run) {DASHES}\n')
sys.stderr.write('<Not available on wasmer>\n')
return result
def _dump_memory(mem):
line_width = 16
prev_line = None
skip = False
for idx in range(0, len(mem), line_width):
line = ''
for idx2 in range(0, line_width):
line += f'{mem[idx + idx2]:02X}'
if idx2 % 2 == 1:
line += ' '
if prev_line == line:
if not skip:
sys.stderr.write('**\n')
skip = True
else:
sys.stderr.write(f'{idx:08x} {line}\n')
prev_line = line
def _write_numbered_lines(text: str) -> None:
line_list = text.split('\n')
line_no_width = len(str(len(line_list)))
for line_no, line_txt in enumerate(line_list):
sys.stderr.write('{} {}\n'.format(
str(line_no + 1).zfill(line_no_width),
line_txt,
))