import struct
from typing import Any, Protocol

from phasm.build.base import BuildBase
from phasm.build.typerouter import BuildTypeRouter
from phasm.type5.typeexpr import AtomicType, TypeExpr
from phasm.wasm import (
    WasmTypeFloat32,
    WasmTypeFloat64,
    WasmTypeInt32,
    WasmTypeInt64,
    WasmTypeNone,
)


class MemoryAccess(Protocol):
    """
    Interface the allocators and extractors use to talk to a WASM runtime.
    """

    def call(self, function: str, *args: Any) -> Any:
        """
        Use for calling allocator methods inside the WASM environment.
        """

    def interpreter_write_memory(self, offset: int, data: bytes) -> None:
        """
        Writes bytes directly to WASM environment memory.

        Addresses should be generated using allocators via call.
        """

    def interpreter_read_memory(self, offset: int, length: int) -> bytes:
        """
        Reads bytes directly from WASM environment memory.
        """


class MemorySlice:
    """
    A view into already-read memory bytes, starting at a fixed offset.

    Calling the slice with a size returns that many bytes from the offset.
    """
    __slots__ = ('memory', 'offset', )

    def __init__(self, memory: bytes, offset: int) -> None:
        self.memory = memory
        self.offset = offset

    def __call__(self, size: int) -> bytes:
        return self.memory[self.offset:self.offset + size]

    def __repr__(self) -> str:
        return f'MemorySlice({self.memory!r}, {self.offset!r})'


class AllocatorFunc(Protocol):
    # Number of bytes this value occupies when stored inside a parent value
    alloc_size: int

    def __call__(self, py_value: Any, adr: int | None = None) -> int:
        """
        Takes a Python value and allocates it in the given memory
        Based on the phasm type.

        When the parent already has allocated memory, the adr is passed
        """


class Allocator(BuildTypeRouter[AllocatorFunc]):
    """
    Routes a phasm type to the AllocatorFunc that can store its values.
    """
    __slots__ = ('access', )

    access: MemoryAccess

    def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
        super().__init__(build)
        self.access = access

    def when_atomic(self, typ: AtomicType) -> AllocatorFunc:
        type_info = self.build.type_info_map[typ.name]
        wasm_type = type_info.wasm_type

        if wasm_type is WasmTypeNone:
            raise NotImplementedError

        if wasm_type is WasmTypeInt32 or wasm_type is WasmTypeInt64:
            if type_info.signed is None:
                # Integer-backed type without signedness; the Extractor maps
                # this case to BoolExtractor. No allocator exists for it.
                raise NotImplementedError

            return IntAllocator(self.access, type_info.signed, type_info.alloc_size)

        if wasm_type is WasmTypeFloat32 or wasm_type is WasmTypeFloat64:
            raise NotImplementedError

        raise NotImplementedError(typ)

    def when_dynamic_array(self, da_arg: TypeExpr) -> AllocatorFunc:
        # Dynamic arrays of u8 are handled as Python bytes
        if da_arg.name == 'u8':
            return BytesAllocator(self.access)

        return DynamicArrayAllocator(self.access, self(da_arg))

    def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> AllocatorFunc:
        return StaticArrayAllocator(self.access, sa_len, self(sa_typ))

    def when_tuple(self, tp_args: list[TypeExpr]) -> AllocatorFunc:
        return TupleAllocator(self.access, [self(tp_arg) for tp_arg in tp_args])


class ExtractorFunc(Protocol):
    # Number of bytes this value occupies when stored inside a parent value
    alloc_size: int

    def __call__(self, wasm_value: Any) -> Any:
        """
        Takes a WASM value and returns a Python value
        Based on the phasm type
        """


class Extractor(BuildTypeRouter[ExtractorFunc]):
    """
    Routes a phasm type to the ExtractorFunc that can read its values.
    """
    __slots__ = ('access', )

    access: MemoryAccess

    def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
        super().__init__(build)
        self.access = access

    def when_atomic(self, typ: AtomicType) -> ExtractorFunc:
        type_info = self.build.type_info_map[typ.name]
        wasm_type = type_info.wasm_type

        if wasm_type is WasmTypeNone:
            raise NotImplementedError

        if wasm_type is WasmTypeInt32 or wasm_type is WasmTypeInt64:
            if type_info.signed is None:
                return BoolExtractor()

            return IntExtractor(type_info.signed, type_info.alloc_size)

        if wasm_type is WasmTypeFloat32 or wasm_type is WasmTypeFloat64:
            return FloatExtractor(type_info.alloc_size)

        raise NotImplementedError(typ)

    def when_dynamic_array(self, da_arg: TypeExpr) -> ExtractorFunc:
        # Dynamic arrays of u8 are handled as Python bytes
        if da_arg.name == 'u8':
            return BytesExtractor(self.access)

        return DynamicArrayExtractor(self.access, self(da_arg))

    def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> ExtractorFunc:
        return StaticArrayExtractor(self.access, sa_len, self(sa_typ))

    def when_tuple(self, tp_args: list[TypeExpr]) -> ExtractorFunc:
        return TupleExtractor(self.access, [self(tp_arg) for tp_arg in tp_args])


class BoolExtractor:
    """
    Converts an integer received from WASM to a bool (non-zero is True).
    """
    __slots__ = ('alloc_size', )

    def __init__(self) -> None:
        self.alloc_size = 1

    def __call__(self, wasm_value: Any) -> bool:
        assert isinstance(wasm_value, int), wasm_value
        return wasm_value != 0


class IntAllocator:
    """
    Writes a Python int as little endian bytes at an address given by the parent.

    Stand-alone allocation (adr is None) is not implemented.
    """
    __slots__ = ('access', 'alloc_size', 'signed', )

    def __init__(self, access: MemoryAccess, signed: bool, alloc_size: int) -> None:
        self.access = access
        self.signed = signed
        self.alloc_size = alloc_size

    def __call__(self, py_value: Any, adr: int | None = None) -> int:
        if adr is None:
            raise NotImplementedError

        assert isinstance(py_value, int)
        data = py_value.to_bytes(self.alloc_size, 'little', signed=self.signed)
        self.access.interpreter_write_memory(adr, data)

        return adr


class IntExtractor:
    """
    Reads an int either from a MemorySlice or from a value returned by WASM.
    """
    __slots__ = ('alloc_size', 'signed', )

    def __init__(self, signed: bool, alloc_size: int) -> None:
        self.signed = signed
        self.alloc_size = alloc_size

    def __call__(self, wasm_value: Any) -> int:
        if isinstance(wasm_value, MemorySlice):
            # Memory stored int
            data = wasm_value(self.alloc_size)
        else:
            # Int received from the wasm interface
            # Work around the fact that phasm has unsigned integers but wasm does not
            # Use little endian since that matches with what WASM uses internally
            assert isinstance(wasm_value, int), wasm_value
            data = wasm_value.to_bytes(8, 'little', signed=True)
            data = data[:self.alloc_size]

        return int.from_bytes(data, 'little', signed=self.signed)


class PtrAllocator(IntAllocator):
    """
    IntAllocator preset for pointers: unsigned, 4 bytes.
    """

    def __init__(self, access: MemoryAccess) -> None:
        super().__init__(access, False, 4)


class PtrExtractor(IntExtractor):
    """
    IntExtractor preset for pointers: unsigned, 4 bytes.
    """

    def __init__(self) -> None:
        super().__init__(False, 4)


def _resolve_address(wasm_value: Any) -> int:
    """
    Returns the memory address a pointer-typed WASM value refers to.

    The value is either the address itself (an int) or a MemorySlice whose
    first 4 bytes hold the address, as is the case when the value is stored
    inside a parent value.
    """
    if isinstance(wasm_value, MemorySlice):
        wasm_value = PtrExtractor()(wasm_value)

    assert isinstance(wasm_value, int), wasm_value
    return wasm_value


class FloatExtractor:
    """
    Passes through a float received from the wasm interface.
    """
    __slots__ = ('alloc_size', )

    def __init__(self, alloc_size: int) -> None:
        self.alloc_size = alloc_size

    def __call__(self, wasm_value: Any) -> float:
        assert isinstance(wasm_value, float), wasm_value
        return wasm_value


class DynamicArrayAllocator:
    """
    Allocates a dynamic array: a 4 byte length followed by the elements.

    Allocation at a parent-provided address is not implemented.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator: AllocatorFunc

    def __init__(self, access: MemoryAccess, sub_allocator: AllocatorFunc) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_allocator = sub_allocator

    def __call__(self, py_value: Any, adr: int | None = None) -> int:
        if adr is not None:
            raise NotImplementedError

        assert isinstance(py_value, tuple)

        py_len = len(py_value)
        el_size = self.sub_allocator.alloc_size

        adr = self.access.call('stdlib.alloc.__alloc__', 4 + py_len * el_size)
        assert isinstance(adr, int)  # Type int

        # The length prefix is written with the same encoding as a pointer
        PtrAllocator(self.access)(py_len, adr)

        for idx, el_value in enumerate(py_value):
            self.sub_allocator(el_value, adr + 4 + idx * el_size)

        return adr


class DynamicArrayExtractor:
    """
    Reads a dynamic array (4 byte length followed by the elements) as a tuple.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor: ExtractorFunc

    def __init__(self, access: MemoryAccess, sub_extractor: ExtractorFunc) -> None:
        self.access = access
        # Was declared in __slots__ but never assigned; matches DynamicArrayAllocator
        self.alloc_size = 4  # ptr
        self.sub_extractor = sub_extractor

    def __call__(self, wasm_value: Any) -> Any:
        # wasm_value must be a pointer
        # The first value at said pointer is the length of the array
        adr = _resolve_address(wasm_value)
        del wasm_value

        read_bytes = self.access.interpreter_read_memory(adr, 4)
        # NOTE(review): the source text was truncated from this unpack call on;
        # the rest of this method is reconstructed from DynamicArrayAllocator's
        # layout and StaticArrayExtractor - confirm against the original file.
        array_len, = struct.unpack('<I', read_bytes)
        adr += 4

        el_size = self.sub_extractor.alloc_size
        read_bytes = self.access.interpreter_read_memory(adr, array_len * el_size)

        return tuple(
            self.sub_extractor(MemorySlice(read_bytes, idx * el_size))
            for idx in range(array_len)
        )


# NOTE(review): the class header and __init__ signature of BytesAllocator were
# lost in the source text; they are reconstructed from the call site
# BytesAllocator(self.access) and the sibling classes - confirm.
class BytesAllocator:
    """
    Allocates a bytes value; the payload is written 4 bytes past the address.

    Allocation at a parent-provided address is not implemented.
    """
    __slots__ = ('access', 'alloc_size', )

    access: MemoryAccess
    alloc_size: int

    def __init__(self, access: MemoryAccess) -> None:
        self.access = access
        self.alloc_size = 4  # ptr

    def __call__(self, py_value: Any, adr: int | None = None) -> int:
        if adr is not None:
            raise NotImplementedError

        assert isinstance(py_value, bytes)

        adr = self.access.call('stdlib.types.__alloc_bytes__', len(py_value))
        assert isinstance(adr, int)

        # Skip the first 4 bytes; presumably __alloc_bytes__ stores the
        # length there (the extractor reads it from that spot) - TODO confirm
        self.access.interpreter_write_memory(adr + 4, py_value)

        return adr


class BytesExtractor:
    """
    Reads a bytes value: a 4 byte length followed by the payload.
    """
    __slots__ = ('access', 'alloc_size', )

    access: MemoryAccess
    alloc_size: int

    def __init__(self, access: MemoryAccess) -> None:
        self.access = access
        # Was declared in __slots__ but never assigned; matches BytesAllocator
        self.alloc_size = 4  # ptr

    def __call__(self, wasm_value: Any) -> bytes:
        # wasm_value must be a pointer
        # The first value at said pointer is the length of the array
        adr = _resolve_address(wasm_value)
        del wasm_value

        read_bytes = self.access.interpreter_read_memory(adr, 4)
        # NOTE(review): the source text was truncated from this unpack call on;
        # the rest of this method is reconstructed from BytesAllocator, which
        # writes the payload at adr + 4 - confirm against the original file.
        array_len, = struct.unpack('<I', read_bytes)
        adr += 4

        return self.access.interpreter_read_memory(adr, array_len)


# NOTE(review): the class header and __init__ signature of StaticArrayAllocator
# were lost in the source text; they are reconstructed from the call site
# StaticArrayAllocator(self.access, sa_len, self(sa_typ)) and from
# StaticArrayExtractor - confirm.
class StaticArrayAllocator:
    """
    Allocates a fixed-length array: the elements back to back, no length prefix.

    Allocation at a parent-provided address is not implemented.
    """
    __slots__ = ('access', 'alloc_size', 'sa_len', 'sub_allocator', )

    access: MemoryAccess
    alloc_size: int
    sa_len: int
    sub_allocator: AllocatorFunc

    def __init__(self, access: MemoryAccess, sa_len: int, sub_allocator: AllocatorFunc) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sa_len = sa_len
        self.sub_allocator = sub_allocator

    def __call__(self, py_value: Any, adr: int | None = None) -> int:
        if adr is not None:
            raise NotImplementedError

        assert isinstance(py_value, tuple)
        assert len(py_value) == self.sa_len

        el_size = self.sub_allocator.alloc_size

        adr = self.access.call('stdlib.alloc.__alloc__', self.sa_len * el_size)
        assert isinstance(adr, int)  # Type int

        for idx, el_value in enumerate(py_value):
            self.sub_allocator(el_value, adr + idx * el_size)

        return adr


class StaticArrayExtractor:
    """
    Reads a fixed-length array (elements back to back) as a tuple.
    """
    __slots__ = ('access', 'alloc_size', 'sa_len', 'sub_extractor', )

    access: MemoryAccess
    alloc_size: int
    sa_len: int
    sub_extractor: ExtractorFunc

    def __init__(self, access: MemoryAccess, sa_len: int, sub_extractor: ExtractorFunc) -> None:
        self.access = access
        # Was declared in __slots__ but never assigned; matches StaticArrayAllocator
        self.alloc_size = 4  # ptr
        self.sa_len = sa_len
        self.sub_extractor = sub_extractor

    def __call__(self, wasm_value: Any) -> Any:
        adr = _resolve_address(wasm_value)
        del wasm_value

        el_size = self.sub_extractor.alloc_size
        read_bytes = self.access.interpreter_read_memory(adr, self.sa_len * el_size)

        return tuple(
            self.sub_extractor(MemorySlice(read_bytes, idx * el_size))
            for idx in range(self.sa_len)
        )


class TupleAllocator:
    """
    Allocates a tuple: each member stored back to back in member order.

    Allocation at a parent-provided address is not implemented.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator_list', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator_list: list[AllocatorFunc]

    def __init__(self, access: MemoryAccess, sub_allocator_list: list[AllocatorFunc]) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_allocator_list = sub_allocator_list

    def __call__(self, py_value: Any, adr: int | None = None) -> int:
        if adr is not None:
            raise NotImplementedError

        assert isinstance(py_value, tuple), py_value
        # zip() below would silently drop members on a length mismatch
        assert len(py_value) == len(self.sub_allocator_list), py_value

        total_alloc_size = sum(x.alloc_size for x in self.sub_allocator_list)

        adr = self.access.call('stdlib.alloc.__alloc__', total_alloc_size)
        assert isinstance(adr, int)  # Type int

        offset = 0
        for sub_allocator, sub_value in zip(self.sub_allocator_list, py_value):
            # Members live inside the allocated block, so the write address
            # is relative to adr (previously the bare offset was passed,
            # which wrote to absolute address 0 onward).
            sub_allocator(sub_value, adr + offset)
            offset += sub_allocator.alloc_size

        return adr


class TupleExtractor:
    """
    Reads a tuple: each member stored back to back in member order.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor_list', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor_list: list[ExtractorFunc]

    def __init__(self, access: MemoryAccess, sub_extractor_list: list[ExtractorFunc]) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_extractor_list = sub_extractor_list

    def __call__(self, wasm_value: Any) -> tuple[Any, ...]:
        adr = _resolve_address(wasm_value)
        del wasm_value

        total_alloc_size = sum(x.alloc_size for x in self.sub_extractor_list)

        read_bytes = self.access.interpreter_read_memory(adr, total_alloc_size)

        result = []
        offset = 0
        for sub_extractor in self.sub_extractor_list:
            result.append(sub_extractor(MemorySlice(read_bytes, offset)))
            offset += sub_extractor.alloc_size

        return tuple(result)