/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Linq;
using QuantConnect.Util;
using QuantConnect.Data;
using System.Collections;
using System.Globalization;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Configuration;
using QuantConnect.Data.Auxiliary;
using QuantConnect.Data.Custom.Tiingo;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Securities;
using QuantConnect.Data.UniverseSelection;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// Subscription data reader is a wrapper on the stream reader class to download, unpack and iterate over a data file.
///
/// The class accepts any subscription configuration and automatically makes it available to enumerate
public class SubscriptionDataReader : IEnumerator, ITradableDatesNotifier, IDataProviderEvents
{
private IDataProvider _dataProvider;
private IObjectStore _objectStore;
private bool _initialized;
// Source string to create memory stream:
private SubscriptionDataSource _source;
private bool _endOfStream;
private IEnumerator _subscriptionFactoryEnumerator;
/// Configuration of the data-reader:
private readonly SubscriptionDataConfig _config;
/// true if we can find a scale factor file for the security of the form: ..\Lean\Data\equity\market\factor_files\{SYMBOL}.csv
private bool _hasScaleFactors;
// Location of the datafeed - the type of this data.
// Create a single instance to invoke all Type Methods:
private BaseData _dataFactory;
//Start finish times of the backtest:
private DateTime _periodStart;
private readonly DateTime _periodFinish;
private readonly IMapFileProvider _mapFileProvider;
private readonly IFactorFileProvider _factorFileProvider;
private IFactorProvider _factorFile;
private MapFile _mapFile;
private bool _pastDelistedDate;
private BaseData _previous;
private decimal? _lastRawPrice;
private DateChangeTimeKeeper _timeKeeper;
private readonly IEnumerable _tradableDatesInDataTimeZone;
private readonly SecurityExchangeHours _exchangeHours;
// used when emitting aux data from within while loop
private readonly IDataCacheProvider _dataCacheProvider;
private DateTime _delistingDate;
private bool _updatingDataEnumerator;
///
/// Event fired when an invalid configuration has been detected
///
public event EventHandler InvalidConfigurationDetected;
///
/// Event fired when the numerical precision in the factor file has been limited
///
public event EventHandler NumericalPrecisionLimited;
///
/// Event fired when the start date has been limited
///
public event EventHandler StartDateLimited;
///
/// Event fired when there was an error downloading a remote file
///
public event EventHandler DownloadFailed;
///
/// Event fired when there was an error reading the data
///
public event EventHandler ReaderErrorDetected;
///
/// Event fired when there is a new tradable date
///
public event EventHandler NewTradableDate;
///
/// Last read BaseData object from this type and source
///
public BaseData Current
{
get;
private set;
}
///
/// Explicit Interface Implementation for Current
///
object IEnumerator.Current
{
get { return Current; }
}
///
/// Subscription data reader takes a subscription request, loads the type, accepts the data source and enumerate on the results.
///
/// Subscription configuration object
/// The data request
/// Used for resolving the correct map files
/// Used for getting factor files
/// Used for caching files
/// The data provider to use
public SubscriptionDataReader(SubscriptionDataConfig config,
BaseDataRequest dataRequest,
IMapFileProvider mapFileProvider,
IFactorFileProvider factorFileProvider,
IDataCacheProvider dataCacheProvider,
IDataProvider dataProvider,
IObjectStore objectStore)
{
//Save configuration of data-subscription:
_config = config;
//Save Start and End Dates:
_periodStart = dataRequest.StartTimeLocal;
_periodFinish = dataRequest.EndTimeLocal;
_mapFileProvider = mapFileProvider;
_factorFileProvider = factorFileProvider;
_dataCacheProvider = dataCacheProvider;
_dataProvider = dataProvider;
_objectStore = objectStore;
_tradableDatesInDataTimeZone = dataRequest.TradableDaysInDataTimeZone;
_exchangeHours = dataRequest.ExchangeHours;
}
///
/// Initializes the instance
///
/// Should be called after all consumers of event are set,
/// since it will produce events.
public void Initialize()
{
if (_initialized)
{
return;
}
//Save the type of data we'll be getting from the source.
try
{
_dataFactory = _config.GetBaseDataInstance();
}
catch (ArgumentException exception)
{
OnInvalidConfigurationDetected(new InvalidConfigurationDetectedEventArgs(_config.Symbol, exception.Message));
_endOfStream = true;
return;
}
// If Tiingo data, set the access token in data factory
var tiingo = _dataFactory as TiingoPrice;
if (tiingo != null)
{
if (!Tiingo.IsAuthCodeSet)
{
Tiingo.SetAuthCode(Config.Get("tiingo-auth-token"));
}
}
// load up the map files for equities, options, and custom data if it supports it.
// Only load up factor files for equities
if (_dataFactory.RequiresMapping())
{
try
{
var mapFile = _mapFileProvider.ResolveMapFile(_config);
// only take the resolved map file if it has data, otherwise we'll use the empty one we defined above
if (mapFile.Any()) _mapFile = mapFile;
if (_config.PricesShouldBeScaled())
{
var factorFile = _factorFileProvider.Get(_config.Symbol);
_hasScaleFactors = factorFile != null;
if (_hasScaleFactors)
{
_factorFile = factorFile;
// if factor file has minimum date, update start period if before minimum date
if (_factorFile != null && _factorFile.FactorFileMinimumDate.HasValue)
{
if (_periodStart < _factorFile.FactorFileMinimumDate.Value)
{
_periodStart = _factorFile.FactorFileMinimumDate.Value;
OnNumericalPrecisionLimited(
new NumericalPrecisionLimitedEventArgs(_config.Symbol,
$"[{_config.Symbol.Value}, {_factorFile.FactorFileMinimumDate.Value.ToShortDateString()}]"));
}
}
}
if (_periodStart < mapFile.FirstDate)
{
_periodStart = mapFile.FirstDate;
OnStartDateLimited(
new StartDateLimitedEventArgs(_config.Symbol,
$"[{_config.Symbol.Value}," +
$" {mapFile.FirstDate.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)}]"));
}
}
}
catch (Exception err)
{
Log.Error(err, "Fetching Price/Map Factors: " + _config.Symbol.ID + ": ");
}
}
_factorFile ??= _config.Symbol.GetEmptyFactorFile();
_mapFile ??= new MapFile(_config.Symbol.Value, Enumerable.Empty());
_delistingDate = _config.Symbol.GetDelistingDate(_mapFile);
_timeKeeper = new DateChangeTimeKeeper(_tradableDatesInDataTimeZone, _config, _exchangeHours, _delistingDate);
_timeKeeper.NewExchangeDate += HandleNewTradableDate;
UpdateDataEnumerator(true);
_initialized = true;
}
///
/// Advances the enumerator to the next element of the collection.
///
///
/// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
///
/// The collection was modified after the enumerator was created. 2
public bool MoveNext()
{
if (!_initialized)
{
// Late initialization so it is performed in the data feed stack
// and not in the algorithm thread
Initialize();
}
if (_endOfStream)
{
return false;
}
if (Current != null)
{
// only save previous price data
_previous = Current;
}
if (_subscriptionFactoryEnumerator == null)
{
_endOfStream = true;
return false;
}
do
{
if (_pastDelistedDate)
{
break;
}
// keep enumerating until we find something that is within our time frame
while (_subscriptionFactoryEnumerator.MoveNext())
{
var instance = _subscriptionFactoryEnumerator.Current;
if (instance == null)
{
// keep reading until we get valid data
continue;
}
// We rely on symbol change to detect a mapping or symbol change, instead of using SubscriptionDataConfig.NewSymbol
// because only one of the configs with the same symbol will trigger a symbol change event.
var previousMappedSymbol = _config.MappedSymbol;
// Advance the time keeper either until the current instance time (to synchronize) or until the source changes.
// Note: use time instead of end time to avoid skipping instances that all have the same timestamps in the same file (e.g. universe data)
var currentSource = _source;
var nextExchangeDate = _config.Resolution == Resolution.Daily
&& _timeKeeper.IsExchangeBehindData()
&& !_config.Type.IsAssignableTo(typeof(BaseDataCollection))
// If daily and exchange is behind data, data for date X will have a start time within date X-1,
// so we use the actual date from end time. e.g. a daily bar for Jan15 can have a start time of Jan14 8PM
// (exchange tz 4 hours behind data tz) and end time would be Jan15 8PM.
// This doesn't apply to universe files (BaseDataCollection check) because they are not read in the same way
// price daily files are read: they are read in a collection with end time of X+1. We don't want to skip them or advance time yet.
? instance.EndTime
: instance.Time;
while (_timeKeeper.ExchangeTime < nextExchangeDate && currentSource == _source)
{
_timeKeeper.AdvanceTowardsExchangeTime(nextExchangeDate);
}
// Source change, check if we should emit the current instance
if (currentSource != _source
&& (
// After a mapping for every resolution except daily:
// For other resolutions, the instance that triggered the exchange date change should be skipped,
// it's end time will be either midnight or for a future date. The new source might have a data point with this times.
(_config.MappedSymbol != previousMappedSymbol && _config.Resolution != Resolution.Daily)
// Skip if the exchange time zone is behind of the data time zone:
// The new source might have data for these same times, we want data for the new symbol
|| (_config.Resolution == Resolution.Daily && _timeKeeper.IsExchangeBehindData())
// skip if the instance if it's beyond what the previous source should have.
// e.g. A file mistakenly has data for the next day
// (see SubscriptionDataReaderTests.DoesNotEmitDataBeyondTradableDate unit test)
// or the instance that triggered the exchange date change is for a future date (no data found in between)
|| instance.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone).Date >= _timeKeeper.DataTime.Date
))
{
continue;
}
// This can happen after a mapping, we already have data but we need to skip some points that belong to a previous date.
if (Current != null && instance.EndTime < _timeKeeper.ExchangeTime)
{
continue;
}
// prevent emitting past data, this can happen when switching symbols on daily data
if (_previous != null && _config.Resolution != Resolution.Tick)
{
if (_config.IsCustomData)
{
// Skip the point if time went backwards for custom data?
// TODO: Should this be the case for all datapoints?
if (instance.EndTime < _previous.EndTime) continue;
}
else
{
// all other resolutions don't allow duplicate end times
if (instance.EndTime <= _previous.EndTime) continue;
}
}
if (instance.EndTime < _periodStart)
{
// keep reading until we get a value on or after the start
_previous = instance;
continue;
}
// We have to perform this check after refreshing the enumerator, if appropriate
// 'instance' could be a data point far in the future due to remapping (GH issue 5232) in which case it will be dropped
if (instance.Time > _periodFinish)
{
// stop reading when we get a value after the end
_endOfStream = true;
return false;
}
// we've made it past all of our filters, we're withing the requested start/end of the subscription,
// we've satisfied user and market hour filters, so this data is good to go as current
Current = instance;
// we keep the last raw price registered before we return so we are not affected by anyone (price scale) modifying our current
_lastRawPrice = Current.Price;
return true;
}
// we've ended the enumerator, time to refresh
UpdateDataEnumerator(true);
}
while (_subscriptionFactoryEnumerator != null);
_endOfStream = true;
return false;
}
///
/// Emits a new tradable date event and tries to update the data enumerator if necessary
///
private void HandleNewTradableDate(object sender, DateTime date)
{
OnNewTradableDate(new NewTradableDateEventArgs(date, _previous, _config.Symbol, _lastRawPrice));
UpdateDataEnumerator(false);
}
///
/// Resolves the next enumerator to be used in and updates
///
///
/// True, if the enumerator has been updated (even if updated to null)
private bool UpdateDataEnumerator(bool endOfEnumerator)
{
// Guard for infinite recursion: during an enumerator update, we might ask for a new date,
// which might end up with a new exchange date being detected and another update being requested.
// Just skip that update and let's do it ourselves after the date is resolved
if (_updatingDataEnumerator)
{
return false;
}
_updatingDataEnumerator = true;
try
{
do
{
var date = _timeKeeper.DataTime.Date;
// Update current date only if the enumerator has ended, else we might just need to change files
// (e.g. same date, but symbol was mapped)
if (endOfEnumerator && !TryGetNextDate(out date))
{
_subscriptionFactoryEnumerator = null;
// if we run out of dates then we're finished with this subscription
return true;
}
// fetch the new source, using the data time zone for the date
var newSource = _dataFactory.GetSource(_config, date, false);
if (newSource == null)
{
// move to the next day
continue;
}
// check if we should create a new subscription factory
var sourceChanged = _source != newSource && !string.IsNullOrEmpty(newSource.Source);
if (sourceChanged)
{
// dispose of the current enumerator before creating a new one
_subscriptionFactoryEnumerator.DisposeSafely();
// save off for comparison next time
_source = newSource;
var subscriptionFactory = CreateSubscriptionFactory(newSource, _dataFactory, _dataProvider);
_subscriptionFactoryEnumerator = SortEnumerator.TryWrapSortEnumerator(newSource.Sort, subscriptionFactory.Read(newSource));
return true;
}
// if there's still more in the enumerator and we received the same source from the GetSource call
// above, then just keep using the same enumerator as we were before
if (!endOfEnumerator) // && !sourceChanged is always true here
{
return false;
}
// keep churning until we find a new source or run out of tradeable dates
// in live mode tradeable dates won't advance beyond today's date, but
// TryGetNextDate will return false if it's already at today
}
while (true);
}
finally
{
_updatingDataEnumerator = false;
}
}
private ISubscriptionDataSourceReader CreateSubscriptionFactory(SubscriptionDataSource source, BaseData baseDataInstance, IDataProvider dataProvider)
{
var factory = SubscriptionDataSourceReader.ForSource(source, _dataCacheProvider, _config, _timeKeeper.DataTime.Date, false, baseDataInstance, dataProvider, _objectStore);
AttachEventHandlers(factory, source);
return factory;
}
private void AttachEventHandlers(ISubscriptionDataSourceReader dataSourceReader, SubscriptionDataSource source)
{
dataSourceReader.InvalidSource += (sender, args) =>
{
if (_config.IsCustomData && !_config.Type.GetBaseDataInstance().IsSparseData())
{
OnDownloadFailed(
new DownloadFailedEventArgs(_config.Symbol,
"We could not fetch the requested data. " +
"This may not be valid data, or a failed download of custom data. " +
$"Skipping source ({args.Source.Source})."));
return;
}
switch (args.Source.TransportMedium)
{
case SubscriptionTransportMedium.LocalFile:
// the local uri doesn't exist, write an error and return null so we we don't try to get data for today
// Log.Trace(string.Format("SubscriptionDataReader.GetReader(): Could not find QC Data, skipped: {0}", source));
break;
case SubscriptionTransportMedium.RemoteFile:
OnDownloadFailed(
new DownloadFailedEventArgs(_config.Symbol,
$"Error downloading custom data source file, skipped: {source} " +
$"Error: {args.Exception.Message}", args.Exception.StackTrace));
break;
case SubscriptionTransportMedium.Rest:
break;
case SubscriptionTransportMedium.ObjectStore:
break;
default:
throw new ArgumentOutOfRangeException();
}
};
if (dataSourceReader is TextSubscriptionDataSourceReader)
{
// handle empty files/instantiation errors
var textSubscriptionFactory = (TextSubscriptionDataSourceReader)dataSourceReader;
// handle parser errors
textSubscriptionFactory.ReaderError += (sender, args) =>
{
OnReaderErrorDetected(
new ReaderErrorDetectedEventArgs(_config.Symbol,
$"Error invoking {_config.Symbol} data reader. " +
$"Line: {args.Line} Error: {args.Exception.Message}",
args.Exception.StackTrace));
};
}
}
///
/// Iterates the tradeable dates enumerator
///
/// The next tradeable date
/// True if we got a new date from the enumerator, false if it's exhausted, or in live mode if we're already at today
private bool TryGetNextDate(out DateTime date)
{
while (_timeKeeper.TryAdvanceUntilNextDataDate())
{
date = _timeKeeper.DataTime.Date;
if (!_mapFile.HasData(date))
{
continue;
}
// don't do other checks if we haven't gotten data for this date yet
if (_previous != null && _previous.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone) > date)
{
continue;
}
// we've passed initial checks,now go get data for this date!
return true;
}
if (_timeKeeper.ExchangeTime.Date > _delistingDate)
{
_pastDelistedDate = true;
}
// no more tradeable dates, we've exhausted the enumerator
date = DateTime.MaxValue.Date;
return false;
}
///
/// Reset the IEnumeration
///
/// Not used
public void Reset()
{
throw new NotImplementedException("Reset method not implemented. Assumes loop will only be used once.");
}
///
/// Dispose of the Stream Reader and close out the source stream and file connections.
///
public void Dispose()
{
_subscriptionFactoryEnumerator.DisposeSafely();
if (_initialized)
{
_timeKeeper.NewExchangeDate -= HandleNewTradableDate;
_timeKeeper.DisposeSafely();
}
}
///
/// Event invocator for the event
///
/// Event arguments for the event
protected virtual void OnInvalidConfigurationDetected(InvalidConfigurationDetectedEventArgs e)
{
InvalidConfigurationDetected?.Invoke(this, e);
}
///
/// Event invocator for the event
///
/// Event arguments for the event
protected virtual void OnNumericalPrecisionLimited(NumericalPrecisionLimitedEventArgs e)
{
NumericalPrecisionLimited?.Invoke(this, e);
}
///
/// Event invocator for the event
///
/// Event arguments for the event
protected virtual void OnStartDateLimited(StartDateLimitedEventArgs e)
{
StartDateLimited?.Invoke(this, e);
}
///
/// Event invocator for the event
///
/// Event arguments for the event
protected virtual void OnDownloadFailed(DownloadFailedEventArgs e)
{
DownloadFailed?.Invoke(this, e);
}
///
/// Event invocator for the event
///
/// Event arguments for the event
protected virtual void OnReaderErrorDetected(ReaderErrorDetectedEventArgs e)
{
ReaderErrorDetected?.Invoke(this, e);
}
///
/// Event invocator for the event
///
/// Event arguments for the event
protected virtual void OnNewTradableDate(NewTradableDateEventArgs e)
{
NewTradableDate?.Invoke(this, e);
}
}
}