/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using QuantConnect.Interfaces;
namespace QuantConnect.Util
{
///
/// A non blocking implementation
///
/// The item type being processed
public class BusyCollection : IBusyCollection
{
private readonly ConcurrentQueue _collection = new ConcurrentQueue();
private readonly ManualResetEventSlim _processingCompletedEvent = new ManualResetEventSlim(true);
private bool _completedAdding;
///
/// Gets a wait handle that can be used to wait until this instance is done
/// processing all of it's item
///
public WaitHandle WaitHandle => _processingCompletedEvent.WaitHandle;
///
/// Gets the number of items held within this collection
///
public int Count => _collection.Count;
///
/// Returns true if processing, false otherwise
///
public bool IsBusy => !_collection.IsEmpty || !_processingCompletedEvent.IsSet;
///
/// Adds the items to this collection
///
/// The item to be added
public void Add(T item)
{
Add(item, CancellationToken.None);
}
///
/// Adds the items to this collection
///
/// The item to be added
/// A cancellation token to observer
public void Add(T item, CancellationToken cancellationToken)
{
if (_completedAdding)
{
throw new InvalidOperationException("Collection has already been marked as not " +
$"accepting more additions, see {nameof(CompleteAdding)}");
}
// locking to avoid race condition with GetConsumingEnumerable()
lock (_processingCompletedEvent)
{
// we're adding work to be done, mark us as busy
_processingCompletedEvent.Reset();
_collection.Enqueue(item);
}
}
///
/// Provides a consuming enumerable for items in this collection.
///
/// An enumerable that removes and returns items from the collection
public IEnumerable GetConsumingEnumerable()
{
return GetConsumingEnumerable(CancellationToken.None);
}
///
/// Provides a consuming enumerable for items in this collection.
///
/// A cancellation token to observer
/// An enumerable that removes and returns items from the collection
public IEnumerable GetConsumingEnumerable(CancellationToken cancellationToken)
{
T item;
while (_collection.TryDequeue(out item))
{
yield return item;
}
// locking to avoid race condition with Add()
lock (_processingCompletedEvent)
{
if (!_collection.TryPeek(out item))
{
_processingCompletedEvent.Set();
}
}
}
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
/// 2
public void Dispose()
{
_collection.Clear();
_processingCompletedEvent.Dispose();
}
///
/// Marks the collection as not accepting any more additions
///
public void CompleteAdding()
{
_completedAdding = true;
}
}
}