type5 is much more first principles based, so we get a lot of weird quirks removed: - FromLiteral no longer needs to understand AST - Type unifications works more like Haskell - Function types are just ordinary types, saving a lot of manual busywork and more.
552 lines
18 KiB
Python
552 lines
18 KiB
Python
import struct
|
|
from typing import Any, Protocol
|
|
|
|
from phasm.build.base import BuildBase
|
|
from phasm.build.typerouter import BuildTypeRouter
|
|
from phasm.type5.record import Record
|
|
from phasm.type5.typeexpr import AtomicType, TypeExpr
|
|
from phasm.wasm import (
|
|
WasmTypeFloat32,
|
|
WasmTypeFloat64,
|
|
WasmTypeInt32,
|
|
WasmTypeInt64,
|
|
WasmTypeNone,
|
|
)
|
|
|
|
|
|
class MemoryAccess(Protocol):
|
|
def call(self, function: str, *args: Any) -> Any:
|
|
"""
|
|
Use for calling allocator methods inside the WASM environment.
|
|
"""
|
|
|
|
def interpreter_write_memory(self, offset: int, data: bytes) -> None:
|
|
"""
|
|
Writes bytes directly to WASM environment memory.
|
|
|
|
Addresses should be generated using allocators via call.
|
|
"""
|
|
|
|
def interpreter_read_memory(self, offset: int, length: int) -> bytes:
|
|
"""
|
|
Reads bytes directly from WASM environment memory.
|
|
"""
|
|
|
|
class MemorySlice:
|
|
__slots__ = ('memory', 'offset', )
|
|
|
|
def __init__(self, memory: bytes, offset: int) -> None:
|
|
self.memory = memory
|
|
self.offset = offset
|
|
|
|
def __call__(self, size: int) -> bytes:
|
|
return self.memory[self.offset:self.offset + size]
|
|
|
|
def __repr__(self) -> str:
|
|
return f'MemorySlice({self.memory!r}, {self.offset!r})'
|
|
|
|
class AllocatorFunc(Protocol):
|
|
alloc_size: int
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
"""
|
|
Takes a Python value and allocaties it in the given memory
|
|
Based on the phasm type.
|
|
|
|
When the parent already has allocated memory, the store_at_adr is set.
|
|
In that case, write your value to the given address, and return it.
|
|
"""
|
|
|
|
class Allocator(BuildTypeRouter[AllocatorFunc]):
|
|
__slots__ = ('access', )
|
|
|
|
access: MemoryAccess
|
|
|
|
def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
|
|
super().__init__(build)
|
|
self.access = access
|
|
|
|
def when_atomic(self, typ: AtomicType) -> AllocatorFunc:
|
|
type_info = self.build.type_info_map[typ.name]
|
|
|
|
if type_info.wasm_type is WasmTypeNone:
|
|
raise NotImplementedError
|
|
|
|
if type_info.wasm_type is WasmTypeInt32 or type_info.wasm_type is WasmTypeInt64:
|
|
if type_info.signed is None:
|
|
raise NotImplementedError
|
|
|
|
return IntAllocator(self.access, type_info.signed, type_info.alloc_size)
|
|
|
|
if type_info.wasm_type is WasmTypeFloat32 or type_info.wasm_type is WasmTypeFloat64:
|
|
return FloatAllocator(self.access, type_info.alloc_size)
|
|
|
|
raise NotImplementedError(typ)
|
|
|
|
def when_dynamic_array(self, da_arg: TypeExpr) -> AllocatorFunc:
|
|
if da_arg.name == 'u8':
|
|
return BytesAllocator(self.access)
|
|
|
|
return DynamicArrayAllocator(self.access, self(da_arg))
|
|
|
|
def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> AllocatorFunc:
|
|
return StaticArrayAllocator(self.access, sa_len, self(sa_typ))
|
|
|
|
def when_struct(self, typ: Record) -> AllocatorFunc:
|
|
return StructAllocator(self.access, [(x_nam, self(x_typ)) for x_nam, x_typ in typ.fields])
|
|
|
|
def when_tuple(self, tp_args: list[TypeExpr]) -> AllocatorFunc:
|
|
return TupleAllocator(self.access, list(map(self, tp_args)))
|
|
|
|
class ExtractorFunc(Protocol):
|
|
alloc_size: int
|
|
|
|
def __call__(self, wasm_value: Any) -> Any:
|
|
"""
|
|
Takes a WASM value and returns a Python value
|
|
Based on the phasm type
|
|
"""
|
|
|
|
class Extractor(BuildTypeRouter[ExtractorFunc]):
|
|
__slots__ = ('access', )
|
|
|
|
access: MemoryAccess
|
|
|
|
def __init__(self, build: BuildBase[Any], access: MemoryAccess) -> None:
|
|
super().__init__(build)
|
|
self.access = access
|
|
|
|
def when_atomic(self, typ: AtomicType) -> ExtractorFunc:
|
|
type_info = self.build.type_info_map[typ.name]
|
|
|
|
if type_info.wasm_type is WasmTypeNone:
|
|
return NoneExtractor()
|
|
|
|
if type_info.wasm_type is WasmTypeInt32 or type_info.wasm_type is WasmTypeInt64:
|
|
if type_info.signed is None:
|
|
return BoolExtractor()
|
|
|
|
return IntExtractor(type_info.signed, type_info.alloc_size)
|
|
|
|
if type_info.wasm_type is WasmTypeFloat32 or type_info.wasm_type is WasmTypeFloat64:
|
|
return FloatExtractor(type_info.alloc_size)
|
|
|
|
raise NotImplementedError(typ)
|
|
|
|
def when_dynamic_array(self, da_arg: TypeExpr) -> ExtractorFunc:
|
|
if da_arg.name == 'u8':
|
|
return BytesExtractor(self.access)
|
|
|
|
return DynamicArrayExtractor(self.access, self(da_arg))
|
|
|
|
def when_static_array(self, sa_len: int, sa_typ: TypeExpr) -> ExtractorFunc:
|
|
return StaticArrayExtractor(self.access, sa_len, self(sa_typ))
|
|
|
|
def when_struct(self, typ: Record) -> ExtractorFunc:
|
|
return StructExtractor(self.access, [(x_nam, self(x_typ)) for x_nam, x_typ in typ.fields])
|
|
|
|
def when_tuple(self, tp_args: list[TypeExpr]) -> ExtractorFunc:
|
|
return TupleExtractor(self.access, list(map(self, tp_args)))
|
|
|
|
class NoneExtractor:
|
|
__slots__ = ('alloc_size', )
|
|
|
|
alloc_size: int
|
|
|
|
def __init__(self) -> None:
|
|
# Do not set alloc_size, it should not be called
|
|
# this will generate an AttributeError
|
|
pass
|
|
|
|
def __call__(self, wasm_value: Any) -> None:
|
|
assert wasm_value is None
|
|
|
|
class BoolExtractor:
|
|
__slots__ = ('alloc_size', )
|
|
|
|
def __init__(self) -> None:
|
|
self.alloc_size = 1
|
|
|
|
def __call__(self, wasm_value: Any) -> bool:
|
|
assert isinstance(wasm_value, int), wasm_value
|
|
|
|
return wasm_value != 0
|
|
|
|
class IntAllocator:
|
|
__slots__ = ('access', 'alloc_size', 'signed', )
|
|
|
|
def __init__(self, access: MemoryAccess, signed: bool, alloc_size: int) -> None:
|
|
self.access = access
|
|
self.signed = signed
|
|
self.alloc_size = alloc_size
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
if store_at_adr is None:
|
|
raise NotImplementedError
|
|
|
|
assert isinstance(py_value, int), py_value
|
|
data = py_value.to_bytes(self.alloc_size, 'little', signed=self.signed)
|
|
self.access.interpreter_write_memory(store_at_adr, data)
|
|
|
|
return store_at_adr
|
|
|
|
class IntExtractor:
|
|
__slots__ = ('alloc_size', 'signed', )
|
|
|
|
def __init__(self, signed: bool, alloc_size: int) -> None:
|
|
self.signed = signed
|
|
self.alloc_size = alloc_size
|
|
|
|
def __call__(self, wasm_value: Any) -> int:
|
|
if isinstance(wasm_value, MemorySlice):
|
|
# Memory stored int
|
|
data = wasm_value(self.alloc_size)
|
|
else:
|
|
# Int received from the wasm interface
|
|
# Work around the fact that phasm has unsigned integers but wasm does not
|
|
# Use little endian since that matches with what WASM uses internally
|
|
assert isinstance(wasm_value, int), wasm_value
|
|
data = wasm_value.to_bytes(8, 'little', signed=True)
|
|
data = data[:self.alloc_size]
|
|
|
|
return int.from_bytes(data, 'little', signed=self.signed)
|
|
|
|
class PtrAllocator(IntAllocator):
|
|
def __init__(self, access: MemoryAccess) -> None:
|
|
super().__init__(access, False, 4)
|
|
|
|
class PtrExtractor(IntExtractor):
|
|
def __init__(self) -> None:
|
|
super().__init__(False, 4)
|
|
|
|
FLOAT_LETTER_MAP = {
|
|
4: 'f',
|
|
8: 'd'
|
|
}
|
|
|
|
class FloatAllocator:
|
|
__slots__ = ('access', 'alloc_size', )
|
|
|
|
def __init__(self, access: MemoryAccess, alloc_size: int) -> None:
|
|
self.access = access
|
|
self.alloc_size = alloc_size
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
if store_at_adr is None:
|
|
raise NotImplementedError
|
|
|
|
assert isinstance(py_value, float), py_value
|
|
data = struct.pack(f'<{FLOAT_LETTER_MAP[self.alloc_size]}', py_value)
|
|
self.access.interpreter_write_memory(store_at_adr, data)
|
|
|
|
return store_at_adr
|
|
|
|
class FloatExtractor:
|
|
__slots__ = ('alloc_size', )
|
|
|
|
def __init__(self, alloc_size: int) -> None:
|
|
self.alloc_size = alloc_size
|
|
|
|
def __call__(self, wasm_value: Any) -> float:
|
|
if isinstance(wasm_value, MemorySlice):
|
|
# Memory stored float
|
|
data = wasm_value(self.alloc_size)
|
|
wasm_value, = struct.unpack(f'<{FLOAT_LETTER_MAP[self.alloc_size]}', data)
|
|
|
|
assert isinstance(wasm_value, float), wasm_value
|
|
|
|
return wasm_value
|
|
|
|
class DynamicArrayAllocator:
|
|
__slots__ = ('access', 'alloc_size', 'sub_allocator', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sub_allocator: AllocatorFunc
|
|
|
|
def __init__(self, access: MemoryAccess, sub_allocator: AllocatorFunc) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
self.sub_allocator = sub_allocator
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
if store_at_adr is not None:
|
|
raise NotImplementedError
|
|
|
|
assert isinstance(py_value, tuple), py_value
|
|
|
|
py_len = len(py_value)
|
|
|
|
alloc_size = 4 + py_len * self.sub_allocator.alloc_size
|
|
|
|
adr = self.access.call('stdlib.alloc.__alloc__', alloc_size)
|
|
assert isinstance(adr, int) # Type int
|
|
PtrAllocator(self.access)(py_len, adr)
|
|
|
|
for idx, el_value in enumerate(py_value):
|
|
offset = adr + 4 + idx * self.sub_allocator.alloc_size
|
|
self.sub_allocator(el_value, offset)
|
|
|
|
return adr
|
|
|
|
class DynamicArrayExtractor:
|
|
__slots__ = ('access', 'alloc_size', 'sub_extractor', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sub_extractor: ExtractorFunc
|
|
|
|
def __init__(self, access: MemoryAccess, sub_extractor: ExtractorFunc) -> None:
|
|
self.access = access
|
|
self.sub_extractor = sub_extractor
|
|
|
|
def __call__(self, wasm_value: Any) -> Any:
|
|
assert isinstance(wasm_value, int), wasm_value
|
|
adr = wasm_value
|
|
del wasm_value
|
|
|
|
# wasm_value must be a pointer
|
|
# The first value at said pointer is the length of the array
|
|
|
|
read_bytes = self.access.interpreter_read_memory(adr, 4)
|
|
array_len, = struct.unpack('<I', read_bytes)
|
|
|
|
read_bytes = self.access.interpreter_read_memory(adr + 4, array_len * self.sub_extractor.alloc_size)
|
|
|
|
return tuple(
|
|
self.sub_extractor(MemorySlice(read_bytes, idx * self.sub_extractor.alloc_size))
|
|
for idx in range(array_len)
|
|
)
|
|
|
|
class BytesAllocator:
|
|
__slots__ = ('access', 'alloc_size', )
|
|
|
|
access: MemoryAccess
|
|
|
|
def __init__(self, access: MemoryAccess) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
assert isinstance(py_value, bytes), py_value
|
|
|
|
adr = self.access.call('stdlib.types.__alloc_bytes__', len(py_value))
|
|
assert isinstance(adr, int)
|
|
self.access.interpreter_write_memory(adr + 4, py_value)
|
|
|
|
if store_at_adr is not None:
|
|
PtrAllocator(self.access)(adr, store_at_adr)
|
|
|
|
return adr
|
|
|
|
class BytesExtractor:
|
|
__slots__ = ('access', 'alloc_size', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
|
|
def __init__(self, access: MemoryAccess) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
|
|
def __call__(self, wasm_value: Any) -> bytes:
|
|
if isinstance(wasm_value, MemorySlice):
|
|
wasm_value = PtrExtractor()(wasm_value)
|
|
|
|
assert isinstance(wasm_value, int), wasm_value
|
|
adr = wasm_value
|
|
del wasm_value
|
|
|
|
# wasm_value must be a pointer
|
|
# The first value at said pointer is the length of the array
|
|
|
|
read_bytes = self.access.interpreter_read_memory(adr, 4)
|
|
array_len, = struct.unpack('<I', read_bytes)
|
|
adr += 4
|
|
|
|
return self.access.interpreter_read_memory(adr, array_len)
|
|
|
|
class StaticArrayAllocator:
|
|
__slots__ = ('access', 'alloc_size', 'sa_len', 'sub_allocator', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sa_len: int
|
|
sub_allocator: AllocatorFunc
|
|
|
|
def __init__(self, access: MemoryAccess, sa_len: int, sub_allocator: AllocatorFunc) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
self.sa_len = sa_len
|
|
self.sub_allocator = sub_allocator
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
assert isinstance(py_value, tuple), py_value
|
|
assert len(py_value) == self.sa_len
|
|
|
|
alloc_size = self.sa_len * self.sub_allocator.alloc_size
|
|
|
|
adr = self.access.call('stdlib.alloc.__alloc__', alloc_size)
|
|
assert isinstance(adr, int) # Type int
|
|
|
|
for idx, el_value in enumerate(py_value):
|
|
sub_adr = adr + idx * self.sub_allocator.alloc_size
|
|
self.sub_allocator(el_value, sub_adr)
|
|
|
|
if store_at_adr is not None:
|
|
PtrAllocator(self.access)(adr, store_at_adr)
|
|
|
|
return adr
|
|
|
|
class StaticArrayExtractor:
|
|
__slots__ = ('access', 'alloc_size', 'sa_len', 'sub_extractor', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sa_len: int
|
|
sub_extractor: ExtractorFunc
|
|
|
|
def __init__(self, access: MemoryAccess, sa_len: int, sub_extractor: ExtractorFunc) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
self.sa_len = sa_len
|
|
self.sub_extractor = sub_extractor
|
|
|
|
def __call__(self, wasm_value: Any) -> Any:
|
|
if isinstance(wasm_value, MemorySlice):
|
|
wasm_value = PtrExtractor()(wasm_value)
|
|
|
|
assert isinstance(wasm_value, int), wasm_value
|
|
adr = wasm_value
|
|
del wasm_value
|
|
|
|
read_bytes = self.access.interpreter_read_memory(adr, self.sa_len * self.sub_extractor.alloc_size)
|
|
|
|
return tuple(
|
|
self.sub_extractor(MemorySlice(read_bytes, idx * self.sub_extractor.alloc_size))
|
|
for idx in range(self.sa_len)
|
|
)
|
|
|
|
class TupleAllocator:
|
|
__slots__ = ('access', 'alloc_size', 'sub_allocator_list', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sub_allocator_list: list[AllocatorFunc]
|
|
|
|
def __init__(self, access: MemoryAccess, sub_allocator_list: list[AllocatorFunc]) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
self.sub_allocator_list = sub_allocator_list
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
assert isinstance(py_value, tuple), py_value
|
|
|
|
total_alloc_size = sum(x.alloc_size for x in self.sub_allocator_list)
|
|
|
|
adr = self.access.call('stdlib.alloc.__alloc__', total_alloc_size)
|
|
assert isinstance(adr, int) # Type int
|
|
|
|
sub_adr = adr
|
|
for sub_allocator, sub_value in zip(self.sub_allocator_list, py_value, strict=True):
|
|
sub_allocator(sub_value, sub_adr)
|
|
sub_adr += sub_allocator.alloc_size
|
|
|
|
if store_at_adr is not None:
|
|
PtrAllocator(self.access)(adr, store_at_adr)
|
|
|
|
return adr
|
|
|
|
class TupleExtractor:
|
|
__slots__ = ('access', 'alloc_size', 'sub_extractor_list', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sub_extractor_list: list[ExtractorFunc]
|
|
|
|
def __init__(self, access: MemoryAccess, sub_extractor_list: list[ExtractorFunc]) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
self.sub_extractor_list = sub_extractor_list
|
|
|
|
def __call__(self, wasm_value: Any) -> tuple[Any]:
|
|
if isinstance(wasm_value, MemorySlice):
|
|
wasm_value = PtrExtractor()(wasm_value)
|
|
|
|
assert isinstance(wasm_value, int), wasm_value
|
|
adr = wasm_value
|
|
del wasm_value
|
|
|
|
total_alloc_size = sum(x.alloc_size for x in self.sub_extractor_list)
|
|
|
|
read_bytes = self.access.interpreter_read_memory(adr, total_alloc_size)
|
|
|
|
result = []
|
|
offset = 0
|
|
for sub_extractor in self.sub_extractor_list:
|
|
result.append(sub_extractor(MemorySlice(read_bytes, offset)))
|
|
offset += sub_extractor.alloc_size
|
|
return tuple(result)
|
|
|
|
class StructAllocator:
|
|
__slots__ = ('access', 'alloc_size', 'sub_allocator_list', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sub_allocator_list: list[tuple[str, AllocatorFunc]]
|
|
|
|
def __init__(self, access: MemoryAccess, sub_allocator_list: list[tuple[str, AllocatorFunc]]) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
self.sub_allocator_list = sub_allocator_list
|
|
|
|
def __call__(self, py_value: Any, store_at_adr: int | None = None) -> int:
|
|
assert isinstance(py_value, dict), py_value
|
|
|
|
total_alloc_size = sum(x.alloc_size for _, x in self.sub_allocator_list)
|
|
|
|
adr = self.access.call('stdlib.alloc.__alloc__', total_alloc_size)
|
|
assert isinstance(adr, int) # Type int
|
|
|
|
sub_adr = adr
|
|
for field_name, sub_allocator in self.sub_allocator_list:
|
|
sub_value = py_value[field_name]
|
|
sub_allocator(sub_value, sub_adr)
|
|
sub_adr += sub_allocator.alloc_size
|
|
|
|
if store_at_adr is not None:
|
|
PtrAllocator(self.access)(adr, store_at_adr)
|
|
|
|
return adr
|
|
|
|
class StructExtractor:
|
|
__slots__ = ('access', 'alloc_size', 'sub_extractor_list', )
|
|
|
|
access: MemoryAccess
|
|
alloc_size: int
|
|
sub_extractor_list: list[tuple[str, ExtractorFunc]]
|
|
|
|
def __init__(self, access: MemoryAccess, sub_extractor_list: list[tuple[str, ExtractorFunc]]) -> None:
|
|
self.access = access
|
|
self.alloc_size = 4 # ptr
|
|
self.sub_extractor_list = sub_extractor_list
|
|
|
|
def __call__(self, wasm_value: Any) -> dict[str, Any]:
|
|
if isinstance(wasm_value, MemorySlice):
|
|
wasm_value = PtrExtractor()(wasm_value)
|
|
|
|
assert isinstance(wasm_value, int), wasm_value
|
|
adr = wasm_value
|
|
del wasm_value
|
|
|
|
total_alloc_size = sum(x.alloc_size for _, x in self.sub_extractor_list)
|
|
|
|
read_bytes = self.access.interpreter_read_memory(adr, total_alloc_size)
|
|
|
|
result = {}
|
|
offset = 0
|
|
for field_name, sub_extractor in self.sub_extractor_list:
|
|
result[field_name] = sub_extractor(MemorySlice(read_bytes, offset))
|
|
offset += sub_extractor.alloc_size
|
|
return result
|