Learning Rust; Creating a multithreaded worker from scratch.

Hello everyone! For the past few weeks I've been learning Rust, and I wanted to apply what I learned through a small project and document it.

In this post we're going to build, from scratch, a multithreaded worker that processes jobs pushed onto a Redis queue.

What we aim to achieve is the following:

Simply put, a Rust consumer that listens to a Redis queue using BLPOP, which blocks while the queue is empty. Once we pop a job, we hand it to one of our workers.

We're going to create a thread pool with a fixed number of workers, so resource usage stays bounded and we don't have to spawn a new thread for every incoming job.

How will we pass the jobs to the workers (threads)?

We'll use channels for this. A channel lets threads communicate by sending values to each other, so instead of sharing memory and guarding it with locks, threads share memory by communicating.

The thread pool will act as our Sender (the side that passes data into the channel) and the workers will act as the Receiver.

All the workers share the same receiver. Once a job is passed in, one of the workers starts processing it while the others keep waiting on the receiver for more jobs.
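As a standalone sketch of the Sender/Receiver roles (sum_via_channel is an illustrative helper, not part of the project), one thread sends numbers through a std::sync::mpsc channel and the receiving side adds them up. The two threads never touch a shared variable.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Sends 0..n from a spawned thread and sums them on the receiving side.
/// (Hypothetical helper, only here to illustrate channels.)
fn sum_via_channel(n: u64) -> u64 {
    let (sender, receiver) = channel::<u64>();

    let producer = thread::spawn(move || {
        for i in 0..n {
            // Ownership of each value moves into the channel.
            sender.send(i).unwrap();
        }
        // `sender` is dropped here, which closes the channel.
    });

    // Iterating a Receiver blocks for each value and ends once every
    // Sender has been dropped.
    let total: u64 = receiver.iter().sum();
    producer.join().unwrap();
    total
}

fn main() {
    println!("sum = {}", sum_via_channel(10));
}
```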

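If you wish to follow along, create a new Rust project using cargo new <name>. If you don't have Cargo/Rust installed, you can install it with rustup (https://rustup.rs). We'll also need the redis crate, which you can add with cargo add redis.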

Let's get started with our first component: the ThreadPool struct.

In our src directory we'll create a new file called threadpool.rs (declared in main.rs with mod threadpool;).

// threadpool.rs
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

use crate::worker::Worker;

pub struct ThreadPool {
  workers: Vec<Worker>,
  sender: Sender<Message>,
}

The struct consists of a vector (a growable array) of workers and the sending half of the channel. Through the channel we'll pass an enum called Message, which looks like this:

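// A job is a boxed closure that can be run once on another thread.
pub type Job = Box<dyn FnOnce() + Send + 'static>;

pub enum Message {
  NewJob(Job),
  Terminate,
}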

The enum tells us that a Message is exactly one of two variants: either NewJob, which carries the job to run, or Terminate, a signal telling the worker that receives it to shut down gracefully.

Rust leans heavily on enums whenever a value can only be one of a fixed set of possibilities, and match forces you to handle every variant. For more info, see the "Enums and Pattern Matching" chapter of the Rust Book.
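Here is a small standalone sketch of how a worker might consume that enum. The Job alias (a boxed closure) is an assumption, since the snippets use Job without showing its definition, and handle is a hypothetical helper. Because match must be exhaustive, adding a third Message variant without handling it here would be a compile error.

```rust
/// A job is a heap-allocated closure that runs once and can cross threads.
/// (Assumed definition of `Job`.)
type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

/// Processes one message and returns `true` if the worker should keep going.
fn handle(message: Message) -> bool {
    match message {
        Message::NewJob(job) => {
            job(); // calling a Box<dyn FnOnce()> consumes it
            true
        }
        Message::Terminate => false,
    }
}

fn main() {
    let keep_going = handle(Message::NewJob(Box::new(|| println!("job ran"))));
    let keep_going_after_terminate = handle(Message::Terminate);
    println!("{keep_going} {keep_going_after_terminate}");
}
```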

The ThreadPool has two methods, new and execute. The new method looks like this:

impl ThreadPool {
  pub fn new(size: usize) -> ThreadPool {
    assert!(size > 0);

    // mpsc = multiple producer, single consumer. The pool is the only
    // producer; the single Receiver is shared by all workers below.
    let (sender, receiver) = channel::<Message>();

    // Wrap the receiver in Arc<Mutex<..>> so it can be shared across threads.
    let receiver_rc = Arc::new(Mutex::new(receiver));

    // Create <size> workers with ids 1..=size.
    let mut workers: Vec<Worker> = Vec::with_capacity(size);
    for i in 1..=size {
      workers.push(Worker::new(i, Arc::clone(&receiver_rc)));
    }

    ThreadPool { workers, sender }
  }

What we do is:

  1. We assert that size is greater than zero and create the channel.

  2. We create an Arc (Atomically Reference Counted) pointer, which allows shared ownership of a value across threads. Several workers need the receiving end of the channel, so we need thread-safe shared ownership; the plain Rc pointer is not thread-safe, which is why Arc exists. More info in the std::sync::Arc documentation.

  3. We wrap the receiver in a Mutex before sharing it. A Receiver can't be used from several threads at once (it is Send but not Sync), so each worker must acquire the lock before it can wait on the channel, and only one worker at a time can pull a message out.

  4. We create size workers using the Worker struct, which we'll look at next.

  5. For each worker we pass a numeric ID along with its own handle to the receiver. Arc::clone doesn't copy the receiver; it only increments the reference count.

  6. Finally, we return the new ThreadPool.
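The sharing pattern from these steps can be tried in isolation. In this sketch (both function names are illustrative), strong_count_after_clones shows that Arc::clone only bumps a reference count, and fan_out has several threads take turns on one Mutex-wrapped Receiver, with each message delivered to exactly one of them. The channel carries plain numbers instead of Message values to keep it short.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Clones an Arc `n` times and reports the resulting strong count.
fn strong_count_after_clones(n: usize) -> usize {
    let shared = Arc::new(String::from("shared value"));
    let clones: Vec<Arc<String>> = (0..n).map(|_| Arc::clone(&shared)).collect();
    let count = Arc::strong_count(&shared); // the original plus n clones
    drop(clones);
    count
}

/// Sends `messages` values through one channel whose Receiver is shared by
/// `workers` threads; returns how many values were received in total.
fn fan_out(workers: usize, messages: usize) -> usize {
    let (sender, receiver) = channel::<usize>();
    let receiver = Arc::new(Mutex::new(receiver));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || {
                let mut received: usize = 0;
                loop {
                    // Only the thread holding the lock can call recv(); the
                    // guard is released at the end of this statement.
                    let message = receiver.lock().unwrap().recv();
                    match message {
                        Ok(_) => received += 1,
                        Err(_) => break, // every Sender dropped: channel closed
                    }
                }
                received
            })
        })
        .collect();

    for i in 0..messages {
        sender.send(i).unwrap();
    }
    drop(sender); // close the channel so the workers' loops end

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("strong count: {}", strong_count_after_clones(4));
    println!("received: {}", fan_out(4, 100));
}
```

Because the lock is taken inside a let statement, it is released as soon as recv returns; fan_out(4, 100) should report 100 no matter which threads did the receiving.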

Now the execute method:

  pub fn execute<F>(&self, job: F)
  where
    F: FnOnce() + 'static + Send
  {
    // Box the closure so every job has the same concrete type.
    let job = Box::new(job);
    if let Err(e) = self.sender.send(Message::NewJob(job)) {
      panic!("An error occurred while queuing a job to the workers: {}", e);
    }
  }
} // end of impl ThreadPool

The execute method is generic over a type F that must implement certain traits. Traits are similar to interfaces in other programming languages but offer a lot more.

We're going to pass the execute method a closure. Closures are anonymous functions, similar to Ruby's procs: a block of code that can be stored and called later, and that can capture variables from the scope where it was defined.

F: FnOnce() + 'static + Send means the closure must satisfy two traits and one lifetime bound:

  1. FnOnce() means the closure can be called (at least) once, takes no arguments and returns (). It is the least restrictive closure trait: because each job runs exactly once, the closure is allowed to consume the values it captured.

  2. 'static is a lifetime bound, not a trait. It means the closure must not borrow anything that could be freed before it runs, i.e. it may only capture owned data (or references that live for the whole program). The worker thread can outlive the function that called execute, so borrowing that function's local variables would be unsafe.

  3. Send is a trait marking types that can be safely transferred to another thread. A closure is Send only if everything it captures is Send; capturing an Rc, for example, would make it non-Send and the code would not compile.

Every closure has its own unique, anonymous type, so different closures can't travel through one channel as-is. Box::new moves the closure onto the heap and gives us a Box<dyn FnOnce() + Send + 'static>, one concrete Job type that every closure fits into.

Lastly, we send the job through the channel. send only fails if the receiving end has been dropped (all the workers are gone), in which case we panic with a message.
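A standalone sketch of what those bounds and the Box buy us (boxed and run_on_other_thread are illustrative names, and Job is an assumed alias for the boxed closure type): two closures with different anonymous types fit into one Vec once boxed, move hands them ownership of what they capture so they satisfy 'static, and the batch can be sent to another thread because everything captured is Send.

```rust
use std::sync::mpsc::channel;
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>; // assumed alias

/// Mirrors the bounds on `execute`: callable once, sendable across
/// threads, and not borrowing anything short-lived.
fn boxed<F>(f: F) -> Job
where
    F: FnOnce() + 'static + Send,
{
    Box::new(f)
}

fn run_on_other_thread() -> Vec<String> {
    let (tx, rx) = channel::<String>();
    let tx2 = tx.clone();
    let name = String::from("first_job"); // owned data, moved into a closure

    // Two closures with different anonymous types share one Vec once boxed.
    let jobs: Vec<Job> = vec![
        boxed(move || tx.send(format!("processing {name}")).unwrap()),
        boxed(move || tx2.send(String::from("second job")).unwrap()),
    ];

    // Moving the jobs to a new thread needs `Send`; that thread could outlive
    // this stack frame, which is why `'static` is required.
    thread::spawn(move || {
        for job in jobs {
            job(); // FnOnce: each boxed closure is consumed by the call
        }
    })
    .join()
    .unwrap();

    // Both senders were dropped along with their closures, so this ends.
    rx.iter().collect()
}

fn main() {
    for line in run_on_other_thread() {
        println!("{line}");
    }
}
```

Removing move from the first closure, or capturing an Rc instead of a Sender, makes this fail to compile, which is the same check execute relies on.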

Before moving on to the Worker struct, there is one more trait to implement for ThreadPool so that our workers are terminated and shut down gracefully when we're done.

The Drop trait has a single method, drop, which Rust calls automatically when a value goes out of scope. Implementing it for ThreadPool lets us attach extra logic to its destruction: telling every worker to terminate.

impl Drop for ThreadPool {
  fn drop(&mut self) {
    // Send one Terminate per worker; each worker consumes exactly one.
    for _ in &self.workers {
      self.sender.send(Message::Terminate).unwrap();
    }

    // Wait for every worker to finish its current job and exit.
    for worker in &mut self.workers {
      if let Some(thread) = worker.thread.take() {
        println!("Shutting down worker {}", worker.id);
        thread.join().unwrap();
      }
    }
  }
}

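We first send one Terminate message per worker and only then join the threads. The order matters: we can't control which worker receives which message, so if we joined inside the first loop we could end up waiting forever on a worker that hasn't received its Terminate yet. thread.join() (similar to calling Wait on a sync.WaitGroup in Go) blocks until that thread finishes its current job and exits. This prevents the main thread from returning first and killing the workers mid-job.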
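If the timing of drop is unclear, this small sketch (the Noisy type is made up for illustration) records when Rust calls it: automatically at the end of the owning scope, with locals dropped in reverse declaration order, or earlier if you call std::mem::drop yourself. A ThreadPool's drop runs the same way, when the pool goes out of scope.

```rust
use std::sync::{Arc, Mutex};

/// A hypothetical type whose only job is to log when it is dropped.
struct Noisy {
    name: &'static str,
    log: Arc<Mutex<Vec<String>>>,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.log.lock().unwrap().push(format!("dropped {}", self.name));
    }
}

fn drop_order() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    {
        let _first = Noisy { name: "first", log: Arc::clone(&log) };
        let second = Noisy { name: "second", log: Arc::clone(&log) };
        let _third = Noisy { name: "third", log: Arc::clone(&log) };

        drop(second); // explicit early drop
        log.lock().unwrap().push(String::from("end of scope"));
    } // the remaining locals drop here in reverse order: third, then first
    let entries = log.lock().unwrap().clone();
    entries
}

fn main() {
    for entry in drop_order() {
        println!("{entry}");
    }
}
```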

Worker Struct

Our second part is the Worker struct, which looks like this:

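// worker.rs
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use crate::threadpool::Message; // Message is defined in threadpool.rs

pub struct Worker {
  pub id: usize,
  pub thread: Option<JoinHandle<()>>,
}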

It has two fields: id and thread. The id is the one we passed in from the thread pool. The thread field is an Option<JoinHandle<()>>, an enum that is either Some(handle) or None (Rust's replacement for null). A JoinHandle has a join method that waits for the thread to finish and returns its result. join takes the handle by value, so in Drop, where we only have &mut self, we call take() to move the handle out of the Option and leave None in its place.
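To see why thread is an Option, here is a standalone sketch. This Worker variant is hypothetical: its thread returns a usize (instead of ()) so that join has something to hand back. take() lets us consume the JoinHandle through a &mut reference, and a second shutdown attempt safely finds None.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical variant of the Worker struct whose thread returns a value,
/// so we can see `join` hand the thread's result back.
struct Worker {
    id: usize,
    thread: Option<JoinHandle<usize>>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(move || id * 10);
        Worker { id, thread: Some(thread) }
    }

    /// Joins the thread if we still hold its handle; returns None afterwards.
    fn shut_down(&mut self) -> Option<usize> {
        // `join` consumes the JoinHandle, but we only have `&mut self`, so
        // `take()` moves the handle out and leaves `None` in its place.
        self.thread.take().map(|handle| handle.join().unwrap())
    }
}

/// Shuts the same worker down twice and returns both results.
fn shut_down_twice(id: usize) -> (Option<usize>, Option<usize>) {
    let mut worker = Worker::new(id);
    let first = worker.shut_down();
    let second = worker.shut_down();
    (first, second)
}

fn main() {
    let mut worker = Worker::new(3);
    let result = worker.shut_down();
    println!("worker {} returned {:?}", worker.id, result);
    println!("both calls: {:?}", shut_down_twice(3));
}
```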

The Worker struct has a new method that looks like this:

impl Worker{
  pub fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Worker{
    let thread = spawn(move || {
      poll_tasks(receiver);
    });

    Worker { id, thread: Some(thread) }
  }
}

Simply put, it takes the id and the receiver from the channel we created earlier, spawns a new thread that calls a function named poll_tasks with the receiver (the move keyword hands ownership of the Arc to the new thread), and returns the Worker struct.

The poll_tasks function:

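fn poll_tasks(rec: Arc<Mutex<Receiver<Message>>>) {
  loop {
    // Listen on the channel for Messages. Binding the result with `let`
    // drops the MutexGuard at the end of this statement, so the lock is
    // released before the job runs. Matching directly on
    // rec.lock().unwrap().recv() would keep the lock held for the whole
    // match, letting only one worker run a job at a time.
    let message = rec.lock().unwrap().recv().unwrap();
    match message {
      Message::NewJob(job) => job(),
      Message::Terminate => break,
    }
  }
}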

It locks the receiver and waits for a message from the sender (the ThreadPool). Once a message arrives it matches on it: a NewJob is executed, and the whole thing repeats in an infinite loop.

Only a Terminate message breaks out of the loop, at which point the function returns and the thread finishes.
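Putting the pieces together, here is a single-file sketch of the pool that uses only the standard library, so it runs without Redis. It follows the structure above with a few assumptions: the Job alias, one file instead of threadpool.rs and worker.rs, and receiving each message in a let statement. That last point matters: in a match on rec.lock().unwrap().recv().unwrap(), the MutexGuard temporary lives until the end of the match, so the lock stays held while the job runs and only one worker can make progress at a time. run_jobs is an illustrative helper.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>; // assumed definition of Job

enum Message {
    NewJob(Job),
    Terminate,
}

struct Worker {
    id: usize,
    thread: Option<JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Worker {
        let thread = spawn(move || poll_tasks(receiver));
        Worker { id, thread: Some(thread) }
    }
}

fn poll_tasks(rec: Arc<Mutex<Receiver<Message>>>) {
    loop {
        // The guard is a temporary of this `let`, so the lock is released
        // before the job runs and other workers can receive in parallel.
        let message = rec.lock().unwrap().recv().unwrap();
        match message {
            Message::NewJob(job) => job(),
            Message::Terminate => break,
        }
    }
}

struct ThreadPool {
    workers: Vec<Worker>,
    sender: Sender<Message>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = channel::<Message>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (1..=size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        ThreadPool { workers, sender }
    }

    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + 'static + Send,
    {
        if let Err(e) = self.sender.send(Message::NewJob(Box::new(job))) {
            panic!("An error occurred while queuing a job to the workers: {}", e);
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // One Terminate per worker first; each worker consumes exactly one.
        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }
        // Then wait for every worker to finish and exit.
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                println!("Shutting down worker {}", worker.id);
                thread.join().unwrap();
            }
        }
    }
}

/// Runs `jobs` counting jobs on a pool of `size` workers; returns how many ran.
fn run_jobs(size: usize, jobs: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    {
        let pool = ThreadPool::new(size);
        for _ in 0..jobs {
            let counter = Arc::clone(&counter);
            pool.execute(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            });
        }
    } // `pool` is dropped here: queued jobs run first, then the workers exit
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("jobs run: {}", run_jobs(4, 20));
}
```

Because the channel is first-in, first-out and every Terminate is sent after the jobs, all queued jobs are received before any worker shuts down, so run_jobs(4, 20) returns 20.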

Redis BLPOP

Finally, all that's left is adding Redis, connecting to it and using a blocking pop command (BLPOP, or its tail-side counterpart BRPOP) on a given queue.

I'll add a new struct called RedisClient that takes the created connection and abstracts away the Redis logic:

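use redis::{Commands, Connection};

pub struct RedisClient {
    pub conn: Connection,
}

impl RedisClient {
    pub fn brpop(&mut self) -> Option<String> {
      // BRPOP on the "jobs" list, blocking for up to 3 seconds. On timeout
      // Redis replies nil (None); otherwise it replies (list name, value).
      // Depending on the redis crate version, the timeout may need to be an f64 (3.0).
      let value: Option<(String, String)> = self.conn.brpop("jobs", 3).unwrap();
      value.map(|(_list, job)| job)
    }
}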

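We have a struct with a conn field and a brpop method that runs Redis's BRPOP on a list named jobs. It blocks for at most 3 seconds; if nothing shows up in that time Redis replies with nil, which we get as None, otherwise we get a (list name, value) pair and return the value. Note that BRPOP pops from the right (tail) of the list, so with producers using RPUSH the newest job is handed out first; BLPOP (the blpop method on the connection) pops from the head and gives first-in, first-out order.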
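To see why the choice of pop matters, a Redis list can be modelled as a double-ended queue. In this std-only sketch, RedisList is a hypothetical stand-in rather than anything from the redis crate: rpush appends at the tail like RPUSH, lpop takes the head like BLPOP, and rpop takes the tail like BRPOP (without the blocking). job_from_reply mirrors how brpop turns the (list name, value) reply into the job.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory stand-in for a Redis list (not a redis crate type).
#[derive(Default)]
struct RedisList {
    items: VecDeque<String>,
}

impl RedisList {
    fn rpush(&mut self, item: &str) {
        self.items.push_back(item.to_string()); // RPUSH: append at the tail
    }

    fn lpop(&mut self) -> Option<String> {
        self.items.pop_front() // like BLPOP, minus the blocking: take the head
    }

    fn rpop(&mut self) -> Option<String> {
        self.items.pop_back() // like BRPOP, minus the blocking: take the tail
    }
}

/// A blocking pop replies with (list name, value), or nil (None) on timeout.
fn job_from_reply(reply: Option<(String, String)>) -> Option<String> {
    reply.map(|(_list, job)| job)
}

/// Pushes three jobs with RPUSH, then pops until empty from the chosen side.
fn pop_order(from_left: bool) -> Vec<String> {
    let mut list = RedisList::default();
    for job in ["job1", "job2", "job3"] {
        list.rpush(job);
    }
    let mut order = Vec::new();
    loop {
        let next = if from_left { list.lpop() } else { list.rpop() };
        match next {
            Some(job) => order.push(job),
            None => break,
        }
    }
    order
}

fn main() {
    println!("RPUSH + BLPOP: {:?}", pop_order(true));
    println!("RPUSH + BRPOP: {:?}", pop_order(false));
    println!("{:?}", job_from_reply(Some(("jobs".into(), "first_job".into()))));
}
```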

In our main.rs we can have the following:

// main.rs
    // redis-rs expects a redis:// URL rather than a bare host:port.
    let redis_init = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    let conn = redis_init.get_connection().unwrap();
    let mut redis_client = RedisClient { conn };

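We create the connection assuming a Redis server is listening on localhost:6379 (for example, a Docker container started with docker run -d --name redis -p 6379:6379 redis). The unwrap calls panic if any error occurs.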

After creating our redis_client, all that's left is wiring everything together:

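    // main.rs
    const POOL_SIZE: usize = 4; // example size; any value > 0 works
    let pool = ThreadPool::new(POOL_SIZE);

    loop {
        match redis_client.brpop() {
            Some(val) => {
                // `move` gives the closure ownership of `val` ('static bound).
                pool.execute(move || {
                    println!("Found a job in the queue, processing {}", val)
                })
            }
            None => println!("Waiting for items.."),
        }
    }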

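We create the thread pool and call brpop in an infinite loop. Whenever we get a val from the queue, we hand the pool a closure that prints it; move transfers ownership of val into the closure, which is what satisfies the 'static bound on execute. On a timeout we print a waiting message and poll again.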
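To try the polling loop without a Redis server, mpsc's recv_timeout can stand in for a blocking pop with a timeout: it returns a value, reports a timeout (like BRPOP's nil reply), or reports that the producer has hung up. In this sketch, poll_queue, the 10 ms producer delay and the 50 ms timeout are illustrative choices, and unlike the Redis loop it stops once the producer disconnects. Each job is only printed and collected here; in the real loop it would be handed to pool.execute.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Consumes jobs until the producer hangs up; returns them in arrival order.
fn poll_queue(jobs: Vec<&'static str>) -> Vec<String> {
    let (sender, receiver) = channel::<String>();

    // Stand-in for `RPUSH jobs ...` from redis-cli.
    let producer = thread::spawn(move || {
        for job in jobs {
            thread::sleep(Duration::from_millis(10));
            sender.send(job.to_string()).unwrap();
        }
    });

    let mut processed = Vec::new();
    loop {
        // Like brpop with a timeout: a value, or nothing once the timeout hits.
        match receiver.recv_timeout(Duration::from_millis(50)) {
            Ok(val) => {
                println!("Found a job in the queue, processing {}", val);
                processed.push(val);
            }
            Err(RecvTimeoutError::Timeout) => println!("Waiting for items.."),
            Err(RecvTimeoutError::Disconnected) => break, // producer finished
        }
    }
    producer.join().unwrap();
    processed
}

fn main() {
    let done = poll_queue(vec!["first_job", "second_job"]);
    println!("processed {} jobs", done.len());
}
```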

To add items to the queue do the following:

  1. docker exec -it <redis-container-name> bash

  2. Type in redis-cli

  3. RPUSH jobs "first_job"

And you should see the terminal print that it found a job!

Summary

Rust is a beautiful language, but it has a somewhat steep learning curve because of concepts like ownership and borrowing. This is just a demo I was practicing with, so I thought I'd share it. I hope you got something out of it, and I'll see you in the next one!

References

  1. https://doc.rust-lang.org/stable/std/index.html

  2. https://www.youtube.com/watch?v=1AamFJGAE8E

Support Amr Elhewy by becoming a sponsor. Any amount is appreciated!