Using in-browser mujoco as an interactive simulator for robots¶
This example shows you how to use the MuJoCo component in Vuer to create an interactive simulation of a car model.
from asyncio import sleep, Queue, create_task
from pathlib import Path
import numpy as np
from vuer import Vuer
from vuer.events import MjStep, Set, MjRender
from vuer.schemas import DefaultScene, SceneBackground, MjCameraView
from vuer.schemas import MuJoCo
from PIL import Image as PImage
from io import BytesIO
import cv2
app = Vuer(static_root=f"{Path(__file__).parent}/../_static")
# asset_pref = "http://localhost:8012/static/"
asset_pref = "https://docs.vuer.ai/en/latest/_static/"
fileName = "mujoco_scenes/car/car.mjcf.xml"
IS_MUJOCO_LOAD = True
@app.add_handler("ON_MUJOCO_LOAD")
async def handler(event, session):
global IS_MUJOCO_LOAD
IS_MUJOCO_LOAD = True
print("MuJoCo component loaded successfully.")
result_queue = Queue()
async def process_results():
while True:
result = await (await result_queue.get())
try:
if hasattr(result, "value") and isinstance(result.value, dict):
frame = result.value.get("depthFrame") or result.value.get("frame")
if frame:
pil_image = PImage.open(BytesIO(frame))
img = np.array(pil_image)
img_bgr = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
cv2.imshow("monitor", img_bgr)
if cv2.waitKey(1) == ord("q"):
exit()
else:
print("No valid frame data found in result.")
else:
print("Invalid result structure or missing 'value' attribute.")
except Exception as e:
print(f"Error processing result: {e}")
# use `start=True` to start the app immediately
@app.spawn(start=True)
async def main(session):
session @ Set(
DefaultScene(
SceneBackground(),
MuJoCo(
key="simple",
src=asset_pref + fileName,
pause=False,
useLights=True,
unpauseOnDrag=False, # Whether to unpause the simulation when dragging the object
dragForceScale=1.0, # Scale of the drag force applied to the object when dragging
showDragArrow=True, # Whether to show the drag arrow when dragging the object
showDragForceText=True, # Whether to show the drag force text when dragging the object
),
MjCameraView(
key="cam1",
position=[0, 0.35, 0.5],
rotation=[-0.4, 0, 0],
width=640,
height=480,
distanceToCamera=0.1,
movable=True, # Whether the camera can be moved by the user
showCameraFrustum=True, # Whether to show the camera in the scene
),
up=[0, 1, 0],
),
)
await sleep(2)
create_task(process_results())
i = 0
while True:
i += 1
if not IS_MUJOCO_LOAD:
print("Waiting for MuJoCo component to load...")
await sleep(1)
continue
await session.rpc(
MjStep(key="le-cart", sim_steps=10, ctrl=[1, -1]),
ttl=5,
)
"""
Instead of waiting for the rendering result, we can use a queue to handle the
rendering tasks as follows:
"""
with doc:
promise = session.rpc(MjRender(key="cam1"))
await result_queue.put(promise)
await sleep(1 / 30)
Alternatively, you can use
result = await session.rpc(MjRender(key="cam1"), ttl=5)
try:
if hasattr(result, "value") and isinstance(result.value, dict):
frame = result.value.get("depthFrame") or result.value.get("frame")
if frame:
pil_image = PImage.open(BytesIO(frame))
img = np.array(pil_image)
img_bgr = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
cv2.imshow("monitor", img_bgr)
if cv2.waitKey(1) == ord("q"):
exit()
else:
print("No valid frame data found in result.")
else:
print("Invalid result structure or missing 'value' attribute.")
except Exception as e:
print(f"Error processing result: {e}")
to waiting for the rendering result directly, but using a queue is more efficient for handling multiple rendering tasks.