Saturday, May 29, 2010

Custom Thread Pool: Multithreading in a loop

Should you come across a scenario where you want to spawn threads from within a loop, you have two quick options:

Option 1: Use ThreadPool.QueueUserWorkItem

Depending on the size of the job to be processed, I usually favor ThreadPool.QueueUserWorkItem; it hands the work to a managed pool of threads and runs it as soon as a pool thread becomes idle.


using System;
using System.Threading;
public class Example {
    public static void Main() {
        // Queue the task.
        ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadProc));

        Console.WriteLine("Main thread does some work, then sleeps.");
        // If you comment out the Sleep, the main thread exits before
        // the thread pool task runs.  The thread pool uses background
        // threads, which do not keep the application running.  (This
        // is a simple example of a race condition.)
        Thread.Sleep(1000);

        Console.WriteLine("Main thread exits.");
    }

    // This thread procedure performs the task.
    static void ThreadProc(Object stateInfo) {
        // No state object was passed to QueueUserWorkItem, so 
        // stateInfo is null.
        Console.WriteLine("Hello from the thread pool.");
    }
}
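
The sample above queues a single work item. Since the scenario here is a loop, a minimal sketch of queuing several items and waiting for all of them could look like the following. The names LoopExample and RunJobs, the job count of 10, and the use of CountdownEvent (available from .NET 4) are my assumptions for illustration; they are not part of the sample above.

```csharp
using System;
using System.Threading;

public class LoopExample
{
    // Hypothetical helper: queues jobCount work items and returns how many ran.
    public static int RunJobs(int jobCount)
    {
        int processed = 0;

        // CountdownEvent is signaled once per finished job, so the caller can
        // wait for all of them instead of guessing with Thread.Sleep.
        using (var done = new CountdownEvent(jobCount))
        {
            for (int i = 0; i < jobCount; i++)
            {
                // Pass the loop index as the state object so each work item
                // gets its own copy of the value.
                ThreadPool.QueueUserWorkItem(state =>
                {
                    int jobId = (int)state;
                    Console.WriteLine("Job {0} runs on pool thread {1}", jobId,
                        Thread.CurrentThread.ManagedThreadId);
                    Interlocked.Increment(ref processed);
                    done.Signal();
                }, i);
            }

            done.Wait(); // blocks until Signal() has been called jobCount times
        }

        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Processed {0} jobs.", RunJobs(10));
    }
}
```

Passing the index through the state argument, rather than capturing the loop variable in the lambda, avoids every work item seeing the same (final) value of i.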


Option 2: Implement a custom thread pool using BackgroundWorker, in case you dislike ThreadPool for whatever reason.

The main worker object.

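using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;

/// <summary>
/// The core entity that handles a unit of work: a worker index plus its BackgroundWorker.
/// </summary>
public class CWorkers
{
    public int _nIndex { get; private set; }
    public BackgroundWorker bgWorker { get; private set; } //the main "culprit"

    public CWorkers(int nIndex)
    {
        _nIndex = nIndex;
        bgWorker = new BackgroundWorker();
    }
}

The manager class that manages the worker threads.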

/// <summary>
/// Manages the worker threads.
/// </summary>
public class CWorkerManager
{
    private List<CWorkers> _lstWorkers;//list of worker threads
    private const int MAXWORKERS = 5;//Max workers you want; change/update or pull it from app.config.

    public CWorkerManager()
    {
        Initialize();
    }

    /// <summary>
    /// Initializes the thread pool - sorta customized threadpool
    /// </summary>
    private void Initialize()
    {
        _lstWorkers = new List<CWorkers>();//initialize the list

        for (int i = 0; i < MAXWORKERS; i++)
        {
            _lstWorkers.Add(CreateAWorker(i)); //inits a worker object and adds it to the list
        }
    }

    /// <summary>
    /// Looks for a free thread
    /// </summary>
    /// <returns>Returns the thread if found, else null.</returns>
    public CWorkers RequestForWorker()
    {
        foreach (var theWorker in _lstWorkers)
        {
            if (!theWorker.bgWorker.IsBusy)
                return theWorker;
        }

        return null;
    }

    /// <summary>
    /// Emulates the BCL's .WaitOne()
    /// </summary>
    public void WaitAndSignalWhenFree()
    {
        while (true)
        {
            //Loop through the list to find an idle thread
            foreach (var theWorker in _lstWorkers)
            {
                if (!theWorker.bgWorker.IsBusy)
                    return;
            }
            Thread.Sleep(1);//Polling like this is a hack; not recommended for production code.
        }
    }

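    /// <summary>
    /// Inits a CWorkers object; adds the DoWork and RunWorkerCompleted handlers.
    /// </summary>
    /// <param name="nIndex">Index of the worker.</param>
    /// <returns>The initialized worker.</returns>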
    private static CWorkers CreateAWorker(int nIndex)
    {
        var theWorker = new CWorkers(nIndex);

        theWorker.bgWorker.DoWork += (sender, e) => ((Action)e.Argument).Invoke();
        theWorker.bgWorker.RunWorkerCompleted += (sender, e) => Console.WriteLine("Finished worker number:[" + theWorker._nIndex + "]");

        return theWorker;
    }
}
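
The Thread.Sleep(1) polling in WaitAndSignalWhenFree can be avoided by letting the caller block on a semaphore that counts the free slots. Below is a minimal sketch of that idea, not a drop-in replacement for the manager above: BoundedRunner and BoundedRunnerDemo are hypothetical names, and I am assuming SemaphoreSlim (.NET 4 and later) together with ThreadPool instead of BackgroundWorker.

```csharp
using System;
using System.Threading;

// Hypothetical helper: runs at most maxWorkers jobs at the same time.
public class BoundedRunner : IDisposable
{
    private readonly int _maxWorkers;
    private readonly SemaphoreSlim _freeSlots;

    public BoundedRunner(int maxWorkers)
    {
        _maxWorkers = maxWorkers;
        // The semaphore count is the number of idle worker slots.
        _freeSlots = new SemaphoreSlim(maxWorkers, maxWorkers);
    }

    // Blocks until a slot is free, then runs the job on a pool thread.
    public void Run(Action job)
    {
        _freeSlots.Wait(); // no polling: the caller sleeps until Release()
        ThreadPool.QueueUserWorkItem(state =>
        {
            try { job(); }
            finally { _freeSlots.Release(); } // hand the slot back
        });
    }

    // Waits until every started job has finished. Call it from the thread
    // that submits the jobs, after the last Run().
    public void WaitForAll()
    {
        for (int i = 0; i < _maxWorkers; i++)
            _freeSlots.Wait();          // owning all slots means nothing is running
        _freeSlots.Release(_maxWorkers);
    }

    public void Dispose()
    {
        _freeSlots.Dispose();
    }
}

public static class BoundedRunnerDemo
{
    public static int RunDemo(int jobs, int maxWorkers)
    {
        int completed = 0;
        using (var runner = new BoundedRunner(maxWorkers))
        {
            for (int i = 0; i < jobs; i++)
            {
                runner.Run(() =>
                {
                    Thread.Sleep(10); // simulated work
                    Interlocked.Increment(ref completed);
                });
            }
            runner.WaitForAll();
        }
        return completed;
    }

    public static void Main()
    {
        Console.WriteLine("Completed {0} jobs.", RunDemo(20, 5));
    }
}
```

The trade-off is that the submitting thread still blocks when all slots are taken, but it wakes up exactly when a job finishes rather than on a timer.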

The test program:

class Program
{
    static CWorkerManager theManager;

    static void Main(string[] args)
    {
        theManager = new CWorkerManager();

        ProcessJobs(20000);
        Console.ReadKey();
    }

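    /// <summary>
    /// Simulator that requests worker threads from the manager
    /// </summary>
    /// <param name="nMaxTime">How long to keep submitting jobs, in milliseconds.</param>
    private static void ProcessJobs(int nMaxTime)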
    {
        Random rndRandom = new Random();
        DateTime dteStart = DateTime.Now;

        //Run till the max time.
        while (DateTime.Now - dteStart < TimeSpan.FromMilliseconds(nMaxTime))
        {
            var theWorker = theManager.RequestForWorker();//Request for a worker

            if (theWorker != null)
            {
                //Draw the random timeout here, on the main thread: Random is not thread-safe.
                int nTimeout = rndRandom.Next(1500, 2500);
                theWorker.bgWorker.RunWorkerAsync(new Action(() =>
                                    ProcessThis(theWorker._nIndex, nTimeout)));
            }
            else
            {
                Console.WriteLine("All busy, let's wait...");
                theManager.WaitAndSignalWhenFree();
            }
        }
    }

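    /// <summary>
    /// Actual method that processes the job.
    /// </summary>
    /// <param name="nIndex">Index of the worker running the job.</param>
    /// <param name="nTimeout">Simulated processing time, in milliseconds.</param>
    static void ProcessThis(int nIndex, int nTimeout)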
    {
        Console.WriteLine("Worker {1} starts to work for {0} ms", nTimeout, nIndex);
        Thread.Sleep(nTimeout);
    }
}


Happy threading.
