Async Streaming Examples
This guide provides practical examples demonstrating the powerful async streaming capabilities of UESynth for high-performance data collection scenarios.
Overview
Async streaming enables:
Non-blocking operations - Send requests without waiting
High throughput - 100-1000+ operations per second
Concurrent execution - Multiple operations in parallel
Real-time responsiveness - Minimal latency accumulation
Basic Async Pattern
import asyncio
import cv2
from uesynth import AsyncUESynthClient
async def basic_async_capture():
"""Basic pattern for async data capture."""
async with AsyncUESynthClient() as client:
# Start a capture (non-blocking)
request_id = await client.capture.rgb(width=1920, height=1080)
print(f"Started capture request: {request_id}")
# Do other work while capture is processing
await client.camera.set_location(x=100, y=200, z=50)
# Get the captured frame
frame = await client.get_latest_frame()
if frame is not None:
cv2.imwrite("async_capture.png", frame)
print("✅ Frame captured and saved")
asyncio.run(basic_async_capture())
High-Performance Collection
async def rapid_fire_collection():
"""Collect data as fast as possible with minimal latency."""
async with AsyncUESynthClient(
stream_buffer_size=4096,
max_concurrent_requests=500
) as client:
print("🚀 Starting rapid fire collection...")
request_ids = []
# Fire off 500 requests as fast as possible
for i in range(500):
x = (i % 25) * 20 # Grid pattern
y = (i // 25) * 20
await client.camera.set_location(x=x, y=y, z=100)
# Capture (non-blocking)
request_id = await client.capture.rgb(width=512, height=512)
request_ids.append(request_id)
if i % 50 == 49:
await asyncio.sleep(0.01) # Brief pause
# Collect results
frames_collected = 0
for i, request_id in enumerate(request_ids):
try:
frame = await client.wait_for_frame(request_id, timeout=2.0)
if frame is not None:
if i % 10 == 0: # Save every 10th frame
cv2.imwrite(f"rapid_{i:03d}.png", frame)
frames_collected += 1
except asyncio.TimeoutError:
print(f"⏰ Request {i} timed out")
print(f"📊 Collected {frames_collected}/500 frames")
asyncio.run(rapid_fire_collection())
Real-time Simulation
import math
import time
async def real_time_simulation():
"""Simulate real-time data collection with moving objects."""
async with AsyncUESynthClient() as client:
print("🎮 Starting real-time simulation...")
timestep = 0
collected_frames = 0
max_timesteps = 1000
start_time = time.time()
while timestep < max_timesteps:
# Calculate positions
angle = timestep * 0.01
cam_x = 200 * math.cos(angle)
cam_y = 200 * math.sin(angle)
car_x = timestep * 10
# Update scene (non-blocking)
await client.camera.set_location(x=cam_x, y=cam_y, z=100)
await client.camera.set_rotation(pitch=-15, yaw=math.degrees(angle), roll=0)
await client.objects.set_location("Car_01", x=car_x, y=0, z=0)
# Capture frame (non-blocking)
await client.capture.rgb(width=640, height=480)
# Check for completed frames
frame = await client.get_latest_frame()
if frame is not None:
if timestep % 50 == 0:
cv2.imwrite(f"sim_frame_{timestep:06d}.png", frame)
collected_frames += 1
# Real-time delay (60 FPS target)
await asyncio.sleep(1.0 / 60.0)
timestep += 1
if timestep % 100 == 0:
elapsed = time.time() - start_time
fps = timestep / elapsed
print(f"📈 Timestep {timestep}: {fps:.1f} sim FPS, {collected_frames} frames")
final_time = time.time() - start_time
print(f"🏁 Simulation complete: {max_timesteps/final_time:.1f} FPS, {collected_frames} frames")
asyncio.run(real_time_simulation())
Producer-Consumer Pattern
from asyncio import Queue
async def producer_consumer_pattern():
"""Separate data production from processing using queues."""
frame_queue = Queue(maxsize=100)
async def producer(client):
"""Produce frames as fast as possible."""
for i in range(200):
x = (i % 20) * 25
y = (i // 20) * 25
await client.camera.set_location(x=x, y=y, z=100)
request_id = await client.capture.rgb(width=512, height=512)
try:
frame = await client.wait_for_frame(request_id, timeout=1.0)
if frame is not None:
await frame_queue.put((i, frame))
except asyncio.TimeoutError:
print(f"⏰ Frame {i} timed out")
await frame_queue.put(None) # End signal
async def consumer():
"""Process frames from the queue."""
processed = 0
while True:
try:
item = await asyncio.wait_for(frame_queue.get(), timeout=5.0)
if item is None:
break
frame_id, frame = item
# Simulate processing
await asyncio.sleep(0.01)
if frame_id % 10 == 0:
cv2.imwrite(f"processed_{frame_id:03d}.png", frame)
processed += 1
frame_queue.task_done()
except asyncio.TimeoutError:
break
print(f"📥 Processed {processed} frames")
async with AsyncUESynthClient() as client:
await asyncio.gather(producer(client), consumer())
asyncio.run(producer_consumer_pattern())
Error Handling & Resilience
async def robust_streaming():
"""Demonstrate robust streaming with error recovery."""
async def resilient_capture_loop(client, total_frames=100):
successful_captures = 0
failed_captures = 0
for i in range(total_frames):
max_retries = 3
for attempt in range(max_retries):
try:
x = i * 10
await client.camera.set_location(x=x, y=0, z=100)
request_id = await client.capture.rgb()
frame = await client.wait_for_frame(request_id, timeout=2.0)
if frame is not None:
cv2.imwrite(f"robust_frame_{i:03d}.png", frame)
successful_captures += 1
break
else:
raise Exception("No frame returned")
except asyncio.TimeoutError:
print(f"⏰ Timeout on frame {i}, attempt {attempt + 1}")
if attempt < max_retries - 1:
await asyncio.sleep(0.5)
else:
failed_captures += 1
except Exception as e:
print(f"❌ Error on frame {i}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(0.5)
else:
failed_captures += 1
return successful_captures, failed_captures
max_connection_attempts = 3
for connection_attempt in range(max_connection_attempts):
try:
async with AsyncUESynthClient() as client:
print(f"🔌 Connected (attempt {connection_attempt + 1})")
success, failures = await resilient_capture_loop(client)
print(f"📊 Results: {success} successful, {failures} failed")
print(f"Success rate: {success/(success+failures)*100:.1f}%")
break
except Exception as e:
print(f"❌ Connection failed: {e}")
if connection_attempt < max_connection_attempts - 1:
await asyncio.sleep(2.0)
else:
print("💥 Max connection attempts reached")
asyncio.run(robust_streaming())
Performance Tips
1. Use Streaming for High Throughput
# ✅ High performance - streaming mode
for i in range(1000):
await client.camera.set_location(x=i, y=0, z=50) # Non-blocking
await client.capture.rgb() # Non-blocking
# ❌ Lower performance - direct mode
for i in range(1000):
await client.camera.set_location_direct(x=i, y=0, z=50) # Blocking
frame = await client.capture.rgb_direct() # Blocking
2. Batch Operations
# ✅ Better - batch requests
tasks = [client.capture.rgb() for _ in range(100)]
request_ids = await asyncio.gather(*tasks)
# ❌ Less efficient - individual awaits
request_ids = []
for _ in range(100):
request_id = await client.capture.rgb()
request_ids.append(request_id)
3. Configure for Performance
# High-performance setup
client = AsyncUESynthClient(
stream_buffer_size=4096, # Large buffer
max_concurrent_requests=1000, # High concurrency
enable_compression=True # Reduce bandwidth
)
These examples demonstrate the power and flexibility of UESynth's async streaming capabilities. Choose the pattern that best fits your specific use case.
Next Steps
Learn about Performance Optimization
Explore Real-time Examples
Check out Batch Processing techniques
Last updated