import struct from typing import Any, Protocol from phasm.build.base import BuildBase from phasm.build.typerouter import BuildTypeRouter from phasm.type5.typeexpr import AtomicType, TypeExpr from phasm.wasm import ( WasmTypeFloat32, WasmTypeFloat64, WasmTypeInt32, WasmTypeInt64, WasmTypeNone, ) class MemoryAccess(Protocol): def call(self, function: str, *args: Any) -> Any: """ Use for calling allocator methods inside the WASM environment. """ def interpreter_write_memory(self, offset: int, data: bytes) -> None: """ Writes bytes directly to WASM environment memory. Addresses should be generated using allocators via call. """ def interpreter_read_memory(self, offset: int, length: int) -> bytes: """ Reads bytes directly from WASM environment memory. """ class AllocatorFunc(Protocol): def __call__(self, py_value: Any) -> int: """ Takes a Python value and allocaties it in the given memory Based on the phasm type """ class Allocator(BuildTypeRouter[AllocatorFunc]): __slots__ = ('access', ) access: MemoryAccess def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None: super().__init__(build) self.access = access def when_dynamic_array(self, da_arg: TypeExpr) -> AllocatorFunc: if da_arg.name == 'u8': return BytesAllocator(self.access) return DynamicArrayAllocator(self.access, self(da_arg)) class BytesAllocator: __slots__ = ('access', 'alloc_size', ) access: MemoryAccess def __init__(self, access: MemoryAccess) -> None: self.access = access def __call__(self, py_value: Any) -> int: assert isinstance(py_value, bytes) adr = self.access.call('stdlib.types.__alloc_bytes__', len(py_value)) assert isinstance(adr, int) self.access.interpreter_write_memory(adr + 4, py_value) return adr class ExtractorFunc(Protocol): alloc_size: int def __call__(self, wasm_value: Any) -> Any: """ Takes a WASM value and returns a Python value Based on the phasm type """ class Extractor(BuildTypeRouter[ExtractorFunc]): __slots__ = ('access', ) access: MemoryAccess def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None: super().__init__(build) self.access = access def when_atomic(self, typ: AtomicType) -> ExtractorFunc: type_info = self.build.type_info_map[typ.name] if type_info.wasm_type is WasmTypeNone: raise NotImplementedError() if type_info.wasm_type is WasmTypeInt32 or type_info.wasm_type is WasmTypeInt64: if type_info.signed is None: return BoolExtractor() return IntExtractor(type_info.signed, type_info.alloc_size) if type_info.wasm_type is WasmTypeFloat32 or type_info.wasm_type is WasmTypeFloat64: return FloatExtractor(type_info.alloc_size) raise NotImplementedError(typ) def when_dynamic_array(self, da_arg: TypeExpr) -> ExtractorFunc: if da_arg.name == 'u8': return BytesExtractor(self.access) return DynamicArrayExtractor(self.access, self(da_arg)) class BoolExtractor: __slots__ = ('alloc_size', ) def __init__(self) -> None: self.alloc_size = 1 def __call__(self, wasm_value: Any) -> bool: assert isinstance(wasm_value, int), wasm_value return wasm_value != 0 class IntExtractor: __slots__ = ('alloc_size', 'signed', ) def __init__(self, signed: bool, alloc_size: int) -> None: self.signed = signed self.alloc_size = alloc_size def __call__(self, wasm_value: Any) -> int: assert isinstance(wasm_value, int), wasm_value # Work around the fact that phasm has unsigned integers but wasm does not data = wasm_value.to_bytes(8, 'big', signed=True) data = data[-self.alloc_size:] return int.from_bytes(data, 'big', signed=self.signed) class FloatExtractor: __slots__ = ('alloc_size', ) def __init__(self, alloc_size: int) -> None: self.alloc_size = alloc_size def __call__(self, wasm_value: Any) -> float: assert isinstance(wasm_value, float), wasm_value return wasm_value class DynamicArrayExtractor: __slots__ = ('access', 'alloc_size', 'sub_Extractor', ) access: MemoryAccess alloc_size: int sub_Extractor: ExtractorFunc def __init__(self, access: MemoryAccess, sub_Extractor: ExtractorFunc) -> None: self.access = access self.sub_Extractor = sub_Extractor def __call__(self, wasm_value: Any) -> Any: assert isinstance(wasm_value, int), wasm_value adr = wasm_value del wasm_value # wasm_value must be a pointer # The first value at said pointer is the length of the array read_bytes = self.access.interpreter_read_memory(adr, 4) array_len, = struct.unpack(' None: self.access = access def __call__(self, wasm_value: Any) -> bytes: assert isinstance(wasm_value, int), wasm_value adr = wasm_value del wasm_value # wasm_value must be a pointer # The first value at said pointer is the length of the array read_bytes = self.access.interpreter_read_memory(adr, 4) array_len, = struct.unpack('