Async Streams em Rust: Processamento de Dados em Tempo Real — 2026

Aprenda a usar async streams em Rust com tokio-stream, futures e StreamExt para processar dados em tempo real. Exemplos práticos e padrões modernos.

Introdução

Se você já trabalha com async/await em Rust, sabe que Future resolve um único valor. Mas e quando precisa processar uma sequência contínua de dados — mensagens de WebSocket, linhas de um arquivo enorme, eventos de sensores IoT ou updates de uma API? É aí que entram os async streams.

Async streams são a versão assíncrona dos iteradores: em vez de bloquear a thread para cada item, eles suspendem a task e liberam o runtime para fazer outras coisas. Em 2026, com o ecossistema do Tokio maduro e a trait Stream consolidada, dominar async streams é essencial para qualquer dev Rust que trabalhe com I/O.

O que é um Async Stream?

Um async stream implementa a trait Stream do crate futures-core:

pub trait Stream {
    type Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

Parece complexo, mas na prática você raramente implementa poll_next manualmente. Assim como usamos async fn em vez de implementar Future::poll, existem ferramentas que criam streams de forma ergonômica.

A analogia é direta:

SíncronoAssíncrono
IteratorStream
next()next().await
for item in iterwhile let Some(item) = stream.next().await

Criando Streams com tokio-stream

O crate tokio-stream oferece adaptadores prontos. Adicione ao seu Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
futures = "0.3"

Stream a partir de um intervalo

use tokio_stream::StreamExt;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() {
    let stream = IntervalStream::new(interval(Duration::from_secs(1)));

    // Pegar apenas os 5 primeiros ticks
    let mut stream = stream.take(5);

    while let Some(tick) = stream.next().await {
        println!("Tick em: {:?}", tick);
    }

    println!("Stream finalizado!");
}

O IntervalStream emite um item a cada intervalo. O .take(5) limita a 5 elementos — exatamente como faria com um iterador síncrono.

Stream a partir de um canal

Canais mpsc do Tokio se integram naturalmente com streams:

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(100);
    let mut stream = ReceiverStream::new(rx);

    // Produtor: simula dados chegando de uma API
    tokio::spawn(async move {
        for i in 1..=10 {
            tx.send(format!("evento_{}", i)).await.unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        }
    });

    // Consumidor: processa cada evento conforme chega
    while let Some(evento) = stream.next().await {
        println!("Recebido: {}", evento);
    }
}

Esse padrão é a base de praticamente qualquer pipeline de processamento em tempo real: um produtor envia dados e o consumidor processa sob demanda.

Transformando Streams com StreamExt

A trait StreamExt (de tokio-stream ou futures) adiciona combinadores poderosos, análogos aos de iteradores:

use tokio_stream::StreamExt;
use tokio_stream::iter;

#[tokio::main]
async fn main() {
    let dados = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    let resultado: Vec<i32> = iter(dados)
        .filter(|x| x % 2 == 0)      // só pares
        .map(|x| x * x)               // eleva ao quadrado
        .take(3)                       // apenas 3 primeiros
        .collect()
        .await;

    println!("Resultado: {:?}", resultado); // [4, 16, 36]
}

Combinadores mais úteis

CombinadorDescrição
.map(f)Transforma cada item
.filter(f)Filtra itens por predicado
.take(n)Limita a N itens
.skip(n)Pula os N primeiros
.throttle(dur)Emite no máximo 1 item por intervalo
.timeout(dur)Erro se nenhum item chegar no prazo
.merge(outro)Combina dois streams em um
.chain(outro)Concatena dois streams

Criando Streams Customizados com async_stream

Para streams com lógica mais complexa, o crate async-stream permite usar a macro stream! com sintaxe natural:

use async_stream::stream;
use tokio_stream::StreamExt;
use std::time::Duration;

fn dados_sensor(sensor_id: &'static str) -> impl tokio_stream::Stream<Item = f64> {
    stream! {
        let mut leitura = 20.0;
        loop {
            // Simula leitura de sensor com variação
            leitura += (rand::random::<f64>() - 0.5) * 2.0;
            yield leitura;
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut stream = dados_sensor("temp_01").take(10);

    while let Some(temp) = stream.next().await {
        println!("Temperatura: {:.1}°C", temp);
    }
}

A palavra-chave yield dentro de stream! emite um item, e o stream suspende automaticamente nos pontos .await.

Exemplo Prático: Pipeline de Logs em Tempo Real

Vamos montar um pipeline realista que lê logs, filtra por severidade e agrega métricas:

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct LogEntry {
    nivel: String,
    mensagem: String,
    timestamp: u64,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<LogEntry>(1000);
    let stream = ReceiverStream::new(rx);

    // Produtor: simula logs chegando
    tokio::spawn(async move {
        let niveis = ["INFO", "WARN", "ERROR", "DEBUG", "ERROR"];
        for (i, nivel) in niveis.iter().cycle().take(50).enumerate() {
            let entry = LogEntry {
                nivel: nivel.to_string(),
                mensagem: format!("Evento #{} processado", i),
                timestamp: i as u64,
            };
            if tx.send(entry).await.is_err() {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
    });

    // Pipeline: filtra erros e acumula contagem
    let mut contagem: HashMap<String, usize> = HashMap::new();

    let mut stream = stream
        .filter(|log| log.nivel == "ERROR" || log.nivel == "WARN")
        .take(20);

    while let Some(log) = stream.next().await {
        *contagem.entry(log.nivel.clone()).or_insert(0) += 1;
        println!("[{}] {}", log.nivel, log.mensagem);
    }

    println!("\n--- Resumo ---");
    for (nivel, total) in &contagem {
        println!("{}: {} ocorrências", nivel, total);
    }
}

Esse padrão se aplica diretamente a cenários de produção: ingestão de métricas, monitoramento de infraestrutura, processamento de eventos de microsserviços.

Streams vs Channels vs Iteradores: Quando Usar Cada Um

  • Iterador síncrono: dados já estão na memória, processamento CPU-bound
  • Canal (mpsc): comunicação entre tasks, quando produtor e consumidor têm lógica separada
  • Async stream: processamento sequencial de dados assíncronos com transformações compostas (filter, map, throttle)

Na prática, canais e streams se complementam: você cria um canal para receber dados e envolve o Receiver em um ReceiverStream para usar os combinadores.

Tratamento de Erros em Streams

Streams podem emitir Result como item, e existem combinadores específicos:

use tokio_stream::StreamExt;
use tokio_stream::iter;

#[tokio::main]
async fn main() {
    let dados: Vec<Result<i32, String>> = vec![
        Ok(1), Ok(2), Err("falha na rede".into()), Ok(4), Ok(5),
    ];

    // Processar apenas os Ok, parando no primeiro erro
    let mut stream = iter(dados);

    while let Some(item) = stream.next().await {
        match item {
            Ok(valor) => println!("Processado: {}", valor),
            Err(e) => {
                eprintln!("Erro no stream: {}. Reconectando...", e);
                break;
            }
        }
    }
}

Para cenários de produção, combine com tratamento de erros robusto usando thiserror e anyhow.

Performance: Backpressure e Buffering

Async streams no Tokio têm backpressure natural: se o consumidor está lento, o produtor automaticamente suspende. Isso é uma vantagem enorme sobre modelos push-based.

Para ajustar performance:

use tokio_stream::StreamExt;

// Buffer: acumula itens antes de processar em lote
let mut chunks = stream.chunks_timeout(100, Duration::from_secs(5));

while let Some(lote) = chunks.next().await {
    processar_lote(&lote).await;
}

O chunks_timeout acumula até 100 itens ou espera no máximo 5 segundos — o que vier primeiro. Perfeito para batch inserts em bancos de dados como PostgreSQL.

Comparação com Outras Linguagens

Se você vem de Go, async streams são como channels com a ergonomia de range loops mais combinadores funcionais. Em Go você faria for msg := range ch, em Rust é while let Some(msg) = stream.next().await — mas com .filter(), .map(), .throttle() compostos.

Em Python, o equivalente são async generators (async for item in gen). A diferença é que Rust garante zero-cost abstractions — sem overhead de runtime para cada yield.

E em Kotlin, o conceito mais próximo são Flows do coroutines, que também suportam backpressure e combinadores. O modelo é bastante similar ao Rust.

Conclusão

Async streams são a ferramenta certa quando você precisa processar sequências de dados assíncronos com composição elegante. Em 2026, o ecossistema está maduro:

  • tokio-stream para adaptadores e wrappers
  • async-stream para criar streams com yield
  • StreamExt para combinadores poderosos

Combinados com o runtime do Tokio e o sistema de tipos do Rust, você consegue pipelines de dados em tempo real que são seguros, rápidos e legíveis.

Leia Também