/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Collections; using System.Collections.Generic; using System.Linq; using QuantConnect.Data; namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators { /// /// Represents an enumerator capable of synchronizing other enumerators of type T in time. /// This assumes that all enumerators have data time stamped in the same time zone /// public abstract class SynchronizingEnumerator : IEnumerator { private IEnumerator _syncer; private readonly IEnumerator[] _enumerators; /// /// Gets the Timestamp for the data /// protected abstract DateTime GetInstanceTime(T instance); /// /// Gets the element in the collection at the current position of the enumerator. /// /// /// The element in the collection at the current position of the enumerator. /// public T Current { get; private set; } /// /// Gets the current element in the collection. /// /// /// The current element in the collection. /// object IEnumerator.Current { get { return Current; } } /// /// Initializes a new instance of the class /// /// The enumerators to be synchronized. NOTE: Assumes the same time zone for all data /// The type of data we want, for example, or , ect... protected SynchronizingEnumerator(params IEnumerator[] enumerators) : this ((IEnumerable>)enumerators) { } /// /// Initializes a new instance of the class /// /// The enumerators to be synchronized. NOTE: Assumes the same time zone for all data /// The type of data we want, for example, or , ect... protected SynchronizingEnumerator(IEnumerable> enumerators) { _enumerators = enumerators.ToArray(); _syncer = GetSynchronizedEnumerator(_enumerators); } /// /// Advances the enumerator to the next element of the collection. /// /// /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection. /// /// The collection was modified after the enumerator was created. public bool MoveNext() { var moveNext = _syncer.MoveNext(); Current = moveNext ? _syncer.Current : default(T); return moveNext; } /// /// Sets the enumerator to its initial position, which is before the first element in the collection. /// /// The collection was modified after the enumerator was created. public void Reset() { foreach (var enumerator in _enumerators) { enumerator.Reset(); } // don't call syncer.reset since the impl will just throw _syncer = GetSynchronizedEnumerator(_enumerators); } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { foreach (var enumerator in _enumerators) { enumerator.Dispose(); } _syncer.Dispose(); } /// /// Synchronization system for the enumerator: /// /// /// private IEnumerator GetSynchronizedEnumerator(IEnumerator[] enumerators) { return GetBruteForceMethod(enumerators); } /// /// Brute force implementation for synchronizing the enumerator. /// Will remove enumerators returning false to the call to MoveNext. /// Will not remove enumerators with Current Null returning true to the call to MoveNext /// private IEnumerator GetBruteForceMethod(IEnumerator[] enumerators) { var ticks = DateTime.MaxValue.Ticks; var collection = new HashSet>(); foreach (var enumerator in enumerators) { if (enumerator.MoveNext()) { if (enumerator.Current != null) { ticks = Math.Min(ticks, GetInstanceTime(enumerator.Current).Ticks); } collection.Add(enumerator); } else { enumerator.Dispose(); } } var frontier = new DateTime(ticks); var toRemove = new List>(); while (collection.Count > 0) { var nextFrontierTicks = DateTime.MaxValue.Ticks; foreach (var enumerator in collection) { while (enumerator.Current == null || GetInstanceTime(enumerator.Current) <= frontier) { if (enumerator.Current != null) { yield return enumerator.Current; } if (!enumerator.MoveNext()) { toRemove.Add(enumerator); break; } if (enumerator.Current == null) { break; } } if (enumerator.Current != null) { nextFrontierTicks = Math.Min(nextFrontierTicks, GetInstanceTime(enumerator.Current).Ticks); } } if (toRemove.Count > 0) { foreach (var enumerator in toRemove) { collection.Remove(enumerator); } toRemove.Clear(); } frontier = new DateTime(nextFrontierTicks); if (frontier == DateTime.MaxValue) { break; } } } } }