/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using QuantConnect.Data;
using QuantConnect.Data.Common;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Market;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Aggregates ticks and bars based on given subscriptions.
    /// Current implementation is based on <see cref="IDataConsolidator"/> that consolidates ticks and put them into enumerator.
    /// </summary>
    /// <remarks>
    /// Subscriptions are kept per <see cref="SecurityIdentifier"/>. The list stored for a symbol is never
    /// mutated in place: <see cref="Add"/> and <see cref="Remove"/> always publish a new list instance, so
    /// <see cref="Update"/> can iterate whichever list it fetched without it changing underneath it.
    /// </remarks>
    public class AggregationManager : IDataAggregator
    {
        private readonly ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>> _enumerators
            = new ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>>();

        // captured in Initialize() and forwarded to the daily bar consolidator
        private bool _dailyStrictEndTimeEnabled;

        /// <summary>
        /// Continuous UTC time provider
        /// </summary>
        protected ITimeProvider TimeProvider { get; set; } = RealTimeProvider.Instance;

        /// <summary>
        /// Initialize this instance
        /// </summary>
        /// <param name="parameters">The parameters dto instance</param>
        public void Initialize(DataAggregatorInitializeParameters parameters)
        {
            _dailyStrictEndTimeEnabled = parameters.AlgorithmSettings.DailyPreciseEndTime;
            Log.Trace($"AggregationManager.Initialize(): daily strict end times: {_dailyStrictEndTimeEnabled}");
        }

        /// <summary>
        /// Add new subscription to current instance
        /// </summary>
        /// <param name="dataConfig">defines the parameters to subscribe to a data feed</param>
        /// <param name="newDataAvailableHandler">handler to be fired on new data available</param>
        /// <returns>The new enumerator for this subscription request</returns>
        public IEnumerator<BaseData> Add(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
        {
            var consolidator = GetConsolidator(dataConfig);

            // bar-like types at any non tick resolution are flagged as period based for the enumerator
            var isPeriodBased = (dataConfig.Type.Name == nameof(QuoteBar)
                    || dataConfig.Type.Name == nameof(TradeBar)
                    || dataConfig.Type.Name == nameof(OpenInterest))
                && dataConfig.Resolution != Resolution.Tick;

            var enumerator = new ScannableEnumerator<BaseData>(consolidator, dataConfig.ExchangeTimeZone, TimeProvider, newDataAvailableHandler, isPeriodBased);
            var entry = new KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>(dataConfig, enumerator);

            // the update callback builds a brand new list instead of appending to the existing one
            _enumerators.AddOrUpdate(
                dataConfig.Symbol.ID,
                new List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> { entry },
                (k, v) => v.Concat(new[] { entry }).ToList());

            return enumerator;
        }

        /// <summary>
        /// Removes the handler with the specified identifier
        /// </summary>
        /// <param name="dataConfig">Subscription data configuration to be removed</param>
        /// <returns>
        /// True if at least one subscription registered for <paramref name="dataConfig"/> was removed;
        /// false if the symbol or the configuration was not registered.
        /// </returns>
        public bool Remove(SubscriptionDataConfig dataConfig)
        {
            List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> enumerators;
            if (_enumerators.TryGetValue(dataConfig.Symbol.ID, out enumerators))
            {
                var remaining = enumerators.Where(pair => pair.Key != dataConfig).ToList();

                // only touch the dictionary when the requested configuration was actually registered,
                // otherwise we would tear down another subscription that happens to share the symbol
                if (remaining.Count != enumerators.Count)
                {
                    if (remaining.Count == 0)
                    {
                        // last subscription for this symbol: drop the entry rather than keeping an empty list
                        List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> removed;
                        return _enumerators.TryRemove(dataConfig.Symbol.ID, out removed);
                    }

                    _enumerators[dataConfig.Symbol.ID] = remaining;
                    return true;
                }
            }

            Log.Debug($"AggregationManager.Remove(): IDataConsolidator for symbol ({dataConfig.Symbol.Value}) was not found.");
            return false;
        }

        /// <summary>
        /// Add new data to aggregator
        /// </summary>
        /// <param name="input">The new data</param>
        /// <remarks>
        /// Any exception raised while dispatching is logged and swallowed; subscriptions that come after
        /// the failing one in the list do not receive this data point.
        /// </remarks>
        public void Update(BaseData input)
        {
            try
            {
                List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> enumerators;
                if (!_enumerators.TryGetValue(input.Symbol.ID, out enumerators))
                {
                    return;
                }

                // depends only on the input, so evaluate it once instead of once per subscription
                var tick = input as Tick;
                var isSuspiciousTick = tick != null && tick.Suspicious;

                for (var i = 0; i < enumerators.Count; i++)
                {
                    var kvp = enumerators[i];

                    // for non tick resolution subscriptions drop suspicious ticks
                    if (isSuspiciousTick && kvp.Key.Resolution != Resolution.Tick)
                    {
                        continue;
                    }

                    kvp.Value.Update(input);
                }
            }
            catch (Exception exception)
            {
                Log.Error(exception);
            }
        }

        /// <summary>
        /// Dispose of the aggregation manager.
        /// </summary>
        /// <remarks>Currently a no-op.</remarks>
        public void Dispose()
        {
        }

        /// <summary>
        /// Gets the consolidator to aggregate data for the given config
        /// </summary>
        /// <param name="config">The subscription configuration to build a consolidator for</param>
        /// <returns>The consolidator selected from the configured data type and resolution</returns>
        protected virtual IDataConsolidator GetConsolidator(SubscriptionDataConfig config)
        {
            var period = config.Resolution.ToTimeSpan();

            if (config.Resolution == Resolution.Daily && (config.Type == typeof(QuoteBar) || config.Type == typeof(TradeBar)))
            {
                // in backtesting, daily resolution data does not have extended market hours even if requested, so let's respect the same behavior for live
                // also this allows us to enable the daily strict end times if required. See 'SetStrictEndTimes'
                return new MarketHourAwareConsolidator(_dailyStrictEndTimeEnabled, config.Resolution, typeof(Tick), config.TickType, extendedMarketHours: false);
            }

            if (config.Type == typeof(QuoteBar))
            {
                return new TickQuoteBarConsolidator(period);
            }

            if (config.Type == typeof(TradeBar))
            {
                return new TickConsolidator(period);
            }

            if (config.Type == typeof(OpenInterest))
            {
                return new OpenInterestConsolidator(period);
            }

            if (config.Type == typeof(Tick))
            {
                return FilteredIdentityDataConsolidator.ForTickType(config.TickType);
            }

            if (config.Type == typeof(Split))
            {
                return new IdentityDataConsolidator<Split>();
            }

            if (config.Type == typeof(Dividend))
            {
                return new IdentityDataConsolidator<Dividend>();
            }

            // streaming custom data subscriptions can pass right through
            return new FilteredIdentityDataConsolidator<BaseData>(data => data.GetType() == config.Type);
        }
    }
}