/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using QuantConnect.Data;
using QuantConnect.Data.Common;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Market;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Aggregates ticks and bars based on given subscriptions.
    /// The current implementation is based on <see cref="IDataConsolidator"/> instances that
    /// consolidate incoming data and put the results into a per-subscription enumerator.
    /// </summary>
    public class AggregationManager : IDataAggregator
    {
        // Registered enumerators grouped by the security identifier of their subscription symbol.
        // The stored lists are never mutated in place: Add and Remove always publish a new list,
        // so the list instance Update is iterating over does not change underneath it.
        private readonly ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>> _enumerators
            = new ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>>();

        private bool _dailyStrictEndTimeEnabled;

        /// <summary>
        /// Continuous UTC time provider
        /// </summary>
        protected ITimeProvider TimeProvider { get; set; } = RealTimeProvider.Instance;

        /// <summary>
        /// Initialize this instance
        /// </summary>
        /// <param name="parameters">The parameters dto instance</param>
        public void Initialize(DataAggregatorInitializeParameters parameters)
        {
            _dailyStrictEndTimeEnabled = parameters.AlgorithmSettings.DailyPreciseEndTime;
            Log.Trace($"AggregationManager.Initialize(): daily strict end times: {_dailyStrictEndTimeEnabled}");
        }

        /// <summary>
        /// Add new subscription to current instance
        /// </summary>
        /// <param name="dataConfig">defines the parameters to subscribe to a data feed</param>
        /// <param name="newDataAvailableHandler">handler to be fired on new data available</param>
        /// <returns>The new enumerator for this subscription request</returns>
        public IEnumerator<BaseData> Add(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
        {
            var consolidator = GetConsolidator(dataConfig);

            // bar-like types at any resolution other than tick are flagged as period based
            var isPeriodBased = (dataConfig.Type.Name == nameof(QuoteBar) ||
                    dataConfig.Type.Name == nameof(TradeBar) ||
                    dataConfig.Type.Name == nameof(OpenInterest)) &&
                dataConfig.Resolution != Resolution.Tick;

            var enumerator = new ScannableEnumerator<BaseData>(consolidator, dataConfig.ExchangeTimeZone, TimeProvider, newDataAvailableHandler, isPeriodBased);
            var entry = new KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>(dataConfig, enumerator);

            // publish a brand new list instead of appending to the existing one
            _enumerators.AddOrUpdate(
                dataConfig.Symbol.ID,
                key => new List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> { entry },
                (key, existing) => existing.Concat(new[] { entry }).ToList());

            return enumerator;
        }

        /// <summary>
        /// Removes the handler with the specified identifier
        /// </summary>
        /// <param name="dataConfig">Subscription data configuration to be removed</param>
        /// <returns>
        /// False if no enumerator is registered for the symbol of <paramref name="dataConfig"/>,
        /// otherwise the result of removing/updating the symbol entry
        /// </returns>
        public bool Remove(SubscriptionDataConfig dataConfig)
        {
            List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> enumerators;
            if (!_enumerators.TryGetValue(dataConfig.Symbol.ID, out enumerators))
            {
                Log.Debug($"AggregationManager.Remove(): IDataConsolidator for symbol ({dataConfig.Symbol.Value}) was not found.");
                return false;
            }

            // only drop the entries that belong to the requested configuration; other
            // subscriptions for the same symbol must keep receiving data
            var remaining = enumerators.Where(pair => pair.Key != dataConfig).ToList();
            if (remaining.Count == 0)
            {
                List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> removed;
                return _enumerators.TryRemove(dataConfig.Symbol.ID, out removed);
            }

            _enumerators[dataConfig.Symbol.ID] = remaining;
            return true;
        }

        /// <summary>
        /// Add new data to aggregator
        /// </summary>
        /// <param name="input">The new data</param>
        public void Update(BaseData input)
        {
            try
            {
                List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> enumerators;
                if (!_enumerators.TryGetValue(input.Symbol.ID, out enumerators))
                {
                    return;
                }

                // the input does not change while we fan it out, so classify it once
                var tick = input as Tick;
                var isSuspiciousTick = tick != null && tick.Suspicious;

                for (var i = 0; i < enumerators.Count; i++)
                {
                    var kvp = enumerators[i];

                    // for non tick resolution subscriptions drop suspicious ticks
                    if (isSuspiciousTick && kvp.Key.Resolution != Resolution.Tick)
                    {
                        continue;
                    }

                    kvp.Value.Update(input);
                }
            }
            catch (Exception exception)
            {
                // failures are logged and not propagated to the caller
                Log.Error(exception);
            }
        }

        /// <summary>
        /// Dispose of the aggregation manager.
        /// </summary>
        public void Dispose() { }

        /// <summary>
        /// Gets the consolidator to aggregate data for the given config
        /// </summary>
        /// <param name="config">The subscription configuration to create a consolidator for</param>
        /// <returns>The consolidator matching the data type and resolution of the configuration</returns>
        protected virtual IDataConsolidator GetConsolidator(SubscriptionDataConfig config)
        {
            var period = config.Resolution.ToTimeSpan();

            if (config.Resolution == Resolution.Daily && (config.Type == typeof(QuoteBar) || config.Type == typeof(TradeBar)))
            {
                // in backtesting, daily resolution data does not have extended market hours even if requested, so let's respect the same behavior for live
                // also this allows us to enable the daily strict end times if required. See 'SetStrictEndTimes'
                return new MarketHourAwareConsolidator(_dailyStrictEndTimeEnabled, config.Resolution, typeof(Tick), config.TickType, extendedMarketHours: false);
            }

            if (config.Type == typeof(QuoteBar))
            {
                return new TickQuoteBarConsolidator(period);
            }

            if (config.Type == typeof(TradeBar))
            {
                return new TickConsolidator(period);
            }

            if (config.Type == typeof(OpenInterest))
            {
                return new OpenInterestConsolidator(period);
            }

            if (config.Type == typeof(Tick))
            {
                return FilteredIdentityDataConsolidator.ForTickType(config.TickType);
            }

            if (config.Type == typeof(Split))
            {
                return new IdentityDataConsolidator<Split>();
            }

            if (config.Type == typeof(Dividend))
            {
                return new IdentityDataConsolidator<Dividend>();
            }

            // streaming custom data subscriptions can pass right through
            return new FilteredIdentityDataConsolidator<BaseData>(data => data.GetType() == config.Type);
        }
    }
}