/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using NodaTime; using QuantConnect.Data; using QuantConnect.Data.Consolidators; using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators { /// /// An implementation of that relies on "consolidated" data /// /// The item type yielded by the enumerator public class ScannableEnumerator : IEnumerator where T : class, IBaseData { private T _current; private bool _consolidated; private bool _isPeriodBase; private bool _validateInputType; private Type _consolidatorInputType; private readonly DateTimeZone _timeZone; private readonly ConcurrentQueue _queue; private readonly ITimeProvider _timeProvider; private readonly EventHandler _newDataAvailableHandler; private readonly IDataConsolidator _consolidator; /// /// Gets the element in the collection at the current position of the enumerator. /// /// /// The element in the collection at the current position of the enumerator. /// public T Current => _current; /// /// Gets the current element in the collection. /// /// /// The current element in the collection. /// /// 2 object IEnumerator.Current => Current; /// /// Initializes a new instance of the class /// /// Consolidator taking BaseData updates and firing events containing new 'consolidated' data /// The time zone the raw data is time stamped in /// The time provider instance used to determine when bars are completed and can be emitted /// The event handler for a new available data point /// The consolidator is period based, this will enable scanning on public ScannableEnumerator(IDataConsolidator consolidator, DateTimeZone timeZone, ITimeProvider timeProvider, EventHandler newDataAvailableHandler, bool isPeriodBased = true) { _timeZone = timeZone; _timeProvider = timeProvider; _consolidator = consolidator; _isPeriodBase = isPeriodBased; _queue = new ConcurrentQueue(); _consolidatorInputType = consolidator.InputType; _validateInputType = _consolidatorInputType != typeof(BaseData); _newDataAvailableHandler = newDataAvailableHandler ?? ((s, e) => { }); _consolidator.DataConsolidated += DataConsolidatedHandler; } /// /// Updates the consolidator /// /// The data to consolidate public void Update(T data) { // if the input type of the consolidator isn't generic we validate it's correct before sending it in if (_validateInputType && data.GetType() != _consolidatorInputType) { return; } if (_isPeriodBase) { // we only need to lock if it's period base since the move next call could trigger a scan lock (_consolidator) { _consolidator.Update(data); } } else { _consolidator.Update(data); } } /// /// Enqueues the new data into this enumerator /// /// The data to be enqueued private void Enqueue(T data) { _queue.Enqueue(data); } /// /// Advances the enumerator to the next element of the collection. /// /// /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection. /// /// The collection was modified after the enumerator was created. 2 public bool MoveNext() { if (!_queue.TryDequeue(out _current) && _isPeriodBase) { _consolidated = false; lock (_consolidator) { // if there is a working bar we will try to pull it out if the time is right, each consolidator knows when it's right var localTime = _timeProvider.GetUtcNow().ConvertFromUtc(_timeZone); _consolidator.Scan(localTime); } if (_consolidated) { _queue.TryDequeue(out _current); } } // even if we don't have data to return, we haven't technically // passed the end of the collection, so always return true until // the enumerator is explicitly disposed or ended return true; } /// /// Sets the enumerator to its initial position, which is before the first element in the collection. /// /// The collection was modified after the enumerator was created. 2 public void Reset() { } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// /// 2 public void Dispose() { _consolidator.DataConsolidated -= DataConsolidatedHandler; } private void DataConsolidatedHandler(object sender, IBaseData data) { var dataPoint = data as T; _consolidated = true; Enqueue(dataPoint); _newDataAvailableHandler(sender, new NewDataAvailableEventArgs { DataPoint = dataPoint }); } } }