Compare commits

..

No commits in common. "project-setup-ideas" and "main" have entirely different histories.

35 changed files with 0 additions and 1428 deletions

View File

@ -1,29 +0,0 @@
Controller (master)
Worker (minion)
API
Find a way to send typed data between instances
- As long as we match the types, we can copy the data
```py
from platform.http import SimpleRequest
#class SimpleRequest:
# method: bytes
# domain: bytes
# path: bytes
# query: bytes
# body: bytes
@exported
def handle_message(namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None:
SimpleRequest(b'GET', b'www.google.com', b'some/path', b'query=value', b'').send()
```
Preferably, the above automatically breaks it into an async function.
Possibly, everything async in Phasm is waiting for something to be delivered from the platform?
If this thing gets integrated as much as possible, then `from x import y` will get Phasm to talk to the server to get its type information.

View File

@ -1,38 +0,0 @@
run-controller: sast
venv/bin/python -m phasmplatform.controller
run-worker: sast examples/echoapp.toml
venv/bin/python -m phasmplatform.worker
examples/echoapp.toml: examples/echoserver.toml examples/echoclient.toml
echo '# echoserver.toml' > $@
cat examples/echoserver.toml >> $@
echo '# echoclient.toml' >> $@
cat examples/echoclient.toml >> $@
clickhouse-sh:
docker compose exec clickhouse /usr/bin/clickhouse client --database phasm_platform
clickhouse-schema:
cat config/clickhouse/schema.sql | docker compose exec -T clickhouse /usr/bin/clickhouse client --multiquery
redis-sh:
docker compose exec -it redis redis-cli
setup:
python3.10 -m venv venv
venv/bin/pip install --upgrade pip wheel setuptools
venv/bin/pip install -r requirements.txt -r requirements-dev.txt
init:
docker compose up -d
update:
venv/bin/pip install -r requirements.txt -r requirements-dev.txt
sast:
venv/bin/mypy --strict phasmplatform
venv/bin/pyflakes phasmplatform
venv/bin/pycodestyle --max-line-length=140 phasmplatform
.PHONY: init redis-sh run-controller run-worker setup sast update

View File

@ -1,61 +1,2 @@
# phasm-platform # phasm-platform
## Background
[phasm](https://git.jbwdevries.nl/jbwdevries/phasm) is a language that compiles
to WebAssembly. This platform can run compiled phasm code, with every module
running as a container or as a service. Modules can call other modules, if they
are registered as a service.
Think of it as Kubernetes, but except that every container is an OS, it's a
program.
## Terminology
A phasm cluster consists of controllers and workers. The controller manage the
workers. The workers run the modules.
A worker can run in standalone mode. In that case, control functionality is
limited, but for basic use cases, it will suffice.
Logging is done via ClickHouse. Modules can log directly, but the platform also
logs routed calls from one module to another.
Controllers manage their state using redis.
## Status
This project is build in lockstep with the phasm compiler.
The controller is not implemented at all yet.
The worker can currently run the examples that you see in the `examples/`
directory. These first need to be compiled from the .py files to .wasm files
using the phasm compiler.
## Running the worker
```sh
# Setup the virtuel environment
make setup
# Run the dependent services using docker compose
make init
# Import the schema for the logging
make clickhouse-schema
# Launch the worker. This won't output much.
# Press Ctrl+C when you're done running the processes.
make run-worker
# Run the ClickHouse CLI to get access to the logs
make clickhouse-sh
```
```sql
-- See the direct logging output from modules
SELECT * FROM container_logs ORDER BY timestamp;
-- See the routing logs between services
SELECT * FROM routing_logs ORDER BY timestamp;
```

View File

@ -1,14 +0,0 @@
[controller.redis.write]
host = 'localhost'
port = 17859
[controller.redis.read]
host = 'localhost'
port = 17859
[worker.clickhouse.write]
host = 'localhost'
port = 17858
username = 'default'
password = ''
database = 'phasm_platform'

View File

@ -1 +0,0 @@
/data

View File

@ -1,27 +0,0 @@
CREATE DATABASE IF NOT EXISTS phasm_platform;
USE phasm_platform;
CREATE TABLE routing_logs
(
timestamp DateTime64(6, 'UTC'),
thread_id String,
from_container String,
to_service String,
to_container String,
to_method String,
result String,
)
ENGINE = MergeTree
ORDER BY timestamp;
CREATE TABLE container_logs
(
timestamp DateTime64(6, 'UTC'),
name String,
thread_id String,
msg String,
)
ENGINE = MergeTree
ORDER BY timestamp;

View File

@ -1,12 +0,0 @@
services:
redis:
image: 'redis:7.0-alpine'
ports:
- '17859:6379'
clickhouse:
image: 'clickhouse/clickhouse-server:22.8-alpine'
ports:
- '17858:8123'
volumes:
- './config/clickhouse/data:/var/lib/clickhouse/'

4
examples/.gitignore vendored
View File

@ -1,4 +0,0 @@
/*.wasm
/*.wat
/echoapp.toml

View File

@ -1,19 +0,0 @@
@imported('echoserver')
def echo(msg: bytes) -> bytes:
pass
@imported('prelude')
def log_bytes(data: bytes) -> None:
pass
@imported('prelude')
def sleep(seconds: u32) -> None:
pass
@exported
def on_module_loaded() -> None:
log_bytes(b'Echo client starting up, calling server')
sleep(8)
log_bytes(echo(b'Hello, world!'))
sleep(8)
log_bytes(echo(b'Bye, world!'))

View File

@ -1,18 +0,0 @@
[echoclient-image]
apiVersion = "v0"
kind = "Image"
path = "examples/echoclient.wasm"
hash = "sha256@84cb22d12dfdd6b05cb906f6db83d59f473c9df85a33822f696344af2b92b502"
imports = [
{ service = "prelude", method = "sleep", arg_types = ["u32"], return_type = "none"},
{ service = "echoserver", method = "echo", arg_types = ["bytes"], return_type = "bytes"},
]
[echoclient-container]
apiVersion = "v0"
kind = "Container"
image = "echoclient-image"
runtime = "wasmtime"

View File

@ -1,11 +0,0 @@
@exported
def echo(msg: bytes) -> bytes:
return msg
@imported('prelude')
def log_bytes(data: bytes) -> None:
pass
@exported
def on_module_loaded() -> None:
log_bytes(b'Echo service up and running')

View File

@ -1,33 +0,0 @@
[echoserver-image]
apiVersion = "v0"
kind = "Image"
path = "examples/echoserver.wasm"
hash = "sha256@dfe03b4f7ce5e921931f8715384e35a6776fdc28837e42ffa04305bbadffcfc9"
[echoserver-container-0]
apiVersion = "v0"
kind = "Container"
image = "echoserver-image"
runtime = "wasmtime"
[echoserver-container-1]
apiVersion = "v0"
kind = "Container"
image = "echoserver-image"
runtime = "wasmtime"
[echoserver-service]
apiVersion = "v0"
kind = "Service"
name = "echoserver"
[echoserver-service.containerMatch]
byName = "echoserver-container-*"
[[echoserver-service.methods]]
name = "echo"
arg_types = [ "bytes" ]
return_type = "bytes"

View File

@ -1,84 +0,0 @@
from typing import TYPE_CHECKING, BinaryIO
from clickhouse_connect.driver.httpclient import HttpClient as ClickHouseHttpClient # type: ignore
import redis
if TYPE_CHECKING:
import tomli as tomllib
else:
try:
import tomllib
except ImportError:
import tomli as tomllib
class ControllerConfig:
__slots__ = ('redis_read', 'redis_write', )
redis_read: 'redis.Redis[bytes]' # Set decode_responses to False
redis_write: 'redis.Redis[bytes]' # Set decode_responses to False
def __init__(
self,
redis_read: 'redis.Redis[bytes]',
redis_write: 'redis.Redis[bytes]',
) -> None:
self.redis_read = redis_read
self.redis_write = redis_write
class WorkerConfig:
__slots__ = ('clickhouse_write', )
clickhouse_write: ClickHouseHttpClient
def __init__(
self,
clickhouse_write: ClickHouseHttpClient,
) -> None:
self.clickhouse_write = clickhouse_write
class Config:
__slots__ = ('controller_config', 'worker_config')
controller_config: ControllerConfig
worker_config: WorkerConfig
def __init__(self, controller_config: ControllerConfig, worker_config: WorkerConfig) -> None:
self.controller_config = controller_config
self.worker_config = worker_config
def from_toml(toml: BinaryIO) -> Config:
"""
Loads the given toml and builds a config instance out of it
"""
toml_dict = tomllib.load(toml)
redis_read = redis.Redis(
host=toml_dict['controller']['redis']['read']['host'],
port=toml_dict['controller']['redis']['read']['port'],
decode_responses=False,
)
redis_write = redis.Redis(
host=toml_dict['controller']['redis']['write']['host'],
port=toml_dict['controller']['redis']['write']['port'],
decode_responses=False,
)
controller_config = ControllerConfig(redis_read, redis_write)
clickhouse_write = ClickHouseHttpClient(
interface='http',
host=toml_dict['worker']['clickhouse']['write']['host'],
port=toml_dict['worker']['clickhouse']['write']['port'],
username=toml_dict['worker']['clickhouse']['write']['username'],
password=toml_dict['worker']['clickhouse']['write']['password'],
database=toml_dict['worker']['clickhouse']['write']['database'],
)
worker_config = WorkerConfig(clickhouse_write)
return Config(controller_config, worker_config)

View File

@ -1,17 +0,0 @@
from .image import Image
class Container:
__slots__ = ('name', 'image', 'runtime', )
name: str
image: Image
runtime: str
def __init__(self, name: str, image: Image, runtime: str) -> None:
self.name = name
self.image = image
self.runtime = runtime
def __repr__(self) -> str:
return f'Container({repr(self.name)}, {repr(self.image)}, {repr(self.runtime)})'

View File

@ -1,14 +0,0 @@
class PhasmPlatformError(Exception):
pass
class PhashPlatformRuntimeError(PhasmPlatformError):
pass
class PhashPlatformServiceNotFound(PhashPlatformRuntimeError):
pass
class PhashPlatformServiceMethodNotFound(PhashPlatformRuntimeError):
pass

View File

@ -1,40 +0,0 @@
from typing import List, Optional, Tuple
from .method import Method
class Image:
__slots__ = ('path', 'hash', 'imports', )
path: Optional[str]
hash: str
imports: List[Tuple[str, Method]]
def __init__(
self,
path: Optional[str],
hash: str,
imports: List[Tuple[str, Method]],
) -> None:
self.path = path
self.hash = hash
self.imports = imports
def __repr__(self) -> str:
return f'Image({repr(self.path)}, {repr(self.hash)}, {repr(self.imports)})'
class ImageReference(Image):
__slots__ = ('name', )
name: str
def __init__(self, name: str) -> None:
# Intentionally do not call super()
# This will cause AttributeError exceptions when someone
# tries to access an image that's only a reference
self.name = name
def __repr__(self) -> str:
return f'ImageReference({repr(self.name)})'

View File

@ -1,29 +0,0 @@
from typing import Any, List
from .valuetype import ValueType
class Method:
__slots__ = ('name', 'arg_types', 'return_type', )
name: str
arg_types: List[ValueType]
return_type: ValueType
def __init__(self, name: str, arg_types: List[ValueType], return_type: ValueType) -> None:
self.name = name
self.arg_types = arg_types
self.return_type = return_type
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Method):
raise NotImplementedError
return (
self.name == other.name
and self.arg_types == other.arg_types
and self.return_type == other.return_type
)
def __repr__(self) -> str:
return f'Method({repr(self.name)}, {repr(self.arg_types)}, {repr(self.return_type)})'

View File

@ -1,120 +0,0 @@
from typing import Callable, List, Optional, Union
from .container import Container
from .method import Method
from .value import Value
class MethodCallError:
__slots__ = ('msg', )
msg: Optional[str]
def __init__(self, msg: Optional[str] = None) -> None:
self.msg = msg
def __repr__(self) -> str:
return f'{self.__class__.__name__}({repr(self.msg)})'
class ServiceNotFoundError(MethodCallError):
"""
The service was not found
You have an `@imported('service_name')` somewhere,
but there is no `service_name` deployed to the platform.
"""
class MethodNotFoundError(MethodCallError):
"""
The method was not found
You have an `@imported('service_name')` somewhere,
but while the `service_name` is deployed to the platform,
it does not provide the given function.
"""
class MethodTypeMismatchError(MethodCallError):
"""
The method's type did not match
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
its type does not match what you defined in your import.
"""
class ServiceUnavailableError(MethodCallError):
"""
The service was not available
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
and its type matches what you defined in your import,
there is currently no container running providing said service.
"""
class MethodCallExceptionError(MethodCallError):
"""
The service was not available
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
and its type matches what you defined in your import,
and there is currently a container running providing said service,
when it tried to run the call, an exeption ocurred.
"""
class MethodCall:
__slots__ = ('method', 'args', )
method: Method
args: List[Value]
def __init__(
self,
method: Method,
args: List[Value],
) -> None:
self.method = method
self.args = args
def __repr__(self) -> str:
return f'MethodCall({repr(self.method)}, {repr(self.args)})'
MethodResultCallable = Callable[[Union[Value, MethodCallError]], None]
class RoutedMethodCall:
__slots__ = ('call', 'from_container', 'to_service', 'to_container', 'on_result', )
call: MethodCall
from_container: Optional[Container]
to_service: Optional[str]
to_container: Optional[Container]
on_result: MethodResultCallable
def __init__(
self,
call: MethodCall,
from_container: Optional[Container],
to_service: Optional[str],
to_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
self.call = call
self.from_container = from_container
self.to_service = to_service
self.to_container = to_container
self.on_result = on_result
def __repr__(self) -> str:
return f'RoutedMethodCall({repr(self.call)}, {repr(self.from_container)}, {repr(self.to_container)}, {repr(self.on_result)})'

View File

@ -1,15 +0,0 @@
from typing import Optional
from .container import Container
from .methodcall import MethodCall, MethodResultCallable
class MethodCallRouterInterface:
def route_call(
self,
service: str,
call: MethodCall,
from_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
raise NotImplementedError(service, call)

View File

@ -1,39 +0,0 @@
from typing import Dict, Optional, List
from .method import Method
class ContainerMatch:
__slots__ = ('by_name', )
by_name: str
def __init__(self, by_name: str) -> None:
self.by_name = by_name
def __repr__(self) -> str:
return f'ContainerMatch({repr(self.by_name)})'
class Service:
__slots__ = ('name', 'container_match', 'methods', )
name: str
container_match: ContainerMatch
methods: Dict[str, Method]
def __init__(self, name: str, container_match: ContainerMatch, methods: List[Method]) -> None:
self.name = name
self.container_match = container_match
self.methods = {
x.name: x
for x in methods
}
def __repr__(self) -> str:
return f'Service({repr(self.name)}, {repr(self.container_match)}, {repr(list(self.methods.values()))})'
class ServiceDiscoveryInterface:
def find_service(self, name: str) -> Optional[Service]:
raise NotImplementedError

View File

@ -1,167 +0,0 @@
from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List, Tuple
from . import valuetype
from .container import Container
from .image import Image, ImageReference
from .method import Method
from .service import ContainerMatch, Service
if TYPE_CHECKING:
import tomli as tomllib
else:
try:
import tomllib
except ImportError:
import tomli as tomllib
class State:
__slots__ = ('images', 'containers', 'services', )
images: List[Image]
containers: List[Container]
services: List[Service]
def __init__(
self,
images: List[Image],
containers: List[Container],
services: List[Service],
) -> None:
self.images = images
self.containers = containers
self.services = services
def __repr__(self) -> str:
return f'State({repr(self.images)}, {repr(self.containers)}, {repr(self.services)})'
def image_import_from_toml(spec: Dict[str, Any]) -> Tuple[str, Method]:
"""
Loads an Method spec from toml, when it comes in the form of Image.imports
"""
service = spec.pop('service')
method = spec.pop('method')
arg_types = spec.pop('arg_types', [])
return_type = spec.pop('return_type', 'none')
assert not spec, ('Unrecognized values in image.imports', spec)
return (str(service), Method(
str(method),
[
valuetype.LOOKUP_TABLE[x]
for x in arg_types
],
valuetype.LOOKUP_TABLE[return_type],
), )
def image_from_toml(spec: Dict[str, Any]) -> Image:
"""
Loads an Image spec from toml
"""
imports = [
image_import_from_toml(imp_spec)
for imp_spec in spec.get('imports', [])
]
return Image(str(spec['path']), str(spec['hash']), imports)
def container_from_toml(name: str, spec: Dict[str, Any]) -> Container:
"""
Loads a Container spec from toml
"""
return Container(name, ImageReference(str(spec['image'])), str(spec['runtime']))
def service_container_match_from_toml(spec: Dict[str, Any]) -> ContainerMatch:
"""
Loads a service.ContainerMatch spec from toml
"""
return ContainerMatch(str(spec['byName']))
def method_from_toml(spec: Dict[str, Any]) -> Method:
"""
Loads an Method spec from toml
"""
name = spec.pop('name')
arg_types = spec.pop('arg_types', [])
return_type = spec.pop('return_type', 'none')
assert not spec, ('Unrecognized values in image.imports', spec)
return Method(
str(name),
[
valuetype.LOOKUP_TABLE[x]
for x in arg_types
],
valuetype.LOOKUP_TABLE[return_type],
)
def service_from_toml(spec: Dict[str, Any]) -> Service:
"""
Loads a Service spec from toml
"""
spec.pop('apiVersion')
spec.pop('kind')
name = spec.pop('name')
container_match = spec.pop('containerMatch')
methods = spec.pop('methods', [])
assert not spec, ('Unrecognized values in service', spec)
return Service(str(name), service_container_match_from_toml(container_match), [
method_from_toml(x)
for x in methods
])
def from_toml(toml: BinaryIO) -> State:
"""
Loads the given toml and builds a state instance out of it
"""
toml_dict = tomllib.load(toml)
images: List[Image] = []
images_by_name: Dict[str, Image] = {}
containers: List[Container] = []
services: List[Service] = []
for name, spec in toml_dict.items():
if spec['apiVersion'] != 'v0':
raise NotImplementedError('apiVersion', spec['apiVersion'])
if spec['kind'] == 'Image':
image = image_from_toml(spec)
images.append(image)
assert name not in images_by_name, f'Duplicate image name: {name}'
images_by_name[name] = image
continue
if spec['kind'] == 'Container':
containers.append(container_from_toml(name, spec))
continue
if spec['kind'] == 'Service':
services.append(service_from_toml(spec))
continue
raise NotImplementedError(spec)
for container in containers:
if not isinstance(container.image, ImageReference):
continue
image_opt = images_by_name.get(container.image.name, None)
assert image_opt is not None, f'Image reference not resolved: {container.image.name}'
container.image = image_opt
return State(images, containers, services)

View File

@ -1,25 +0,0 @@
from typing import Any, Union
from .valuetype import ValueType, none
ValueData = Union[None, int, float, bytes]
class Value:
__slots__ = ('value_type', 'data', )
value_type: ValueType
data: ValueData
def __init__(self, value_type: ValueType, data: ValueData) -> None:
self.value_type = value_type
self.data = data
def __eq__(self, other: Any) -> bool:
return self.value_type is other.value_type and self.data == other.data
def __repr__(self) -> str:
return f'Value({repr(self.value_type)}, {repr(self.data)})'
NoneValue = Value(none, None)

View File

@ -1,32 +0,0 @@
from typing import Any, Dict
class ValueType:
__slots__ = ('name', )
name: str
def __init__(self, name: str) -> None:
self.name = name
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ValueType):
raise NotImplementedError
return self is other
def __repr__(self) -> str:
return f'valuetype.{self.name}'
u32 = ValueType('u32')
bytes = ValueType('bytes')
none = ValueType('none')
LOOKUP_TABLE: Dict[str, ValueType] = {
u32.name: u32,
bytes.name: bytes,
none.name: none,
}

View File

@ -1,16 +0,0 @@
import sys
from phasmplatform.common.config import from_toml
def main() -> int:
with open('config.toml', 'rb') as fil:
config = from_toml(fil)
del config
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -1,271 +0,0 @@
from typing import Dict, List, Optional, Type, Union
import datetime
import sys
import random
import threading
import time
import uuid
from queue import Empty, Queue
from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
from phasmplatform.common.image import Image
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, RoutedMethodCall, MethodResultCallable
from phasmplatform.common.methodcall import (
MethodCallExceptionError, MethodNotFoundError, MethodTypeMismatchError, ServiceNotFoundError, ServiceUnavailableError
)
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import ContainerMatch, Service
from phasmplatform.common.value import Value
from phasmplatform.common.state import from_toml as state_from_toml
from .runners.base import RunnerInterface
from .runners.prelude import PreludeRunner
from .runners.wasmtime import WasmTimeRunner
RUNTIME_MAP: Dict[str, Type[RunnerInterface]] = {
'prelude': PreludeRunner,
'wasmtime': WasmTimeRunner,
}
def log_now() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)
class ShutDownCommand():
pass
MethodCallQueue = Queue[Union[RoutedMethodCall, ShutDownCommand]]
class ManagedContainer:
__slots__ = ('config', 'method_call_router', 'container', 'thread', 'queue', )
config: WorkerConfig
method_call_router: MethodCallRouterInterface
container: Container
thread: threading.Thread
queue: MethodCallQueue
def __init__(self, config: WorkerConfig, method_call_router: MethodCallRouterInterface, container: Container) -> None:
self.config = config
self.method_call_router = method_call_router
self.container = container
self.thread = threading.Thread(target=container_thread, args=(self, ))
self.queue = Queue()
def __repr__(self) -> str:
return f'ManagedContainer(..., ..., {repr(self.container)}, ..., ...)'
def container_thread(mcont: ManagedContainer) -> None:
clickhouse_session_id = str(uuid.uuid1())
clickhouse_write = mcont.config.clickhouse_write
def container_log(msg: str) -> None:
clickhouse_write.insert(
table='container_logs',
data=[
[log_now(), mcont.container.name, str(id(mcont.thread)), msg],
],
column_names=['timestamp', 'name', 'thread_id', 'msg'],
column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'],
settings={'session_id': clickhouse_session_id}
)
def routing_log(from_container: str, to_service: Optional[str], to_method: str, result: str) -> None:
clickhouse_write.insert(
table='routing_logs',
data=[
[log_now(), str(id(mcont.thread)), from_container, mcont.container.name, to_service or '', to_method, result],
],
column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_service', 'to_method', 'result'],
column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String', 'String'],
settings={'session_id': clickhouse_session_id}
)
container_log(f'Creating runtime for {mcont.container.image.path}')
cls = RUNTIME_MAP.get(mcont.container.runtime)
assert cls is not None, f'Unknown runtime: {mcont.container.runtime}'
runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container, container_log)
container_log('Starting thread')
while True:
try:
call = mcont.queue.get(block=True, timeout=1)
except Empty:
continue
if isinstance(call, ShutDownCommand):
break
try:
result = runtime.do_call(call.call)
except Exception as ex:
raise ex
routing_log(
call.from_container.name if call.from_container is not None else '[SYSTEM]',
call.call.method.name,
repr(ex),
)
call.on_result(MethodCallExceptionError(str(ex)))
continue
routing_log(
call.from_container.name if call.from_container is not None else '[SYSTEM]',
call.to_service,
call.call.method.name,
repr(result.data) if isinstance(result, Value) else repr(result)
)
call.on_result(result)
container_log('Stopping thread')
class LocalhostMethodCallRouter(MethodCallRouterInterface):
__slots__ = ('services', 'container_by_service')
services: Dict[str, Service]
container_by_service: Dict[str, List[ManagedContainer]]
def __init__(self) -> None:
self.services = {}
self.container_by_service = {}
def route_call(
self,
service_name: str,
call: MethodCall,
from_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
service = self.services.get(service_name)
if service is None:
on_result(ServiceNotFoundError(service_name))
return
method = service.methods.get(call.method.name)
if method is None:
on_result(MethodNotFoundError(f'{service_name}.{call.method.name}'))
return
if method != call.method:
on_result(MethodTypeMismatchError())
return
to_mcont = self.find_one_container(service)
if to_mcont is None:
on_result(ServiceUnavailableError(service_name))
return
to_mcont.queue.put(RoutedMethodCall(
call,
from_container,
service.name,
to_mcont.container,
on_result,
))
def register_service(self, service: Service) -> None:
assert service.name not in self.services
self.services[service.name] = service
def register_container(self, mcont: ManagedContainer) -> None:
for service in self.services.values():
if service.container_match.by_name == mcont.container.name:
self.container_by_service.setdefault(service.name, [])
self.container_by_service[service.name].append(mcont)
continue
if service.container_match.by_name[-1] == '*' and mcont.container.name.startswith(service.container_match.by_name[:-1]):
self.container_by_service.setdefault(service.name, [])
self.container_by_service[service.name].append(mcont)
continue
def find_one_container(self, service: Service) -> Optional[ManagedContainer]:
container_list = self.container_by_service.get(service.name)
if not container_list:
return None
return random.choice(container_list)
def make_prelude() -> Service:
methods: List[Method] = [
Method('sleep', [valuetype.u32], valuetype.none),
]
return Service('prelude', ContainerMatch('__prelude__'), methods)
def main() -> int:
with open('config.toml', 'rb') as fil:
config = config_from_toml(fil)
with open('./examples/echoapp.toml', 'rb') as fil:
state = state_from_toml(fil)
method_call_router = LocalhostMethodCallRouter()
print('Registering services')
method_call_router.register_service(make_prelude())
for service in state.services:
method_call_router.register_service(service)
container_list: List[ManagedContainer] = []
prelude_container = Container(
'__prelude__',
Image('prelude', '', []),
'prelude',
)
for cont in [prelude_container] + state.containers:
mcont = ManagedContainer(config.worker_config, method_call_router, cont)
container_list.append(mcont)
method_call_router.register_container(mcont)
print('Starting containers')
for mcont in container_list:
mcont.thread.start()
print('Sending out on_module_loaded calls') # TODO: Route this normally?
on_module_loaded = MethodCall(
Method('on_module_loaded', [], valuetype.none),
[],
)
for mcont in container_list:
mcont.queue.put(RoutedMethodCall(on_module_loaded, None, None, mcont.container, lambda x: None))
try:
while 1:
time.sleep(1)
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, shutting down')
pass
shut_down_command = ShutDownCommand()
for mcont in container_list:
mcont.queue.put(shut_down_command)
print('Awaiting containers')
for mcont in container_list:
mcont.thread.join()
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -1,96 +0,0 @@
from typing import Callable, TextIO, Tuple, Union
from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.methodcall import MethodCall, MethodCallError
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
WasmValue = Union[None, int, float]
class RunnerInterface:
__slots__ = ('method_call_router', 'container', 'container_log', )
method_call_router: MethodCallRouterInterface
container: Container
container_log: Tuple[Callable[[str], None]] # Tuple for typing issues
def __init__(self, method_call_router: MethodCallRouterInterface, container: Container, container_log: Callable[[str], None]) -> None:
self.method_call_router = method_call_router
self.container = container
self.container_log = (container_log, )
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
"""
Executes the call on the current container
This method is responsible for calling the on_success or on_error method.
"""
raise NotImplementedError
class BaseRunner(RunnerInterface):
__slots__ = ('method_call_router', )
def alloc_bytes(self, data: bytes) -> int:
"""
Calls upon stdlib.types.__alloc_bytes__ to allocate a bytes object
"""
raise NotImplementedError
def read_bytes(self, ptr: int) -> bytes:
"""
Reads a byte object allocated by stdlib.types.__alloc_bytes__
"""
raise NotImplementedError
def value_to_wasm(self, val: Value) -> WasmValue:
if val.value_type is valuetype.none:
assert val.data is None # type hint
return None
if val.value_type is valuetype.bytes:
assert isinstance(val.data, bytes) # type hint
return self.alloc_bytes(val.data)
raise NotImplementedError(val)
def value_from_wasm(self, value_type: ValueType, val: WasmValue) -> Value:
if value_type is valuetype.none:
assert val is None # type hint
return Value(value_type, None)
if value_type is valuetype.u32:
assert isinstance(val, int) # type hint
return Value(value_type, val)
if value_type is valuetype.bytes:
assert isinstance(val, int) # type hint
return Value(value_type, self.read_bytes(val))
raise NotImplementedError(value_type, val)
def dump_memory(textio: TextIO, mem: bytes) -> None:
line_width = 16
prev_line = None
skip = False
for idx in range(0, len(mem), line_width):
line = ''
for idx2 in range(0, line_width):
line += f'{mem[idx + idx2]:02X}'
if idx2 % 2 == 1:
line += ' '
if prev_line == line:
if not skip:
textio.write('**\n')
skip = True
else:
textio.write(f'{idx:08x} {line}\n')
prev_line = line

View File

@ -1,27 +0,0 @@
from typing import Union
import time
from phasmplatform.common.methodcall import MethodCall, MethodCallError
from phasmplatform.common.value import Value, NoneValue
from .base import BaseRunner
class PreludeRunner(BaseRunner):
__slots__ = ()
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
if call.method.name == 'on_module_loaded':
self.container_log[0]('PreludeRunner loaded')
return NoneValue
if call.method.name == 'sleep':
# This will block this thread
# Which is has to do until we can get the async really working
seconds = call.args[0].data
assert isinstance(seconds, int) # type hint
time.sleep(seconds)
return NoneValue
raise NotImplementedError(call)

View File

@ -1,161 +0,0 @@
from typing import Any, Callable, Dict, List, Union
import ctypes
import functools
import struct
from queue import Empty, Queue
import wasmtime
from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, MethodCallError, MethodNotFoundError
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
from .base import BaseRunner, WasmValue
class WasmTimeRunner(BaseRunner):
__slots__ = ('store', 'module', 'instance', 'exports')
def __init__(self, method_call_router: MethodCallRouterInterface, container: Container, container_log: Callable[[str], None]) -> None:
super().__init__(method_call_router, container, container_log)
with open(f'./{container.image.path}', 'rb') as fil: # TODO: ImageLoader?
wasm_bin = fil.read()
self.store = wasmtime.Store()
self.module = wasmtime.Module(self.store.engine, wasm_bin)
import_map: Dict[str, wasmtime.Func] = {
f'{imprt_service}.{imprt_method.name}': wasmtime.Func(
self.store,
build_func_type(imprt_method),
functools.partial(self.send_service_call, imprt_service, imprt_method)
)
for (imprt_service, imprt_method, ) in container.image.imports
}
import_map['prelude.log_bytes'] = wasmtime.Func(
self.store,
wasmtime.FuncType([wasmtime.ValType.i32()], []),
functools.partial(self.log_bytes),
)
# Make sure the given import lists order matches the one given by wasmtime
# Otherwise, wasmtime can't match them up.
imports: List[wasmtime.Func] = [
import_map[f'{imprt.module}.{imprt.name}']
for imprt in self.module.imports
]
self.instance = wasmtime.Instance(self.store, self.module, imports)
self.exports = self.instance.exports(self.store)
def alloc_bytes(self, data: bytes) -> int:
memory = self.exports['memory']
assert isinstance(memory, wasmtime.Memory) # type hint
data_ptr = memory.data_ptr(self.store)
data_len = memory.data_len(self.store)
alloc_bytes = self.exports['stdlib.types.__alloc_bytes__']
assert isinstance(alloc_bytes, wasmtime.Func)
ptr = alloc_bytes(self.store, len(data))
assert isinstance(ptr, int) # type hint
idx = ptr + 4 # Skip the header from header from __alloc_bytes__
for byt in data:
assert idx < data_len
data_ptr[idx] = ctypes.c_ubyte(byt)
idx += 1
return ptr
def read_bytes(self, ptr: int) -> bytes:
memory = self.exports['memory']
assert isinstance(memory, wasmtime.Memory) # type hint
data_ptr = memory.data_ptr(self.store)
data_len = memory.data_len(self.store)
raw = ctypes.string_at(data_ptr, data_len)
length, = struct.unpack('<I', raw[ptr:ptr + 4]) # Header prefixed by __alloc_bytes__
return raw[ptr + 4:ptr + 4 + length]
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
try:
wasm_method = self.exports[call.method.name]
except KeyError:
return MethodNotFoundError()
assert isinstance(wasm_method, wasmtime.Func)
act_args = [self.value_to_wasm(x) for x in call.args]
result = wasm_method(self.store, *act_args)
assert result is None or isinstance(result, (int, float, )) # type hint
return self.value_from_wasm(call.method.return_type, result)
def send_service_call(self, service: str, method: Method, *args: Any) -> WasmValue:
assert len(method.arg_types) == len(args) # type hint
call_args = [
self.value_from_wasm(x, y)
for x, y in zip(method.arg_types, args)
]
queue: Queue[Value] = Queue(maxsize=1)
def on_result(res: Union[Value, MethodCallError]) -> None:
if isinstance(res, Value):
queue.put(res)
else:
raise Exception(res)
call = MethodCall(method, call_args)
self.method_call_router.route_call(service, call, self.container, on_result)
try:
value = queue.get(block=True, timeout=10)
except Empty:
raise Exception('Did not receive value from remote call') # TODO
return self.value_to_wasm(value)
def log_bytes(self, data_ptr: int) -> None:
value = self.value_from_wasm(valuetype.bytes, data_ptr)
self.container_log[0](repr(value.data))
def build_func_type(method: Method) -> wasmtime.FuncType:
if method.return_type is valuetype.none:
returns = []
else:
returns = [build_wasm_type(method.return_type)]
args = []
for arg_type in method.arg_types:
assert arg_type is not valuetype.none # type hint
args.append(build_wasm_type(arg_type))
return wasmtime.FuncType(args, returns)
def build_wasm_type(value_type: ValueType) -> wasmtime.ValType:
if value_type is valuetype.u32:
return wasmtime.ValType.i32() # Signed-ness is in the operands
if value_type is valuetype.bytes:
return wasmtime.ValType.i32() # Bytes are passed as pointer
raise NotImplementedError

View File

@ -1,4 +0,0 @@
mypy==1.2.0
pycodestyle==2.10.0
pyflakes==3.0.1
types-redis==4.5.4.1

View File

@ -1,6 +0,0 @@
clickhouse-connect==0.5.20
numpy==1.24.2
pywasm3==0.5.0
redis==4.5.4
tomli==2.0.1 ; python_version < '3.11'
wasmtime==7.0.0