Johan B.W. de Vries 5e1c5679e5 prelude.log_bytes is now on the right thread
Previously, all messages were logged on the prelude
container/thread.

routing log improvements
2023-04-14 19:09:20 +02:00

272 lines
8.7 KiB
Python

import datetime
import random
import sys
import threading
import time
import traceback
import uuid
from queue import Empty, Queue
from typing import Dict, List, Optional, Type, Union

from phasmplatform.common import valuetype
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
from phasmplatform.common.container import Container
from phasmplatform.common.image import Image
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, RoutedMethodCall, MethodResultCallable
from phasmplatform.common.methodcall import (
    MethodCallExceptionError, MethodNotFoundError, MethodTypeMismatchError, ServiceNotFoundError, ServiceUnavailableError
)
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import ContainerMatch, Service
from phasmplatform.common.state import from_toml as state_from_toml
from phasmplatform.common.value import Value

from .runners.base import RunnerInterface
from .runners.prelude import PreludeRunner
from .runners.wasmtime import WasmTimeRunner
# Maps a container's ``runtime`` name to the runner class that executes it
# (looked up by container_thread when a worker starts).
RUNTIME_MAP: Dict[str, Type[RunnerInterface]] = dict(
    prelude=PreludeRunner,
    wasmtime=WasmTimeRunner,
)
def log_now() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime, used as the log timestamp."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(utc)
class ShutDownCommand:
    """Sentinel put on a container's queue to make its worker thread leave its loop."""
# Inbox of a container worker thread: routed method calls to execute, or a
# ShutDownCommand telling the thread to stop.
MethodCallQueue = Queue[Union[RoutedMethodCall, ShutDownCommand]]
class ManagedContainer:
    """A Container together with the worker thread and inbox queue that serve it.

    Once ``thread`` is started it runs ``container_thread(self)``, which
    consumes ``queue``; routed calls (or a ``ShutDownCommand``) are delivered
    by putting them on that queue.
    """

    __slots__ = ('config', 'method_call_router', 'container', 'thread', 'queue', )

    config: WorkerConfig
    method_call_router: MethodCallRouterInterface
    container: Container
    thread: threading.Thread
    queue: MethodCallQueue

    def __init__(self, config: WorkerConfig, method_call_router: MethodCallRouterInterface, container: Container) -> None:
        self.config, self.method_call_router, self.container = config, method_call_router, container
        # The thread is only created here; the caller is responsible for start().
        self.queue = Queue()
        self.thread = threading.Thread(target=container_thread, args=(self, ))

    def __repr__(self) -> str:
        return f'ManagedContainer(..., ..., {self.container!r}, ..., ...)'
def container_thread(mcont: ManagedContainer) -> None:
    """Worker loop for one managed container; it is the target of ``mcont.thread``.

    Creates the container's runtime (looked up in ``RUNTIME_MAP`` by
    ``mcont.container.runtime``), then consumes ``mcont.queue`` until a
    ``ShutDownCommand`` arrives. Each ``RoutedMethodCall`` is executed with
    ``runtime.do_call``. The outcome is written to the ``routing_logs`` table
    and handed back through ``call.on_result``. If the call raises, the
    traceback is written to ``container_logs``. The caller then receives a
    ``MethodCallExceptionError`` and the worker keeps serving later calls.

    The runtime name is checked with ``assert`` (AssertionError when unknown).
    """
    # One ClickHouse session id per invocation of this function (i.e. per worker thread).
    clickhouse_session_id = str(uuid.uuid1())
    clickhouse_write = mcont.config.clickhouse_write
    # NOTE: id() of the Thread object, not the OS thread ident; presumably it is
    # only used to tell worker threads apart in the logs.
    thread_id = str(id(mcont.thread))

    def container_log(msg: str) -> None:
        """Write one message about this container to the ``container_logs`` table."""
        clickhouse_write.insert(
            table='container_logs',
            data=[
                [log_now(), mcont.container.name, thread_id, msg],
            ],
            column_names=['timestamp', 'name', 'thread_id', 'msg'],
            column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'],
            settings={'session_id': clickhouse_session_id}
        )

    def routing_log(from_container: str, to_service: Optional[str], to_method: str, result: str) -> None:
        """Write one handled call (caller, target service/method, result repr) to ``routing_logs``."""
        clickhouse_write.insert(
            table='routing_logs',
            data=[
                [log_now(), thread_id, from_container, mcont.container.name, to_service or '', to_method, result],
            ],
            column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_service', 'to_method', 'result'],
            column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String', 'String'],
            settings={'session_id': clickhouse_session_id}
        )

    container_log(f'Creating runtime for {mcont.container.image.path}')

    cls = RUNTIME_MAP.get(mcont.container.runtime)
    assert cls is not None, f'Unknown runtime: {mcont.container.runtime}'
    # The runtime gets this thread's container_log, so its log output is
    # attributed to this container and thread.
    runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container, container_log)

    container_log('Starting thread')

    while True:
        try:
            call = mcont.queue.get(block=True, timeout=1)
        except Empty:
            continue

        if isinstance(call, ShutDownCommand):
            break

        # Calls injected by the platform itself (e.g. on_module_loaded) have no sender.
        from_name = call.from_container.name if call.from_container is not None else '[SYSTEM]'
        method_name = call.call.method.name

        try:
            result = runtime.do_call(call.call)
        except Exception as ex:
            # Worker boundary: letting the exception escape would kill this
            # thread, and every call queued for this container afterwards would
            # never receive a result. Record it and report it to the caller.
            container_log(f'Exception while handling {method_name}:\n{traceback.format_exc()}')
            routing_log(from_name, call.to_service, method_name, repr(ex))
            call.on_result(MethodCallExceptionError(str(ex)))
            continue

        routing_log(
            from_name,
            call.to_service,
            method_name,
            repr(result.data) if isinstance(result, Value) else repr(result),
        )
        call.on_result(result)

    container_log('Stopping thread')
class LocalhostMethodCallRouter(MethodCallRouterInterface):
    """In-process router that delivers method calls to container worker queues.

    Services must be registered before containers. Each container is matched
    against every registered service's ``container_match.by_name``, which is
    either an exact container name or a prefix pattern ending in ``*``.
    """

    __slots__ = ('services', 'container_by_service')

    services: Dict[str, Service]
    container_by_service: Dict[str, List[ManagedContainer]]

    def __init__(self) -> None:
        self.services = {}
        self.container_by_service = {}

    def route_call(
        self,
        service_name: str,
        call: MethodCall,
        from_container: Optional[Container],
        on_result: MethodResultCallable,
    ) -> None:
        """Validate *call* and enqueue it on one container serving *service_name*.

        Failures are reported through *on_result* rather than raised:

        - ``ServiceNotFoundError`` for an unknown service.
        - ``MethodNotFoundError`` for an unknown method.
        - ``MethodTypeMismatchError`` when ``call.method`` does not compare equal to the registered method.
        - ``ServiceUnavailableError`` when no container serves the service.

        On success the result is delivered later by the target container's worker thread.
        """
        service = self.services.get(service_name)
        if service is None:
            on_result(ServiceNotFoundError(service_name))
            return

        method = service.methods.get(call.method.name)
        if method is None:
            on_result(MethodNotFoundError(f'{service_name}.{call.method.name}'))
            return

        if method != call.method:
            on_result(MethodTypeMismatchError())
            return

        to_mcont = self.find_one_container(service)
        if to_mcont is None:
            on_result(ServiceUnavailableError(service_name))
            return

        to_mcont.queue.put(RoutedMethodCall(
            call,
            from_container,
            service.name,
            to_mcont.container,
            on_result,
        ))

    def register_service(self, service: Service) -> None:
        """Register *service*; registering the same name twice is a programming error."""
        assert service.name not in self.services, f'Service already registered: {service.name}'
        self.services[service.name] = service

    def register_container(self, mcont: ManagedContainer) -> None:
        """Attach *mcont* to every already-registered service whose name pattern matches it."""
        for service in self.services.values():
            if self._matches(service.container_match.by_name, mcont.container.name):
                self.container_by_service.setdefault(service.name, []).append(mcont)

    @staticmethod
    def _matches(pattern: str, container_name: str) -> bool:
        """Return True if *container_name* equals *pattern*, or starts with it when it ends in ``*``."""
        if pattern == container_name:
            return True
        # endswith() instead of pattern[-1] so an empty pattern cannot raise IndexError.
        return pattern.endswith('*') and container_name.startswith(pattern[:-1])

    def find_one_container(self, service: Service) -> Optional[ManagedContainer]:
        """Pick a random container serving *service*, or None if none is registered."""
        container_list = self.container_by_service.get(service.name)
        if not container_list:
            return None

        return random.choice(container_list)
def make_prelude() -> Service:
    """Build the built-in ``prelude`` service, served by the ``__prelude__`` container.

    It exposes a single method: ``log_bytes(bytes) -> none``.
    """
    log_bytes = Method('log_bytes', [valuetype.bytes], valuetype.none)
    return Service('prelude', ContainerMatch('__prelude__'), [log_bytes])
def main(config_path: str = 'config.toml', state_path: str = './examples/echoapp.toml') -> int:
    """Run a local worker: load config and state, start all containers, and wait for Ctrl-C.

    Args:
        config_path: Worker configuration TOML file.
        state_path: TOML file describing the services and containers to run.

    Returns:
        Process exit code (0).
    """
    with open(config_path, 'rb') as fil:
        config = config_from_toml(fil)

    with open(state_path, 'rb') as fil:
        state = state_from_toml(fil)

    method_call_router = LocalhostMethodCallRouter()

    print('Registering services')
    method_call_router.register_service(make_prelude())
    for service in state.services:
        method_call_router.register_service(service)

    container_list: List[ManagedContainer] = []

    prelude_container = Container(
        '__prelude__',
        Image('prelude', '', []),
        'prelude',
    )

    for cont in [prelude_container] + state.containers:
        mcont = ManagedContainer(config.worker_config, method_call_router, cont)
        container_list.append(mcont)
        method_call_router.register_container(mcont)

    # Worker threads are non-daemon and only exit on a ShutDownCommand, so once
    # any of them is running we must always send that command; otherwise an
    # exception here would leave the process hanging forever.
    started: List[ManagedContainer] = []
    try:
        print('Starting containers')
        for mcont in container_list:
            mcont.thread.start()
            started.append(mcont)

        print('Sending out on_module_loaded calls')  # TODO: Route this normally?
        on_module_loaded = MethodCall(
            Method('on_module_loaded', [], valuetype.none),
            [],
        )
        for mcont in container_list:
            mcont.queue.put(RoutedMethodCall(on_module_loaded, None, None, mcont.container, lambda x: None))

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print('Caught KeyboardInterrupt, shutting down')
    finally:
        shut_down_command = ShutDownCommand()
        for mcont in started:
            mcont.queue.put(shut_down_command)

        print('Awaiting containers')
        for mcont in started:
            mcont.thread.join()

    return 0
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)