# Provenance (repository viewer metadata, not part of the program):
#   Johan B.W. de Vries 0d0af0e728 clickhouse logging
#   2023-04-14 14:45:34 +02:00
#
#   158 lines
#   5.4 KiB
#   Python

from typing import Callable, Dict, List, Optional, Tuple
import datetime
import functools
import sys
import threading
import time
from queue import Empty, Queue
from phasmplatform.common import valuetype
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
from phasmplatform.common.method import Method, MethodArgument, MethodCall
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import Service, ServiceDiscoveryInterface
from phasmplatform.common.value import Value, NoneValue
from phasmplatform.common.state import from_toml as state_from_toml
from .runners.base import RunnerInterface
from .runners.wasmtime import WasmTimeRunner
def runner_thread(runner: RunnerInterface, queue: Queue[MethodCall], idle_timeout: float = 1) -> None:
    """
    Take calls from ``queue`` one at a time and hand each to ``runner``.

    The loop ends, and the function returns, as soon as no call becomes
    available within ``idle_timeout`` seconds.

    :param runner: object whose ``do_call`` is invoked once per dequeued call.
    :param queue: queue the calls are taken from.
    :param idle_timeout: seconds to block waiting for the next call before
        giving up; defaults to 1, the value that used to be hard-coded.
    """
    while True:
        try:
            call = queue.get(block=True, timeout=idle_timeout)
        except Empty:
            # Nothing arrived in time: consider the work done and stop.
            return

        # Exceptions from the runner are deliberately not caught here; they
        # propagate and end the loop.
        runner.do_call(call)
def make_prelude() -> Service:
    """
    Build the description of the 'prelude' service.

    The service exposes a single method, ``log_bytes``, which takes one
    argument named ``data`` of type ``valuetype.bytes`` and has return type
    ``valuetype.none``.
    """
    log_bytes = Method(
        'log_bytes',
        [MethodArgument('data', valuetype.bytes)],
        valuetype.none,
    )

    return Service('prelude', [log_bytes])
class LocalhostRunner(RunnerInterface):
    """
    Runner that serves calls directly in this process by printing to stdout.
    """

    def do_call(self, call: MethodCall) -> None:
        """
        Handle ``call`` and report ``NoneValue`` through ``call.on_success``.

        Supported method names are ``on_module_loaded`` and ``log_bytes``;
        for the latter the ``data`` of the first argument is printed.

        :raises NotImplementedError: for any other method name; in that case
            ``call.on_success`` is not invoked.
        """
        method_name = call.method.name

        if method_name == 'on_module_loaded':
            print('LocalhostRunner loaded')
        elif method_name == 'log_bytes':
            print('LOG-BYTES:', repr(call.args[0].data))
        else:
            raise NotImplementedError(call)

        # Both supported methods finish the same way.
        call.on_success(NoneValue)
class LocalhostServiceDiscovery(ServiceDiscoveryInterface):
    """
    In-memory registry mapping a service name to its description and queue.
    """

    # service name -> (service description, queue that receives its calls)
    services: Dict[str, Tuple[Service, Queue[MethodCall]]]

    def __init__(self) -> None:
        """Start with an empty registry."""
        self.services = dict()

    def register_service(self, service: Service, queue: Queue[MethodCall]) -> None:
        """
        Store ``service`` and ``queue`` under ``service.name``.

        A later registration with the same name replaces the earlier one.
        """
        self.services[service.name] = (service, queue)

    def find_service(self, name: str) -> Optional[Service]:
        """
        Return the service registered under ``name``, or ``None`` if there
        is no such registration.
        """
        entry = self.services.get(name)
        return None if entry is None else entry[0]
class LocalhostMethodCallRouter(MethodCallRouterInterface):
    """
    Router that delivers calls through the queues held by a
    ``LocalhostServiceDiscovery`` and writes a 'routing_logs' row for every
    call it sends and for every successful result.
    """

    def __init__(self, config: WorkerConfig, service_discovery: LocalhostServiceDiscovery) -> None:
        """Remember the worker config and the registry used to look up queues."""
        self.service_discovery = service_discovery
        self.config = config

    def send_call(self, service: Service, call: MethodCall) -> None:
        """
        Log the outgoing call, wrap its success callback so the result is
        logged as well, and put the call on the queue registered for
        ``service``.

        Note that ``call.on_success`` is replaced on the passed-in object.
        """
        sent_at = datetime.datetime.now(tz=datetime.timezone.utc)
        log_row = [sent_at, service.name, call.method.name]
        self.config.clickhouse_write.insert('routing_logs', [log_row], ['timestamp', 'to_service', 'to_method'])

        # Capture the current callback before overwriting it, so the wrapper
        # can still forward the value to it.
        wrapped_on_success = functools.partial(self._send_call_on_succes, service, call, call.on_success)
        call.on_success = wrapped_on_success

        registered = self.service_discovery.services
        assert service.name in registered

        target_queue = registered[service.name][1]
        target_queue.put(call)

    def _send_call_on_succes(self, service: Service, call: MethodCall, orig_on_succes: Callable[[Value], None], value: Value) -> None:
        """
        Success callback installed by ``send_call``: log the result, then
        pass ``value`` on to the callback the call originally carried.
        """
        received_at = datetime.datetime.now(tz=datetime.timezone.utc)
        log_row = [received_at, service.name, call.method.name, repr(value.data)]
        self.config.clickhouse_write.insert('routing_logs', [log_row], ['timestamp', 'to_service', 'to_method', 'result'])

        orig_on_succes(value)
def main() -> int:
    """
    Run the echo client / echo server example on localhost.

    Reads ``config.toml`` and the example state file, sets up one queue and
    one runner each for the prelude, the echo server and the echo client,
    queues an ``on_module_loaded`` call for all three, and processes the
    queues on one thread per runner until those threads finish.

    :return: the process exit code; always 0.
    """
    with open('config.toml', 'rb') as fil:
        config = config_from_toml(fil)

    with open('./examples/echoserver.toml', 'rb') as fil:
        state = state_from_toml(fil)

    # The state is parsed but not used yet.
    del state
    # TODO: Replace the stuff below with the loading from the example state

    localhost_queue: Queue[MethodCall] = Queue()
    echo_client_queue: Queue[MethodCall] = Queue()
    echo_server_queue: Queue[MethodCall] = Queue()

    service_discovery = LocalhostServiceDiscovery()
    method_call_router = LocalhostMethodCallRouter(config.worker_config, service_discovery)

    localhost = LocalhostRunner()
    service_discovery.register_service(make_prelude(), localhost_queue)

    with open('./examples/echoserver.wasm', 'rb') as fil:
        echo_server = WasmTimeRunner(service_discovery, method_call_router, fil.read())

    service_discovery.register_service(Service('echoserver', [
        Method('echo', [
            MethodArgument('msg', valuetype.bytes)
        ], valuetype.bytes)
    ]), echo_server_queue)

    with open('./examples/echoclient.wasm', 'rb') as fil:
        echo_client = WasmTimeRunner(service_discovery, method_call_router, fil.read())

    # Note: the echo client is not registered with service discovery.
    # service_discovery.register_service(echo_client, echo_client_queue)
    # service_discovery.register_service(echo_server, echo_server_queue)

    on_module_loaded = MethodCall(
        Method('on_module_loaded', [], valuetype.none),
        [],
        lambda x: None,
        lambda x: None,  # TODO: Check for MethodNotFoundError, otherwise report it
    )

    # The same call object is queued for all three runners.
    localhost_queue.put(on_module_loaded)
    echo_client_queue.put(on_module_loaded)
    echo_server_queue.put(on_module_loaded)

    workers = [
        threading.Thread(target=runner_thread, args=(localhost, localhost_queue)),
        threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue)),
        threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue)),
    ]

    for worker in workers:
        worker.start()

    # runner_thread returns by itself once its queue has stayed empty for its
    # timeout, so joining waits for the work to finish instead of guessing a
    # fixed sleep duration.
    for worker in workers:
        worker.join()

    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    exit_code = main()
    sys.exit(exit_code)