/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Linq; using QuantConnect.Benchmarks; using QuantConnect.Data; using QuantConnect.Data.UniverseSelection; using QuantConnect.Interfaces; using QuantConnect.Logging; using QuantConnect.Securities; using QuantConnect.Util; using QuantConnect.Data.Fundamental; namespace QuantConnect.Lean.Engine.DataFeeds { /// /// Provides methods for apply the results of universe selection to an algorithm /// public class UniverseSelection { private IDataFeedSubscriptionManager _dataManager; private readonly IAlgorithm _algorithm; private readonly ISecurityService _securityService; private readonly Dictionary> _pendingSecurityAdditions = new Dictionary>(); private readonly PendingRemovalsManager _pendingRemovalsManager; private readonly CurrencySubscriptionDataConfigManager _currencySubscriptionDataConfigManager; private readonly InternalSubscriptionManager _internalSubscriptionManager; private bool _initializedSecurityBenchmark; private bool _anyDoesNotHaveFundamentalDataWarningLogged; private readonly SecurityChangesConstructor _securityChangesConstructor; /// /// Initializes a new instance of the class /// /// The algorithm to add securities to /// The security service /// The data permissions manager /// The data provider to use /// The resolution to use for internal configuration public UniverseSelection( IAlgorithm algorithm, ISecurityService securityService, IDataPermissionManager dataPermissionManager, IDataProvider dataProvider, Resolution internalConfigResolution = Resolution.Minute) { _algorithm = algorithm; _securityService = securityService; _pendingRemovalsManager = new PendingRemovalsManager(algorithm.Transactions); _currencySubscriptionDataConfigManager = new CurrencySubscriptionDataConfigManager(algorithm.Portfolio.CashBook, algorithm.Securities, algorithm.SubscriptionManager, _securityService, Resolution.Minute); // TODO: next step is to merge currency internal subscriptions under the same 'internal manager' instance and we could move this directly into the DataManager class _internalSubscriptionManager = new InternalSubscriptionManager(_algorithm, internalConfigResolution); _securityChangesConstructor = new SecurityChangesConstructor(); } /// /// Sets the data manager /// public void SetDataManager(IDataFeedSubscriptionManager dataManager) { if (_dataManager != null) { throw new Exception("UniverseSelection.SetDataManager(): can only be set once"); } _dataManager = dataManager; _internalSubscriptionManager.Added += (sender, request) => { _dataManager.AddSubscription(request); }; _internalSubscriptionManager.Removed += (sender, request) => { _dataManager.RemoveSubscription(request.Configuration); }; } /// /// Applies universe selection the the data feed and algorithm /// /// The universe to perform selection on /// The current date time in utc /// The data provided to perform selection with public SecurityChanges ApplyUniverseSelection(Universe universe, DateTime dateTimeUtc, BaseDataCollection universeData) { var algorithmEndDateUtc = _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone); if (dateTimeUtc > algorithmEndDateUtc) { return SecurityChanges.None; } IEnumerable selectSymbolsResult; // check if this universe must be filtered with fine fundamental data Universe fineFiltered = (universe as FineFundamentalFilteredUniverse)?.FineFundamentalUniverse; fineFiltered ??= (universe as FundamentalFilteredUniverse)?.FundamentalUniverse; if (fineFiltered != null // if the universe has been disposed we don't perform selection. This us handled bellow by 'Universe.PerformSelection' // but in this case we directly call 'SelectSymbols' because we want to perform fine selection even if coarse returns the same // symbols, see 'Universe.PerformSelection', which detects this and returns 'Universe.Unchanged' && !universe.DisposeRequested) { // perform initial filtering and limit the result selectSymbolsResult = universe.SelectSymbols(dateTimeUtc, universeData); if (!ReferenceEquals(selectSymbolsResult, Universe.Unchanged)) { // prepare a BaseDataCollection of FineFundamental instances var fineCollection = new BaseDataCollection(); // if the input is already fundamental data we just need to filter it and pass it through var hasFundamentalData = universeData.Data.Count > 0 && universeData.Data[0] is Fundamental; if(hasFundamentalData) { // Remove selected symbols that does not have fine fundamental data var anyDoesNotHaveFundamentalData = false; // only pre filter selected symbols if there actually is any fundamental data. This way we can support custom universe filtered by fine fundamental data // which do not use coarse data as underlying, in which case it could happen that we try to load fine fundamental data that is missing, but no problem, // 'FineFundamentalSubscriptionEnumeratorFactory' won't emit it var set = selectSymbolsResult.ToHashSet(); fineCollection.Data.AddRange(universeData.Data.OfType().Where(fundamental => { // we remove to we distict by symbol if (set.Remove(fundamental.Symbol)) { if (!fundamental.HasFundamentalData) { anyDoesNotHaveFundamentalData = true; return false; } return true; } return false; })); if (!_anyDoesNotHaveFundamentalDataWarningLogged && anyDoesNotHaveFundamentalData) { _algorithm.Debug("Note: Your coarse selection filter was updated to exclude symbols without fine fundamental data. Make sure your coarse filter excludes symbols where HasFundamental is false."); _anyDoesNotHaveFundamentalDataWarningLogged = true; } } else { // we need to load the fundamental data var currentTime = dateTimeUtc.ConvertFromUtc(TimeZones.NewYork); foreach (var symbol in selectSymbolsResult) { fineCollection.Data.Add(new Fundamental(currentTime, symbol)); } } universeData.Data = fineCollection.Data; // perform the fine fundamental universe selection selectSymbolsResult = fineFiltered.PerformSelection(dateTimeUtc, fineCollection); } } else { // perform initial filtering and limit the result selectSymbolsResult = universe.PerformSelection(dateTimeUtc, universeData); } if (!ReferenceEquals(selectSymbolsResult, Universe.Unchanged)) { // materialize the enumerable into a set for processing universe.Selected = selectSymbolsResult.ToHashSet(); } // first check for no pending removals, even if the universe selection // didn't change we might need to remove a security because a position was closed RemoveSecurityFromUniverse( _pendingRemovalsManager.CheckPendingRemovals(universe.Selected, universe), dateTimeUtc, algorithmEndDateUtc); // check for no changes second if (ReferenceEquals(selectSymbolsResult, Universe.Unchanged)) { return SecurityChanges.None; } // determine which data subscriptions need to be removed from this universe foreach (var member in universe.Securities.Values.OrderBy(member => member.Security.Symbol.SecurityType).ThenBy(x => x.Security.Symbol.ID)) { var security = member.Security; // if we've selected this subscription again, keep it if (universe.Selected.Contains(security.Symbol)) continue; // don't remove if the universe wants to keep him in if (!universe.CanRemoveMember(dateTimeUtc, security)) continue; if (!member.Security.IsDelisted) { // TODO: here we are not checking if other universes have this security still selected _securityChangesConstructor.Remove(member.Security, member.IsInternal); } RemoveSecurityFromUniverse(_pendingRemovalsManager.TryRemoveMember(security, universe), dateTimeUtc, algorithmEndDateUtc); } Dictionary pendingAdditions; if (!_pendingSecurityAdditions.TryGetValue(dateTimeUtc, out pendingAdditions)) { // if the frontier moved forward then we've added these securities to the algorithm _pendingSecurityAdditions.Clear(); // keep track of created securities so we don't create the same security twice, leads to bad things :) pendingAdditions = new Dictionary(); _pendingSecurityAdditions[dateTimeUtc] = pendingAdditions; } // find new selections and add them to the algorithm foreach (var symbol in universe.Selected) { if (universe.Securities.ContainsKey(symbol)) { // if its already part of the universe no need to re add it continue; } Security underlying = null; if (symbol.HasUnderlying) { underlying = GetOrCreateSecurity(pendingAdditions, symbol.Underlying, universe.UniverseSettings); } // create the new security, the algorithm thread will add this at the appropriate time var security = GetOrCreateSecurity(pendingAdditions, symbol, universe.UniverseSettings, underlying); var addedSubscription = false; var dataFeedAdded = false; var internalFeed = true; foreach (var request in universe.GetSubscriptionRequests(security, dateTimeUtc, algorithmEndDateUtc, _algorithm.SubscriptionManager.SubscriptionDataConfigService)) { if (!request.TradableDaysInDataTimeZone.Any()) { // Remove the config from the data manager. universe.GetSubscriptionRequests() might have added the configs _dataManager.RemoveSubscription(request.Configuration, universe); continue; } if (security.Symbol == request.Configuration.Symbol // Just in case check its the same symbol, else AddData will throw. && !security.Subscriptions.Contains(request.Configuration)) { // For now this is required for retro compatibility with usages of security.Subscriptions security.AddData(request.Configuration); } var toRemove = _currencySubscriptionDataConfigManager.GetSubscriptionDataConfigToRemove(request.Configuration.Symbol); if (toRemove != null) { Log.Trace($"UniverseSelection.ApplyUniverseSelection(): Removing internal currency data feed {toRemove}"); _dataManager.RemoveSubscription(toRemove); } // 'dataFeedAdded' will help us notify the user for security changes only once per non internal subscription // for example two universes adding the sample configuration, we don't want two notifications dataFeedAdded = _dataManager.AddSubscription(request); // only update our security changes if we actually added data if (!request.IsUniverseSubscription) { addedSubscription = true; // if any config isn't internal then it's not internal internalFeed &= request.Configuration.IsInternalFeed; _internalSubscriptionManager.AddedSubscriptionRequest(request); } } if (addedSubscription) { var addedMember = universe.AddMember(dateTimeUtc, security, internalFeed); if (addedMember && dataFeedAdded) { _securityChangesConstructor.Add(security, internalFeed); } } } var securityChanges = _securityChangesConstructor.Flush(); // Add currency data feeds that weren't explicitly added in Initialize if (securityChanges.AddedSecurities.Count > 0) { EnsureCurrencyDataFeeds(securityChanges); } if (securityChanges != SecurityChanges.None && Log.DebuggingEnabled) { // for performance lets not create the message string if debugging is not enabled // this can be executed many times and its in the algorithm thread Log.Debug("UniverseSelection.ApplyUniverseSelection(): " + dateTimeUtc + ": " + securityChanges); } return securityChanges; } /// /// Will add any pending internal currency subscriptions /// /// The current date time in utc /// Will return true if any subscription was added public bool AddPendingInternalDataFeeds(DateTime utcStart) { var added = false; if (!_initializedSecurityBenchmark) { _initializedSecurityBenchmark = true; var securityBenchmark = _algorithm.Benchmark as SecurityBenchmark; if (securityBenchmark != null) { var resolution = _algorithm.LiveMode ? Resolution.Minute : Resolution.Hour; // Check that the tradebar subscription we are using can support this resolution GH #5893 var subscriptionType = _algorithm.SubscriptionManager.SubscriptionDataConfigService.LookupSubscriptionConfigDataTypes(securityBenchmark.Security.Type, resolution, securityBenchmark.Security.Symbol.IsCanonical()).First(); var symbol = securityBenchmark.Security.Symbol; var isCustomData = false; // Check if the benchmark security is a custom data in order to make sure we get the correct // type if (symbol.SecurityType == SecurityType.Base) { var symbolDataConfigs = _algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol); if (symbolDataConfigs.Any()) { subscriptionType = new Tuple(symbolDataConfigs.First().Type, TickType.Trade); isCustomData = true; } } var baseInstance = subscriptionType.Item1.GetBaseDataInstance(); baseInstance.Symbol = securityBenchmark.Security.Symbol; var supportedResolutions = baseInstance.SupportedResolutions(); if (!supportedResolutions.Contains(resolution)) { resolution = supportedResolutions.OrderByDescending(x => x).First(); } var subscriptionList = new List>() {subscriptionType}; var dataConfig = _algorithm.SubscriptionManager.SubscriptionDataConfigService.Add( securityBenchmark.Security.Symbol, resolution, isInternalFeed: true, fillForward: false, isCustomData: isCustomData, subscriptionDataTypes: subscriptionList ).First(); // we want to start from the previous tradable bar so the benchmark security // never has 0 price var previousTradableBar = Time.GetStartTimeForTradeBars( securityBenchmark.Security.Exchange.Hours, utcStart.ConvertFromUtc(securityBenchmark.Security.Exchange.TimeZone), _algorithm.LiveMode ? Time.OneMinute : Time.OneDay, 1, false, dataConfig.DataTimeZone, LeanData.UseStrictEndTime(_algorithm.Settings.DailyPreciseEndTime, securityBenchmark.Security.Symbol, _algorithm.LiveMode ? Time.OneMinute : Time.OneDay, securityBenchmark.Security.Exchange.Hours) ).ConvertToUtc(securityBenchmark.Security.Exchange.TimeZone); if (dataConfig != null) { added |= _dataManager.AddSubscription(new SubscriptionRequest( false, null, securityBenchmark.Security, dataConfig, previousTradableBar, _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone))); Log.Trace($"UniverseSelection.AddPendingInternalDataFeeds(): Adding internal benchmark data feed {dataConfig}"); } } } if (_currencySubscriptionDataConfigManager.UpdatePendingSubscriptionDataConfigs(_algorithm.BrokerageModel)) { foreach (var subscriptionDataConfig in _currencySubscriptionDataConfigManager .GetPendingSubscriptionDataConfigs()) { var security = _algorithm.Securities[subscriptionDataConfig.Symbol]; added |= _dataManager.AddSubscription(new SubscriptionRequest( false, null, security, subscriptionDataConfig, utcStart, _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone))); } } return added; } /// /// Checks the current subscriptions and adds necessary currency pair feeds to provide real time conversion data /// public void EnsureCurrencyDataFeeds(SecurityChanges securityChanges) { _currencySubscriptionDataConfigManager.EnsureCurrencySubscriptionDataConfigs(securityChanges, _algorithm.BrokerageModel); } /// /// Handles the delisting process of the given data symbol from the algorithm securities /// public SecurityChanges HandleDelisting(BaseData data, bool isInternalFeed) { if (_algorithm.Securities.TryGetValue(data.Symbol, out var security)) { // don't allow users to open a new position once delisted security.IsDelisted = true; security.Reset(); // Add the security removal to the security changes but only if not pending for removal. // If pending, the removed change event was already emitted for this security if (_algorithm.Securities.Remove(data.Symbol) && !_pendingRemovalsManager.PendingRemovals.Values.Any(x => x.Any(y => y.Symbol == data.Symbol))) { _securityChangesConstructor.Remove(security, isInternalFeed); return _securityChangesConstructor.Flush(); } } return SecurityChanges.None; } private void RemoveSecurityFromUniverse( List removedMembers, DateTime dateTimeUtc, DateTime algorithmEndDateUtc) { if (removedMembers == null) { return; } foreach (var removedMember in removedMembers) { var universe = removedMember.Universe; var member = removedMember.Security; // safe to remove the member from the universe universe.RemoveMember(dateTimeUtc, member); var isActive = _algorithm.UniverseManager.ActiveSecurities.ContainsKey(member.Symbol); foreach (var subscription in universe.GetSubscriptionRequests(member, dateTimeUtc, algorithmEndDateUtc, _algorithm.SubscriptionManager.SubscriptionDataConfigService)) { if (_dataManager.RemoveSubscription(subscription.Configuration, universe)) { _internalSubscriptionManager.RemovedSubscriptionRequest(subscription); // if not used by any universe if (!isActive) { member.Reset(); // We need to mark this security as untradeable while it has no data subscription // it is expected that this function is called while in sync with the algo thread, // so we can make direct edits to the security here. // We only clear the cache once the subscription is removed from the data stack // Note: Security.Reset() won't clear the cache, it only clears the data subscription // and marks it as non-tradable, because in some cases the cache needs to be kept, // like when delisting, which could lead to a liquidation or option exercise. member.Cache.Reset(); _algorithm.Securities.Remove(member.Symbol); } } } } } private Security GetOrCreateSecurity(Dictionary pendingAdditions, Symbol symbol, UniverseSettings universeSettings, Security underlying = null) { // create the new security, the algorithm thread will add this at the appropriate time Security security; if (!pendingAdditions.TryGetValue(symbol, out security)) { security = _securityService.CreateSecurity(symbol, new List(), universeSettings.Leverage, symbol.ID.SecurityType.IsOption(), underlying); pendingAdditions.Add(symbol, security); } return security; } } }