/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using QuantConnect.Interfaces;
namespace QuantConnect.Util
{
/// <summary>
/// A small wrapper around <see cref="BlockingCollection{T}"/> used to communicate busy state of the items
/// being processed
/// </summary>
/// <remarks>
/// The busy state is tracked with a <see cref="ManualResetEventSlim"/>: it is reset when an item is added and
/// set again once the consuming enumerable finds the collection empty. Flipping the event is coordinated
/// with <c>_lock</c> so that "add + mark busy" and "found empty + mark idle" cannot interleave.
/// </remarks>
/// <typeparam name="T">The item type being processed</typeparam>
public class BusyBlockingCollection<T> : IBusyCollection<T>
{
    private readonly BlockingCollection<T> _collection;
    private readonly ManualResetEventSlim _processingCompletedEvent;
    private readonly object _lock = new object();

    /// <summary>
    /// Gets a wait handle that can be used to wait until this instance is done
    /// processing all of its items
    /// </summary>
    public WaitHandle WaitHandle
    {
        get { return _processingCompletedEvent.WaitHandle; }
    }

    /// <summary>
    /// Gets the number of items held within this collection
    /// </summary>
    public int Count
    {
        get { return _collection.Count; }
    }

    /// <summary>
    /// Returns true if processing, false otherwise. The instance is considered busy while items
    /// remain in the collection or while the processing completed event has not been set.
    /// </summary>
    public bool IsBusy
    {
        get
        {
            lock (_lock)
            {
                return _collection.Count > 0 || !_processingCompletedEvent.IsSet;
            }
        }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="BusyBlockingCollection{T}"/> class
    /// with a bounded capacity of <see cref="int.MaxValue"/>
    /// </summary>
    public BusyBlockingCollection()
        : this(int.MaxValue)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="BusyBlockingCollection{T}"/> class
    /// with the specified <paramref name="boundedCapacity"/>
    /// </summary>
    /// <param name="boundedCapacity">The maximum number of items allowed in the collection</param>
    public BusyBlockingCollection(int boundedCapacity)
    {
        _collection = new BlockingCollection<T>(boundedCapacity);

        // initialize as not busy
        _processingCompletedEvent = new ManualResetEventSlim(true);
    }

    /// <summary>
    /// Adds the item to this collection
    /// </summary>
    /// <param name="item">The item to be added</param>
    public void Add(T item)
    {
        Add(item, CancellationToken.None);
    }

    /// <summary>
    /// Adds the item to this collection. If the collection is at capacity this call blocks
    /// until space becomes available or the token is cancelled.
    /// </summary>
    /// <param name="item">The item to be added</param>
    /// <param name="cancellationToken">A cancellation token to observe</param>
    public void Add(T item, CancellationToken cancellationToken)
    {
        bool added;
        lock (_lock)
        {
            // remember whether we were idle so that a failed add can't leave us flagged as busy forever
            var wasIdle = _processingCompletedEvent.IsSet;

            // we're adding work to be done, mark us as busy before the item becomes visible to the consumer
            _processingCompletedEvent.Reset();

            try
            {
                // zero timeout: never block while holding the lock
                added = _collection.TryAdd(item, 0, cancellationToken);
            }
            catch
            {
                // nothing was added (e.g. adding was already completed or the token was cancelled),
                // so put the busy flag back the way we found it and let the caller see the error
                if (wasIdle)
                {
                    _processingCompletedEvent.Set();
                }
                throw;
            }
        }

        if (!added)
        {
            // the collection was full; wait for room outside of the lock so the consumer isn't blocked
            _collection.Add(item, cancellationToken);
        }
    }

    /// <summary>
    /// Marks the <see cref="BusyBlockingCollection{T}"/> as not accepting any more additions
    /// </summary>
    public void CompleteAdding()
    {
        _collection.CompleteAdding();
    }

    /// <summary>
    /// Provides a consuming enumerable for items in this collection.
    /// </summary>
    /// <returns>An enumerable that removes and returns items from the collection</returns>
    public IEnumerable<T> GetConsumingEnumerable()
    {
        return GetConsumingEnumerable(CancellationToken.None);
    }

    /// <summary>
    /// Provides a consuming enumerable for items in this collection. The enumeration ends when the
    /// collection is completed and empty, or silently when <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token to observe</param>
    /// <returns>An enumerable that removes and returns items from the collection</returns>
    public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
    {
        while (!_collection.IsCompleted)
        {
            T item;

            // check to see if something is immediately available
            bool tookItem;
            try
            {
                tookItem = _collection.TryTake(out item, 0, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // if the operation was canceled, just bail on the enumeration
                yield break;
            }

            if (tookItem)
            {
                // something was immediately available, emit it
                yield return item;
                continue;
            }

            // we need to lock this with the Add method since we need to model the act of
            // taking/flipping the switch and adding/flipping the switch as one operation
            lock (_lock)
            {
                // double check that there's nothing in the collection within a lock, it's possible
                // that between the TryTake above and this statement, the Add method was called, so we
                // don't want to flip the switch if there's something in the collection
                if (_collection.Count == 0)
                {
                    // nothing was immediately available, mark us as idle
                    _processingCompletedEvent.Set();
                }
            }

            try
            {
                // now block until something is available
                tookItem = _collection.TryTake(out item, Timeout.Infinite, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // if the operation was canceled, just bail on the enumeration
                yield break;
            }

            if (tookItem)
            {
                // emit the item we found
                yield return item;
            }
        }

        // no more items to process; take the lock so this can't interleave with the
        // reset/restore sequence of an Add call that is failing because adding was completed
        lock (_lock)
        {
            _processingCompletedEvent.Set();
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    /// <filterpriority>2</filterpriority>
    public void Dispose()
    {
        _collection.Dispose();
        _processingCompletedEvent.Dispose();
    }
}
}