phasm/tests/integration/memory.py
Johan B.W. de Vries fd66292bb1 Replaces type3 with type5
type5 is much more first principles based, so we get a lot
of weird quirks removed:

- FromLiteral no longer needs to understand AST
- Type unification works more like Haskell
- Function types are just ordinary types, saving a lot of
  manual busywork

and more.
2025-08-09 19:58:53 +02:00

539 lines
18 KiB
Python

import struct
from typing import Any, Protocol
from phasm.build.base import BuildBase
from phasm.build.typerouter import BuildTypeRouter
from phasm.type5.record import Record
from phasm.type5.typeexpr import AtomicType, TypeExpr
from phasm.wasm import (
WasmTypeFloat32,
WasmTypeFloat64,
WasmTypeInt32,
WasmTypeInt64,
WasmTypeNone,
)
class MemoryAccess(Protocol):
    """
    Structural interface the allocators and extractors in this module use
    to talk to a WASM environment.
    """

    def call(self, function: str, *args: Any) -> Any:
        """
        Use for calling allocator methods inside the WASM environment.
        """
        ...

    def interpreter_write_memory(self, offset: int, data: bytes) -> None:
        """
        Writes bytes directly to WASM environment memory.

        Addresses should be generated using allocators via call.
        """
        ...

    def interpreter_read_memory(self, offset: int, length: int) -> bytes:
        """
        Reads bytes directly from WASM environment memory.
        """
        ...
class MemorySlice:
    """
    A chunk of already-read memory paired with a starting offset.

    Calling the instance with a size returns the bytes that start at the
    stored offset and span (at most) that many bytes.
    """
    __slots__ = ('memory', 'offset', )

    def __init__(self, memory: bytes, offset: int) -> None:
        self.memory, self.offset = memory, offset

    def __call__(self, size: int) -> bytes:
        """
        Returns memory[offset:offset + size].
        """
        start = self.offset
        return self.memory[start:start + size]

    def __repr__(self) -> str:
        return f'MemorySlice({self.memory!r}, {self.offset!r})'
class AllocatorFunc(Protocol):
alloc_size: int
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
"""
Takes a Python value and allocaties it in the given memory
Based on the phasm type.
When the parent already has allocated memory, the store_at_adr is set.
In that case, write your value to the given address, and return it.
"""
class Allocator(BuildTypeRouter[AllocatorFunc]):
    """
    Type router that selects the AllocatorFunc for a given phasm type.

    Every allocator it hands out shares this router's MemoryAccess.
    """
    __slots__ = ('access', )

    access: MemoryAccess

    def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
        super().__init__(build)

        self.access = access

    def when_atomic(self, typ: AtomicType) -> AllocatorFunc:
        """
        Picks an int or float allocator based on the type's wasm type.

        Raises NotImplementedError for WasmTypeNone, for integer types
        whose signedness is None, and for any other wasm type.
        """
        info = self.build.type_info_map[typ.name]
        wasm_type = info.wasm_type

        if wasm_type is WasmTypeNone:
            raise NotImplementedError

        if wasm_type is WasmTypeInt32 or wasm_type is WasmTypeInt64:
            if info.signed is None:
                raise NotImplementedError

            return IntAllocator(self.access, info.signed, info.alloc_size)

        if wasm_type is WasmTypeFloat32 or wasm_type is WasmTypeFloat64:
            return FloatAllocator(self.access, info.alloc_size)

        raise NotImplementedError(typ)

    def when_dynamic_array(self, da_arg: TypeExpr) -> AllocatorFunc:
        """
        Dynamic arrays of u8 are allocated as bytes; any other element
        type gets a generic dynamic array allocator.
        """
        if da_arg.name != 'u8':
            return DynamicArrayAllocator(self.access, self(da_arg))

        return BytesAllocator(self.access)

    def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> AllocatorFunc:
        element_allocator = self(sa_typ)
        return StaticArrayAllocator(self.access, sa_len, element_allocator)

    def when_struct(self, typ: Record) -> AllocatorFunc:
        # Resolve an allocator per field, keeping the declared field order
        field_allocators = []
        for field_name, field_typ in typ.fields:
            field_allocators.append((field_name, self(field_typ)))

        return StructAllocator(self.access, field_allocators)

    def when_tuple(self, tp_args: list[TypeExpr]) -> AllocatorFunc:
        member_allocators = [self(tp_arg) for tp_arg in tp_args]
        return TupleAllocator(self.access, member_allocators)
class ExtractorFunc(Protocol):
    """
    Callable that turns a value coming out of WASM into a Python value.
    """
    # Parents use this to size their reads and to step between the
    # offsets of consecutive members.
    alloc_size: int

    def __call__(self, wasm_value: Any) -> Any:
        """
        Takes a WASM value and returns a Python value,
        based on the phasm type.
        """
        ...
class Extractor(BuildTypeRouter[ExtractorFunc]):
    """
    Type router that selects the ExtractorFunc for a given phasm type.

    Extractors that need to read memory share this router's MemoryAccess.
    """
    __slots__ = ('access', )

    access: MemoryAccess

    def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
        super().__init__(build)

        self.access = access

    def when_atomic(self, typ: AtomicType) -> ExtractorFunc:
        """
        Picks a bool, int or float extractor based on the type's wasm type.

        Integer types whose signedness is None are extracted as bool.
        Raises NotImplementedError for WasmTypeNone and for any wasm type
        that is neither an integer nor a float.
        """
        info = self.build.type_info_map[typ.name]
        wasm_type = info.wasm_type

        if wasm_type is WasmTypeNone:
            raise NotImplementedError

        if wasm_type is WasmTypeInt32 or wasm_type is WasmTypeInt64:
            if info.signed is None:
                return BoolExtractor()

            return IntExtractor(info.signed, info.alloc_size)

        if wasm_type is WasmTypeFloat32 or wasm_type is WasmTypeFloat64:
            return FloatExtractor(info.alloc_size)

        raise NotImplementedError(typ)

    def when_dynamic_array(self, da_arg: TypeExpr) -> ExtractorFunc:
        """
        Dynamic arrays of u8 are extracted as bytes; any other element
        type gets a generic dynamic array extractor.
        """
        if da_arg.name != 'u8':
            return DynamicArrayExtractor(self.access, self(da_arg))

        return BytesExtractor(self.access)

    def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> ExtractorFunc:
        element_extractor = self(sa_typ)
        return StaticArrayExtractor(self.access, sa_len, element_extractor)

    def when_struct(self, typ: Record) -> ExtractorFunc:
        # Resolve an extractor per field, keeping the declared field order
        field_extractors = []
        for field_name, field_typ in typ.fields:
            field_extractors.append((field_name, self(field_typ)))

        return StructExtractor(self.access, field_extractors)

    def when_tuple(self, tp_args: list[TypeExpr]) -> ExtractorFunc:
        member_extractors = [self(tp_arg) for tp_arg in tp_args]
        return TupleExtractor(self.access, member_extractors)
class BoolExtractor:
    """
    Converts an integer received from WASM into a Python bool.
    """
    __slots__ = ('alloc_size', )

    def __init__(self) -> None:
        self.alloc_size = 1

    def __call__(self, wasm_value: Any) -> bool:
        """
        Returns False for zero and True for any other integer.
        """
        assert isinstance(wasm_value, int), wasm_value

        return bool(wasm_value)
class IntAllocator:
    """
    Writes a Python int into WASM memory as a little endian integer of
    alloc_size bytes.
    """
    __slots__ = ('access', 'alloc_size', 'signed', )

    def __init__(self, access: MemoryAccess, signed: bool, alloc_size: int) -> None:
        self.access = access
        self.signed = signed
        self.alloc_size = alloc_size

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Stores py_value at store_at_adr and returns that address.

        Raises NotImplementedError when no address is given; this
        allocator only writes into memory that was allocated already.
        """
        if store_at_adr is None:
            raise NotImplementedError

        assert isinstance(py_value, int), py_value

        encoded = py_value.to_bytes(
            length=self.alloc_size,
            byteorder='little',
            signed=self.signed,
        )
        self.access.interpreter_write_memory(store_at_adr, encoded)

        return store_at_adr
class IntExtractor:
    """
    Produces a Python int from either a MemorySlice or an int that was
    received from the wasm interface.
    """
    __slots__ = ('alloc_size', 'signed', )

    def __init__(self, signed: bool, alloc_size: int) -> None:
        self.signed = signed
        self.alloc_size = alloc_size

    def __call__(self, wasm_value: Any) -> int:
        """
        Decodes alloc_size little endian bytes using the configured
        signedness.
        """
        if not isinstance(wasm_value, MemorySlice):
            # Int received from the wasm interface.
            # Work around the fact that phasm has unsigned integers but
            # wasm does not: serialize as a signed 64 bit value, keep only
            # the bytes this type occupies, and reinterpret those below.
            # Little endian matches what WASM uses internally.
            assert isinstance(wasm_value, int), wasm_value
            raw = wasm_value.to_bytes(8, 'little', signed=True)[:self.alloc_size]
        else:
            # Int stored in memory
            raw = wasm_value(self.alloc_size)

        return int.from_bytes(raw, 'little', signed=self.signed)
class PtrAllocator(IntAllocator):
    """
    IntAllocator preconfigured for pointers: unsigned, 4 bytes.
    """

    def __init__(self, access: MemoryAccess) -> None:
        super().__init__(access, signed=False, alloc_size=4)
class PtrExtractor(IntExtractor):
    """
    IntExtractor preconfigured for pointers: unsigned, 4 bytes.
    """

    def __init__(self) -> None:
        super().__init__(signed=False, alloc_size=4)
# struct format letter per float width, keyed by size in bytes
FLOAT_LETTER_MAP = {4: 'f', 8: 'd'}
class FloatAllocator:
    """
    Writes a Python float into WASM memory as a little endian float of
    alloc_size bytes.
    """
    __slots__ = ('access', 'alloc_size', )

    def __init__(self, access: MemoryAccess, alloc_size: int) -> None:
        self.access = access
        self.alloc_size = alloc_size

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Stores py_value at store_at_adr and returns that address.

        Raises NotImplementedError when no address is given; this
        allocator only writes into memory that was allocated already.
        """
        if store_at_adr is None:
            raise NotImplementedError

        assert isinstance(py_value, float), py_value

        # '<' selects little endian; the letter selects the float width
        fmt = f'<{FLOAT_LETTER_MAP[self.alloc_size]}'
        self.access.interpreter_write_memory(store_at_adr, struct.pack(fmt, py_value))

        return store_at_adr
class FloatExtractor:
    """
    Produces a Python float from either a MemorySlice or a float that was
    received directly.
    """
    __slots__ = ('alloc_size', )

    def __init__(self, alloc_size: int) -> None:
        self.alloc_size = alloc_size

    def __call__(self, wasm_value: Any) -> float:
        """
        Memory stored floats are decoded as little endian values of
        alloc_size bytes; anything else must already be a float.
        """
        if isinstance(wasm_value, MemorySlice):
            # Memory stored float
            raw = wasm_value(self.alloc_size)
            fmt = f'<{FLOAT_LETTER_MAP[self.alloc_size]}'
            wasm_value = struct.unpack(fmt, raw)[0]

        assert isinstance(wasm_value, float), wasm_value

        return wasm_value
class DynamicArrayAllocator:
    """
    Allocates a Python tuple as a length prefixed array in WASM memory.

    Layout written: a 4 byte length, directly followed by the elements,
    each taking sub_allocator.alloc_size bytes.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator: AllocatorFunc

    def __init__(self, access: MemoryAccess, sub_allocator: AllocatorFunc) -> None:
        self.access = access
        self.alloc_size = 4 # ptr
        self.sub_allocator = sub_allocator

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Allocates the array and returns its address.

        Raises NotImplementedError when store_at_adr is given; storing
        the array inside memory owned by a parent is not supported here.
        """
        if store_at_adr is not None:
            raise NotImplementedError

        assert isinstance(py_value, tuple), py_value

        el_size = self.sub_allocator.alloc_size
        el_count = len(py_value)

        adr = self.access.call('stdlib.alloc.__alloc__', 4 + el_count * el_size)
        assert isinstance(adr, int) # Type int

        # Length prefix first, elements right behind it
        PtrAllocator(self.access)(el_count, adr)

        el_adr = adr + 4
        for el_value in py_value:
            self.sub_allocator(el_value, el_adr)
            el_adr += el_size

        return adr
class DynamicArrayExtractor:
    """
    Reads a length prefixed array from WASM memory into a Python tuple.

    Layout read: a 4 byte little endian unsigned length, directly followed
    by the elements, each taking sub_extractor.alloc_size bytes.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor: ExtractorFunc

    def __init__(self, access: MemoryAccess, sub_extractor: ExtractorFunc) -> None:
        self.access = access
        # Was declared in __slots__ but never assigned, so any parent
        # reading alloc_size got an AttributeError. The array is reached
        # through a pointer, same as the other array/tuple/struct extractors.
        self.alloc_size = 4 # ptr
        self.sub_extractor = sub_extractor

    def __call__(self, wasm_value: Any) -> Any:
        """
        Takes the address of the array, either as an int or as a
        MemorySlice holding the pointer, and returns the extracted
        elements as a tuple.
        """
        if isinstance(wasm_value, MemorySlice):
            # Nested inside a parent: the slice holds the pointer to the array
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        el_size = self.sub_extractor.alloc_size

        # The first value at the pointer is the length of the array
        read_bytes = self.access.interpreter_read_memory(adr, 4)
        array_len, = struct.unpack('<I', read_bytes)

        # All elements are read in one go, then handed out slice by slice
        read_bytes = self.access.interpreter_read_memory(adr + 4, array_len * el_size)

        return tuple(
            self.sub_extractor(MemorySlice(read_bytes, idx * el_size))
            for idx in range(array_len)
        )
class BytesAllocator:
    """
    Allocates a Python bytes value in WASM memory.
    """
    __slots__ = ('access', 'alloc_size', )

    access: MemoryAccess

    def __init__(self, access: MemoryAccess) -> None:
        self.access = access
        self.alloc_size = 4 # ptr

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Allocates room for py_value, writes its content and returns the
        address of the allocation.

        When store_at_adr is given, the address is also written there as
        a pointer.
        """
        assert isinstance(py_value, bytes), py_value

        adr = self.access.call('stdlib.types.__alloc_bytes__', len(py_value))
        assert isinstance(adr, int)

        # The content lives 4 bytes past the returned address.
        # NOTE(review): presumably the allocator call fills the 4 byte
        # length prefix itself; nothing here writes it - confirm.
        content_adr = adr + 4
        self.access.interpreter_write_memory(content_adr, py_value)

        if store_at_adr is None:
            return adr

        PtrAllocator(self.access)(adr, store_at_adr)
        return adr
class BytesExtractor:
    """
    Reads a length prefixed bytes value from WASM memory.
    """
    __slots__ = ('access', 'alloc_size', )

    access: MemoryAccess
    alloc_size: int

    def __init__(self, access: MemoryAccess) -> None:
        self.access = access
        self.alloc_size = 4 # ptr

    def __call__(self, wasm_value: Any) -> bytes:
        """
        Takes the address of the value, either as an int or as a
        MemorySlice holding the pointer, and returns its content.
        """
        if isinstance(wasm_value, MemorySlice):
            wasm_value = PtrExtractor()(wasm_value)

        # wasm_value must be a pointer by now
        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        # The first value at said pointer is the length of the array,
        # the content follows directly after it
        length_bytes = self.access.interpreter_read_memory(adr, 4)
        array_len = struct.unpack('<I', length_bytes)[0]

        return self.access.interpreter_read_memory(adr + 4, array_len)
class StaticArrayAllocator:
    """
    Allocates a Python tuple of exactly sa_len elements as a contiguous
    array in WASM memory, without a length prefix.
    """
    __slots__ = ('access', 'alloc_size', 'sa_len', 'sub_allocator', )

    access: MemoryAccess
    alloc_size: int
    sa_len: int
    sub_allocator: AllocatorFunc

    def __init__(self, access: MemoryAccess, sa_len: int, sub_allocator: AllocatorFunc) -> None:
        self.access = access
        self.alloc_size = 4 # ptr
        self.sa_len = sa_len
        self.sub_allocator = sub_allocator

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Allocates the array and returns its address.

        When store_at_adr is given, the address is also written there as
        a pointer.
        """
        assert isinstance(py_value, tuple), py_value
        assert len(py_value) == self.sa_len

        el_size = self.sub_allocator.alloc_size

        adr = self.access.call('stdlib.alloc.__alloc__', self.sa_len * el_size)
        assert isinstance(adr, int) # Type int

        # Elements are laid out back to back, starting at the allocation
        el_adr = adr
        for el_value in py_value:
            self.sub_allocator(el_value, el_adr)
            el_adr += el_size

        if store_at_adr is None:
            return adr

        PtrAllocator(self.access)(adr, store_at_adr)
        return adr
class StaticArrayExtractor:
    """
    Reads a contiguous array of sa_len elements from WASM memory into a
    Python tuple.
    """
    __slots__ = ('access', 'alloc_size', 'sa_len', 'sub_extractor', )

    access: MemoryAccess
    alloc_size: int
    sa_len: int
    sub_extractor: ExtractorFunc

    def __init__(self, access: MemoryAccess, sa_len: int, sub_extractor: ExtractorFunc) -> None:
        self.access = access
        self.alloc_size = 4 # ptr
        self.sa_len = sa_len
        self.sub_extractor = sub_extractor

    def __call__(self, wasm_value: Any) -> Any:
        """
        Takes the address of the array, either as an int or as a
        MemorySlice holding the pointer, and returns the extracted
        elements as a tuple.
        """
        if isinstance(wasm_value, MemorySlice):
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        el_size = self.sub_extractor.alloc_size

        # All elements are read in one go, then handed out slice by slice
        read_bytes = self.access.interpreter_read_memory(adr, self.sa_len * el_size)

        elements = [
            self.sub_extractor(MemorySlice(read_bytes, idx * el_size))
            for idx in range(self.sa_len)
        ]
        return tuple(elements)
class TupleAllocator:
    """
    Allocates a Python tuple in WASM memory, one member after the other,
    each taking the alloc_size of its own allocator.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator_list', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator_list: list[AllocatorFunc]

    def __init__(self, access: MemoryAccess, sub_allocator_list: list[AllocatorFunc]) -> None:
        self.access = access
        self.alloc_size = 4 # ptr
        self.sub_allocator_list = sub_allocator_list

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Allocates the tuple and returns its address.

        When store_at_adr is given, the address is also written there as
        a pointer. Raises ValueError (from the strict zip) when py_value
        does not have exactly one member per sub allocator.
        """
        assert isinstance(py_value, tuple), py_value

        needed = sum(allocator.alloc_size for allocator in self.sub_allocator_list)
        adr = self.access.call('stdlib.alloc.__alloc__', needed)
        assert isinstance(adr, int) # Type int

        member_adr = adr
        for allocator, member in zip(self.sub_allocator_list, py_value, strict=True):
            allocator(member, member_adr)
            member_adr += allocator.alloc_size

        if store_at_adr is None:
            return adr

        PtrAllocator(self.access)(adr, store_at_adr)
        return adr
class TupleExtractor:
    """
    Reads a tuple from WASM memory, one member after the other, each
    taking the alloc_size of its own extractor.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor_list', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor_list: list[ExtractorFunc]

    def __init__(self, access: MemoryAccess, sub_extractor_list: list[ExtractorFunc]) -> None:
        self.access = access
        self.alloc_size = 4 # ptr
        self.sub_extractor_list = sub_extractor_list

    # Return annotation was tuple[Any], which denotes a tuple of exactly one
    # element; the result has one element per sub extractor.
    def __call__(self, wasm_value: Any) -> tuple[Any, ...]:
        """
        Takes the address of the tuple, either as an int or as a
        MemorySlice holding the pointer, and returns the extracted
        members as a Python tuple.
        """
        if isinstance(wasm_value, MemorySlice):
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        # All members are read in one go, then handed out slice by slice
        total_alloc_size = sum(x.alloc_size for x in self.sub_extractor_list)
        read_bytes = self.access.interpreter_read_memory(adr, total_alloc_size)

        result = []
        offset = 0
        for sub_extractor in self.sub_extractor_list:
            result.append(sub_extractor(MemorySlice(read_bytes, offset)))
            offset += sub_extractor.alloc_size

        return tuple(result)
class StructAllocator:
    """
    Allocates a Python dict as a struct in WASM memory, one field after
    the other in the order of sub_allocator_list.
    """
    __slots__ = ('access', 'alloc_size', 'sub_allocator_list', )

    access: MemoryAccess
    alloc_size: int
    sub_allocator_list: list[tuple[str, AllocatorFunc]]

    def __init__(self, access: MemoryAccess, sub_allocator_list: list[tuple[str, AllocatorFunc]]) -> None:
        self.access = access
        self.alloc_size = 4 # ptr
        self.sub_allocator_list = sub_allocator_list

    def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
        """
        Allocates the struct and returns its address.

        When store_at_adr is given, the address is also written there as
        a pointer. Raises KeyError when py_value lacks one of the fields.
        """
        assert isinstance(py_value, dict), py_value

        needed = sum(allocator.alloc_size for _, allocator in self.sub_allocator_list)
        adr = self.access.call('stdlib.alloc.__alloc__', needed)
        assert isinstance(adr, int) # Type int

        field_adr = adr
        for field_name, allocator in self.sub_allocator_list:
            allocator(py_value[field_name], field_adr)
            field_adr += allocator.alloc_size

        if store_at_adr is None:
            return adr

        PtrAllocator(self.access)(adr, store_at_adr)
        return adr
class StructExtractor:
    """
    Reads a struct from WASM memory into a Python dict, one field after
    the other in the order of sub_extractor_list.
    """
    __slots__ = ('access', 'alloc_size', 'sub_extractor_list', )

    access: MemoryAccess
    alloc_size: int
    sub_extractor_list: list[tuple[str, ExtractorFunc]]

    def __init__(self, access: MemoryAccess, sub_extractor_list: list[tuple[str, ExtractorFunc]]) -> None:
        self.access = access
        self.alloc_size = 4 # ptr
        self.sub_extractor_list = sub_extractor_list

    def __call__(self, wasm_value: Any) -> dict[str, Any]:
        """
        Takes the address of the struct, either as an int or as a
        MemorySlice holding the pointer, and returns the extracted
        fields keyed by field name.
        """
        if isinstance(wasm_value, MemorySlice):
            wasm_value = PtrExtractor()(wasm_value)

        assert isinstance(wasm_value, int), wasm_value
        adr = wasm_value
        del wasm_value

        # All fields are read in one go, then handed out slice by slice
        struct_size = sum(extractor.alloc_size for _, extractor in self.sub_extractor_list)
        read_bytes = self.access.interpreter_read_memory(adr, struct_size)

        fields = {}
        field_offset = 0
        for field_name, extractor in self.sub_extractor_list:
            fields[field_name] = extractor(MemorySlice(read_bytes, field_offset))
            field_offset += extractor.alloc_size

        return fields