/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

using System;
using System.Collections.Generic;
using System.Linq;
using Python.Runtime;
using QuantConnect.Data;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Util;

namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators.Factories
{
    /// <summary>
    /// Provides an implementation of <see cref="ISubscriptionEnumeratorFactory"/> to handle live custom data.
    /// </summary>
    public class LiveCustomDataSubscriptionEnumeratorFactory : ISubscriptionEnumeratorFactory
    {
        private readonly TimeSpan _minimumIntervalCheck;
        private readonly ITimeProvider _timeProvider;
        private readonly Func<DateTime, DateTime> _dateAdjustment;
        private readonly IObjectStore _objectStore;

        /// <summary>
        /// Initializes a new instance of the <see cref="LiveCustomDataSubscriptionEnumeratorFactory"/> class
        /// </summary>
        /// <param name="timeProvider">Time provider from data feed</param>
        /// <param name="objectStore">The object store to use</param>
        /// <param name="dateAdjustment">Func that allows adjusting the datetime to use</param>
        /// <param name="minimumIntervalCheck">Allows specifying the minimum interval between each enumerator refresh and data check, default is 30 minutes</param>
        public LiveCustomDataSubscriptionEnumeratorFactory(ITimeProvider timeProvider, IObjectStore objectStore,
            Func<DateTime, DateTime> dateAdjustment = null, TimeSpan? minimumIntervalCheck = null)
        {
            _timeProvider = timeProvider;
            _dateAdjustment = dateAdjustment;
            _minimumIntervalCheck = minimumIntervalCheck ?? TimeSpan.FromMinutes(30);
            _objectStore = objectStore;
        }

        /// <summary>
        /// Creates an enumerator to read the specified request.
        /// </summary>
        /// <param name="request">The subscription request to be read</param>
        /// <param name="dataProvider">Provider used to get data when it is not present on disk</param>
        /// <returns>An enumerator reading the subscription request</returns>
        public IEnumerator<BaseData> CreateEnumerator(SubscriptionRequest request, IDataProvider dataProvider)
        {
            var config = request.Configuration;

            // frontier value used to prevent emitting duplicate time stamps between refreshed enumerators
            // also provides some immediate fast-forward to handle spooling through remote files quickly
            var frontier = Ref.Create(_dateAdjustment?.Invoke(request.StartTimeLocal) ?? request.StartTimeLocal);
            var lastSourceRefreshTime = DateTime.MinValue;
            var sourceFactory = config.GetBaseDataInstance();

            // this is refreshing the enumerator stack for each new source
            var refresher = new RefreshEnumerator<BaseData>(() =>
            {
                // rate limit the refresh of this enumerator stack
                var utcNow = _timeProvider.GetUtcNow();
                var minimumTimeBetweenCalls = GetMinimumTimeBetweenCalls(config.Increment, _minimumIntervalCheck);
                if (utcNow - lastSourceRefreshTime < minimumTimeBetweenCalls)
                {
                    return Enumerable.Empty<BaseData>().GetEnumerator();
                }

                lastSourceRefreshTime = utcNow;

                // current date in the exchange time zone, optionally run through the date adjustment func
                var exchangeDate = utcNow.ConvertFromUtc(config.ExchangeTimeZone).Date;
                var localDate = _dateAdjustment?.Invoke(exchangeDate) ?? exchangeDate;
                var source = sourceFactory.GetSource(config, localDate, true);

                // fetch the new source and enumerate the data source reader
                var enumerator = EnumerateDataSourceReader(config, dataProvider, frontier, source, localDate, sourceFactory);

                if (SourceRequiresFastForward(source))
                {
                    // The FastForwardEnumerator implements these two features:
                    // (1) make sure we never emit past data
                    // (2) data filtering based on a maximum data age
                    // For custom data we don't want feature (2) because we would reject data points emitted later
                    // (e.g. Quandl daily data after a weekend), so we disable it using a huge maximum data age.

                    // apply fast forward logic for file transport mediums
                    var maximumDataAge = GetMaximumDataAge(Time.MaxTimeSpan);
                    enumerator = new FastForwardEnumerator(enumerator, _timeProvider, config.ExchangeTimeZone, maximumDataAge);
                }
                else
                {
                    // rate limit calls to this enumerator stack
                    enumerator = new RateLimitEnumerator<BaseData>(enumerator, _timeProvider, minimumTimeBetweenCalls);
                }

                if (source.Format == FileFormat.UnfoldingCollection)
                {
                    // unroll collections into individual data points after fast forward/rate limiting applied
                    enumerator = enumerator.SelectMany(data =>
                    {
                        IEnumerator<BaseData> collectionEnumerator;
                        if (data is BaseDataCollection collection)
                        {
                            if (source.TransportMedium == SubscriptionTransportMedium.Rest
                                || source.TransportMedium == SubscriptionTransportMedium.RemoteFile)
                            {
                                // we want to make sure the data points we *unroll* are not past
                                collectionEnumerator = collection.Data
                                    .Where(baseData => baseData.EndTime > frontier.Value)
                                    .GetEnumerator();
                            }
                            else
                            {
                                collectionEnumerator = collection.Data.GetEnumerator();
                            }
                        }
                        else
                        {
                            // not a collection: emit the single data point unchanged
                            collectionEnumerator = new List<BaseData> { data }.GetEnumerator();
                        }
                        return collectionEnumerator;
                    });
                }

                return enumerator;
            });

            return refresher;
        }

        /// <summary>
        /// Reads the given source and yields the data points whose end time is strictly after the
        /// frontier, advancing the frontier to the latest end time seen.
        /// </summary>
        /// <param name="config">The subscription configuration being read</param>
        /// <param name="dataProvider">Provider wrapped by the single entry cache used for this read</param>
        /// <param name="localFrontier">Shared frontier; read to filter data and updated with the latest end time seen</param>
        /// <param name="source">The source to read</param>
        /// <param name="localDate">The date passed to the data source reader</param>
        /// <param name="baseDataInstance">The data instance passed to the data source reader</param>
        /// <returns>An enumerator over the data points newer than the frontier</returns>
        private IEnumerator<BaseData> EnumerateDataSourceReader(SubscriptionDataConfig config, IDataProvider dataProvider, Ref<DateTime> localFrontier, SubscriptionDataSource source, DateTime localDate, BaseData baseDataInstance)
        {
            using (var dataCacheProvider = new SingleEntryDataCacheProvider(dataProvider))
            {
                var newLocalFrontier = localFrontier.Value;
                var dataSourceReader = GetSubscriptionDataSourceReader(source, dataCacheProvider, config, localDate, baseDataInstance, dataProvider);
                using var subscriptionEnumerator = SortEnumerator<DateTime>.TryWrapSortEnumerator(source.Sort, dataSourceReader.Read(source));
                foreach (var datum in subscriptionEnumerator)
                {
                    // always skip past all times emitted on the previous invocation of this enumerator
                    // this allows data at the same time from the same refresh of the source while excluding
                    // data from different refreshes of the source
                    if (datum != null && datum.EndTime > localFrontier.Value)
                    {
                        yield return datum;
                    }
                    else if (!SourceRequiresFastForward(source))
                    {
                        // if the 'source' is Rest and there is no new value,
                        // we *break*, else we will be caught in a tight loop
                        // because Rest source never ends!
                        // edit: we 'break' vs 'return null' so that the source is refreshed
                        // allowing date changes to impact the source value
                        // note it will respect 'minimumTimeBetweenCalls'
                        break;
                    }

                    if (datum != null)
                    {
                        newLocalFrontier = Time.Max(datum.EndTime, newLocalFrontier);

                        if (!SourceRequiresFastForward(source))
                        {
                            // if the 'source' is Rest we need to update the localFrontier here
                            // because Rest source never ends!
                            // Should we advance the frontier for all source types here?
                            localFrontier.Value = newLocalFrontier;
                        }
                    }
                }

                // publish the latest end time seen during this read; for file sources this is the only
                // place the frontier is advanced, so data at the same time within one read is all emitted
                localFrontier.Value = newLocalFrontier;
            }
        }

        /// <summary>
        /// Gets the <see cref="ISubscriptionDataSourceReader"/> for the specified source
        /// </summary>
        /// <param name="source">The source to be read</param>
        /// <param name="dataCacheProvider">The cache provider handed to the reader</param>
        /// <param name="config">The subscription configuration being read</param>
        /// <param name="date">The date handed to the reader</param>
        /// <param name="baseDataInstance">The data instance handed to the reader</param>
        /// <param name="dataProvider">The data provider handed to the reader</param>
        /// <returns>The reader created by <see cref="SubscriptionDataSourceReader.ForSource"/> in live mode</returns>
        protected virtual ISubscriptionDataSourceReader GetSubscriptionDataSourceReader(SubscriptionDataSource source,
            IDataCacheProvider dataCacheProvider,
            SubscriptionDataConfig config,
            DateTime date,
            BaseData baseDataInstance,
            IDataProvider dataProvider
            )
        {
            return SubscriptionDataSourceReader.ForSource(source, dataCacheProvider, config, date, true, baseDataInstance, dataProvider, _objectStore);
        }

        /// <summary>
        /// Determines whether the source is file based (local or remote file), in which case
        /// the fast forward enumerator is applied instead of rate limiting.
        /// </summary>
        /// <param name="source">The source to inspect</param>
        /// <returns>True if the transport medium is a local or remote file</returns>
        private static bool SourceRequiresFastForward(SubscriptionDataSource source)
        {
            return source.TransportMedium == SubscriptionTransportMedium.LocalFile
                || source.TransportMedium == SubscriptionTransportMedium.RemoteFile;
        }

        /// <summary>
        /// Gets the smaller of the data increment and the configured minimum interval.
        /// </summary>
        /// <param name="increment">The subscription's data increment</param>
        /// <param name="minimumInterval">The minimum interval between checks</param>
        /// <returns>The shorter of the two time spans</returns>
        private static TimeSpan GetMinimumTimeBetweenCalls(TimeSpan increment, TimeSpan minimumInterval)
        {
            return TimeSpan.FromTicks(Math.Min(increment.Ticks, minimumInterval.Ticks));
        }

        /// <summary>
        /// Gets the larger of the given increment and five seconds.
        /// </summary>
        /// <param name="increment">The increment to use as the data age</param>
        /// <returns>The given increment, but never less than five seconds</returns>
        private static TimeSpan GetMaximumDataAge(TimeSpan increment)
        {
            return TimeSpan.FromTicks(Math.Max(increment.Ticks, TimeSpan.FromSeconds(5).Ticks));
        }
    }
}