Blog

rust-concurrency
Quick Tips

Rust Concurrency

For a long time I have been thinking about writing a sample program in Rust “the” new systems language. I have done coding in C++ for initial 5 years of my career before I moved on completely to Java and recently in one of my products a requirement came up that a low latency high performance component had to be developed.

As I have written by default Java was a default choice as its my first choice anyways. However I realized that this component could not afford non deterministic nature of garbage collector.

So need was to write program where I could have control over exact memory deallocation without worrying about “stop the world” garbage collection. Natural Choice was C++ but programming is all about having fun and I wanted to try out something new and C++ threading support and syntax is not all that great even in C++11.

So I decided to try out Go. but again Go had an issue of garbage collection and same fear of non determinism creeped in.

So time to try out Rust.

Program is simple but can be extended to lot of other scenarios.

One thread keeps spitting out data at some regular intervals. A vector keeps track of generated data.

Other thread keeps ticking at regular intervals (100ms or so) and whenever there are items which have elapsed time greater than a threshold those items are expired. Same as cache TTL.

use std::thread;
    use std::sync::mpsc;
    use std::time::{Duration,Instant};
    use std::collections::HashMap;
    //Define struct
    #[derive(Clone)]
    struct Item {
        created_at: Instant,
        id:i64,
        pub description: String
    }

//Implement Item
    impl Item {
        pub fn new(id: i64,description: String) -> Item {
            Item {
                created_at: Instant::now(),
                id: id,
                description: description
            }
        }

        fn created_at(&self) -> Instant {
            self.created_at
        }

        fn id(&self) -> i64 {
            self.id
        }
    }

    fn main() {
        let (sender, receiver) = mpsc::channel(); //Creat  multiple publisher single receiver channel
        let sender_pop = sender.clone(); //clone sender
    
        //Create a thread that sends pop every 2 seconds
        thread::spawn(move || {
            //Create infinite loop
            loop {
                thread::sleep(Duration::from_millis(100));
                sender_pop.send(Item::new(-1,String::from("Pop"))).unwrap();
            }
        });

        //Create a thread that keeps sending data every second t
        thread::spawn(move || {
            let mut val = 1;
            //Create infinite loop
            loop {
                val = val + 1;
                sender.send(Item::new(val,String::from("New"))).unwrap();
                thread::sleep(Duration::from_millis(1000));
            }
        });

        //Create a mutable vector
        let mut vals: Vec<Item> = Vec::new(); 
        let ttl = 5; //TTL in seconds
        //Receive items in non blocking fashion
        for received in receiver {
            //let item = &received;
            let mut item = &received;
            let newItem: Item  = item.clone();
            match item.description.as_ref(){
                "Pop" => {
                    println!("Pop");
                    vals.retain(|ref x| Instant::now().duration_since(x.created_at).as_secs() < ttl);
                },
                _ => {
                    vals.push(newItem);
                }
            }
        }
    }

That’s it. You have done synchronisation between threads without any race condition. That’s how cool Rust is.

In the next blog we will try to send notification whenever items are expired.

Happy Coding !!