/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using QuantConnect.Data;
namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators
{
/// <summary>
/// Represents an enumerator capable of synchronizing other enumerators of type T in time.
/// This assumes that all enumerators have data time stamped in the same time zone
/// </summary>
/// <typeparam name="T">The type of data produced by the synchronized enumerators</typeparam>
public abstract class SynchronizingEnumerator<T> : IEnumerator<T>
{
    private IEnumerator<T> _syncer;
    private readonly IEnumerator<T>[] _enumerators;

    /// <summary>
    /// Gets the Timestamp for the data
    /// </summary>
    /// <param name="instance">The data instance whose time stamp is requested; never called with null by this class</param>
    /// <returns>The time used to order <paramref name="instance"/> against data from the other enumerators</returns>
    protected abstract DateTime GetInstanceTime(T instance);

    /// <summary>
    /// Gets the element in the collection at the current position of the enumerator.
    /// </summary>
    /// <returns>
    /// The element in the collection at the current position of the enumerator,
    /// or default(T) once <see cref="MoveNext"/> has returned false.
    /// </returns>
    public T Current
    {
        get; private set;
    }

    /// <summary>
    /// Gets the current element in the collection.
    /// </summary>
    /// <returns>
    /// The current element in the collection.
    /// </returns>
    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>
    /// Initializes a new instance of the class
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized. NOTE: Assumes the same time zone for all data</param>
    protected SynchronizingEnumerator(params IEnumerator<T>[] enumerators)
        : this((IEnumerable<IEnumerator<T>>)enumerators)
    {
    }

    /// <summary>
    /// Initializes a new instance of the class
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized. NOTE: Assumes the same time zone for all data</param>
    protected SynchronizingEnumerator(IEnumerable<IEnumerator<T>> enumerators)
    {
        _enumerators = enumerators.ToArray();
        // the synchronizing iterator is lazy: no source enumerator is advanced
        // (and GetInstanceTime is not invoked) until the first call to MoveNext
        _syncer = GetSynchronizedEnumerator(_enumerators);
    }

    /// <summary>
    /// Advances the enumerator to the next element of the collection.
    /// </summary>
    /// <returns>
    /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
    /// </returns>
    public bool MoveNext()
    {
        var moveNext = _syncer.MoveNext();
        Current = moveNext ? _syncer.Current : default(T);
        return moveNext;
    }

    /// <summary>
    /// Sets the enumerator to its initial position, which is before the first element in the collection.
    /// Resets every source enumerator and rebuilds the synchronizing iterator.
    /// </summary>
    public void Reset()
    {
        foreach (var enumerator in _enumerators)
        {
            enumerator.Reset();
        }

        // don't call syncer.reset since the impl will just throw;
        // release the old iterator before replacing it with a fresh one
        _syncer.Dispose();
        _syncer = GetSynchronizedEnumerator(_enumerators);
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// Disposes every source enumerator and then the synchronizing iterator.
    /// </summary>
    public void Dispose()
    {
        foreach (var enumerator in _enumerators)
        {
            enumerator.Dispose();
        }
        _syncer.Dispose();
    }

    /// <summary>
    /// Synchronization system for the enumerator:
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized</param>
    /// <returns>An enumerator yielding the data of all <paramref name="enumerators"/> ordered by frontier time</returns>
    private IEnumerator<T> GetSynchronizedEnumerator(IEnumerator<T>[] enumerators)
    {
        return GetBruteForceMethod(enumerators);
    }

    /// <summary>
    /// Brute force implementation for synchronizing the enumerator.
    /// Will remove enumerators returning false to the call to MoveNext.
    /// Will not remove enumerators with Current Null returning true to the call to MoveNext
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized</param>
    /// <returns>An enumerator yielding every non-null element whose time is at or before the advancing frontier</returns>
    private IEnumerator<T> GetBruteForceMethod(IEnumerator<T>[] enumerators)
    {
        var ticks = DateTime.MaxValue.Ticks;
        var collection = new HashSet<IEnumerator<T>>();
        foreach (var enumerator in enumerators)
        {
            if (enumerator.MoveNext())
            {
                // the initial frontier is the earliest time among the first elements
                if (enumerator.Current != null)
                {
                    ticks = Math.Min(ticks, GetInstanceTime(enumerator.Current).Ticks);
                }
                collection.Add(enumerator);
            }
            else
            {
                // empty from the start, nothing to synchronize
                enumerator.Dispose();
            }
        }

        var frontier = new DateTime(ticks);
        var toRemove = new List<IEnumerator<T>>();
        while (collection.Count > 0)
        {
            var nextFrontierTicks = DateTime.MaxValue.Ticks;
            foreach (var enumerator in collection)
            {
                var exhausted = false;

                // emit everything this enumerator has at or before the frontier
                while (enumerator.Current == null || GetInstanceTime(enumerator.Current) <= frontier)
                {
                    if (enumerator.Current != null)
                    {
                        yield return enumerator.Current;
                    }
                    if (!enumerator.MoveNext())
                    {
                        // removal is deferred: the set can't be modified while it is being enumerated
                        toRemove.Add(enumerator);
                        exhausted = true;
                        break;
                    }
                    if (enumerator.Current == null)
                    {
                        // alive but no data right now, move on to the next enumerator
                        break;
                    }
                }

                // Current is unspecified once MoveNext has returned false, so an exhausted
                // enumerator must not contribute a (stale or default) time to the next frontier
                if (!exhausted && enumerator.Current != null)
                {
                    nextFrontierTicks = Math.Min(nextFrontierTicks, GetInstanceTime(enumerator.Current).Ticks);
                }
            }

            if (toRemove.Count > 0)
            {
                foreach (var enumerator in toRemove)
                {
                    collection.Remove(enumerator);
                }
                toRemove.Clear();
            }

            frontier = new DateTime(nextFrontierTicks);
            if (frontier == DateTime.MaxValue)
            {
                // no remaining enumerator reported a pending time stamp
                break;
            }
        }
    }
}
}