vuer.frame
ΒΆ
- class vuer.Vuer[source]ΒΆ
Bases:
PrefixProto
,Server
Vuer Server
This is the server for the Vuer client.
Usage:
app = Vuer() @app.spawn async def main(session: VuerSession): session.set @ Scene(children=[...]) app.run()
- bind(fn=None, start=False)[source]ΒΆ
Bind an asynchronous generator function for use in socket connection handler. The function should be a generator that yields Page objects.
- Parameters:
fn β The function to bind.
- Returns:
None
- spawn(fn: Callable[[VuerSession], Coroutine] | None = None, start=False)[source]ΒΆ
bind the socket handler function fn to vuer, and start the event loop if start is True.
Note: this is really a misnomer.
- Parameters:
fn (Callable[[VuerSession], Coroutine] | None) β The function to spawn.
start β Start server after binding
- Returns:
None
- async relay(request)[source]ΒΆ
This is the relay object for sending events to the server.
Todo: add API for specifying the websocket ID. Or just broadcast to all. Todo: add type hint
- Interface:
<uri>/relay?sid=<websocket_id>
- Returns:
Status 200
Status 400
- async bound_fn(session_proxy: VuerSession)[source]ΒΆ
This is the default generator function in the socket connection handler
- Parameters:
session_proxy (VuerSession) β
- async send(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]ΒΆ
- Parameters:
event (ServerEvent | None) β
- async rpc(ws_id, event: ServerRPC, ttl=2.0) ClientEvent | None [source]ΒΆ
RPC only takes a single response. For multi-response streaming, we need to build a new one
Question is whether we want to make this RPC an awaitable funciton.
- Parameters:
ttl β The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.
event (ServerRPC) β
- Return type:
ClientEvent | None
- async rpc_stream(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]ΒΆ
This RPC offers multiple responses.
- Parameters:
event (ServerEvent | None) β
- async uplink(proxy: VuerSession)[source]ΒΆ
- Parameters:
proxy (VuerSession) β
- async downlink(request: aiohttp.web_request.Request, ws: aiohttp.web_ws.WebSocketResponse)[source]ΒΆ
The websocket handler for receiving messages from the client.
- Parameters:
ws (aiohttp.web_ws.WebSocketResponse) β The websocket.
request (aiohttp.web_request.Request) β The request (unused).
- Returns:
None
- add_handler(event_type: str, fn: Callable[[ClientEvent, VuerSession], None] | None = None, once: bool = False) Callable[[], None] [source]ΒΆ
Adding event handlers to the vuer server.
- Parameters:
event_type (str) β The event type to handle.
fn (Callable[[ClientEvent, VuerSession], None] | None) β The function to handle the event.
once (bool) β Whether to remove the handler after the first call. This is useful for RPC, which cleans up after itself. The issue is for RPC, the key also needs to match. So we hack it here to use a call specific event_type to enforce the cleanup.
- Return type:
Callable[[], None]
# Usage:
As a decorator:
app = Vuer() @app.add_handler("CAMERA_MOVE") def on_camera(event: ClientEvent, session: VuerSession): print("camera event", event.etype, event.value)
As a function:
app = Vuer() def on_camera(event: ClientEvent, session: VuerSession): print("camera event", event.etype, event.value) app.add_handler("CAMERA_MOVE", on_camera) app.run()
- domain = 'https://vuer.ai'ΒΆ
- host = 'localhost'ΒΆ
- port = 8012ΒΆ
- free_port = NoneΒΆ
- static_root = '.'ΒΆ
todo: we want to support multiple paths.
- queue_len = 100ΒΆ
- cors = 'https://vuer.ai,https://staging.vuer.ai,https://dash.ml,http://localhost:8000,http://127.0.0.1:8000,*'ΒΆ
- queries = {}ΒΆ
- cert = NoneΒΆ
- key = NoneΒΆ
- ca_cert = NoneΒΆ
- client_root = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/vuer-py/checkouts/latest/vuer/client_build')ΒΆ
- verbose = NoneΒΆ
- REQUEST_MAX_SIZE: int = 268435456ΒΆ
- WEBSOCKET_MAX_SIZE: int = 268435456ΒΆ
- class vuer.VuerSession[source]ΒΆ
Bases:
object
- property socketΒΆ
Getter for the websocket object.
this is useful for closing the socket session from the client side.
Example Usage:
@app.spawn(start=True): async def main(session: VuerSession): print("doing something...") await sleep(1.0) print("I am done! closing the socket.") session.socket.close()
- async grab_render(ttl=2.0, **kwargs) ClientEvent [source]ΒΆ
Grab a render from the client.
- Parameters:
quality β The quality of the render. 0.0 - 1.0
subsample β The subsample of the render.
ttl β The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.
- Return type:
- property set: AtΒΆ
Used exclusively to set the scene.
the @SET operator is responsible for setting the root node of the scene.
- Examples:
proxy @ Set(Scene(children=[β¦]))
or
app.set @ Scene(children=[β¦])
- property update: AtΒΆ
Used to update existing elements. NOOP if an element does not exist.
Supports passing in a list of elements. (Thank God I implemented this⦠so handy! - Ge)
Example Usage:
app.update @ [element1, element2, ...]
- property add: AtΒΆ
Used to add elements to a specific parent.
Requires a parentKey, or treats the Scene root node as the default parent.
Example Usage:
app.add(element1, element2, ..., to=parentKey.)
or using the Scene root node as the default parent:
app.add @ element1
- property upsert: AtΒΆ
Upsert elements to a specific parent.
Requires a parentKey, or treats the Scene root node as the default parent.
Example Usage:
app.upsert(element1, element2, ..., to=parentKey.)
or using the Scene root node as the default parent:
app.upsert @ element1
- property remove: AtΒΆ
Remove elements by keys.
Example Usage:
app.remove @ ["key1", "key2", ...]
or a single key:
app.remove @ "key1"
- spawn_task(task, name=None)[source]ΒΆ
Spawn a task in the running asyncio event loop
Useful for background tasks. Returns an asyncio task that can be canceled.
1async background_task(): 2 print('\rthis ran once') 3 4async long_running_bg_task(): 5 while True: 6 await asyncio.sleep(1.0) 7 print("\rlong running background task is still running") 8 9@app.spawn_task 10async def main_fn(sess: VuerSession): 11 # Prepare background tasks here: 12 task = sess.spawn_task(background_task) 13 long_running_task = sess.spawn_task(long_running_bg_task)
Now to cancel a running task, simply
1task.cancel()
Todos
β«οΈ Add a way to automatically clean up when exiting the main_fn.