/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Linq; using QuantConnect.Data; using QuantConnect.Util; using QuantConnect.Logging; using QuantConnect.Packets; using QuantConnect.Interfaces; using QuantConnect.Securities; using System.Collections.Generic; using QuantConnect.Configuration; using QuantConnect.Data.Auxiliary; using QuantConnect.Data.Custom.Tiingo; using QuantConnect.Lean.Engine.Results; using QuantConnect.Data.UniverseSelection; using QuantConnect.Lean.Engine.DataFeeds.Enumerators; using QuantConnect.Lean.Engine.DataFeeds.Enumerators.Factories; using QuantConnect.Data.Fundamental; namespace QuantConnect.Lean.Engine.DataFeeds { /// /// Provides an implementation of that is designed to deal with /// live, remote data sources /// public class LiveTradingDataFeed : FileSystemDataFeed { private static readonly int MaximumWarmupHistoryDaysLookBack = Config.GetInt("maximum-warmup-history-days-look-back", 5); private LiveNodePacket _job; // used to get current time private ITimeProvider _timeProvider; private IAlgorithm _algorithm; private ITimeProvider _frontierTimeProvider; private IDataProvider _dataProvider; private IMapFileProvider _mapFileProvider; private IDataQueueHandler _dataQueueHandler; private BaseDataExchange _customExchange; private SubscriptionCollection _subscriptions; private IFactorFileProvider _factorFileProvider; private IDataChannelProvider _channelProvider; // in live trading we delay scheduled universe selection between 11 & 12 hours after midnight UTC so that we allow new selection data to be piped in // NY goes from -4/-5 UTC time, so: // 11 UTC - 4 => 7am NY // 12 UTC - 4 => 8am NY private readonly TimeSpan _scheduledUniverseUtcTimeShift = TimeSpan.FromMinutes(11 * 60 + DateTime.UtcNow.Second); private readonly HashSet _unsupportedConfigurations = new(); /// /// Public flag indicator that the thread is still busy. /// public bool IsActive { get; private set; } /// /// Initializes the data feed for the specified job and algorithm /// public override void Initialize(IAlgorithm algorithm, AlgorithmNodePacket job, IResultHandler resultHandler, IMapFileProvider mapFileProvider, IFactorFileProvider factorFileProvider, IDataProvider dataProvider, IDataFeedSubscriptionManager subscriptionManager, IDataFeedTimeProvider dataFeedTimeProvider, IDataChannelProvider dataChannelProvider) { if (!(job is LiveNodePacket)) { throw new ArgumentException("The LiveTradingDataFeed requires a LiveNodePacket."); } _algorithm = algorithm; _job = (LiveNodePacket)job; _timeProvider = dataFeedTimeProvider.TimeProvider; _dataProvider = dataProvider; _mapFileProvider = mapFileProvider; _factorFileProvider = factorFileProvider; _channelProvider = dataChannelProvider; _frontierTimeProvider = dataFeedTimeProvider.FrontierTimeProvider; _customExchange = GetBaseDataExchange(); _subscriptions = subscriptionManager.DataFeedSubscriptions; _dataQueueHandler = GetDataQueueHandler(); _dataQueueHandler?.SetJob(_job); // run the custom data exchange _customExchange.Start(); IsActive = true; base.Initialize(algorithm, job, resultHandler, mapFileProvider, factorFileProvider, dataProvider, subscriptionManager, dataFeedTimeProvider, dataChannelProvider); } /// /// Creates a new subscription to provide data for the specified security. /// /// Defines the subscription to be added, including start/end times the universe and security /// The created if successful, null otherwise public override Subscription CreateSubscription(SubscriptionRequest request) { Subscription subscription = null; try { // create and add the subscription to our collection subscription = request.IsUniverseSubscription ? CreateUniverseSubscription(request) : CreateDataSubscription(request); } catch (Exception err) { Log.Error(err, $"CreateSubscription(): Failed configuration: '{request.Configuration}'"); // kill the algorithm, this shouldn't happen _algorithm.SetRuntimeError(err, $"Failed to subscribe to {request.Configuration.Symbol}"); } return subscription; } /// /// Removes the subscription from the data feed, if it exists /// /// The subscription to remove public override void RemoveSubscription(Subscription subscription) { var symbol = subscription.Configuration.Symbol; // remove the subscriptions if (!_channelProvider.ShouldStreamSubscription(subscription.Configuration)) { _customExchange.RemoveEnumerator(symbol); } else { _dataQueueHandler.UnsubscribeWithMapping(subscription.Configuration); } } /// /// External controller calls to signal a terminate of the thread. /// public override void Exit() { if (IsActive) { IsActive = false; Log.Trace("LiveTradingDataFeed.Exit(): Start. Setting cancellation token..."); if (_dataQueueHandler is DataQueueHandlerManager manager) { manager.UnsupportedConfiguration -= HandleUnsupportedConfigurationEvent; } _customExchange?.Stop(); Log.Trace("LiveTradingDataFeed.Exit(): Exit Finished."); base.Exit(); } } /// /// Gets the to use by default /// /// Useful for testing /// The loaded protected virtual IDataQueueHandler GetDataQueueHandler() { var result = new DataQueueHandlerManager(_algorithm.Settings); result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent; return result; } /// /// Gets the to use /// /// Useful for testing protected virtual BaseDataExchange GetBaseDataExchange() { return new BaseDataExchange("CustomDataExchange") { SleepInterval = 100 }; } /// /// Creates a new subscription for the specified security /// /// The subscription request /// A new subscription instance of the specified security private Subscription CreateDataSubscription(SubscriptionRequest request) { Subscription subscription = null; // let's keep track of the last point we got from the file based enumerator and start our history enumeration from this point // this is much more efficient since these duplicated points will be dropped by the filter righ away causing memory usage spikes var lastPointTracker = new LastPointTracker(); var localStartTime = request.StartTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone); var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone); var timeZoneOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc); IEnumerator enumerator = null; if (!_channelProvider.ShouldStreamSubscription(request.Configuration)) { if (!Tiingo.IsAuthCodeSet) { // we're not using the SubscriptionDataReader, so be sure to set the auth token here Tiingo.SetAuthCode(Config.Get("tiingo-auth-token")); } var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore); var enumeratorStack = factory.CreateEnumerator(request, _dataProvider); var enqueable = new EnqueueableEnumerator(); _customExchange.AddEnumerator(request.Configuration.Symbol, enumeratorStack, handleData: data => { enqueable.Enqueue(data); subscription?.OnNewDataAvailable(); }); enumerator = enqueable; } else { var auxEnumerators = new List>(); if (LiveAuxiliaryDataEnumerator.TryCreate(request.Configuration, _timeProvider, request.Security.Cache, _mapFileProvider, _factorFileProvider, request.StartTimeLocal, out var auxDataEnumator)) { auxEnumerators.Add(auxDataEnumator); } EventHandler handler = (_, _) => subscription?.OnNewDataAvailable(); enumerator = Subscribe(request.Configuration, handler, IsExpired); if (auxEnumerators.Count > 0) { enumerator = new LiveAuxiliaryDataSynchronizingEnumerator(_timeProvider, request.Configuration.ExchangeTimeZone, enumerator, auxEnumerators); } } // scale prices before 'SubscriptionFilterEnumerator' since it updates securities realtime price // and before fill forwarding so we don't happen to apply twice the factor if (request.Configuration.PricesShouldBeScaled(liveMode: true)) { enumerator = new PriceScaleFactorEnumerator( enumerator, request.Configuration, _factorFileProvider, liveMode: true); } if (request.Configuration.FillDataForward) { var fillForwardResolution = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration); // Pass the security exchange hours explicitly to avoid using the ones in the request, since // those could be different. e.g. when requests are created for open interest data the exchange // hours are set to always open to avoid OI data being filtered out due to the exchange being closed. var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol, request.Configuration.Increment, request.Security.Exchange.Hours); enumerator = new LiveFillForwardEnumerator(_frontierTimeProvider, enumerator, request.Security.Exchange, fillForwardResolution, request.Configuration.ExtendedMarketHours, localStartTime, localEndTime, request.Configuration.Resolution, request.Configuration.DataTimeZone, useDailyStrictEndTimes, request.Configuration.Type, lastPointTracker); } // make our subscriptions aware of the frontier of the data feed, prevents future data from spewing into the feed enumerator = new FrontierAwareEnumerator(enumerator, _frontierTimeProvider, timeZoneOffsetProvider); // define market hours and user filters to incoming data after the frontier enumerator so during warmup we avoid any realtime data making it's way into the securities if (request.Configuration.IsFilteredSubscription) { enumerator = new SubscriptionFilterEnumerator(enumerator, request.Security, localEndTime, request.Configuration.ExtendedMarketHours, true, request.ExchangeHours); } enumerator = GetWarmupEnumerator(request, enumerator, lastPointTracker); var subscriptionDataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, timeZoneOffsetProvider, enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime); subscription = new Subscription(request, subscriptionDataEnumerator, timeZoneOffsetProvider); return subscription; } /// /// Helper method to determine if the symbol associated with the requested configuration is expired or not /// /// This is useful during warmup where we can be requested to add some already expired asset. We want to skip sending it /// to our live instance to avoid explosions. But we do want to add warmup enumerators private bool IsExpired(SubscriptionDataConfig dataConfig) { var mapFile = _mapFileProvider.ResolveMapFile(dataConfig); var delistingDate = dataConfig.Symbol.GetDelistingDate(mapFile); return _timeProvider.GetUtcNow().Date > delistingDate.ConvertToUtc(dataConfig.ExchangeTimeZone); } private IEnumerator Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler, Func isExpired) { return new LiveSubscriptionEnumerator(dataConfig, _dataQueueHandler, newDataAvailableHandler, isExpired); } /// /// Creates a new subscription for universe selection /// /// The subscription request private Subscription CreateUniverseSubscription(SubscriptionRequest request) { Subscription subscription = null; // TODO : Consider moving the creating of universe subscriptions to a separate, testable class // grab the relevant exchange hours var config = request.Universe.Configuration; var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone); var tzOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc); IEnumerator enumerator = null; var timeTriggered = request.Universe as ITimeTriggeredUniverse; if (timeTriggered != null) { Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating user defined universe: {config.Symbol.ID}"); // spoof a tick on the requested interval to trigger the universe selection function var enumeratorFactory = new TimeTriggeredUniverseSubscriptionEnumeratorFactory(timeTriggered, MarketHoursDatabase.FromDataFolder(), _frontierTimeProvider); enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider); enumerator = new FrontierAwareEnumerator(enumerator, _timeProvider, tzOffsetProvider); var enqueueable = new EnqueueableEnumerator(); _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, enumerator, enqueueable)); enumerator = enqueueable; } else if (config.Type.IsAssignableTo(typeof(ETFConstituentUniverse)) || config.Type.IsAssignableTo(typeof(FundamentalUniverse)) || request.Universe is OptionChainUniverse || request.Universe is FuturesChainUniverse) { Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}"); // Will try to pull data from the data folder every 10min, file with yesterdays date. // If lean is started today it will trigger initial coarse universe selection var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore, // we adjust time to the previous tradable date time => Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours, time, Time.OneDay, 1, false, config.DataTimeZone, _algorithm.Settings.DailyPreciseEndTime), TimeSpan.FromMinutes(10) ); var enumeratorStack = factory.CreateEnumerator(request, _dataProvider); // aggregates each coarse data point into a single BaseDataCollection var aggregator = new BaseDataCollectionAggregatorEnumerator(enumeratorStack, config.Symbol, true); var enqueable = new EnqueueableEnumerator(); _customExchange.AddEnumerator(config.Symbol, aggregator, handleData: data => { enqueable.Enqueue(data); subscription?.OnNewDataAvailable(); }); enumerator = GetConfiguredFrontierAwareEnumerator(enqueable, tzOffsetProvider, // advance time if before 23pm or after 5am and not on Saturdays time => time.Hour < 23 && time.Hour > 5 && time.DayOfWeek != DayOfWeek.Saturday); } else { Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating custom universe: " + config.Symbol.ID); var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore); var enumeratorStack = factory.CreateEnumerator(request, _dataProvider); enumerator = new BaseDataCollectionAggregatorEnumerator(enumeratorStack, config.Symbol, liveMode: true); var enqueueable = new EnqueueableEnumerator(); _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, enumerator, enqueueable)); enumerator = enqueueable; } enumerator = AddScheduleWrapper(request, enumerator, new PredicateTimeProvider(_frontierTimeProvider, (currentUtcDateTime) => { // will only let time advance after it's passed the live time shift frontier return currentUtcDateTime.TimeOfDay > _scheduledUniverseUtcTimeShift; })); enumerator = GetWarmupEnumerator(request, enumerator); // create the subscription var subscriptionDataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, tzOffsetProvider, enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime); subscription = new Subscription(request, subscriptionDataEnumerator, tzOffsetProvider); return subscription; } /// /// Build and apply the warmup enumerators when required /// private IEnumerator GetWarmupEnumerator(SubscriptionRequest request, IEnumerator liveEnumerator, LastPointTracker lastPointTracker = null) { if (_algorithm.IsWarmingUp) { var warmupRequest = new SubscriptionRequest(request, endTimeUtc: _timeProvider.GetUtcNow(), // we will not fill forward each warmup enumerators separately but concatenated bellow configuration: new SubscriptionDataConfig(request.Configuration, fillForward: false, resolution: _algorithm.Settings.WarmupResolution)); if (warmupRequest.TradableDaysInDataTimeZone.Any() // make sure there is at least room for a single bar of the requested resolution, else can cause issues with some history providers // this could happen when we create some internal subscription whose start time is 'Now', which we don't really want to warmup && warmupRequest.EndTimeUtc - warmupRequest.StartTimeUtc >= warmupRequest.Configuration.Resolution.ToTimeSpan() // since we change the resolution, let's validate it's still valid configuration (example daily equity quotes are not!) && LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType)) { // since we will source data locally and from the history provider, let's limit the history request size // by setting a start date respecting the 'MaximumWarmupHistoryDaysLookBack' var historyWarmup = warmupRequest; var warmupHistoryStartDate = warmupRequest.EndTimeUtc.AddDays(-MaximumWarmupHistoryDaysLookBack); if (warmupHistoryStartDate > warmupRequest.StartTimeUtc) { historyWarmup = new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate); } lastPointTracker ??= new LastPointTracker(); var synchronizedWarmupEnumerator = TryAddFillForwardEnumerator(warmupRequest, // we concatenate the file based and history based warmup enumerators, dropping duplicate time stamps new ConcatEnumerator(true, GetFileBasedWarmupEnumerator(warmupRequest), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull = false }, // if required by the original request, we will fill forward the Synced warmup data request.Configuration.FillDataForward, _algorithm.Settings.WarmupResolution); synchronizedWarmupEnumerator = ConfigureLastPointTracker(synchronizedWarmupEnumerator, lastPointTracker, isWarmUpEnumerator: true); synchronizedWarmupEnumerator = AddScheduleWrapper(warmupRequest, synchronizedWarmupEnumerator, null); // don't let future data past. We let null pass because that's letting the next enumerator know we've ended because we always return true in live synchronizedWarmupEnumerator = new FilterEnumerator(synchronizedWarmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal); // the order here is important, concat enumerator will keep the last enumerator given and dispose of the rest liveEnumerator = new ConcatEnumerator(true, synchronizedWarmupEnumerator, liveEnumerator); } } return liveEnumerator; } /// /// File based warmup enumerator /// private IEnumerator GetFileBasedWarmupEnumerator(SubscriptionRequest warmup) { IEnumerator result = null; try { var enumerator = CreateEnumerator(warmup); if (warmup.Configuration.PricesShouldBeScaled()) { enumerator = new PriceScaleFactorEnumerator(enumerator, warmup.Configuration, _factorFileProvider); } result = new FilterEnumerator(enumerator, // don't let future data past, nor fill forward, that will be handled after merging with the history request response data => data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward); } catch (Exception e) { Log.Error(e, $"File based warmup: {warmup.Configuration}"); } return result; } /// /// History based warmup enumerator /// private IEnumerator GetHistoryWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker) { IEnumerator result; if (warmup.IsUniverseSubscription) { // we ignore the fill forward time span argument because we will fill forwared the concatenated file and history based enumerators next in the stack result = CreateUniverseEnumerator(warmup); } else { // we create an enumerable of which we get the enumerator to defer the creation of the history request until the file based enumeration ended // and potentially the 'lastPointTracker' is available to adjust our start time result = new[] { warmup }.SelectMany(_ => { var startTimeUtc = warmup.StartTimeUtc; if (lastPointTracker != null && lastPointTracker.LastDataPoint != null) { var lastPointExchangeTime = lastPointTracker.LastDataPoint.Time; if (warmup.Configuration.Resolution == Resolution.Daily) { // time could be 9.30 for example using strict daily end times, but we just want the date in this case lastPointExchangeTime = lastPointExchangeTime.Date; } var utcLastPointTime = lastPointExchangeTime.ConvertToUtc(warmup.ExchangeHours.TimeZone); if (utcLastPointTime > startTimeUtc) { if (Log.DebuggingEnabled) { Log.Debug($"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}"); } startTimeUtc = utcLastPointTime; } } var historyRequest = new Data.HistoryRequest(warmup.Configuration, warmup.ExchangeHours, startTimeUtc, warmup.EndTimeUtc); try { return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.TimeZone).Select(slice => { try { var data = slice.Get(historyRequest.DataType); return (BaseData)data[warmup.Configuration.Symbol]; } catch (Exception e) { Log.Error(e, $"History warmup: {warmup.Configuration}"); } return null; }); } catch { // some history providers could throw if they do not support a type } return Enumerable.Empty(); }).GetEnumerator(); } return new FilterEnumerator(result, // don't let future data past, nor fill forward, that will be handled after merging with the file based enumerator data => data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward); } /// /// Will wrap the provided enumerator with a /// using a that will advance time based on the provided /// function /// /// Won't advance time if now.Hour is bigger or equal than 23pm, less or equal than 5am or Saturday. /// This is done to prevent universe selection occurring in those hours so that the subscription changes /// are handled correctly. private IEnumerator GetConfiguredFrontierAwareEnumerator( IEnumerator enumerator, TimeZoneOffsetProvider tzOffsetProvider, Func customStepEvaluator) { var stepTimeProvider = new PredicateTimeProvider(_frontierTimeProvider, customStepEvaluator); return new FrontierAwareEnumerator(enumerator, stepTimeProvider, tzOffsetProvider); } private IDataQueueUniverseProvider GetUniverseProvider(SecurityType securityType) { if (_dataQueueHandler is not IDataQueueUniverseProvider or DataQueueHandlerManager { HasUniverseProvider: false }) { throw new NotSupportedException($"The DataQueueHandler does not support {securityType}."); } return (IDataQueueUniverseProvider)_dataQueueHandler; } private void HandleUnsupportedConfigurationEvent(object _, SubscriptionDataConfig config) { if (_algorithm != null) { lock (_unsupportedConfigurations) { var key = $"{config.Symbol.ID.Market} {config.Symbol.ID.SecurityType} {config.Type.Name}"; if (_unsupportedConfigurations.Add(key)) { Log.Trace($"LiveTradingDataFeed.HandleUnsupportedConfigurationEvent(): detected unsupported configuration: {config}"); _algorithm.Debug($"Warning: {key} data not supported. Please consider reviewing the data providers selection."); } } } } /// /// Overrides methods of the base data exchange implementation /// private class EnumeratorHandler : BaseDataExchange.EnumeratorHandler { public EnumeratorHandler(Symbol symbol, IEnumerator enumerator, EnqueueableEnumerator enqueueable) : base(symbol, enumerator, handleData: enqueueable.Enqueue) { EnumeratorFinished += (_, _) => enqueueable.Stop(); } } } }