vuer.frame

class vuer.Vuer[source]

Bases: Server

Vuer Server

This is the server for the Vuer client.

Usage:

app = Vuer()

@app.spawn
async def main(session: VuerSession):
    session.set @ Scene(children=[...])

app.run()

domain: str = 'https://vuer.ai'
port: int = 8012
free_port: bool = False
static_root: str = '.'
queue_len: int = 100
cors: str = 'https://vuer.ai,https://staging.vuer.ai,https://dash.ml,http://localhost:8000,http://127.0.0.1:8000,*'
queries: Dict = None
client_root: Path = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/vuer-py/checkouts/v0.0.71/src/vuer/client_build')
verbose: bool = False
_proxy(ws_id) VuerSession[source]

This is a proxy object that allows us to use the @ notation to send events to the client.

Parameters:

ws_id – The websocket id.

Returns:

A proxy object.

Return type:

VuerSession
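
The @ notation mentioned above can be illustrated with Python's `__matmul__` hook. The following is only a minimal sketch of the idea, not the library's implementation: `AtSketch` and `SessionSketch` are hypothetical stand-ins, and the `sent` list stands in for whatever transport the real session uses.

```python
class AtSketch:
    """Hypothetical helper: `at @ payload` calls `callback(payload)`."""

    def __init__(self, fn):
        self.fn = fn

    def __matmul__(self, payload):
        # Invoked for the expression `at @ payload`.
        return self.fn(payload)

    def __call__(self, *args, **kwargs):
        # Also allow the plain call form, e.g. at(a, b, to="parent").
        return self.fn(*args, **kwargs)


class SessionSketch:
    """Hypothetical session bound to one websocket id."""

    def __init__(self, ws_id):
        self.ws_id = ws_id
        self.sent = []  # stands in for the outgoing event queue

    @property
    def set(self):
        # Each access returns a fresh helper bound to this session.
        return AtSketch(lambda scene: self.sent.append(("SET", self.ws_id, scene)))


session = SessionSketch(ws_id="ws-1")
session.set @ {"tag": "Scene", "children": []}
print(session.sent)
```

Returning the helper from a property is what lets `session.set @ scene` read like an operator applied to the session.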

async relay(request)[source]

This is the relay object for sending events to the server.

Todo: add API for specifying the websocket ID, or just broadcast to all.

Todo: add type hint.

Interface:

<uri>/relay?sid=<websocket_id>

Returns:

  • Status 200

  • Status 400
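
As a rough illustration of the interface above, the relay URL can be assembled with the standard library. This is a sketch under assumptions: the `http` scheme and the websocket id `ws-1` are made up for the example (the host and port are the defaults listed on this page), and `relay_url` is a hypothetical helper, not part of vuer.

```python
from urllib.parse import urlencode


def relay_url(base_uri, websocket_id):
    """Build <uri>/relay?sid=<websocket_id> with a safely encoded query."""
    query = urlencode({"sid": websocket_id})
    return f"{base_uri.rstrip('/')}/relay?{query}"


url = relay_url("http://localhost:8012", "ws-1")
print(url)
```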

async bound_fn(session_proxy: VuerSession)[source]

This is the default generator function in the socket connection handler

Parameters:

session_proxy (VuerSession) –

spawn(fn: Callable[[VuerSession], Coroutine] | None = None, start=False)[source]

Bind the socket handler function fn to vuer, and start the event loop if start is True.

Note: this is really a misnomer.

Parameters:
  • fn (Callable[[VuerSession], Coroutine] | None) – The function to spawn.

  • start – Start server after binding

Returns:

BoundFn instance that can be called later with .start()

bind(fn=None, start=False)[source]

Bind an asynchronous generator function for use in socket connection handler. The function should be a generator that yields Page objects.

Parameters:
  • fn – The function to bind.

  • start – Start server after binding

Returns:

BoundFn instance that can be called later with .start()

get_url()[source]

Get the URL for the Tassa.

Returns:

The URL for the Tassa.

async send(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]
Parameters:

event (ServerEvent | None) –

async rpc(ws_id, event: ServerRPC, ttl=2.0) ClientEvent | None[source]

This RPC takes only a single response. Multi-response streaming needs a separate implementation.

Open question: whether this RPC should be an awaitable function.

Parameters:
  • ttl – The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.

  • event (ServerRPC) –

Return type:

ClientEvent | None
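
The single-response, time-limited behaviour described above can be sketched with plain asyncio. This is an illustration only, not vuer's code: `rpc_sketch`, the `handlers` dict, and the two fake clients are hypothetical, and returning None on timeout is an assumption based on the `ClientEvent | None` return type.

```python
import asyncio

handlers = {}  # event type -> Future awaiting the single response


async def rpc_sketch(send, etype, ttl=2.0):
    """Send a request, then wait at most `ttl` seconds for one response."""
    future = asyncio.get_running_loop().create_future()
    handlers[etype] = future
    try:
        await send(etype)
        return await asyncio.wait_for(future, timeout=ttl)
    except asyncio.TimeoutError:
        return None  # assumption: a missed deadline yields None
    finally:
        # Remove the handler whether or not a response arrived.
        handlers.pop(etype, None)


async def demo():
    async def responsive_client(etype):
        # Resolve the pending handler shortly after the request is sent.
        asyncio.get_running_loop().call_later(
            0.01, handlers[etype].set_result, {"etype": etype, "value": 42}
        )

    async def silent_client(etype):
        pass  # never responds

    answered = await rpc_sketch(responsive_client, "PING", ttl=0.5)
    timed_out = await rpc_sketch(silent_client, "PING", ttl=0.05)
    return answered, timed_out


answered, timed_out = asyncio.run(demo())
print(answered, timed_out)
```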

async rpc_stream(ws_id, event: ServerEvent | None = None, event_bytes=None)[source]

This RPC offers multiple responses.

Parameters:

event (ServerEvent | None) –

async close_ws(ws_id)[source]
async uplink(proxy: VuerSession)[source]
Parameters:

proxy (VuerSession) –

async downlink(request: aiohttp.web_request.Request, ws: aiohttp.web_ws.WebSocketResponse)[source]

The websocket handler for receiving messages from the client.

Parameters:
  • ws (aiohttp.web_ws.WebSocketResponse) – The websocket.

  • request (aiohttp.web_request.Request) – The request (unused).

Returns:

None

add_handler(event_type: str, fn: Callable[[ClientEvent, VuerSession], None] | None = None, once: bool = False) Callable[[], None][source]

Adding event handlers to the vuer server.

Parameters:
  • event_type (str) – The event type to handle.

  • fn (Callable[[ClientEvent, VuerSession], None] | None) – The function to handle the event.

  • once (bool) – Whether to remove the handler after the first call. This is useful for RPC, which cleans up after itself. Because an RPC response must also match the key of its request, the implementation uses a call-specific event_type to enforce the cleanup.

Return type:

Callable[[], None]

# Usage:

As a decorator:

app = Vuer()
@app.add_handler("CAMERA_MOVE")
def on_camera(event: ClientEvent, session: VuerSession):
    print("camera event", event.etype, event.value)

As a function:

app = Vuer()
def on_camera(event: ClientEvent, session: VuerSession):
    print("camera event", event.etype, event.value)

app.add_handler("CAMERA_MOVE", on_camera)
app.run()
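
To make the `once` flag and the returned cleanup callable more concrete, here is a small stand-alone sketch of a handler registry. It is not vuer's implementation: `add_handler_sketch`, `dispatch`, and `registry` are hypothetical names chosen for the example.

```python
from collections import defaultdict

registry = defaultdict(dict)  # event_type -> {handler id: wrapped handler}


def add_handler_sketch(event_type, fn=None, once=False):
    """Register fn and return a cleanup callable; decorator form if fn is None."""
    if fn is None:
        def decorator(target):
            add_handler_sketch(event_type, target, once=once)
            return target
        return decorator

    key = id(fn)

    def cleanup():
        registry[event_type].pop(key, None)

    def wrapped(event, session):
        if once:
            cleanup()  # remove before running, so it fires only once
        return fn(event, session)

    registry[event_type][key] = wrapped
    return cleanup


def dispatch(event_type, event, session=None):
    # Iterate over a copy because handlers may remove themselves.
    for handler in list(registry[event_type].values()):
        handler(event, session)


seen = []
add_handler_sketch("CAMERA_MOVE", lambda e, s: seen.append(("once", e)), once=True)
remove = add_handler_sketch("CAMERA_MOVE", lambda e, s: seen.append(("always", e)))

dispatch("CAMERA_MOVE", 1)
dispatch("CAMERA_MOVE", 2)
remove()
dispatch("CAMERA_MOVE", 3)
print(seen)
```

The one-shot handler sees only the first event, while the other handler keeps firing until its cleanup callable is invoked.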
_ttl_handler(ttl, cleanup)[source]
async socket_index(request: aiohttp.web_request.BaseRequest)[source]

This is the relay object for sending events to the server.

Todo: add API for specifying the websocket ID, or just broadcast to all.

Todo: add type hint.

Interface:

<uri>/relay?sid=<websocket_id>

Returns:

  • Status 200

  • Status 400

Parameters:

request (aiohttp.web_request.BaseRequest) –

add_route(path, fn: Callable, method='GET', content_type='text/html')[source]
Parameters:

fn (Callable) –

run(free_port=None, *args, **kwargs)[source]

Run the server.

Deprecated: use start() instead. This method will be removed in a future version.

start(free_port=None, *args, **kwargs)[source]
REQUEST_MAX_SIZE: int = 268435456
WEBSOCKET_MAX_SIZE: int = 268435456
_add_route(path: str, handler: callable, method: str = 'GET')
Parameters:
  • path (str) –

  • handler (callable) –

  • method (str) –

_add_static(path, root)
_add_task(name=None)
Parameters:

fn (Coroutine) –

_init_app()

Initialize the aiohttp application and CORS context.

This must be called before using any server methods. Subclasses should call this in their __post_init__ method.

Creates:

  • self.app – The aiohttp web.Application instance.

  • self.cors_context – The aiohttp_cors setup for CORS handling.

_socket(path: str, handler: callable)
Parameters:
  • path (str) –

  • handler (callable) –

_static_file(path, root, filename=None)
ca_cert: str = None
cert: str = None
host: str = 'localhost'
key: str = None
class vuer.VuerSession[source]

Bases: object

property socket

Getter for the websocket object.

This is useful for closing the socket session from the client side.

Example Usage:

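from asyncio import sleep

@app.spawn(start=True)
async def main(session: VuerSession):
    print("doing something...")
    await sleep(1.0)

    print("I am done! closing the socket.")
    # Assuming the socket is an aiohttp WebSocketResponse,
    # close() is a coroutine and must be awaited.
    await session.socket.close()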
async grab_render(ttl=2.0, **kwargs) ClientEvent[source]

Grab a render from the client.

Parameters:
  • quality – The quality of the render. 0.0 - 1.0

  • subsample – The subsample of the render.

  • ttl – The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.

Return type:

ClientEvent

async get_webxr_mesh(key: str = 'webxr-mesh', ttl=2.0) ClientEvent[source]

Request WebXR mesh data from the client.

This method sends a GET_WEBXR_MESH RPC request to the client and waits for a response containing the detected environmental meshes from the WebXR AR session.

The response contains mesh data including vertices, indices, semantic labels, and transformation matrices for each detected mesh.

Usage Example:

from vuer import Vuer, VuerSession
from vuer.schemas import WebXRMesh, Scene
from asyncio import sleep

app = Vuer()

@app.spawn(start=True)
async def main(session: VuerSession):
    session.set @ Scene(
        children=[WebXRMesh(key="webxr-mesh", stream=False)]
    )

    await sleep(2)  # Wait for meshes to be detected

    # Request mesh data on-demand
    mesh_data = await session.get_webxr_mesh(key="webxr-mesh")

    meshes = mesh_data.value.get('meshes', [])
    print(f"Retrieved {len(meshes)} meshes")

    for mesh in meshes:
        vertices = mesh['vertices']
        indices = mesh['indices']
        semantic_label = mesh.get('semanticLabel', 'unknown')
        matrix = mesh['matrix']

        print(f"Mesh: {len(vertices)/3:.0f} vertices, label={semantic_label}")
Parameters:
  • key (str) – The key of the WebXRMesh component to query (default: "webxr-mesh")

  • ttl – The time to live for the handler in seconds. If no response is received within this time, a TimeoutError is raised (default: 2.0)

Returns:

ClientEvent containing mesh data in event.value['meshes']

Raises:
  • asyncio.TimeoutError – If the client doesn’t respond within ttl seconds

  • AssertionError – If websocket session is missing

Return type:

ClientEvent
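
The usage example divides `len(vertices)` by 3, which suggests a flat coordinate list. A minimal sketch of regrouping such data follows. The flat layout and the triangle grouping of `indices` are assumptions, `to_triples` is a hypothetical helper, and the `mesh` dict is made-up sample data shaped like one entry of event.value['meshes'].

```python
def to_triples(flat):
    """Group a flat [a0, b0, c0, a1, ...] sequence into 3-tuples."""
    if len(flat) % 3 != 0:
        raise ValueError("expected a multiple of 3 values")
    return [tuple(flat[i:i + 3]) for i in range(0, len(flat), 3)]


# Made-up sample payload, not real client output.
mesh = {
    "vertices": [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0],
    "indices": [0, 1, 2],
    "semanticLabel": "floor",
}

points = to_triples(mesh["vertices"])    # one (x, y, z) tuple per vertex
triangles = to_triples(mesh["indices"])  # one index triple per triangle
print(points)
print(triangles)
```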

send(event: ServerEvent) None[source]

Sending the event through the uplink queue.

Parameters:

event (ServerEvent) –

Return type:

None

async rpc(event: ServerRPC, ttl=2.0) ClientEvent | None[source]

Send a ServerRPC event to the client and wait for a response through the session queue

Parameters:
  • event (ServerRPC) – The ServerRPC event to send.

  • ttl – The time to live for the handler. If the handler is not called within the time it gets removed from the handler list.

Returns:

ClientEvent

Return type:

ClientEvent | None

property set: At

Used exclusively to set the scene.

The @SET operator is responsible for setting the root node of the scene.

Examples:

proxy @ Set(Scene(children=[…]))

or

app.set @ Scene(children=[…])

property update: At

Used to update existing elements. NOOP if an element does not exist.

Supports passing in a list of elements. (Thank God I implemented this… so handy! - Ge)

Example Usage:

app.update @ [element1, element2, ...]
property add: At

Used to add elements to a specific parent.

Requires a parentKey, or treats the Scene root node as the default parent.

Example Usage:

app.add(element1, element2, ..., to=parentKey)

or using the Scene root node as the default parent:

app.add @ element1
property upsert: At

Upsert elements to a specific parent.

Requires a parentKey, or treats the Scene root node as the default parent.

Example Usage:

app.upsert(element1, element2, ..., to=parentKey)

or using the Scene root node as the default parent:

app.upsert @ element1
property remove: At

Remove elements by keys.

Example Usage:

app.remove @ ["key1", "key2", ...]

or a single key:

app.remove @ "key1"
popleft()[source]
pop()[source]
clear()[source]

Clears all client messages.

stream()[source]
spawn_task(task, name=None)[source]

Spawn a task in the running asyncio event loop

Useful for background tasks. Returns an asyncio task that can be canceled.

import asyncio

async def background_task():
    print('\rthis ran once')

async def long_running_bg_task():
    while True:
        await asyncio.sleep(1.0)
        print("\rlong running background task is still running")

@app.spawn
async def main_fn(sess: VuerSession):
    # Prepare background tasks here:
    task = sess.spawn_task(background_task)
    long_running_task = sess.spawn_task(long_running_bg_task)

Now to cancel a running task, simply

task.cancel()

Todos

▫️ Add a way to automatically clean up when exiting the main_fn.
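
The cancel step can be tried without a server by using asyncio directly. The sketch below uses `asyncio.create_task` in place of `spawn_task`, which is an assumption about what sits underneath. The short sleep intervals and the `log` list are only there to keep the demo quick and observable.

```python
import asyncio

log = []


async def long_running_bg_task():
    try:
        while True:
            await asyncio.sleep(0.01)
            log.append("tick")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # re-raise so the task is marked as cancelled


async def main():
    task = asyncio.create_task(long_running_bg_task())
    await asyncio.sleep(0.05)  # let the task run a few iterations
    task.cancel()
    try:
        await task  # wait for the cancellation to take effect
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled, log[-1])
```

Awaiting the task after `cancel()` gives it a chance to run its cleanup code before the program moves on.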

async forever()[source]

Keep the session alive indefinitely.

This is useful when you want to set up a scene and keep the server running without the session closing. The session will remain active until the client disconnects or the server is stopped.

Example Usage:

@app.spawn(start=True)
async def main(session: VuerSession):
    session.set @ Scene(Box(args=[0.2, 0.2, 0.2], key="box"))
    await session.forever()
vuer.entrypoint()[source]

CLI entrypoint for vuer command.