/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Data.Market;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Lean.Engine.DataFeeds;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators.Factories;
using QuantConnect.Securities;
using QuantConnect.Util;
using HistoryRequest = QuantConnect.Data.HistoryRequest;
namespace QuantConnect.Lean.Engine.HistoricalData
{
///
/// Provides an implementation of that uses
/// instances to retrieve historical data
///
public class SubscriptionDataReaderHistoryProvider : SynchronizingHistoryProvider
{
private SymbolProperties _nullSymbolProperties;
private SecurityCache _nullCache;
private Cash _nullCash;
private IDataProvider _dataProvider;
private IMapFileProvider _mapFileProvider;
private IFactorFileProvider _factorFileProvider;
private IDataCacheProvider _dataCacheProvider;
private IObjectStore _objectStore;
private bool _parallelHistoryRequestsEnabled;
private bool _initialized;
///
/// Manager used to allow or deny access to a requested datasource for specific users
///
protected IDataPermissionManager DataPermissionManager { get; set; }
///
/// Initializes this history provider to work for the specified job
///
/// The initialization parameters
public override void Initialize(HistoryProviderInitializeParameters parameters)
{
if (_initialized)
{
return;
}
_initialized = true;
_dataProvider = parameters.DataProvider;
_mapFileProvider = parameters.MapFileProvider;
_dataCacheProvider = parameters.DataCacheProvider;
_factorFileProvider = parameters.FactorFileProvider;
_objectStore = parameters.ObjectStore;
AlgorithmSettings = parameters.AlgorithmSettings;
DataPermissionManager = parameters.DataPermissionManager;
_parallelHistoryRequestsEnabled = parameters.ParallelHistoryRequestsEnabled;
_nullCache = new SecurityCache();
_nullCash = new Cash(Currencies.NullCurrency, 0, 1m);
_nullSymbolProperties = SymbolProperties.GetDefault(Currencies.NullCurrency);
}
///
/// Gets the history for the requested securities
///
/// The historical data requests
/// The time zone used when time stamping the slice instances
/// An enumerable of the slices of data covering the span specified in each request
public override IEnumerable GetHistory(IEnumerable requests, DateTimeZone sliceTimeZone)
{
// create subscription objects from the configs
var subscriptions = new List();
foreach (var request in requests)
{
var subscription = CreateSubscription(request);
subscriptions.Add(subscription);
}
return CreateSliceEnumerableFromSubscriptions(subscriptions, sliceTimeZone);
}
///
/// Creates a subscription to process the request
///
private Subscription CreateSubscription(HistoryRequest request)
{
var config = request.ToSubscriptionDataConfig();
// this security is internal only we do not need to worry about a few of it's properties
// TODO: we don't need fee/fill/BPM/etc either. Even better we should refactor & remove the need for the security
var security = new Security(
request.ExchangeHours,
config,
_nullCash,
_nullSymbolProperties,
ErrorCurrencyConverter.Instance,
RegisteredSecurityDataTypesProvider.Null,
_nullCache
);
var dataReader = new SubscriptionDataReader(config,
request,
_mapFileProvider,
_factorFileProvider,
_dataCacheProvider,
_dataProvider,
_objectStore);
dataReader.InvalidConfigurationDetected += (sender, args) => { OnInvalidConfigurationDetected(args); };
dataReader.NumericalPrecisionLimited += (sender, args) => { OnNumericalPrecisionLimited(args); };
dataReader.StartDateLimited += (sender, args) => { OnStartDateLimited(args); };
dataReader.DownloadFailed += (sender, args) => { OnDownloadFailed(args); };
dataReader.ReaderErrorDetected += (sender, args) => { OnReaderErrorDetected(args); };
IEnumerator reader = dataReader;
var intraday = GetIntradayDataEnumerator(dataReader, request);
if (intraday != null)
{
// we optionally concatenate the intraday data enumerator
reader = new ConcatEnumerator(true, reader, intraday);
}
var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(AlgorithmSettings, request, config.Symbol, config.Increment);
if (useDailyStrictEndTimes)
{
// before corporate events which might yield data and we synchronize both feeds
reader = new StrictDailyEndTimesEnumerator(reader, request.ExchangeHours, request.StartTimeLocal);
}
reader = CorporateEventEnumeratorFactory.CreateEnumerators(
reader,
config,
_factorFileProvider,
dataReader,
_mapFileProvider,
request.StartTimeLocal,
request.EndTimeLocal);
// optionally apply fill forward behavior
if (request.FillForwardResolution.HasValue)
{
// copy forward Bid/Ask bars for QuoteBars
if (request.DataType == typeof(QuoteBar))
{
reader = new QuoteBarFillForwardEnumerator(reader);
}
var readOnlyRef = Ref.CreateReadOnly(() => request.FillForwardResolution.Value.ToTimeSpan());
var exchange = GetSecurityExchange(security.Exchange, request.DataType, request.Symbol);
reader = new FillForwardEnumerator(reader, exchange, readOnlyRef, request.IncludeExtendedMarketHours, request.StartTimeLocal, request.EndTimeLocal, config.Increment, config.DataTimeZone, useDailyStrictEndTimes, request.DataType);
}
// since the SubscriptionDataReader performs an any overlap condition on the trade bar's entire
// range (time->end time) we can end up passing the incorrect data (too far past, possibly future),
// so to combat this we deliberately filter the results from the data reader to fix these cases
// which only apply to non-tick data
reader = new SubscriptionFilterEnumerator(reader, security, request.EndTimeLocal, config.ExtendedMarketHours, false, request.ExchangeHours);
// allow all ticks
if (config.Resolution != Resolution.Tick)
{
var timeBasedFilter = new TimeBasedFilter(request);
reader = new FilterEnumerator(reader, timeBasedFilter.Filter);
}
var subscriptionRequest = new SubscriptionRequest(false, null, security, config, request.StartTimeUtc, request.EndTimeUtc);
if (_parallelHistoryRequestsEnabled)
{
return SubscriptionUtils.CreateAndScheduleWorker(subscriptionRequest, reader, _factorFileProvider, false, AlgorithmSettings.DailyPreciseEndTime);
}
return SubscriptionUtils.Create(subscriptionRequest, reader, AlgorithmSettings.DailyPreciseEndTime);
}
///
/// Gets the intraday data enumerator if any
///
protected virtual IEnumerator GetIntradayDataEnumerator(IEnumerator rawData, HistoryRequest request)
{
return null;
}
///
/// Internal helper class to filter data based on requested times
///
private class TimeBasedFilter
{
public Type RequestedType { get; set; }
public DateTime EndTimeLocal { get; set; }
public DateTime StartTimeLocal { get; set; }
public TimeBasedFilter(HistoryRequest request)
{
RequestedType = request.DataType;
EndTimeLocal = request.EndTimeLocal;
StartTimeLocal = request.StartTimeLocal;
}
public bool Filter(BaseData data)
{
// filter out all aux data, unless if we are asking for aux data
if (data.DataType == MarketDataType.Auxiliary && data.GetType() != RequestedType) return false;
// filter out future data
if (data.EndTime > EndTimeLocal) return false;
// filter out data before the start
return data.EndTime > StartTimeLocal;
}
}
}
}