Compare commits

...

20 Commits

Author SHA1 Message Date
Johan B.W. de Vries
85632b4c2c Ideas regarding async runtime 2023-04-15 17:14:37 +02:00
Johan B.W. de Vries
dae2740e65 We can have on thread sleep at a time 2023-04-14 19:23:43 +02:00
Johan B.W. de Vries
5e1c5679e5 prelude.log_bytes is now on the right thread
Before, all messages would be logged on the prelude
container / thread.

routing log improvements
2023-04-14 19:09:20 +02:00
Johan B.W. de Vries
72ea5bd592 Loading state from toml, redo routing, main cleanup 2023-04-14 18:44:18 +02:00
Johan B.W. de Vries
b06a604fd4 Clean shutdown 2023-04-14 14:48:28 +02:00
Johan B.W. de Vries
0d0af0e728 clickhouse logging 2023-04-14 14:45:34 +02:00
Johan B.W. de Vries
628fb775e8 State loading from toml 2023-04-11 12:27:36 +02:00
Johan B.W. de Vries
45cafdf327 Adds examples 2023-04-11 11:41:41 +02:00
Johan B.W. de Vries
13dc426fc5 It lives \o/ 2023-04-11 11:35:08 +02:00
Johan B.W. de Vries
1ff21c7f29 Some running already, on_module_loaded 2023-04-11 10:27:36 +02:00
Johan B.W. de Vries
8c5a2893d6 __eq__ operators 2023-04-11 09:49:07 +02:00
Johan B.W. de Vries
bb9ac649bf Routing ideas 2023-04-11 09:45:58 +02:00
Johan B.W. de Vries
2485ccba40 do_call ideas 2023-04-10 16:49:22 +02:00
Johan B.W. de Vries
90531a5b99 Idea 2023-04-10 16:31:51 +02:00
Johan B.W. de Vries
1d502ee9a2 Remove old code 2023-04-10 15:29:36 +02:00
Johan B.W. de Vries
79ac328fb6 Idea 2023-04-10 15:16:59 +02:00
Johan B.W. de Vries
9497747148 First message routed 2023-04-10 14:48:41 +02:00
Johan B.W. de Vries
34aaaa4ccb Routing ideas 2023-04-10 14:12:50 +02:00
Johan B.W. de Vries
2063a6ea9c Worker, run_once 2023-04-10 13:11:16 +02:00
Johan B.W. de Vries
1701c7fb6b Project setup and ideas 2023-04-10 12:55:02 +02:00
36 changed files with 1475 additions and 0 deletions

29
ARCHITECTURE.md Normal file
View File

@ -0,0 +1,29 @@
Controller (master)
Worker (minion)
API
Find a way to send typed data between instances
- As long as we match the types, we can copy the data
```py
from platform.http import SimpleRequest
#class SimpleRequest:
# method: bytes
# domain: bytes
# path: bytes
# query: bytes
# body: bytes
@exported
def handle_message(namespace: bytes, topic: bytes, kind: bytes, body: bytes) -> None:
SimpleRequest(b'GET', b'www.google.com', b'some/path', b'query=value', b'').send()
```
Preferably, the above automatically breaks it into an async function.
Possibly, everything async in Phasm is waiting for something to be delivered from the platform?
If this thing gets integrated as much as possible, then `from x import y` will get Phasm to talk to the server to get its type information.

35
Makefile Normal file
View File

@ -0,0 +1,35 @@
run-controller: sast
venv/bin/python -m phasmplatform.controller
run-worker: sast examples/echoapp.toml
venv/bin/python -m phasmplatform.worker
examples/echoapp.toml: examples/echoserver.toml examples/echoclient.toml
echo '# echoserver.toml' > $@
cat examples/echoserver.toml >> $@
echo '# echoclient.toml' >> $@
cat examples/echoclient.toml >> $@
clickhouse-sh:
docker compose exec clickhouse /usr/bin/clickhouse client
redis-sh:
docker compose exec -it redis redis-cli
setup:
python3.10 -m venv venv
venv/bin/pip install --upgrade pip wheel setuptools
venv/bin/pip install -r requirements.txt -r requirements-dev.txt
init:
docker compose up -d
update:
venv/bin/pip install -r requirements.txt -r requirements-dev.txt
sast:
venv/bin/mypy --strict phasmplatform
venv/bin/pyflakes phasmplatform
venv/bin/pycodestyle --max-line-length=140 phasmplatform
.PHONY: init redis-sh run-controller run-worker setup sast update

View File

@ -1,2 +1,8 @@
# phasm-platform # phasm-platform
## Steps
```sh
make setup
make init
make run-controller
```

14
config.toml Normal file
View File

@ -0,0 +1,14 @@
[controller.redis.write]
host = 'localhost'
port = 17859
[controller.redis.read]
host = 'localhost'
port = 17859
[worker.clickhouse.write]
host = 'localhost'
port = 17858
username = 'default'
password = ''
database = 'phasm_platform'

1
config/clickhouse/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/data

View File

@ -0,0 +1,27 @@
CREATE DATABASE IF NOT EXISTS phasm_platform;
USE phasm_platform;
CREATE TABLE routing_logs
(
timestamp DateTime64(6, 'UTC'),
thread_id String,
from_container String,
to_service String,
to_container String,
to_method String,
result String,
)
ENGINE = MergeTree
ORDER BY timestamp;
CREATE TABLE container_logs
(
timestamp DateTime64(6, 'UTC'),
name String,
thread_id String,
msg String,
)
ENGINE = MergeTree
ORDER BY timestamp;

12
docker-compose.yaml Normal file
View File

@ -0,0 +1,12 @@
services:
redis:
image: 'redis:7.0-alpine'
ports:
- '17859:6379'
clickhouse:
image: 'clickhouse/clickhouse-server:22.8-alpine'
ports:
- '17858:8123'
volumes:
- './config/clickhouse/data:/var/lib/clickhouse/'

4
examples/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/*.wasm
/*.wat
/echoapp.toml

103
examples/async.wat Normal file
View File

@ -0,0 +1,103 @@
(module
(import "env" "greet" (func $greet (param i32)))
(memory 1)
(type $func (func (param $callstack i32) (result i32)))
(func $do_greet (param $callstack i32) (result i32)
(local $greet_number i32)
local.get $callstack
i32.const 4
i32.sub
local.set $callstack
local.get $callstack
i32.load
local.set $greet_number
local.get $greet_number
call $greet
local.get $callstack
i32.const -1 ;; return to caller
i32.store
local.get $callstack
i32.const 4
i32.add
local.set $callstack
local.get $callstack
return
)
(func $on_module_loaded (param $callstack i32) (result i32)
;; Param 1: number to send to $greet
local.get $callstack
i32.const 29
i32.store
local.get $callstack
i32.const 4
i32.add
local.set $callstack
local.get $callstack
i32.const 17 ;; goto do_greet
i32.store
local.get $callstack
i32.const 4
i32.add
local.set $callstack
local.get $callstack
return
)
;; You could have a $on_module_loaded.0 that processes
;; the result from greet
(table 32 funcref)
(elem (i32.const 16) $on_module_loaded)
(elem (i32.const 17) $do_greet)
(func $runtime (param $func i32) (param $callstack i32)
loop $call_methods
local.get $callstack
local.get $func
call_indirect (type $func)
local.set $callstack
;; Pop the func number of the callback
local.get $callstack
i32.const 4
i32.sub
local.set $callstack
local.get $callstack
i32.load
local.set $func
;; See if we still have a function to jump to
local.get $func
i32.const 0
i32.ge_s
br_if $call_methods
end
)
(func $start
i32.const 16
i32.const 0
call $runtime
)
(start $start)
)
;; A function that takes 3 params and returns 1
;; stack param0 param1 param2 func_id
;; => $runtime pops func_id and calls its function
;; stack param0 param1 param2
;; => function$func_id pops param2, param1 and param0
;; stack
;; => function$func_id stores its return value
;; stack $result
;; => function$func_id returns to $runtime
;; $runtime now has the $result on top of the stack, rather the func_id to call -.-

19
examples/echoclient.py Normal file
View File

@ -0,0 +1,19 @@
@imported('echoserver')
def echo(msg: bytes) -> bytes:
pass
@imported('prelude')
def log_bytes(data: bytes) -> None:
pass
@imported('prelude')
def sleep(seconds: u32) -> None:
pass
@exported
def on_module_loaded() -> None:
log_bytes(b'Echo client starting up, calling server')
sleep(8)
log_bytes(echo(b'Hello, world!'))
sleep(8)
log_bytes(echo(b'Bye, world!'))

18
examples/echoclient.toml Normal file
View File

@ -0,0 +1,18 @@
[echoclient-image]
apiVersion = "v0"
kind = "Image"
path = "examples/echoclient.wasm"
hash = "sha256@84cb22d12dfdd6b05cb906f6db83d59f473c9df85a33822f696344af2b92b502"
imports = [
{ service = "prelude", method = "sleep", arg_types = ["u32"], return_type = "none"},
{ service = "echoserver", method = "echo", arg_types = ["bytes"], return_type = "bytes"},
]
[echoclient-container]
apiVersion = "v0"
kind = "Container"
image = "echoclient-image"
runtime = "wasmtime"

11
examples/echoserver.py Normal file
View File

@ -0,0 +1,11 @@
@exported
def echo(msg: bytes) -> bytes:
return msg
@imported('prelude')
def log_bytes(data: bytes) -> None:
pass
@exported
def on_module_loaded() -> None:
log_bytes(b'Echo service up and running')

33
examples/echoserver.toml Normal file
View File

@ -0,0 +1,33 @@
[echoserver-image]
apiVersion = "v0"
kind = "Image"
path = "examples/echoserver.wasm"
hash = "sha256@dfe03b4f7ce5e921931f8715384e35a6776fdc28837e42ffa04305bbadffcfc9"
[echoserver-container-0]
apiVersion = "v0"
kind = "Container"
image = "echoserver-image"
runtime = "wasmtime"
[echoserver-container-1]
apiVersion = "v0"
kind = "Container"
image = "echoserver-image"
runtime = "wasmtime"
[echoserver-service]
apiVersion = "v0"
kind = "Service"
name = "echoserver"
[echoserver-service.containerMatch]
byName = "echoserver-container-*"
[[echoserver-service.methods]]
name = "echo"
arg_types = [ "bytes" ]
return_type = "bytes"

View File

View File

View File

@ -0,0 +1,84 @@
from typing import TYPE_CHECKING, BinaryIO
from clickhouse_connect.driver.httpclient import HttpClient as ClickHouseHttpClient # type: ignore
import redis
if TYPE_CHECKING:
import tomli as tomllib
else:
try:
import tomllib
except ImportError:
import tomli as tomllib
class ControllerConfig:
__slots__ = ('redis_read', 'redis_write', )
redis_read: 'redis.Redis[bytes]' # Set decode_responses to False
redis_write: 'redis.Redis[bytes]' # Set decode_responses to False
def __init__(
self,
redis_read: 'redis.Redis[bytes]',
redis_write: 'redis.Redis[bytes]',
) -> None:
self.redis_read = redis_read
self.redis_write = redis_write
class WorkerConfig:
__slots__ = ('clickhouse_write', )
clickhouse_write: ClickHouseHttpClient
def __init__(
self,
clickhouse_write: ClickHouseHttpClient,
) -> None:
self.clickhouse_write = clickhouse_write
class Config:
__slots__ = ('controller_config', 'worker_config')
controller_config: ControllerConfig
worker_config: WorkerConfig
def __init__(self, controller_config: ControllerConfig, worker_config: WorkerConfig) -> None:
self.controller_config = controller_config
self.worker_config = worker_config
def from_toml(toml: BinaryIO) -> Config:
"""
Loads the given toml and builds a config instance out of it
"""
toml_dict = tomllib.load(toml)
redis_read = redis.Redis(
host=toml_dict['controller']['redis']['read']['host'],
port=toml_dict['controller']['redis']['read']['port'],
decode_responses=False,
)
redis_write = redis.Redis(
host=toml_dict['controller']['redis']['write']['host'],
port=toml_dict['controller']['redis']['write']['port'],
decode_responses=False,
)
controller_config = ControllerConfig(redis_read, redis_write)
clickhouse_write = ClickHouseHttpClient(
interface='http',
host=toml_dict['worker']['clickhouse']['write']['host'],
port=toml_dict['worker']['clickhouse']['write']['port'],
username=toml_dict['worker']['clickhouse']['write']['username'],
password=toml_dict['worker']['clickhouse']['write']['password'],
database=toml_dict['worker']['clickhouse']['write']['database'],
)
worker_config = WorkerConfig(clickhouse_write)
return Config(controller_config, worker_config)

View File

@ -0,0 +1,17 @@
from .image import Image
class Container:
__slots__ = ('name', 'image', 'runtime', )
name: str
image: Image
runtime: str
def __init__(self, name: str, image: Image, runtime: str) -> None:
self.name = name
self.image = image
self.runtime = runtime
def __repr__(self) -> str:
return f'Container({repr(self.name)}, {repr(self.image)}, {repr(self.runtime)})'

View File

@ -0,0 +1,14 @@
class PhasmPlatformError(Exception):
pass
class PhashPlatformRuntimeError(PhasmPlatformError):
pass
class PhashPlatformServiceNotFound(PhashPlatformRuntimeError):
pass
class PhashPlatformServiceMethodNotFound(PhashPlatformRuntimeError):
pass

View File

@ -0,0 +1,40 @@
from typing import List, Optional, Tuple
from .method import Method
class Image:
__slots__ = ('path', 'hash', 'imports', )
path: Optional[str]
hash: str
imports: List[Tuple[str, Method]]
def __init__(
self,
path: Optional[str],
hash: str,
imports: List[Tuple[str, Method]],
) -> None:
self.path = path
self.hash = hash
self.imports = imports
def __repr__(self) -> str:
return f'Image({repr(self.path)}, {repr(self.hash)}, {repr(self.imports)})'
class ImageReference(Image):
__slots__ = ('name', )
name: str
def __init__(self, name: str) -> None:
# Intentionally do not call super()
# This will cause AttributeError exceptions when someone
# tries to access an image that's only a reference
self.name = name
def __repr__(self) -> str:
return f'ImageReference({repr(self.name)})'

View File

@ -0,0 +1,29 @@
from typing import Any, List
from .valuetype import ValueType
class Method:
__slots__ = ('name', 'arg_types', 'return_type', )
name: str
arg_types: List[ValueType]
return_type: ValueType
def __init__(self, name: str, arg_types: List[ValueType], return_type: ValueType) -> None:
self.name = name
self.arg_types = arg_types
self.return_type = return_type
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Method):
raise NotImplementedError
return (
self.name == other.name
and self.arg_types == other.arg_types
and self.return_type == other.return_type
)
def __repr__(self) -> str:
return f'Method({repr(self.name)}, {repr(self.arg_types)}, {repr(self.return_type)})'

View File

@ -0,0 +1,120 @@
from typing import Callable, List, Optional, Union
from .container import Container
from .method import Method
from .value import Value
class MethodCallError:
__slots__ = ('msg', )
msg: Optional[str]
def __init__(self, msg: Optional[str] = None) -> None:
self.msg = msg
def __repr__(self) -> str:
return f'{self.__class__.__name__}({repr(self.msg)})'
class ServiceNotFoundError(MethodCallError):
"""
The service was not found
You have an `@imported('service_name')` somewhere,
but there is no `service_name` deployed to the platform.
"""
class MethodNotFoundError(MethodCallError):
"""
The method was not found
You have an `@imported('service_name')` somewhere,
but while the `service_name` is deployed to the platform,
it does not provide the given function.
"""
class MethodTypeMismatchError(MethodCallError):
"""
The method's type did not match
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
its type does not match what you defined in your import.
"""
class ServiceUnavailableError(MethodCallError):
"""
The service was not available
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
and its type matches what you defined in your import,
there is currently no container running providing said service.
"""
class MethodCallExceptionError(MethodCallError):
"""
The service was not available
You have an `@imported('service_name')` somewhere,
but there while the `service_name` is deployed to the platform,
and the service does provide the function,
and its type matches what you defined in your import,
and there is currently a container running providing said service,
when it tried to run the call, an exeption ocurred.
"""
class MethodCall:
__slots__ = ('method', 'args', )
method: Method
args: List[Value]
def __init__(
self,
method: Method,
args: List[Value],
) -> None:
self.method = method
self.args = args
def __repr__(self) -> str:
return f'MethodCall({repr(self.method)}, {repr(self.args)})'
MethodResultCallable = Callable[[Union[Value, MethodCallError]], None]
class RoutedMethodCall:
__slots__ = ('call', 'from_container', 'to_service', 'to_container', 'on_result', )
call: MethodCall
from_container: Optional[Container]
to_service: Optional[str]
to_container: Optional[Container]
on_result: MethodResultCallable
def __init__(
self,
call: MethodCall,
from_container: Optional[Container],
to_service: Optional[str],
to_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
self.call = call
self.from_container = from_container
self.to_service = to_service
self.to_container = to_container
self.on_result = on_result
def __repr__(self) -> str:
return f'RoutedMethodCall({repr(self.call)}, {repr(self.from_container)}, {repr(self.to_container)}, {repr(self.on_result)})'

View File

@ -0,0 +1,15 @@
from typing import Optional
from .container import Container
from .methodcall import MethodCall, MethodResultCallable
class MethodCallRouterInterface:
def route_call(
self,
service: str,
call: MethodCall,
from_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
raise NotImplementedError(service, call)

View File

@ -0,0 +1,39 @@
from typing import Dict, Optional, List
from .method import Method
class ContainerMatch:
__slots__ = ('by_name', )
by_name: str
def __init__(self, by_name: str) -> None:
self.by_name = by_name
def __repr__(self) -> str:
return f'ContainerMatch({repr(self.by_name)})'
class Service:
__slots__ = ('name', 'container_match', 'methods', )
name: str
container_match: ContainerMatch
methods: Dict[str, Method]
def __init__(self, name: str, container_match: ContainerMatch, methods: List[Method]) -> None:
self.name = name
self.container_match = container_match
self.methods = {
x.name: x
for x in methods
}
def __repr__(self) -> str:
return f'Service({repr(self.name)}, {repr(self.container_match)}, {repr(list(self.methods.values()))})'
class ServiceDiscoveryInterface:
def find_service(self, name: str) -> Optional[Service]:
raise NotImplementedError

View File

@ -0,0 +1,167 @@
from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List, Tuple
from . import valuetype
from .container import Container
from .image import Image, ImageReference
from .method import Method
from .service import ContainerMatch, Service
if TYPE_CHECKING:
import tomli as tomllib
else:
try:
import tomllib
except ImportError:
import tomli as tomllib
class State:
__slots__ = ('images', 'containers', 'services', )
images: List[Image]
containers: List[Container]
services: List[Service]
def __init__(
self,
images: List[Image],
containers: List[Container],
services: List[Service],
) -> None:
self.images = images
self.containers = containers
self.services = services
def __repr__(self) -> str:
return f'State({repr(self.images)}, {repr(self.containers)}, {repr(self.services)})'
def image_import_from_toml(spec: Dict[str, Any]) -> Tuple[str, Method]:
"""
Loads an Method spec from toml, when it comes in the form of Image.imports
"""
service = spec.pop('service')
method = spec.pop('method')
arg_types = spec.pop('arg_types', [])
return_type = spec.pop('return_type', 'none')
assert not spec, ('Unrecognized values in image.imports', spec)
return (str(service), Method(
str(method),
[
valuetype.LOOKUP_TABLE[x]
for x in arg_types
],
valuetype.LOOKUP_TABLE[return_type],
), )
def image_from_toml(spec: Dict[str, Any]) -> Image:
"""
Loads an Image spec from toml
"""
imports = [
image_import_from_toml(imp_spec)
for imp_spec in spec.get('imports', [])
]
return Image(str(spec['path']), str(spec['hash']), imports)
def container_from_toml(name: str, spec: Dict[str, Any]) -> Container:
"""
Loads a Container spec from toml
"""
return Container(name, ImageReference(str(spec['image'])), str(spec['runtime']))
def service_container_match_from_toml(spec: Dict[str, Any]) -> ContainerMatch:
"""
Loads a service.ContainerMatch spec from toml
"""
return ContainerMatch(str(spec['byName']))
def method_from_toml(spec: Dict[str, Any]) -> Method:
"""
Loads an Method spec from toml
"""
name = spec.pop('name')
arg_types = spec.pop('arg_types', [])
return_type = spec.pop('return_type', 'none')
assert not spec, ('Unrecognized values in image.imports', spec)
return Method(
str(name),
[
valuetype.LOOKUP_TABLE[x]
for x in arg_types
],
valuetype.LOOKUP_TABLE[return_type],
)
def service_from_toml(spec: Dict[str, Any]) -> Service:
"""
Loads a Service spec from toml
"""
spec.pop('apiVersion')
spec.pop('kind')
name = spec.pop('name')
container_match = spec.pop('containerMatch')
methods = spec.pop('methods', [])
assert not spec, ('Unrecognized values in service', spec)
return Service(str(name), service_container_match_from_toml(container_match), [
method_from_toml(x)
for x in methods
])
def from_toml(toml: BinaryIO) -> State:
"""
Loads the given toml and builds a state instance out of it
"""
toml_dict = tomllib.load(toml)
images: List[Image] = []
images_by_name: Dict[str, Image] = {}
containers: List[Container] = []
services: List[Service] = []
for name, spec in toml_dict.items():
if spec['apiVersion'] != 'v0':
raise NotImplementedError('apiVersion', spec['apiVersion'])
if spec['kind'] == 'Image':
image = image_from_toml(spec)
images.append(image)
assert name not in images_by_name, f'Duplicate image name: {name}'
images_by_name[name] = image
continue
if spec['kind'] == 'Container':
containers.append(container_from_toml(name, spec))
continue
if spec['kind'] == 'Service':
services.append(service_from_toml(spec))
continue
raise NotImplementedError(spec)
for container in containers:
if not isinstance(container.image, ImageReference):
continue
image_opt = images_by_name.get(container.image.name, None)
assert image_opt is not None, f'Image reference not resolved: {container.image.name}'
container.image = image_opt
return State(images, containers, services)

View File

@ -0,0 +1,25 @@
from typing import Any, Union
from .valuetype import ValueType, none
ValueData = Union[None, int, float, bytes]
class Value:
__slots__ = ('value_type', 'data', )
value_type: ValueType
data: ValueData
def __init__(self, value_type: ValueType, data: ValueData) -> None:
self.value_type = value_type
self.data = data
def __eq__(self, other: Any) -> bool:
return self.value_type is other.value_type and self.data == other.data
def __repr__(self) -> str:
return f'Value({repr(self.value_type)}, {repr(self.data)})'
NoneValue = Value(none, None)

View File

@ -0,0 +1,32 @@
from typing import Any, Dict
class ValueType:
__slots__ = ('name', )
name: str
def __init__(self, name: str) -> None:
self.name = name
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ValueType):
raise NotImplementedError
return self is other
def __repr__(self) -> str:
return f'valuetype.{self.name}'
u32 = ValueType('u32')
bytes = ValueType('bytes')
none = ValueType('none')
LOOKUP_TABLE: Dict[str, ValueType] = {
u32.name: u32,
bytes.name: bytes,
none.name: none,
}

View File

View File

@ -0,0 +1,16 @@
import sys
from phasmplatform.common.config import from_toml
def main() -> int:
with open('config.toml', 'rb') as fil:
config = from_toml(fil)
del config
return 0
if __name__ == '__main__':
sys.exit(main())

View File

View File

@ -0,0 +1,271 @@
from typing import Dict, List, Optional, Type, Union
import datetime
import sys
import random
import threading
import time
import uuid
from queue import Empty, Queue
from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.config import WorkerConfig, from_toml as config_from_toml
from phasmplatform.common.image import Image
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, RoutedMethodCall, MethodResultCallable
from phasmplatform.common.methodcall import (
MethodCallExceptionError, MethodNotFoundError, MethodTypeMismatchError, ServiceNotFoundError, ServiceUnavailableError
)
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.service import ContainerMatch, Service
from phasmplatform.common.value import Value
from phasmplatform.common.state import from_toml as state_from_toml
from .runners.base import RunnerInterface
from .runners.prelude import PreludeRunner
from .runners.wasmtime import WasmTimeRunner
RUNTIME_MAP: Dict[str, Type[RunnerInterface]] = {
'prelude': PreludeRunner,
'wasmtime': WasmTimeRunner,
}
def log_now() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)
class ShutDownCommand():
pass
MethodCallQueue = Queue[Union[RoutedMethodCall, ShutDownCommand]]
class ManagedContainer:
__slots__ = ('config', 'method_call_router', 'container', 'thread', 'queue', )
config: WorkerConfig
method_call_router: MethodCallRouterInterface
container: Container
thread: threading.Thread
queue: MethodCallQueue
def __init__(self, config: WorkerConfig, method_call_router: MethodCallRouterInterface, container: Container) -> None:
self.config = config
self.method_call_router = method_call_router
self.container = container
self.thread = threading.Thread(target=container_thread, args=(self, ))
self.queue = Queue()
def __repr__(self) -> str:
return f'ManagedContainer(..., ..., {repr(self.container)}, ..., ...)'
def container_thread(mcont: ManagedContainer) -> None:
clickhouse_session_id = str(uuid.uuid1())
clickhouse_write = mcont.config.clickhouse_write
def container_log(msg: str) -> None:
clickhouse_write.insert(
table='container_logs',
data=[
[log_now(), mcont.container.name, str(id(mcont.thread)), msg],
],
column_names=['timestamp', 'name', 'thread_id', 'msg'],
column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String'],
settings={'session_id': clickhouse_session_id}
)
def routing_log(from_container: str, to_service: Optional[str], to_method: str, result: str) -> None:
clickhouse_write.insert(
table='routing_logs',
data=[
[log_now(), str(id(mcont.thread)), from_container, mcont.container.name, to_service or '', to_method, result],
],
column_names=['timestamp', 'thread_id', 'from_container', 'to_container', 'to_service', 'to_method', 'result'],
column_type_names=["DateTime64(6, 'UTC')", 'String', 'String', 'String', 'String', 'String', 'String'],
settings={'session_id': clickhouse_session_id}
)
container_log(f'Creating runtime for {mcont.container.image.path}')
cls = RUNTIME_MAP.get(mcont.container.runtime)
assert cls is not None, f'Unknown runtime: {mcont.container.runtime}'
runtime: RunnerInterface = cls(mcont.method_call_router, mcont.container, container_log)
container_log('Starting thread')
while True:
try:
call = mcont.queue.get(block=True, timeout=1)
except Empty:
continue
if isinstance(call, ShutDownCommand):
break
try:
result = runtime.do_call(call.call)
except Exception as ex:
raise ex
routing_log(
call.from_container.name if call.from_container is not None else '[SYSTEM]',
call.call.method.name,
repr(ex),
)
call.on_result(MethodCallExceptionError(str(ex)))
continue
routing_log(
call.from_container.name if call.from_container is not None else '[SYSTEM]',
call.to_service,
call.call.method.name,
repr(result.data) if isinstance(result, Value) else repr(result)
)
call.on_result(result)
container_log('Stopping thread')
class LocalhostMethodCallRouter(MethodCallRouterInterface):
__slots__ = ('services', 'container_by_service')
services: Dict[str, Service]
container_by_service: Dict[str, List[ManagedContainer]]
def __init__(self) -> None:
self.services = {}
self.container_by_service = {}
def route_call(
self,
service_name: str,
call: MethodCall,
from_container: Optional[Container],
on_result: MethodResultCallable,
) -> None:
service = self.services.get(service_name)
if service is None:
on_result(ServiceNotFoundError(service_name))
return
method = service.methods.get(call.method.name)
if method is None:
on_result(MethodNotFoundError(f'{service_name}.{call.method.name}'))
return
if method != call.method:
on_result(MethodTypeMismatchError())
return
to_mcont = self.find_one_container(service)
if to_mcont is None:
on_result(ServiceUnavailableError(service_name))
return
to_mcont.queue.put(RoutedMethodCall(
call,
from_container,
service.name,
to_mcont.container,
on_result,
))
def register_service(self, service: Service) -> None:
assert service.name not in self.services
self.services[service.name] = service
def register_container(self, mcont: ManagedContainer) -> None:
for service in self.services.values():
if service.container_match.by_name == mcont.container.name:
self.container_by_service.setdefault(service.name, [])
self.container_by_service[service.name].append(mcont)
continue
if service.container_match.by_name[-1] == '*' and mcont.container.name.startswith(service.container_match.by_name[:-1]):
self.container_by_service.setdefault(service.name, [])
self.container_by_service[service.name].append(mcont)
continue
def find_one_container(self, service: Service) -> Optional[ManagedContainer]:
container_list = self.container_by_service.get(service.name)
if not container_list:
return None
return random.choice(container_list)
def make_prelude() -> Service:
methods: List[Method] = [
Method('sleep', [valuetype.u32], valuetype.none),
]
return Service('prelude', ContainerMatch('__prelude__'), methods)
def main() -> int:
with open('config.toml', 'rb') as fil:
config = config_from_toml(fil)
with open('./examples/echoapp.toml', 'rb') as fil:
state = state_from_toml(fil)
method_call_router = LocalhostMethodCallRouter()
print('Registering services')
method_call_router.register_service(make_prelude())
for service in state.services:
method_call_router.register_service(service)
container_list: List[ManagedContainer] = []
prelude_container = Container(
'__prelude__',
Image('prelude', '', []),
'prelude',
)
for cont in [prelude_container] + state.containers:
mcont = ManagedContainer(config.worker_config, method_call_router, cont)
container_list.append(mcont)
method_call_router.register_container(mcont)
print('Starting containers')
for mcont in container_list:
mcont.thread.start()
print('Sending out on_module_loaded calls') # TODO: Route this normally?
on_module_loaded = MethodCall(
Method('on_module_loaded', [], valuetype.none),
[],
)
for mcont in container_list:
mcont.queue.put(RoutedMethodCall(on_module_loaded, None, None, mcont.container, lambda x: None))
try:
while 1:
time.sleep(1)
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, shutting down')
pass
shut_down_command = ShutDownCommand()
for mcont in container_list:
mcont.queue.put(shut_down_command)
print('Awaiting containers')
for mcont in container_list:
mcont.thread.join()
return 0
if __name__ == '__main__':
sys.exit(main())

View File

View File

@ -0,0 +1,96 @@
from typing import Callable, TextIO, Tuple, Union
from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.methodcall import MethodCall, MethodCallError
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
WasmValue = Union[None, int, float]
class RunnerInterface:
__slots__ = ('method_call_router', 'container', 'container_log', )
method_call_router: MethodCallRouterInterface
container: Container
container_log: Tuple[Callable[[str], None]] # Tuple for typing issues
def __init__(self, method_call_router: MethodCallRouterInterface, container: Container, container_log: Callable[[str], None]) -> None:
self.method_call_router = method_call_router
self.container = container
self.container_log = (container_log, )
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
"""
Executes the call on the current container
This method is responsible for calling the on_success or on_error method.
"""
raise NotImplementedError
class BaseRunner(RunnerInterface):
__slots__ = ('method_call_router', )
def alloc_bytes(self, data: bytes) -> int:
"""
Calls upon stdlib.types.__alloc_bytes__ to allocate a bytes object
"""
raise NotImplementedError
def read_bytes(self, ptr: int) -> bytes:
"""
Reads a byte object allocated by stdlib.types.__alloc_bytes__
"""
raise NotImplementedError
def value_to_wasm(self, val: Value) -> WasmValue:
if val.value_type is valuetype.none:
assert val.data is None # type hint
return None
if val.value_type is valuetype.bytes:
assert isinstance(val.data, bytes) # type hint
return self.alloc_bytes(val.data)
raise NotImplementedError(val)
def value_from_wasm(self, value_type: ValueType, val: WasmValue) -> Value:
if value_type is valuetype.none:
assert val is None # type hint
return Value(value_type, None)
if value_type is valuetype.u32:
assert isinstance(val, int) # type hint
return Value(value_type, val)
if value_type is valuetype.bytes:
assert isinstance(val, int) # type hint
return Value(value_type, self.read_bytes(val))
raise NotImplementedError(value_type, val)
def dump_memory(textio: TextIO, mem: bytes) -> None:
line_width = 16
prev_line = None
skip = False
for idx in range(0, len(mem), line_width):
line = ''
for idx2 in range(0, line_width):
line += f'{mem[idx + idx2]:02X}'
if idx2 % 2 == 1:
line += ' '
if prev_line == line:
if not skip:
textio.write('**\n')
skip = True
else:
textio.write(f'{idx:08x} {line}\n')
prev_line = line

View File

@ -0,0 +1,27 @@
from typing import Union
import time
from phasmplatform.common.methodcall import MethodCall, MethodCallError
from phasmplatform.common.value import Value, NoneValue
from .base import BaseRunner
class PreludeRunner(BaseRunner):
__slots__ = ()
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
if call.method.name == 'on_module_loaded':
self.container_log[0]('PreludeRunner loaded')
return NoneValue
if call.method.name == 'sleep':
# This will block this thread
# Which is has to do until we can get the async really working
seconds = call.args[0].data
assert isinstance(seconds, int) # type hint
time.sleep(seconds)
return NoneValue
raise NotImplementedError(call)

View File

@ -0,0 +1,161 @@
from typing import Any, Callable, Dict, List, Union
import ctypes
import functools
import struct
from queue import Empty, Queue
import wasmtime
from phasmplatform.common import valuetype
from phasmplatform.common.container import Container
from phasmplatform.common.method import Method
from phasmplatform.common.methodcall import MethodCall, MethodCallError, MethodNotFoundError
from phasmplatform.common.router import MethodCallRouterInterface
from phasmplatform.common.value import Value
from phasmplatform.common.valuetype import ValueType
from .base import BaseRunner, WasmValue
class WasmTimeRunner(BaseRunner):
__slots__ = ('store', 'module', 'instance', 'exports')
def __init__(self, method_call_router: MethodCallRouterInterface, container: Container, container_log: Callable[[str], None]) -> None:
super().__init__(method_call_router, container, container_log)
with open(f'./{container.image.path}', 'rb') as fil: # TODO: ImageLoader?
wasm_bin = fil.read()
self.store = wasmtime.Store()
self.module = wasmtime.Module(self.store.engine, wasm_bin)
import_map: Dict[str, wasmtime.Func] = {
f'{imprt_service}.{imprt_method.name}': wasmtime.Func(
self.store,
build_func_type(imprt_method),
functools.partial(self.send_service_call, imprt_service, imprt_method)
)
for (imprt_service, imprt_method, ) in container.image.imports
}
import_map['prelude.log_bytes'] = wasmtime.Func(
self.store,
wasmtime.FuncType([wasmtime.ValType.i32()], []),
functools.partial(self.log_bytes),
)
# Make sure the given import lists order matches the one given by wasmtime
# Otherwise, wasmtime can't match them up.
imports: List[wasmtime.Func] = [
import_map[f'{imprt.module}.{imprt.name}']
for imprt in self.module.imports
]
self.instance = wasmtime.Instance(self.store, self.module, imports)
self.exports = self.instance.exports(self.store)
def alloc_bytes(self, data: bytes) -> int:
memory = self.exports['memory']
assert isinstance(memory, wasmtime.Memory) # type hint
data_ptr = memory.data_ptr(self.store)
data_len = memory.data_len(self.store)
alloc_bytes = self.exports['stdlib.types.__alloc_bytes__']
assert isinstance(alloc_bytes, wasmtime.Func)
ptr = alloc_bytes(self.store, len(data))
assert isinstance(ptr, int) # type hint
idx = ptr + 4 # Skip the header from header from __alloc_bytes__
for byt in data:
assert idx < data_len
data_ptr[idx] = ctypes.c_ubyte(byt)
idx += 1
return ptr
def read_bytes(self, ptr: int) -> bytes:
memory = self.exports['memory']
assert isinstance(memory, wasmtime.Memory) # type hint
data_ptr = memory.data_ptr(self.store)
data_len = memory.data_len(self.store)
raw = ctypes.string_at(data_ptr, data_len)
length, = struct.unpack('<I', raw[ptr:ptr + 4]) # Header prefixed by __alloc_bytes__
return raw[ptr + 4:ptr + 4 + length]
def do_call(self, call: MethodCall) -> Union[Value, MethodCallError]:
try:
wasm_method = self.exports[call.method.name]
except KeyError:
return MethodNotFoundError()
assert isinstance(wasm_method, wasmtime.Func)
act_args = [self.value_to_wasm(x) for x in call.args]
result = wasm_method(self.store, *act_args)
assert result is None or isinstance(result, (int, float, )) # type hint
return self.value_from_wasm(call.method.return_type, result)
def send_service_call(self, service: str, method: Method, *args: Any) -> WasmValue:
assert len(method.arg_types) == len(args) # type hint
call_args = [
self.value_from_wasm(x, y)
for x, y in zip(method.arg_types, args)
]
queue: Queue[Value] = Queue(maxsize=1)
def on_result(res: Union[Value, MethodCallError]) -> None:
if isinstance(res, Value):
queue.put(res)
else:
raise Exception(res)
call = MethodCall(method, call_args)
self.method_call_router.route_call(service, call, self.container, on_result)
try:
value = queue.get(block=True, timeout=10)
except Empty:
raise Exception('Did not receive value from remote call') # TODO
return self.value_to_wasm(value)
def log_bytes(self, data_ptr: int) -> None:
value = self.value_from_wasm(valuetype.bytes, data_ptr)
self.container_log[0](repr(value.data))
def build_func_type(method: Method) -> wasmtime.FuncType:
if method.return_type is valuetype.none:
returns = []
else:
returns = [build_wasm_type(method.return_type)]
args = []
for arg_type in method.arg_types:
assert arg_type is not valuetype.none # type hint
args.append(build_wasm_type(arg_type))
return wasmtime.FuncType(args, returns)
def build_wasm_type(value_type: ValueType) -> wasmtime.ValType:
if value_type is valuetype.u32:
return wasmtime.ValType.i32() # Signed-ness is in the operands
if value_type is valuetype.bytes:
return wasmtime.ValType.i32() # Bytes are passed as pointer
raise NotImplementedError

4
requirements-dev.txt Normal file
View File

@ -0,0 +1,4 @@
mypy==1.2.0
pycodestyle==2.10.0
pyflakes==3.0.1
types-redis==4.5.4.1

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
clickhouse-connect==0.5.20
numpy==1.24.2
pywasm3==0.5.0
redis==4.5.4
tomli==2.0.1 ; python_version < '3.11'
wasmtime==7.0.0