/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Python.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using NodaTime;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Market;
using QuantConnect.Interfaces;
using QuantConnect.Util;
using QuantConnect.Python;
namespace QuantConnect.Data
{
///
/// Enumerable Subscription Management Class
///
public class SubscriptionManager
{
private readonly PriorityQueue _consolidatorsSortedByScanTime;
private readonly Dictionary _consolidators;
private List> _consolidatorsToAdd;
private readonly object _threadSafeCollectionLock;
private readonly ITimeKeeper _timeKeeper;
private IAlgorithmSubscriptionManager _subscriptionManager;
///
/// Instance that implements
///
public ISubscriptionDataConfigService SubscriptionDataConfigService => _subscriptionManager;
///
/// Returns an IEnumerable of Subscriptions
///
/// Will not return internal subscriptions
public IEnumerable Subscriptions => _subscriptionManager.SubscriptionManagerSubscriptions.Where(config => !config.IsInternalFeed);
///
/// The different each supports
///
public Dictionary> AvailableDataTypes => _subscriptionManager.AvailableDataTypes;
///
/// Get the count of assets:
///
public int Count => _subscriptionManager.SubscriptionManagerCount();
///
/// Creates a new instance
///
public SubscriptionManager(ITimeKeeper timeKeeper)
{
_consolidators = new();
_timeKeeper = timeKeeper;
_consolidatorsSortedByScanTime = new(1000, ConsolidatorScanPriority.Comparer);
_threadSafeCollectionLock = new object();
}
///
/// Add Market Data Required (Overloaded method for backwards compatibility).
///
/// Symbol of the asset we're like
/// Resolution of Asset Required
/// The time zone the subscription's data is time stamped in
///
/// Specifies the time zone of the exchange for the security this subscription is for. This
/// is this output time zone, that is, the time zone that will be used on BaseData instances
///
/// True if this is custom user supplied data, false for normal QC data
/// when there is no data pass the last tradebar forward
/// Request premarket data as well when true
///
/// The newly created or existing instance if it already existed
///
public SubscriptionDataConfig Add(
Symbol symbol,
Resolution resolution,
DateTimeZone timeZone,
DateTimeZone exchangeTimeZone,
bool isCustomData = false,
bool fillForward = true,
bool extendedMarketHours = false
)
{
//Set the type: market data only comes in two forms -- ticks(trade by trade) or tradebar(time summaries)
var dataType = typeof(TradeBar);
if (resolution == Resolution.Tick)
{
dataType = typeof(Tick);
}
var tickType = LeanData.GetCommonTickTypeForCommonDataTypes(dataType, symbol.SecurityType);
return Add(dataType, tickType, symbol, resolution, timeZone, exchangeTimeZone, isCustomData, fillForward,
extendedMarketHours);
}
///
/// Add Market Data Required - generic data typing support as long as Type implements BaseData.
///
/// Set the type of the data we're subscribing to.
/// Tick type for the subscription.
/// Symbol of the asset we're like
/// Resolution of Asset Required
/// The time zone the subscription's data is time stamped in
///
/// Specifies the time zone of the exchange for the security this subscription is for. This
/// is this output time zone, that is, the time zone that will be used on BaseData instances
///
/// True if this is custom user supplied data, false for normal QC data
/// when there is no data pass the last tradebar forward
/// Request premarket data as well when true
///
/// Set to true to prevent data from this subscription from being sent into the algorithm's
/// OnData events
///
///
/// True if this subscription should have filters applied to it (market hours/user
/// filters from security), false otherwise
///
/// Define how data is normalized
///
/// The newly created or existing instance if it already existed
///
public SubscriptionDataConfig Add(
Type dataType,
TickType tickType,
Symbol symbol,
Resolution resolution,
DateTimeZone dataTimeZone,
DateTimeZone exchangeTimeZone,
bool isCustomData,
bool fillForward = true,
bool extendedMarketHours = false,
bool isInternalFeed = false,
bool isFilteredSubscription = true,
DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted
)
{
return SubscriptionDataConfigService.Add(symbol, resolution, fillForward,
extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
new List> { new Tuple(dataType, tickType) },
dataNormalizationMode).First();
}
///
/// Add a consolidator for the symbol
///
/// Symbol of the asset to consolidate
/// The consolidator
/// Desired tick type for the subscription
public void AddConsolidator(Symbol symbol, IDataConsolidator consolidator, TickType? tickType = null)
{
// Find the right subscription and add the consolidator to it
var subscriptions = Subscriptions.Where(x => x.Symbol == symbol).ToList();
if (subscriptions.Count == 0)
{
// If we made it here it is because we never found the symbol in the subscription list
throw new ArgumentException("Please subscribe to this symbol before adding a consolidator for it. Symbol: " +
symbol.Value);
}
if (consolidator.InputType.IsAbstract && tickType == null)
{
tickType = AvailableDataTypes[symbol.SecurityType].FirstOrDefault();
}
foreach (var subscription in subscriptions)
{
// we need to be able to pipe data directly from the data feed into the consolidator
if (IsSubscriptionValidForConsolidator(subscription, consolidator, tickType))
{
subscription.Consolidators.Add(consolidator);
var wrapper = _consolidators[consolidator] =
new ConsolidatorWrapper(consolidator, subscription.Increment, _timeKeeper, _timeKeeper.GetLocalTimeKeeper(subscription.ExchangeTimeZone));
lock (_threadSafeCollectionLock)
{
_consolidatorsToAdd ??= new();
_consolidatorsToAdd.Add(new(wrapper, wrapper.Priority));
}
return;
}
}
string tickTypeException = null;
if (tickType != null && !subscriptions.Where(x => x.TickType == tickType).Any())
{
tickTypeException = $"No subscription with the requested Tick Type {tickType} was found. Available Tick Types: {string.Join(", ", subscriptions.Select(x => x.TickType))}";
}
throw new ArgumentException(tickTypeException ?? ("Type mismatch found between consolidator and symbol. " +
$"Symbol: {symbol.Value} does not support input type: {consolidator.InputType.Name}. " +
$"Supported types: {string.Join(",", subscriptions.Select(x => x.Type.Name))}."));
}
///
/// Add a custom python consolidator for the symbol
///
/// Symbol of the asset to consolidate
/// The custom python consolidator
public void AddConsolidator(Symbol symbol, PyObject pyConsolidator)
{
if (!pyConsolidator.TryConvert(out IDataConsolidator consolidator))
{
consolidator = new DataConsolidatorPythonWrapper(pyConsolidator);
}
AddConsolidator(symbol, consolidator);
}
///
/// Removes the specified consolidator for the symbol
///
/// The symbol the consolidator is receiving data from
/// The consolidator instance to be removed
public void RemoveConsolidator(Symbol symbol, IDataConsolidator consolidator)
{
// let's try to get associated symbol, not required but nice to have
symbol ??= consolidator.Consolidated?.Symbol;
symbol ??= consolidator.WorkingData?.Symbol;
// remove consolidator from each subscription
foreach (var subscription in _subscriptionManager.GetSubscriptionDataConfigs(symbol))
{
subscription.Consolidators.Remove(consolidator);
if (_consolidators.Remove(consolidator, out var consolidatorsToScan))
{
consolidatorsToScan.Dispose();
}
}
// dispose of the consolidator to remove any remaining event handlers
consolidator.DisposeSafely();
}
///
/// Removes the specified python consolidator for the symbol
///
/// The symbol the consolidator is receiving data from
/// The python consolidator instance to be removed
public void RemoveConsolidator(Symbol symbol, PyObject pyConsolidator)
{
if (!pyConsolidator.TryConvert(out IDataConsolidator consolidator))
{
consolidator = new DataConsolidatorPythonWrapper(pyConsolidator);
}
RemoveConsolidator(symbol, consolidator);
}
///
/// Will trigger past consolidator scans
///
/// The new utc time
/// The algorithm instance
public void ScanPastConsolidators(DateTime newUtcTime, IAlgorithm algorithm)
{
if (_consolidatorsToAdd != null)
{
lock (_threadSafeCollectionLock)
{
_consolidatorsToAdd.DoForEach(x => _consolidatorsSortedByScanTime.Enqueue(x.Item1, x.Item2));
_consolidatorsToAdd = null;
}
}
while (_consolidatorsSortedByScanTime.TryPeek(out _, out var priority) && priority.UtcScanTime < newUtcTime)
{
var consolidatorToScan = _consolidatorsSortedByScanTime.Dequeue();
if (consolidatorToScan.Disposed)
{
// consolidator has been removed
continue;
}
if (priority.UtcScanTime != algorithm.UtcTime)
{
// only update the algorithm time once, it's not cheap because of TZ conversions
algorithm.SetDateTime(priority.UtcScanTime);
}
if (consolidatorToScan.UtcScanTime <= priority.UtcScanTime)
{
// only scan if we still need to
consolidatorToScan.Scan();
}
_consolidatorsSortedByScanTime.Enqueue(consolidatorToScan, consolidatorToScan.Priority);
}
}
///
/// Hard code the set of default available data feeds
///
public static Dictionary> DefaultDataTypes()
{
return new Dictionary>
{
{SecurityType.Base, new List {TickType.Trade}},
{SecurityType.Index, new List {TickType.Trade}},
{SecurityType.Forex, new List {TickType.Quote}},
{SecurityType.Equity, new List {TickType.Trade, TickType.Quote}},
{SecurityType.Option, new List {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
{SecurityType.FutureOption, new List {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
{SecurityType.IndexOption, new List {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
{SecurityType.Cfd, new List {TickType.Quote}},
{SecurityType.Future, new List {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
{SecurityType.Commodity, new List {TickType.Trade}},
{SecurityType.Crypto, new List {TickType.Trade, TickType.Quote}},
{SecurityType.CryptoFuture, new List {TickType.Trade, TickType.Quote}}
};
}
///
/// Get the available data types for a security
///
public IReadOnlyList GetDataTypesForSecurity(SecurityType securityType)
{
return AvailableDataTypes[securityType];
}
///
/// Get the data feed types for a given
///
/// The used to determine the types
/// The resolution of the data requested
/// Indicates whether the security is Canonical (future and options)
/// Types that should be added to the
public List> LookupSubscriptionConfigDataTypes(
SecurityType symbolSecurityType,
Resolution resolution,
bool isCanonical
)
{
return _subscriptionManager.LookupSubscriptionConfigDataTypes(symbolSecurityType, resolution, isCanonical);
}
///
/// Sets the Subscription Manager
///
public void SetDataManager(IAlgorithmSubscriptionManager subscriptionManager)
{
_subscriptionManager = subscriptionManager;
}
///
/// Checks if the subscription is valid for the consolidator
///
/// The subscription configuration
/// The consolidator
/// The desired tick type for the subscription. If not given is null.
/// true if the subscription is valid for the consolidator
public static bool IsSubscriptionValidForConsolidator(SubscriptionDataConfig subscription, IDataConsolidator consolidator, TickType? desiredTickType = null)
{
// Ensure the consolidator can accept data of the subscription's type
if (!consolidator.InputType.IsAssignableFrom(subscription.Type))
{
return false;
}
if (subscription.Type == typeof(Tick))
{
if (desiredTickType == null)
{
if (!LeanData.IsCommonLeanDataType(consolidator.OutputType))
{
return true;
}
var tickType = LeanData.GetCommonTickTypeForCommonDataTypes(consolidator.OutputType, subscription.Symbol.SecurityType);
return subscription.TickType == tickType;
}
return subscription.TickType == desiredTickType;
}
// For non-Tick data, the subscription is valid if its type is compatible with the consolidator's input type
return true;
}
///
/// Returns true if the provided data is the default data type associated with it's .
/// This is useful to determine if a data point should be used/cached in an environment where consumers will not provider a data type and we want to preserve
/// determinism and backwards compatibility when there are multiple data types available per or new ones added.
///
/// Temporary until we have a dictionary for the default data type per security type see GH issue 4196.
/// Internal so it's only accessible from this assembly.
internal static bool IsDefaultDataType(BaseData data)
{
switch (data.Symbol.SecurityType)
{
case SecurityType.Equity:
if (data.DataType == MarketDataType.QuoteBar || data.DataType == MarketDataType.Tick && (data as Tick).TickType == TickType.Quote)
{
return false;
}
break;
}
return true;
}
}
}