From 5e1c5679e5f3aca86b72ed27854c345de329ff08 Mon Sep 17 00:00:00 2001 From: "Johan B.W. de Vries" Date: Fri, 14 Apr 2023 19:09:20 +0200 Subject: [PATCH] prelude.log_bytes is now on the right thread Before, all messages would be logged on the prelude container / thread. routing log improvements --- config/clickhouse/schema.sql | 1 - examples/echoclient.py | 2 +- examples/echoclient.toml | 1 - examples/echoserver.py | 2 +- examples/echoserver.toml | 4 ---- phasmplatform/common/methodcall.py | 5 ++++- phasmplatform/worker/__main__.py | 16 ++++++++-------- phasmplatform/worker/runners/base.py | 8 +++++--- phasmplatform/worker/runners/prelude.py | 16 ++-------------- phasmplatform/worker/runners/wasmtime.py | 17 ++++++++++++++--- 10 files changed, 35 insertions(+), 37 deletions(-) diff --git a/config/clickhouse/schema.sql b/config/clickhouse/schema.sql index 5ba9ae9..c77ef00 100644 --- a/config/clickhouse/schema.sql +++ b/config/clickhouse/schema.sql @@ -7,7 +7,6 @@ CREATE TABLE routing_logs timestamp DateTime64(6, 'UTC'), thread_id String, from_container String, - from_method String, to_service String, to_container String, to_method String, diff --git a/examples/echoclient.py b/examples/echoclient.py index 1cd8d6e..47ff799 100644 --- a/examples/echoclient.py +++ b/examples/echoclient.py @@ -8,5 +8,5 @@ def log_bytes(data: bytes) -> None: @exported def on_module_loaded() -> None: - log_bytes(b'on_module_loaded') + log_bytes(b'Echo client starting up, calling server') log_bytes(echo(b'Hello, world!')) diff --git a/examples/echoclient.toml b/examples/echoclient.toml index 2c0f0fe..fee6ef0 100644 --- a/examples/echoclient.toml +++ b/examples/echoclient.toml @@ -6,7 +6,6 @@ path = "examples/echoclient.wasm" hash = "sha256@84cb22d12dfdd6b05cb906f6db83d59f473c9df85a33822f696344af2b92b502" imports = [ - { service = "prelude", method = "log_bytes", arg_types = ["bytes"], return_type = "none"}, { service = "echoserver", method = "echo", arg_types = ["bytes"], return_type = "bytes"}, ] diff --git a/examples/echoserver.py b/examples/echoserver.py index 3f26f25..a1d5252 100644 --- a/examples/echoserver.py +++ b/examples/echoserver.py @@ -8,4 +8,4 @@ def log_bytes(data: bytes) -> None: @exported def on_module_loaded() -> None: - log_bytes(b'on_module_loaded') + log_bytes(b'Echo service up and running') diff --git a/examples/echoserver.toml b/examples/echoserver.toml index e7189e0..6ff458a 100644 --- a/examples/echoserver.toml +++ b/examples/echoserver.toml @@ -5,10 +5,6 @@ kind = "Image" path = "examples/echoserver.wasm" hash = "sha256@dfe03b4f7ce5e921931f8715384e35a6776fdc28837e42ffa04305bbadffcfc9" -imports = [ - { service = "prelude", method = "log_bytes", arg_types = ["bytes"], return_type = "none"} -] - [echoserver-container-0] apiVersion = "v0" kind = "Container" diff --git a/phasmplatform/common/methodcall.py b/phasmplatform/common/methodcall.py index 7a6fe89..5ed54e1 100644 --- a/phasmplatform/common/methodcall.py +++ b/phasmplatform/common/methodcall.py @@ -94,10 +94,11 @@ MethodResultCallable = Callable[[Union[Value, MethodCallError]], None] class RoutedMethodCall: - __slots__ = ('call', 'from_container', 'to_container', 'on_result', ) + __slots__ = ('call', 'from_container', 'to_service', 'to_container', 'on_result', ) call: MethodCall from_container: Optional[Container] + to_service: Optional[str] to_container: Optional[Container] on_result: MethodResultCallable @@ -105,11 +106,13 @@ class RoutedMethodCall: self, call: MethodCall, from_container: Optional[Container], + to_service: Optional[str], to_container: Optional[Container], on_result: MethodResultCallable, ) -> None: self.call = call self.from_container = from_container + self.to_service = to_service self.to_container = to_container self.on_result = on_result diff --git a/phasmplatform/worker/__main__.py b/phasmplatform/worker/__main__.py index 6f89f85..9f9fecf 100644 --- a/phasmplatform/worker/__main__.py +++ b/phasmplatform/worker/__main__.py @@ -79,14 +79,14 @@ def container_thread(mcont: ManagedContainer) -> None: settings={'session_id': clickhouse_session_id} ) - def routing_log(from_container: str, to_method: str, result: str) -> None: + def routing_log(from_container: str, to_service: Optional[str], to_method: str, result: str) -> None: clickhouse_write.insert( table='routing_logs', data=[ - [log_now(), str(id(mcont.thread)), from_container, mcont.container.name, to_method, result], + [log_now(), str(id(mcont.thread)), from_container, mcont.container.name, to_service or '', to_method, result], ], - column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_method', 'result'], - column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String'], + column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_service', 'to_method', 'result'], + column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String', 'String'], settings={'session_id': clickhouse_session_id} ) @@ -94,9 +94,7 @@ def container_thread(mcont: ManagedContainer) -> None: cls = RUNTIME_MAP.get(mcont.container.runtime) assert cls is not None, f'Unknown runtime: {mcont.container.runtime}' - runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container) - if isinstance(runtime, PreludeRunner): - runtime.set_config(mcont.config, container_log) + runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container, container_log) container_log('Starting thread') @@ -124,6 +122,7 @@ def container_thread(mcont: ManagedContainer) -> None: routing_log( call.from_container.name if call.from_container is not None else '[SYSTEM]', + call.to_service, call.call.method.name, repr(result.data) if isinstance(result, Value) else repr(result) ) @@ -171,6 +170,7 @@ class LocalhostMethodCallRouter(MethodCallRouterInterface): to_mcont.queue.put(RoutedMethodCall( call, from_container, + service.name, to_mcont.container, on_result, )) @@ -247,7 +247,7 @@ def main() -> int: ) for mcont in container_list: - mcont.queue.put(RoutedMethodCall(on_module_loaded, None, mcont.container, lambda x: None)) + mcont.queue.put(RoutedMethodCall(on_module_loaded, None, None, mcont.container, lambda x: None)) try: while 1: diff --git a/phasmplatform/worker/runners/base.py b/phasmplatform/worker/runners/base.py index e1d12f1..7478459 100644 --- a/phasmplatform/worker/runners/base.py +++ b/phasmplatform/worker/runners/base.py @@ -1,4 +1,4 @@ -from typing import TextIO, Union +from typing import Callable, TextIO, Tuple, Union from phasmplatform.common import valuetype from phasmplatform.common.container import Container @@ -12,14 +12,16 @@ WasmValue = Union[None, int, float] class RunnerInterface: - __slots__ = ('method_call_router', 'container', ) + __slots__ = ('method_call_router', 'container', 'container_log', ) method_call_router: MethodCallRouterInterface container: Container + container_log: Tuple[Callable[[str], None]] # Tuple for typing issues - def __init__(self, method_call_router: MethodCallRouterInterface, container: Container) -> None: + def __init__(self, method_call_router: MethodCallRouterInterface, container: Container, container_log: Callable[[str], None]) -> None: self.method_call_router = method_call_router self.container = container + self.container_log = (container_log, ) def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]: """ diff --git a/phasmplatform/worker/runners/prelude.py b/phasmplatform/worker/runners/prelude.py index 13e3534..eedf266 100644 --- a/phasmplatform/worker/runners/prelude.py +++ b/phasmplatform/worker/runners/prelude.py @@ -1,6 +1,5 @@ -from typing import Callable, Union, Tuple +from typing import Union -from phasmplatform.common.config import WorkerConfig from phasmplatform.common.methodcall import MethodCall, MethodCallError from phasmplatform.common.value import Value, NoneValue @@ -8,22 +7,11 @@ from .base import BaseRunner class PreludeRunner(BaseRunner): - __slots__ = ('config', 'container_log', ) - - config: WorkerConfig - container_log: Tuple[Callable[[str], None]] # Tuple for typing issues - - def set_config(self, config: WorkerConfig, container_log: Callable[[str], None]) -> None: - self.config = config - self.container_log = (container_log, ) + __slots__ = () def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]: if call.method.name == 'on_module_loaded': self.container_log[0]('PreludeRunner loaded') return NoneValue - if call.method.name == 'log_bytes': - self.container_log[0](f'LOG-BYTES: {repr(call.args[0].data)}') - return NoneValue - raise NotImplementedError(call) diff --git a/phasmplatform/worker/runners/wasmtime.py b/phasmplatform/worker/runners/wasmtime.py index 95430d9..96b2f71 100644 --- a/phasmplatform/worker/runners/wasmtime.py +++ b/phasmplatform/worker/runners/wasmtime.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Union +from typing import Any, Callable, Dict, List, Union import ctypes import functools @@ -21,8 +21,8 @@ from .base import BaseRunner, WasmValue class WasmTimeRunner(BaseRunner): __slots__ = ('store', 'module', 'instance', 'exports') - def __init__(self, method_call_router: MethodCallRouterInterface, container: Container) -> None: - super().__init__(method_call_router, container) + def __init__(self, method_call_router: MethodCallRouterInterface, container: Container, container_log: Callable[[str], None]) -> None: + super().__init__(method_call_router, container, container_log) with open(f'./{container.image.path}', 'rb') as fil: # TODO: ImageLoader? wasm_bin = fil.read() @@ -39,6 +39,12 @@ class WasmTimeRunner(BaseRunner): for (imprt_service, imprt_method, ) in container.image.imports } + import_map['prelude.log_bytes'] = wasmtime.Func( + self.store, + wasmtime.FuncType([wasmtime.ValType.i32()], []), + functools.partial(self.log_bytes), + ) + # Make sure the given import lists order matches the one given by wasmtime # Otherwise, wasmtime can't match them up. imports: List[wasmtime.Func] = [ @@ -125,6 +131,11 @@ class WasmTimeRunner(BaseRunner): return self.value_to_wasm(value) + def log_bytes(self, data_ptr: int) -> None: + value = self.value_from_wasm(valuetype.bytes, data_ptr) + + self.container_log[0](repr(value.data)) + def build_func_type(method: Method) -> wasmtime.FuncType: if method.return_type is valuetype.none: