/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using Python.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using NodaTime;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Market;
using QuantConnect.Interfaces;
using QuantConnect.Util;
using QuantConnect.Python;

namespace QuantConnect.Data
{
    /// <summary>
    /// Enumerable Subscription Management Class
    /// </summary>
    public class SubscriptionManager
    {
        // Wrappers ordered by their next scan time; drained by ScanPastConsolidators
        private readonly PriorityQueue<ConsolidatorWrapper, ConsolidatorScanPriority> _consolidatorsSortedByScanTime;
        // Maps each registered consolidator to the wrapper that tracks its scan schedule
        private readonly Dictionary<IDataConsolidator, ConsolidatorWrapper> _consolidators;
        // Wrappers registered since the last scan; guarded by _threadSafeCollectionLock
        private List<Tuple<ConsolidatorWrapper, ConsolidatorScanPriority>> _consolidatorsToAdd;
        private readonly object _threadSafeCollectionLock;
        private readonly ITimeKeeper _timeKeeper;
        private IAlgorithmSubscriptionManager _subscriptionManager;

        /// <summary>
        /// Instance that implements <see cref="ISubscriptionDataConfigService" />
        /// </summary>
        public ISubscriptionDataConfigService SubscriptionDataConfigService => _subscriptionManager;

        /// <summary>
        /// Returns an IEnumerable of Subscriptions
        /// </summary>
        /// <remarks>Will not return internal subscriptions</remarks>
        public IEnumerable<SubscriptionDataConfig> Subscriptions =>
            _subscriptionManager.SubscriptionManagerSubscriptions.Where(config => !config.IsInternalFeed);

        /// <summary>
        /// The different <see cref="TickType" /> each <see cref="SecurityType" /> supports
        /// </summary>
        public Dictionary<SecurityType, List<TickType>> AvailableDataTypes => _subscriptionManager.AvailableDataTypes;

        /// <summary>
        /// Get the count of assets:
        /// </summary>
        public int Count => _subscriptionManager.SubscriptionManagerCount();

        /// <summary>
        /// Creates a new instance
        /// </summary>
        /// <param name="timeKeeper">Time keeper handed to every consolidator wrapper created by this manager</param>
        public SubscriptionManager(ITimeKeeper timeKeeper)
        {
            _consolidators = new();
            _timeKeeper = timeKeeper;
            _consolidatorsSortedByScanTime = new(1000, ConsolidatorScanPriority.Comparer);
            _threadSafeCollectionLock = new object();
        }

        /// <summary>
        /// Add Market Data Required (Overloaded method for backwards compatibility).
        /// </summary>
        /// <param name="symbol">Symbol of the asset we want</param>
        /// <param name="resolution">Resolution of Asset Required</param>
        /// <param name="timeZone">The time zone the subscription's data is time stamped in</param>
        /// <param name="exchangeTimeZone">
        /// Specifies the time zone of the exchange for the security this subscription is for. This
        /// is this output time zone, that is, the time zone that will be used on BaseData instances
        /// </param>
        /// <param name="isCustomData">True if this is custom user supplied data, false for normal QC data</param>
        /// <param name="fillForward">when there is no data pass the last tradebar forward</param>
        /// <param name="extendedMarketHours">Request premarket data as well when true</param>
        /// <returns>
        /// The newly created <see cref="SubscriptionDataConfig" /> or existing instance if it already existed
        /// </returns>
        public SubscriptionDataConfig Add(
            Symbol symbol,
            Resolution resolution,
            DateTimeZone timeZone,
            DateTimeZone exchangeTimeZone,
            bool isCustomData = false,
            bool fillForward = true,
            bool extendedMarketHours = false
            )
        {
            // Market data only comes in two forms -- ticks (trade by trade) or tradebars (time summaries)
            var dataType = resolution == Resolution.Tick ? typeof(Tick) : typeof(TradeBar);
            var tickType = LeanData.GetCommonTickTypeForCommonDataTypes(dataType, symbol.SecurityType);

            return Add(dataType, tickType, symbol, resolution, timeZone, exchangeTimeZone, isCustomData, fillForward,
                extendedMarketHours);
        }

        /// <summary>
        /// Add Market Data Required - generic data typing support as long as Type implements BaseData.
        /// </summary>
        /// <remarks>
        /// <paramref name="dataTimeZone" /> and <paramref name="exchangeTimeZone" /> are accepted for
        /// signature compatibility; this method does not forward them to the config service.
        /// </remarks>
        /// <param name="dataType">Set the type of the data we're subscribing to.</param>
        /// <param name="tickType">Tick type for the subscription.</param>
        /// <param name="symbol">Symbol of the asset we want</param>
        /// <param name="resolution">Resolution of Asset Required</param>
        /// <param name="dataTimeZone">The time zone the subscription's data is time stamped in</param>
        /// <param name="exchangeTimeZone">
        /// Specifies the time zone of the exchange for the security this subscription is for. This
        /// is this output time zone, that is, the time zone that will be used on BaseData instances
        /// </param>
        /// <param name="isCustomData">True if this is custom user supplied data, false for normal QC data</param>
        /// <param name="fillForward">when there is no data pass the last tradebar forward</param>
        /// <param name="extendedMarketHours">Request premarket data as well when true</param>
        /// <param name="isInternalFeed">
        /// Set to true to prevent data from this subscription from being sent into the algorithm's
        /// OnData events
        /// </param>
        /// <param name="isFilteredSubscription">
        /// True if this subscription should have filters applied to it (market hours/user
        /// filters from security), false otherwise
        /// </param>
        /// <param name="dataNormalizationMode">Define how data is normalized</param>
        /// <returns>
        /// The newly created <see cref="SubscriptionDataConfig" /> or existing instance if it already existed
        /// </returns>
        public SubscriptionDataConfig Add(
            Type dataType,
            TickType tickType,
            Symbol symbol,
            Resolution resolution,
            DateTimeZone dataTimeZone,
            DateTimeZone exchangeTimeZone,
            bool isCustomData,
            bool fillForward = true,
            bool extendedMarketHours = false,
            bool isInternalFeed = false,
            bool isFilteredSubscription = true,
            DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted
            )
        {
            var subscriptionDataTypes = new List<Tuple<Type, TickType>>
            {
                new Tuple<Type, TickType>(dataType, tickType)
            };

            return SubscriptionDataConfigService.Add(symbol, resolution, fillForward,
                extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
                subscriptionDataTypes,
                dataNormalizationMode).First();
        }

        /// <summary>
        /// Add a consolidator for the symbol
        /// </summary>
        /// <param name="symbol">Symbol of the asset to consolidate</param>
        /// <param name="consolidator">The consolidator</param>
        /// <param name="tickType">Desired tick type for the subscription</param>
        /// <exception cref="ArgumentException">
        /// Thrown when the symbol has no (non-internal) subscription, or when none of its subscriptions
        /// is valid for the consolidator and requested tick type
        /// </exception>
        public void AddConsolidator(Symbol symbol, IDataConsolidator consolidator, TickType? tickType = null)
        {
            // Find the right subscription and add the consolidator to it
            var subscriptions = Subscriptions.Where(x => x.Symbol == symbol).ToList();

            if (subscriptions.Count == 0)
            {
                // If we made it here it is because we never found the symbol in the subscription list
                throw new ArgumentException("Please subscribe to this symbol before adding a consolidator for it. Symbol: " +
                    symbol.Value);
            }

            // An abstract input type cannot pin down a tick type on its own, so fall back to the
            // first tick type available for the security type
            if (consolidator.InputType.IsAbstract && tickType == null)
            {
                tickType = AvailableDataTypes[symbol.SecurityType].FirstOrDefault();
            }

            foreach (var subscription in subscriptions)
            {
                // we need to be able to pipe data directly from the data feed into the consolidator
                if (IsSubscriptionValidForConsolidator(subscription, consolidator, tickType))
                {
                    subscription.Consolidators.Add(consolidator);

                    var wrapper = _consolidators[consolidator] =
                        new ConsolidatorWrapper(consolidator, subscription.Increment, _timeKeeper,
                            _timeKeeper.GetLocalTimeKeeper(subscription.ExchangeTimeZone));

                    // Queue the wrapper; it is moved into the scan queue on the next ScanPastConsolidators call
                    lock (_threadSafeCollectionLock)
                    {
                        _consolidatorsToAdd ??= new();
                        _consolidatorsToAdd.Add(new(wrapper, wrapper.Priority));
                    }
                    return;
                }
            }

            // No subscription matched: prefer the more specific tick type message when it applies
            string tickTypeException = null;
            if (tickType != null && !subscriptions.Any(x => x.TickType == tickType))
            {
                tickTypeException = $"No subscription with the requested Tick Type {tickType} was found. Available Tick Types: {string.Join(", ", subscriptions.Select(x => x.TickType))}";
            }

            throw new ArgumentException(tickTypeException ?? ("Type mismatch found between consolidator and symbol. " +
                $"Symbol: {symbol.Value} does not support input type: {consolidator.InputType.Name}. " +
                $"Supported types: {string.Join(",", subscriptions.Select(x => x.Type.Name))}."));
        }

        /// <summary>
        /// Add a custom python consolidator for the symbol
        /// </summary>
        /// <param name="symbol">Symbol of the asset to consolidate</param>
        /// <param name="pyConsolidator">The custom python consolidator</param>
        public void AddConsolidator(Symbol symbol, PyObject pyConsolidator)
        {
            AddConsolidator(symbol, ToDataConsolidator(pyConsolidator));
        }

        /// <summary>
        /// Removes the specified consolidator for the symbol
        /// </summary>
        /// <param name="symbol">The symbol the consolidator is receiving data from</param>
        /// <param name="consolidator">The consolidator instance to be removed</param>
        public void RemoveConsolidator(Symbol symbol, IDataConsolidator consolidator)
        {
            // let's try to get associated symbol, not required but nice to have
            symbol ??= consolidator.Consolidated?.Symbol;
            symbol ??= consolidator.WorkingData?.Symbol;

            // remove consolidator from each subscription
            foreach (var subscription in _subscriptionManager.GetSubscriptionDataConfigs(symbol))
            {
                subscription.Consolidators.Remove(consolidator);
            }

            // Release the scan wrapper regardless of how many subscription configs were found, so it
            // is not left registered (and undisposed) when the symbol no longer has any
            if (_consolidators.Remove(consolidator, out var consolidatorToScan))
            {
                consolidatorToScan.Dispose();
            }

            // dispose of the consolidator to remove any remaining event handlers
            consolidator.DisposeSafely();
        }

        /// <summary>
        /// Removes the specified python consolidator for the symbol
        /// </summary>
        /// <param name="symbol">The symbol the consolidator is receiving data from</param>
        /// <param name="pyConsolidator">The python consolidator instance to be removed</param>
        public void RemoveConsolidator(Symbol symbol, PyObject pyConsolidator)
        {
            // NOTE(review): when the object is not convertible a new wrapper instance is created here;
            // removal then relies on how DataConsolidatorPythonWrapper compares for equality -- confirm
            RemoveConsolidator(symbol, ToDataConsolidator(pyConsolidator));
        }

        /// <summary>
        /// Will trigger past consolidator scans
        /// </summary>
        /// <param name="newUtcTime">The new utc time</param>
        /// <param name="algorithm">The algorithm instance</param>
        public void ScanPastConsolidators(DateTime newUtcTime, IAlgorithm algorithm)
        {
            // Move consolidators registered since the last call into the scan queue
            if (_consolidatorsToAdd != null)
            {
                lock (_threadSafeCollectionLock)
                {
                    _consolidatorsToAdd.DoForEach(x => _consolidatorsSortedByScanTime.Enqueue(x.Item1, x.Item2));
                    _consolidatorsToAdd = null;
                }
            }

            while (_consolidatorsSortedByScanTime.TryPeek(out _, out var priority) && priority.UtcScanTime < newUtcTime)
            {
                var consolidatorToScan = _consolidatorsSortedByScanTime.Dequeue();
                if (consolidatorToScan.Disposed)
                {
                    // consolidator has been removed
                    continue;
                }

                if (priority.UtcScanTime != algorithm.UtcTime)
                {
                    // only update the algorithm time once, it's not cheap because of TZ conversions
                    algorithm.SetDateTime(priority.UtcScanTime);
                }

                if (consolidatorToScan.UtcScanTime <= priority.UtcScanTime)
                {
                    // only scan if we still need to
                    consolidatorToScan.Scan();
                }

                // Re-queue using the wrapper's current priority so it is picked up again later
                _consolidatorsSortedByScanTime.Enqueue(consolidatorToScan, consolidatorToScan.Priority);
            }
        }

        /// <summary>
        /// Hard code the set of default available data feeds
        /// </summary>
        /// <returns>A new dictionary mapping each listed security type to its tick types</returns>
        public static Dictionary<SecurityType, List<TickType>> DefaultDataTypes()
        {
            return new Dictionary<SecurityType, List<TickType>>
            {
                {SecurityType.Base, new List<TickType> {TickType.Trade}},
                {SecurityType.Index, new List<TickType> {TickType.Trade}},
                {SecurityType.Forex, new List<TickType> {TickType.Quote}},
                {SecurityType.Equity, new List<TickType> {TickType.Trade, TickType.Quote}},
                {SecurityType.Option, new List<TickType> {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
                {SecurityType.FutureOption, new List<TickType> {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
                {SecurityType.IndexOption, new List<TickType> {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
                {SecurityType.Cfd, new List<TickType> {TickType.Quote}},
                {SecurityType.Future, new List<TickType> {TickType.Quote, TickType.Trade, TickType.OpenInterest}},
                {SecurityType.Commodity, new List<TickType> {TickType.Trade}},
                {SecurityType.Crypto, new List<TickType> {TickType.Trade, TickType.Quote}},
                {SecurityType.CryptoFuture, new List<TickType> {TickType.Trade, TickType.Quote}}
            };
        }

        /// <summary>
        /// Get the available data types for a security
        /// </summary>
        /// <param name="securityType">The security type to look up in <see cref="AvailableDataTypes" /></param>
        /// <returns>The tick types registered for the given security type</returns>
        public IReadOnlyList<TickType> GetDataTypesForSecurity(SecurityType securityType)
        {
            return AvailableDataTypes[securityType];
        }

        /// <summary>
        /// Get the data feed types for a given <see cref="SecurityType" /> <see cref="Resolution" />
        /// </summary>
        /// <param name="symbolSecurityType">The <see cref="SecurityType" /> used to determine the types</param>
        /// <param name="resolution">The resolution of the data requested</param>
        /// <param name="isCanonical">Indicates whether the security is Canonical (future and options)</param>
        /// <returns>Types that should be added to the <see cref="SubscriptionDataConfig" /></returns>
        public List<Tuple<Type, TickType>> LookupSubscriptionConfigDataTypes(
            SecurityType symbolSecurityType,
            Resolution resolution,
            bool isCanonical
            )
        {
            return _subscriptionManager.LookupSubscriptionConfigDataTypes(symbolSecurityType, resolution, isCanonical);
        }

        /// <summary>
        /// Sets the Subscription Manager
        /// </summary>
        /// <param name="subscriptionManager">The manager every subscription query on this instance delegates to</param>
        public void SetDataManager(IAlgorithmSubscriptionManager subscriptionManager)
        {
            _subscriptionManager = subscriptionManager;
        }

        /// <summary>
        /// Checks if the subscription is valid for the consolidator
        /// </summary>
        /// <param name="subscription">The subscription configuration</param>
        /// <param name="consolidator">The consolidator</param>
        /// <param name="desiredTickType">The desired tick type for the subscription. If not given is null.</param>
        /// <returns>true if the subscription is valid for the consolidator</returns>
        public static bool IsSubscriptionValidForConsolidator(SubscriptionDataConfig subscription, IDataConsolidator consolidator, TickType? desiredTickType = null)
        {
            // Ensure the consolidator can accept data of the subscription's type
            if (!consolidator.InputType.IsAssignableFrom(subscription.Type))
            {
                return false;
            }

            if (subscription.Type == typeof(Tick))
            {
                if (desiredTickType == null)
                {
                    // Without a common LEAN output type there is no tick type to derive, so accept
                    if (!LeanData.IsCommonLeanDataType(consolidator.OutputType))
                    {
                        return true;
                    }

                    var tickType = LeanData.GetCommonTickTypeForCommonDataTypes(consolidator.OutputType,
                        subscription.Symbol.SecurityType);
                    return subscription.TickType == tickType;
                }

                return subscription.TickType == desiredTickType;
            }

            // For non-Tick data, the subscription is valid if its type is compatible with the consolidator's input type
            return true;
        }

        /// <summary>
        /// Returns true if the provided data is the default data type associated with its <see cref="SecurityType" />.
        /// This is useful to determine if a data point should be used/cached in an environment where consumers will not provide a data type and we want to preserve
        /// determinism and backwards compatibility when there are multiple data types available per <see cref="SecurityType" /> or new ones added.
        /// </summary>
        /// <remarks>
        /// Temporary until we have a dictionary for the default data type per security type see GH issue 4196.
        /// Internal so it's only accessible from this assembly.
        /// </remarks>
        /// <param name="data">The data point to check</param>
        /// <returns>False for equity quote bars and equity quote ticks, true otherwise</returns>
        internal static bool IsDefaultDataType(BaseData data)
        {
            switch (data.Symbol.SecurityType)
            {
                case SecurityType.Equity:
                    // Type pattern instead of an unchecked 'as' cast: data flagged as Tick that is not
                    // a Tick instance no longer throws a NullReferenceException
                    if (data.DataType == MarketDataType.QuoteBar
                        || data.DataType == MarketDataType.Tick && data is Tick tick && tick.TickType == TickType.Quote)
                    {
                        return false;
                    }
                    break;
            }

            return true;
        }

        /// <summary>
        /// Converts a python object into an <see cref="IDataConsolidator" />, wrapping it in a
        /// <see cref="DataConsolidatorPythonWrapper" /> when it cannot be converted directly
        /// </summary>
        private static IDataConsolidator ToDataConsolidator(PyObject pyConsolidator)
        {
            if (!pyConsolidator.TryConvert(out IDataConsolidator consolidator))
            {
                consolidator = new DataConsolidatorPythonWrapper(pyConsolidator);
            }

            return consolidator;
        }
    }
}