/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using System.Linq;
using System.Text;
using QuantConnect.Util;
using System.Globalization;
using QuantConnect.Logging;
using System.Threading.Tasks;
using QuantConnect.Interfaces;
using QuantConnect.Securities;
using System.Collections.Generic;
using QuantConnect.Configuration;
using QuantConnect.Data.Auxiliary;
namespace QuantConnect.Data
{
/// <summary>
/// Data writer for saving an IEnumerable of BaseData into the LEAN data directory.
/// </summary>
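/// <example>
/// A minimal usage sketch; the symbol, data directory and bar values below are illustrative only:
/// <code>
/// var symbol = Symbol.Create("SPY", SecurityType.Equity, Market.USA);
/// var bars = new List&lt;TradeBar&gt;
/// {
///     new TradeBar(new DateTime(2020, 1, 2, 9, 31, 0), symbol, 320.1m, 320.5m, 319.9m, 320.3m, 1000)
/// };
/// var writer = new LeanDataWriter(Resolution.Minute, symbol, "/data", TickType.Trade);
/// writer.Write(bars);
/// </code>
/// </example>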
public class LeanDataWriter
{
private static readonly KeyStringSynchronizer _keySynchronizer = new();
/// <summary>
/// The map file provider instance to use
/// </summary>
/// <remarks>Public for testing</remarks>
public static Lazy<IMapFileProvider> MapFileProvider { get; set; } = new(
Composer.Instance.GetExportedValueByTypeName<IMapFileProvider>(Config.Get("map-file-provider", "LocalDiskMapFileProvider"), forceTypeNameOnExisting: false)
);
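// For tests, the provider can be pinned explicitly instead of being resolved through the composer,
// for example (a hypothetical test setup, not required in production):
// MapFileProvider = new Lazy<IMapFileProvider>(() => new LocalDiskMapFileProvider());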
private readonly Symbol _symbol;
private readonly bool _mapSymbol;
private readonly string _dataDirectory;
private readonly TickType _tickType;
private readonly Resolution _resolution;
private readonly WritePolicy _writePolicy;
private readonly SecurityType _securityType;
private readonly IDataCacheProvider _dataCacheProvider;
/// <summary>
/// Create a new lean data writer to this base data directory.
/// </summary>
/// <param name="symbol">The symbol to write data for</param>
/// <param name="dataDirectory">Base data directory</param>
/// <param name="resolution">Resolution of the desired output data</param>
/// <param name="tickType">The tick type</param>
/// <param name="dataCacheProvider">The data cache provider to use</param>
/// <param name="writePolicy">The file write policy to use</param>
/// <param name="mapSymbol">True if the symbol should be mapped while writing the data</param>
public LeanDataWriter(Resolution resolution, Symbol symbol, string dataDirectory, TickType tickType = TickType.Trade,
IDataCacheProvider dataCacheProvider = null, WritePolicy? writePolicy = null, bool mapSymbol = false) : this(
dataDirectory,
resolution,
symbol.ID.SecurityType,
tickType,
dataCacheProvider,
writePolicy
)
{
_symbol = symbol;
_mapSymbol = mapSymbol;
// All fx data is quote data.
if (_securityType == SecurityType.Forex || _securityType == SecurityType.Cfd)
{
_tickType = TickType.Quote;
}
if (_securityType != SecurityType.Equity && _securityType != SecurityType.Forex && _securityType != SecurityType.Cfd && _securityType != SecurityType.Crypto
&& _securityType != SecurityType.Future && _securityType != SecurityType.Option && _securityType != SecurityType.FutureOption
&& _securityType != SecurityType.Index && _securityType != SecurityType.IndexOption && _securityType != SecurityType.CryptoFuture)
{
throw new NotImplementedException("Sorry this security type is not yet supported by the LEAN data writer: " + _securityType);
}
}
/// <summary>
/// Create a new lean data writer to this base data directory.
/// </summary>
/// <param name="dataDirectory">Base data directory</param>
/// <param name="resolution">Resolution of the desired output data</param>
/// <param name="securityType">The security type</param>
/// <param name="tickType">The tick type</param>
/// <param name="dataCacheProvider">The data cache provider to use</param>
/// <param name="writePolicy">The file write policy to use</param>
public LeanDataWriter(string dataDirectory, Resolution resolution, SecurityType securityType, TickType tickType,
IDataCacheProvider dataCacheProvider = null, WritePolicy? writePolicy = null)
{
_dataDirectory = dataDirectory;
_resolution = resolution;
_securityType = securityType;
_tickType = tickType;
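// Default policy: hour/daily data lives in a single zip entry spanning the symbol's whole history,
// so merge with whatever is already on disk; intraday data is stored one zip per day, so overwriting
// the day's file is safe and faster.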
if (writePolicy == null)
{
_writePolicy = resolution >= Resolution.Hour ? WritePolicy.Merge : WritePolicy.Overwrite;
}
else
{
_writePolicy = writePolicy.Value;
}
_dataCacheProvider = dataCacheProvider ?? new DiskDataCacheProvider();
}
/// <summary>
/// Given the constructor parameters, write out the data in LEAN format.
/// </summary>
/// <param name="source">IEnumerable source of the data: sorted from oldest to newest.</param>
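/// <remarks>Data is grouped into one zip file per output path; each completed file is written on a
/// background task, the final file is written synchronously, and the method blocks until every
/// pending write has finished.</remarks>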
public void Write(IEnumerable<BaseData> source)
{
var lastTime = DateTime.MinValue;
var outputFile = string.Empty;
Symbol symbol = null;
var currentFileData = new List<TimedLine>();
var writeTasks = new Queue<Task>();
foreach (var data in source)
{
// Ensure the data is sorted as a safety check
if (data.Time < lastTime) throw new Exception("The data must be pre-sorted from oldest to newest");
// Update our output file
// Only do this on date change, because we know we don't have any data zips smaller than a day; saves time
if (data.Time.Date != lastTime.Date)
{
var mappedSymbol = GetMappedSymbol(data.Time, data.Symbol);
// Get the latest file name, if it has changed, we have entered a new file, write our current data to file
var latestOutputFile = GetZipOutputFileName(_dataDirectory, data.Time, mappedSymbol);
var latestSymbol = mappedSymbol;
if (outputFile.IsNullOrEmpty() || outputFile != latestOutputFile)
{
if (!currentFileData.IsNullOrEmpty())
{
// Launch a write task for the current file and data set
var file = outputFile;
var fileData = currentFileData;
var fileSymbol = symbol;
writeTasks.Enqueue(Task.Run(() =>
{
WriteFile(file, fileData, fileSymbol);
}));
}
// Reset our list and store the new output file
currentFileData = new List<TimedLine>();
outputFile = latestOutputFile;
symbol = latestSymbol;
}
}
// Add data to our current list
var line = LeanData.GenerateLine(data, _securityType, _resolution);
currentFileData.Add(new TimedLine(data.Time, line));
// Update our time
lastTime = data.Time;
}
// Finish off by processing the last file as well
if (!currentFileData.IsNullOrEmpty())
{
// we want to finish ASAP so let's do it ourselves
WriteFile(outputFile, currentFileData, symbol);
}
// Wait for all our write tasks to finish
while (writeTasks.Count > 0)
{
var task = writeTasks.Dequeue();
task.Wait();
}
}
/// <summary>
/// Downloads historical data from the brokerage and saves it in LEAN format.
/// </summary>
/// <param name="brokerage">The brokerage from where to fetch the data</param>
/// <param name="symbols">The list of symbols</param>
/// <param name="startTimeUtc">The starting date/time (UTC)</param>
/// <param name="endTimeUtc">The ending date/time (UTC)</param>
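/// <example>
/// A minimal sketch, assuming <c>brokerage</c> is an already connected <see cref="IBrokerage"/>
/// instance and that the data directory is illustrative:
/// <code>
/// var symbol = Symbol.Create("EURUSD", SecurityType.Forex, Market.Oanda);
/// var writer = new LeanDataWriter("/data", Resolution.Minute, SecurityType.Forex, TickType.Quote);
/// writer.DownloadAndSave(brokerage, new List&lt;Symbol&gt; { symbol }, new DateTime(2021, 1, 1), new DateTime(2021, 2, 1));
/// </code>
/// </example>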
public void DownloadAndSave(IBrokerage brokerage, List<Symbol> symbols, DateTime startTimeUtc, DateTime endTimeUtc)
{
if (symbols.Count == 0)
{
throw new ArgumentException("DownloadAndSave(): The symbol list cannot be empty.");
}
if (_tickType != TickType.Trade && _tickType != TickType.Quote)
{
throw new ArgumentException("DownloadAndSave(): The tick type must be Trade or Quote.");
}
if (symbols.Any(x => x.SecurityType != _securityType))
{
throw new ArgumentException($"DownloadAndSave(): All symbols must have {_securityType} security type.");
}
if (symbols.DistinctBy(x => x.ID.Symbol).Count() > 1)
{
throw new ArgumentException("DownloadAndSave(): All symbols must have the same root ticker.");
}
var dataType = LeanData.GetDataType(_resolution, _tickType);
var marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
var ticker = symbols.First().ID.Symbol;
var market = symbols.First().ID.Market;
var canonicalSymbol = Symbol.Create(ticker, _securityType, market);
var exchangeHours = marketHoursDatabase.GetExchangeHours(canonicalSymbol.ID.Market, canonicalSymbol, _securityType);
var dataTimeZone = marketHoursDatabase.GetDataTimeZone(canonicalSymbol.ID.Market, canonicalSymbol, _securityType);
foreach (var symbol in symbols)
{
var historyRequest = new HistoryRequest(
startTimeUtc,
endTimeUtc,
dataType,
symbol,
_resolution,
exchangeHours,
dataTimeZone,
_resolution,
true, // include extended market hours
false, // not custom data
DataNormalizationMode.Raw,
_tickType
);
var history = brokerage.GetHistory(historyRequest)?
.Select(
x =>
{
// Convert to the data time zone before we write it
x.Time = x.Time.ConvertTo(exchangeHours.TimeZone, dataTimeZone);
return x;
})?
.ToList();
if (history == null)
{
continue;
}
// Generate a writer for this data and write it
var writer = new LeanDataWriter(_resolution, symbol, _dataDirectory, _tickType);
writer.Write(history);
}
}
/// <summary>
/// Loads an existing Lean zip file into a SortedDictionary
/// </summary>
private bool TryLoadFile(string fileName, string entryName, DateTime date, out SortedDictionary<DateTime, string> rows)
{
rows = new SortedDictionary<DateTime, string>();
using (var stream = _dataCacheProvider.Fetch($"{fileName}#{entryName}"))
{
if (stream == null)
{
return false;
}
using (var reader = new StreamReader(stream))
{
string line;
while ((line = reader.ReadLine()) != null)
{
rows[LeanData.ParseTime(line, date, _resolution)] = line;
}
}
return true;
}
}
/// <summary>
/// Write this file to disk with the given data.
/// </summary>
/// <param name="filePath">The full path to the new file</param>
/// <param name="data">The data to write as a list of dates and strings</param>
/// <param name="symbol">The symbol associated with this data</param>
/// <remarks>The reason we have the data as a list of (DateTime, string) pairs is to support
/// a generic write that works for all resolutions. In order to merge in the hour/daily case we need the
/// date of the data to correctly merge the two. In order to support writing ticks we need to allow
/// two data points to have the same time. Thus we cannot use a single list of just strings nor
/// a sorted dictionary of DateTimes and strings.</remarks>
private void WriteFile(string filePath, List<TimedLine> data, Symbol symbol)
{
filePath = FileExtension.ToNormalizedPath(filePath);
if (data == null || data.Count == 0)
{
return;
}
// because we read & write the same file we need to take a lock per file path so we don't read something that might get outdated
// by someone writing to the same path at the same time
_keySynchronizer.Execute(filePath, singleExecution: false, () =>
{
var date = data[0].Time;
// Generate this csv entry name
var entryName = LeanData.GenerateZipEntryName(symbol, date, _resolution, _tickType);
// Check disk once for this file ahead of time, reuse where possible
var fileExists = File.Exists(filePath);
// If our file doesn't exist it's possible the directory doesn't exist either; make sure at least the directory exists
if (!fileExists)
{
Directory.CreateDirectory(Path.GetDirectoryName(filePath));
}
// Handle merging of files
// Only merge on files with hour/daily resolution, that exist, and can be loaded
string finalData = null;
if (_writePolicy == WritePolicy.Append)
{
var streamWriter = new ZipStreamWriter(filePath, entryName);
foreach (var tuple in data)
{
streamWriter.WriteLine(tuple.Line);
}
streamWriter.DisposeSafely();
}
else if (_writePolicy == WritePolicy.Merge && fileExists && TryLoadFile(filePath, entryName, date, out var rows))
{
// Perform merge on loaded rows
foreach (var timedLine in data)
{
rows[timedLine.Time] = timedLine.Line;
}
// Final merged data product
finalData = string.Join("\n", rows.Values);
}
else
{
// Otherwise just extract the data from the given list.
finalData = string.Join("\n", data.Select(x => x.Line));
}
if (finalData != null)
{
var bytes = Encoding.UTF8.GetBytes(finalData);
_dataCacheProvider.Store($"{filePath}#{entryName}", bytes);
}
if (Log.DebuggingEnabled)
{
var from = data[0].Time.Date.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture);
var to = data[data.Count - 1].Time.Date.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture);
Log.Debug($"LeanDataWriter.Write({symbol.ID}): Appended: {filePath} @ {entryName} {from}->{to}");
}
});
}
/// <summary>
/// Get the output zip file
/// </summary>
/// <param name="baseDirectory">Base output directory for the zip file</param>
/// <param name="time">Date/time for the data we're writing</param>
/// <param name="symbol">The associated symbol. For example, for options/futures it will be different than the canonical</param>
/// <returns>The full path to the output zip file</returns>
private string GetZipOutputFileName(string baseDirectory, DateTime time, Symbol symbol)
{
return LeanData.GenerateZipFilePath(baseDirectory, symbol, time, _resolution, _tickType);
}
/// <summary>
/// Helper method to map a symbol if required at the given date
/// </summary>
private Symbol GetMappedSymbol(DateTime time, Symbol symbol)
{
if (!_mapSymbol)
{
return _symbol;
}
if (symbol.RequiresMapping())
{
var mapFileResolver = MapFileProvider.Value.Get(AuxiliaryDataKey.Create(symbol.ID));
var mapFile = mapFileResolver.ResolveMapFile(symbol);
var mappedTicker = mapFile.GetMappedSymbol(time);
if (!string.IsNullOrEmpty(mappedTicker))
{
// only update if we got something to map to
symbol = symbol.UpdateMappedSymbol(mappedTicker);
}
}
return symbol;
}
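/// <summary>
/// Helper type pairing a data point's time with its generated csv line, so that duplicate times
/// (ticks) can be kept and hour/daily rows can be merged by date
/// </summary>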
private class TimedLine
{
public string Line { get; }
public DateTime Time { get; }
public TimedLine(DateTime time, string line)
{
Line = line;
Time = time;
}
}
}
}