/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Linq;
using QuantConnect.Util;
using QuantConnect.Data;
using QuantConnect.Packets;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using QuantConnect.Securities;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// This is an implementation of used to handle multiple live datafeeds
///
public class DataQueueHandlerManager : IDataQueueHandler, IDataQueueUniverseProvider
{
private readonly IAlgorithmSettings _algorithmSettings;
private readonly Dictionary> _dataConfigAndDataHandler = new();
///
/// Creates a new instance
///
public DataQueueHandlerManager(IAlgorithmSettings settings)
{
_algorithmSettings = settings;
}
///
/// Frontier time provider to use
///
/// Protected for testing purposes
protected ITimeProvider FrontierTimeProvider { get; set; }
///
/// Collection of data queue handles being used
///
/// Protected for testing purposes
protected List DataHandlers { get; set; } = new();
///
/// True if the composite queue handler has any instance
///
public bool HasUniverseProvider => DataHandlers.OfType().Any();
///
/// Event triggered when an unsupported configuration is detected
///
public event EventHandler UnsupportedConfiguration;
///
/// Subscribe to the specified configuration
///
/// defines the parameters to subscribe to a data feed
/// handler to be fired on new data available
/// The new enumerator for this subscription request
public IEnumerator Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
{
Exception failureException = null;
foreach (var dataHandler in DataHandlers)
{
// Emit ticks & custom data as soon as we get them, they don't need any kind of batching behavior applied to them
// only use the frontier time provider if we need to
var immediateEmission = dataConfig.Resolution == Resolution.Tick || dataConfig.IsCustomData || FrontierTimeProvider == null;
var exchangeTimeZone = dataConfig.ExchangeTimeZone;
IEnumerator enumerator;
try
{
enumerator = dataHandler.Subscribe(dataConfig, immediateEmission ? newDataAvailableHandler
: (sender, eventArgs) => {
// let's only wake up the main thread if the data point is allowed to be emitted, else we could fill forward previous bar and not let this one through
var dataAvailable = eventArgs as NewDataAvailableEventArgs;
if (dataAvailable == null || dataAvailable.DataPoint == null
|| dataAvailable.DataPoint.EndTime.ConvertToUtc(exchangeTimeZone) <= FrontierTimeProvider.GetUtcNow())
{
newDataAvailableHandler?.Invoke(sender, eventArgs);
}
});
}
catch (Exception exception)
{
// we will try the next DQH if any, if it handles the request correctly we ignore the error
failureException = exception;
continue;
}
// Check if the enumerator is not empty
if (enumerator != null)
{
if (!_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataQueueHandlers))
{
// we can get the same subscription request multiple times, the aggregator manager handles updating each enumerator
// but we need to keep track so we can call unsubscribe later to the target data queue handler
_dataConfigAndDataHandler[dataConfig] = dataQueueHandlers = new Queue();
}
dataQueueHandlers.Enqueue(dataHandler);
if (immediateEmission)
{
return enumerator;
}
var utcStartTime = FrontierTimeProvider.GetUtcNow();
var exchangeHours = MarketHoursDatabase.FromDataFolder().GetExchangeHours(dataConfig.Symbol.ID.Market, dataConfig.Symbol, dataConfig.Symbol.SecurityType);
if (LeanData.UseStrictEndTime(_algorithmSettings.DailyPreciseEndTime, dataConfig.Symbol, dataConfig.Increment, exchangeHours))
{
// before the first frontier enumerator we adjust the endtimes if required
enumerator = new StrictDailyEndTimesEnumerator(enumerator, exchangeHours, utcStartTime.ConvertFromUtc(exchangeTimeZone));
}
return new FrontierAwareEnumerator(enumerator, FrontierTimeProvider,
new TimeZoneOffsetProvider(exchangeTimeZone, utcStartTime, Time.EndOfTime)
);
}
}
if (failureException != null)
{
// we were not able to serve the request with any DQH and we got an exception, let's bubble it up
throw failureException;
}
// filter out warning for expected cases to reduce noise
if (!dataConfig.Symbol.Value.Contains("-UNIVERSE-", StringComparison.InvariantCultureIgnoreCase)
&& dataConfig.Type != typeof(Delisting)
&& !dataConfig.Symbol.IsCanonical())
{
UnsupportedConfiguration?.Invoke(this, dataConfig);
}
return null;
}
///
/// Removes the specified configuration
///
/// Subscription config to be removed
public virtual void Unsubscribe(SubscriptionDataConfig dataConfig)
{
if (_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataHandlers))
{
var dataHandler = dataHandlers.Dequeue();
dataHandler.Unsubscribe(dataConfig);
if (dataHandlers.Count == 0)
{
// nothing left
_dataConfigAndDataHandler.Remove(dataConfig);
}
}
}
///
/// Sets the job we're subscribing for
///
/// Job we're subscribing for
public void SetJob(LiveNodePacket job)
{
var dataHandlersConfig = job.DataQueueHandler;
Log.Trace($"CompositeDataQueueHandler.SetJob(): will use {dataHandlersConfig}");
foreach (var dataHandlerName in dataHandlersConfig.DeserializeList())
{
var dataHandler = Composer.Instance.GetExportedValueByTypeName(dataHandlerName);
dataHandler.SetJob(job);
DataHandlers.Add(dataHandler);
}
FrontierTimeProvider = InitializeFrontierTimeProvider();
}
///
/// Returns whether the data provider is connected
///
/// true if the data provider is connected
public bool IsConnected => true;
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
public void Dispose()
{
foreach (var dataHandler in DataHandlers)
{
dataHandler.Dispose();
}
}
///
/// Method returns a collection of Symbols that are available at the data source.
///
/// Symbol to lookup
/// Include expired contracts
/// Expected security currency(if any)
/// Enumerable of Symbols, that are associated with the provided Symbol
public IEnumerable LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
{
foreach (var dataHandler in GetUniverseProviders())
{
var symbols = dataHandler.LookupSymbols(symbol, includeExpired, securityCurrency);
if (symbols == null)
{
// the universe provider does not support it
continue;
}
var result = symbols.ToList();
if (result.Any())
{
return result;
}
}
return Enumerable.Empty();
}
///
/// Returns whether selection can take place or not.
///
/// This is useful to avoid a selection taking place during invalid times, for example IB reset times or when not connected,
/// because if allowed selection would fail since IB isn't running and would kill the algorithm
/// True if selection can take place
public bool CanPerformSelection()
{
return GetUniverseProviders().Any(provider => provider.CanPerformSelection());
}
///
/// Creates the frontier time provider instance
///
/// Protected for testing purposes
protected virtual ITimeProvider InitializeFrontierTimeProvider()
{
var timeProviders = DataHandlers.OfType().ToList();
if (timeProviders.Any())
{
Log.Trace($"DataQueueHandlerManager.InitializeFrontierTimeProvider(): will use the following IDQH frontier time providers: [{string.Join(",", timeProviders.Select(x => x.GetType()))}]");
return new CompositeTimeProvider(timeProviders);
}
return null;
}
private IEnumerable GetUniverseProviders()
{
var yielded = false;
foreach (var universeProvider in DataHandlers.OfType())
{
yielded = true;
yield return universeProvider;
}
if (!yielded)
{
throw new NotSupportedException("The DataQueueHandler does not support Options and Futures.");
}
}
}
}