"""
Localhost worker: runs every configured container on its own thread and
routes method calls between them through in-process queues.
"""
from typing import Dict, List, Optional, Type, Union

import datetime
import sys
import random
import threading
import time
import uuid
from queue import Empty, Queue

from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
from phasmplatform.common.image import Image
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, RoutedMethodCall, MethodResultCallable
from phasmplatform.common.methodcall import (
    MethodCallExceptionError, MethodNotFoundError, MethodTypeMismatchError,
    ServiceNotFoundError, ServiceUnavailableError
)
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import ContainerMatch, Service
from phasmplatform.common.value import Value
from phasmplatform.common.state import from_toml as state_from_toml

from .runners.base import RunnerInterface
from .runners.prelude import PreludeRunner
from .runners.wasmtime import WasmTimeRunner


# Maps the `runtime` name of a container to the runner class that executes it.
RUNTIME_MAP: Dict[str, Type[RunnerInterface]] = {
    'prelude': PreludeRunner,
    'wasmtime': WasmTimeRunner,
}


def log_now() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(tz=datetime.timezone.utc)


class ShutDownCommand():
    """Sentinel put on a container queue to make its worker thread exit."""
    pass


# Items a container thread can receive: a routed call or a shutdown request.
MethodCallQueue = Queue[Union[RoutedMethodCall, ShutDownCommand]]


class ManagedContainer:
    """
    A container together with the worker thread that serves it and the
    queue through which that thread receives work.

    The thread is created here but not started; the owner starts it.
    """
    __slots__ = ('config', 'method_call_router', 'container', 'thread', 'queue', )

    config: WorkerConfig
    method_call_router: MethodCallRouterInterface
    container: Container
    thread: threading.Thread
    queue: MethodCallQueue

    def __init__(self, config: WorkerConfig, method_call_router: MethodCallRouterInterface, container: Container) -> None:
        self.config = config
        self.method_call_router = method_call_router
        self.container = container
        self.thread = threading.Thread(target=container_thread, args=(self, ))
        self.queue = Queue()

    def __repr__(self) -> str:
        return f'ManagedContainer(..., ..., {repr(self.container)}, ..., ...)'


def container_thread(mcont: ManagedContainer) -> None:
    """
    Worker loop for a single managed container.

    Creates the runner for the container's runtime, then takes items from
    `mcont.queue` until a ShutDownCommand arrives. Each routed call is
    executed, logged to the `routing_logs` table, and its outcome is handed
    to the call's `on_result` callback.

    An exception raised while executing a call is reported to the caller as
    a MethodCallExceptionError; the thread keeps serving later calls.
    """
    clickhouse_session_id = str(uuid.uuid1())
    clickhouse_write = mcont.config.clickhouse_write

    # The thread object does not change for this container, so the value
    # logged as thread_id can be computed once.
    thread_id = str(id(mcont.thread))

    def container_log(msg: str) -> None:
        """Insert one row into the `container_logs` table."""
        clickhouse_write.insert(
            table='container_logs',
            data=[
                [log_now(), mcont.container.name, thread_id, msg],
            ],
            column_names=['timestamp', 'name', 'thread_id', 'msg'],
            column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'],
            settings={'session_id': clickhouse_session_id}
        )

    def routing_log(from_container: str, to_method: str, result: str) -> None:
        """Insert one row into the `routing_logs` table."""
        clickhouse_write.insert(
            table='routing_logs',
            data=[
                [log_now(), thread_id, from_container, mcont.container.name, to_method, result],
            ],
            column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_method', 'result'],
            column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String'],
            settings={'session_id': clickhouse_session_id}
        )

    container_log(f'Creating runtime for {mcont.container.image.path}')

    cls = RUNTIME_MAP.get(mcont.container.runtime)
    assert cls is not None, f'Unknown runtime: {mcont.container.runtime}'

    runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container)
    if isinstance(runtime, PreludeRunner):
        runtime.set_config(mcont.config, container_log)

    container_log('Starting thread')

    while True:
        try:
            call = mcont.queue.get(block=True, timeout=1)
        except Empty:
            # Nothing arrived within the timeout; wait again.
            continue

        if isinstance(call, ShutDownCommand):
            break

        # Calls without an originating container are logged as [SYSTEM].
        from_name = call.from_container.name if call.from_container is not None else '[SYSTEM]'
        method_name = call.call.method.name

        try:
            result = runtime.do_call(call.call)
        except Exception as ex:
            # Report the failure to the caller rather than re-raising:
            # re-raising here would end this thread and leave the caller
            # without any result.
            routing_log(from_name, method_name, repr(ex))
            call.on_result(MethodCallExceptionError(str(ex)))
            continue

        routing_log(
            from_name,
            method_name,
            repr(result.data) if isinstance(result, Value) else repr(result)
        )
        call.on_result(result)

    container_log('Stopping thread')


class LocalhostMethodCallRouter(MethodCallRouterInterface):
    """
    Router that delivers method calls to containers in this process by
    putting them on the target container's queue.
    """
    __slots__ = ('services', 'container_by_service')

    # Registered services, keyed by service name.
    services: Dict[str, Service]
    # Containers that can serve each service, keyed by service name.
    container_by_service: Dict[str, List[ManagedContainer]]

    def __init__(self) -> None:
        self.services = {}
        self.container_by_service = {}

    def route_call(
        self,
        service_name: str,
        call: MethodCall,
        from_container: Optional[Container],
        on_result: MethodResultCallable,
    ) -> None:
        """
        Queue `call` on one container that serves `service_name`.

        Routing failures are not raised; they are passed to `on_result` as
        ServiceNotFoundError, MethodNotFoundError, MethodTypeMismatchError
        or ServiceUnavailableError.
        """
        service = self.services.get(service_name)
        if service is None:
            on_result(ServiceNotFoundError(service_name))
            return

        method = service.methods.get(call.method.name)
        if method is None:
            on_result(MethodNotFoundError(f'{service_name}.{call.method.name}'))
            return

        # The name exists on the service, but the caller's method definition
        # differs from the registered one.
        if method != call.method:
            on_result(MethodTypeMismatchError())
            return

        to_mcont = self.find_one_container(service)
        if to_mcont is None:
            on_result(ServiceUnavailableError(service_name))
            return

        to_mcont.queue.put(RoutedMethodCall(
            call,
            from_container,
            to_mcont.container,
            on_result,
        ))

    def register_service(self, service: Service) -> None:
        """Register `service` under its name; the name must not be taken yet."""
        assert service.name not in self.services
        self.services[service.name] = service

    def register_container(self, mcont: ManagedContainer) -> None:
        """
        Attach `mcont` to every registered service whose container match
        accepts it.

        A service accepts the container when its `by_name` pattern equals
        the container name, or when the pattern ends in `*` and the
        container name starts with the text before that `*`.
        """
        name = mcont.container.name
        for service in self.services.values():
            pattern = service.container_match.by_name

            # endswith() rather than pattern[-1], so an empty pattern simply
            # does not match instead of raising IndexError.
            is_match = pattern == name or (
                pattern.endswith('*') and name.startswith(pattern[:-1])
            )
            if is_match:
                self.container_by_service.setdefault(service.name, []).append(mcont)

    def find_one_container(self, service: Service) -> Optional[ManagedContainer]:
        """
        Return a randomly chosen container registered for `service`, or
        None when no container is registered for it.
        """
        container_list = self.container_by_service.get(service.name)
        if not container_list:
            return None

        return random.choice(container_list)


def make_prelude() -> Service:
    """
    Build the `prelude` service definition, matched to the container named
    `__prelude__` and exposing a single `log_bytes` method.
    """
    methods: List[Method] = [
        Method('log_bytes', [valuetype.bytes], valuetype.none),
    ]

    return Service('prelude', ContainerMatch('__prelude__'), methods)


def main() -> int:
    """
    Load the configuration and state, start one thread per container, and
    run until interrupted with Ctrl-C.

    Returns the process exit code, which is always 0.
    """
    with open('config.toml', 'rb') as fil:
        config = config_from_toml(fil)

    with open('./examples/echoapp.toml', 'rb') as fil:
        state = state_from_toml(fil)

    method_call_router = LocalhostMethodCallRouter()

    # Services go in first: register_container only matches a container
    # against services that are already registered.
    print('Registering services')
    method_call_router.register_service(make_prelude())
    for service in state.services:
        method_call_router.register_service(service)

    container_list: List[ManagedContainer] = []

    prelude_container = Container(
        '__prelude__',
        Image('prelude', '', []),
        'prelude',
    )

    for cont in [prelude_container] + state.containers:
        mcont = ManagedContainer(config.worker_config, method_call_router, cont)
        container_list.append(mcont)
        method_call_router.register_container(mcont)

    print('Starting containers')
    for mcont in container_list:
        mcont.thread.start()

    print('Sending out on_module_loaded calls')
    # TODO: Route this normally?
    on_module_loaded = MethodCall(
        Method('on_module_loaded', [], valuetype.none),
        [],
    )
    for mcont in container_list:
        # Put directly on each queue with no originating container; the
        # result is discarded.
        mcont.queue.put(RoutedMethodCall(on_module_loaded, None, mcont.container, lambda x: None))

    # Idle in the main thread until the user interrupts.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('Caught KeyboardInterrupt, shutting down')

    shut_down_command = ShutDownCommand()
    for mcont in container_list:
        mcont.queue.put(shut_down_command)

    print('Awaiting containers')
    for mcont in container_list:
        mcont.thread.join()

    return 0


if __name__ == '__main__':
    sys.exit(main())