vuer.frame¶
- class vuer.Vuer[source]¶
Bases:
ServerVuer Server
This is the server for the Vuer client.
Usage:
app = Vuer() @app.spawn async def main(session: VuerSession): session.set @ Scene(children=[...]) app.run()
- bind(fn=None, start=False)[source]¶
Bind an asynchronous generator function for use in socket connection handler. The function should be a generator that yields Page objects.
- Parameters:
fn – The function to bind.
start – Start server after binding
- Returns:
BoundFn instance that can be called later with .start()
- spawn(fn: Callable[[VuerSession], Coroutine] | None = None, start=False)[source]¶
Bind the socket handler function fn to vuer, and start the event loop if start is True.
Note: this is really a misnomer.
- Parameters:
fn (Callable[[VuerSession], Coroutine] | None) – The function to spawn.
start – Start server after binding
- Returns:
BoundFn instance that can be called later with .start()
- async relay(request)[source]¶
This is the relay object for sending events to the server.
Todo: add API for specifying the websocket ID. Or just broadcast to all. Todo: add type hint
- Interface:
<uri>/relay?sid=<websocket_id>
- Returns:
Status 200
Status 400
- async bound_fn(session_proxy: VuerSession)[source]¶
This is the default generator function in the socket connection handler
- Parameters:
session_proxy (VuerSession) –
- async send(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]¶
- Parameters:
event (ServerEvent | None) –
- async rpc(ws_id, event: ServerRPC, ttl=2.0) ClientEvent | None[source]¶
RPC only takes a single response. For multi-response streaming, we need to build a new one
Question is whether we want to make this RPC an awaitable funciton.
- Parameters:
ttl – The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.
event (ServerRPC) –
- Return type:
ClientEvent | None
- async rpc_stream(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]¶
This RPC offers multiple responses.
- Parameters:
event (ServerEvent | None) –
- async uplink(proxy: VuerSession)[source]¶
- Parameters:
proxy (VuerSession) –
- async downlink(request: aiohttp.web_request.Request, ws: aiohttp.web_ws.WebSocketResponse)[source]¶
The websocket handler for receiving messages from the client.
- Parameters:
ws (aiohttp.web_ws.WebSocketResponse) – The websocket.
request (aiohttp.web_request.Request) – The request (unused).
- Returns:
None
- add_handler(event_type: str, fn: Callable[[ClientEvent, VuerSession], None] | None = None, once: bool = False) Callable[[], None][source]¶
Adding event handlers to the vuer server.
- Parameters:
event_type (str) – The event type to handle.
fn (Callable[[ClientEvent, VuerSession], None] | None) – The function to handle the event.
once (bool) – Whether to remove the handler after the first call. This is useful for RPC, which cleans up after itself. The issue is for RPC, the key also needs to match. So we hack it here to use a call specific event_type to enforce the cleanup.
- Return type:
Callable[[], None]
# Usage:
As a decorator:
app = Vuer() @app.add_handler("CAMERA_MOVE") def on_camera(event: ClientEvent, session: VuerSession): print("camera event", event.etype, event.value)
As a function:
app = Vuer() def on_camera(event: ClientEvent, session: VuerSession): print("camera event", event.etype, event.value) app.add_handler("CAMERA_MOVE", on_camera) app.run()
- run(free_port=None, *args, **kwargs)[source]¶
Run the server.
Deprecated since version Use:
start()instead. This method will be removed in a future version.
- domain: str = 'https://vuer.ai'¶
- port: int = 8012¶
- free_port: bool = False¶
- static_root: str = '.'¶
- queue_len: int = 100¶
- cors: str = 'https://vuer.ai,https://staging.vuer.ai,https://dash.ml,http://localhost:8000,http://127.0.0.1:8000,*'¶
- queries: Dict = None¶
- client_root: Path = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/vuer-py/checkouts/v0.0.71/src/vuer/client_build')¶
- verbose: bool = False¶
- _proxy(ws_id) VuerSession[source]¶
This is a proxy object that allows us to use the @ notation to send events to the client.
- Parameters:
ws_id – The websocket id.
- Returns:
A proxy object.
- Return type:
- async relay(request)[source]¶
This is the relay object for sending events to the server.
Todo: add API for specifying the websocket ID. Or just broadcast to all. Todo: add type hint
- Interface:
<uri>/relay?sid=<websocket_id>
- Returns:
Status 200
Status 400
- async bound_fn(session_proxy: VuerSession)[source]¶
This is the default generator function in the socket connection handler
- Parameters:
session_proxy (VuerSession) –
- spawn(fn: Callable[[VuerSession], Coroutine] | None = None, start=False)[source]¶
Bind the socket handler function fn to vuer, and start the event loop if start is True.
Note: this is really a misnomer.
- Parameters:
fn (Callable[[VuerSession], Coroutine] | None) – The function to spawn.
start – Start server after binding
- Returns:
BoundFn instance that can be called later with .start()
- bind(fn=None, start=False)[source]¶
Bind an asynchronous generator function for use in socket connection handler. The function should be a generator that yields Page objects.
- Parameters:
fn – The function to bind.
start – Start server after binding
- Returns:
BoundFn instance that can be called later with .start()
- async send(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]¶
- Parameters:
event (ServerEvent | None) –
- async rpc(ws_id, event: ServerRPC, ttl=2.0) ClientEvent | None[source]¶
RPC only takes a single response. For multi-response streaming, we need to build a new one
Question is whether we want to make this RPC an awaitable funciton.
- Parameters:
ttl – The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.
event (ServerRPC) –
- Return type:
ClientEvent | None
- async rpc_stream(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]¶
This RPC offers multiple responses.
- Parameters:
event (ServerEvent | None) –
- async uplink(proxy: VuerSession)[source]¶
- Parameters:
proxy (VuerSession) –
- async downlink(request: aiohttp.web_request.Request, ws: aiohttp.web_ws.WebSocketResponse)[source]¶
The websocket handler for receiving messages from the client.
- Parameters:
ws (aiohttp.web_ws.WebSocketResponse) – The websocket.
request (aiohttp.web_request.Request) – The request (unused).
- Returns:
None
- add_handler(event_type: str, fn: Callable[[ClientEvent, VuerSession], None] | None = None, once: bool = False) Callable[[], None][source]¶
Adding event handlers to the vuer server.
- Parameters:
event_type (str) – The event type to handle.
fn (Callable[[ClientEvent, VuerSession], None] | None) – The function to handle the event.
once (bool) – Whether to remove the handler after the first call. This is useful for RPC, which cleans up after itself. The issue is for RPC, the key also needs to match. So we hack it here to use a call specific event_type to enforce the cleanup.
- Return type:
Callable[[], None]
# Usage:
As a decorator:
app = Vuer() @app.add_handler("CAMERA_MOVE") def on_camera(event: ClientEvent, session: VuerSession): print("camera event", event.etype, event.value)
As a function:
app = Vuer() def on_camera(event: ClientEvent, session: VuerSession): print("camera event", event.etype, event.value) app.add_handler("CAMERA_MOVE", on_camera) app.run()
- async socket_index(request: aiohttp.web_request.BaseRequest)[source]¶
This is the relay object for sending events to the server.
Todo: add API for specifying the websocket ID. Or just broadcast to all. Todo: add type hint
- Interface:
<uri>/relay?sid=<websocket_id>
- Returns:
Status 200
Status 400
- Parameters:
request (aiohttp.web_request.BaseRequest) –
- add_route(path, fn: Callable, method='GET', content_type='text/html')[source]¶
- Parameters:
fn (Callable) –
- run(free_port=None, *args, **kwargs)[source]¶
Run the server.
Deprecated since version Use:
start()instead. This method will be removed in a future version.
- REQUEST_MAX_SIZE: int = 268435456¶
- WEBSOCKET_MAX_SIZE: int = 268435456¶
- _add_route(path: str, handler: callable, method: str = 'GET')¶
- Parameters:
path (str) –
handler (callable) –
method (str) –
- _add_static(path, root)¶
- _add_task(name=None)¶
- Parameters:
fn (Coroutine) –
- _init_app()¶
Initialize the aiohttp application and CORS context.
This must be called before using any server methods. Subclasses should call this in their __post_init__ method.
- Creates:
self.app: The aiohttp web.Application instance. self.cors_context: The aiohttp_cors setup for CORS handling.
- _socket(path: str, handler: callable)¶
- Parameters:
path (str) –
handler (callable) –
- _static_file(path, root, filename=None)¶
- ca_cert: str = None¶
- cert: str = None¶
- host: str = 'localhost'¶
- key: str = None¶
- class vuer.VuerSession[source]¶
Bases:
object- property socket¶
Getter for the websocket object.
this is useful for closing the socket session from the client side.
Example Usage:
@app.spawn(start=True): async def main(session: VuerSession): print("doing something...") await sleep(1.0) print("I am done! closing the socket.") session.socket.close()
- async grab_render(ttl=2.0, **kwargs) ClientEvent[source]¶
Grab a render from the client.
- Parameters:
quality – The quality of the render. 0.0 - 1.0
subsample – The subsample of the render.
ttl – The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.
- Return type:
- async get_webxr_mesh(key: str = 'webxr-mesh', ttl=2.0) ClientEvent[source]¶
Request WebXR mesh data from the client.
This method sends a GET_WEBXR_MESH RPC request to the client and waits for a response containing the detected environmental meshes from the WebXR AR session.
The response contains mesh data including vertices, indices, semantic labels, and transformation matrices for each detected mesh.
Usage Example:
from vuer import Vuer, VuerSession from vuer.schemas import WebXRMesh, Scene from asyncio import sleep app = Vuer() @app.spawn(start=True) async def main(session: VuerSession): session.set @ Scene( children=[WebXRMesh(key="webxr-mesh", stream=False)] ) await sleep(2) # Wait for meshes to be detected # Request mesh data on-demand mesh_data = await session.get_webxr_mesh(key="webxr-mesh") meshes = mesh_data.value.get('meshes', []) print(f"Retrieved {len(meshes)} meshes") for mesh in meshes: vertices = mesh['vertices'] indices = mesh['indices'] semantic_label = mesh.get('semanticLabel', 'unknown') matrix = mesh['matrix'] print(f"Mesh: {len(vertices)/3:.0f} vertices, label={semantic_label}")
- Parameters:
key (str) – The key of the WebXRMesh component to query (default: “webxr-mesh”)
ttl – The time to live for the handler in seconds. If no response is received within this time, a TimeoutError is raised (default: 2.0)
- Returns:
ClientEvent containing mesh data in event.value[‘meshes’]
- Raises:
asyncio.TimeoutError – If the client doesn’t respond within ttl seconds
AssertionError – If websocket session is missing
- Return type:
- send(event: ServerEvent) None[source]¶
Sending the event through the uplink queue.
- Parameters:
event (ServerEvent) –
- Return type:
None
- async rpc(event: ServerRPC, ttl=2.0) ClientEvent | None[source]¶
Send a ServerRPC event to the client and wait for a response through the session queue
- Parameters:
event (ServerRPC) – The ServerRPC event to send.
ttl – The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.
- Returns:
ClientEvent
- Return type:
ClientEvent | None
- property set: At¶
Used exclusively to set the scene.
the @SET operator is responsible for setting the root node of the scene.
- Examples:
proxy @ Set(Scene(children=[…]))
or
app.set @ Scene(children=[…])
- property update: At¶
Used to update existing elements. NOOP if an element does not exist.
Supports passing in a list of elements. (Thank God I implemented this… so handy! - Ge)
Example Usage:
app.update @ [element1, element2, ...]
- property add: At¶
Used to add elements to a specific parent.
Requires a parentKey, or treats the Scene root node as the default parent.
Example Usage:
app.add(element1, element2, ..., to=parentKey.)
or using the Scene root node as the default parent:
app.add @ element1
- property upsert: At¶
Upsert elements to a specific parent.
Requires a parentKey, or treats the Scene root node as the default parent.
Example Usage:
app.upsert(element1, element2, ..., to=parentKey.)
or using the Scene root node as the default parent:
app.upsert @ element1
- property remove: At¶
Remove elements by keys.
Example Usage:
app.remove @ ["key1", "key2", ...]
or a single key:
app.remove @ "key1"
- spawn_task(task, name=None)[source]¶
Spawn a task in the running asyncio event loop
Useful for background tasks. Returns an asyncio task that can be canceled.
1async background_task(): 2 print('\rthis ran once') 3 4async long_running_bg_task(): 5 while True: 6 await asyncio.sleep(1.0) 7 print("\rlong running background task is still running") 8 9@app.spawn_task 10async def main_fn(sess: VuerSession): 11 # Prepare background tasks here: 12 task = sess.spawn_task(background_task) 13 long_running_task = sess.spawn_task(long_running_bg_task)
Now to cancel a running task, simply
1task.cancel()
Todos
▫️ Add a way to automatically clean up when exiting the main_fn.
- async forever()[source]¶
Keep the session alive indefinitely.
This is useful when you want to set up a scene and keep the server running without the session closing. The session will remain active until the client disconnects or the server is stopped.
Example Usage:
@app.spawn(start=True) async def main(session: VuerSession): session.set @ Scene(Box(args=[0.2, 0.2, 0.2], key="box")) await session.forever()