phasm/tests/integration/runners.py
2022-08-06 20:50:43 +02:00

223 lines
6.1 KiB
Python

"""
Runners to help run WebAssembly code on various interpreters
"""
from typing import Any, Iterable, TextIO
import io
import os
import subprocess
import tempfile
import pywasm.binary
import wasm3
from phasm.compiler import phasm_compile
from phasm.parser import phasm_parse
from phasm import ourlang
from phasm import wasm
def wat2wasm(code_wat: str) -> bytes:
    """
    Assembles WebAssembly text format code into a WebAssembly binary

    The conversion is delegated to an external tool. Its executable is read
    from the WAT2WASM environment variable; when that is not set, the name
    `wat2wasm` is used. Since the tool is run with check=True,
    subprocess.CalledProcessError is raised when it exits with a non-zero
    status.
    """
    executable = os.environ.get('WAT2WASM', 'wat2wasm')

    with tempfile.NamedTemporaryFile('w+t') as wat_fp:
        wat_fp.write(code_wat)
        # The tool opens the file by name, so the buffered text
        # has to be on disk before it runs
        wat_fp.flush()

        with tempfile.NamedTemporaryFile('w+b') as wasm_fp:
            command = [executable, wat_fp.name, '-o', wasm_fp.name]
            subprocess.run(command, check=True)

            wasm_fp.seek(0)
            return wasm_fp.read()
class RunnerBase:
    """
    Common base for all runners

    Holds the Phasm source together with every intermediate artifact
    produced on the way to a WebAssembly binary. The interpreter specific
    methods raise NotImplementedError here and are expected to be
    overridden by the subclass for each interpreter.
    """
    # Source code as handed to the constructor
    phasm_code: str
    # Set by parse()
    phasm_ast: ourlang.Module
    # Set by compile_ast()
    wasm_ast: wasm.Module
    # Set by compile_wat()
    wasm_asm: str
    # Set by compile_wasm()
    wasm_bin: bytes

    def __init__(self, phasm_code: str) -> None:
        self.phasm_code = phasm_code

    def dump_phasm_code(self, textio: TextIO) -> None:
        """
        Writes the Phasm input code to textio, for debugging
        """
        source = self.phasm_code
        _dump_code(textio, source)

    def parse(self) -> None:
        """
        Turns the Phasm code into a Phasm AST
        """
        parsed = phasm_parse(self.phasm_code)
        self.phasm_ast = parsed

    def compile_ast(self) -> None:
        """
        Turns the Phasm AST into a WebAssembly AST
        """
        compiled = phasm_compile(self.phasm_ast)
        self.wasm_ast = compiled

    def compile_wat(self) -> None:
        """
        Renders the WebAssembly AST as WebAssembly Assembly code
        """
        rendered = self.wasm_ast.to_wat()
        self.wasm_asm = rendered

    def dump_wasm_wat(self, textio: TextIO) -> None:
        """
        Writes the intermediate WebAssembly Assembly code to textio,
        for debugging
        """
        assembly = self.wasm_asm
        _dump_code(textio, assembly)

    def compile_wasm(self) -> None:
        """
        Assembles the WebAssembly Assembly code into a WebAssembly Binary
        """
        binary = wat2wasm(self.wasm_asm)
        self.wasm_bin = binary

    def interpreter_setup(self) -> None:
        """
        Prepares the interpreter; to be implemented per interpreter
        """
        raise NotImplementedError

    def interpreter_load(self) -> None:
        """
        Loads the compiled code into the interpreter; to be implemented
        per interpreter
        """
        raise NotImplementedError

    def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None:
        """
        Writes data into the interpreter's memory at the given offset;
        to be implemented per interpreter
        """
        raise NotImplementedError

    def interpreter_read_memory(self, offset: int, length: int) -> bytes:
        """
        Reads from the interpreter's memory; to be implemented per
        interpreter
        """
        raise NotImplementedError

    def interpreter_dump_memory(self, textio: TextIO) -> None:
        """
        Writes the interpreter's memory to textio, for debugging;
        to be implemented per interpreter
        """
        raise NotImplementedError

    def call(self, function: str, *args: Any) -> Any:
        """
        Calls the named function with the given arguments and returns its
        result; to be implemented per interpreter
        """
        raise NotImplementedError
class RunnerPywasm(RunnerBase):
    """
    Implements a runner for pywasm

    See https://pypi.org/project/pywasm/
    """
    # Both attributes are set by interpreter_load()
    module: pywasm.binary.Module
    runtime: pywasm.Runtime

    def interpreter_setup(self) -> None:
        """
        Sets up the interpreter
        """
        # Nothing to set up

    def interpreter_load(self) -> None:
        """
        Parses the compiled binary and creates a runtime for it
        """
        bytesio = io.BytesIO(self.wasm_bin)
        self.module = pywasm.binary.Module.from_reader(bytesio)
        self.runtime = pywasm.Runtime(self.module, {}, None)

    def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None:
        """
        Writes the given byte values into the first memory, starting at offset
        """
        memory = self.runtime.store.memory_list[0].data
        for idx, byt in enumerate(data):
            memory[offset + idx] = byt

    def interpreter_read_memory(self, offset: int, length: int) -> bytes:
        """
        Reads length bytes from the first memory, starting at offset
        """
        memory = self.runtime.store.memory_list[0].data
        # The slice end is offset + length; using length as the end index
        # returned too little (or nothing) for any non-zero offset.
        # bytes() makes the result match the declared return type.
        return bytes(memory[offset:offset + length])

    def interpreter_dump_memory(self, textio: TextIO) -> None:
        """
        Writes a hex dump of the first memory to textio, for debugging
        """
        _dump_memory(textio, self.runtime.store.memory_list[0].data)

    def call(self, function: str, *args: Any) -> Any:
        """
        Calls the given function with the given arguments, returning the result
        """
        return self.runtime.exec(function, [*args])
class RunnerPywasm3(RunnerBase):
    """
    Implements a runner for pywasm3

    See https://pypi.org/project/pywasm3/
    """
    # env and rtime are set by interpreter_setup(), mod by interpreter_load()
    env: wasm3.Environment
    rtime: wasm3.Runtime
    mod: wasm3.Module

    def interpreter_setup(self) -> None:
        """
        Creates the wasm3 environment and a runtime within it
        """
        self.env = wasm3.Environment()
        self.rtime = self.env.new_runtime(1024 * 1024)

    def interpreter_load(self) -> None:
        """
        Parses the compiled binary and loads it into the runtime
        """
        self.mod = self.env.parse_module(self.wasm_bin)
        self.rtime.load(self.mod)

    def interpreter_write_memory(self, offset: int, data: Iterable[int]) -> None:
        """
        Writes the given byte values into memory 0, starting at offset
        """
        memory = self.rtime.get_memory(0)

        for idx, byt in enumerate(data):
            memory[offset + idx] = byt # type: ignore

    def interpreter_read_memory(self, offset: int, length: int) -> bytes:
        """
        Reads length bytes from memory 0, starting at offset
        """
        memory = self.rtime.get_memory(0)
        # The slice end is offset + length; using length as the end index
        # returned too little (or nothing) for any non-zero offset
        return memory[offset:offset + length].tobytes()

    def interpreter_dump_memory(self, textio: TextIO) -> None:
        """
        Writes a hex dump of memory 0 to textio, for debugging
        """
        _dump_memory(textio, self.rtime.get_memory(0))

    def call(self, function: str, *args: Any) -> Any:
        """
        Calls the given function with the given arguments, returning the result
        """
        return self.rtime.find_function(function)(*args)
def _dump_memory(textio: TextIO, mem: bytes) -> None:
    """
    Writes a hex dump of mem to textio

    Each output line starts with the offset (8 lowercase hex digits),
    followed by up to 16 bytes as uppercase hex, grouped per two bytes.
    When a line repeats the previous one, a single `**` marker is written
    for the whole run of repeats instead of the lines themselves.
    """
    line_width = 16

    prev_line = None
    skip = False
    for idx in range(0, len(mem), line_width):
        # Slicing, rather than indexing byte by byte, keeps a trailing
        # partial line from raising IndexError
        chunk = mem[idx:idx + line_width]
        line = ''.join(
            f'{byt:02X} ' if pos % 2 == 1 else f'{byt:02X}'
            for pos, byt in enumerate(chunk)
        )

        if prev_line == line:
            if not skip:
                textio.write('**\n')
                skip = True
        else:
            textio.write(f'{idx:08x} {line}\n')
            # A new run of repeats may follow; it needs its own marker
            skip = False

        prev_line = line
def _dump_code(textio: TextIO, text: str) -> None:
    """
    Writes text to textio, one line at a time, each prefixed with its
    1-based line number

    The numbers are zero-padded to the width of the highest line number.
    """
    rows = text.split('\n')
    number_width = len(str(len(rows)))

    for number, row in enumerate(rows, start=1):
        prefix = str(number).zfill(number_width)
        textio.write(f'{prefix} {row}\n')