diff --git a/Makefile b/Makefile index 906b971..8a28c8f 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,9 @@ run-controller: sast run-worker: sast venv/bin/python -m phasmplatform.worker +clickhouse-sh: + docker compose exec clickhouse /usr/bin/clickhouse client + redis-sh: docker compose exec -it redis redis-cli diff --git a/config.toml b/config.toml index 9b4bded..a94db12 100644 --- a/config.toml +++ b/config.toml @@ -5,3 +5,10 @@ port = 17859 [controller.redis.read] host = 'localhost' port = 17859 + +[worker.clickhouse.write] +host = 'localhost' +port = 17858 +username = 'default' +password = '' +database = 'phasm_platform' diff --git a/config/clickhouse/.gitignore b/config/clickhouse/.gitignore new file mode 100644 index 0000000..3af0ccb --- /dev/null +++ b/config/clickhouse/.gitignore @@ -0,0 +1 @@ +/data diff --git a/config/clickhouse/schema.sql b/config/clickhouse/schema.sql new file mode 100644 index 0000000..d966b80 --- /dev/null +++ b/config/clickhouse/schema.sql @@ -0,0 +1,17 @@ +CREATE DATABASE IF NOT EXISTS phasm_platform; + +USE phasm_platform; + +CREATE TABLE routing_logs +( + timestamp DateTime64(6, 'UTC'), + from_service String, + from_container String, + from_method String, + to_service String, + to_container String, + to_method String, + result String, +) +ENGINE = MergeTree +ORDER BY timestamp; diff --git a/docker-compose.yaml b/docker-compose.yaml index da2080a..b4c8d1e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -3,3 +3,10 @@ services: image: 'redis:7.0-alpine' ports: - '17859:6379' + + clickhouse: + image: 'clickhouse/clickhouse-server:22.8-alpine' + ports: + - '17858:8123' + volumes: + - './config/clickhouse/data:/var/lib/clickhouse/' diff --git a/phasmplatform/common/config.py b/phasmplatform/common/config.py index f32c3e8..7ecb77d 100644 --- a/phasmplatform/common/config.py +++ b/phasmplatform/common/config.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING, BinaryIO +from clickhouse_connect.driver.httpclient import HttpClient as ClickHouseHttpClient # type: ignore import redis if TYPE_CHECKING: @@ -26,13 +27,27 @@ class ControllerConfig: self.redis_write = redis_write +class WorkerConfig: + __slots__ = ('clickhouse_write', ) + + clickhouse_write: ClickHouseHttpClient + + def __init__( + self, + clickhouse_write: ClickHouseHttpClient, + ) -> None: + self.clickhouse_write = clickhouse_write + + class Config: - __slots__ = ('controller_config', ) + __slots__ = ('controller_config', 'worker_config') controller_config: ControllerConfig + worker_config: WorkerConfig - def __init__(self, controller_config: ControllerConfig) -> None: + def __init__(self, controller_config: ControllerConfig, worker_config: WorkerConfig) -> None: self.controller_config = controller_config + self.worker_config = worker_config def from_toml(toml: BinaryIO) -> Config: @@ -55,4 +70,15 @@ def from_toml(toml: BinaryIO) -> Config: controller_config = ControllerConfig(redis_read, redis_write) - return Config(controller_config) + clickhouse_write = ClickHouseHttpClient( + interface='http', + host=toml_dict['worker']['clickhouse']['write']['host'], + port=toml_dict['worker']['clickhouse']['write']['port'], + username=toml_dict['worker']['clickhouse']['write']['username'], + password=toml_dict['worker']['clickhouse']['write']['password'], + database=toml_dict['worker']['clickhouse']['write']['database'], + ) + + worker_config = WorkerConfig(clickhouse_write) + + return Config(controller_config, worker_config) diff --git a/phasmplatform/worker/__main__.py b/phasmplatform/worker/__main__.py index 4d8237a..84e1726 100644 --- a/phasmplatform/worker/__main__.py +++ b/phasmplatform/worker/__main__.py @@ -1,16 +1,18 @@ -from typing import Dict, List, Optional, Tuple +from typing import Callable, Dict, List, Optional, Tuple +import datetime +import functools import sys import threading import time from queue import Empty, Queue from phasmplatform.common import valuetype -from phasmplatform.common.config import from_toml as config_from_toml +from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml from phasmplatform.common.method import Method, MethodArgument, MethodCall from phasmplatform.common.router import MethodCallRouterInterface from phasmplatform.common.service import Service, ServiceDiscoveryInterface -from phasmplatform.common.value import NoneValue +from phasmplatform.common.value import Value, NoneValue from phasmplatform.common.state import from_toml as state_from_toml from .runners.base import RunnerInterface @@ -70,32 +72,45 @@ class LocalhostServiceDiscovery(ServiceDiscoveryInterface): class LocalhostMethodCallRouter(MethodCallRouterInterface): - def __init__(self, service_discovery: LocalhostServiceDiscovery) -> None: + def __init__(self, config: WorkerConfig, service_discovery: LocalhostServiceDiscovery) -> None: + self.config = config self.service_discovery = service_discovery def send_call(self, service: Service, call: MethodCall) -> None: + self.config.clickhouse_write.insert('routing_logs', [ + [datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name], + ], ['timestamp', 'to_service', 'to_method']) + + call.on_success = functools.partial(self._send_call_on_succes, service, call, call.on_success) + assert service.name in self.service_discovery.services queue = self.service_discovery.services[service.name][1] - print('send_call', service, call, queue) queue.put(call) + def _send_call_on_succes(self, service: Service, call: MethodCall, orig_on_succes: Callable[[Value], None], value: Value) -> None: + self.config.clickhouse_write.insert('routing_logs', [ + [datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name, repr(value.data)], + ], ['timestamp', 'to_service', 'to_method', 'result']) + orig_on_succes(value) + def main() -> int: with open('config.toml', 'rb') as fil: config = config_from_toml(fil) - del config - with open('./examples/echoserver.toml', 'rb') as fil: state = state_from_toml(fil) - print(state) + + del state + + # TODO: Replace the stuff below with the loading from the example state localhost_queue: Queue[MethodCall] = Queue() echo_client_queue: Queue[MethodCall] = Queue() echo_server_queue: Queue[MethodCall] = Queue() service_discovery = LocalhostServiceDiscovery() - method_call_router = LocalhostMethodCallRouter(service_discovery) + method_call_router = LocalhostMethodCallRouter(config.worker_config, service_discovery) localhost = LocalhostRunner() service_discovery.register_service(make_prelude(), localhost_queue) diff --git a/phasmplatform/worker/runners/wasmtime.py b/phasmplatform/worker/runners/wasmtime.py index 96ca38e..79db99d 100644 --- a/phasmplatform/worker/runners/wasmtime.py +++ b/phasmplatform/worker/runners/wasmtime.py @@ -126,23 +126,11 @@ class WasmTimeRunner(BaseRunner): call = MethodCall(method, call_args, on_success, on_error) - print( - 'send_service_call', - 'from-service=?', 'from-method=?', # TODO - f'to-service={service.name}', f'to-method={method.name}', - *args, - ) self.method_call_router.send_call(service, call) try: value = queue.get(block=True, timeout=10) except Empty: - print( - 'send_service_call', - 'from-service=?', 'from-method=?', # TODO - f'to-service={service.name}', f'to-method={method.name}', - 'TIMEOUT', - ) raise Exception() # TODO return self.value_to_wasm(value) diff --git a/requirements.txt b/requirements.txt index ce77090..43bcdff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +clickhouse-connect==0.5.20 +numpy==1.24.2 pywasm3==0.5.0 redis==4.5.4 tomli==2.0.1 ; python_version < '3.11'