Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Mutex, AtomicU64, and deal with lots of related fallout #11610

Closed
wants to merge 13 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
std: Remove try_send_deferred plus all fallout
Now that extra::sync primitives are built on a proper mutex instead of a
pthreads one, there's no longer any use for this function.
alexcrichton committed Jan 27, 2014
commit 55111cf1415981470e3dbe4224dfefb5c2debcad
3 changes: 2 additions & 1 deletion src/libgreen/simple.rs
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ impl Runtime for SimpleTask {
}
Local::put(cur_task);
}
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
fn reawaken(mut ~self, mut to_wake: ~Task) {
let me = &mut *self as *mut SimpleTask;
to_wake.put_runtime(self as ~Runtime);
unsafe {
@@ -76,6 +76,7 @@ impl Runtime for SimpleTask {
}
fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> { None }
fn stack_bounds(&self) -> (uint, uint) { fail!() }
fn can_block(&self) -> bool { true }
fn wrap(~self) -> ~Any { fail!() }
}

13 changes: 5 additions & 8 deletions src/libgreen/task.rs
Original file line number Diff line number Diff line change
@@ -376,7 +376,7 @@ impl Runtime for GreenTask {
}
}

fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) {
fn reawaken(mut ~self, to_wake: ~Task) {
self.put_task(to_wake);
assert!(self.sched.is_none());

@@ -409,15 +409,10 @@ impl Runtime for GreenTask {
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
running_green_task.put_task(running_task);
let mut sched = running_green_task.sched.take_unwrap();
let sched = running_green_task.sched.take_unwrap();

if sched.pool_id == self.pool_id {
if can_resched {
sched.run_task(running_green_task, self);
} else {
sched.enqueue_task(self);
running_green_task.put_with_sched(sched);
}
sched.run_task(running_green_task, self);
} else {
self.reawaken_remotely();

@@ -462,6 +457,8 @@ impl Runtime for GreenTask {
c.current_stack_segment.end() as uint)
}

fn can_block(&self) -> bool { false }

fn wrap(~self) -> ~Any { self as ~Any }
}

4 changes: 3 additions & 1 deletion src/libnative/task.rs
Original file line number Diff line number Diff line change
@@ -142,6 +142,8 @@ impl rt::Runtime for Ops {

fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }

fn can_block(&self) -> bool { true }

// This function gets a little interesting. There are a few safety and
// ownership violations going on here, but this is all done in the name of
// shared state. Additionally, all of the violations are protected with a
@@ -230,7 +232,7 @@ impl rt::Runtime for Ops {

// See the comments on `deschedule` for why the task is forgotten here, and
// why it's valid to do so.
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
fn reawaken(mut ~self, mut to_wake: ~Task) {
unsafe {
let me = &mut *self as *mut Ops;
to_wake.put_runtime(self as ~rt::Runtime);
2 changes: 1 addition & 1 deletion src/librustuv/idle.rs
Original file line number Diff line number Diff line change
@@ -122,7 +122,7 @@ mod test {
}
}
};
task.wake().map(|t| t.reawaken(true));
task.wake().map(|t| t.reawaken());
}
}

2 changes: 1 addition & 1 deletion src/librustuv/lib.rs
Original file line number Diff line number Diff line change
@@ -207,7 +207,7 @@ fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {

fn wakeup(slot: &mut Option<BlockedTask>) {
assert!(slot.is_some());
slot.take_unwrap().wake().map(|t| t.reawaken(true));
slot.take_unwrap().wake().map(|t| t.reawaken());
}

pub struct Request {
2 changes: 1 addition & 1 deletion src/librustuv/queue.rs
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
loop {
match state.consumer.pop() {
mpsc::Data(Task(task)) => {
task.wake().map(|t| t.reawaken(true));
task.wake().map(|t| t.reawaken());
}
mpsc::Data(Increment) => unsafe {
if state.refcnt == 0 {
2 changes: 1 addition & 1 deletion src/librustuv/timer.rs
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {

match timer.action.take_unwrap() {
WakeTask(task) => {
task.wake().map(|t| t.reawaken(true));
task.wake().map(|t| t.reawaken());
}
SendOnce(chan) => { chan.try_send(()); }
SendMany(chan, id) => {
20 changes: 7 additions & 13 deletions src/libstd/comm/mod.rs
Original file line number Diff line number Diff line change
@@ -435,9 +435,9 @@ impl Packet {

// This function must have had at least an acquire fence before it to be
// properly called.
fn wakeup(&mut self, can_resched: bool) {
fn wakeup(&mut self) {
match self.to_wake.take_unwrap().wake() {
Some(task) => task.reawaken(can_resched),
Some(task) => task.reawaken(),
None => {}
}
self.selecting.store(false, Relaxed);
@@ -511,7 +511,7 @@ impl Packet {
match self.channels.fetch_sub(1, SeqCst) {
1 => {
match self.cnt.swap(DISCONNECTED, SeqCst) {
-1 => { self.wakeup(true); }
-1 => { self.wakeup(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
}
@@ -586,20 +586,14 @@ impl<T: Send> Chan<T> {
///
/// Like `send`, this method will never block. If the failure of send cannot
/// be tolerated, then this method should be used instead.
pub fn try_send(&self, t: T) -> bool { self.try(t, true) }

/// This function will not stick around for very long. The purpose of this
/// function is to guarantee that no rescheduling is performed.
pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) }

fn try(&self, t: T, can_resched: bool) -> bool {
pub fn try_send(&self, t: T) -> bool {
unsafe {
let this = cast::transmute_mut(self);
this.queue.push(t);
let packet = this.queue.packet();
match (*packet).increment() {
// As described above, -1 == wakeup
-1 => { (*packet).wakeup(can_resched); true }
-1 => { (*packet).wakeup(); true }
// Also as above, SPSC queues must be >= -2
-2 => true,
// We succeeded if we sent data
@@ -614,7 +608,7 @@ impl<T: Send> Chan<T> {
// the TLS overhead can be a bit much.
n => {
assert!(n >= 0);
if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
if n > 0 && n % RESCHED_FREQ == 0 {
let task: ~Task = Local::take();
task.maybe_yield();
}
@@ -690,7 +684,7 @@ impl<T: Send> SharedChan<T> {

match (*packet).increment() {
DISCONNECTED => {} // oh well, we tried
-1 => { (*packet).wakeup(true); }
-1 => { (*packet).wakeup(); }
n => {
if n > 0 && n % RESCHED_FREQ == 0 {
let task: ~Task = Local::take();
3 changes: 2 additions & 1 deletion src/libstd/rt/mod.rs
Original file line number Diff line number Diff line change
@@ -146,14 +146,15 @@ pub trait Runtime {
fn maybe_yield(~self, cur_task: ~Task);
fn deschedule(~self, times: uint, cur_task: ~Task,
f: |BlockedTask| -> Result<(), BlockedTask>);
fn reawaken(~self, to_wake: ~Task, can_resched: bool);
fn reawaken(~self, to_wake: ~Task);

// Miscellaneous calls which are very different depending on what context
// you're in.
fn spawn_sibling(~self, cur_task: ~Task, opts: TaskOpts, f: proc());
fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>>;
/// The (low, high) edges of the current stack.
fn stack_bounds(&self) -> (uint, uint); // (lo, hi)
fn can_block(&self) -> bool;

// FIXME: This is a serious code smell and this should not exist at all.
fn wrap(~self) -> ~Any;
10 changes: 8 additions & 2 deletions src/libstd/rt/task.rs
Original file line number Diff line number Diff line change
@@ -250,9 +250,9 @@ impl Task {
/// Wakes up a previously blocked task, optionally specifiying whether the
/// current task can accept a change in scheduling. This function can only
/// be called on tasks that were previously blocked in `deschedule`.
pub fn reawaken(mut ~self, can_resched: bool) {
pub fn reawaken(mut ~self) {
let ops = self.imp.take_unwrap();
ops.reawaken(self, can_resched);
ops.reawaken(self);
}

/// Yields control of this task to another task. This function will
@@ -283,6 +283,12 @@ impl Task {
pub fn stack_bounds(&self) -> (uint, uint) {
self.imp.get_ref().stack_bounds()
}

/// Returns whether it is legal for this task to block the OS thread that it
/// is running on.
pub fn can_block(&self) -> bool {
self.imp.get_ref().can_block()
}
}

impl Drop for Task {