/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using System.Linq;
using QuantConnect.Data;
using QuantConnect.Data.Auxiliary;
using QuantConnect.Data.Fundamental;
using QuantConnect.Data.Market;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators.Factories;
using QuantConnect.Lean.Engine.Results;
using QuantConnect.Logging;
using QuantConnect.Packets;
using QuantConnect.Securities;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// Historical datafeed stream reader for processing files on a local disk.
///
/// Filesystem datafeeds are incredibly fast
public class FileSystemDataFeed : IDataFeed
{
private IAlgorithm _algorithm;
private ITimeProvider _timeProvider;
private IResultHandler _resultHandler;
private IMapFileProvider _mapFileProvider;
private IFactorFileProvider _factorFileProvider;
private IDataProvider _dataProvider;
private IDataCacheProvider _cacheProvider;
private SubscriptionCollection _subscriptions;
private MarketHoursDatabase _marketHoursDatabase;
private SubscriptionDataReaderSubscriptionEnumeratorFactory _subscriptionFactory;
///
/// Flag indicating the hander thread is completely finished and ready to dispose.
///
public bool IsActive { get; private set; }
///
/// Initializes the data feed for the specified job and algorithm
///
public virtual void Initialize(IAlgorithm algorithm,
AlgorithmNodePacket job,
IResultHandler resultHandler,
IMapFileProvider mapFileProvider,
IFactorFileProvider factorFileProvider,
IDataProvider dataProvider,
IDataFeedSubscriptionManager subscriptionManager,
IDataFeedTimeProvider dataFeedTimeProvider,
IDataChannelProvider dataChannelProvider)
{
_algorithm = algorithm;
_resultHandler = resultHandler;
_mapFileProvider = mapFileProvider;
_factorFileProvider = factorFileProvider;
_dataProvider = dataProvider;
_timeProvider = dataFeedTimeProvider.FrontierTimeProvider;
_subscriptions = subscriptionManager.DataFeedSubscriptions;
_cacheProvider = new ZipDataCacheProvider(dataProvider, isDataEphemeral: false);
_subscriptionFactory = new SubscriptionDataReaderSubscriptionEnumeratorFactory(
_resultHandler,
_mapFileProvider,
_factorFileProvider,
_cacheProvider,
algorithm,
enablePriceScaling: false);
IsActive = true;
_marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
}
///
/// Creates a file based data enumerator for the given subscription request
///
/// Protected so it can be used by the to warmup requests
protected IEnumerator CreateEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution = null,
LastPointTracker lastPointTracker = null, bool isWarmUp = false)
{
return request.IsUniverseSubscription ? CreateUniverseEnumerator(request) : CreateDataEnumerator(request, fillForwardResolution, lastPointTracker, isWarmUp);
}
private IEnumerator CreateDataEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution, LastPointTracker lastPointTracker, bool isWarmUp)
{
// ReSharper disable once PossibleMultipleEnumeration
var enumerator = _subscriptionFactory.CreateEnumerator(request, _dataProvider);
enumerator = ConfigureEnumerator(request, false, enumerator, fillForwardResolution, lastPointTracker, isWarmUp);
return enumerator;
}
///
/// Creates a new subscription to provide data for the specified security.
///
/// Defines the subscription to be added, including start/end times the universe and security
/// The created if successful, null otherwise
public virtual Subscription CreateSubscription(SubscriptionRequest request)
{
IEnumerator enumerator;
if(_algorithm.IsWarmingUp)
{
var pivotTimeUtc = _algorithm.StartDate.ConvertToUtc(_algorithm.TimeZone);
var lastPointTracker = new LastPointTracker();
var warmupRequest = new SubscriptionRequest(request, endTimeUtc: pivotTimeUtc,
configuration: new SubscriptionDataConfig(request.Configuration, resolution: _algorithm.Settings.WarmupResolution));
IEnumerator warmupEnumerator = null;
if (warmupRequest.TradableDaysInDataTimeZone.Any()
// since we change the resolution, let's validate it's still valid configuration (example daily equity quotes are not!)
&& LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
{
// let them overlap a day if possible to avoid data gaps since each request will FFed it's own since they are different resolutions
pivotTimeUtc = Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours,
_algorithm.StartDate.ConvertTo(_algorithm.TimeZone, request.Security.Exchange.TimeZone),
Time.OneDay,
1,
false,
warmupRequest.Configuration.DataTimeZone,
LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Security.Symbol, Time.OneDay))
.ConvertToUtc(request.Security.Exchange.TimeZone);
if (pivotTimeUtc < warmupRequest.StartTimeUtc)
{
pivotTimeUtc = warmupRequest.StartTimeUtc;
}
warmupEnumerator = CreateEnumerator(warmupRequest, _algorithm.Settings.WarmupResolution, lastPointTracker, true);
// don't let future data past
warmupEnumerator = new FilterEnumerator(warmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal);
}
var normalEnumerator = CreateEnumerator(new SubscriptionRequest(request, startTimeUtc: pivotTimeUtc), lastPointTracker: lastPointTracker);
// don't let pre start data pass, since we adjust start so they overlap 1 day let's not let this data pass, we just want it for fill forwarding after the target start
// this is also useful to drop any initial selection point which was already emitted during warmup
normalEnumerator = new FilterEnumerator(normalEnumerator, data => data == null || data.EndTime >= warmupRequest.EndTimeLocal);
// after the warmup enumerator we concatenate the 'normal' one
enumerator = new ConcatEnumerator(true, warmupEnumerator, normalEnumerator);
}
else
{
enumerator = CreateEnumerator(request);
}
enumerator = AddScheduleWrapper(request, enumerator, null);
if (request.IsUniverseSubscription && request.Universe is UserDefinedUniverse)
{
// for user defined universe we do not use a worker task, since calls to AddData can happen in any moment
// and we have to be able to inject selection data points into the enumerator
return SubscriptionUtils.Create(request, enumerator, _algorithm.Settings.DailyPreciseEndTime);
}
return SubscriptionUtils.CreateAndScheduleWorker(request, enumerator, _factorFileProvider, true, _algorithm.Settings.DailyPreciseEndTime);
}
///
/// Removes the subscription from the data feed, if it exists
///
/// The subscription to remove
public virtual void RemoveSubscription(Subscription subscription)
{
}
///
/// Creates a universe enumerator from the Subscription request, the underlying enumerator func and the fill forward resolution (in some cases)
///
protected IEnumerator CreateUniverseEnumerator(SubscriptionRequest request)
{
ISubscriptionEnumeratorFactory factory = _subscriptionFactory;
if (request.Universe is ITimeTriggeredUniverse)
{
factory = new TimeTriggeredUniverseSubscriptionEnumeratorFactory(request.Universe as ITimeTriggeredUniverse,
_marketHoursDatabase,
_timeProvider);
}
else if (request.Configuration.Type == typeof(FundamentalUniverse))
{
factory = new BaseDataCollectionSubscriptionEnumeratorFactory(_algorithm.ObjectStore);
}
// define our data enumerator
var enumerator = factory.CreateEnumerator(request, _dataProvider);
return enumerator;
}
///
/// Returns a scheduled enumerator from the given arguments. It can also return the given underlying enumerator
///
protected IEnumerator AddScheduleWrapper(SubscriptionRequest request, IEnumerator underlying, ITimeProvider timeProvider)
{
if (!request.IsUniverseSubscription || !request.Universe.UniverseSettings.Schedule.Initialized)
{
return underlying;
}
var schedule = request.Universe.UniverseSettings.Schedule.Get(request.StartTimeLocal, request.EndTimeLocal);
if (schedule != null)
{
return new ScheduledEnumerator(underlying, schedule, timeProvider, request.Configuration.ExchangeTimeZone, request.StartTimeLocal);
}
return underlying;
}
///
/// Send an exit signal to the thread.
///
public virtual void Exit()
{
if (IsActive)
{
IsActive = false;
Log.Trace("FileSystemDataFeed.Exit(): Start. Setting cancellation token...");
_subscriptionFactory?.DisposeSafely();
_cacheProvider.DisposeSafely();
Log.Trace("FileSystemDataFeed.Exit(): Exit Finished.");
}
}
///
/// Configure the enumerator with aggregation/fill-forward/filter behaviors. Returns new instance if re-configured
///
protected IEnumerator ConfigureEnumerator(SubscriptionRequest request, bool aggregate, IEnumerator enumerator, Resolution? fillForwardResolution, LastPointTracker lastPointTracker, bool isWarmUpEnumerator = false)
{
if (aggregate)
{
enumerator = new BaseDataCollectionAggregatorEnumerator(enumerator, request.Configuration.Symbol);
}
enumerator = TryAddFillForwardEnumerator(request, enumerator, request.Configuration.FillDataForward, fillForwardResolution, lastPointTracker);
// optionally apply exchange/user filters
if (request.Configuration.IsFilteredSubscription)
{
enumerator = SubscriptionFilterEnumerator.WrapForDataFeed(_resultHandler, enumerator, request.Security,
request.EndTimeLocal, request.Configuration.ExtendedMarketHours, false, request.ExchangeHours);
}
enumerator = ConfigureLastPointTracker(enumerator, lastPointTracker, isWarmUpEnumerator);
return enumerator;
}
///
/// Configures the enumerator to track the last data point, if requested, and if this is a warmup enumerator
///
protected IEnumerator ConfigureLastPointTracker(IEnumerator enumerator, LastPointTracker lastPointTracker, bool isWarmUpEnumerator)
{
if (lastPointTracker != null && isWarmUpEnumerator)
{
enumerator = new FilterEnumerator(enumerator,
data =>
{
lastPointTracker.LastDataPoint = data;
return true;
});
}
return enumerator;
}
///
/// Will add a fill forward enumerator if requested
///
protected IEnumerator TryAddFillForwardEnumerator(SubscriptionRequest request, IEnumerator enumerator, bool fillForward, Resolution? fillForwardResolution, LastPointTracker lastPointTracker = null)
{
// optionally apply fill forward logic, but never for tick data
if (fillForward && request.Configuration.Resolution != Resolution.Tick)
{
// copy forward Bid/Ask bars for QuoteBars
if (request.Configuration.Type == typeof(QuoteBar))
{
enumerator = new QuoteBarFillForwardEnumerator(enumerator);
}
var fillForwardSpan = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration);
if (fillForwardResolution != null && fillForwardResolution != Resolution.Tick)
{
// if we are giving a FFspan we use it instead of the collection based one. This is useful during warmup when the warmup resolution has been set
fillForwardSpan = Ref.Create(fillForwardResolution.Value.ToTimeSpan());
}
// Pass the security exchange hours explicitly to avoid using the ones in the request, since
// those could be different. e.g. when requests are created for open interest data the exchange
// hours are set to always open to avoid OI data being filtered out due to the exchange being closed.
// This way we allow OI data to be fill-forwarded to the market close time when strict end times is enabled,
// so that OI data is available at the same time as trades and quotes.
var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol,
request.Configuration.Increment, request.Security.Exchange.Hours);
enumerator = new FillForwardEnumerator(enumerator, request.Security.Exchange, fillForwardSpan,
request.Configuration.ExtendedMarketHours, request.StartTimeLocal, request.EndTimeLocal, request.Configuration.Increment,
request.Configuration.DataTimeZone, useDailyStrictEndTimes, request.Configuration.Type, lastPointTracker);
}
return enumerator;
}
}
}