/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using System;
using System.Linq;
using QuantConnect.Util;
using QuantConnect.Data;
using QuantConnect.Packets;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using QuantConnect.Securities;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// This is an implementation of <see cref="IDataQueueHandler"/> used to handle multiple live datafeeds
    /// </summary>
    public class DataQueueHandlerManager : IDataQueueHandler, IDataQueueUniverseProvider
    {
        private readonly IAlgorithmSettings _algorithmSettings;

        // For every subscribed configuration, the handlers that accepted it, in subscription order.
        // The same configuration can be subscribed more than once, hence a queue per configuration.
        private readonly Dictionary<SubscriptionDataConfig, Queue<IDataQueueHandler>> _dataConfigAndDataHandler = new();

        /// <summary>
        /// Creates a new instance
        /// </summary>
        /// <param name="settings">The algorithm settings, read when deciding whether strict daily end times apply</param>
        public DataQueueHandlerManager(IAlgorithmSettings settings)
        {
            _algorithmSettings = settings;
        }

        /// <summary>
        /// Frontier time provider to use
        /// </summary>
        /// <remarks>Protected for testing purposes</remarks>
        protected ITimeProvider FrontierTimeProvider { get; set; }

        /// <summary>
        /// Collection of data queue handles being used
        /// </summary>
        /// <remarks>Protected for testing purposes</remarks>
        protected List<IDataQueueHandler> DataHandlers { get; set; } = new();

        /// <summary>
        /// True if the composite queue handler has any <see cref="IDataQueueUniverseProvider"/> instance
        /// </summary>
        public bool HasUniverseProvider => DataHandlers.OfType<IDataQueueUniverseProvider>().Any();

        /// <summary>
        /// Event triggered when an unsupported configuration is detected
        /// </summary>
        public event EventHandler<SubscriptionDataConfig> UnsupportedConfiguration;

        /// <summary>
        /// Subscribe to the specified configuration
        /// </summary>
        /// <param name="dataConfig">defines the parameters to subscribe to a data feed</param>
        /// <param name="newDataAvailableHandler">handler to be fired on new data available</param>
        /// <returns>
        /// The new enumerator for this subscription request, or null if none of the
        /// registered data queue handlers returned an enumerator for the configuration
        /// </returns>
        /// <remarks>
        /// Handlers are tried in registration order and the first one returning a non-null enumerator wins.
        /// If no handler serves the request and at least one of them threw, the last exception caught is rethrown.
        /// </remarks>
        public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
        {
            Exception failureException = null;
            foreach (var dataHandler in DataHandlers)
            {
                // Emit ticks & custom data as soon as we get them, they don't need any kind of batching behavior applied to them
                // only use the frontier time provider if we need to
                var immediateEmission = dataConfig.Resolution == Resolution.Tick
                    || dataConfig.IsCustomData
                    || FrontierTimeProvider == null;
                var exchangeTimeZone = dataConfig.ExchangeTimeZone;

                IEnumerator<BaseData> enumerator;
                try
                {
                    enumerator = dataHandler.Subscribe(dataConfig, immediateEmission
                        ? newDataAvailableHandler
                        : (sender, eventArgs) =>
                        {
                            // let's only wake up the main thread if the data point is allowed to be emitted,
                            // else we could fill forward previous bar and not let this one through
                            var dataAvailable = eventArgs as NewDataAvailableEventArgs;
                            if (dataAvailable == null
                                || dataAvailable.DataPoint == null
                                || dataAvailable.DataPoint.EndTime.ConvertToUtc(exchangeTimeZone) <= FrontierTimeProvider.GetUtcNow())
                            {
                                newDataAvailableHandler?.Invoke(sender, eventArgs);
                            }
                        });
                }
                catch (Exception exception)
                {
                    // we will try the next DQH if any, if it handles the request correctly we ignore the error
                    failureException = exception;
                    continue;
                }

                // A null enumerator means this handler did not take the request, move on to the next one
                if (enumerator != null)
                {
                    if (!_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataQueueHandlers))
                    {
                        // we can get the same subscription request multiple times, the aggregator manager handles updating each enumerator
                        // but we need to keep track so we can call unsubscribe later to the target data queue handler
                        _dataConfigAndDataHandler[dataConfig] = dataQueueHandlers = new Queue<IDataQueueHandler>();
                    }
                    dataQueueHandlers.Enqueue(dataHandler);

                    if (immediateEmission)
                    {
                        return enumerator;
                    }

                    var utcStartTime = FrontierTimeProvider.GetUtcNow();

                    var exchangeHours = MarketHoursDatabase.FromDataFolder()
                        .GetExchangeHours(dataConfig.Symbol.ID.Market, dataConfig.Symbol, dataConfig.Symbol.SecurityType);
                    if (LeanData.UseStrictEndTime(_algorithmSettings.DailyPreciseEndTime, dataConfig.Symbol, dataConfig.Increment, exchangeHours))
                    {
                        // before the first frontier enumerator we adjust the endtimes if required
                        enumerator = new StrictDailyEndTimesEnumerator(enumerator, exchangeHours, utcStartTime.ConvertFromUtc(exchangeTimeZone));
                    }

                    return new FrontierAwareEnumerator(enumerator, FrontierTimeProvider,
                        new TimeZoneOffsetProvider(exchangeTimeZone, utcStartTime, Time.EndOfTime)
                    );
                }
            }

            if (failureException != null)
            {
                // we were not able to serve the request with any DQH and we got an exception, let's bubble it up.
                // Rethrow through ExceptionDispatchInfo so the stack trace captured inside the failing
                // handler is kept; a plain 'throw failureException;' would reset it to this line.
                ExceptionDispatchInfo.Capture(failureException).Throw();
            }

            // filter out warning for expected cases to reduce noise
            if (!dataConfig.Symbol.Value.Contains("-UNIVERSE-", StringComparison.InvariantCultureIgnoreCase)
                && dataConfig.Type != typeof(Delisting)
                && !dataConfig.Symbol.IsCanonical())
            {
                UnsupportedConfiguration?.Invoke(this, dataConfig);
            }
            return null;
        }

        /// <summary>
        /// Removes the specified configuration
        /// </summary>
        /// <param name="dataConfig">Subscription config to be removed</param>
        /// <remarks>
        /// Each call releases a single tracked subscription: the oldest handler recorded for the
        /// configuration is dequeued and asked to unsubscribe. Unknown configurations are ignored.
        /// </remarks>
        public virtual void Unsubscribe(SubscriptionDataConfig dataConfig)
        {
            if (_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataHandlers))
            {
                var dataHandler = dataHandlers.Dequeue();
                dataHandler.Unsubscribe(dataConfig);

                if (dataHandlers.Count == 0)
                {
                    // nothing left
                    _dataConfigAndDataHandler.Remove(dataConfig);
                }
            }
        }

        /// <summary>
        /// Sets the job we're subscribing for
        /// </summary>
        /// <param name="job">Job we're subscribing for</param>
        /// <remarks>
        /// Every handler type name listed in the job's <c>DataQueueHandler</c> value is resolved through the
        /// <see cref="Composer"/>, given the job and added to <see cref="DataHandlers"/>; afterwards the
        /// frontier time provider is initialized.
        /// </remarks>
        public void SetJob(LiveNodePacket job)
        {
            var dataHandlersConfig = job.DataQueueHandler;
            Log.Trace($"DataQueueHandlerManager.SetJob(): will use {dataHandlersConfig}");
            foreach (var dataHandlerName in dataHandlersConfig.DeserializeList())
            {
                var dataHandler = Composer.Instance.GetExportedValueByTypeName<IDataQueueHandler>(dataHandlerName);
                dataHandler.SetJob(job);
                DataHandlers.Add(dataHandler);
            }

            FrontierTimeProvider = InitializeFrontierTimeProvider();
        }

        /// <summary>
        /// Returns whether the data provider is connected
        /// </summary>
        /// <returns>true if the data provider is connected</returns>
        /// <remarks>This manager always reports true; the state of the wrapped handlers is not consulted</remarks>
        public bool IsConnected => true;

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            foreach (var dataHandler in DataHandlers)
            {
                dataHandler.Dispose();
            }
        }

        /// <summary>
        /// Method returns a collection of Symbols that are available at the data source.
        /// </summary>
        /// <param name="symbol">Symbol to lookup</param>
        /// <param name="includeExpired">Include expired contracts</param>
        /// <param name="securityCurrency">Expected security currency(if any)</param>
        /// <returns>
        /// Enumerable of Symbols, that are associated with the provided Symbol: the result of the first
        /// universe provider that returns at least one symbol, or an empty enumerable if none does
        /// </returns>
        /// <exception cref="NotSupportedException">None of the data handlers is a universe provider</exception>
        public IEnumerable<Symbol> LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
        {
            foreach (var dataHandler in GetUniverseProviders())
            {
                var symbols = dataHandler.LookupSymbols(symbol, includeExpired, securityCurrency);
                if (symbols == null)
                {
                    // the universe provider does not support it
                    continue;
                }

                // materialize once so the provider's enumerable isn't evaluated a second time by the caller
                var result = symbols.ToList();
                if (result.Count > 0)
                {
                    return result;
                }
            }
            return Enumerable.Empty<Symbol>();
        }

        /// <summary>
        /// Returns whether selection can take place or not.
        /// </summary>
        /// <remarks>This is useful to avoid a selection taking place during invalid times, for example IB reset times or when not connected,
        /// because if allowed selection would fail since IB isn't running and would kill the algorithm</remarks>
        /// <returns>True if selection can take place, that is, if any universe provider allows it</returns>
        /// <exception cref="NotSupportedException">None of the data handlers is a universe provider</exception>
        public bool CanPerformSelection()
        {
            return GetUniverseProviders().Any(provider => provider.CanPerformSelection());
        }

        /// <summary>
        /// Creates the frontier time provider instance
        /// </summary>
        /// <remarks>Protected for testing purposes</remarks>
        /// <returns>
        /// A <see cref="CompositeTimeProvider"/> wrapping every data handler that is also an
        /// <see cref="ITimeProvider"/>, or null when no handler is one
        /// </returns>
        protected virtual ITimeProvider InitializeFrontierTimeProvider()
        {
            var timeProviders = DataHandlers.OfType<ITimeProvider>().ToList();
            if (timeProviders.Any())
            {
                Log.Trace($"DataQueueHandlerManager.InitializeFrontierTimeProvider(): will use the following IDQH frontier time providers: [{string.Join(",", timeProviders.Select(x => x.GetType()))}]");
                return new CompositeTimeProvider(timeProviders);
            }
            return null;
        }

        /// <summary>
        /// Lazily enumerates the data handlers that are universe providers
        /// </summary>
        /// <exception cref="NotSupportedException">
        /// Thrown once the enumeration reaches its end without having found any universe provider
        /// </exception>
        private IEnumerable<IDataQueueUniverseProvider> GetUniverseProviders()
        {
            var yielded = false;
            foreach (var universeProvider in DataHandlers.OfType<IDataQueueUniverseProvider>())
            {
                yielded = true;
                yield return universeProvider;
            }

            if (!yielded)
            {
                throw new NotSupportedException("The DataQueueHandler does not support Options and Futures.");
            }
        }
    }
}