vuer.rtc - Python CRDT Data Store¶
A Python implementation of the @vuer-ai/vuer-rtc TypeScript package, providing CRDT-based real-time collaborative data structures for Vuer applications.
Overview¶
vuer.rtc enables consistent state management across Python and JavaScript clients by implementing:
CRDT-based conflict resolution: Last-Write-Wins (LWW) for absolute values, additive operations for deltas
Journal with snapshots: Enables undo/redo, fast replay, and state recovery
Vector clocks: Causal ordering of concurrent operations
TypeScript API compatibility: Same operator format as
@vuer-ai/vuer-rtc
Installation¶
The vuer.rtc module is included in the vuer package:
pip install vuer
Quick Start¶
from vuer.rtc import create_graph, Operation
# Create a store with a send callback
store = create_graph(
session_id="my-session",
on_send=lambda msg: print(f"Send: {msg.to_dict()}"),
)
# Insert a new node
store.edit(Operation(
key="cube-1",
otype="node.insert",
path="",
tag="Mesh",
parent_key="scene",
))
# Commit the edit
msg = store.commit()
print(f"Committed message: {msg.id}")
# Modify the node's position
store.edit(Operation(
key="cube-1",
otype="vector3.add",
path="transform.position",
value=[1, 0, 0],
))
store.commit()
# Access the current state
graph = store.graph
cube = graph.get_node("cube-1")
position = cube.get_property("transform.position")
print(f"Cube position: {position}") # [1, 0, 0]
Core Concepts¶
1. Scene Graph¶
The scene graph is a hierarchical structure of nodes:
from vuer.rtc import create_empty_graph, SceneNode
# Create an empty graph with a root "scene" node
graph = create_empty_graph()
# Access the root node
root = graph.get_node("scene")
print(root.tag) # "Scene"
# Add a node
cube = SceneNode(key="cube-1", tag="Mesh")
graph.set_node(cube)
root.children.append("cube-1")
2. Operations¶
Operations modify the scene graph. Each operation has:
key: Target node keyotype: Operation type (e.g., “vector3.add”, “node.insert”)path: Property path (e.g., “transform.position”)value: Operation-specific value
from vuer.rtc import Operation
# Set a value (Last-Write-Wins)
op = Operation(
key="cube-1",
otype="vector3.set",
path="transform.position",
value=[0, 1, 0],
)
# Add to a value (additive)
op = Operation(
key="cube-1",
otype="vector3.add",
path="transform.position",
value=[1, 0, 0],
)
3. Messages¶
Messages batch operations and include CRDT metadata:
from vuer.rtc import CRDTMessage
# Messages are created by store.commit()
msg = store.commit()
print(msg.id) # "session-id:1"
print(msg.session_id) # "session-id"
print(msg.lamport_time) # 1
print(msg.ops) # List of operations
4. Journal and Snapshots¶
The journal tracks all committed messages, enabling:
Undo/Redo: Mark messages as deleted/undeleted
State recovery: Replay from snapshot + journal
Synchronization: Track acknowledgments
# Undo the last operation
undo_msg = store.undo()
# Redo the last undone operation
redo_msg = store.redo()
# Create a snapshot for fast recovery
snapshot = store.compact()
Operation Types¶
Number Operations¶
Type |
Behavior |
Example |
|---|---|---|
|
Last-Write-Wins |
|
|
Additive |
|
|
Multiplicative |
|
|
Take minimum |
|
|
Take maximum |
|
Vector3 Operations¶
Type |
Behavior |
|---|---|
|
LWW, |
|
Component-wise addition |
|
Component-wise multiplication |
|
Rotate by Euler angles |
|
Rotate by quaternion |
Boolean Operations¶
Type |
Behavior |
|---|---|
|
Last-Write-Wins |
|
Logical OR (enable wins) |
|
Logical AND (disable wins) |
Array Operations¶
Type |
Behavior |
|---|---|
|
Replace entire array |
|
Append item |
|
Set union (no duplicates) |
|
Remove by value |
Scene Graph Operations¶
Type |
Description |
|---|---|
|
Create new node as child of parent |
|
Soft delete (tombstone) |
Meta Operations¶
Type |
Description |
|---|---|
|
Mark message as undone |
|
Restore undone message |
State Management¶
GraphStore API¶
from vuer.rtc import create_graph
store = create_graph(
session_id="my-session",
on_send=lambda msg: websocket.send(msg.to_dict()),
on_state_change=lambda state: print("State changed"),
)
# Edit operations
store.edit(op) # Add uncommitted operation
store.edit_batch(ops) # Add multiple operations
store.cancel_edits() # Revert uncommitted edits
store.has_pending_edits() # Check for uncommitted edits
# Commit
msg = store.commit() # Commit edits as message
# Receive from server/peers
store.receive(msg) # Process remote message
store.ack(msg_id) # Acknowledge message
# Undo/Redo
store.undo() # Undo last operation
store.redo() # Redo last undone operation
# Snapshots
store.compact() # Create snapshot
store.get_snapshot() # Get current snapshot
# State access
state = store.get_state() # Get full ClientState
graph = store.graph # Get current SceneGraph
# Subscription
unsubscribe = store.subscribe(lambda state: ...)
Serialization¶
All types support serialization for network transport:
# Serialize
msg_dict = msg.to_dict()
json_str = json.dumps(msg_dict)
# Deserialize
msg_dict = json.loads(json_str)
msg = CRDTMessage.from_dict(msg_dict)
Integration with Vuer Server¶
from vuer import Vuer, VuerSession
from vuer.rtc import create_graph, Operation
app = Vuer()
# Create a shared store
store = create_graph(session_id="server")
@app.spawn(start=True)
async def main(session: VuerSession):
# Initialize client with current state
snapshot = store.get_snapshot()
session.set @ Scene(rawChildren=[node.to_dict() for node in store.graph.nodes.values()])
# Handle incoming operations
async for event in session:
if event.etype == "RTC_MESSAGE":
msg = CRDTMessage.from_dict(event.value)
store.receive(msg)
# Broadcast to other clients...
Conflict Resolution¶
The CRDT implementation ensures eventual consistency:
LWW Operations (
*.set): Higher Lamport timestamp winsAdditive Operations (
*.add,*.multiply): Order-independent mergeBoolean Operations:
orenables (any true),anddisables (all must be true)
# Concurrent edits from two clients:
# Client A: vector3.add [1, 0, 0]
# Client B: vector3.add [0, 1, 0]
# Result: [1, 1, 0] (order-independent)
# Concurrent sets:
# Client A (lamport=5): vector3.set [1, 0, 0]
# Client B (lamport=6): vector3.set [0, 1, 0]
# Result: [0, 1, 0] (higher lamport wins)
SceneStore - Reactive Scene Management¶
For a simpler API focused on scene graph management with multi-session synchronization, see SceneStore.
SceneStore provides:
Reactive updates to all subscribed sessions
Async context manager for safe subscription cleanup
Familiar
@operator pattern matching VuerSessionSnapshot access for state recovery
from vuer.rtc import SceneStore
store = SceneStore()
@app.spawn(start=True)
async def main(sess):
async with store.subscribe(sess):
await store.upsert @ Box(key="box", position=[0, 0, 0])
while True:
await store.update @ {"key": "box", "position": [t, 0, 0]}
await asyncio.sleep(0.05)