/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Linq;
using QuantConnect.Data;
using QuantConnect.Data.Auxiliary;
using QuantConnect.Data.Market;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Securities;
using QuantConnect.Util;

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// DataManager will manage the subscriptions for both the DataFeeds and the SubscriptionManager
    /// </summary>
    public class DataManager : IAlgorithmSubscriptionManager, IDataFeedSubscriptionManager, IDataManager
    {
        private readonly IDataFeed _dataFeed;
        private readonly MarketHoursDatabase _marketHoursDatabase;
        private readonly ITimeKeeper _timeKeeper;
        private readonly bool _liveMode;

        // set once the "no valid start time" debug message has been emitted, so it is only sent a single time
        private bool _sentUniverseScheduleWarning;
        private readonly IRegisteredSecurityDataTypesProvider _registeredTypesProvider;
        private readonly IDataPermissionManager _dataPermissionManager;

        // cached snapshot of the configurations held in '_subscriptionManagerSubscriptions';
        // it is reset to null whenever that dictionary changes and rebuilt lazily on the next read
        private List<SubscriptionDataConfig> _subscriptionDataConfigsEnumerator;

        // Configurations tracked for the SubscriptionManager. Each configuration is mapped to the stored
        // instance, which lets us resolve an equal configuration to the instance already in use.
        // This is a plain dictionary: every access must be done while holding a lock on the dictionary itself.
        private readonly Dictionary<SubscriptionDataConfig, SubscriptionDataConfig> _subscriptionManagerSubscriptions = new();

        /// <summary>
        /// Event fired when a new subscription is added
        /// </summary>
        public event EventHandler<Subscription> SubscriptionAdded;

        /// <summary>
        /// Event fired when an existing subscription is removed
        /// </summary>
        public event EventHandler<Subscription> SubscriptionRemoved;

        /// <summary>
        /// Creates a new instance of the DataManager
        /// </summary>
        /// <param name="dataFeed">The data feed used to create and remove subscriptions</param>
        /// <param name="universeSelection">The universe selection instance, it will be given a reference to this data manager</param>
        /// <param name="algorithm">The algorithm instance whose universe manager changes will be observed</param>
        /// <param name="timeKeeper">The time keeper that will receive the exchange time zone of each new configuration</param>
        /// <param name="marketHoursDatabase">The market hours database used to resolve exchange hours and data time zones</param>
        /// <param name="liveMode">True if running in live mode</param>
        /// <param name="registeredTypesProvider">Provider where the data type of each added configuration is registered</param>
        /// <param name="dataPermissionManager">The data permission manager used to assert configurations before they are added to the data feed</param>
        public DataManager(
            IDataFeed dataFeed,
            UniverseSelection universeSelection,
            IAlgorithm algorithm,
            ITimeKeeper timeKeeper,
            MarketHoursDatabase marketHoursDatabase,
            bool liveMode,
            IRegisteredSecurityDataTypesProvider registeredTypesProvider,
            IDataPermissionManager dataPermissionManager)
        {
            _dataFeed = dataFeed;
            UniverseSelection = universeSelection;
            UniverseSelection.SetDataManager(this);
            AvailableDataTypes = SubscriptionManager.DefaultDataTypes();
            _timeKeeper = timeKeeper;
            _marketHoursDatabase = marketHoursDatabase;
            _liveMode = liveMode;
            _registeredTypesProvider = registeredTypesProvider;
            _dataPermissionManager = dataPermissionManager;

            // wire ourselves up to receive notifications when universes are added/removed
            algorithm.UniverseManager.CollectionChanged += (sender, args) =>
            {
                switch (args.Action)
                {
                    case NotifyCollectionChangedAction.Add:
                        foreach (var universe in args.NewItems.OfType<Universe>())
                        {
                            var config = universe.Configuration;
                            var start = algorithm.UtcTime;

                            var end = algorithm.LiveMode
                                ? Time.EndOfTime
                                : algorithm.EndDate.ConvertToUtc(algorithm.TimeZone);

                            if (!algorithm.Securities.TryGetValue(config.Symbol, out Security security))
                            {
                                // create a canonical security object if it doesn't exist
                                security = new Security(
                                    _marketHoursDatabase.GetExchangeHours(config),
                                    config,
                                    algorithm.Portfolio.CashBook[algorithm.AccountCurrency],
                                    SymbolProperties.GetDefault(algorithm.AccountCurrency),
                                    algorithm.Portfolio.CashBook,
                                    RegisteredSecurityDataTypesProvider.Null,
                                    new SecurityCache()
                                );
                            }

                            // Let's adjust the start time to the previous tradable date
                            // so universe selection always happens right away at the start of the algorithm.
                            var universeType = universe.GetType();
                            if (
                                // We exclude the UserDefinedUniverse because their selection already happens at the algorithm start time.
                                // For instance, ETFs universe selection depends on its first trigger time being before the equity universe
                                // (the UserDefinedUniverse), because the ETFs are EndTime-indexed and that would make their first selection
                                // time to be before the algorithm start time, with the EndTime being the algorithm's start date,
                                // and both the Equity and the ETFs constituents first selection to happen together.
                                !universeType.IsAssignableTo(typeof(UserDefinedUniverse)) &&
                                // We exclude the ScheduledUniverse because it's already scheduled to run at a specific time.
                                // Adjusting the start time would cause the first selection trigger time to be before the algorithm start time,
                                // making the selection to be triggered at the first algorithm time, which would be the exact StartDate.
                                universeType != typeof(ScheduledUniverse))
                            {
                                const int maximumLookback = 60;
                                var loopCount = 0;
                                var startLocalTime = start.ConvertFromUtc(security.Exchange.TimeZone);
                                if (universe.UniverseSettings.Schedule.Initialized)
                                {
                                    do
                                    {
                                        // determine if there's a scheduled selection time at the current start local time date, note that next
                                        // we get the previous day of the first scheduled date we find, so we are sure the data is available to trigger selection
                                        if (universe.UniverseSettings.Schedule.Get(startLocalTime.Date, startLocalTime.Date).Any())
                                        {
                                            break;
                                        }

                                        startLocalTime = startLocalTime.AddDays(-1);
                                        if (++loopCount >= maximumLookback)
                                        {
                                            // fallback to the original, we found none
                                            startLocalTime = algorithm.UtcTime.ConvertFromUtc(security.Exchange.TimeZone);
                                            if (!_sentUniverseScheduleWarning)
                                            {
                                                // just in case
                                                _sentUniverseScheduleWarning = true;
                                                algorithm.Debug("Warning: Found no valid start time for scheduled universe, will use default");
                                            }
                                        }
                                    }
                                    while (loopCount < maximumLookback);
                                }

                                startLocalTime = Time.GetStartTimeForTradeBars(security.Exchange.Hours, startLocalTime,
                                    // disable universe selection on extended market hours, for example futures/index options have a sunday pre market we are not interested on
                                    Time.OneDay, 1, extendedMarketHours: false, config.DataTimeZone,
                                    LeanData.UseDailyStrictEndTimes(algorithm.Settings, config.Type, security.Symbol, Time.OneDay, security.Exchange.Hours));
                                start = startLocalTime.ConvertToUtc(security.Exchange.TimeZone);
                            }

                            AddSubscription(
                                new SubscriptionRequest(true,
                                    universe,
                                    security,
                                    config,
                                    start,
                                    end));
                        }
                        break;

                    case NotifyCollectionChangedAction.Remove:
                        foreach (var universe in args.OldItems.OfType<Universe>())
                        {
                            // removing the subscription will be handled by the SubscriptionSynchronizer
                            // in the next loop as well as executing a UniverseSelection one last time.
                            if (!universe.DisposeRequested)
                            {
                                universe.Dispose();
                            }
                        }
                        break;

                    default:
                        throw new NotImplementedException("The specified action is not implemented: " + args.Action);
                }
            };

            DataFeedSubscriptions = new SubscriptionCollection();
            if (!_liveMode)
            {
                DataFeedSubscriptions.FillForwardResolutionChanged += (object sender, FillForwardResolutionChangedEvent changedEvent) =>
                {
                    var requests = DataFeedSubscriptions
                        // we don't fill forward tick resolution so we don't need to touch their subscriptions
                        .Where(subscription => subscription.Configuration.FillDataForward && subscription.Configuration.Resolution != Resolution.Tick)
                        .SelectMany(subscription => subscription.SubscriptionRequests)
                        .ToList();

                    if (requests.Count > 0)
                    {
                        Log.Trace($"DataManager(): Fill forward resolution has changed from {changedEvent.Old} to {changedEvent.New} at utc: {algorithm.UtcTime}. " +
                            $"Restarting {requests.Count} subscriptions...");

                        // disable reentry while we remove and re add
                        DataFeedSubscriptions.FreezeFillForwardResolution(true);

                        // remove
                        foreach (var request in requests)
                        {
                            // force because we want them actually removed even if still a member of the universe, because the FF res changed
                            // which means we will drop any data points that could be in the next potential slice being created
                            RemoveSubscriptionInternal(request.Configuration, universe: request.Universe, forceSubscriptionRemoval: true);
                        }

                        // re add
                        foreach (var request in requests)
                        {
                            // If it is an add we will set time 1 tick ahead to properly sync data
                            // with next timeslice, avoid emitting now twice.
                            // We do the same in the 'TimeTriggeredUniverseSubscriptionEnumeratorFactory' when handling changes
                            var startUtc = algorithm.UtcTime;

                            // If the algorithm is not initialized (locked) the request start time can be even before the algorithm start time,
                            // like in the case of universe requests that are scheduled to run at a specific time in the past for immediate selection.
                            if (!algorithm.GetLocked() && request.StartTimeUtc < startUtc)
                            {
                                startUtc = request.StartTimeUtc;
                            }

                            AddSubscription(new SubscriptionRequest(request,
                                startTimeUtc: startUtc.AddTicks(1),
                                configuration: new SubscriptionDataConfig(request.Configuration)));
                        }

                        DataFeedSubscriptions.FreezeFillForwardResolution(false);
                    }
                };
            }
        }

        #region IDataFeedSubscriptionManager

        /// <summary>
        /// Gets the data feed subscription collection
        /// </summary>
        public SubscriptionCollection DataFeedSubscriptions { get; }

        /// <summary>
        /// Will remove all current <see cref="Subscription"/>
        /// </summary>
        /// <remarks>An error removing one subscription is logged and does not stop the removal of the remaining ones</remarks>
        public void RemoveAllSubscriptions()
        {
            // remove each subscription from our collection
            foreach (var subscription in DataFeedSubscriptions)
            {
                try
                {
                    RemoveSubscription(subscription.Configuration);
                }
                catch (Exception err)
                {
                    Log.Error(err, "DataManager.RemoveAllSubscriptions():" +
                        $"Error removing: {subscription.Configuration}");
                }
            }
        }

        /// <summary>
        /// Adds a new <see cref="Subscription"/> to provide data for the specified security.
        /// </summary>
        /// <param name="request">Defines the <see cref="SubscriptionRequest"/> to be added</param>
        /// <returns>True if the subscription was created and added successfully, false otherwise.
        /// For a duplicate request, true is only returned if the existing subscription is an internal feed</returns>
        /// <exception cref="InvalidOperationException">The requested configuration uses the
        /// <see cref="DataNormalizationMode.ScaledRaw"/> normalization mode</exception>
        public bool AddSubscription(SubscriptionRequest request)
        {
            lock (_subscriptionManagerSubscriptions)
            {
                // guarantee the configuration is present in our config collection
                // this is related to GH issue 3877: where we added a configuration which we also removed
                if (_subscriptionManagerSubscriptions.TryAdd(request.Configuration, request.Configuration))
                {
                    // the collection changed: drop the cached snapshot
                    _subscriptionDataConfigsEnumerator = null;
                }
            }

            if (DataFeedSubscriptions.TryGetValue(request.Configuration, out Subscription subscription))
            {
                // duplicate subscription request
                subscription.AddSubscriptionRequest(request);
                // only result true if the existing subscription is internal, we actually added something from the users perspective
                return subscription.Configuration.IsInternalFeed;
            }

            if (request.Configuration.DataNormalizationMode == DataNormalizationMode.ScaledRaw)
            {
                throw new InvalidOperationException($"{DataNormalizationMode.ScaledRaw} normalization mode only intended for history requests.");
            }

            // before adding the configuration to the data feed let's assert it's valid
            _dataPermissionManager.AssertConfiguration(request.Configuration, request.StartTimeLocal, request.EndTimeLocal);

            subscription = _dataFeed.CreateSubscription(request);

            if (subscription == null)
            {
                Log.Trace($"DataManager.AddSubscription(): Unable to add subscription for: {request.Configuration}");
                // subscription will be null when there's no tradeable dates for the security between the requested times, so
                // don't even try to load the data
                return false;
            }

            if (_liveMode)
            {
                OnSubscriptionAdded(subscription);
                Log.Trace($"DataManager.AddSubscription(): Added {request.Configuration}." +
                    $" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
            }
            else if (Log.DebuggingEnabled)
            {
                // for performance lets not create the message string if debugging is not enabled
                // this can be executed many times and its in the algorithm thread
                Log.Debug($"DataManager.AddSubscription(): Added {request.Configuration}." +
                    $" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
            }

            return DataFeedSubscriptions.TryAdd(subscription);
        }

        /// <summary>
        /// Removes the <see cref="Subscription"/>, if it exists
        /// </summary>
        /// <param name="configuration">The <see cref="SubscriptionDataConfig"/> of the subscription to remove</param>
        /// <param name="universe">Universe requesting to remove <see cref="Subscription"/>.
        /// Default value, null, will remove all universes</param>
        /// <returns>True if the subscription was successfully removed, false otherwise</returns>
        public bool RemoveSubscription(SubscriptionDataConfig configuration, Universe universe = null)
        {
            return RemoveSubscriptionInternal(configuration, universe, forceSubscriptionRemoval: false);
        }

        /// <summary>
        /// Removes the <see cref="Subscription"/>, if it exists
        /// </summary>
        /// <param name="configuration">The <see cref="SubscriptionDataConfig"/> of the subscription to remove</param>
        /// <param name="universe">Universe requesting to remove <see cref="Subscription"/>.
        /// Default value, null, will remove all universes</param>
        /// <param name="forceSubscriptionRemoval">We force the subscription removal by marking it as removed from universe, so that all it's data is dropped</param>
        /// <returns>True if the subscription was successfully removed, false otherwise</returns>
        private bool RemoveSubscriptionInternal(SubscriptionDataConfig configuration, Universe universe, bool forceSubscriptionRemoval)
        {
            // remove the subscription from our collection, if it exists
            if (DataFeedSubscriptions.TryGetValue(configuration, out Subscription subscription))
            {
                // we remove the subscription when there are no other requests left
                if (subscription.RemoveSubscriptionRequest(universe))
                {
                    if (!DataFeedSubscriptions.TryRemove(configuration, out subscription))
                    {
                        Log.Error($"DataManager.RemoveSubscription(): Unable to remove {configuration}");
                        return false;
                    }

                    _dataFeed.RemoveSubscription(subscription);

                    if (_liveMode)
                    {
                        OnSubscriptionRemoved(subscription);
                    }

                    subscription.Dispose();

                    // note: the configuration is only dropped here if the subscription was already removed from the universe,
                    // the forced flag below is set afterwards, so a forced removal on its own keeps the configuration registered
                    RemoveSubscriptionDataConfig(subscription);

                    if (forceSubscriptionRemoval)
                    {
                        subscription.MarkAsRemovedFromUniverse();
                    }

                    if (_liveMode)
                    {
                        Log.Trace($"DataManager.RemoveSubscription(): Removed {configuration}");
                    }
                    else if (Log.DebuggingEnabled)
                    {
                        // for performance lets not create the message string if debugging is not enabled
                        // this can be executed many times and its in the algorithm thread
                        Log.Debug($"DataManager.RemoveSubscription(): Removed {configuration}");
                    }
                    return true;
                }
            }
            else if (universe != null)
            {
                // a universe requested removal of a subscription which wasn't present anymore, this can happen when a subscription ends
                // it will get removed from the data feed subscription list, but the configuration will remain until the universe removes it
                // why? the effect I found is that the fill models are using these subscriptions to determine which data they could use
                lock (_subscriptionManagerSubscriptions)
                {
                    if (_subscriptionManagerSubscriptions.Remove(configuration))
                    {
                        // the collection changed: drop the cached snapshot
                        _subscriptionDataConfigsEnumerator = null;
                    }
                }
            }
            return false;
        }

        /// <summary>
        /// Event invocator for the <see cref="SubscriptionAdded"/> event
        /// </summary>
        /// <param name="subscription">The added subscription</param>
        private void OnSubscriptionAdded(Subscription subscription)
        {
            SubscriptionAdded?.Invoke(this, subscription);
        }

        /// <summary>
        /// Event invocator for the <see cref="SubscriptionRemoved"/> event
        /// </summary>
        /// <param name="subscription">The removed subscription</param>
        private void OnSubscriptionRemoved(Subscription subscription)
        {
            SubscriptionRemoved?.Invoke(this, subscription);
        }

        #endregion

        #region IAlgorithmSubscriptionManager

        /// <summary>
        /// Gets all the current data config subscriptions that are being processed for the SubscriptionManager
        /// </summary>
        /// <remarks>The returned list is a cached snapshot which is rebuilt after the underlying collection changes</remarks>
        public IEnumerable<SubscriptionDataConfig> SubscriptionManagerSubscriptions
        {
            get
            {
                lock (_subscriptionManagerSubscriptions)
                {
                    if (_subscriptionDataConfigsEnumerator == null)
                    {
                        _subscriptionDataConfigsEnumerator = _subscriptionManagerSubscriptions.Values.ToList();
                    }
                    return _subscriptionDataConfigsEnumerator;
                }
            }
        }

        /// <summary>
        /// Gets existing or adds new <see cref="SubscriptionDataConfig" />
        /// </summary>
        /// <param name="newConfig">The configuration to look up, it will be added if no equal configuration is present</param>
        /// <returns>Returns the SubscriptionDataConfig instance used</returns>
        public SubscriptionDataConfig SubscriptionManagerGetOrAdd(SubscriptionDataConfig newConfig)
        {
            SubscriptionDataConfig config;
            lock (_subscriptionManagerSubscriptions)
            {
                if (!_subscriptionManagerSubscriptions.TryGetValue(newConfig, out config))
                {
                    _subscriptionManagerSubscriptions[newConfig] = config = newConfig;
                    // the collection changed: drop the cached snapshot
                    _subscriptionDataConfigsEnumerator = null;
                }
            }

            // if the reference is not the same, means it was already there and we did not add anything new
            if (!ReferenceEquals(config, newConfig))
            {
                // for performance lets not create the message string if debugging is not enabled
                // this can be executed many times and its in the algorithm thread
                if (Log.DebuggingEnabled)
                {
                    Log.Debug("DataManager.SubscriptionManagerGetOrAdd(): subscription already added: " + config);
                }
            }
            else
            {
                // add the time zone to our time keeper
                _timeKeeper.AddTimeZone(newConfig.ExchangeTimeZone);
            }

            return config;
        }

        /// <summary>
        /// Will try to remove a <see cref="SubscriptionDataConfig"/> and update the corresponding
        /// consumers accordingly
        /// </summary>
        /// <param name="subscription">The <see cref="Subscription"/> owning the configuration to remove</param>
        private void RemoveSubscriptionDataConfig(Subscription subscription)
        {
            // the subscription could of ended but might still be part of the universe
            if (subscription.RemovedFromUniverse.Value)
            {
                lock (_subscriptionManagerSubscriptions)
                {
                    if (_subscriptionManagerSubscriptions.Remove(subscription.Configuration))
                    {
                        // the collection changed: drop the cached snapshot
                        _subscriptionDataConfigsEnumerator = null;
                    }
                }
            }
        }

        /// <summary>
        /// Returns the amount of data config subscriptions processed for the SubscriptionManager
        /// </summary>
        public int SubscriptionManagerCount()
        {
            lock (_subscriptionManagerSubscriptions)
            {
                return _subscriptionManagerSubscriptions.Count;
            }
        }

        #region ISubscriptionDataConfigService

        /// <summary>
        /// The different <see cref="TickType" /> each <see cref="SecurityType" /> supports
        /// </summary>
        public Dictionary<SecurityType, List<TickType>> AvailableDataTypes { get; }

        /// <summary>
        /// Creates and adds a list of <see cref="SubscriptionDataConfig" /> for a given symbol and configuration.
        /// Can optionally pass in desired subscription data type to use.
        /// If the config already existed will return existing instance instead
        /// </summary>
        /// <returns>The first configuration created for the given data type</returns>
        public SubscriptionDataConfig Add(
            Type dataType,
            Symbol symbol,
            Resolution? resolution = null,
            bool fillForward = true,
            bool extendedMarketHours = false,
            bool isFilteredSubscription = true,
            bool isInternalFeed = false,
            bool isCustomData = false,
            DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted,
            DataMappingMode dataMappingMode = DataMappingMode.OpenInterest,
            uint contractDepthOffset = 0
            )
        {
            return Add(symbol, resolution, fillForward, extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
                new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(dataType, LeanData.GetCommonTickTypeForCommonDataTypes(dataType, symbol.SecurityType)) },
                dataNormalizationMode, dataMappingMode, contractDepthOffset)
                .First();
        }

        /// <summary>
        /// Creates and adds a list of <see cref="SubscriptionDataConfig" /> for a given symbol and configuration.
        /// Can optionally pass in desired subscription data types to use.
        /// If the config already existed will return existing instance instead
        /// </summary>
        /// <returns>The list of configurations in use, one per data type</returns>
        /// <exception cref="ArgumentNullException">No data type is available to create a subscription, or the
        /// market hours database entry has no data time zone or exchange time zone</exception>
        /// <exception cref="InvalidOperationException">No resolution was provided and the data types have different default resolutions</exception>
        /// <exception cref="ArgumentException">Not in live mode and the provided resolution is not supported by one of the data types</exception>
        public List<SubscriptionDataConfig> Add(
            Symbol symbol,
            Resolution? resolution = null,
            bool fillForward = true,
            bool extendedMarketHours = false,
            bool isFilteredSubscription = true,
            bool isInternalFeed = false,
            bool isCustomData = false,
            List<Tuple<Type, TickType>> subscriptionDataTypes = null,
            DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted,
            DataMappingMode dataMappingMode = DataMappingMode.OpenInterest,
            uint contractDepthOffset = 0
            )
        {
            var dataTypes = subscriptionDataTypes;
            if (dataTypes == null)
            {
                if (symbol.SecurityType == SecurityType.Base && SecurityIdentifier.TryGetCustomDataTypeInstance(symbol.ID.Symbol, out var type))
                {
                    // we've detected custom data request if we find a type let's use it
                    dataTypes = new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(type, TickType.Trade) };
                }
                else
                {
                    dataTypes = LookupSubscriptionConfigDataTypes(symbol.SecurityType, resolution ?? Resolution.Minute, symbol.IsCanonical());
                }
            }

            if (dataTypes.Count == 0)
            {
                throw new ArgumentNullException(nameof(dataTypes), "At least one type needed to create new subscriptions");
            }

            var resolutionWasProvided = resolution.HasValue;
            foreach (var typeTuple in dataTypes)
            {
                var baseInstance = typeTuple.Item1.GetBaseDataInstance();
                baseInstance.Symbol = symbol;
                if (!resolutionWasProvided)
                {
                    var defaultResolution = baseInstance.DefaultResolution();
                    if (resolution.HasValue && resolution != defaultResolution)
                    {
                        // we are here because there are multiple 'dataTypes'.
                        // if we get different default resolutions lets throw, this shouldn't happen
                        throw new InvalidOperationException(
                            $"Different data types ({string.Join(",", dataTypes.Select(tuple => tuple.Item1))})" +
                            $" provided different default resolutions {defaultResolution} and {resolution}, this is an unexpected invalid operation.");
                    }
                    resolution = defaultResolution;
                }
                else
                {
                    // only assert resolution in backtesting, live can use other data source
                    // for example daily data for options
                    if (!_liveMode)
                    {
                        var supportedResolutions = baseInstance.SupportedResolutions();
                        if (supportedResolutions.Contains(resolution.Value))
                        {
                            continue;
                        }

                        throw new ArgumentException($"Sorry {resolution.ToStringInvariant()} is not a supported resolution for {typeTuple.Item1.Name}" +
                            $" and SecurityType.{symbol.SecurityType.ToStringInvariant()}." +
                            $" Please change your AddData to use one of the supported resolutions ({string.Join(",", supportedResolutions)}).");
                    }
                }
            }

            var marketHoursDbEntry = _marketHoursDatabase.GetEntry(symbol, dataTypes.Select(tuple => tuple.Item1));

            var exchangeHours = marketHoursDbEntry.ExchangeHours;

            // options and indexes always use raw data, overriding the requested normalization mode
            if (symbol.ID.SecurityType.IsOption() ||
                symbol.ID.SecurityType == SecurityType.Index)
            {
                dataNormalizationMode = DataNormalizationMode.Raw;
            }

            if (marketHoursDbEntry.DataTimeZone == null)
            {
                throw new ArgumentNullException(nameof(marketHoursDbEntry.DataTimeZone),
                    "DataTimeZone is a required parameter for new subscriptions. Set to the time zone the raw data is time stamped in.");
            }

            if (exchangeHours.TimeZone == null)
            {
                throw new ArgumentNullException(nameof(exchangeHours.TimeZone),
                    "ExchangeTimeZone is a required parameter for new subscriptions. Set to the time zone the security exchange resides in.");
            }

            var result = (from subscriptionDataType in dataTypes
                          let dataType = subscriptionDataType.Item1
                          let tickType = subscriptionDataType.Item2
                          select new SubscriptionDataConfig(
                              dataType,
                              symbol,
                              resolution.Value,
                              marketHoursDbEntry.DataTimeZone,
                              exchangeHours.TimeZone,
                              fillForward,
                              extendedMarketHours,
                              // if the subscription data types were not provided and the tick type is OpenInterest we make it internal
                              (subscriptionDataTypes == null && tickType == TickType.OpenInterest) || isInternalFeed,
                              isCustomData,
                              isFilteredSubscription: isFilteredSubscription,
                              tickType: tickType,
                              dataNormalizationMode: dataNormalizationMode,
                              dataMappingMode: dataMappingMode,
                              contractDepthOffset: contractDepthOffset)).ToList();

            for (int i = 0; i < result.Count; i++)
            {
                // replace the new instance with the one already in use, if any
                result[i] = SubscriptionManagerGetOrAdd(result[i]);

                // track all registered data types
                _registeredTypesProvider.RegisterType(result[i].Type);
            }
            return result;
        }

        /// <summary>
        /// Get the data feed types for a given <see cref="SecurityType" />
        /// </summary>
        /// <param name="symbolSecurityType">The <see cref="SecurityType" /> used to determine the types</param>
        /// <param name="resolution">The resolution of the data requested</param>
        /// <param name="isCanonical">Indicates whether the security is Canonical (future and options)</param>
        /// <returns>Types that should be added to the <see cref="SubscriptionDataConfig" /></returns>
        /// <remarks>TODO: data type additions are very related to ticktype and should be more generic/independent of each other</remarks>
        public List<Tuple<Type, TickType>> LookupSubscriptionConfigDataTypes(
            SecurityType symbolSecurityType,
            Resolution resolution,
            bool isCanonical
            )
        {
            if (isCanonical)
            {
                if (symbolSecurityType.IsOption())
                {
                    return new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(typeof(OptionUniverse), TickType.Quote) };
                }

                return new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(typeof(FutureUniverse), TickType.Quote) };
            }

            IEnumerable<TickType> availableDataType = AvailableDataTypes[symbolSecurityType]
                // Equities will only look for trades in case of low resolutions.
                .Where(tickType => LeanData.IsValidConfiguration(symbolSecurityType, resolution, tickType));

            var result = availableDataType
                .Select(tickType => new Tuple<Type, TickType>(LeanData.GetDataType(resolution, tickType), tickType)).ToList();

            if (symbolSecurityType == SecurityType.CryptoFuture)
            {
                result.Add(new Tuple<Type, TickType>(typeof(MarginInterestRate), TickType.Quote));
            }
            return result;
        }

        /// <summary>
        /// Gets a list of all registered <see cref="SubscriptionDataConfig"/> for a given <see cref="Symbol"/>
        /// </summary>
        /// <param name="symbol">The symbol to filter by, null will return the configurations of all symbols</param>
        /// <param name="includeInternalConfigs">True to also return internal feed configurations</param>
        /// <returns>The matching configurations, non internal ones first</returns>
        /// <remarks>Will not return internal subscriptions by default</remarks>
        public List<SubscriptionDataConfig> GetSubscriptionDataConfigs(Symbol symbol = null, bool includeInternalConfigs = false)
        {
            lock (_subscriptionManagerSubscriptions)
            {
                return _subscriptionManagerSubscriptions.Keys
                    .Where(config => (includeInternalConfigs || !config.IsInternalFeed) && (symbol == null || config.Symbol.ID == symbol.ID))
                    .OrderBy(config => config.IsInternalFeed)
                    .ToList();
            }
        }

        #endregion

        #endregion

        #region IDataManager

        /// <summary>
        /// Get the universe selection instance
        /// </summary>
        public UniverseSelection UniverseSelection { get; }

        #endregion
    }
}