import struct
from typing import Any, Protocol

from phasm.build.base import BuildBase
from phasm.build.typerouter import BuildTypeRouter
from phasm.type5.record import Record
from phasm.type5.typeexpr import AtomicType, TypeExpr
from phasm.wasm import (
    WasmTypeFloat32,
    WasmTypeFloat64,
    WasmTypeInt32,
    WasmTypeInt64,
    WasmTypeNone,
)


class MemoryAccess(Protocol):
    """
    Interface to the WASM environment used by the allocators and extractors.
    """

    def call(self, function: str, *args: Any) -> Any:
        """
        Use for calling allocator methods inside the WASM environment.
        """

    def interpreter_write_memory(self, offset: int, data: bytes) -> None:
        """
        Writes bytes directly to WASM environment memory.

        Addresses should be generated using allocators via call.
        """

    def interpreter_read_memory(self, offset: int, length: int) -> bytes:
        """
        Reads bytes directly from WASM environment memory.
        """


class MemorySlice:
    """
    A view on already-read memory, starting at a fixed offset.

    Calling the slice with a size returns that many bytes from the offset.
    """
    __slots__ = ('memory', 'offset', )

    def __init__(self, memory: bytes, offset: int) -> None:
        self.memory = memory
        self.offset = offset

    def __call__(self, size: int) -> bytes:
        return self.memory[self.offset:self.offset + size]

    def __repr__(self) -> str:
        return f'MemorySlice({self.memory!r}, {self.offset!r})'


class AllocatorFunc(Protocol):
    """
    Callable that stores a Python value in WASM memory.

    alloc_size is the number of bytes the value occupies inside a parent.
    """
    alloc_size: int

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Takes a Python value and allocates it in the given memory
        Based on the phasm type.

        When the parent already has allocated memory, the store_at_adr is set.
        In that case, write your value to the given address, and return it.
        """


class Allocator(BuildTypeRouter[AllocatorFunc]):
    """
    Routes a phasm type to the AllocatorFunc that can store values of it.
    """
    __slots__ = ('access', )

    access: MemoryAccess

    def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
        super().__init__(build)
        self.access = access

    def when_atomic(self, typ: AtomicType) -> AllocatorFunc:
        type_info = self.build.type_info_map[typ.name]

        if type_info.wasm_type is WasmTypeNone:
            raise NotImplementedError

        if type_info.wasm_type is WasmTypeInt32 or type_info.wasm_type is WasmTypeInt64:
            # The extractor maps signed=None to BoolExtractor; there is no
            # allocator counterpart for that case.
            if type_info.signed is None:
                raise NotImplementedError

            return IntAllocator(self.access, type_info.signed, type_info.alloc_size)

        if type_info.wasm_type is WasmTypeFloat32 or type_info.wasm_type is WasmTypeFloat64:
            return FloatAllocator(self.access, type_info.alloc_size)

        raise NotImplementedError(typ)

    def when_dynamic_array(self, da_arg: TypeExpr) -> AllocatorFunc:
        # A dynamic array of u8 is handled as Python bytes
        if da_arg.name == 'u8':
            return BytesAllocator(self.access)

        return DynamicArrayAllocator(self.access, self(da_arg))

    def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> AllocatorFunc:
        return StaticArrayAllocator(self.access, sa_len, self(sa_typ))

    def when_struct(self, typ: Record) -> AllocatorFunc:
        return StructAllocator(self.access, [(x_nam, self(x_typ)) for x_nam, x_typ in typ.fields])

    def when_tuple(self, tp_args: list[TypeExpr]) -> AllocatorFunc:
        return TupleAllocator(self.access, list(map(self, tp_args)))


class ExtractorFunc(Protocol):
    """
    Callable that converts a WASM value into a Python value.

    alloc_size is the number of bytes the value occupies inside a parent.
    """
    alloc_size: int

    def __call__(self, wasm_value: Any) -> Any:
        """
        Takes a WASM value and returns a Python value
        Based on the phasm type
        """


class Extractor(BuildTypeRouter[ExtractorFunc]):
    """
    Routes a phasm type to the ExtractorFunc that can read values of it.
    """
    __slots__ = ('access', )

    access: MemoryAccess

    def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
        super().__init__(build)
        self.access = access

    def when_atomic(self, typ: AtomicType) -> ExtractorFunc:
        type_info = self.build.type_info_map[typ.name]

        if type_info.wasm_type is WasmTypeNone:
            return NoneExtractor()

        if type_info.wasm_type is WasmTypeInt32 or type_info.wasm_type is WasmTypeInt64:
            # Integer wasm type without signedness is extracted as a bool
            if type_info.signed is None:
                return BoolExtractor()

            return IntExtractor(type_info.signed, type_info.alloc_size)

        if type_info.wasm_type is WasmTypeFloat32 or type_info.wasm_type is WasmTypeFloat64:
            return FloatExtractor(type_info.alloc_size)

        raise NotImplementedError(typ)

    def when_dynamic_array(self, da_arg: TypeExpr) -> ExtractorFunc:
        # A dynamic array of u8 is handled as Python bytes
        if da_arg.name == 'u8':
            return BytesExtractor(self.access)

        return DynamicArrayExtractor(self.access, self(da_arg))

    def when_io(self, io_arg: TypeExpr) -> ExtractorFunc:
        # IO is a type only annotation, it is not related to allocation
        return self(io_arg)

    def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> ExtractorFunc:
        return StaticArrayExtractor(self.access, sa_len, self(sa_typ))

    def when_struct(self, typ: Record) -> ExtractorFunc:
        return StructExtractor(self.access, [(x_nam, self(x_typ)) for x_nam, x_typ in typ.fields])

    def when_tuple(self, tp_args: list[TypeExpr]) -> ExtractorFunc:
        return TupleExtractor(self.access, list(map(self, tp_args)))


class NoneExtractor:
    """
    Extractor for the None type; only checks that the value is None.
    """
    __slots__ = ('alloc_size', )

    alloc_size: int

    def __init__(self) -> None:
        # Do not set alloc_size, it should not be called
        # this will generate an AttributeError
        pass

    def __call__(self, wasm_value: Any) -> None:
        assert wasm_value is None


class BoolExtractor:
    """
    Extractor that maps an integer to a bool (non-zero is True).
    """
    __slots__ = ('alloc_size', )

    def __init__(self) -> None:
        self.alloc_size = 1

    def __call__(self, wasm_value: Any) -> bool:
        assert isinstance(wasm_value, int), wasm_value
        return wasm_value != 0


class IntAllocator:
    """
    Writes an integer as alloc_size little endian bytes at a given address.

    Only storing into memory allocated by a parent is supported; calling
    without store_at_adr raises NotImplementedError.
    """
    __slots__ = ('access', 'alloc_size', 'signed', )

    def __init__(self, access: MemoryAccess, signed: bool, alloc_size: int) -> None:
        self.access = access
        self.signed = signed
        self.alloc_size = alloc_size

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        if store_at_adr is None:
            raise NotImplementedError

        assert isinstance(py_value, int), py_value
        data = py_value.to_bytes(self.alloc_size, 'little', signed=self.signed)
        self.access.interpreter_write_memory(store_at_adr, data)

        return store_at_adr


class IntExtractor:
    """
    Reads an integer, either from a MemorySlice or from a plain int value.
    """
    __slots__ = ('alloc_size', 'signed', )

    def __init__(self, signed: bool, alloc_size: int) -> None:
        self.signed = signed
        self.alloc_size = alloc_size

    def __call__(self, wasm_value: Any) -> int:
        if isinstance(wasm_value, MemorySlice):
            # Memory stored int
            data = wasm_value(self.alloc_size)
        else:
            # Int received from the wasm interface
            # Work around the fact that phasm has unsigned integers but wasm does not
            # Use little endian since that matches with what WASM uses internally
            assert isinstance(wasm_value, int), wasm_value
            data = wasm_value.to_bytes(8, 'little', signed=True)
            data = data[:self.alloc_size]

        return int.from_bytes(data, 'little', signed=self.signed)


class PtrAllocator(IntAllocator):
    """
    IntAllocator preset for pointers: unsigned, 4 bytes.
    """
    def __init__(self, access: MemoryAccess) -> None:
        super().__init__(access, False, 4)


class PtrExtractor(IntExtractor):
    """
    IntExtractor preset for pointers: unsigned, 4 bytes.
    """
    def __init__(self) -> None:
        super().__init__(False, 4)


# Maps the byte size of a float to its struct format character
FLOAT_LETTER_MAP = {
    4: 'f',
    8: 'd'
}


class FloatAllocator:
    """
    Writes a float as alloc_size little endian bytes at a given address.

    Only storing into memory allocated by a parent is supported; calling
    without store_at_adr raises NotImplementedError.
    """
    __slots__ = ('access', 'alloc_size', )

    def __init__(self, access: MemoryAccess, alloc_size: int) -> None:
        self.access = access
        self.alloc_size = alloc_size

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        if store_at_adr is None:
            raise NotImplementedError

        assert isinstance(py_value, (float, int, )), py_value
        data = struct.pack(f'<{FLOAT_LETTER_MAP[self.alloc_size]}', py_value)
        self.access.interpreter_write_memory(store_at_adr, data)

        return store_at_adr


class FloatExtractor:
    """
    Reads a float, either from a MemorySlice or from a plain float value.
    """
    __slots__ = ('alloc_size', )

    def __init__(self, alloc_size: int) -> None:
        self.alloc_size = alloc_size

    def __call__(self, wasm_value: Any) -> float:
        if isinstance(wasm_value, MemorySlice):
            # Memory stored float
            data = wasm_value(self.alloc_size)
            wasm_value, = struct.unpack(f'<{FLOAT_LETTER_MAP[self.alloc_size]}', data)

        assert isinstance(wasm_value, float), wasm_value
        return wasm_value


class DynamicArrayAllocator:
    """
    Allocates a tuple as a dynamic array: a 4 byte length followed by the elements.

    Storing into memory allocated by a parent is not supported; calling
    with store_at_adr raises NotImplementedError.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator: AllocatorFunc

    def __init__(self, access: MemoryAccess, sub_allocator: AllocatorFunc) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_allocator = sub_allocator

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        if store_at_adr is not None:
            raise NotImplementedError

        assert isinstance(py_value, tuple), py_value

        py_len = len(py_value)
        element_size = self.sub_allocator.alloc_size

        alloc_size = 4 + py_len * element_size

        adr = self.access.call('stdlib.alloc.__alloc__', alloc_size)
        assert isinstance(adr, int)  # Type int

        # The length prefix is stored the same way as a pointer: unsigned, 4 bytes
        PtrAllocator(self.access)(py_len, adr)

        for idx, el_value in enumerate(py_value):
            offset = adr + 4 + idx * element_size
            self.sub_allocator(el_value, offset)

        return adr


class DynamicArrayExtractor:
    """
    Reads a dynamic array (4 byte length followed by the elements) as a tuple.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor: ExtractorFunc

    def __init__(self, access: MemoryAccess, sub_extractor: ExtractorFunc) -> None:
        self.access = access
        # Set like the other pointer based extractors, so that this extractor
        # can be used for an element or field of a parent value.
        self.alloc_size = 4  # ptr
        self.sub_extractor = sub_extractor

    def __call__(self, wasm_value: Any) -> Any:
        if isinstance(wasm_value, MemorySlice):
            # Stored inside a parent, read the pointer first
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        # wasm_value must be a pointer
        # The first value at said pointer is the length of the array

        read_bytes = self.access.interpreter_read_memory(adr, 4)
        # NOTE(review): everything from this unpack up to the signature of
        # BytesAllocator.__init__ was missing from the reviewed text. It is
        # reconstructed from the layout written by DynamicArrayAllocator
        # - please confirm against the original file.
        array_len, = struct.unpack('<I', read_bytes)
        adr += 4

        element_size = self.sub_extractor.alloc_size
        raw_data = self.access.interpreter_read_memory(adr, array_len * element_size)

        return tuple(
            self.sub_extractor(MemorySlice(raw_data, idx * element_size))
            for idx in range(array_len)
        )


class BytesAllocator:
    """
    Allocates Python bytes using stdlib.types.__alloc_bytes__.

    The data is written 4 bytes past the returned address.
    """
    __slots__ = ('access', 'alloc_size', )

    access: MemoryAccess
    alloc_size: int

    def __init__(self, access: MemoryAccess) -> None:
        self.access = access
        self.alloc_size = 4  # ptr

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        assert isinstance(py_value, bytes), py_value

        adr = self.access.call('stdlib.types.__alloc_bytes__', len(py_value))
        assert isinstance(adr, int)
        self.access.interpreter_write_memory(adr + 4, py_value)

        if store_at_adr is not None:
            # The parent only stores a pointer to the bytes
            PtrAllocator(self.access)(adr, store_at_adr)

        return adr


class BytesExtractor:
    """
    Reads a length prefixed byte array as Python bytes.
    """
    __slots__ = ('access', 'alloc_size', )

    access: MemoryAccess
    alloc_size: int

    def __init__(self, access: MemoryAccess) -> None:
        self.access = access
        self.alloc_size = 4  # ptr

    def __call__(self, wasm_value: Any) -> bytes:
        if isinstance(wasm_value, MemorySlice):
            # Stored inside a parent, read the pointer first
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        # wasm_value must be a pointer
        # The first value at said pointer is the length of the array

        read_bytes = self.access.interpreter_read_memory(adr, 4)
        # NOTE(review): everything from this unpack up to the signature of
        # StaticArrayAllocator.__init__ was missing from the reviewed text.
        # It is reconstructed from the layout written by BytesAllocator
        # - please confirm against the original file.
        array_len, = struct.unpack('<I', read_bytes)
        adr += 4

        return self.access.interpreter_read_memory(adr, array_len)


class StaticArrayAllocator:
    """
    Allocates a tuple of exactly sa_len elements as consecutive elements.
    """
    __slots__ = ('access', 'alloc_size', 'sa_len', 'sub_allocator', )

    access: MemoryAccess
    alloc_size: int
    sa_len: int
    sub_allocator: AllocatorFunc

    def __init__(self, access: MemoryAccess, sa_len: int, sub_allocator: AllocatorFunc) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sa_len = sa_len
        self.sub_allocator = sub_allocator

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        assert isinstance(py_value, tuple), py_value
        assert len(py_value) == self.sa_len

        element_size = self.sub_allocator.alloc_size
        alloc_size = self.sa_len * element_size

        adr = self.access.call('stdlib.alloc.__alloc__', alloc_size)
        assert isinstance(adr, int)  # Type int

        for idx, el_value in enumerate(py_value):
            sub_adr = adr + idx * element_size
            self.sub_allocator(el_value, sub_adr)

        if store_at_adr is not None:
            # The parent only stores a pointer to the array
            PtrAllocator(self.access)(adr, store_at_adr)

        return adr


class StaticArrayExtractor:
    """
    Reads sa_len consecutive elements as a tuple.
    """
    __slots__ = ('access', 'alloc_size', 'sa_len', 'sub_extractor', )

    access: MemoryAccess
    alloc_size: int
    sa_len: int
    sub_extractor: ExtractorFunc

    def __init__(self, access: MemoryAccess, sa_len: int, sub_extractor: ExtractorFunc) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sa_len = sa_len
        self.sub_extractor = sub_extractor

    def __call__(self, wasm_value: Any) -> Any:
        if isinstance(wasm_value, MemorySlice):
            # Stored inside a parent, read the pointer first
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        element_size = self.sub_extractor.alloc_size
        read_bytes = self.access.interpreter_read_memory(adr, self.sa_len * element_size)

        return tuple(
            self.sub_extractor(MemorySlice(read_bytes, idx * element_size))
            for idx in range(self.sa_len)
        )


class TupleAllocator:
    """
    Allocates a tuple as its members stored one after another.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator_list', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator_list: list[AllocatorFunc]

    def __init__(self, access: MemoryAccess, sub_allocator_list: list[AllocatorFunc]) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_allocator_list = sub_allocator_list

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        assert isinstance(py_value, tuple), py_value

        total_alloc_size = sum(x.alloc_size for x in self.sub_allocator_list)

        adr = self.access.call('stdlib.alloc.__alloc__', total_alloc_size)
        assert isinstance(adr, int)  # Type int

        sub_adr = adr
        # strict=True: the value must have exactly one member per allocator
        for sub_allocator, sub_value in zip(self.sub_allocator_list, py_value, strict=True):
            sub_allocator(sub_value, sub_adr)
            sub_adr += sub_allocator.alloc_size

        if store_at_adr is not None:
            # The parent only stores a pointer to the tuple
            PtrAllocator(self.access)(adr, store_at_adr)

        return adr


class TupleExtractor:
    """
    Reads members stored one after another as a tuple.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor_list', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor_list: list[ExtractorFunc]

    def __init__(self, access: MemoryAccess, sub_extractor_list: list[ExtractorFunc]) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_extractor_list = sub_extractor_list

    def __call__(self, wasm_value: Any) -> tuple[Any, ...]:
        if isinstance(wasm_value, MemorySlice):
            # Stored inside a parent, read the pointer first
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        total_alloc_size = sum(x.alloc_size for x in self.sub_extractor_list)

        read_bytes = self.access.interpreter_read_memory(adr, total_alloc_size)

        result = []
        offset = 0
        for sub_extractor in self.sub_extractor_list:
            result.append(sub_extractor(MemorySlice(read_bytes, offset)))
            offset += sub_extractor.alloc_size
        return tuple(result)


class StructAllocator:
    """
    Allocates a dict as its fields stored one after another, in field order.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator_list', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator_list: list[tuple[str, AllocatorFunc]]

    def __init__(self, access: MemoryAccess, sub_allocator_list: list[tuple[str, AllocatorFunc]]) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_allocator_list = sub_allocator_list

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        assert isinstance(py_value, dict), py_value

        total_alloc_size = sum(x.alloc_size for _, x in self.sub_allocator_list)

        adr = self.access.call('stdlib.alloc.__alloc__', total_alloc_size)
        assert isinstance(adr, int)  # Type int

        sub_adr = adr
        for field_name, sub_allocator in self.sub_allocator_list:
            sub_value = py_value[field_name]

            sub_allocator(sub_value, sub_adr)
            sub_adr += sub_allocator.alloc_size

        if store_at_adr is not None:
            # The parent only stores a pointer to the struct
            PtrAllocator(self.access)(adr, store_at_adr)

        return adr


class StructExtractor:
    """
    Reads fields stored one after another as a dict keyed by field name.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor_list', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor_list: list[tuple[str, ExtractorFunc]]

    def __init__(self, access: MemoryAccess, sub_extractor_list: list[tuple[str, ExtractorFunc]]) -> None:
        self.access = access
        self.alloc_size = 4  # ptr
        self.sub_extractor_list = sub_extractor_list

    def __call__(self, wasm_value: Any) -> dict[str, Any]:
        if isinstance(wasm_value, MemorySlice):
            # Stored inside a parent, read the pointer first
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        total_alloc_size = sum(x.alloc_size for _, x in self.sub_extractor_list)

        read_bytes = self.access.interpreter_read_memory(adr, total_alloc_size)

        result = {}
        offset = 0
        for field_name, sub_extractor in self.sub_extractor_list:
            result[field_name] = sub_extractor(MemorySlice(read_bytes, offset))
            offset += sub_extractor.alloc_size
        return result