It lives \o/

This commit is contained in:
Johan B.W. de Vries 2023-04-11 11:35:08 +02:00
parent 1ff21c7f29
commit 13dc426fc5
9 changed files with 264 additions and 44 deletions

View File

@ -6,5 +6,9 @@ class PhashPlatformRuntimeError(PhasmPlatformError):
pass
class PhashPlatformNonIntMainReturnError(PhashPlatformRuntimeError):
class PhashPlatformServiceNotFound(PhashPlatformRuntimeError):
pass
class PhashPlatformServiceMethodNotFound(PhashPlatformRuntimeError):
pass

View File

@ -21,6 +21,9 @@ class MethodArgument:
return self.name == other.name and self.value_type == other.value_type
def __repr__(self) -> str:
return f'MethodArgument({repr(self.name)}, {repr(self.value_type)})'
class Method:
__slots__ = ('name', 'args', 'return_type', )
@ -34,6 +37,9 @@ class Method:
self.args = args
self.return_type = return_type
def __repr__(self) -> str:
return f'Method({repr(self.name)}, {repr(self.args)}, {repr(self.return_type)})'
class MethodCallError:
pass
@ -62,3 +68,6 @@ class MethodCall:
self.args = args
self.on_success = on_success
self.on_error = on_error
def __repr__(self) -> str:
return f'MethodCall({repr(self.method)}, {repr(self.args)}, {repr(self.on_success)}, {repr(self.on_error)})'

View File

@ -1,8 +1,7 @@
class BaseRouter:
def post_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None:
from .method import MethodCall
from .service import Service
class MethodCallRouterInterface:
def send_call(self, service: Service, call: MethodCall) -> None:
raise NotImplementedError
class StdOutRouter(BaseRouter):
def post_message(self, namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None:
print(f'ns={namespace.decode()},t={topic.decode()},k={kind.decode()} {body.decode()}')

View File

@ -0,0 +1,25 @@
from typing import Dict, Optional, List
from .method import Method
class Service:
__slots__ = ('name', 'methods', )
name: str
methods: Dict[str, Method]
def __init__(self, name: str, methods: List[Method]) -> None:
self.name = name
self.methods = {
x.name: x
for x in methods
}
def __repr__(self) -> str:
return f'Service({repr(self.name)}, {repr(list(self.methods.values()))})'
class ServiceDiscoveryInterface:
def find_service(self, name: str) -> Optional[Service]:
raise NotImplementedError

View File

@ -1,6 +1,6 @@
from typing import Any, Union
from .valuetype import ValueType
from .valuetype import ValueType, none
ValueData = Union[None, bytes]
@ -19,4 +19,7 @@ class Value:
return self.value_type is other.value_type and self.data == other.data
def __repr__(self) -> str:
return f'Value(valuetype.{self.value_type.name}, {repr(self.data)})'
return f'Value({repr(self.value_type)}, {repr(self.data)})'
NoneValue = Value(none, None)

View File

@ -15,6 +15,9 @@ class ValueType:
return self is other
def __repr__(self) -> str:
return f'valuetype.{self.name}'
bytes = ValueType('bytes')

View File

@ -1,3 +1,5 @@
from typing import Dict, List, Optional, Tuple
import sys
import threading
import time
@ -5,8 +7,10 @@ from queue import Empty, Queue
from phasmplatform.common import valuetype
from phasmplatform.common.config import from_toml
from phasmplatform.common.method import Method, MethodCall
from phasmplatform.common.router import StdOutRouter
from phasmplatform.common.method import Method, MethodArgument, MethodCall
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import Service, ServiceDiscoveryInterface
from phasmplatform.common.value import NoneValue
from .runners.base import RunnerInterface
from .runners.wasmtime import WasmTimeRunner
@ -19,22 +23,92 @@ def runner_thread(runner: RunnerInterface, queue: Queue[MethodCall]) -> None:
except Empty:
break
print('rt call', runner, queue, call)
runner.do_call(call)
def make_prelude() -> Service:
methods: List[Method] = []
methods.append(Method('log_bytes', [
MethodArgument('data', valuetype.bytes)
], valuetype.none))
return Service('prelude', methods)
class LocalhostRunner(RunnerInterface):
def do_call(self, call: MethodCall) -> None:
if call.method.name == 'on_module_loaded':
print('LocalhostRunner loaded')
call.on_success(NoneValue)
return
if call.method.name == 'log_bytes':
print('LOG-BYTES:', repr(call.args[0].data))
call.on_success(NoneValue)
return
raise NotImplementedError(call)
class LocalhostServiceDiscovery(ServiceDiscoveryInterface):
services: Dict[str, Tuple[Service, Queue[MethodCall]]]
def __init__(self) -> None:
self.services = {}
def register_service(self, service: Service, queue: Queue[MethodCall]) -> None:
self.services[service.name] = (service, queue, )
def find_service(self, name: str) -> Optional[Service]:
parts = self.services.get(name, None)
if parts is None:
return None
return parts[0]
class LocalhostMethodCallRouter(MethodCallRouterInterface):
def __init__(self, service_discovery: LocalhostServiceDiscovery) -> None:
self.service_discovery = service_discovery
def send_call(self, service: Service, call: MethodCall) -> None:
assert service.name in self.service_discovery.services
queue = self.service_discovery.services[service.name][1]
print('send_call', service, call, queue)
queue.put(call)
def main() -> int:
with open('config.toml', 'rb') as fil:
config = from_toml(fil)
del config
stdout_router = StdOutRouter()
localhost_queue: Queue[MethodCall] = Queue()
echo_client_queue: Queue[MethodCall] = Queue()
echo_server_queue: Queue[MethodCall] = Queue()
with open('/home/johan/projects/idea/phasm/examples/echoclient.wasm', 'rb') as fil:
echo_client = WasmTimeRunner(stdout_router, fil.read())
service_discovery = LocalhostServiceDiscovery()
method_call_router = LocalhostMethodCallRouter(service_discovery)
localhost = LocalhostRunner()
service_discovery.register_service(make_prelude(), localhost_queue)
with open('/home/johan/projects/idea/phasm/examples/echoserver.wasm', 'rb') as fil:
echo_server = WasmTimeRunner(stdout_router, fil.read())
echo_server = WasmTimeRunner(service_discovery, method_call_router, fil.read())
service_discovery.register_service(Service('echoserver', [
Method('echo', [
MethodArgument('msg', valuetype.bytes)
], valuetype.bytes)
]), echo_server_queue)
with open('/home/johan/projects/idea/phasm/examples/echoclient.wasm', 'rb') as fil:
echo_client = WasmTimeRunner(service_discovery, method_call_router, fil.read())
# service_discovery.register_service(echo_client, echo_client_queue)
# service_discovery.register_service(echo_server, echo_server_queue)
on_module_loaded = MethodCall(
Method('on_module_loaded', [], valuetype.none),
@ -43,15 +117,15 @@ def main() -> int:
lambda x: None, # TODO: Check for MethodNotFoundError, otherwise report it
)
echo_client_queue: Queue[MethodCall] = Queue()
localhost_queue.put(on_module_loaded)
echo_client_queue.put(on_module_loaded)
echo_server_queue: Queue[MethodCall] = Queue()
echo_server_queue.put(on_module_loaded)
localhost_thread = threading.Thread(target=runner_thread, args=(localhost, localhost_queue))
echo_client_thread = threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue))
echo_server_thread = threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue))
localhost_thread.start()
echo_client_thread.start()
echo_server_thread.start()

View File

@ -1,26 +1,37 @@
from typing import TextIO, Union
from phasmplatform.common import valuetype
from phasmplatform.common.router import BaseRouter
from phasmplatform.common.method import MethodCall
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import ServiceDiscoveryInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
WasmValue = Union[None, int, float]
class RunnerInterface:
__slots__ = ('router', )
def do_call(self, call: MethodCall) -> None:
"""
Executes the call on the current container
This method is responsible for calling the on_success or on_error method.
"""
raise NotImplementedError
class BaseRunner(RunnerInterface):
__slots__ = ('router', )
__slots__ = ('service_discovery', 'method_call_router', )
router: BaseRouter
service_discovery: ServiceDiscoveryInterface
method_call_router: MethodCallRouterInterface
def __init__(self, router: BaseRouter) -> None:
self.router = router
def __init__(self, service_discovery: ServiceDiscoveryInterface, method_call_router: MethodCallRouterInterface) -> None:
self.service_discovery = service_discovery
self.method_call_router = method_call_router
def alloc_bytes(self, data: bytes) -> int:
"""
@ -34,14 +45,18 @@ class BaseRunner(RunnerInterface):
"""
raise NotImplementedError
def value_to_wasm(self, val: Value) -> Union[None, int, float]:
def value_to_wasm(self, val: Value) -> WasmValue:
if val.value_type is valuetype.none:
assert val.data is None # type hint
return None
if val.value_type is valuetype.bytes:
assert isinstance(val.data, bytes) # type hint
return self.alloc_bytes(val.data)
raise NotImplementedError(val)
def value_from_wasm(self, value_type: ValueType, val: Union[None, int, float]) -> Value:
def value_from_wasm(self, value_type: ValueType, val: WasmValue) -> Value:
if value_type is valuetype.none:
assert val is None # type hint
return Value(value_type, None)

View File

@ -1,40 +1,66 @@
from typing import List
from typing import Any, List
import ctypes
import functools
import struct
from queue import Empty, Queue
import wasmtime
from phasmplatform.common.method import MethodCall, MethodNotFoundError
from phasmplatform.common.router import BaseRouter
from .base import BaseRunner
from phasmplatform.common import valuetype
from phasmplatform.common.exceptions import PhashPlatformServiceNotFound, PhashPlatformServiceMethodNotFound
from phasmplatform.common.method import Method, MethodCall, MethodCallError, MethodNotFoundError
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import Service, ServiceDiscoveryInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
from .base import BaseRunner, WasmValue
class WasmTimeRunner(BaseRunner):
__slots__ = ('store', 'module', 'instance', 'exports')
def __init__(self, router: BaseRouter, wasm_bin: bytes) -> None:
super().__init__(router)
def __init__(
self,
service_discovery: ServiceDiscoveryInterface,
method_call_router: MethodCallRouterInterface,
wasm_bin: bytes,
) -> None:
super().__init__(service_discovery, method_call_router)
self.store = wasmtime.Store()
possible_imports = {
'prelude': {
'log_bytes': wasmtime.Func(self.store, wasmtime.FuncType([wasmtime.ValType.i32()], []), self.log_bytes),
}
}
self.module = wasmtime.Module(self.store.engine, wasm_bin)
from typing import Any
def dump_args(*args: Any, **kwargs: Any) -> None:
print('args', args)
print('kwargs', kwargs)
imports: List[wasmtime.Func] = []
for imprt in self.module.imports:
if imprt.module not in possible_imports:
raise Exception('Service not found')
service = service_discovery.find_service(imprt.module)
if service is None:
raise PhashPlatformServiceNotFound(
f'Dependent service "{imprt.module}" not found; could not provide "{imprt.name}"'
)
if imprt.name not in possible_imports[imprt.module]:
raise Exception('Method not found in service')
assert imprt.name is not None # type hint
imports.append(possible_imports[imprt.module][imprt.name])
method = service.methods.get(imprt.name)
if method is None:
raise PhashPlatformServiceMethodNotFound(
f'Dependent service "{imprt.module}" found, but it does not provide "{imprt.name}"'
)
func = wasmtime.Func(
self.store,
build_func_type(method),
functools.partial(self.send_service_call, service, method)
)
imports.append(func)
self.instance = wasmtime.Instance(self.store, self.module, imports)
@ -87,3 +113,65 @@ class WasmTimeRunner(BaseRunner):
result = wasm_method(self.store, *act_args)
assert result is None or isinstance(result, (int, float, )) # type hint
call.on_success(self.value_from_wasm(call.method.return_type, result))
def send_service_call(self, service: Service, method: Method, *args: Any) -> WasmValue:
assert len(method.args) == len(args) # type hint
call_args = [
self.value_from_wasm(x.value_type, y)
for x, y in zip(method.args, args)
]
queue: Queue[Value] = Queue(maxsize=1)
def on_success(val: Value) -> None:
print('hi mom')
queue.put(val)
def on_error(err: MethodCallError) -> None:
print('Error while calling', service, method, args)
print('on_success', on_success)
call = MethodCall(method, call_args, on_success, on_error)
print(
'send_service_call',
'from-service=?', 'from-method=?', # TODO
f'to-service={service.name}', f'to-method={method.name}',
*args,
)
self.method_call_router.send_call(service, call)
try:
value = queue.get(block=True, timeout=10)
except Empty:
print(
'send_service_call',
'from-service=?', 'from-method=?', # TODO
f'to-service={service.name}', f'to-method={method.name}',
'TIMEOUT',
)
raise Exception() # TODO
return self.value_to_wasm(value)
def build_func_type(method: Method) -> wasmtime.FuncType:
if method.return_type is valuetype.none:
returns = []
else:
returns = [build_wasm_type(method.return_type)]
args = []
for arg in method.args:
assert arg.value_type is not valuetype.none # type hint
args.append(build_wasm_type(arg.value_type))
return wasmtime.FuncType(args, returns)
def build_wasm_type(value_type: ValueType) -> wasmtime.ValType:
if value_type is valuetype.bytes:
return wasmtime.ValType.i32() # Bytes are passed as pointer
raise NotImplementedError