272 lines
8.7 KiB
Python
272 lines
8.7 KiB
Python
from typing import Dict, List, Optional, Type, Union
|
|
|
|
import datetime
|
|
import sys
|
|
import random
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from queue import Empty, Queue
|
|
|
|
from phasmplatform.common import valuetype
|
|
from phasmplatform.common.container import Container
|
|
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
|
|
from phasmplatform.common.image import Image
|
|
from phasmplatform.common.method import Method
|
|
from phasmplatform.common.methodcall import MethodCall, RoutedMethodCall, MethodResultCallable
|
|
from phasmplatform.common.methodcall import (
|
|
MethodCallExceptionError, MethodNotFoundError, MethodTypeMismatchError, ServiceNotFoundError, ServiceUnavailableError
|
|
)
|
|
from phasmplatform.common.router import MethodCallRouterInterface
|
|
from phasmplatform.common.service import ContainerMatch, Service
|
|
from phasmplatform.common.value import Value
|
|
from phasmplatform.common.state import from_toml as state_from_toml
|
|
|
|
from .runners.base import RunnerInterface
|
|
from .runners.prelude import PreludeRunner
|
|
from .runners.wasmtime import WasmTimeRunner
|
|
|
|
|
|
# Lookup table from a container's `runtime` string to the runner class that
# container_thread instantiates for it; unknown runtimes are rejected there.
RUNTIME_MAP: Dict[str, Type[RunnerInterface]] = dict(
    prelude=PreludeRunner,
    wasmtime=WasmTimeRunner,
)
|
|
|
|
|
|
def log_now() -> datetime.datetime:
    """Return the current time as a timezone-aware datetime in UTC.

    Used for the ``timestamp`` column of the ClickHouse log rows.
    """
    utc = datetime.timezone.utc
    return datetime.datetime.now(utc)
|
|
|
|
|
|
class ShutDownCommand:
    """Sentinel placed on a container's queue to make its worker thread exit its loop."""
|
|
|
|
|
|
# A container's inbox holds either a routed call to execute or the shutdown sentinel.
_MethodCallQueueItem = Union[RoutedMethodCall, ShutDownCommand]
MethodCallQueue = Queue[_MethodCallQueueItem]
|
|
|
|
|
|
class ManagedContainer:
    """Bundle a container with the worker thread and inbox queue that serve it.

    The thread targets ``container_thread`` with this object as its only
    argument. It is created here but not started.
    """

    __slots__ = ('config', 'method_call_router', 'container', 'thread', 'queue')

    config: WorkerConfig  # worker settings; container_thread reads `clickhouse_write` from it
    method_call_router: MethodCallRouterInterface  # handed to the runtime so it can route outgoing calls
    container: Container  # the container definition being run
    thread: threading.Thread  # worker thread running container_thread(self)
    queue: MethodCallQueue  # inbox of routed calls / shutdown sentinel

    def __init__(self, config: WorkerConfig, method_call_router: MethodCallRouterInterface, container: Container) -> None:
        self.container = container
        self.method_call_router = method_call_router
        self.config = config
        self.queue = Queue()
        self.thread = threading.Thread(
            target=container_thread,
            args=(self,),
        )

    def __repr__(self) -> str:
        # Only the container is shown; the other fields are elided as "...".
        return f'ManagedContainer(..., ..., {self.container!r}, ..., ...)'
|
|
|
|
|
|
def container_thread(mcont: ManagedContainer) -> None:
    """Run the worker loop for one managed container.

    Creates the runner selected by ``mcont.container.runtime`` via
    ``RUNTIME_MAP``. It then takes items from ``mcont.queue`` one at a time
    until a ``ShutDownCommand`` arrives. Each ``RoutedMethodCall`` is executed
    with ``runtime.do_call``, recorded in the ClickHouse ``routing_logs``
    table, and answered through ``call.on_result``. Lifecycle messages go to
    ``container_logs``.

    A call whose runner raises is answered with ``MethodCallExceptionError``
    instead of letting the exception escape. The caller is therefore always
    notified, and this container keeps serving later calls.

    Raises:
        ValueError: if the container's runtime has no entry in ``RUNTIME_MAP``.
    """
    clickhouse_session_id = str(uuid.uuid1())
    clickhouse_write = mcont.config.clickhouse_write

    def container_log(msg: str) -> None:
        # One `container_logs` row per message. thread_id is id() of the
        # Thread object, not an OS thread id.
        clickhouse_write.insert(
            table='container_logs',
            data=[
                [log_now(), mcont.container.name, str(id(mcont.thread)), msg],
            ],
            column_names=['timestamp', 'name', 'thread_id', 'msg'],
            column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'],
            settings={'session_id': clickhouse_session_id}
        )

    def routing_log(from_container: str, to_service: Optional[str], to_method: str, result: str) -> None:
        # One `routing_logs` row per handled call; the target container is
        # always this one, and a missing service name is stored as ''.
        clickhouse_write.insert(
            table='routing_logs',
            data=[
                [log_now(), str(id(mcont.thread)), from_container, mcont.container.name, to_service or '', to_method, result],
            ],
            column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_service', 'to_method', 'result'],
            column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String', 'String'],
            settings={'session_id': clickhouse_session_id}
        )

    container_log(f'Creating runtime for {mcont.container.image.path}')

    cls = RUNTIME_MAP.get(mcont.container.runtime)
    if cls is None:
        # Explicit check instead of `assert`, which `python -O` strips.
        raise ValueError(f'Unknown runtime: {mcont.container.runtime}')
    runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container, container_log)

    container_log('Starting thread')

    while True:
        try:
            # Poll with a 1 s timeout; an empty poll simply loops again.
            call = mcont.queue.get(block=True, timeout=1)
        except Empty:
            continue

        if isinstance(call, ShutDownCommand):
            break

        # Calls injected by the system (e.g. on_module_loaded) have no origin container.
        from_name = call.from_container.name if call.from_container is not None else '[SYSTEM]'

        try:
            result = runtime.do_call(call.call)
        except Exception as ex:
            # Report the failure to the caller rather than letting it escape:
            # an escaping exception would end this thread and leave the
            # caller's on_result callback uninvoked.
            routing_log(
                from_name,
                call.to_service,
                call.call.method.name,
                repr(ex),
            )
            call.on_result(MethodCallExceptionError(str(ex)))
            continue

        routing_log(
            from_name,
            call.to_service,
            call.call.method.name,
            repr(result.data) if isinstance(result, Value) else repr(result)
        )
        call.on_result(result)

    container_log('Stopping thread')
|
|
|
|
|
|
class LocalhostMethodCallRouter(MethodCallRouterInterface):
    """In-process router that dispatches method calls to local container threads.

    Services must be registered before containers: ``register_container``
    attaches a container only to services that are already registered.
    ``route_call`` validates each call and enqueues it on one matching
    container's queue.
    """

    __slots__ = ('services', 'container_by_service')

    services: Dict[str, Service]  # service name -> service definition
    container_by_service: Dict[str, List[ManagedContainer]]  # service name -> containers serving it

    def __init__(self) -> None:
        self.services = {}
        self.container_by_service = {}

    def route_call(
        self,
        service_name: str,
        call: MethodCall,
        from_container: Optional[Container],
        on_result: MethodResultCallable,
    ) -> None:
        """Validate *call* against *service_name* and enqueue it on a serving container.

        Failures are reported through *on_result* instead of being raised:
        ``ServiceNotFoundError`` (unknown service), ``MethodNotFoundError``
        (service lacks the method), ``MethodTypeMismatchError`` (registered
        method differs from ``call.method``) or ``ServiceUnavailableError``
        (no container serves the service). On success, *on_result* is
        invoked later by the target container's thread.
        """
        service = self.services.get(service_name)
        if service is None:
            on_result(ServiceNotFoundError(service_name))
            return

        method = service.methods.get(call.method.name)
        if method is None:
            on_result(MethodNotFoundError(f'{service_name}.{call.method.name}'))
            return

        if method != call.method:
            on_result(MethodTypeMismatchError())
            return

        to_mcont = self.find_one_container(service)
        if to_mcont is None:
            on_result(ServiceUnavailableError(service_name))
            return

        to_mcont.queue.put(RoutedMethodCall(
            call,
            from_container,
            service.name,
            to_mcont.container,
            on_result,
        ))

    def register_service(self, service: Service) -> None:
        """Register *service* under its name.

        Raises:
            ValueError: if a service with the same name is already registered.
        """
        # Explicit check instead of `assert`: under `python -O` the assert is
        # stripped and a duplicate would silently replace the original service.
        if service.name in self.services:
            raise ValueError(f'Service already registered: {service.name}')

        self.services[service.name] = service

    def register_container(self, mcont: ManagedContainer) -> None:
        """Attach *mcont* to every registered service whose container pattern matches it."""
        container_name = mcont.container.name
        for service in self.services.values():
            if self._name_matches(service.container_match.by_name, container_name):
                self.container_by_service.setdefault(service.name, []).append(mcont)

    @staticmethod
    def _name_matches(pattern: str, name: str) -> bool:
        """Return True if *name* equals *pattern*, or *pattern* ends in '*' and *name* starts with the rest."""
        if pattern == name:
            return True
        # endswith() instead of pattern[-1] so an empty pattern cannot raise IndexError.
        return pattern.endswith('*') and name.startswith(pattern[:-1])

    def find_one_container(self, service: Service) -> Optional[ManagedContainer]:
        """Return a randomly chosen container serving *service*, or None if there is none."""
        container_list = self.container_by_service.get(service.name)
        if not container_list:
            return None

        return random.choice(container_list)
|
|
|
|
|
|
def make_prelude() -> Service:
    """Build the built-in ``prelude`` service definition.

    It is matched to the container named ``__prelude__``. It currently
    declares a single method, ``sleep``, built as
    ``Method('sleep', [valuetype.u32], valuetype.none)``.
    """
    sleep_method = Method('sleep', [valuetype.u32], valuetype.none)
    prelude_match = ContainerMatch('__prelude__')
    return Service('prelude', prelude_match, [sleep_method])
|
|
|
|
|
|
def main() -> int:
    """Run a local worker until interrupted, then shut it down.

    Steps:

    1. Load ``config.toml`` and the ``./examples/echoapp.toml`` state.
    2. Register the prelude service plus the state's services.
    3. Wrap the prelude container and the state's containers in
       ``ManagedContainer`` objects, register them, and start their threads.
    4. Send each container an ``on_module_loaded`` call.
    5. On Ctrl+C, queue a ``ShutDownCommand`` for every container and join
       all threads.

    Returns 0.
    """
    with open('config.toml', 'rb') as config_file:
        config = config_from_toml(config_file)

    with open('./examples/echoapp.toml', 'rb') as state_file:
        state = state_from_toml(state_file)

    router = LocalhostMethodCallRouter()

    print('Registering services')
    for service in [make_prelude(), *state.services]:
        router.register_service(service)

    prelude_container = Container('__prelude__', Image('prelude', '', []), 'prelude')

    managed: List[ManagedContainer] = [
        ManagedContainer(config.worker_config, router, cont)
        for cont in [prelude_container] + state.containers
    ]
    for mcont in managed:
        router.register_container(mcont)

    print('Starting containers')
    for mcont in managed:
        mcont.thread.start()

    print('Sending out on_module_loaded calls')  # TODO: Route this normally?
    on_module_loaded = MethodCall(Method('on_module_loaded', [], valuetype.none), [])

    def discard_result(_result: object) -> None:
        # System-originated call: nothing consumes its result.
        return None

    for mcont in managed:
        mcont.queue.put(RoutedMethodCall(on_module_loaded, None, None, mcont.container, discard_result))

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('Caught KeyboardInterrupt, shutting down')

    # A single shared sentinel is fine: threads only check its type.
    shut_down_command = ShutDownCommand()
    for mcont in managed:
        mcont.queue.put(shut_down_command)

    print('Awaiting containers')
    for mcont in managed:
        mcont.thread.join()

    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)
|