2023-04-11 10:27:36 +02:00

90 lines
2.9 KiB
Python

from typing import List
import ctypes
import struct
import wasmtime
from phasmplatform.common.method import MethodCall, MethodNotFoundError
from phasmplatform.common.router import BaseRouter
from .base import BaseRunner
class WasmTimeRunner(BaseRunner):
__slots__ = ('store', 'module', 'instance', 'exports')
def __init__(self, router: BaseRouter, wasm_bin: bytes) -> None:
super().__init__(router)
self.store = wasmtime.Store()
possible_imports = {
'prelude': {
'log_bytes': wasmtime.Func(self.store, wasmtime.FuncType([wasmtime.ValType.i32()], []), self.log_bytes),
}
}
self.module = wasmtime.Module(self.store.engine, wasm_bin)
imports: List[wasmtime.Func] = []
for imprt in self.module.imports:
if imprt.module not in possible_imports:
raise Exception('Service not found')
if imprt.name not in possible_imports[imprt.module]:
raise Exception('Method not found in service')
imports.append(possible_imports[imprt.module][imprt.name])
self.instance = wasmtime.Instance(self.store, self.module, imports)
self.exports = self.instance.exports(self.store)
def alloc_bytes(self, data: bytes) -> int:
memory = self.exports['memory']
assert isinstance(memory, wasmtime.Memory) # type hint
data_ptr = memory.data_ptr(self.store)
data_len = memory.data_len(self.store)
alloc_bytes = self.exports['stdlib.types.__alloc_bytes__']
assert isinstance(alloc_bytes, wasmtime.Func)
ptr = alloc_bytes(self.store, len(data))
assert isinstance(ptr, int) # type hint
idx = ptr + 4 # Skip the header from header from __alloc_bytes__
for byt in data:
assert idx < data_len
data_ptr[idx] = ctypes.c_ubyte(byt)
idx += 1
return ptr
def read_bytes(self, ptr: int) -> bytes:
memory = self.exports['memory']
assert isinstance(memory, wasmtime.Memory) # type hint
data_ptr = memory.data_ptr(self.store)
data_len = memory.data_len(self.store)
raw = ctypes.string_at(data_ptr, data_len)
length, = struct.unpack('<I', raw[ptr:ptr + 4]) # Header prefixed by __alloc_bytes__
return raw[ptr + 4:ptr + 4 + length]
def do_call(self, call: MethodCall) -> None:
try:
wasm_method = self.exports[call.method.name]
except KeyError:
call.on_error(MethodNotFoundError())
return
assert isinstance(wasm_method, wasmtime.Func)
act_args = [self.value_to_wasm(x) for x in call.args]
result = wasm_method(self.store, *act_args)
assert result is None or isinstance(result, (int, float, )) # type hint
call.on_success(self.value_from_wasm(call.method.return_type, result))