/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using QuantConnect.Data; using System.Collections; using System.Collections.Generic; using QuantConnect.Data.UniverseSelection; namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators { /// /// Provides an implementation of /// that aggregates an underlying into a single /// data packet /// public class BaseDataCollectionAggregatorEnumerator : IEnumerator { private bool _endOfStream; private bool _needsMoveNext; private bool _liveMode; private readonly Symbol _symbol; private readonly IEnumerator _enumerator; /// /// Initializes a new instance of the class /// This will aggregate instances emitted from the underlying enumerator and tag them with the /// specified symbol /// /// The underlying enumerator to aggregate /// The symbol to place on the aggregated collection /// True if running in live mode public BaseDataCollectionAggregatorEnumerator(IEnumerator enumerator, Symbol symbol, bool liveMode = false) { _symbol = symbol; _enumerator = enumerator; _liveMode = liveMode; _needsMoveNext = true; } /// /// Advances the enumerator to the next element of the collection. /// /// /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection. /// /// The collection was modified after the enumerator was created. 2 public bool MoveNext() { if (_endOfStream) { return false; } BaseDataCollection collection = null; while (true) { if (_needsMoveNext) { // move next if we dequeued the last item last time we were invoked if (!_enumerator.MoveNext()) { _endOfStream = true; if (!IsValid(collection)) { // we don't emit collection = null; } break; } } if (_enumerator.Current == null) { // the underlying returned null, stop here and start again on the next call _needsMoveNext = true; break; } if (collection == null) { // we have new data, set the collection's symbol/times var current = _enumerator.Current; collection = CreateCollection(_symbol, current.Time, current.EndTime); } if (collection.EndTime != _enumerator.Current.EndTime) { // the data from the underlying is at a different time, stop here _needsMoveNext = false; if (IsValid(collection)) { // we emit break; } // we try again collection = null; continue; } // this data belongs in this collection, keep going until null or bad time Add(collection, _enumerator.Current); _needsMoveNext = true; } Current = collection; return _liveMode || collection != null; } /// /// Sets the enumerator to its initial position, which is before the first element in the collection. /// /// The collection was modified after the enumerator was created. 2 public void Reset() { _enumerator.Reset(); } /// /// Gets the element in the collection at the current position of the enumerator. /// /// /// The element in the collection at the current position of the enumerator. /// public BaseDataCollection Current { get; private set; } /// /// Gets the current element in the collection. /// /// /// The current element in the collection. /// /// 2 object IEnumerator.Current { get { return Current; } } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// /// 2 public void Dispose() { _enumerator.Dispose(); } /// /// Creates a new, empty . /// /// The base data collection symbol /// The start time of the collection /// The end time of the collection /// A new, empty private BaseDataCollection CreateCollection(Symbol symbol, DateTime time, DateTime endTime) { return new BaseDataCollection { Symbol = symbol, Time = time, EndTime = endTime }; } /// /// Adds the specified instance of to the current collection /// /// The collection to be added to /// The data to be added private void Add(BaseDataCollection collection, BaseData current) { var baseDataCollection = current as BaseDataCollection; if (_symbol.HasUnderlying && _symbol.Underlying == current.Symbol) { // if the underlying has been aggregated, even if it shouldn't need to be, let's handle it nicely if (baseDataCollection != null) { collection.Underlying = baseDataCollection.Data[0]; } else { collection.Underlying = current; } } else { if (baseDataCollection != null) { // datapoint is already aggregated, let's see if it's a single point or a collection we can use already if(baseDataCollection.Data.Count > 1) { collection.Data = baseDataCollection.Data; } else { collection.Data.Add(baseDataCollection.Data[0]); } // Let's keep the underlying in case it's already there collection.Underlying ??= baseDataCollection.Underlying; } else { collection.Data.Add(current); } } } /// /// Determines if a given data point is valid and can be emitted /// /// The collection to be emitted /// True if its a valid data point private static bool IsValid(BaseDataCollection collection) { return collection != null && collection.Data?.Count > 0; } } }