Async, Futures, AMQP, pick three

A few weeks ago, we set out to develop an AMQP client library in Rust, and I'm happy to release it now! We will integrate it in more and more of our tools in the future.

Design: a futures based API and a low level API

One of our goals was to leverage tokio and futures to make an API that is easy to use, while still allowing lower level implementations that drive the protocol directly from an event loop, with something like mio.

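This was a bit challenging, but we ended up with two crates: lapin-futures, which provides the futures based API, and lapin-async, which provides the low level one.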

The resulting code can work with tokio-core's event reactor, or even futures-cpupool.

For the network frame format, the libraries use nom, the Rust parser combinators library, and cookie-factory, an experimental serialization library that follows the same approach as nom. It is a great example of employing nom inside a tokio transport, and of integrating a complex protocol's state machine directly with tokio-io. We will release a tutorial on how to write such a protocol soon.
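To give an idea of what the frame layer deals with, here is a minimal sketch of a parser and a serializer for the general AMQP 0-9-1 frame layout: one type byte, a two-byte channel id, a four-byte payload size, the payload, then the 0xCE end marker. It is written by hand with the standard library only, so it is not the nom or cookie-factory code used in the libraries, and the `Frame`, `ParseError`, `parse_frame` and `write_frame` names are made up for this example. Like a nom parser, it reports incomplete input separately from invalid input, so the caller can wait for more bytes.

```rust
/// Hypothetical, simplified view of an AMQP 0-9-1 frame (not the library's own type).
#[derive(Debug, PartialEq)]
struct Frame {
    frame_type: u8,
    channel: u16,
    payload: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum ParseError {
    /// Not enough bytes yet: the value is how many more bytes are needed
    /// before parsing can make progress.
    Incomplete(usize),
    /// The byte after the payload was not the 0xCE frame end marker.
    BadFrameEnd(u8),
}

const FRAME_END: u8 = 0xCE;
const HEADER_LEN: usize = 7; // type (1) + channel (2) + size (4)

/// Parses one frame from the start of `input`.
/// On success, returns the frame and the number of bytes consumed.
fn parse_frame(input: &[u8]) -> Result<(Frame, usize), ParseError> {
    if input.len() < HEADER_LEN {
        return Err(ParseError::Incomplete(HEADER_LEN - input.len()));
    }
    let frame_type = input[0];
    let channel = u16::from_be_bytes([input[1], input[2]]);
    let size = u32::from_be_bytes([input[3], input[4], input[5], input[6]]) as usize;

    // header + payload + end marker
    let total = HEADER_LEN + size + 1;
    if input.len() < total {
        return Err(ParseError::Incomplete(total - input.len()));
    }
    let end = input[total - 1];
    if end != FRAME_END {
        return Err(ParseError::BadFrameEnd(end));
    }
    let payload = input[HEADER_LEN..HEADER_LEN + size].to_vec();
    Ok((Frame { frame_type, channel, payload }, total))
}

/// Appends the wire representation of `frame` to `out`.
fn write_frame(frame: &Frame, out: &mut Vec<u8>) {
    out.push(frame.frame_type);
    out.extend_from_slice(&frame.channel.to_be_bytes());
    out.extend_from_slice(&(frame.payload.len() as u32).to_be_bytes());
    out.extend_from_slice(&frame.payload);
    out.push(FRAME_END);
}

fn main() {
    // frame type 8 is a heartbeat in AMQP 0-9-1: channel 0, empty payload
    let heartbeat = Frame { frame_type: 8, channel: 0, payload: vec![] };
    let mut bytes = Vec::new();
    write_frame(&heartbeat, &mut bytes);

    let (parsed, consumed) = parse_frame(&bytes).unwrap();
    println!("parsed {:?} from {} bytes", parsed, consumed);

    // a truncated buffer is reported as incomplete, not as a protocol error
    println!("{:?}", parse_frame(&bytes[..3]));
}
```

Returning the number of consumed bytes is what lets the caller keep any leftover data in its own buffer for the next call.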

The libraries are also designed to be completely independent from the network stream: you can use a basic TCP stream, a TLS stream or a unix socket, and you won't be blocked by rust-openssl version conflicts between many libraries (which was a big issue for us).
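This independence mostly comes down to being generic over the transport. Here is a minimal sketch of the idea using the standard library's blocking `Read` and `Write` traits, where the futures API works with tokio-io streams instead. The `send_protocol_header` and `check_protocol_header` functions are hypothetical helpers written for this example, not part of the libraries.

```rust
use std::io::{self, Cursor, Read, Write};

/// The 8 bytes a client sends first on an AMQP 0-9-1 connection: "AMQP" 0 0 9 1.
const PROTOCOL_HEADER: &[u8] = b"AMQP\x00\x00\x09\x01";

/// Hypothetical helper: works on anything that implements `Write`,
/// whether it is a TCP stream, a TLS stream, a unix socket or an in-memory buffer.
fn send_protocol_header<S: Write>(stream: &mut S) -> io::Result<()> {
    stream.write_all(PROTOCOL_HEADER)?;
    stream.flush()
}

/// Hypothetical helper: reads 8 bytes from anything that implements `Read`
/// and tells whether they are the expected protocol header.
fn check_protocol_header<S: Read>(stream: &mut S) -> io::Result<bool> {
    let mut header = [0u8; 8];
    stream.read_exact(&mut header)?;
    Ok(&header[..] == PROTOCOL_HEADER)
}

fn main() -> io::Result<()> {
    // An in-memory buffer stands in for the network here; a std::net::TcpStream
    // or a std::os::unix::net::UnixStream could be passed to the same functions.
    let mut wire = Cursor::new(Vec::new());
    send_protocol_header(&mut wire)?;

    // rewind, then read back what was "sent"
    wire.set_position(0);
    println!("header ok: {}", check_protocol_header(&mut wire)?);
    Ok(())
}
```

Since the code only names a trait bound, the choice of TLS library (and of its version) stays with the application.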

Using the futures API: publishing a message

Every method returns a future, so you can chain them: the future returned by connect resolves to a usable client once the complete AMQP handshake has been performed, the channel becomes available once the server has answered, and so on. That said, the nature of AMQP also makes parallel work on the same connection easy.

#[macro_use] extern crate log; // provides the info! macro
extern crate futures;
extern crate tokio_core;
extern crate lapin_futures as lapin;

use std::default::Default;
use futures::future::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use lapin::client::ConnectionOptions;
// BasicProperties is assumed to be exported from the channel module, like the option structs
use lapin::channel::{BasicProperties,BasicPublishOptions,QueueDeclareOptions};

fn main() {

  // create the reactor
  let mut core = Core::new().unwrap();
  let handle = core.handle();
  let addr = "127.0.0.1:5672".parse().unwrap();

  core.run(

    TcpStream::connect(&addr, &handle).and_then(|stream| {

      // connect() returns a future of an AMQP Client
      // that resolves once the handshake is done
      lapin::client::Client::connect(
        stream,
        &ConnectionOptions{
          username: "guest",
          password: "guest",
          ..Default::default()
        }
      )
    }).and_then(|client| {

      // create_channel returns a future that is resolved
      // once the channel is successfully created
      client.create_channel()
    }).and_then(|channel| {
      let id = channel.id;
      info!("created channel with id: {}", id);

      channel.queue_declare("hello", &QueueDeclareOptions::default()).and_then(move |_| {
        info!("channel {} declared queue {}", id, "hello");

        channel.basic_publish(
          "hello",
          b"hello from tokio",
          &BasicPublishOptions::default(),
          BasicProperties::default().with_user_id("guest".to_string()).with_reply_to("foobar".to_string())
        )
      })
    })
  ).unwrap();
}

Every struct of the API, be it a client, a channel or a consumer, holds a synchronized reference to the underlying transport, so you can use it from any thread.
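This kind of sharing is commonly built on an `Arc<Mutex<...>>`. The sketch below shows the general pattern with made-up `Transport` and `Channel` types and a hypothetical `publish_from_threads` function. It illustrates the idea under that assumption and is not the library's actual implementation.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the shared transport: it only records outgoing messages.
#[derive(Default)]
struct Transport {
    outgoing: Vec<(u16, String)>,
}

/// Hypothetical channel handle: cheap to clone, every clone points to the same transport.
#[derive(Clone)]
struct Channel {
    id: u16,
    transport: Arc<Mutex<Transport>>,
}

impl Channel {
    fn publish(&self, message: &str) {
        // the lock is held only while the message is queued
        let mut transport = self.transport.lock().unwrap();
        transport.outgoing.push((self.id, message.to_string()));
    }
}

/// Publishes one message per channel, each from its own thread,
/// and returns how many messages reached the shared transport.
fn publish_from_threads(channel_count: u16) -> usize {
    let transport = Arc::new(Mutex::new(Transport::default()));

    let workers: Vec<_> = (1..=channel_count)
        .map(|id| {
            let channel = Channel { id, transport: transport.clone() };
            thread::spawn(move || channel.publish("hello from a thread"))
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let count = transport.lock().unwrap().outgoing.len();
    count
}

fn main() {
    println!("{} messages queued", publish_from_threads(4));
}
```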

Using the futures API: creating a consumer

When you call the basic_consume method, it returns a future of a Consumer. The Consumer implements Stream, so you can reuse all the related combinators from the futures library.

#[macro_use] extern crate log; // provides the info! and debug! macros
extern crate futures;
extern crate tokio_core;
extern crate lapin_futures as lapin;

use futures::Stream;
use futures::future::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use lapin::client::ConnectionOptions;
use lapin::channel::{BasicConsumeOptions,QueueDeclareOptions};

fn main() {

  // create the reactor
  let mut core = Core::new().unwrap();
  let handle = core.handle();
  let addr = "127.0.0.1:5672".parse().unwrap();

  core.run(

    TcpStream::connect(&addr, &handle).and_then(|stream| {
      lapin::client::Client::connect(stream, &ConnectionOptions::default())
    }).and_then(|client| {

      client.create_channel()
    }).and_then(|channel| {

      let id = channel.id;
      info!("created channel with id: {}", id);

      let ch = channel.clone();
      channel.queue_declare("hello", &QueueDeclareOptions::default()).and_then(move |_| {
        info!("channel {} declared queue {}", id, "hello");

        channel.basic_consume("hello", "my_consumer", &BasicConsumeOptions::default())
      }).and_then(|stream| {
        info!("got consumer stream");

        // `move` gives the closure ownership of the cloned channel used for the ack
        stream.for_each(move |message| {
          debug!("got message: {:?}", message);
          info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());

          ch.basic_ack(message.delivery_tag);
          Ok(())
        })
      })
    })
  ).unwrap();
}

Looking under the hood: lapin-async

The lapin-async library is meant for use with an event loop that will tell you when you can read or write on the underlying stream. As such, it does not own the network stream, nor the buffers used to read and write. You handle your IO, then pass the buffers to the protocol's state machine. It will update its state, tell you how much data it consumed, and give you data to send to the network. You can then query it for state changes.

There are various reasons for an architecture like this one:

  • a library that owns the IO stream usually does not play well with event loops
  • the developer might want to make their own optimizations with sockets and buffers
  • separating the IO makes the library easy to test: you can pass buffers (or even
    complete structs) to the state machine and verify the expected state easily

More generally, a protocol library should not dictate how the application handles its networking.
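As a rough sketch of that separation, here is a toy state machine that never touches a socket: the caller hands it the bytes that were received and asks it for the bytes to send. The `Handshake` type, its states and its one-line "protocol" are invented for the example, and are far simpler than what lapin-async does.

```rust
/// Hypothetical connection states for a toy protocol (not lapin-async's real ones).
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Initial,
    SentHello,
    Connected,
    Error,
}

/// A protocol state machine that owns neither the socket nor the buffers.
struct Handshake {
    state: State,
}

impl Handshake {
    fn new() -> Handshake {
        Handshake { state: State::Initial }
    }

    /// Appends the bytes to send to `out` (if any) and returns how many were added.
    fn write_to(&mut self, out: &mut Vec<u8>) -> usize {
        if self.state == State::Initial {
            out.extend_from_slice(b"HELLO\n");
            self.state = State::SentHello;
            6
        } else {
            0
        }
    }

    /// Looks at the received bytes, updates the state,
    /// and returns how many bytes were consumed.
    fn read_from(&mut self, input: &[u8]) -> usize {
        if self.state != State::SentHello {
            return 0;
        }
        // wait for a complete line before deciding anything
        match input.iter().position(|&b| b == b'\n') {
            None => 0,
            Some(pos) => {
                self.state = if &input[..pos] == &b"OK"[..] {
                    State::Connected
                } else {
                    State::Error
                };
                pos + 1
            }
        }
    }
}

fn main() {
    let mut conn = Handshake::new();
    let mut send_buffer = Vec::new();
    conn.write_to(&mut send_buffer);

    // in a real program, send_buffer would now be written to the socket,
    // and the answer would come from a read on that socket
    let consumed = conn.read_from(b"OK\n");
    println!("consumed {} bytes, state is now {:?}", consumed, conn.state);
}
```

Because the inputs and outputs are plain byte slices, the same calls can be made from a unit test without opening any connection.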

As an example of how it could run:

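// Buffer, Connection and the state enums come from lapin-async
use std::net::TcpStream;
use std::{thread, time};

let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap();
stream.set_nonblocking(true).unwrap();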

let capacity = 8192;
let mut send_buffer    = Buffer::with_capacity(capacity as usize);
let mut receive_buffer = Buffer::with_capacity(capacity as usize);

let mut conn: Connection = Connection::new();
assert_eq!(conn.connect().unwrap(), ConnectionState::Connecting(ConnectingState::SentProtocolHeader));
loop {
  match conn.run(&mut stream, &mut send_buffer, &mut receive_buffer) {
    Err(e) => panic!("could not connect: {:?}", e),
    Ok(ConnectionState::Connected) => break,
    Ok(state) => info!("now at state {:?}, continue", state),
  }
  thread::sleep(time::Duration::from_millis(100));
}
info!("CONNECTED");

The run method is a helper that reads from the network, parses frames, updates the internal state with those frames, and writes new frames to the network. We loop until the state switches to "connected". Most of the library follows that model.

While the lapin-async library has most of the functionality, it is still a lot of manual work to manage, and you should prefer the futures based library.

A young library

This is an early release, and it is missing a lot of features, but the design makes them easy to implement.

Right now, the only authentication method is "plain". You can create and close channels, create queues (without options), and use the methods from the "basic" AMQP class. RabbitMQ's "publisher confirms" extension is also available.

It is mainly missing the "nack" extension, and the exchange and transaction handling methods.

More features will come in the following weeks, and if you want to contribute, you're very welcome 🙂
