From fc3882dc1fc15118b0c2865de5af0d8b548e3476 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Thu, 22 Mar 2018 02:10:48 +0000 Subject: [PATCH 1/8] Refactor promise executor --- src/webcore/promise_executor.rs | 201 +++++++++++++++----------------- 1 file changed, 92 insertions(+), 109 deletions(-) diff --git a/src/webcore/promise_executor.rs b/src/webcore/promise_executor.rs index fe6a014f..d3c6d4b4 100644 --- a/src/webcore/promise_executor.rs +++ b/src/webcore/promise_executor.rs @@ -26,19 +26,16 @@ type BoxedFuture = Box< Future< Item = (), Error = () > + 'static >; struct SpawnedTask { is_queued: Cell< bool >, spawn: RefCell< Option< Spawn< BoxedFuture > > >, - // TODO maybe this should use Weak instead ? - inner: Rc< Inner >, } impl SpawnedTask { - fn new< F >( future: F, inner: Rc< Inner > ) -> Rc< Self > + fn new< F >( future: F ) -> Rc< Self > where F: Future< Item = (), Error = () > + 'static { Rc::new( Self { is_queued: Cell::new( false ), spawn: RefCell::new( Some( executor::spawn( Box::new( future ) as BoxedFuture - ) ) ), - inner, + ) ) ) } ) } @@ -50,7 +47,7 @@ impl SpawnedTask { // Clear `is_queued` flag so that it will re-queue if poll calls task.notify() self.is_queued.set( false ); - if spawn_future.poll_future_notify( &&Notifier, self as *const _ as usize ) == Ok( Async::NotReady ) { + if spawn_future.poll_future_notify( &&EventLoop, self as *const _ as usize ) == Ok( Async::NotReady ) { // Future was not ready, so put it back *spawn = Some( spawn_future ); } @@ -58,154 +55,140 @@ impl SpawnedTask { } fn notify( task: Rc< SpawnedTask > ) { - let inner = &task.inner; - // If not already queued if !task.is_queued.replace( true ) { - // TODO figure out a way to avoid the clone ? - inner.queue.queue.borrow_mut().push_back( task.clone() ); - } - - // If not already running - if !inner.queue.is_running.replace( true ) { - js! { @(no_return) - @{&inner.microtask}.next_tick(); - } + EventLoop.push_task(task); } } } -struct Notifier; +// A proxy for the javascript event loop. +struct EventLoop; + +// There's only one thread, but this lets us tell the compiler that we +// don't need a `Sync` bound, and also gives us lazy initialization. +thread_local! { + static EVENT_LOOP_INNER: EventLoopInner = EventLoopInner::new(); +} -struct Queue { - is_running: Cell< bool >, - // TODO maybe SpawnedTask needs to use Arc rather than Rc ? - queue: RefCell< VecDeque< Rc< SpawnedTask > > >, +impl EventLoop { + fn drain(&self) { + EVENT_LOOP_INNER.with(EventLoopInner::drain) + } + fn push_task(&self, task: Rc< SpawnedTask >) { + EVENT_LOOP_INNER.with(|inner| inner.push_task(task)) + } } -struct Inner { - queue: Rc< Queue >, - microtask: Reference, +// State relating to the javascript event loop. Only one instance ever exists. +struct EventLoopInner { + // Avoid unnecessary allocation and interop by keeping a local + // queue of pending tasks. + microtask_queue: RefCell< VecDeque< Rc< SpawnedTask > > >, + waker: Reference, } -impl Drop for Inner { +// Not strictly necessary, but may become relevant in the future +impl Drop for EventLoopInner { #[inline] fn drop( &mut self ) { js! { @(no_return) - @{&self.microtask}.callback.drop(); + @{&self.waker}.drop(); } } } -struct PromiseExecutor( Rc< Inner > ); - -// TODO this should be generalized into a MicroTask API -thread_local! { - static EXECUTOR: PromiseExecutor = { - let queue = Rc::new( Queue { - is_running: Cell::new( false ), - queue: RefCell::new( VecDeque::with_capacity( INITIAL_QUEUE_CAPACITY ) ), - } ); - - let inner = { - let clone = queue.clone(); +impl EventLoopInner { + // Initializes the event loop. Only called once. + fn new() -> Self { + EventLoopInner { + microtask_queue: RefCell::new(VecDeque::with_capacity(INITIAL_QUEUE_CAPACITY)), + waker: js!( + var callback = @{|| EventLoop.drain()}; + var wrapper = function() { callback() }; + + // Modern browsers can use `MutationObserver` which allows + // us to schedule a micro-task without allocating a promise. + // https://dom.spec.whatwg.org/#notify-mutation-observers + if ( typeof MutationObserver === "function" ) { + var node = document.createTextNode( "0" ); + var state = false; + + new MutationObserver( wrapper ).observe( node, { characterData: true } ); + + function nextTick() { + state = !state; + node.data = ( state ? "1" : "0" ); + } + nextTick.drop = callback.drop; - // TODO is Null the fastest type for conversion from JS ? - let callback = move || { - loop { - let task = clone.queue.borrow_mut().pop_front(); + return nextTick; - if let Some( task ) = task { - task.poll(); + // Node.js and other environments + } else { + var promise = Promise.resolve( null ); - } else { - break; + function nextTick( value ) { + promise.then( wrapper ); } - } + nextTick.drop = callback.drop; - // This frees up the memory for the VecDeque - *clone.queue.borrow_mut() = VecDeque::with_capacity( INITIAL_QUEUE_CAPACITY ); - - clone.is_running.set( false ); - }; - - Inner { - queue: queue, - // This causes the callback to be pushed onto the microtask queue - microtask: js!( - var callback = @{callback}; - - // Modern browsers - // https://dom.spec.whatwg.org/#notify-mutation-observers - if ( typeof MutationObserver === "function" ) { - var node = document.createTextNode( "0" ); - var state = false; - - new MutationObserver( function ( changes, observer ) { - callback(); - } ).observe( node, { characterData: true } ); - - return { - callback: callback, - next_tick: function () { - state = !state; - node.data = ( state ? "1" : "0" ); - } - }; - - // Node.js and other environments - } else { - var promise = Promise.resolve( null ); - - // TODO what if the callback has been dropped ? - function next_tick( value ) { - callback(); - } - - return { - callback: callback, - next_tick: function () { - promise.then( next_tick ); - } - }; - } - ).try_into().unwrap(), - } - }; - - PromiseExecutor( Rc::new( inner ) ) - }; + return nextTick; + } + ).try_into().unwrap() + } + } + // Pushes a task onto the queue + fn push_task(&self, task: Rc< SpawnedTask >) { + let mut queue = self.microtask_queue.borrow_mut(); + queue.push_back(task); + + // If the queue was previously empty, then we need to schedule + // the queue to be drained. + if queue.len() == 1 { + self.wake(); + } + } + // Invoke the javascript waker function + fn wake(&self) { + js! { @(no_return) @{&self.waker}(); } + } + // Remove and return a task from the front of the queue + fn pop_task(&self) -> Option< Rc< SpawnedTask > > { + self.microtask_queue.borrow_mut().pop_front() + } + // Poll the queue until it is empty + fn drain(&self) { + while let Some(task) = self.pop_task() { + task.poll(); + } + } } -impl< F > Executor< F > for PromiseExecutor where +impl< F > Executor< F > for EventLoop where F: Future< Item = (), Error = () > + 'static { fn execute( &self, future: F ) -> StdResult< (), ExecuteError< F > > { - SpawnedTask::notify( SpawnedTask::new( future, self.0.clone() ) ); + SpawnedTask::notify( SpawnedTask::new( future ) ); Ok( () ) } } -impl Notify for Notifier { +impl Notify for EventLoop { fn notify( &self, spawned_id: usize ) { SpawnedTask::notify( unsafe { clone_raw( spawned_id as *const _ ) } ); } - // TODO does this cause memory unsafety ? fn clone_id( &self, id: usize ) -> usize { unsafe { Rc::into_raw( clone_raw( id as *const SpawnedTask ) ) as usize } } - // TODO does this cause memory unsafety ? fn drop_id( &self, id: usize ) { unsafe { Rc::from_raw( id as *const SpawnedTask ) }; } } - #[inline] pub fn spawn< F >( future: F ) where F: Future< Item = (), Error = () > + 'static { - EXECUTOR.with( |executor| { - executor.execute( future ).unwrap(); - } ); + EventLoop.execute( future ).unwrap(); } From b84b0db1e9c2bcd9f73d316442eb54d1946bbfcb Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Thu, 22 Mar 2018 02:11:50 +0000 Subject: [PATCH 2/8] Rename promise_executor => executor --- src/webcore/{promise_executor.rs => executor.rs} | 0 src/webcore/mod.rs | 2 +- src/webcore/promise_future.rs | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename src/webcore/{promise_executor.rs => executor.rs} (100%) diff --git a/src/webcore/promise_executor.rs b/src/webcore/executor.rs similarity index 100% rename from src/webcore/promise_executor.rs rename to src/webcore/executor.rs diff --git a/src/webcore/mod.rs b/src/webcore/mod.rs index 7088fb94..ed0fb73b 100644 --- a/src/webcore/mod.rs +++ b/src/webcore/mod.rs @@ -22,7 +22,7 @@ pub mod promise; pub mod promise_future; #[cfg(feature = "futures")] -pub mod promise_executor; +pub mod executor; #[cfg(feature = "nightly")] pub mod void { diff --git a/src/webcore/promise_future.rs b/src/webcore/promise_future.rs index e4ddb5b2..adf7d66c 100644 --- a/src/webcore/promise_future.rs +++ b/src/webcore/promise_future.rs @@ -4,7 +4,7 @@ use webcore::try_from::{TryInto, TryFrom}; use webapi::error; use futures::{Future, Poll, Async}; use futures::unsync::oneshot::Receiver; -use webcore::promise_executor::spawn; +use webcore::executor::spawn; use super::promise::Promise; From 342916733684db8f98f98c0d5cc32960ac519bda Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Sat, 24 Mar 2018 20:34:54 +0000 Subject: [PATCH 3/8] Review changes --- src/webcore/executor.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/webcore/executor.rs b/src/webcore/executor.rs index d3c6d4b4..d0d76bfd 100644 --- a/src/webcore/executor.rs +++ b/src/webcore/executor.rs @@ -62,7 +62,7 @@ impl SpawnedTask { } } -// A proxy for the javascript event loop. +// A proxy for the JavaScript event loop. struct EventLoop; // There's only one thread, but this lets us tell the compiler that we @@ -80,7 +80,7 @@ impl EventLoop { } } -// State relating to the javascript event loop. Only one instance ever exists. +// State relating to the JavaScript event loop. Only one instance ever exists. struct EventLoopInner { // Avoid unnecessary allocation and interop by keeping a local // queue of pending tasks. @@ -105,7 +105,9 @@ impl EventLoopInner { microtask_queue: RefCell::new(VecDeque::with_capacity(INITIAL_QUEUE_CAPACITY)), waker: js!( var callback = @{|| EventLoop.drain()}; - var wrapper = function() { callback() }; + var wrapper = function() { + if (!callback.dropped) { callback() } + }; // Modern browsers can use `MutationObserver` which allows // us to schedule a micro-task without allocating a promise. @@ -120,21 +122,22 @@ impl EventLoopInner { state = !state; node.data = ( state ? "1" : "0" ); } - nextTick.drop = callback.drop; - - return nextTick; // Node.js and other environments } else { var promise = Promise.resolve( null ); - function nextTick( value ) { + function nextTick() { promise.then( wrapper ); } - nextTick.drop = callback.drop; - - return nextTick; } + + nextTick.drop = function() { + callback.dropped = true; + callback.drop(); + }; + + return nextTick; ).try_into().unwrap() } } @@ -149,7 +152,7 @@ impl EventLoopInner { self.wake(); } } - // Invoke the javascript waker function + // Invoke the JavaScript waker function fn wake(&self) { js! { @(no_return) @{&self.waker}(); } } From 3cc0221a5eb84bc314cf30ee0a37996fce48d135 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Sat, 24 Mar 2018 23:40:29 +0000 Subject: [PATCH 4/8] Implement queue shrinking --- src/webcore/executor.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/webcore/executor.rs b/src/webcore/executor.rs index d0d76bfd..4b4173d5 100644 --- a/src/webcore/executor.rs +++ b/src/webcore/executor.rs @@ -1,3 +1,8 @@ +// This file implements a futures-compatible executor which schedules futures +// onto the JavaScript event loop. This implementation assumes there is a +// single thread and is *not* compatible with multiple WebAssembly workers sharing +// the same address space. + use futures::future::{Future, ExecuteError, Executor}; use futures::executor::{self, Notify, Spawn}; use futures::Async; @@ -9,7 +14,10 @@ use webcore::try_from::TryInto; use webcore::value::Reference; +// Initial capacity of the event queue const INITIAL_QUEUE_CAPACITY: usize = 10; +// Iterations to wait before allowing the queue to shrink +const QUEUE_SHRINK_DELAY: usize = 25; // This functionality should really be in libstd, because the implementation @@ -86,6 +94,7 @@ struct EventLoopInner { // queue of pending tasks. microtask_queue: RefCell< VecDeque< Rc< SpawnedTask > > >, waker: Reference, + shrink_counter: Cell } // Not strictly necessary, but may become relevant in the future @@ -138,7 +147,8 @@ impl EventLoopInner { }; return nextTick; - ).try_into().unwrap() + ).try_into().unwrap(), + shrink_counter: Cell::new(0) } } // Pushes a task onto the queue @@ -160,11 +170,30 @@ impl EventLoopInner { fn pop_task(&self) -> Option< Rc< SpawnedTask > > { self.microtask_queue.borrow_mut().pop_front() } + fn shrink_if_necessary(&self) { + let mut queue = self.microtask_queue.borrow_mut(); + // We consider shrinking the queue if it is less than + // half full... + if queue.len() <= queue.capacity() / 2 { + // ...and if it's been that way for at least + // `QUEUE_SHRINK_DELAY` iterations. + let shrink_counter = self.shrink_counter.get(); + if shrink_counter < QUEUE_SHRINK_DELAY { + self.shrink_counter.set(shrink_counter+1); + } else { + queue.shrink_to_fit(); + self.shrink_counter.set(0); + } + } else { + self.shrink_counter.set(0); + } + } // Poll the queue until it is empty fn drain(&self) { while let Some(task) = self.pop_task() { task.poll(); } + self.shrink_if_necessary(); } } From 0ce735a3fd701e25ba37b2348ef4767df14f958c Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Sun, 25 Mar 2018 19:30:10 +0100 Subject: [PATCH 5/8] Review changes --- src/webcore/executor.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/webcore/executor.rs b/src/webcore/executor.rs index 4b4173d5..443a330e 100644 --- a/src/webcore/executor.rs +++ b/src/webcore/executor.rs @@ -2,6 +2,9 @@ // onto the JavaScript event loop. This implementation assumes there is a // single thread and is *not* compatible with multiple WebAssembly workers sharing // the same address space. +// +// TODO: Implement support for multiple threads. This will require a mechanism to +// wake up another thread, such as the `postMessage` API. use futures::future::{Future, ExecuteError, Executor}; use futures::executor::{self, Notify, Spawn}; @@ -14,6 +17,7 @@ use webcore::try_from::TryInto; use webcore::value::Reference; +// TODO: Determine optimal values for these constants // Initial capacity of the event queue const INITIAL_QUEUE_CAPACITY: usize = 10; // Iterations to wait before allowing the queue to shrink @@ -117,6 +121,7 @@ impl EventLoopInner { var wrapper = function() { if (!callback.dropped) { callback() } }; + var nextTick; // Modern browsers can use `MutationObserver` which allows // us to schedule a micro-task without allocating a promise. @@ -127,18 +132,18 @@ impl EventLoopInner { new MutationObserver( wrapper ).observe( node, { characterData: true } ); - function nextTick() { + nextTick = function() { state = !state; node.data = ( state ? "1" : "0" ); - } + }; // Node.js and other environments } else { var promise = Promise.resolve( null ); - function nextTick() { + nextTick = function() { promise.then( wrapper ); - } + }; } nextTick.drop = function() { @@ -170,6 +175,7 @@ impl EventLoopInner { fn pop_task(&self) -> Option< Rc< SpawnedTask > > { self.microtask_queue.borrow_mut().pop_front() } + // Reclaim space from the queue if it's going to waste fn shrink_if_necessary(&self) { let mut queue = self.microtask_queue.borrow_mut(); // We consider shrinking the queue if it is less than @@ -179,7 +185,7 @@ impl EventLoopInner { // `QUEUE_SHRINK_DELAY` iterations. let shrink_counter = self.shrink_counter.get(); if shrink_counter < QUEUE_SHRINK_DELAY { - self.shrink_counter.set(shrink_counter+1); + self.shrink_counter.set(shrink_counter + 1); } else { queue.shrink_to_fit(); self.shrink_counter.set(0); @@ -190,10 +196,10 @@ impl EventLoopInner { } // Poll the queue until it is empty fn drain(&self) { + self.shrink_if_necessary(); while let Some(task) = self.pop_task() { task.poll(); } - self.shrink_if_necessary(); } } From 542d2f56a2ca903479d2fba19cb3d9c65834a462 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Mon, 26 Mar 2018 22:35:06 +0100 Subject: [PATCH 6/8] Don't resize queues smaller than the initial capacity --- src/webcore/executor.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/webcore/executor.rs b/src/webcore/executor.rs index 443a330e..dfebf82c 100644 --- a/src/webcore/executor.rs +++ b/src/webcore/executor.rs @@ -178,6 +178,10 @@ impl EventLoopInner { // Reclaim space from the queue if it's going to waste fn shrink_if_necessary(&self) { let mut queue = self.microtask_queue.borrow_mut(); + if queue.capacity() <= INITIAL_QUEUE_CAPACITY { + return; + } + // We consider shrinking the queue if it is less than // half full... if queue.len() <= queue.capacity() / 2 { From 288caccf93826a2a70f0095a28e589528a9793a5 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Tue, 27 Mar 2018 00:05:03 +0100 Subject: [PATCH 7/8] Review changes --- src/webcore/executor.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/webcore/executor.rs b/src/webcore/executor.rs index dfebf82c..f1c33521 100644 --- a/src/webcore/executor.rs +++ b/src/webcore/executor.rs @@ -13,6 +13,7 @@ use std::collections::VecDeque; use std::result::Result as StdResult; use std::cell::{Cell, RefCell}; use std::rc::Rc; +use std::cmp; use webcore::try_from::TryInto; use webcore::value::Reference; @@ -175,35 +176,40 @@ impl EventLoopInner { fn pop_task(&self) -> Option< Rc< SpawnedTask > > { self.microtask_queue.borrow_mut().pop_front() } - // Reclaim space from the queue if it's going to waste - fn shrink_if_necessary(&self) { - let mut queue = self.microtask_queue.borrow_mut(); - if queue.capacity() <= INITIAL_QUEUE_CAPACITY { - return; - } - + // See if it's worth trying to reclaim some space from the queue + fn estimate_realloc_capacity(&self) -> Option { + let queue = self.microtask_queue.borrow(); + let cap = queue.capacity(); // We consider shrinking the queue if it is less than // half full... - if queue.len() <= queue.capacity() / 2 { + if cap > queue.len()*2 && cap > INITIAL_QUEUE_CAPACITY { // ...and if it's been that way for at least // `QUEUE_SHRINK_DELAY` iterations. let shrink_counter = self.shrink_counter.get(); if shrink_counter < QUEUE_SHRINK_DELAY { self.shrink_counter.set(shrink_counter + 1); } else { - queue.shrink_to_fit(); self.shrink_counter.set(0); + return Some(cmp::max(queue.len(), INITIAL_QUEUE_CAPACITY)); } } else { self.shrink_counter.set(0); } + None } // Poll the queue until it is empty fn drain(&self) { - self.shrink_if_necessary(); + let maybe_realloc_capacity = self.estimate_realloc_capacity(); + + // Poll all the pending tasks while let Some(task) = self.pop_task() { task.poll(); } + + if let Some(realloc_capacity) = maybe_realloc_capacity { + // We decided to reclaim some space + *self.microtask_queue.borrow_mut() = VecDeque::with_capacity(realloc_capacity); + } } } From 0d28258a779d33ac451062dfe62141a63c063606 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Tue, 27 Mar 2018 00:49:12 +0100 Subject: [PATCH 8/8] Half the capacity before comparing it --- src/webcore/executor.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/webcore/executor.rs b/src/webcore/executor.rs index f1c33521..4ea66d24 100644 --- a/src/webcore/executor.rs +++ b/src/webcore/executor.rs @@ -179,10 +179,11 @@ impl EventLoopInner { // See if it's worth trying to reclaim some space from the queue fn estimate_realloc_capacity(&self) -> Option { let queue = self.microtask_queue.borrow(); - let cap = queue.capacity(); + // A VecDeque retains a `2^n-1` capacity + let half_cap = queue.capacity()/2; // We consider shrinking the queue if it is less than // half full... - if cap > queue.len()*2 && cap > INITIAL_QUEUE_CAPACITY { + if half_cap > queue.len() && half_cap > INITIAL_QUEUE_CAPACITY { // ...and if it's been that way for at least // `QUEUE_SHRINK_DELAY` iterations. let shrink_counter = self.shrink_counter.get();