diff --git a/Makefile b/Makefile index 8a28c8f..a231426 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,15 @@ run-controller: sast venv/bin/python -m phasmplatform.controller -run-worker: sast +run-worker: sast examples/echoapp.toml venv/bin/python -m phasmplatform.worker +examples/echoapp.toml: examples/echoserver.toml examples/echoclient.toml + echo '# echoserver.toml' > $@ + cat examples/echoserver.toml >> $@ + echo '# echoclient.toml' >> $@ + cat examples/echoclient.toml >> $@ + clickhouse-sh: docker compose exec clickhouse /usr/bin/clickhouse client diff --git a/config/clickhouse/schema.sql b/config/clickhouse/schema.sql index d966b80..5ba9ae9 100644 --- a/config/clickhouse/schema.sql +++ b/config/clickhouse/schema.sql @@ -1,11 +1,11 @@ CREATE DATABASE IF NOT EXISTS phasm_platform; - USE phasm_platform; + CREATE TABLE routing_logs ( timestamp DateTime64(6, 'UTC'), - from_service String, + thread_id String, from_container String, from_method String, to_service String, @@ -15,3 +15,14 @@ CREATE TABLE routing_logs ) ENGINE = MergeTree ORDER BY timestamp; + + +CREATE TABLE container_logs +( + timestamp DateTime64(6, 'UTC'), + name String, + thread_id String, + msg String, +) +ENGINE = MergeTree +ORDER BY timestamp; diff --git a/examples/.gitignore b/examples/.gitignore index 1ce1802..8e95af2 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -1,2 +1,4 @@ /*.wasm /*.wat + +/echoapp.toml diff --git a/examples/echoclient.toml b/examples/echoclient.toml index 049c142..2c0f0fe 100644 --- a/examples/echoclient.toml +++ b/examples/echoclient.toml @@ -5,6 +5,11 @@ kind = "Image" path = "examples/echoclient.wasm" hash = "sha256@84cb22d12dfdd6b05cb906f6db83d59f473c9df85a33822f696344af2b92b502" +imports = [ + { service = "prelude", method = "log_bytes", arg_types = ["bytes"], return_type = "none"}, + { service = "echoserver", method = "echo", arg_types = ["bytes"], return_type = "bytes"}, +] + [echoclient-container] apiVersion = "v0" kind = "Container" diff --git a/examples/echoserver.py b/examples/echoserver.py index 7f1ff66..3f26f25 100644 --- a/examples/echoserver.py +++ b/examples/echoserver.py @@ -1,3 +1,11 @@ @exported def echo(msg: bytes) -> bytes: return msg + +@imported('prelude') +def log_bytes(data: bytes) -> None: + pass + +@exported +def on_module_loaded() -> None: + log_bytes(b'on_module_loaded') diff --git a/examples/echoserver.toml b/examples/echoserver.toml index f48dc75..e7189e0 100644 --- a/examples/echoserver.toml +++ b/examples/echoserver.toml @@ -5,6 +5,10 @@ kind = "Image" path = "examples/echoserver.wasm" hash = "sha256@dfe03b4f7ce5e921931f8715384e35a6776fdc28837e42ffa04305bbadffcfc9" +imports = [ + { service = "prelude", method = "log_bytes", arg_types = ["bytes"], return_type = "none"} +] + [echoserver-container-0] apiVersion = "v0" kind = "Container" @@ -19,16 +23,15 @@ kind = "Container" image = "echoserver-image" runtime = "wasmtime" - [echoserver-service] apiVersion = "v0" kind = "Service" name = "echoserver" -[echoserver-service.container] +[echoserver-service.containerMatch] byName = "echoserver-container-*" [[echoserver-service.methods]] name = "echo" -returns = "none" -args = [ {name = "msg", type = "bytes"} ] +arg_types = [ "bytes" ] +return_type = "bytes" diff --git a/phasmplatform/common/container.py b/phasmplatform/common/container.py index f09e0af..5fe0a7d 100644 --- a/phasmplatform/common/container.py +++ b/phasmplatform/common/container.py @@ -2,14 +2,16 @@ from .image import Image class Container: - __slots__ = ('image', 'runtime', ) + __slots__ = ('name', 'image', 'runtime', ) + name: str image: Image runtime: str - def __init__(self, image: Image, runtime: str) -> None: + def __init__(self, name: str, image: Image, runtime: str) -> None: + self.name = name self.image = image self.runtime = runtime def __repr__(self) -> str: - return f'Container({repr(self.image)}, {repr(self.runtime)})' + return f'Container({repr(self.name)}, {repr(self.image)}, {repr(self.runtime)})' diff --git a/phasmplatform/common/image.py b/phasmplatform/common/image.py index be246e0..5a0113b 100644 --- a/phasmplatform/common/image.py +++ b/phasmplatform/common/image.py @@ -1,18 +1,27 @@ -from typing import Optional +from typing import List, Optional, Tuple + +from .method import Method class Image: - __slots__ = ('path', 'hash', ) + __slots__ = ('path', 'hash', 'imports', ) path: Optional[str] hash: str + imports: List[Tuple[str, Method]] - def __init__(self, path: Optional[str], hash: str) -> None: + def __init__( + self, + path: Optional[str], + hash: str, + imports: List[Tuple[str, Method]], + ) -> None: self.path = path self.hash = hash + self.imports = imports def __repr__(self) -> str: - return f'Image({repr(self.path)}, {repr(self.hash)})' + return f'Image({repr(self.path)}, {repr(self.hash)}, {repr(self.imports)})' class ImageReference(Image): diff --git a/phasmplatform/common/method.py b/phasmplatform/common/method.py index eb7fd3f..822bc54 100644 --- a/phasmplatform/common/method.py +++ b/phasmplatform/common/method.py @@ -1,73 +1,29 @@ -from typing import Any, Callable, List +from typing import Any, List - -from .value import Value from .valuetype import ValueType -class MethodArgument: - __slots__ = ('name', 'value_type', ) - - name: str - value_type: ValueType - - def __init__(self, name: str, value_type: ValueType) -> None: - self.name = name - self.value_type = value_type - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, MethodArgument): - raise NotImplementedError - - return self.name == other.name and self.value_type == other.value_type - - def __repr__(self) -> str: - return f'MethodArgument({repr(self.name)}, {repr(self.value_type)})' - - class Method: - __slots__ = ('name', 'args', 'return_type', ) + __slots__ = ('name', 'arg_types', 'return_type', ) name: str - args: List[MethodArgument] + arg_types: List[ValueType] return_type: ValueType - def __init__(self, name: str, args: List[MethodArgument], return_type: ValueType) -> None: + def __init__(self, name: str, arg_types: List[ValueType], return_type: ValueType) -> None: self.name = name - self.args = args + self.arg_types = arg_types self.return_type = return_type - def __repr__(self) -> str: - return f'Method({repr(self.name)}, {repr(self.args)}, {repr(self.return_type)})' + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Method): + raise NotImplementedError - -class MethodCallError: - pass - - -class MethodNotFoundError(MethodCallError): - pass - - -class MethodCall: - __slots__ = ('method', 'args', 'on_success', 'on_error', ) - - method: Method - args: List[Value] - on_success: Callable[[Value], None] - on_error: Callable[[MethodCallError], None] - - def __init__( - self, - method: Method, - args: List[Value], - on_success: Callable[[Value], None], - on_error: Callable[[MethodCallError], None], - ) -> None: - self.method = method - self.args = args - self.on_success = on_success - self.on_error = on_error + return ( + self.name == other.name + and self.arg_types == other.arg_types + and self.return_type == other.return_type + ) def __repr__(self) -> str: - return f'MethodCall({repr(self.method)}, {repr(self.args)}, {repr(self.on_success)}, {repr(self.on_error)})' + return f'Method({repr(self.name)}, {repr(self.arg_types)}, {repr(self.return_type)})' diff --git a/phasmplatform/common/methodcall.py b/phasmplatform/common/methodcall.py new file mode 100644 index 0000000..7a6fe89 --- /dev/null +++ b/phasmplatform/common/methodcall.py @@ -0,0 +1,117 @@ +from typing import Callable, List, Optional, Union + +from .container import Container +from .method import Method +from .value import Value + + +class MethodCallError: + __slots__ = ('msg', ) + + msg: Optional[str] + + def __init__(self, msg: Optional[str] = None) -> None: + self.msg = msg + + def __repr__(self) -> str: + return f'{self.__class__.__name__}({repr(self.msg)})' + + +class ServiceNotFoundError(MethodCallError): + """ + The service was not found + + You have an `@imported('service_name')` somewhere, + but there is no `service_name` deployed to the platform. + """ + + +class MethodNotFoundError(MethodCallError): + """ + The method was not found + + You have an `@imported('service_name')` somewhere, + but while the `service_name` is deployed to the platform, + it does not provide the given function. + """ + + +class MethodTypeMismatchError(MethodCallError): + """ + The method's type did not match + + You have an `@imported('service_name')` somewhere, + but there while the `service_name` is deployed to the platform, + and the service does provide the function, + its type does not match what you defined in your import. + """ + + +class ServiceUnavailableError(MethodCallError): + """ + The service was not available + + You have an `@imported('service_name')` somewhere, + but there while the `service_name` is deployed to the platform, + and the service does provide the function, + and its type matches what you defined in your import, + there is currently no container running providing said service. + """ + + +class MethodCallExceptionError(MethodCallError): + """ + The service was not available + + You have an `@imported('service_name')` somewhere, + but there while the `service_name` is deployed to the platform, + and the service does provide the function, + and its type matches what you defined in your import, + and there is currently a container running providing said service, + when it tried to run the call, an exeption ocurred. + """ + + +class MethodCall: + __slots__ = ('method', 'args', ) + + method: Method + args: List[Value] + + def __init__( + self, + method: Method, + args: List[Value], + ) -> None: + self.method = method + self.args = args + + def __repr__(self) -> str: + return f'MethodCall({repr(self.method)}, {repr(self.args)})' + + +MethodResultCallable = Callable[[Union[Value, MethodCallError]], None] + + +class RoutedMethodCall: + __slots__ = ('call', 'from_container', 'to_container', 'on_result', ) + + call: MethodCall + from_container: Optional[Container] + to_container: Optional[Container] + on_result: MethodResultCallable + + def __init__( + self, + call: MethodCall, + from_container: Optional[Container], + to_container: Optional[Container], + on_result: MethodResultCallable, + ) -> None: + self.call = call + self.from_container = from_container + self.to_container = to_container + self.on_result = on_result + + def __repr__(self) -> str: + return f'RoutedMethodCall({repr(self.call)}, {repr(self.from_container)}, {repr(self.to_container)}, {repr(self.on_result)})' diff --git a/phasmplatform/common/router.py b/phasmplatform/common/router.py index ecb4968..415493e 100644 --- a/phasmplatform/common/router.py +++ b/phasmplatform/common/router.py @@ -1,7 +1,15 @@ -from .method import MethodCall -from .service import Service +from typing import Optional + +from .container import Container +from .methodcall import MethodCall, MethodResultCallable class MethodCallRouterInterface: - def send_call(self, service: Service, call: MethodCall) -> None: - raise NotImplementedError + def route_call( + self, + service: str, + call: MethodCall, + from_container: Optional[Container], + on_result: MethodResultCallable, + ) -> None: + raise NotImplementedError(service, call) diff --git a/phasmplatform/common/service.py b/phasmplatform/common/service.py index f0a0b3a..35f8793 100644 --- a/phasmplatform/common/service.py +++ b/phasmplatform/common/service.py @@ -3,21 +3,35 @@ from typing import Dict, Optional, List from .method import Method +class ContainerMatch: + __slots__ = ('by_name', ) + + by_name: str + + def __init__(self, by_name: str) -> None: + self.by_name = by_name + + def __repr__(self) -> str: + return f'ContainerMatch({repr(self.by_name)})' + + class Service: - __slots__ = ('name', 'methods', ) + __slots__ = ('name', 'container_match', 'methods', ) name: str + container_match: ContainerMatch methods: Dict[str, Method] - def __init__(self, name: str, methods: List[Method]) -> None: + def __init__(self, name: str, container_match: ContainerMatch, methods: List[Method]) -> None: self.name = name + self.container_match = container_match self.methods = { x.name: x for x in methods } def __repr__(self) -> str: - return f'Service({repr(self.name)}, {repr(list(self.methods.values()))})' + return f'Service({repr(self.name)}, {repr(self.container_match)}, {repr(list(self.methods.values()))})' class ServiceDiscoveryInterface: diff --git a/phasmplatform/common/state.py b/phasmplatform/common/state.py index 6e52fab..392b900 100644 --- a/phasmplatform/common/state.py +++ b/phasmplatform/common/state.py @@ -1,9 +1,10 @@ -from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List +from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List, Tuple +from . import valuetype from .container import Container from .image import Image, ImageReference from .method import Method -from .service import Service +from .service import ContainerMatch, Service if TYPE_CHECKING: @@ -36,27 +37,90 @@ class State: return f'State({repr(self.images)}, {repr(self.containers)}, {repr(self.services)})' +def image_import_from_toml(spec: Dict[str, Any]) -> Tuple[str, Method]: + """ + Loads an Method spec from toml, when it comes in the form of Image.imports + """ + service = spec.pop('service') + method = spec.pop('method') + arg_types = spec.pop('arg_types', []) + return_type = spec.pop('return_type', 'none') + + assert not spec, ('Unrecognized values in image.imports', spec) + + return (str(service), Method( + str(method), + [ + valuetype.LOOKUP_TABLE[x] + for x in arg_types + ], + valuetype.LOOKUP_TABLE[return_type], + ), ) + + def image_from_toml(spec: Dict[str, Any]) -> Image: """ Loads an Image spec from toml """ - return Image(str(spec['path']), str(spec['hash'])) + imports = [ + image_import_from_toml(imp_spec) + for imp_spec in spec.get('imports', []) + ] + + return Image(str(spec['path']), str(spec['hash']), imports) -def container_from_toml(spec: Dict[str, Any]) -> Container: +def container_from_toml(name: str, spec: Dict[str, Any]) -> Container: """ Loads a Container spec from toml """ - return Container(ImageReference(str(spec['image'])), str(spec['runtime'])) + return Container(name, ImageReference(str(spec['image'])), str(spec['runtime'])) + + +def service_container_match_from_toml(spec: Dict[str, Any]) -> ContainerMatch: + """ + Loads a service.ContainerMatch spec from toml + """ + return ContainerMatch(str(spec['byName'])) + + +def method_from_toml(spec: Dict[str, Any]) -> Method: + """ + Loads an Method spec from toml + """ + name = spec.pop('name') + arg_types = spec.pop('arg_types', []) + return_type = spec.pop('return_type', 'none') + + assert not spec, ('Unrecognized values in image.imports', spec) + + return Method( + str(name), + [ + valuetype.LOOKUP_TABLE[x] + for x in arg_types + ], + valuetype.LOOKUP_TABLE[return_type], + ) def service_from_toml(spec: Dict[str, Any]) -> Service: """ Loads a Service spec from toml """ - methods: List[Method] = [] + spec.pop('apiVersion') + spec.pop('kind') - return Service(str(spec['name']), methods) + name = spec.pop('name') + container_match = spec.pop('containerMatch') + methods = spec.pop('methods', []) + + assert not spec, ('Unrecognized values in service', spec) + + return Service(str(name), service_container_match_from_toml(container_match), [ + method_from_toml(x) + for x in methods + ]) def from_toml(toml: BinaryIO) -> State: @@ -83,7 +147,7 @@ def from_toml(toml: BinaryIO) -> State: continue if spec['kind'] == 'Container': - containers.append(container_from_toml(spec)) + containers.append(container_from_toml(name, spec)) continue if spec['kind'] == 'Service': diff --git a/phasmplatform/common/valuetype.py b/phasmplatform/common/valuetype.py index 9e8e090..2726fa1 100644 --- a/phasmplatform/common/valuetype.py +++ b/phasmplatform/common/valuetype.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Dict class ValueType: @@ -22,3 +22,8 @@ class ValueType: bytes = ValueType('bytes') none = ValueType('none') + +LOOKUP_TABLE: Dict[str, ValueType] = { + bytes.name: bytes, + none.name: none, +} diff --git a/phasmplatform/worker/__main__.py b/phasmplatform/worker/__main__.py index 7c3fbb3..6f89f85 100644 --- a/phasmplatform/worker/__main__.py +++ b/phasmplatform/worker/__main__.py @@ -1,169 +1,268 @@ -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Type, Union import datetime -import functools import sys +import random import threading import time +import uuid from queue import Empty, Queue from phasmplatform.common import valuetype +from phasmplatform.common.container import Container from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml -from phasmplatform.common.method import Method, MethodArgument, MethodCall +from phasmplatform.common.image import Image +from phasmplatform.common.method import Method +from phasmplatform.common.methodcall import MethodCall, RoutedMethodCall, MethodResultCallable +from phasmplatform.common.methodcall import ( + MethodCallExceptionError, MethodNotFoundError, MethodTypeMismatchError, ServiceNotFoundError, ServiceUnavailableError +) from phasmplatform.common.router import MethodCallRouterInterface -from phasmplatform.common.service import Service, ServiceDiscoveryInterface -from phasmplatform.common.value import Value, NoneValue +from phasmplatform.common.service import ContainerMatch, Service +from phasmplatform.common.value import Value from phasmplatform.common.state import from_toml as state_from_toml from .runners.base import RunnerInterface +from .runners.prelude import PreludeRunner from .runners.wasmtime import WasmTimeRunner -class ShuttingDown(): +RUNTIME_MAP: Dict[str, Type[RunnerInterface]] = { + 'prelude': PreludeRunner, + 'wasmtime': WasmTimeRunner, +} + + +def log_now() -> datetime.datetime: + return datetime.datetime.now(tz=datetime.timezone.utc) + + +class ShutDownCommand(): pass -def runner_thread(runner: RunnerInterface, queue: Queue[Union[MethodCall, ShuttingDown]]) -> None: +MethodCallQueue = Queue[Union[RoutedMethodCall, ShutDownCommand]] + + +class ManagedContainer: + __slots__ = ('config', 'method_call_router', 'container', 'thread', 'queue', ) + + config: WorkerConfig + method_call_router: MethodCallRouterInterface + container: Container + thread: threading.Thread + queue: MethodCallQueue + + def __init__(self, config: WorkerConfig, method_call_router: MethodCallRouterInterface, container: Container) -> None: + self.config = config + self.method_call_router = method_call_router + self.container = container + self.thread = threading.Thread(target=container_thread, args=(self, )) + self.queue = Queue() + + def __repr__(self) -> str: + return f'ManagedContainer(..., ..., {repr(self.container)}, ..., ...)' + + +def container_thread(mcont: ManagedContainer) -> None: + clickhouse_session_id = str(uuid.uuid1()) + clickhouse_write = mcont.config.clickhouse_write + + def container_log(msg: str) -> None: + clickhouse_write.insert( + table='container_logs', + data=[ + [log_now(), mcont.container.name, str(id(mcont.thread)), msg], + ], + column_names=['timestamp', 'name', 'thread_id', 'msg'], + column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'], + settings={'session_id': clickhouse_session_id} + ) + + def routing_log(from_container: str, to_method: str, result: str) -> None: + clickhouse_write.insert( + table='routing_logs', + data=[ + [log_now(), str(id(mcont.thread)), from_container, mcont.container.name, to_method, result], + ], + column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_method', 'result'], + column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String'], + settings={'session_id': clickhouse_session_id} + ) + + container_log(f'Creating runtime for {mcont.container.image.path}') + + cls = RUNTIME_MAP.get(mcont.container.runtime) + assert cls is not None, f'Unknown runtime: {mcont.container.runtime}' + runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container) + if isinstance(runtime, PreludeRunner): + runtime.set_config(mcont.config, container_log) + + container_log('Starting thread') + while True: try: - call = queue.get(block=True, timeout=1) + call = mcont.queue.get(block=True, timeout=1) except Empty: continue - if isinstance(call, ShuttingDown): + if isinstance(call, ShutDownCommand): break - runner.do_call(call) + try: + result = runtime.do_call(call.call) + except Exception as ex: + raise ex + + routing_log( + call.from_container.name if call.from_container is not None else '[SYSTEM]', + call.call.method.name, + repr(ex), + ) + call.on_result(MethodCallExceptionError(str(ex))) + continue + + routing_log( + call.from_container.name if call.from_container is not None else '[SYSTEM]', + call.call.method.name, + repr(result.data) if isinstance(result, Value) else repr(result) + ) + call.on_result(result) + + container_log('Stopping thread') + + +class LocalhostMethodCallRouter(MethodCallRouterInterface): + __slots__ = ('services', 'container_by_service') + + services: Dict[str, Service] + container_by_service: Dict[str, List[ManagedContainer]] + + def __init__(self) -> None: + self.services = {} + self.container_by_service = {} + + def route_call( + self, + service_name: str, + call: MethodCall, + from_container: Optional[Container], + on_result: MethodResultCallable, + ) -> None: + service = self.services.get(service_name) + if service is None: + on_result(ServiceNotFoundError(service_name)) + return + + method = service.methods.get(call.method.name) + if method is None: + on_result(MethodNotFoundError(f'{service_name}.{call.method.name}')) + return + + if method != call.method: + on_result(MethodTypeMismatchError()) + return + + to_mcont = self.find_one_container(service) + if to_mcont is None: + on_result(ServiceUnavailableError(service_name)) + return + + to_mcont.queue.put(RoutedMethodCall( + call, + from_container, + to_mcont.container, + on_result, + )) + + def register_service(self, service: Service) -> None: + assert service.name not in self.services + + self.services[service.name] = service + + def register_container(self, mcont: ManagedContainer) -> None: + for service in self.services.values(): + if service.container_match.by_name == mcont.container.name: + self.container_by_service.setdefault(service.name, []) + self.container_by_service[service.name].append(mcont) + continue + + if service.container_match.by_name[-1] == '*' and mcont.container.name.startswith(service.container_match.by_name[:-1]): + self.container_by_service.setdefault(service.name, []) + self.container_by_service[service.name].append(mcont) + continue + + def find_one_container(self, service: Service) -> Optional[ManagedContainer]: + container_list = self.container_by_service.get(service.name) + if not container_list: + return None + + return random.choice(container_list) def make_prelude() -> Service: methods: List[Method] = [] - methods.append(Method('log_bytes', [ - MethodArgument('data', valuetype.bytes) - ], valuetype.none)) + methods.append(Method('log_bytes', [valuetype.bytes], valuetype.none)) - return Service('prelude', methods) - - -class LocalhostRunner(RunnerInterface): - def do_call(self, call: MethodCall) -> None: - if call.method.name == 'on_module_loaded': - print('LocalhostRunner loaded') - call.on_success(NoneValue) - return - - if call.method.name == 'log_bytes': - print('LOG-BYTES:', repr(call.args[0].data)) - call.on_success(NoneValue) - return - - raise NotImplementedError(call) - - -class LocalhostServiceDiscovery(ServiceDiscoveryInterface): - services: Dict[str, Tuple[Service, Queue[Union[MethodCall, ShuttingDown]]]] - - def __init__(self) -> None: - self.services = {} - - def register_service(self, service: Service, queue: Queue[Union[MethodCall, ShuttingDown]]) -> None: - self.services[service.name] = (service, queue, ) - - def find_service(self, name: str) -> Optional[Service]: - parts = self.services.get(name, None) - if parts is None: - return None - - return parts[0] - - -class LocalhostMethodCallRouter(MethodCallRouterInterface): - def __init__(self, config: WorkerConfig, service_discovery: LocalhostServiceDiscovery) -> None: - self.config = config - self.service_discovery = service_discovery - - def send_call(self, service: Service, call: MethodCall) -> None: - self.config.clickhouse_write.insert('routing_logs', [ - [datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name], - ], ['timestamp', 'to_service', 'to_method']) - - call.on_success = functools.partial(self._send_call_on_succes, service, call, call.on_success) - - assert service.name in self.service_discovery.services - queue = self.service_discovery.services[service.name][1] - queue.put(call) - - def _send_call_on_succes(self, service: Service, call: MethodCall, orig_on_succes: Callable[[Value], None], value: Value) -> None: - self.config.clickhouse_write.insert('routing_logs', [ - [datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name, repr(value.data)], - ], ['timestamp', 'to_service', 'to_method', 'result']) - orig_on_succes(value) + return Service('prelude', ContainerMatch('__prelude__'), methods) def main() -> int: with open('config.toml', 'rb') as fil: config = config_from_toml(fil) - with open('./examples/echoserver.toml', 'rb') as fil: + with open('./examples/echoapp.toml', 'rb') as fil: state = state_from_toml(fil) - del state + method_call_router = LocalhostMethodCallRouter() - # TODO: Replace the stuff below with the loading from the example state + print('Registering services') + method_call_router.register_service(make_prelude()) - localhost_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue() - echo_client_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue() - echo_server_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue() + for service in state.services: + method_call_router.register_service(service) - service_discovery = LocalhostServiceDiscovery() - method_call_router = LocalhostMethodCallRouter(config.worker_config, service_discovery) + container_list: List[ManagedContainer] = [] - localhost = LocalhostRunner() - service_discovery.register_service(make_prelude(), localhost_queue) + prelude_container = Container( + '__prelude__', + Image('prelude', '', []), + 'prelude', + ) - with open('./examples/echoserver.wasm', 'rb') as fil: - echo_server = WasmTimeRunner(service_discovery, method_call_router, fil.read()) - service_discovery.register_service(Service('echoserver', [ - Method('echo', [ - MethodArgument('msg', valuetype.bytes) - ], valuetype.bytes) - ]), echo_server_queue) + for cont in [prelude_container] + state.containers: + mcont = ManagedContainer(config.worker_config, method_call_router, cont) + container_list.append(mcont) + method_call_router.register_container(mcont) - with open('./examples/echoclient.wasm', 'rb') as fil: - echo_client = WasmTimeRunner(service_discovery, method_call_router, fil.read()) - - # service_discovery.register_service(echo_client, echo_client_queue) - # service_discovery.register_service(echo_server, echo_server_queue) + print('Starting containers') + for mcont in container_list: + mcont.thread.start() + print('Sending out on_module_loaded calls') # TODO: Route this normally? on_module_loaded = MethodCall( Method('on_module_loaded', [], valuetype.none), [], - lambda x: None, - lambda x: None, # TODO: Check for MethodNotFoundError, otherwise report it ) - localhost_queue.put(on_module_loaded) - echo_client_queue.put(on_module_loaded) - echo_server_queue.put(on_module_loaded) - - localhost_thread = threading.Thread(target=runner_thread, args=(localhost, localhost_queue)) - echo_client_thread = threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue)) - echo_server_thread = threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue)) - - localhost_thread.start() - echo_client_thread.start() - echo_server_thread.start() + for mcont in container_list: + mcont.queue.put(RoutedMethodCall(on_module_loaded, None, mcont.container, lambda x: None)) try: while 1: time.sleep(1) except KeyboardInterrupt: + print('Caught KeyboardInterrupt, shutting down') pass - localhost_queue.put(ShuttingDown()) - echo_client_queue.put(ShuttingDown()) - echo_server_queue.put(ShuttingDown()) + shut_down_command = ShutDownCommand() + for mcont in container_list: + mcont.queue.put(shut_down_command) + + print('Awaiting containers') + for mcont in container_list: + mcont.thread.join() return 0 diff --git a/phasmplatform/worker/runners/base.py b/phasmplatform/worker/runners/base.py index f83548f..e1d12f1 100644 --- a/phasmplatform/worker/runners/base.py +++ b/phasmplatform/worker/runners/base.py @@ -1,9 +1,9 @@ from typing import TextIO, Union from phasmplatform.common import valuetype -from phasmplatform.common.method import MethodCall +from phasmplatform.common.container import Container +from phasmplatform.common.methodcall import MethodCall, MethodCallError from phasmplatform.common.router import MethodCallRouterInterface -from phasmplatform.common.service import ServiceDiscoveryInterface from phasmplatform.common.value import Value from phasmplatform.common.valuetype import ValueType @@ -12,9 +12,16 @@ WasmValue = Union[None, int, float] class RunnerInterface: - __slots__ = ('router', ) + __slots__ = ('method_call_router', 'container', ) - def do_call(self, call: MethodCall) -> None: + method_call_router: MethodCallRouterInterface + container: Container + + def __init__(self, method_call_router: MethodCallRouterInterface, container: Container) -> None: + self.method_call_router = method_call_router + self.container = container + + def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]: """ Executes the call on the current container @@ -24,14 +31,7 @@ class RunnerInterface: class BaseRunner(RunnerInterface): - __slots__ = ('service_discovery', 'method_call_router', ) - - service_discovery: ServiceDiscoveryInterface - method_call_router: MethodCallRouterInterface - - def __init__(self, service_discovery: ServiceDiscoveryInterface, method_call_router: MethodCallRouterInterface) -> None: - self.service_discovery = service_discovery - self.method_call_router = method_call_router + __slots__ = ('method_call_router', ) def alloc_bytes(self, data: bytes) -> int: """ diff --git a/phasmplatform/worker/runners/prelude.py b/phasmplatform/worker/runners/prelude.py new file mode 100644 index 0000000..13e3534 --- /dev/null +++ b/phasmplatform/worker/runners/prelude.py @@ -0,0 +1,29 @@ +from typing import Callable, Union, Tuple + +from phasmplatform.common.config import WorkerConfig +from phasmplatform.common.methodcall import MethodCall, MethodCallError +from phasmplatform.common.value import Value, NoneValue + +from .base import BaseRunner + + +class PreludeRunner(BaseRunner): + __slots__ = ('config', 'container_log', ) + + config: WorkerConfig + container_log: Tuple[Callable[[str], None]] # Tuple for typing issues + + def set_config(self, config: WorkerConfig, container_log: Callable[[str], None]) -> None: + self.config = config + self.container_log = (container_log, ) + + def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]: + if call.method.name == 'on_module_loaded': + self.container_log[0]('PreludeRunner loaded') + return NoneValue + + if call.method.name == 'log_bytes': + self.container_log[0](f'LOG-BYTES: {repr(call.args[0].data)}') + return NoneValue + + raise NotImplementedError(call) diff --git a/phasmplatform/worker/runners/wasmtime.py b/phasmplatform/worker/runners/wasmtime.py index 79db99d..95430d9 100644 --- a/phasmplatform/worker/runners/wasmtime.py +++ b/phasmplatform/worker/runners/wasmtime.py @@ -1,4 +1,4 @@ -from typing import Any, List +from typing import Any, Dict, List, Union import ctypes import functools @@ -8,10 +8,10 @@ from queue import Empty, Queue import wasmtime from phasmplatform.common import valuetype -from phasmplatform.common.exceptions import PhashPlatformServiceNotFound, PhashPlatformServiceMethodNotFound -from phasmplatform.common.method import Method, MethodCall, MethodCallError, MethodNotFoundError +from phasmplatform.common.container import Container +from phasmplatform.common.method import Method +from phasmplatform.common.methodcall import MethodCall, MethodCallError, MethodNotFoundError from phasmplatform.common.router import MethodCallRouterInterface -from phasmplatform.common.service import Service, ServiceDiscoveryInterface from phasmplatform.common.value import Value from phasmplatform.common.valuetype import ValueType @@ -21,40 +21,30 @@ from .base import BaseRunner, WasmValue class WasmTimeRunner(BaseRunner): __slots__ = ('store', 'module', 'instance', 'exports') - def __init__( - self, - service_discovery: ServiceDiscoveryInterface, - method_call_router: MethodCallRouterInterface, - wasm_bin: bytes, - ) -> None: - super().__init__(service_discovery, method_call_router) + def __init__(self, method_call_router: MethodCallRouterInterface, container: Container) -> None: + super().__init__(method_call_router, container) + + with open(f'./{container.image.path}', 'rb') as fil: # TODO: ImageLoader? + wasm_bin = fil.read() self.store = wasmtime.Store() self.module = wasmtime.Module(self.store.engine, wasm_bin) - imports: List[wasmtime.Func] = [] - for imprt in self.module.imports: - service = service_discovery.find_service(imprt.module) - if service is None: - raise PhashPlatformServiceNotFound( - f'Dependent service "{imprt.module}" not found; could not provide "{imprt.name}"' - ) - - assert imprt.name is not None # type hint - - method = service.methods.get(imprt.name) - if method is None: - raise PhashPlatformServiceMethodNotFound( - f'Dependent service "{imprt.module}" found, but it does not provide "{imprt.name}"' - ) - - func = wasmtime.Func( + import_map: Dict[str, wasmtime.Func] = { + f'{imprt_service}.{imprt_method.name}': wasmtime.Func( self.store, - build_func_type(method), - functools.partial(self.send_service_call, service, method) + build_func_type(imprt_method), + functools.partial(self.send_service_call, imprt_service, imprt_method) ) + for (imprt_service, imprt_method, ) in container.image.imports + } - imports.append(func) + # Make sure the given import lists order matches the one given by wasmtime + # Otherwise, wasmtime can't match them up. + imports: List[wasmtime.Func] = [ + import_map[f'{imprt.module}.{imprt.name}'] + for imprt in self.module.imports + ] self.instance = wasmtime.Instance(self.store, self.module, imports) @@ -94,44 +84,44 @@ class WasmTimeRunner(BaseRunner): return raw[ptr + 4:ptr + 4 + length] - def do_call(self, call: MethodCall) -> None: + def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]: try: wasm_method = self.exports[call.method.name] except KeyError: - call.on_error(MethodNotFoundError()) - return + return MethodNotFoundError() assert isinstance(wasm_method, wasmtime.Func) act_args = [self.value_to_wasm(x) for x in call.args] result = wasm_method(self.store, *act_args) assert result is None or isinstance(result, (int, float, )) # type hint - call.on_success(self.value_from_wasm(call.method.return_type, result)) - def send_service_call(self, service: Service, method: Method, *args: Any) -> WasmValue: - assert len(method.args) == len(args) # type hint + return self.value_from_wasm(call.method.return_type, result) + + def send_service_call(self, service: str, method: Method, *args: Any) -> WasmValue: + assert len(method.arg_types) == len(args) # type hint call_args = [ - self.value_from_wasm(x.value_type, y) - for x, y in zip(method.args, args) + self.value_from_wasm(x, y) + for x, y in zip(method.arg_types, args) ] queue: Queue[Value] = Queue(maxsize=1) - def on_success(val: Value) -> None: - queue.put(val) + def on_result(res: Union[Value, MethodCallError]) -> None: + if isinstance(res, Value): + queue.put(res) + else: + raise Exception(res) - def on_error(err: MethodCallError) -> None: - print('Error while calling', service, method, args) + call = MethodCall(method, call_args) - call = MethodCall(method, call_args, on_success, on_error) - - self.method_call_router.send_call(service, call) + self.method_call_router.route_call(service, call, self.container, on_result) try: value = queue.get(block=True, timeout=10) except Empty: - raise Exception() # TODO + raise Exception('Did not receive value from remote call') # TODO return self.value_to_wasm(value) @@ -143,9 +133,9 @@ def build_func_type(method: Method) -> wasmtime.FuncType: returns = [build_wasm_type(method.return_type)] args = [] - for arg in method.args: - assert arg.value_type is not valuetype.none # type hint - args.append(build_wasm_type(arg.value_type)) + for arg_type in method.arg_types: + assert arg_type is not valuetype.none # type hint + args.append(build_wasm_type(arg_type)) return wasmtime.FuncType(args, returns)