/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using QuantConnect.Interfaces; namespace QuantConnect.Util { /// /// A non blocking implementation /// /// The item type being processed public class BusyCollection : IBusyCollection { private readonly ConcurrentQueue _collection = new ConcurrentQueue(); private readonly ManualResetEventSlim _processingCompletedEvent = new ManualResetEventSlim(true); private bool _completedAdding; /// /// Gets a wait handle that can be used to wait until this instance is done /// processing all of it's item /// public WaitHandle WaitHandle => _processingCompletedEvent.WaitHandle; /// /// Gets the number of items held within this collection /// public int Count => _collection.Count; /// /// Returns true if processing, false otherwise /// public bool IsBusy => !_collection.IsEmpty || !_processingCompletedEvent.IsSet; /// /// Adds the items to this collection /// /// The item to be added public void Add(T item) { Add(item, CancellationToken.None); } /// /// Adds the items to this collection /// /// The item to be added /// A cancellation token to observer public void Add(T item, CancellationToken cancellationToken) { if (_completedAdding) { throw new InvalidOperationException("Collection has already been marked as not " + $"accepting more additions, see {nameof(CompleteAdding)}"); } // locking to avoid race condition with GetConsumingEnumerable() lock (_processingCompletedEvent) { // we're adding work to be done, mark us as busy _processingCompletedEvent.Reset(); _collection.Enqueue(item); } } /// /// Provides a consuming enumerable for items in this collection. /// /// An enumerable that removes and returns items from the collection public IEnumerable GetConsumingEnumerable() { return GetConsumingEnumerable(CancellationToken.None); } /// /// Provides a consuming enumerable for items in this collection. /// /// A cancellation token to observer /// An enumerable that removes and returns items from the collection public IEnumerable GetConsumingEnumerable(CancellationToken cancellationToken) { T item; while (_collection.TryDequeue(out item)) { yield return item; } // locking to avoid race condition with Add() lock (_processingCompletedEvent) { if (!_collection.TryPeek(out item)) { _processingCompletedEvent.Set(); } } } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// /// 2 public void Dispose() { _collection.Clear(); _processingCompletedEvent.Dispose(); } /// /// Marks the collection as not accepting any more additions /// public void CompleteAdding() { _completedAdding = true; } } }