/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Data.Consolidators;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators
{
/// <summary>
/// An implementation of <see cref="IEnumerator{T}"/> that relies on "consolidated" data
/// </summary>
/// <remarks>
/// Data pushed in through <see cref="Update"/> is handed to the wrapped consolidator; whatever the
/// consolidator emits through its DataConsolidated event is queued and later surfaced by
/// <see cref="MoveNext"/> / <see cref="Current"/>.
/// </remarks>
/// <typeparam name="T">The item type yielded by the enumerator</typeparam>
public class ScannableEnumerator<T> : IEnumerator<T> where T : class, IBaseData
{
    private T _current;
    // set by the consolidated handler so MoveNext can tell whether a scan produced a new data point
    private bool _consolidated;
    private readonly bool _isPeriodBase;
    private readonly bool _validateInputType;
    private readonly Type _consolidatorInputType;
    private readonly DateTimeZone _timeZone;
    private readonly ConcurrentQueue<T> _queue;
    private readonly ITimeProvider _timeProvider;
    private readonly EventHandler _newDataAvailableHandler;
    private readonly IDataConsolidator _consolidator;

    /// <summary>
    /// Gets the element in the collection at the current position of the enumerator.
    /// </summary>
    /// <returns>
    /// The element in the collection at the current position of the enumerator.
    /// Will be null when the last <see cref="MoveNext"/> call had no consolidated data to emit.
    /// </returns>
    public T Current => _current;

    /// <summary>
    /// Gets the current element in the collection.
    /// </summary>
    /// <returns>
    /// The current element in the collection.
    /// </returns>
    /// <filterpriority>2</filterpriority>
    object IEnumerator.Current => Current;

    /// <summary>
    /// Initializes a new instance of the <see cref="ScannableEnumerator{T}"/> class
    /// </summary>
    /// <param name="consolidator">Consolidator taking BaseData updates and firing events containing new 'consolidated' data</param>
    /// <param name="timeZone">The time zone the raw data is time stamped in</param>
    /// <param name="timeProvider">The time provider instance used to determine when bars are completed and can be emitted</param>
    /// <param name="newDataAvailableHandler">The event handler for a new available data point, may be null</param>
    /// <param name="isPeriodBased">The consolidator is period based, this will enable scanning on <see cref="MoveNext"/></param>
    /// <exception cref="ArgumentNullException">The <paramref name="consolidator"/> is null</exception>
    public ScannableEnumerator(IDataConsolidator consolidator, DateTimeZone timeZone, ITimeProvider timeProvider, EventHandler newDataAvailableHandler, bool isPeriodBased = true)
    {
        if (consolidator == null)
        {
            throw new ArgumentNullException(nameof(consolidator));
        }

        _timeZone = timeZone;
        _timeProvider = timeProvider;
        _consolidator = consolidator;
        _isPeriodBase = isPeriodBased;
        _queue = new ConcurrentQueue<T>();
        _consolidatorInputType = consolidator.InputType;
        // a consolidator declaring BaseData as its input accepts anything, no need to validate each update
        _validateInputType = _consolidatorInputType != typeof(BaseData);
        // fall back to a no-op so the handler can always be invoked without a null check
        _newDataAvailableHandler = newDataAvailableHandler ?? ((s, e) => { });

        _consolidator.DataConsolidated += DataConsolidatedHandler;
    }

    /// <summary>
    /// Updates the consolidator
    /// </summary>
    /// <remarks>
    /// When the consolidator declares a specific input type, data whose runtime type is not
    /// exactly that type is silently dropped.
    /// </remarks>
    /// <param name="data">The data to consolidate</param>
    public void Update(T data)
    {
        // if the input type of the consolidator isn't generic we validate it's correct before sending it in
        if (_validateInputType && data.GetType() != _consolidatorInputType)
        {
            return;
        }

        if (_isPeriodBase)
        {
            // we only need to lock if it's period base since the move next call could trigger a scan
            lock (_consolidator)
            {
                _consolidator.Update(data);
            }
        }
        else
        {
            _consolidator.Update(data);
        }
    }

    /// <summary>
    /// Enqueues the new data into this enumerator
    /// </summary>
    /// <param name="data">The data to be enqueued</param>
    private void Enqueue(T data)
    {
        _queue.Enqueue(data);
    }

    /// <summary>
    /// Advances the enumerator to the next element of the collection.
    /// </summary>
    /// <remarks>
    /// If nothing is queued and the consolidator is period based, the consolidator is scanned
    /// with the current local time so it gets a chance to emit its working bar.
    /// </remarks>
    /// <returns>
    /// Always true; <see cref="Current"/> is null when there was no data available.
    /// </returns>
    public bool MoveNext()
    {
        if (!_queue.TryDequeue(out _current) && _isPeriodBase)
        {
            _consolidated = false;
            lock (_consolidator)
            {
                // if there is a working bar we will try to pull it out if the time is right, each consolidator knows when it's right
                var localTime = _timeProvider.GetUtcNow().ConvertFromUtc(_timeZone);
                _consolidator.Scan(localTime);
            }

            // the scan fires the consolidated handler when it emits, which enqueues the new data point
            if (_consolidated)
            {
                _queue.TryDequeue(out _current);
            }
        }

        // even if we don't have data to return, we haven't technically
        // passed the end of the collection, so always return true until
        // the enumerator is explicitly disposed or ended
        return true;
    }

    /// <summary>
    /// Sets the enumerator to its initial position, which is before the first element in the collection.
    /// This implementation is a no-op.
    /// </summary>
    public void Reset()
    {
    }

    /// <summary>
    /// Stops listening to the consolidator's DataConsolidated event.
    /// </summary>
    /// <filterpriority>2</filterpriority>
    public void Dispose()
    {
        _consolidator.DataConsolidated -= DataConsolidatedHandler;
    }

    /// <summary>
    /// Handles the consolidator's DataConsolidated event: queues the new data point and
    /// notifies the new data available handler.
    /// </summary>
    /// <param name="sender">The event sender, forwarded as is to the new data available handler</param>
    /// <param name="data">The consolidated data point</param>
    private void DataConsolidatedHandler(object sender, IBaseData data)
    {
        var dataPoint = data as T;
        if (dataPoint == null)
        {
            // 'as' yields null when the consolidated point isn't a T; drop it rather than
            // enqueueing a null and notifying the handler with an empty data point
            return;
        }

        _consolidated = true;
        Enqueue(dataPoint);
        _newDataAvailableHandler(sender, new NewDataAvailableEventArgs { DataPoint = dataPoint });
    }
}
}