Source code for vuer.base

import asyncio
import ssl
import traceback
from collections.abc import Coroutine
from concurrent.futures import CancelledError
from functools import partial
from pathlib import Path

import aiohttp_cors
from aiohttp import web
from params_proto import EnvVar


[docs]async def default_handler(request, ws): async for msg in ws: print(msg)
[docs]async def websocket_handler(request, handler, **ws_kwargs): ws = web.WebSocketResponse(**ws_kwargs) await ws.prepare(request) try: await handler(request, ws) except ConnectionResetError: print("Connection reset") except CancelledError: print("WebSocket Canceled") except Exception as exp: print(f"Error:\n{exp}\n{traceback.print_exc()}") finally: await ws.close() print("WebSocket connection closed")
[docs]async def handle_file_request(request, root, filename=None): if filename is None: filename = request.match_info["filename"] filepath = Path(root) / filename if not filepath.is_file(): raise web.HTTPNotFound() response = web.FileResponse(filepath) # Check if URL contains "hot" parameter for hot loading mode # Hot assets are those that change frequently during development # Parameter name is case-insensitive (hot, Hot, HOT all work) # Parameter value must not equal "false" (case insensitive) hot_key = None for key in request.query.keys(): if key.lower() == "hot": hot_key = key break if hot_key: # Check if hot is explicitly set to false hot_value = request.query.get(hot_key, "") if hot_value.lower() != "false": # Set Cache-Control to no-cache to force revalidation # This allows 304 responses but prevents strong caching response.headers["Cache-Control"] = "no-cache" return response
[docs]class Server: """Base TCP server with HTTP/WebSocket functionality. This class provides the core server infrastructure for Vuer, including: - HTTP route handling with CORS support - WebSocket connection management - SSL/TLS support for secure connections - Static file serving Subclasses should call _init_app() in their __post_init__ to initialize the aiohttp application before using server methods. Note: This class cannot use @proto.prefix because Vuer inherits from it and also uses @proto.prefix, which causes a metaclass conflict. Attributes: host: Server hostname. Set to "0.0.0.0" to accept remote connections. port: Server port number. cors: Comma-separated list of allowed CORS origins. Use "*" for all. cert: Path to SSL certificate file for HTTPS. key: Path to SSL private key file for HTTPS. ca_cert: Path to CA certificate for client certificate verification. WEBSOCKET_MAX_SIZE: Maximum WebSocket message size (default 256MB). REQUEST_MAX_SIZE: Maximum HTTP request size (default 256MB). """ # Network configuration (can be set via environment variables) host: str = EnvVar("HOST", default="localhost").get() # Server hostname, use 0.0.0.0 for remote access port: int = EnvVar("PORT", dtype=int, default=8012).get() # Server port number cors: str = EnvVar("CORS", default="*").get() # CORS allowed origins, comma-separated # SSL/TLS configuration (all None = HTTP mode) cert: str = EnvVar("SSL_CERT", default=None).get() # Path to SSL certificate file key: str = EnvVar("SSL_KEY", default=None).get() # Path to SSL private key file ca_cert: str = EnvVar("SSL_CA_CERT", default=None).get() # Path to CA certificate for client verification # Size limits WEBSOCKET_MAX_SIZE: int = EnvVar("WEBSOCKET_MAX_SIZE", dtype=int, default=2**28).get() # Max WebSocket message size (default 256MB) REQUEST_MAX_SIZE: int = EnvVar("REQUEST_MAX_SIZE", dtype=int, default=2**28).get() # Max HTTP request size (default 256MB)
[docs] def _init_app(self): """Initialize the aiohttp application and CORS context. This must be called before using any server methods. Subclasses should call this in their __post_init__ method. Creates: self.app: The aiohttp web.Application instance. self.cors_context: The aiohttp_cors setup for CORS handling. """ if hasattr(self, 'app'): return self.app = web.Application(client_max_size=self.REQUEST_MAX_SIZE) default = aiohttp_cors.ResourceOptions( allow_credentials=True, expose_headers="*", allow_headers="*", allow_methods="*", ) cors_config = {k: default for k in self.cors.split(",")} self.cors_context = aiohttp_cors.setup(self.app, defaults=cors_config)
[docs] def _add_route( self, path: str, handler: callable, method: str = "GET", ): route = self.app.router.add_resource(path).add_route(method, handler) self.cors_context.add(route)
[docs] def _socket(self, path: str, handler: callable): ws_handler = partial(websocket_handler, handler=handler, max_msg_size=self.WEBSOCKET_MAX_SIZE) self._add_route(path, ws_handler)
[docs] @staticmethod def _add_task(fn: Coroutine, name=None): loop = asyncio.get_running_loop() loop.create_task(fn, name=name)
[docs] def _add_static(self, path, root): _fn = partial(handle_file_request, root=root) self._add_route(f"{path}/{{filename:.*}}", _fn, method="GET")
[docs] def _static_file(self, path, root, filename=None): _fn = partial(handle_file_request, root=root, filename=filename) self._add_route(f"{path}", _fn, method="GET")
[docs] def start(self): async def init_server(): runner = web.AppRunner(self.app) await runner.setup() if not self.cert: site = web.TCPSite(runner, self.host, self.port) return await site.start() ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(certfile=self.cert, keyfile=self.key) if self.ca_cert: ssl_context.load_verify_locations(self.ca_cert) ssl_context.verify_mode = ssl.CERT_REQUIRED else: ssl_context.verify_mode = ssl.CERT_OPTIONAL site = web.TCPSite(runner, self.host, self.port, ssl_context=ssl_context) return await site.start() # Check if there's already a running event loop (e.g., in IPython/Jupyter) try: running_loop = asyncio.get_running_loop() # If we're here, there's a running loop - schedule the server to start asyncio.ensure_future(init_server(), loop=running_loop) print("Server scheduled in existing event loop (IPython/Jupyter mode)") return except RuntimeError: # No running loop, proceed with standard approach pass # Standard approach for scripts/CLI try: event_loop = asyncio.get_event_loop() except RuntimeError: # Python 3.10+ in some environments event_loop = asyncio.new_event_loop() asyncio.set_event_loop(event_loop) event_loop.run_until_complete(init_server()) event_loop.run_forever()
if __name__ == "__main__": app = Server() app._add_route("", websocket_handler) app._add_static("/static", ".") app.start()