Files
  • main.rs
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
extern crate futures;
extern crate tokio;
extern crate tokio_threadpool;

use std::io::{self, Read};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

use futures::{sync::mpsc, Async, Future, Sink, Stream, lazy};
use tokio::prelude::*;
use tokio_threadpool::{ThreadPool, Builder};

/// Message sent from `main` to the background worker over the mpsc channel.
///
/// `Debug` and `Clone` are derived in addition to the equality traits so the
/// messages can be logged, copied and used with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum FuturesItem {
    /// A payload the worker should process (it is printed by `print_message`).
    Value(String),
    /// Sentinel telling the worker to stop reading from the channel.
    Done,
}

/// Intended number of background workers.
/// This is for demo only, in production this could be millions.
const NUMBER_OF_PROCESSES: u32 = 5;

/// Number of `FuturesItem::Value` messages taken off the channel so far.
///
/// Initialised with the `const fn` `AtomicUsize::new` rather than the
/// deprecated `ATOMIC_USIZE_INIT` constant.
// The lower-case names are kept because other code in this file refers to
// these statics by name.
#[allow(non_upper_case_globals)]
static total_received: AtomicUsize = AtomicUsize::new(0);

/// Source of the sequential ids handed out by `print_message`.
#[allow(non_upper_case_globals)]
static print_message_futs_counter: AtomicUsize = AtomicUsize::new(0);

/// Prints `s` together with a sequential id, after a two-second blocking pause.
///
/// The id is claimed from `print_message_futs_counter` *before* sleeping, so
/// ids follow the order in which calls start, not the order in which the
/// lines are printed.
fn print_message(s: &str) {
    let future_id = print_message_futs_counter.fetch_add(1, Ordering::SeqCst);

    // Blocks the calling thread for the whole pause.
    let pause = Duration::from_secs(2);
    thread::sleep(pause);

    println!("{} Received by future with id {}.", s, future_id);
}

/// Drains the shared receiver and hands every received value to a thread pool.
///
/// The body runs inside `tokio::run`. It locks `rxm` once and keeps the guard
/// for the whole run, so any other caller sharing the same receiver blocks on
/// the mutex until this call returns.
///
/// For each `FuturesItem::Value` the global `total_received` counter is
/// incremented and a task is spawned on a private `ThreadPool`; the task calls
/// `print_message` inside `tokio_threadpool::blocking`.
///
/// The loop ends when a `FuturesItem::Done` message arrives, when the stream
/// ends (`Ready(None)`), or when polling fails. Afterwards the pool is shut
/// down and the shutdown future is waited on.
///
/// `thread_id` is accepted for the caller's bookkeeping but is currently
/// unused here.
///
/// # Panics
///
/// Panics if the mutex is poisoned or if waiting for the pool shutdown fails.
fn lightweight_process(rxm: Arc<Mutex<mpsc::Receiver<FuturesItem>>>, thread_id: i32) {
    let thread_pool = ThreadPool::new();

    tokio::run(lazy(move || {
        let mut rx = rxm.lock().unwrap();

        loop {
            match rx.poll() {
                Ok(Async::Ready(Some(FuturesItem::Value(s)))) => {
                    total_received.fetch_add(1, Ordering::SeqCst);

                    thread_pool.spawn(lazy(move || {
                        future::poll_fn(move || {
                            tokio_threadpool::blocking(|| {
                                print_message(&s);
                            }).map_err(|_| panic!("the threadpool shut down"))
                        })
                    }));
                }
                // Explicit stop request, end of stream, or receive error:
                // nothing more will be processed. The end-of-stream case used
                // to be shadowed by a catch-all arm, which made the loop spin
                // forever once the stream had ended.
                Ok(Async::Ready(Some(FuturesItem::Done)))
                | Ok(Async::Ready(None))
                | Err(_) => break,
                Ok(Async::NotReady) => {
                    // Nothing queued yet. This is still a polling loop, but we
                    // only back off while idle instead of after every message.
                    thread::sleep(Duration::from_millis(100));
                }
            }
        }

        thread_pool.shutdown().wait().unwrap();

        Ok(())
    }));
}

fn main() {
    let mut total_sent: u64 = 0;
    let (mut sx, rx) = mpsc::channel::<FuturesItem>(1_024);

    let rx = Arc::new(Mutex::new(rx));

    let mut bg_thread_id = 0;

    // Make a vector to hold the children which are spawned.
    let mut children = vec![];

    for i in 0..NUMBER_OF_PROCESSES {
        //ERROR: use of moved value: `rx`

        //and maybe this is not needed.

//        children.push(thread::spawn(move || {
//            println!("this is thread (background) number {}", bg_thread_id);
//            lightweight_process(Arc::clone(&rx), bg_thread_id);
//        }));
//
//        bg_thread_id += 1;
    }

    children.push(thread::spawn(move || {
        println!("this is thread (background) number {}", bg_thread_id);
        lightweight_process(Arc::clone(&rx), bg_thread_id);
    }));

    thread::sleep(Duration::from_millis(10));

    loop {
        println!("Enter command: ");
        let mut cmd_input = String::new();
        io::stdin().read_line(&mut cmd_input);

        match &*cmd_input.trim_matches(|c| c == ' ' || c == '\r' || c == '\n').to_uppercase() {
            "CHECK" => {
                println!("Total sent: {:?}, total received: {:?}.", total_sent, total_received);
            }

            "EXIT" => {
                break;
            }

            "SEND" => {
                for i in 0..5 {
                    total_sent += 1;
                    sx = sx.send(FuturesItem::Value(format!("Sent from main."))).wait().unwrap();
                }
            }

            &_ => continue,
        }
    }

    sx = sx.send(FuturesItem::Done).wait().unwrap();
    sx.close().unwrap().is_ready(); //not sure how to gracefully close the channel.

    println!("Closing processes...");
    for child in children {
        // Wait for the thread to finish. Returns a result.
        let _ = child.join();
    }

    println!("Closing application...");

    thread::sleep(Duration::from_millis(2000));
}