/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

using System;
using NodaTime;
using System.IO;
using System.Linq;
using QuantConnect.Util;
using QuantConnect.Data;
using QuantConnect.Logging;
using QuantConnect.Securities;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Configuration;
using System.Collections.Concurrent;
using QuantConnect.Data.UniverseSelection;

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Data provider which downloads data using an <see cref="IDataDownloader"/> implementation
    /// </summary>
    public class DownloaderDataProvider : BaseDownloaderDataProvider
    {
        /// <summary>
        /// Synchronizer in charge of guaranteeing a single operation per file path
        /// </summary>
        private static readonly KeyStringSynchronizer DiskSynchronizer = new();

        // set once the "custom data is not supported" message has been logged, so it is only logged one time
        private bool _customDataDownloadError;
        // symbols for which the missing market hours message was already logged
        private readonly ConcurrentDictionary<Symbol, Symbol> _marketHoursWarning = new();
        private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
        private readonly IDataDownloader _dataDownloader;
        private readonly IDataCacheProvider _dataCacheProvider = new DiskDataCacheProvider(DiskSynchronizer);
        private readonly IMapFileProvider _mapFileProvider = Composer.Instance.GetPart<IMapFileProvider>();

        /// <summary>
        /// Creates a new instance, resolving the data downloader from the 'data-downloader' configuration value
        /// </summary>
        /// <exception cref="ArgumentException">Thrown when the 'data-downloader' configuration value is null or empty</exception>
        public DownloaderDataProvider()
        {
            var dataDownloaderConfig = Config.Get("data-downloader");
            if (!string.IsNullOrEmpty(dataDownloaderConfig))
            {
                _dataDownloader = Composer.Instance.GetExportedValueByTypeName<IDataDownloader>(dataDownloaderConfig);
            }
            else
            {
                throw new ArgumentException("DownloaderDataProvider(): requires 'data-downloader' to be set with a valid type name");
            }
        }

        /// <summary>
        /// Creates a new instance using a target data downloader used for testing
        /// </summary>
        /// <param name="dataDownloader">The data downloader instance to use</param>
        public DownloaderDataProvider(IDataDownloader dataDownloader)
        {
            _dataDownloader = dataDownloader;
        }

        /// <summary>
        /// Determines if it should download new data and retrieves data from disk
        /// </summary>
        /// <param name="key">A string representing where the data is stored</param>
        /// <returns>A <see cref="Stream"/> of the data requested</returns>
        public override Stream Fetch(string key)
        {
            return DownloadOnce(key, s =>
            {
                if (LeanData.TryParsePath(key, out var symbol, out var date, out var resolution, out var tickType, out var dataType))
                {
                    if (symbol.SecurityType == SecurityType.Base)
                    {
                        if (!_customDataDownloadError)
                        {
                            _customDataDownloadError = true;
                            // the lean data writer doesn't support it
                            Log.Trace($"DownloaderDataProvider.Get(): custom data is not supported, requested: {symbol}");
                        }
                        return;
                    }

                    MarketHoursDatabase.Entry entry;
                    try
                    {
                        entry = _marketHoursDatabase.GetEntry(symbol.ID.Market, symbol, symbol.SecurityType);
                    }
                    catch
                    {
                        // this could happen for some sources using the data provider but without a market hours database entry, like interest rates
                        if (_marketHoursWarning.TryAdd(symbol, symbol))
                        {
                            // log once
                            Log.Trace($"DownloaderDataProvider.Get(): failed to find market hours for {symbol}, skipping");
                        }
                        // this shouldn't happen for data we can download
                        return;
                    }

                    var dataTimeZone = entry.DataTimeZone;
                    var exchangeTimeZone = entry.ExchangeHours.TimeZone;
                    DateTime startTimeUtc;
                    DateTime endTimeUtc;
                    // we will download until yesterday so we are sure we don't get partial data
                    var endTimeUtcLimit = DateTime.UtcNow.Date.AddDays(-1);
                    if (resolution < Resolution.Hour)
                    {
                        // we can get the date from the path
                        startTimeUtc = date.ConvertToUtc(dataTimeZone);
                        // let's get the whole day
                        endTimeUtc = date.AddDays(1).ConvertToUtc(dataTimeZone);
                        if (endTimeUtc > endTimeUtcLimit)
                        {
                            // we are at the limit, avoid getting partial data
                            return;
                        }
                    }
                    else
                    {
                        // since hourly & daily are a single file we fetch the whole file
                        endTimeUtc = endTimeUtcLimit;
                        try
                        {
                            // we don't really know when Futures, FutureOptions, Cryptos, etc, start date so let's give it a good guess
                            if (symbol.SecurityType == SecurityType.Crypto)
                            {
                                // bitcoin start
                                startTimeUtc = new DateTime(2009, 1, 1);
                            }
                            else if (symbol.SecurityType.IsOption() && symbol.SecurityType != SecurityType.FutureOption)
                            {
                                // For options, an hourly or daily file contains a year of data, so we need to get the year of the date
                                startTimeUtc = new DateTime(date.Year, 1, 1);
                                endTimeUtc = startTimeUtc.AddYears(1);
                            }
                            else
                            {
                                startTimeUtc = symbol.ID.Date;
                            }
                        }
                        catch (InvalidOperationException)
                        {
                            // no usable start date could be determined for this symbol, fall back to the earliest supported time
                            startTimeUtc = Time.Start;
                        }

                        // clamp the requested range to [Time.Start, endTimeUtcLimit]
                        if (startTimeUtc < Time.Start)
                        {
                            startTimeUtc = Time.Start;
                        }

                        if (endTimeUtc > endTimeUtcLimit)
                        {
                            endTimeUtc = endTimeUtcLimit;
                        }
                    }

                    try
                    {
                        if (dataType == typeof(OptionUniverse))
                        {
                            // universe files are handled by the dedicated universe downloader, a single day at a time
                            var processingDate = date.ConvertToUtc(dataTimeZone);
                            UniverseExtensions.RunUniverseDownloader(_dataDownloader, new DataUniverseDownloaderGetParameters(symbol, processingDate, processingDate.AddDays(1), entry.ExchangeHours));
                            return;
                        }

                        LeanDataWriter writer = null;
                        var getParams = new DataDownloaderGetParameters(symbol, resolution, startTimeUtc, endTimeUtc, tickType);

                        var downloaderDataParameters = getParams.GetDataDownloaderParameterForAllMappedSymbols(_mapFileProvider, exchangeTimeZone);

                        var downloadedData = GetDownloadedData(downloaderDataParameters, symbol, exchangeTimeZone, dataTimeZone, dataType);

                        foreach (var dataPerSymbol in downloadedData)
                        {
                            // the writer is only created once there is something to write
                            writer ??= new LeanDataWriter(resolution, symbol, Globals.DataFolder, tickType, mapSymbol: true, dataCacheProvider: _dataCacheProvider);

                            // Save the data
                            writer.Write(dataPerSymbol);
                        }
                    }
                    catch (Exception e)
                    {
                        // best effort: a failed download is logged and must not break the fetch
                        Log.Error(e);
                    }
                }
            });
        }

        /// <summary>
        /// Retrieves downloaded data grouped by symbol for each of the given download parameters.
        /// </summary>
        /// <param name="downloaderDataParameters">Parameters specifying the data to be retrieved.</param>
        /// <param name="symbol">Represents a unique security identifier, generated by ticker name.</param>
        /// <param name="exchangeTimeZone">The time zone of the exchange where the symbol is traded.</param>
        /// <param name="dataTimeZone">The time zone in which the data is represented.</param>
        /// <param name="dataType">The type of data to be retrieved.</param>
        /// <returns>An IEnumerable containing groups of data grouped by symbol. Each group contains data related to a specific symbol.</returns>
        /// <exception cref="ArgumentException">Thrown when the downloaderDataParameters collection is null or empty.</exception>
        public IEnumerable<IGrouping<Symbol, BaseData>> GetDownloadedData(
            IEnumerable<DataDownloaderGetParameters> downloaderDataParameters,
            Symbol symbol,
            DateTimeZone exchangeTimeZone,
            DateTimeZone dataTimeZone,
            Type dataType)
        {
            // validate eagerly: inside an iterator body this check would be deferred until the first MoveNext()
            if (downloaderDataParameters.IsNullOrEmpty())
            {
                throw new ArgumentException($"{nameof(DownloaderDataProvider)}.{nameof(GetDownloadedData)}: DataDownloaderGetParameters are empty or equal to null.");
            }

            return DownloadAndGroup();

            // Lazily runs one download per parameter set and streams the resulting groups
            IEnumerable<IGrouping<Symbol, BaseData>> DownloadAndGroup()
            {
                foreach (var downloaderDataParameter in downloaderDataParameters)
                {
                    var downloadedData = _dataDownloader.Get(downloaderDataParameter);

                    if (downloadedData == null)
                    {
                        // doesn't support this download request, that's okay
                        continue;
                    }

                    var groupedData = FilterAndGroupDownloadDataBySymbol(
                        downloadedData,
                        symbol,
                        dataType,
                        exchangeTimeZone,
                        dataTimeZone,
                        downloaderDataParameter.StartUtc,
                        downloaderDataParameter.EndUtc);

                    foreach (var data in groupedData)
                    {
                        yield return data;
                    }
                }
            }
        }

        /// <summary>
        /// Gets the stream for a given file path
        /// </summary>
        /// <param name="key">The file path to get the stream for</param>
        /// <returns>
        /// An in-memory copy of the file for paths with a resolution above minute whose symbol requires mapping,
        /// null if the base implementation returned no stream for such a path, otherwise the base implementation's stream
        /// </returns>
        protected override Stream GetStream(string key)
        {
            if (LeanData.TryParsePath(key, out var symbol, out var date, out var resolution, out var _) && resolution > Resolution.Minute && symbol.RequiresMapping())
            {
                // because the file could be updated even after it's created because of symbol mapping we can't stream from disk
                return DiskSynchronizer.Execute(key, () =>
                {
                    var baseStream = base.Fetch(key);
                    if (baseStream == null)
                    {
                        return null;
                    }

                    // 'using' guarantees the source stream is released even if the copy fails
                    using (baseStream)
                    {
                        var result = new MemoryStream();
                        baseStream.CopyTo(result);
                        // move position back to the start
                        result.Position = 0;
                        return result;
                    }
                });
            }

            return base.Fetch(key);
        }

        /// <summary>
        /// Main filter to determine if this file needs to be downloaded
        /// </summary>
        /// <param name="filePath">File we are looking at</param>
        /// <returns>True if should download</returns>
        protected override bool NeedToDownload(string filePath)
        {
            // Ignore null and invalid data requests
            if (filePath == null
                || (filePath.Contains("fine", StringComparison.InvariantCultureIgnoreCase) && filePath.Contains("fundamental", StringComparison.InvariantCultureIgnoreCase))
                || filePath.Contains("map_files", StringComparison.InvariantCultureIgnoreCase)
                || filePath.Contains("factor_files", StringComparison.InvariantCultureIgnoreCase)
                || (filePath.Contains("margins", StringComparison.InvariantCultureIgnoreCase) && filePath.Contains("future", StringComparison.InvariantCultureIgnoreCase)))
            {
                return false;
            }

            // Only download if it doesn't exist or is out of date.
            // Files are only "out of date" for non date based files (hour, daily, margins, etc.) because this data is stored all in one file
            return !File.Exists(filePath) || filePath.IsOutOfDate();
        }

        /// <summary>
        /// Filters and groups the provided download data by symbol, based on specified criteria.
        /// </summary>
        /// <param name="downloadData">The collection of download data to process.</param>
        /// <param name="symbol">The symbol to filter the data for.</param>
        /// <param name="dataType">The type of data to filter for.</param>
        /// <param name="exchangeTimeZone">The time zone of the exchange.</param>
        /// <param name="dataTimeZone">The desired time zone for the data.</param>
        /// <param name="downloaderStartTimeUtc">The start time of data downloading in UTC.</param>
        /// <param name="downloaderEndTimeUtc">The end time of data downloading in UTC.</param>
        /// <returns>
        /// An enumerable collection of groupings of download data, grouped by symbol.
        /// </returns>
        /// <remarks>
        /// The returned query is lazily evaluated and rewrites <c>Time</c> and <c>EndTime</c> of every accepted item
        /// in place while it is enumerated, so it should be enumerated only once.
        /// </remarks>
        public static IEnumerable<IGrouping<Symbol, BaseData>> FilterAndGroupDownloadDataBySymbol(
            IEnumerable<BaseData> downloadData,
            Symbol symbol,
            Type dataType,
            DateTimeZone exchangeTimeZone,
            DateTimeZone dataTimeZone,
            DateTime downloaderStartTimeUtc,
            DateTime downloaderEndTimeUtc)
        {
            // the requested range is compared against the data points' time in the exchange time zone
            var startDateTimeInExchangeTimeZone = downloaderStartTimeUtc.ConvertFromUtc(exchangeTimeZone);
            var endDateTimeInExchangeTimeZone = downloaderEndTimeUtc.ConvertFromUtc(exchangeTimeZone);

            return downloadData
                .Where(baseData =>
                {
                    // Sometimes, external Downloader provider returns excess data
                    if (baseData.Time < startDateTimeInExchangeTimeZone || baseData.Time > endDateTimeInExchangeTimeZone)
                    {
                        return false;
                    }

                    if (symbol.SecurityType == SecurityType.Base || baseData.GetType() == dataType)
                    {
                        // we need to store the data in data time zone
                        baseData.Time = baseData.Time.ConvertTo(exchangeTimeZone, dataTimeZone);
                        baseData.EndTime = baseData.EndTime.ConvertTo(exchangeTimeZone, dataTimeZone);
                        return true;
                    }
                    return false;
                })
                // for canonical symbols, downloader will return data for all of the chain
                .GroupBy(baseData => baseData.Symbol);
        }
    }
}