Active object: Difference between revisions
Content added Content deleted
Thundergnat (talk | contribs) m (Alphabetize; try this again) |
(Add Rust implementation from the rust-rosetta GitHub repository.) |
||
Line 2,064: | Line 2,064: | ||
(displayln (send active output)) |
(displayln (send active output)) |
||
</lang> |
</lang> |
||
=={{header|Rust}}== |
|||
<lang rust>#![feature(mpsc_select)] |
|||
extern crate num; |
|||
extern crate schedule_recv; |
|||
use num::traits::Zero; |
|||
use num::Float; |
|||
use schedule_recv::periodic_ms; |
|||
use std::f64::consts::PI; |
|||
use std::ops::Mul; |
|||
use std::sync::mpsc::{self, SendError, Sender}; |
|||
use std::sync::{Arc, Mutex}; |
|||
use std::thread; |
|||
use std::time::Duration; |
|||
pub type Actor<S> = Sender<Box<Fn(u32) -> S + Send>>; |
|||
pub type ActorResult<S> = Result<(), SendError<Box<Fn(u32) -> S + Send>>>; |
|||
/// Rust supports both shared-memory and actor models of concurrency, and the `Integrator` utilizes |
|||
/// both. We use an `Actor` to send the `Integrator` new functions, while we use a `Mutex` |
|||
/// (shared-memory concurrency) to hold the result of the integration. |
|||
/// |
|||
/// Note that these are not the only options here--there are many, many ways you can deal with |
|||
/// concurrent access. But when in doubt, a plain old `Mutex` is often a good bet. For example, |
|||
/// this might look like a good situation for a `RwLock`--after all, there's no reason for a read |
|||
/// in the main task to block writes. Unfortunately, unless you have significantly more reads than |
|||
/// writes (which is certainly not the case here), a `Mutex` will usually outperform a `RwLock`. |
|||
pub struct Integrator<S: 'static, T: Send> { |
|||
input: Actor<S>, |
|||
output: Arc<Mutex<T>>, |
|||
} |
|||
/// In Rust, time durations are strongly typed. This is usually exactly what you want, but for a |
|||
/// problem like this--where the integrated value has unusual (unspecified?) units--it can actually |
|||
/// be a bit tricky. Right now, `Duration`s can only be multiplied or divided by `i32`s, so in |
|||
/// order to be able to actually do math with them we say that the type parameter `S` (the result |
|||
/// of the function being integrated) must yield `T` (the type of the integrated value) when |
|||
/// multiplied by `f64`. We could possibly replace `f64` with a generic as well, but it would make |
|||
/// things a bit more complex. |
|||
impl<S, T> Integrator<S, T> |
|||
where |
|||
S: Mul<f64, Output = T> + Float + Zero, |
|||
T: 'static + Clone + Send + Float, |
|||
{ |
|||
pub fn new(frequency: u32) -> Integrator<S, T> { |
|||
// We create a pipe allowing functions to be sent from tx (the sending end) to input (the |
|||
// receiving end). In order to change the function we are integrating from the task in |
|||
// which the Integrator lives, we simply send the function through tx. |
|||
let (tx, input) = mpsc::channel(); |
|||
// The easiest way to do shared-memory concurrency in Rust is to use atomic reference |
|||
// counting, or Arc, around a synchronized type (like Mutex<T>). Arc gives you a guarantee |
|||
// that memory will not be freed as long as there is at least one reference to it. |
|||
// It is similar to C++'s shared_ptr, but it is guaranteed to be safe and is never |
|||
// incremented unless explicitly cloned (by default, it is moved). |
|||
let s: Arc<Mutex<T>> = Arc::new(Mutex::new(Zero::zero())); |
|||
let integrator = Integrator { |
|||
input: tx, |
|||
// Here is the aforementioned clone. We have to do it before s enters the closure, |
|||
// because once that happens it is moved into the closure (and later, the new task) and |
|||
// becomes inaccessible to the outside world. |
|||
output: Arc::clone(&s), |
|||
}; |
|||
thread::spawn(move || -> () { |
|||
// The frequency is how often we want to "tick" as we update our integrated total. In |
|||
// Rust, timers can yield Receivers that are periodically notified with an empty |
|||
// message (where the period is the frequency). This is useful because it lets us wait |
|||
// on either a tick or another type of message (in this case, a request to change the |
|||
// function we are integrating). |
|||
let periodic = periodic_ms(frequency); |
|||
let mut t = 0; |
|||
let mut k: Box<Fn(u32) -> S + Send> = Box::new(|_| Zero::zero()); |
|||
let mut k_0: S = Zero::zero(); |
|||
loop { |
|||
// Here's the selection we talked about above. Note that we are careful to call |
|||
// the *non*-failing function, recv(), here. The reason we do this is because |
|||
// recv() will return Err when the sending end of a channel is dropped. While |
|||
// this is unlikely to happen for the timer (so again, you could argue for failure |
|||
// there), it's normal behavior for the sending end of input to be dropped, since |
|||
// it just happens when the Integrator falls out of scope. So we handle it cleanly |
|||
// and break out of the loop, rather than failing. |
|||
select! { |
|||
res = periodic.recv() => match res { |
|||
Ok(_) => { |
|||
t += frequency; |
|||
let k_1: S = k(t); |
|||
// Rust Mutexes are a bit different from Mutexes in many other |
|||
// languages, in that the protected data is actually encapsulated by |
|||
// the Mutex. The reason for this is that Rust is actually capable of |
|||
// enforcing (via its borrow checker) the invariant that the contents |
|||
// of a Mutex may only be read when you have acquired its lock. This |
|||
// is enforced by way of a MutexGuard, the return value of lock(), |
|||
// which implements some special traits (Deref and DerefMut) that allow |
|||
// access to the inner element "through" the guard. The element so |
|||
// acquired has a lifetime bounded by that of the MutexGuard, the |
|||
// MutexGuard can only be acquired by taking a lock, and the only way |
|||
// to release the lock is by letting the MutexGuard fall out of scope, |
|||
// so it's impossible to access the data incorrectly. There are some |
|||
// additional subtleties around the actual implementation, but that's |
|||
// the basic idea. |
|||
let mut s = s.lock().unwrap(); |
|||
*s = *s + (k_1 + k_0) * (f64::from(frequency) / 2.); |
|||
k_0 = k_1; |
|||
} |
|||
Err(_) => break, |
|||
}, |
|||
res = input.recv() => match res { |
|||
Ok(k_new) => k = k_new, |
|||
Err(_) => break, |
|||
} |
|||
} |
|||
} |
|||
}); |
|||
integrator |
|||
} |
|||
pub fn input(&self, k: Box<Fn(u32) -> S + Send>) -> ActorResult<S> { |
|||
// The meat of the work is done in the other thread, so to set the |
|||
// input we just send along the Sender we set earlier... |
|||
self.input.send(k) |
|||
} |
|||
pub fn output(&self) -> T { |
|||
// ...and to read the input, we simply acquire a lock on the output Mutex and return a |
|||
// copy. Why do we have to copy it? Because, as mentioned above, Rust won't let us |
|||
// retain access to the interior of the Mutex unless we have possession of its lock. There |
|||
// are ways and circumstances in which one can avoid this (e.g. by using atomic types) but |
|||
// a copy is a perfectly reasonable solution as well, and a lot easier to reason about :) |
|||
*self.output.lock().unwrap() |
|||
} |
|||
} |
|||
/// This function is fairly straightforward. We create the integrator, set its input function k(t) |
|||
/// to 2pi * f * t, and then wait as described in the Rosetta stone problem. |
|||
fn integrate() -> f64 { |
|||
let object = Integrator::new(10); |
|||
object |
|||
.input(Box::new(|t: u32| { |
|||
let two_seconds_ms = 2 * 1000; |
|||
let f = 1. / f64::from(two_seconds_ms); |
|||
(2. * PI * f * f64::from(t)).sin() |
|||
})) |
|||
.expect("Failed to set input"); |
|||
thread::sleep(Duration::from_secs(2)); |
|||
object.input(Box::new(|_| 0.)).expect("Failed to set input"); |
|||
thread::sleep(Duration::from_millis(500)); |
|||
object.output() |
|||
} |
|||
fn main() { |
|||
println!("{}", integrate()); |
|||
} |
|||
/// Will fail on a heavily loaded machine |
|||
#[test] |
|||
#[ignore] |
|||
fn solution() { |
|||
// We should just be able to call integrate, but can't represent the closure properly due to |
|||
// rust-lang/rust issue #17060 if we make frequency or period a variable. |
|||
// FIXME(pythonesque): When unboxed closures are fixed, fix integrate() to take two arguments. |
|||
let object = Integrator::new(10); |
|||
object |
|||
.input(Box::new(|t: u32| { |
|||
let two_seconds_ms = 2 * 1000; |
|||
let f = 1. / (two_seconds_ms / 10) as f64; |
|||
(2. * PI * f * t as f64).sin() |
|||
})) |
|||
.expect("Failed to set input"); |
|||
thread::sleep(Duration::from_millis(200)); |
|||
object.input(Box::new(|_| 0.)).expect("Failed to set input"); |
|||
thread::sleep(Duration::from_millis(100)); |
|||
assert_eq!(object.output() as u32, 0) |
|||
}</lang> |
|||
=={{header|Scala}}== |
=={{header|Scala}}== |
||