Loading state from toml, redo routing, main cleanup

This commit is contained in:
Johan B.W. de Vries 2023-04-14 18:42:45 +02:00
parent b06a604fd4
commit bbf1514035
18 changed files with 607 additions and 257 deletions

View File

@ -1,9 +1,15 @@
run-controller: sast
venv/bin/python -m phasmplatform.controller
run-worker: sast
run-worker: sast examples/echoapp.toml
venv/bin/python -m phasmplatform.worker
examples/echoapp.toml: examples/echoserver.toml examples/echoclient.toml
echo '# echoserver.toml' > $@
cat examples/echoserver.toml >> $@
echo '# echoclient.toml' >> $@
cat examples/echoclient.toml >> $@
clickhouse-sh:
docker compose exec clickhouse /usr/bin/clickhouse client

View File

@ -1,11 +1,11 @@
CREATE DATABASE IF NOT EXISTS phasm_platform;
USE phasm_platform;
CREATE TABLE routing_logs
(
timestamp DateTime64(6, 'UTC'),
from_service String,
thread_id String,
from_container String,
from_method String,
to_service String,
@ -15,3 +15,14 @@ CREATE TABLE routing_logs
)
ENGINE = MergeTree
ORDER BY timestamp;
CREATE TABLE container_logs
(
timestamp DateTime64(6, 'UTC'),
name String,
thread_id String,
msg String,
)
ENGINE = MergeTree
ORDER BY timestamp;

2
examples/.gitignore vendored
View File

@ -1,2 +1,4 @@
/*.wasm
/*.wat
/echoapp.toml

View File

@ -5,6 +5,11 @@ kind = "Image"
path = "examples/echoclient.wasm"
hash = "sha256@84cb22d12dfdd6b05cb906f6db83d59f473c9df85a33822f696344af2b92b502"
imports = [
{ service = "prelude", method = "log_bytes", arg_types = ["bytes"], return_type = "none"},
{ service = "echoserver", method = "echo", arg_types = ["bytes"], return_type = "bytes"},
]
[echoclient-container]
apiVersion = "v0"
kind = "Container"

View File

@ -1,3 +1,11 @@
@exported
def echo(msg: bytes) -> bytes:
return msg
@imported('prelude')
def log_bytes(data: bytes) -> None:
pass
@exported
def on_module_loaded() -> None:
log_bytes(b'on_module_loaded')

View File

@ -5,6 +5,10 @@ kind = "Image"
path = "examples/echoserver.wasm"
hash = "sha256@dfe03b4f7ce5e921931f8715384e35a6776fdc28837e42ffa04305bbadffcfc9"
imports = [
{ service = "prelude", method = "log_bytes", arg_types = ["bytes"], return_type = "none"}
]
[echoserver-container-0]
apiVersion = "v0"
kind = "Container"
@ -19,16 +23,15 @@ kind = "Container"
image = "echoserver-image"
runtime = "wasmtime"
[echoserver-service]
apiVersion = "v0"
kind = "Service"
name = "echoserver"
[echoserver-service.container]
[echoserver-service.containerMatch]
byName = "echoserver-container-*"
[[echoserver-service.methods]]
name = "echo"
returns = "none"
args = [ {name = "msg", type = "bytes"} ]
arg_types = [ "bytes" ]
return_type = "bytes"

View File

@ -2,14 +2,16 @@ from .image import Image
class Container:
__slots__ = ('image', 'runtime', )
__slots__ = ('name', 'image', 'runtime', )
name: str
image: Image
runtime: str
def __init__(self, image: Image, runtime: str) -> None:
def __init__(self, name: str, image: Image, runtime: str) -> None:
self.name = name
self.image = image
self.runtime = runtime
def __repr__(self) -> str:
return f'Container({repr(self.image)}, {repr(self.runtime)})'
return f'Container({repr(self.name)}, {repr(self.image)}, {repr(self.runtime)})'

View File

@ -1,18 +1,27 @@
from typing import Optional
from typing import List, Optional, Tuple
from .method import Method
class Image:
__slots__ = ('path', 'hash', )
__slots__ = ('path', 'hash', 'imports', )
path: Optional[str]
hash: str
imports: List[Tuple[str, Method]]
def __init__(self, path: Optional[str], hash: str) -> None:
def __init__(
self,
path: Optional[str],
hash: str,
imports: List[Tuple[str, Method]],
) -> None:
self.path = path
self.hash = hash
self.imports = imports
def __repr__(self) -> str:
return f'Image({repr(self.path)}, {repr(self.hash)})'
return f'Image({repr(self.path)}, {repr(self.hash)}, {repr(self.imports)})'
class ImageReference(Image):

View File

@ -1,73 +1,29 @@
from typing import Any, Callable, List
from typing import Any, List
from .value import Value
from .valuetype import ValueType
class MethodArgument:
__slots__ = ('name', 'value_type', )
name: str
value_type: ValueType
def __init__(self, name: str, value_type: ValueType) -> None:
self.name = name
self.value_type = value_type
def __eq__(self, other: Any) -> bool:
if not isinstance(other, MethodArgument):
raise NotImplementedError
return self.name == other.name and self.value_type == other.value_type
def __repr__(self) -> str:
return f'MethodArgument({repr(self.name)}, {repr(self.value_type)})'
class Method:
__slots__ = ('name', 'args', 'return_type', )
__slots__ = ('name', 'arg_types', 'return_type', )
name: str
args: List[MethodArgument]
arg_types: List[ValueType]
return_type: ValueType
def __init__(self, name: str, args: List[MethodArgument], return_type: ValueType) -> None:
def __init__(self, name: str, arg_types: List[ValueType], return_type: ValueType) -> None:
self.name = name
self.args = args
self.arg_types = arg_types
self.return_type = return_type
def __repr__(self) -> str:
return f'Method({repr(self.name)}, {repr(self.args)}, {repr(self.return_type)})'
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Method):
raise NotImplementedError
class MethodCallError:
pass
class MethodNotFoundError(MethodCallError):
pass
class MethodCall:
__slots__ = ('method', 'args', 'on_success', 'on_error', )
method: Method
args: List[Value]
on_success: Callable[[Value], None]
on_error: Callable[[MethodCallError], None]
def __init__(
self,
method: Method,
args: List[Value],
on_success: Callable[[Value], None],
on_error: Callable[[MethodCallError], None],
) -> None:
self.method = method
self.args = args
self.on_success = on_success
self.on_error = on_error
return (
self.name == other.name
and self.arg_types == other.arg_types
and self.return_type == other.return_type
)
def __repr__(self) -> str:
return f'MethodCall({repr(self.method)}, {repr(self.args)}, {repr(self.on_success)}, {repr(self.on_error)})'
return f'Method({repr(self.name)}, {repr(self.arg_types)}, {repr(self.return_type)})'

View File

@ -0,0 +1,117 @@
from typing import Callable, List, Optional, Union
from .container import Container
from .method import Method
from .value import Value
class MethodCallError:
__slots__ = ('msg', )
msg: Optional[str]
def __init__(self, msg: Optional[str] = None) -> None:
self.msg = msg
def __repr__(self) -> str:
return f'{self.__class__.__name__}({repr(self.msg)})'
class ServiceNotFoundError(MethodCallError):
"""
The service was not found
You have an `@imported('service_name')` somewhere,
but there is no `service_name` deployed to the platform.
"""
class MethodNotFoundError(MethodCallError):
"""
The method was not found
You have an `@imported('service_name')` somewhere,
but while the `service_name` is deployed to the platform,
it does not provide the given function.
"""
class MethodTypeMismatchError(MethodCallError):
"""
The method's type did not match
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
its type does not match what you defined in your import.
"""
class ServiceUnavailableError(MethodCallError):
"""
The service was not available
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
and its type matches what you defined in your import,
there is currently no container running providing said service.
"""
class MethodCallExceptionError(MethodCallError):
"""
The service was not available
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
and its type matches what you defined in your import,
and there is currently a container running providing said service,
when it tried to run the call, an exeption ocurred.
"""
class MethodCall:
__slots__ = ('method', 'args', )
method: Method
args: List[Value]
def __init__(
self,
method: Method,
args: List[Value],
) -> None:
self.method = method
self.args = args
def __repr__(self) -> str:
return f'MethodCall({repr(self.method)}, {repr(self.args)})'
MethodResultCallable = Callable[[Union[Value, MethodCallError]], None]
class RoutedMethodCall:
__slots__ = ('call', 'from_container', 'to_container', 'on_result', )
call: MethodCall
from_container: Optional[Container]
to_container: Optional[Container]
on_result: MethodResultCallable
def __init__(
self,
call: MethodCall,
from_container: Optional[Container],
to_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
self.call = call
self.from_container = from_container
self.to_container = to_container
self.on_result = on_result
def __repr__(self) -> str:
return f'RoutedMethodCall({repr(self.call)}, {repr(self.from_container)}, {repr(self.to_container)}, {repr(self.on_result)})'

View File

@ -1,7 +1,15 @@
from .method import MethodCall
from .service import Service
from typing import Optional
from .container import Container
from .methodcall import MethodCall, MethodResultCallable
class MethodCallRouterInterface:
def send_call(self, service: Service, call: MethodCall) -> None:
raise NotImplementedError
def route_call(
self,
service: str,
call: MethodCall,
from_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
raise NotImplementedError(service, call)

View File

@ -3,21 +3,35 @@ from typing import Dict, Optional, List
from .method import Method
class ContainerMatch:
__slots__ = ('by_name', )
by_name: str
def __init__(self, by_name: str) -> None:
self.by_name = by_name
def __repr__(self) -> str:
return f'ContainerMatch({repr(self.by_name)})'
class Service:
__slots__ = ('name', 'methods', )
__slots__ = ('name', 'container_match', 'methods', )
name: str
container_match: ContainerMatch
methods: Dict[str, Method]
def __init__(self, name: str, methods: List[Method]) -> None:
def __init__(self, name: str, container_match: ContainerMatch, methods: List[Method]) -> None:
self.name = name
self.container_match = container_match
self.methods = {
x.name: x
for x in methods
}
def __repr__(self) -> str:
return f'Service({repr(self.name)}, {repr(list(self.methods.values()))})'
return f'Service({repr(self.name)}, {repr(self.container_match)}, {repr(list(self.methods.values()))})'
class ServiceDiscoveryInterface:

View File

@ -1,9 +1,10 @@
from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List
from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List, Tuple
from . import valuetype
from .container import Container
from .image import Image, ImageReference
from .method import Method
from .service import Service
from .service import ContainerMatch, Service
if TYPE_CHECKING:
@ -36,27 +37,90 @@ class State:
return f'State({repr(self.images)}, {repr(self.containers)}, {repr(self.services)})'
def image_import_from_toml(spec: Dict[str, Any]) -> Tuple[str, Method]:
"""
Loads an Method spec from toml, when it comes in the form of Image.imports
"""
service = spec.pop('service')
method = spec.pop('method')
arg_types = spec.pop('arg_types', [])
return_type = spec.pop('return_type', 'none')
assert not spec, ('Unrecognized values in image.imports', spec)
return (str(service), Method(
str(method),
[
valuetype.LOOKUP_TABLE[x]
for x in arg_types
],
valuetype.LOOKUP_TABLE[return_type],
), )
def image_from_toml(spec: Dict[str, Any]) -> Image:
"""
Loads an Image spec from toml
"""
return Image(str(spec['path']), str(spec['hash']))
imports = [
image_import_from_toml(imp_spec)
for imp_spec in spec.get('imports', [])
]
return Image(str(spec['path']), str(spec['hash']), imports)
def container_from_toml(spec: Dict[str, Any]) -> Container:
def container_from_toml(name: str, spec: Dict[str, Any]) -> Container:
"""
Loads a Container spec from toml
"""
return Container(ImageReference(str(spec['image'])), str(spec['runtime']))
return Container(name, ImageReference(str(spec['image'])), str(spec['runtime']))
def service_container_match_from_toml(spec: Dict[str, Any]) -> ContainerMatch:
"""
Loads a service.ContainerMatch spec from toml
"""
return ContainerMatch(str(spec['byName']))
def method_from_toml(spec: Dict[str, Any]) -> Method:
"""
Loads an Method spec from toml
"""
name = spec.pop('name')
arg_types = spec.pop('arg_types', [])
return_type = spec.pop('return_type', 'none')
assert not spec, ('Unrecognized values in image.imports', spec)
return Method(
str(name),
[
valuetype.LOOKUP_TABLE[x]
for x in arg_types
],
valuetype.LOOKUP_TABLE[return_type],
)
def service_from_toml(spec: Dict[str, Any]) -> Service:
"""
Loads a Service spec from toml
"""
methods: List[Method] = []
spec.pop('apiVersion')
spec.pop('kind')
return Service(str(spec['name']), methods)
name = spec.pop('name')
container_match = spec.pop('containerMatch')
methods = spec.pop('methods', [])
assert not spec, ('Unrecognized values in service', spec)
return Service(str(name), service_container_match_from_toml(container_match), [
method_from_toml(x)
for x in methods
])
def from_toml(toml: BinaryIO) -> State:
@ -83,7 +147,7 @@ def from_toml(toml: BinaryIO) -> State:
continue
if spec['kind'] == 'Container':
containers.append(container_from_toml(spec))
containers.append(container_from_toml(name, spec))
continue
if spec['kind'] == 'Service':

View File

@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Dict
class ValueType:
@ -22,3 +22,8 @@ class ValueType:
bytes = ValueType('bytes')
none = ValueType('none')
LOOKUP_TABLE: Dict[str, ValueType] = {
bytes.name: bytes,
none.name: none,
}

View File

@ -1,169 +1,290 @@
from typing import Callable, Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Type, Union
import datetime
import functools
import sys
import random
import threading
import time
import uuid
from queue import Empty, Queue
from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
from phasmplatform.common.method import Method, MethodArgument, MethodCall
from phasmplatform.common.image import Image
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, RoutedMethodCall, MethodResultCallable
from phasmplatform.common.methodcall import (
MethodCallExceptionError, MethodNotFoundError, MethodTypeMismatchError, ServiceNotFoundError, ServiceUnavailableError
)
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import Service, ServiceDiscoveryInterface
from phasmplatform.common.value import Value, NoneValue
from phasmplatform.common.service import ContainerMatch, Service
from phasmplatform.common.value import Value
from phasmplatform.common.state import from_toml as state_from_toml
from .runners.base import RunnerInterface
from .runners.prelude import PreludeRunner
from .runners.wasmtime import WasmTimeRunner
class ShuttingDown():
RUNTIME_MAP: Dict[str, Type[RunnerInterface]] = {
'prelude': PreludeRunner,
'wasmtime': WasmTimeRunner,
}
def log_now() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)
class ShutDownCommand():
pass
def runner_thread(runner: RunnerInterface, queue: Queue[Union[MethodCall, ShuttingDown]]) -> None:
MethodCallQueue = Queue[Union[RoutedMethodCall, ShutDownCommand]]
class ManagedContainer:
__slots__ = ('config', 'method_call_router', 'container', 'thread', 'queue', )
config: WorkerConfig
method_call_router: MethodCallRouterInterface
container: Container
thread: threading.Thread
queue: MethodCallQueue
def __init__(self, config: WorkerConfig, method_call_router: MethodCallRouterInterface, container: Container) -> None:
self.config = config
self.method_call_router = method_call_router
self.container = container
self.thread = threading.Thread(target=container_thread, args=(self, ))
self.queue = Queue()
def __repr__(self) -> str:
return f'ManagedContainer(..., ..., {repr(self.container)}, ..., ...)'
def container_thread(mcont: ManagedContainer) -> None:
clickhouse_session_id = str(uuid.uuid1())
clickhouse_write = mcont.config.clickhouse_write
def container_log(msg: str) -> None:
clickhouse_write.insert(
table='container_logs',
data=[
[log_now(), mcont.container.name, str(id(mcont.thread)), msg],
],
column_names=['timestamp', 'name', 'thread_id', 'msg'],
column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'],
settings={'session_id': clickhouse_session_id}
)
def routing_log(from_container: str, to_method: str, result: str) -> None:
clickhouse_write.insert(
table='routing_logs',
data=[
[log_now(), str(id(mcont.thread)), from_container, mcont.container.name, to_method, result],
],
column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_method', 'result'],
column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String'],
settings={'session_id': clickhouse_session_id}
)
container_log(f'Creating runtime for {mcont.container.image.path}')
cls = RUNTIME_MAP.get(mcont.container.runtime)
assert cls is not None, f'Unknown runtime: {mcont.container.runtime}'
runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container)
if isinstance(runtime, PreludeRunner):
runtime.set_config(mcont.config, container_log)
container_log('Starting thread')
while True:
try:
call = queue.get(block=True, timeout=1)
call = mcont.queue.get(block=True, timeout=1)
except Empty:
continue
if isinstance(call, ShuttingDown):
if isinstance(call, ShutDownCommand):
break
runner.do_call(call)
try:
result = runtime.do_call(call.call)
except Exception as ex:
raise ex
routing_log(
call.from_container.name if call.from_container is not None else '[SYSTEM]',
call.call.method.name,
repr(ex),
)
call.on_result(MethodCallExceptionError(str(ex)))
continue
routing_log(
call.from_container.name if call.from_container is not None else '[SYSTEM]',
call.call.method.name,
repr(result.data) if isinstance(result, Value) else repr(result)
)
call.on_result(result)
container_log('Stopping thread')
class LocalhostMethodCallRouter(MethodCallRouterInterface):
__slots__ = ('services', 'container_by_service')
services: Dict[str, Service]
container_by_service: Dict[str, List[ManagedContainer]]
def __init__(self) -> None:
self.services = {}
self.container_by_service = {}
def route_call(
self,
service_name: str,
call: MethodCall,
from_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
service = self.services.get(service_name)
if service is None:
on_result(ServiceNotFoundError(service_name))
return
method = service.methods.get(call.method.name)
if method is None:
on_result(MethodNotFoundError(f'{service_name}.{call.method.name}'))
return
if method != call.method:
on_result(MethodTypeMismatchError())
return
to_mcont = self.find_one_container(service)
if to_mcont is None:
on_result(ServiceUnavailableError(service_name))
return
to_mcont.queue.put(RoutedMethodCall(
call,
from_container,
to_mcont.container,
on_result,
))
def register_service(self, service: Service) -> None:
assert service.name not in self.services
self.services[service.name] = service
def register_container(self, mcont: ManagedContainer) -> None:
for service in self.services.values():
if service.container_match.by_name == mcont.container.name:
self.container_by_service.setdefault(service.name, [])
self.container_by_service[service.name].append(mcont)
continue
if service.container_match.by_name[-1] == '*' and mcont.container.name.startswith(service.container_match.by_name[:-1]):
self.container_by_service.setdefault(service.name, [])
self.container_by_service[service.name].append(mcont)
continue
def find_one_container(self, service: Service) -> Optional[ManagedContainer]:
container_list = self.container_by_service.get(service.name)
if not container_list:
return None
return random.choice(container_list)
# def __init__(self, config: WorkerConfig, service_discovery: LocalhostServiceDiscovery) -> None:
# self.config = config
# self.service_discovery = service_discovery
# def send_call(self, service: Service, call: MethodCall) -> None:
# self.config.clickhouse_write.insert('routing_logs', [
# [log_now(), service.name, call.method.name],
# ], ['timestamp', 'to_service', 'to_method'], column_type_names=["DateTime64(6, 'UTC')", 'String', 'String'])
# call.on_success = functools.partial(self._send_call_on_succes, service, call, call.on_success)
# assert service.name in self.service_discovery.services
# queue = self.service_discovery.services[service.name][1]
# queue.put(call)
# def _send_call_on_succes(self, service: Service, call: MethodCall, orig_on_succes: Callable[[Value], None], value: Value) -> None:
# self.config.clickhouse_write.insert('routing_logs', [
# [datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name, repr(value.data)],
# ], ['timestamp', 'to_service', 'to_method', 'result'], column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'])
# orig_on_succes(value)
def make_prelude() -> Service:
methods: List[Method] = []
methods.append(Method('log_bytes', [
MethodArgument('data', valuetype.bytes)
], valuetype.none))
methods.append(Method('log_bytes', [valuetype.bytes], valuetype.none))
return Service('prelude', methods)
class LocalhostRunner(RunnerInterface):
def do_call(self, call: MethodCall) -> None:
if call.method.name == 'on_module_loaded':
print('LocalhostRunner loaded')
call.on_success(NoneValue)
return
if call.method.name == 'log_bytes':
print('LOG-BYTES:', repr(call.args[0].data))
call.on_success(NoneValue)
return
raise NotImplementedError(call)
class LocalhostServiceDiscovery(ServiceDiscoveryInterface):
services: Dict[str, Tuple[Service, Queue[Union[MethodCall, ShuttingDown]]]]
def __init__(self) -> None:
self.services = {}
def register_service(self, service: Service, queue: Queue[Union[MethodCall, ShuttingDown]]) -> None:
self.services[service.name] = (service, queue, )
def find_service(self, name: str) -> Optional[Service]:
parts = self.services.get(name, None)
if parts is None:
return None
return parts[0]
class LocalhostMethodCallRouter(MethodCallRouterInterface):
def __init__(self, config: WorkerConfig, service_discovery: LocalhostServiceDiscovery) -> None:
self.config = config
self.service_discovery = service_discovery
def send_call(self, service: Service, call: MethodCall) -> None:
self.config.clickhouse_write.insert('routing_logs', [
[datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name],
], ['timestamp', 'to_service', 'to_method'])
call.on_success = functools.partial(self._send_call_on_succes, service, call, call.on_success)
assert service.name in self.service_discovery.services
queue = self.service_discovery.services[service.name][1]
queue.put(call)
def _send_call_on_succes(self, service: Service, call: MethodCall, orig_on_succes: Callable[[Value], None], value: Value) -> None:
self.config.clickhouse_write.insert('routing_logs', [
[datetime.datetime.now(tz=datetime.timezone.utc), service.name, call.method.name, repr(value.data)],
], ['timestamp', 'to_service', 'to_method', 'result'])
orig_on_succes(value)
return Service('prelude', ContainerMatch('__prelude__'), methods)
def main() -> int:
with open('config.toml', 'rb') as fil:
config = config_from_toml(fil)
with open('./examples/echoserver.toml', 'rb') as fil:
with open('./examples/echoapp.toml', 'rb') as fil:
state = state_from_toml(fil)
del state
method_call_router = LocalhostMethodCallRouter()
# TODO: Replace the stuff below with the loading from the example state
print('Registering services')
method_call_router.register_service(make_prelude())
localhost_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue()
echo_client_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue()
echo_server_queue: Queue[Union[MethodCall, ShuttingDown]] = Queue()
for service in state.services:
method_call_router.register_service(service)
service_discovery = LocalhostServiceDiscovery()
method_call_router = LocalhostMethodCallRouter(config.worker_config, service_discovery)
container_list: List[ManagedContainer] = []
localhost = LocalhostRunner()
service_discovery.register_service(make_prelude(), localhost_queue)
prelude_container = Container(
'__prelude__',
Image('prelude', '', []),
'prelude',
)
with open('./examples/echoserver.wasm', 'rb') as fil:
echo_server = WasmTimeRunner(service_discovery, method_call_router, fil.read())
service_discovery.register_service(Service('echoserver', [
Method('echo', [
MethodArgument('msg', valuetype.bytes)
], valuetype.bytes)
]), echo_server_queue)
for cont in [prelude_container] + state.containers:
mcont = ManagedContainer(config.worker_config, method_call_router, cont)
container_list.append(mcont)
method_call_router.register_container(mcont)
with open('./examples/echoclient.wasm', 'rb') as fil:
echo_client = WasmTimeRunner(service_discovery, method_call_router, fil.read())
# service_discovery.register_service(echo_client, echo_client_queue)
# service_discovery.register_service(echo_server, echo_server_queue)
print('Starting containers')
for mcont in container_list:
mcont.thread.start()
print('Sending out on_module_loaded calls') # TODO: Route this normally?
on_module_loaded = MethodCall(
Method('on_module_loaded', [], valuetype.none),
[],
lambda x: None,
lambda x: None, # TODO: Check for MethodNotFoundError, otherwise report it
)
localhost_queue.put(on_module_loaded)
echo_client_queue.put(on_module_loaded)
echo_server_queue.put(on_module_loaded)
localhost_thread = threading.Thread(target=runner_thread, args=(localhost, localhost_queue))
echo_client_thread = threading.Thread(target=runner_thread, args=(echo_client, echo_client_queue))
echo_server_thread = threading.Thread(target=runner_thread, args=(echo_server, echo_server_queue))
localhost_thread.start()
echo_client_thread.start()
echo_server_thread.start()
for mcont in container_list:
mcont.queue.put(RoutedMethodCall(on_module_loaded, None, mcont.container, lambda x: None))
try:
while 1:
time.sleep(1)
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, shutting down')
pass
localhost_queue.put(ShuttingDown())
echo_client_queue.put(ShuttingDown())
echo_server_queue.put(ShuttingDown())
shut_down_command = ShutDownCommand()
for mcont in container_list:
mcont.queue.put(shut_down_command)
print('Awaiting containers')
for mcont in container_list:
mcont.thread.join()
return 0

View File

@ -1,9 +1,9 @@
from typing import TextIO, Union
from phasmplatform.common import valuetype
from phasmplatform.common.method import MethodCall
from phasmplatform.common.container import Container
from phasmplatform.common.methodcall import MethodCall, MethodCallError
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import ServiceDiscoveryInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
@ -12,9 +12,16 @@ WasmValue = Union[None, int, float]
class RunnerInterface:
__slots__ = ('router', )
__slots__ = ('method_call_router', 'container', )
def do_call(self, call: MethodCall) -> None:
method_call_router: MethodCallRouterInterface
container: Container
def __init__(self, method_call_router: MethodCallRouterInterface, container: Container) -> None:
self.method_call_router = method_call_router
self.container = container
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
"""
Executes the call on the current container
@ -24,14 +31,7 @@ class RunnerInterface:
class BaseRunner(RunnerInterface):
__slots__ = ('service_discovery', 'method_call_router', )
service_discovery: ServiceDiscoveryInterface
method_call_router: MethodCallRouterInterface
def __init__(self, service_discovery: ServiceDiscoveryInterface, method_call_router: MethodCallRouterInterface) -> None:
self.service_discovery = service_discovery
self.method_call_router = method_call_router
__slots__ = ('method_call_router', )
def alloc_bytes(self, data: bytes) -> int:
"""

View File

@ -0,0 +1,29 @@
from typing import Callable, Union, Tuple
from phasmplatform.common.config import WorkerConfig
from phasmplatform.common.methodcall import MethodCall, MethodCallError
from phasmplatform.common.value import Value, NoneValue
from .base import BaseRunner
class PreludeRunner(BaseRunner):
__slots__ = ('config', 'container_log', )
config: WorkerConfig
container_log: Tuple[Callable[[str], None]] # Tuple for typing issues
def set_config(self, config: WorkerConfig, container_log: Callable[[str], None]) -> None:
self.config = config
self.container_log = (container_log, )
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
if call.method.name == 'on_module_loaded':
self.container_log[0]('PreludeRunner loaded')
return NoneValue
if call.method.name == 'log_bytes':
self.container_log[0](f'LOG-BYTES: {repr(call.args[0].data)}')
return NoneValue
raise NotImplementedError(call)

View File

@ -1,4 +1,4 @@
from typing import Any, List
from typing import Any, Dict, List, Union
import ctypes
import functools
@ -8,10 +8,10 @@ from queue import Empty, Queue
import wasmtime
from phasmplatform.common import valuetype
from phasmplatform.common.exceptions import PhashPlatformServiceNotFound, PhashPlatformServiceMethodNotFound
from phasmplatform.common.method import Method, MethodCall, MethodCallError, MethodNotFoundError
from phasmplatform.common.container import Container
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, MethodCallError, MethodNotFoundError
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import Service, ServiceDiscoveryInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
@ -21,40 +21,30 @@ from .base import BaseRunner, WasmValue
class WasmTimeRunner(BaseRunner):
__slots__ = ('store', 'module', 'instance', 'exports')
def __init__(
self,
service_discovery: ServiceDiscoveryInterface,
method_call_router: MethodCallRouterInterface,
wasm_bin: bytes,
) -> None:
super().__init__(service_discovery, method_call_router)
def __init__(self, method_call_router: MethodCallRouterInterface, container: Container) -> None:
super().__init__(method_call_router, container)
with open(f'./{container.image.path}', 'rb') as fil: # TODO: ImageLoader?
wasm_bin = fil.read()
self.store = wasmtime.Store()
self.module = wasmtime.Module(self.store.engine, wasm_bin)
imports: List[wasmtime.Func] = []
for imprt in self.module.imports:
service = service_discovery.find_service(imprt.module)
if service is None:
raise PhashPlatformServiceNotFound(
f'Dependent service "{imprt.module}" not found; could not provide "{imprt.name}"'
)
assert imprt.name is not None # type hint
method = service.methods.get(imprt.name)
if method is None:
raise PhashPlatformServiceMethodNotFound(
f'Dependent service "{imprt.module}" found, but it does not provide "{imprt.name}"'
)
func = wasmtime.Func(
import_map: Dict[str, wasmtime.Func] = {
f'{imprt_service}.{imprt_method.name}': wasmtime.Func(
self.store,
build_func_type(method),
functools.partial(self.send_service_call, service, method)
build_func_type(imprt_method),
functools.partial(self.send_service_call, imprt_service, imprt_method)
)
for (imprt_service, imprt_method, ) in container.image.imports
}
imports.append(func)
# Make sure the given import lists order matches the one given by wasmtime
# Otherwise, wasmtime can't match them up.
imports: List[wasmtime.Func] = [
import_map[f'{imprt.module}.{imprt.name}']
for imprt in self.module.imports
]
self.instance = wasmtime.Instance(self.store, self.module, imports)
@ -94,44 +84,44 @@ class WasmTimeRunner(BaseRunner):
return raw[ptr + 4:ptr + 4 + length]
def do_call(self, call: MethodCall) -> None:
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
try:
wasm_method = self.exports[call.method.name]
except KeyError:
call.on_error(MethodNotFoundError())
return
return MethodNotFoundError()
assert isinstance(wasm_method, wasmtime.Func)
act_args = [self.value_to_wasm(x) for x in call.args]
result = wasm_method(self.store, *act_args)
assert result is None or isinstance(result, (int, float, )) # type hint
call.on_success(self.value_from_wasm(call.method.return_type, result))
def send_service_call(self, service: Service, method: Method, *args: Any) -> WasmValue:
assert len(method.args) == len(args) # type hint
return self.value_from_wasm(call.method.return_type, result)
def send_service_call(self, service: str, method: Method, *args: Any) -> WasmValue:
assert len(method.arg_types) == len(args) # type hint
call_args = [
self.value_from_wasm(x.value_type, y)
for x, y in zip(method.args, args)
self.value_from_wasm(x, y)
for x, y in zip(method.arg_types, args)
]
queue: Queue[Value] = Queue(maxsize=1)
def on_success(val: Value) -> None:
queue.put(val)
def on_result(res: Union[Value, MethodCallError]) -> None:
if isinstance(res, Value):
queue.put(res)
else:
raise Exception(res)
def on_error(err: MethodCallError) -> None:
print('Error while calling', service, method, args)
call = MethodCall(method, call_args)
call = MethodCall(method, call_args, on_success, on_error)
self.method_call_router.send_call(service, call)
self.method_call_router.route_call(service, call, self.container, on_result)
try:
value = queue.get(block=True, timeout=10)
except Empty:
raise Exception() # TODO
raise Exception('Did not receive value from remote call') # TODO
return self.value_to_wasm(value)
@ -143,9 +133,9 @@ def build_func_type(method: Method) -> wasmtime.FuncType:
returns = [build_wasm_type(method.return_type)]
args = []
for arg in method.args:
assert arg.value_type is not valuetype.none # type hint
args.append(build_wasm_type(arg.value_type))
for arg_type in method.arg_types:
assert arg_type is not valuetype.none # type hint
args.append(build_wasm_type(arg_type))
return wasmtime.FuncType(args, returns)