/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using QuantConnect.Data;
using System.Collections;
using System.Collections.Generic;
using QuantConnect.Data.UniverseSelection;
namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators
{
/// <summary>
/// Provides an implementation of <see cref="IEnumerator{T}"/> that aggregates the
/// <see cref="BaseData"/> instances emitted by an underlying enumerator into a single
/// <see cref="BaseDataCollection"/> data packet per end time
/// </summary>
public class BaseDataCollectionAggregatorEnumerator : IEnumerator<BaseDataCollection>
{
    // set once the underlying enumerator reports it has no more data
    private bool _endOfStream;
    // true when the underlying's current item was already consumed and we must advance before reading again
    private bool _needsMoveNext;
    private readonly bool _liveMode;
    private readonly Symbol _symbol;
    private readonly IEnumerator<BaseData> _enumerator;

    /// <summary>
    /// Initializes a new instance of the <see cref="BaseDataCollectionAggregatorEnumerator"/> class.
    /// This will aggregate instances emitted from the underlying enumerator and tag them with the
    /// specified symbol
    /// </summary>
    /// <param name="enumerator">The underlying enumerator to aggregate</param>
    /// <param name="symbol">The symbol to place on the aggregated collection</param>
    /// <param name="liveMode">True if running in live mode</param>
    public BaseDataCollectionAggregatorEnumerator(IEnumerator<BaseData> enumerator, Symbol symbol, bool liveMode = false)
    {
        _symbol = symbol;
        _enumerator = enumerator;
        _liveMode = liveMode;
        _needsMoveNext = true;
    }

    /// <summary>
    /// Advances the enumerator to the next aggregated collection. Consecutive items from the
    /// underlying enumerator that share the same end time are merged into one collection.
    /// </summary>
    /// <returns>
    /// False once the underlying enumerator has been exhausted on a previous call. Otherwise true
    /// when a collection was produced; when nothing was produced the result is true in live mode
    /// (with <see cref="Current"/> set to null) and false otherwise.
    /// </returns>
    public bool MoveNext()
    {
        if (_endOfStream)
        {
            return false;
        }

        BaseDataCollection collection = null;
        while (true)
        {
            if (_needsMoveNext)
            {
                // move next if we dequeued the last item last time we were invoked
                if (!_enumerator.MoveNext())
                {
                    _endOfStream = true;
                    if (!IsValid(collection))
                    {
                        // nothing worth emitting was gathered before the stream ended
                        collection = null;
                    }
                    break;
                }
            }

            var current = _enumerator.Current;
            if (current == null)
            {
                // the underlying returned null, stop here and start again on the next call
                // NOTE(review): unlike the other exits, the collection is not checked with IsValid here - confirm this is intended
                _needsMoveNext = true;
                break;
            }

            if (collection == null)
            {
                // we have new data, set the collection's symbol/times
                collection = CreateCollection(_symbol, current.Time, current.EndTime);
            }

            if (collection.EndTime != current.EndTime)
            {
                // the data from the underlying is at a different time, stop here and
                // keep the current item so it seeds the next collection
                _needsMoveNext = false;
                if (IsValid(collection))
                {
                    // we emit
                    break;
                }

                // the gathered collection has no data, drop it and start over from the current item
                collection = null;
                continue;
            }

            // this data belongs in this collection, keep going until null or a different end time
            Add(collection, current);
            _needsMoveNext = true;
        }

        Current = collection;
        return _liveMode || collection != null;
    }

    /// <summary>
    /// Sets the enumerator to its initial position by resetting the underlying enumerator and
    /// clearing this instance's own aggregation state.
    /// </summary>
    public void Reset()
    {
        _enumerator.Reset();

        // rewind our own state only after the underlying reset succeeded; if it throws,
        // this instance is left exactly as it was
        _endOfStream = false;
        _needsMoveNext = true;
        Current = null;
    }

    /// <summary>
    /// Gets the aggregated collection at the current position of the enumerator.
    /// </summary>
    public BaseDataCollection Current
    {
        get; private set;
    }

    /// <summary>
    /// Gets the current element in the collection.
    /// </summary>
    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>
    /// Disposes the underlying enumerator.
    /// </summary>
    public void Dispose()
    {
        _enumerator.Dispose();
    }

    /// <summary>
    /// Creates a new, empty <see cref="BaseDataCollection"/>.
    /// </summary>
    /// <param name="symbol">The base data collection symbol</param>
    /// <param name="time">The start time of the collection</param>
    /// <param name="endTime">The end time of the collection</param>
    /// <returns>A new <see cref="BaseDataCollection"/> carrying the given symbol and times</returns>
    private BaseDataCollection CreateCollection(Symbol symbol, DateTime time, DateTime endTime)
    {
        return new BaseDataCollection
        {
            Symbol = symbol,
            Time = time,
            EndTime = endTime
        };
    }

    /// <summary>
    /// Adds the specified instance of <see cref="BaseData"/> to the current collection.
    /// Data whose symbol matches the underlying of this enumerator's symbol is stored as the
    /// collection's underlying; everything else goes into the collection's data. An already
    /// aggregated input that holds no data points contributes no data.
    /// </summary>
    /// <param name="collection">The collection to be added to</param>
    /// <param name="current">The data to be added</param>
    private void Add(BaseDataCollection collection, BaseData current)
    {
        var baseDataCollection = current as BaseDataCollection;
        var nestedData = baseDataCollection?.Data;
        // treat a missing list like an empty one so indexing below can never go out of range
        var nestedCount = nestedData?.Count ?? 0;

        if (_symbol.HasUnderlying && _symbol.Underlying == current.Symbol)
        {
            if (baseDataCollection == null)
            {
                collection.Underlying = current;
            }
            else if (nestedCount > 0)
            {
                // if the underlying has been aggregated, even if it shouldn't need to be, let's handle it nicely
                collection.Underlying = nestedData[0];
            }
            return;
        }

        if (baseDataCollection == null)
        {
            collection.Data.Add(current);
            return;
        }

        // datapoint is already aggregated, let's see if it's a single point or a collection we can use already
        if (nestedCount > 1)
        {
            // reuse the incoming list as is; note this replaces anything added earlier and
            // shares the list instance with the incoming collection
            collection.Data = nestedData;
        }
        else if (nestedCount == 1)
        {
            collection.Data.Add(nestedData[0]);
        }

        // Let's keep the underlying in case it's already there
        collection.Underlying ??= baseDataCollection.Underlying;
    }

    /// <summary>
    /// Determines if a given collection is valid and can be emitted
    /// </summary>
    /// <param name="collection">The collection to be emitted</param>
    /// <returns>True if the collection is not null and holds at least one data point</returns>
    private static bool IsValid(BaseDataCollection collection)
    {
        return collection != null && collection.Data?.Count > 0;
    }
}
}