البرمجة غير المتزامنة مع Tokio: آلاف المهام بخيط واحد
غير المتزامن مقابل الخيوط: متى تستخدم أيّاً منهما
في المصنع، بعض المهام تنتظر بيانات (I/O-bound) وبعضها تحسب أرقاماً (CPU-bound).
// I/O-bound: انتظار استجابة مستشعر عبر الشبكة ← async
// قراءة 100 مستشعر بخيط واحد أكفأ من 100 خيط
async fn read_sensor(id: u32) -> f64 { /* ... */ }
// CPU-bound: تحليل بيانات اهتزاز ثقيلة ← خيوط
fn analyze_vibration(data: &[f64]) -> Report { /* ... */ }
القاعدة: إذا كان الكود ينتظر شبكة أو قرص، استخدم async. إذا كان يحسب، استخدم خيوطاً.
المستقبلات وصيغة async/await
الدالة async fn لا تُنفَّذ فوراً، بل تُنشئ Future — وعد بقيمة مستقبلية.
// هذه الدالة تُرجع Future<Output = f64>
async fn read_temperature(sensor_id: u32) -> f64 {
// await تُوقف التنفيذ هنا حتى تصل البيانات
// لكن الخيط لا يتوقف — ينتقل لمهمة أخرى
let response = fetch_from_device(sensor_id).await;
response.value
}
async fn monitor() {
// .await تُحرّك المستقبل حتى يكتمل
let temp = read_temperature(42).await;
println!("الحرارة: {temp}°C");
}
بدون .await، المستقبل لا يعمل — مثل آلة متصلة بالكهرباء لكن لم تُشغّل.
بيئة تشغيل Tokio
Rust لا تحتوي بيئة تشغيل مدمجة. نستخدم Tokio كمحرك للمهام غير المتزامنة.
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
// بيئة متعددة الخيوط (الافتراضي) — للخوادم وأنظمة المراقبة
#[tokio::main]
async fn main() {
println!("نظام المراقبة يعمل");
}
// بيئة خيط واحد — للأجهزة المدمجة محدودة الموارد
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("وحدة تحكم مدمجة");
}
إطلاق مهام متزامنة مع tokio::spawn
لقراءة عدة مستشعرات في نفس الوقت بدلاً من واحد تلو الآخر:
use tokio::task::JoinHandle;
async fn read_sensor(id: u32) -> (u32, f64) {
// محاكاة تأخير شبكة
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
(id, 25.0 + id as f64 * 0.5)
}
#[tokio::main]
async fn main() {
// إطلاق 5 قراءات متزامنة
let handles: Vec<JoinHandle<(u32, f64)>> = (1..=5)
.map(|id| tokio::spawn(read_sensor(id)))
.collect();
// انتظار جميع النتائج
for handle in handles {
let (id, value) = handle.await.unwrap();
println!("مستشعر {id}: {value}°C");
}
}
خمس قراءات تكتمل في ~100ms بدلاً من ~500ms تسلسلياً.
المؤقتات والفترات الزمنية
في المصانع، نحتاج استطلاع دوري للمستشعرات كل ثوانٍ محددة:
use tokio::time::{interval, Duration};
async fn periodic_monitoring() {
// قراءة كل 5 ثوانٍ
let mut timer = interval(Duration::from_secs(5));
loop {
timer.tick().await; // تنتظر حتى الدورة التالية
let temp = read_temperature(1).await;
if temp > 80.0 {
trigger_alarm("تجاوز حراري").await;
}
println!("[دورة] الحرارة: {temp}°C");
}
}
interval يضمن توقيتاً منتظماً حتى لو استغرقت القراءة وقتاً متفاوتاً.
select: سباق عدة مستقبلات
tokio::select! ينفّذ أول مستقبل يكتمل — مثالي للمهلات وأولويات الإنذار:
use tokio::time::{sleep, Duration};
async fn read_with_timeout(sensor_id: u32) -> Option<f64> {
tokio::select! {
// الفرع الأول: القراءة الفعلية
result = read_sensor(sensor_id) => {
Some(result.1)
}
// الفرع الثاني: مهلة 3 ثوانٍ
_ = sleep(Duration::from_secs(3)) => {
eprintln!("مستشعر {sensor_id}: انتهت المهلة!");
None
}
}
}
// سباق بين مصدرين: أيهما يُبلّغ أولاً
async fn wait_for_any_alarm() {
tokio::select! {
temp = watch_temperature() => {
println!("إنذار حرارة: {temp}°C");
}
pressure = watch_pressure() => {
println!("إنذار ضغط: {pressure} bar");
}
}
}
مثال عملي: نظام استطلاع مستشعرات غير متزامن
نظام يراقب 100 مستشعر مع مهلات وإنذارات:
use tokio::time::{interval, timeout, Duration};
use std::collections::HashMap;
struct SensorReading {
id: u32,
value: f64,
status: SensorStatus,
}
enum SensorStatus { Ok, Warning, Critical, Timeout }
async fn poll_sensor(id: u32) -> SensorReading {
// تطبيق مهلة على كل قراءة
match timeout(Duration::from_secs(2), read_sensor(id)).await {
Ok((_, value)) => {
let status = if value > 90.0 { SensorStatus::Critical }
else if value > 75.0 { SensorStatus::Warning }
else { SensorStatus::Ok };
SensorReading { id, value, status }
}
Err(_) => SensorReading {
id, value: 0.0, status: SensorStatus::Timeout
},
}
}
#[tokio::main]
async fn main() {
let mut tick = interval(Duration::from_secs(10));
let sensor_ids: Vec<u32> = (1..=100).collect();
loop {
tick.tick().await;
// إطلاق 100 قراءة متزامنة
let handles: Vec<_> = sensor_ids.iter()
.map(|&id| tokio::spawn(poll_sensor(id)))
.collect();
let mut alerts = Vec::new();
for handle in handles {
let reading = handle.await.unwrap();
match reading.status {
SensorStatus::Critical => {
alerts.push(format!(
"حرج: مستشعر {} = {:.1}", reading.id, reading.value
));
}
SensorStatus::Timeout => {
alerts.push(format!("انقطاع: مستشعر {}", reading.id));
}
_ => {}
}
}
if !alerts.is_empty() {
println!("=== إنذارات ({}) ===", alerts.len());
for alert in &alerts {
println!(" ⚠ {alert}");
}
}
}
}
الخلاصة
- async/await مثالي لعمليات الإدخال/الإخراج كقراءة المستشعرات عبر الشبكة
- Tokio يوفر بيئة تشغيل قوية مع مؤقتات ومهام متزامنة
- tokio::spawn يُطلق مهاماً متوازية لقراءة عشرات المستشعرات معاً
- interval يُنظّم الاستطلاع الدوري بتوقيت ثابت
- select! يُسابق عدة عمليات — مثالي للمهلات والأولويات
- في الدرس القادم: الاختبارات — كيف نتأكد أن كل هذا الكود يعمل بشكل صحيح