同步的ECS系统
在Bevy ECS系统中的System
都是同步的,通过系统调度器按照顺序并发执行系统。
System的定义
pub trait System:
Send
+ Sync
+ 'static {
type In: SystemInput;
type Out;
目前版本(0.15)来说,Bevy 还不支持 async system。 但一些异步运行时的库, 如reqwest, sqlx等,或者一些需要长时间在后台运行的程序, 在Bevy 中如何使用呢?
方案1: 通道(Channel)
参考官方例程 external_source_external_thread 先在Startup时,用类似crossbeam-channel的库建立tx,rx, 通过标准的std::thread::spawn建立后台运行线程。将接收rx通过Resource分享出去。
let (tx, rx) = bounded::<u32>(1);
std::thread::spawn(move || {
// We're seeding the PRNG here to make this example deterministic for testing purposes.
// This isn't strictly required in practical use unless you need your app to be deterministic.
let mut rng = ChaCha8Rng::seed_from_u64(19878367467713);
loop {
// Everything here happens in another thread
// This is where you could connect to an external data source
// This will block until the previous value has been read in system `read_stream`
tx.send(rng.gen_range(0..2000)).unwrap();
}
});
commands.insert_resource(StreamReceiver(rx));
然后用一个System一直接收Rx的消息,注意这里用了没有堵塞的try_iter
// This system reads from the receiver and sends events to Bevy
fn read_stream(receiver: Res<StreamReceiver>, mut events: EventWriter<StreamEvent>) {
for from_stream in receiver.try_iter() {
events.send(StreamEvent(from_stream));
}
}
方案2: async-std
bevy内置了一个bevy_tasks线程池,它是基于async-std的运行时,内置了三种线程池:AsyncComputeTaskPool
, ComputeTaskPool
和IoTaskPool
.
注意: BevyTasks不支持WebAssembly, 并且要开启bevy feature multi_threaded
AsyncComputeTaskPool
如果所运行的任务是CPU密集型,但不需要在下一帧前完成,就使用AsyncComputeTaskPool
。
参考官方例程 async_compute
fn spawn_tasks(mut commands: Commands) {
let thread_pool = AsyncComputeTaskPool::get();
let task = thread_pool.spawn(async move {
// 异步运算的结果
//
let mut command_queue = CommandQueue::default();
command_queue.push(move |world: &mut World| {
// 拿到ecs world后进行处理,比如说spawn, insert等。
});
command_queue
});
commands.entity(entity).insert(ComputeTransform(task));
}
``` rust
struct Task<T>(async_executor::Task<T>)
然后在另一个system里poll task, 获取结果
fn handle_tasks(mut commands: Commands, mut transform_tasks: Query<&mut ComputeTransform>) {
for mut task in &mut transform_tasks {
if let Some(mut commands_queue) = block_on(future::poll_once(&mut task.0)) {
commands.append(&mut commands_queue);
}
}
}
总结: AsyncComputeTaskPool
使用了async-std
运行时, 生成一个async_executor::Task, 然后通过ECS的system一直poll来获取异步运算结果。
ComputeTaskPool
如果运行任务是CPU敏感性, 必须在当前帧计算完成,就用ComputeTaskPool
实际上bevy ecs里的并行查询就用了ComputeTaskpool
。
fn move_system(mut sprites: Query<(&mut Transform, &Velocity)>) {
// 这里的par_iter_mut就是并行查询修改,官方说法是, 如果相关实体不超过128个时, 并行方法不会比常规的迭代器查询快
sprites
.par_iter_mut()
.for_each(|(mut transform, velocity)| {
transform.translation += velocity.extend(0.0);
});
}
IoTaskPool
可以快速完成的IO 敏感的任务, 可以使用IoTaskPool
.
比如说保存Scene
IoTaskPool::get()
.spawn(async move {
// Write the scene RON data to file
File::create(format!("assets/{NEW_SCENE_FILE_PATH}"))
.and_then(|mut file| file.write(serialized_scene.as_bytes()))
.expect("Error while writing scene to file");
})
.detach();
线程池管理
Bevy的线程池由TaskPoolPlugin
管理, 默认使用机器当前的逻辑核数量为线程数。
也可以通过下面的方式限制:
.add_plugins(DefaultPlugins.set(TaskPoolPlugin {
task_pool_options: TaskPoolOptions::with_num_threads(4),
}))
IoTaskPool, AsyncComputeTaskPool 各分到25%的线程数,但不超过4个。剩下的归于 ComputeTaskPool.
以上三种线程池通过下面的函数在主线程运行,最多同时运行100个任务。
pub fn tick_global_task_pools_on_main_thread() {
COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|compute_local_executor| {
ASYNC_COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|async_local_executor| {
IO_TASK_POOL
.get()
.unwrap()
.with_local_executor(|io_local_executor| {
for _ in 0..100 {
compute_local_executor.try_tick();
async_local_executor.try_tick();
io_local_executor.try_tick();
}
});
});
});
}
方案3: Tokio
tokio是目前rust生态里比较主流的异步运行时。有两种方式使用。
手动管理runtime
建立一个Resource存储runtime
#[derive(Resource)]
pub struct TokioRuntime(Box<Runtime>)
/// 在App Startup时创建
fn setup_tokio_runtime(mut commands: Commands) {
#[cfg(not(target_arch = "wasm32"))]
let runtime = tokio::runtime::Builder::new_multi_thread();
#[cfg(target_arch = "wasm32")]
let runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime
.enable_all()
.build()
.expect("Failed to create Tokio runtime for background tasks");
commands.insert_resource(TokioRuntime(Box::new(runtime)));
}
在需要使用的system通过Resource拿tokio runtime
fn some_system(rt: Res<TokioRuntime>) {
rt.spawn( async move {
// 异步任务返回结果可以通过通道传出。
})
}
bevy-tokio-tasks
有一个第三方库bevy-tokio-tasks你可以帮你管理tokio runtime, 并且提供了一些辅助方法可以拿到主线程的world
fn demo(runtime: ResMut<TokioTasksRuntime>) {
runtime.spawn_background_task(|mut ctx| async move {
let mut color_index = 0;
loop {
ctx.run_on_main_thread(move |ctx| {
if let Some(mut clear_color) = ctx.world.get_resource_mut::<ClearColor>() {
clear_color.0 = bevy::prelude::Color::Srgba(COLORS[color_index]);
println!("Changed clear color to {:?}", clear_color.0);
}
})
.await;
color_index = (color_index + 1) % COLORS.len();
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}
错误处理
bevy_tasks
返回是Task
总结
- 对于简单的后台任务(如定时器、简单的网络请求),推荐使用方案1
- 对于需要与Bevy ECS深度集成的异步任务,推荐使用方案2
- 对于需要与大量Tokio生态库集成的复杂应用,推荐使用方案3