using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace Wox.Helper { /// /// Provides a caller-friendly wrapper around parallel actions. /// http://stackoverflow.com/a/540380 /// public sealed class Forker { int running; private readonly object joinLock = new object(), eventLock = new object(); /// Raised when all operations have completed. public event EventHandler AllComplete { add { lock (eventLock) { allComplete += value; } } remove { lock (eventLock) { allComplete -= value; } } } private EventHandler allComplete; /// Raised when each operation completes. public event EventHandler ItemComplete { add { lock (eventLock) { itemComplete += value; } } remove { lock (eventLock) { itemComplete -= value; } } } private EventHandler itemComplete; private void OnItemComplete(object state, Exception exception) { EventHandler itemHandler = itemComplete; // don't need to lock if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception)); if (Interlocked.Decrement(ref running) == 0) { EventHandler allHandler = allComplete; // don't need to lock if (allHandler != null) allHandler(this, EventArgs.Empty); lock (joinLock) { Monitor.PulseAll(joinLock); } } } /// Adds a callback to invoke when each operation completes. /// Current instance (for fluent API). public Forker OnItemComplete(EventHandler handler) { if (handler == null) throw new ArgumentNullException("handler"); ItemComplete += handler; return this; } /// Adds a callback to invoke when all operations are complete. /// Current instance (for fluent API). public Forker OnAllComplete(EventHandler handler) { if (handler == null) throw new ArgumentNullException("handler"); AllComplete += handler; return this; } /// Waits for all operations to complete. public void Join() { Join(-1); } /// Waits (with timeout) for all operations to complete. /// Whether all operations had completed before the timeout. public bool Join(int millisecondsTimeout) { lock (joinLock) { if (CountRunning() == 0) return true; Thread.SpinWait(1); // try our luck... return (CountRunning() == 0) || Monitor.Wait(joinLock, millisecondsTimeout); } } /// Indicates the number of incomplete operations. /// The number of incomplete operations. public int CountRunning() { return Interlocked.CompareExchange(ref running, 0, 0); } /// Enqueues an operation. /// The operation to perform. /// The current instance (for fluent API). public Forker Fork(ThreadStart action) { return Fork(action, null); } /// Enqueues an operation. /// The operation to perform. /// An opaque object, allowing the caller to identify operations. /// The current instance (for fluent API). public Forker Fork(ThreadStart action, object state) { if (action == null) throw new ArgumentNullException("action"); Interlocked.Increment(ref running); ThreadPool.QueueUserWorkItem(delegate { Exception exception = null; try { action(); } catch (Exception ex) { exception = ex; } OnItemComplete(state, exception); }); return this; } /// Event arguments representing the completion of a parallel action. public class ParallelEventArgs : EventArgs { private readonly object state; private readonly Exception exception; internal ParallelEventArgs(object state, Exception exception) { this.state = state; this.exception = exception; } /// The opaque state object that identifies the action (null otherwise). public object State { get { return state; } } /// The exception thrown by the parallel action, or null if it completed without exception. public Exception Exception { get { return exception; } } } } }