/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using QuantConnect.Configuration;
using QuantConnect.Data.Auxiliary;
using QuantConnect.Data.Market;
using QuantConnect.Interfaces;
using QuantConnect.Util;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using QuantConnect.Lean.Engine.DataFeeds;
using DateTime = System.DateTime;
using Log = QuantConnect.Logging.Log;
using QuantConnect.Data.UniverseSelection;
using static QuantConnect.Data.UniverseSelection.CoarseFundamentalDataProvider;
using QuantConnect.Data.Fundamental;
namespace QuantConnect.ToolBox.CoarseUniverseGenerator
{
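/// <summary>
/// Generates coarse universe files, one CSV per date, from daily equity data combined with map files and factor files.
/// </summary>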
public class CoarseUniverseGeneratorProgram
{
/// <summary>
/// Fundamental property queried by CheckFundamentalData; a non-empty value for it marks a security as having fundamental data.
/// </summary>
public const FundamentalProperty HasFundamentalSource = FundamentalProperty.CompanyReference_CompanyId;
// Folder searched first for each ticker's zipped daily file ("{ticker}.zip").
private readonly DirectoryInfo _dailyDataFolder;
// Folder that receives the generated per-date CSV files.
private readonly DirectoryInfo _destinationFolder;
// Supplies the map file resolver for the configured market and the equity security type.
private readonly IMapFileProvider _mapFileProvider;
// Supplies the factor file for each processed symbol.
private readonly IFactorFileProvider _factorFileProvider;
// Market used to look up map files and to build each security identifier context.
private readonly string _market;
// Optional file read line by line into the set of excluded tickers; ignored when it does not exist.
private readonly FileInfo _blackListedTickersFile;
/// <summary>
/// Builds a generator for US equities using paths under the global data folder, local-disk map and
/// factor file providers, and the configured reserved words prefix, then runs it.
/// </summary>
/// <returns>The value returned by <see cref="Run"/>: true when generation completed, false otherwise.</returns>
public static bool CoarseUniverseGenerator()
{
    // Path segments shared by the input and output locations.
    var dataRoot = Globals.DataFolder;
    var equitySegment = SecurityType.Equity.SecurityTypeToLower();

    var dailyPath = Path.Combine(dataRoot, equitySegment, Market.USA, Resolution.Daily.ResolutionToLower());
    var coarsePath = Path.Combine(dataRoot, equitySegment, Market.USA, "fundamental", "coarse");

    var inputFolder = new DirectoryInfo(dailyPath);
    var outputFolder = new DirectoryInfo(coarsePath);
    var exclusionsFile = new FileInfo("blacklisted-tickers.txt");
    var prefix = Config.Get("reserved-words-prefix", "quantconnect-");

    // Providers are initialized in dependency order: data provider, then map files, then factor files.
    var provider = new DefaultDataProvider();

    var mapFiles = new LocalDiskMapFileProvider();
    mapFiles.Initialize(provider);

    var factorFiles = new LocalDiskFactorFileProvider();
    factorFiles.Initialize(mapFiles, provider);

    FundamentalService.Initialize(provider, nameof(CoarseFundamentalDataProvider), false);

    var program = new CoarseUniverseGeneratorProgram(
        inputFolder,
        outputFolder,
        Market.USA,
        exclusionsFile,
        prefix,
        mapFiles,
        factorFiles);

    // The per-security rows and the date list are not needed by this entry point.
    var succeeded = program.Run(out _, out _);
    return succeeded;
}
/// <summary>
/// Creates a generator bound to the given folders, market and auxiliary data providers.
/// </summary>
/// <param name="dailyDataFolder">Folder searched first for each ticker's zipped daily file.</param>
/// <param name="destinationFolder">Folder that receives the generated coarse files.</param>
/// <param name="market">Market used to resolve map files and build security identifier contexts.</param>
/// <param name="blackListedTickersFile">Optional file listing tickers to exclude, one per line.</param>
/// <param name="reservedWordsPrefix">Reserved words prefix; accepted but not used by this constructor.</param>
/// <param name="mapFileProvider">Provider of the map file resolver.</param>
/// <param name="factorFileProvider">Provider of per-symbol factor files.</param>
/// <param name="debugEnabled">Value assigned to the static <c>Log.DebuggingEnabled</c> switch.</param>
public CoarseUniverseGeneratorProgram(
    DirectoryInfo dailyDataFolder,
    DirectoryInfo destinationFolder,
    string market,
    FileInfo blackListedTickersFile,
    string reservedWordsPrefix,
    IMapFileProvider mapFileProvider,
    IFactorFileProvider factorFileProvider,
    bool debugEnabled = false)
{
    // Input and output locations.
    _dailyDataFolder = dailyDataFolder;
    _destinationFolder = destinationFolder;
    _blackListedTickersFile = blackListedTickersFile;

    // Market and auxiliary data sources.
    _market = market;
    _mapFileProvider = mapFileProvider;
    _factorFileProvider = factorFileProvider;

    // This writes a static logging switch, so it affects logging beyond this instance.
    Log.DebuggingEnabled = debugEnabled;
}
/// <summary>
/// Builds coarse fundamental rows for every security yielded by the map files and writes one CSV file
/// per distinct trade bar time into the destination folder.
/// </summary>
/// <param name="coarsePerSecurity">Receives the generated rows grouped by security identifier; only securities that produced at least one row are present.</param>
/// <param name="dates">Receives the sorted trade bar times that have an output file; stays empty when the run fails before the files are written.</param>
/// <returns>True when the run completed without throwing; false when an exception was caught and logged.</returns>
public bool Run(out ConcurrentDictionary> coarsePerSecurity, out DateTime[] dates)
{
var startTime = DateTime.UtcNow;
var success = true;
// NOTE(review): the log tag says ProcessDailyFolder() although this method is Run() - confirm whether that is intended.
Log.Trace($"CoarseUniverseGeneratorProgram.ProcessDailyFolder(): Processing: {_dailyDataFolder.FullName}");
// Counters updated from the parallel loops through Interlocked.
var symbolsProcessed = 0;
var filesRead = 0;
var dailyFilesNotFound = 0;
var coarseFilesGenerated = 0;
var mapFileResolver = _mapFileProvider.Get(new AuxiliaryDataKey(_market, SecurityType.Equity));
// Local alias for the out parameter: the parallel lambda below writes through it, and C# does not allow a lambda to capture an out parameter.
var result = coarsePerSecurity = new();
dates = Array.Empty();
// The exclusion list is optional; without the file nothing is excluded.
var blackListedTickers = new HashSet();
if (_blackListedTickersFile.Exists)
{
blackListedTickers = File.ReadAllLines(_blackListedTickersFile.FullName).ToHashSet();
}
var securityIdentifierContexts = PopulateSidContex(mapFileResolver, blackListedTickers);
// Parsed daily bars keyed by ticker, shared by all workers.
var dailyPricesByTicker = new ConcurrentDictionary>();
// Generated rows keyed by trade bar time; each entry becomes one output file.
var outputCoarseContent = new ConcurrentDictionary>();
// Use at most half of the reported processor count, but never fewer than one worker.
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2) };
try
{
Parallel.ForEach(securityIdentifierContexts, parallelOptions, sidContext =>
{
var coarseForSecurity = new List();
var symbol = new Symbol(sidContext.SID, sidContext.LastTicker);
var symbolCount = Interlocked.Increment(ref symbolsProcessed);
Log.Debug($"CoarseUniverseGeneratorProgram.Run(): Processing {symbol} with tickers: '{string.Join(",", sidContext.Tickers)}'");
var factorFile = _factorFileProvider.Get(symbol);
// Populate dailyPricesByTicker with all daily data by ticker for all tickers of this security.
foreach (var ticker in sidContext.Tickers)
{
var pathFile = Path.Combine(_dailyDataFolder.FullName, $"{ticker}.zip");
var dailyFile = new FileInfo(pathFile);
if (!dailyFile.Exists)
{
// Fall back to the equity/usa/daily folder under the global data folder before giving up on this ticker.
Log.Debug($"CoarseUniverseGeneratorProgram.Run(): {dailyFile.FullName} not found, looking for daily data in data folder");
dailyFile = new FileInfo(Path.Combine(Globals.DataFolder, "equity", "usa", "daily", $"{ticker}.zip"));
if (!dailyFile.Exists)
{
Log.Error($"CoarseUniverseGeneratorProgram.Run(): {dailyFile} not found!");
Interlocked.Increment(ref dailyFilesNotFound);
continue;
}
}
// NOTE(review): the ContainsKey check and the insert are separate steps, so two workers that share a ticker
// could both parse the same file and both count it - confirm whether that matters.
if (!dailyPricesByTicker.ContainsKey(ticker))
{
dailyPricesByTicker.AddOrUpdate(ticker, ParseDailyFile(dailyFile));
Interlocked.Increment(ref filesRead);
}
}
// Look for daily data for each ticker of the actual security.
// Rows are walked from the last one down to index 1: each row supplies a ticker and its end date,
// and the preceding row supplies the start date.
for (int mapFileRowIndex = sidContext.MapFileRows.Length - 1; mapFileRowIndex >= 1; mapFileRowIndex--)
{
var ticker = sidContext.MapFileRows[mapFileRowIndex].Item2.ToLowerInvariant();
var endDate = sidContext.MapFileRows[mapFileRowIndex].Item1;
var startDate = sidContext.MapFileRows[mapFileRowIndex - 1].Item1;
List tickerDailyData;
if (!dailyPricesByTicker.TryGetValue(ticker, out tickerDailyData))
{
Log.Error($"CoarseUniverseGeneratorProgram.Run(): Daily data for ticker {ticker.ToUpperInvariant()} not found!");
continue;
}
// Keep only the bars whose time lies within [startDate, endDate], both ends inclusive.
foreach (var tradeBar in tickerDailyData.Where(tb => tb.Time >= startDate && tb.Time <= endDate))
{
var coarseFundamental = GenerateFactorFileRow(ticker, sidContext, factorFile as CorporateFactorProvider, tradeBar);
coarseForSecurity.Add(coarseFundamental);
// Append the row to the list for this bar time, creating the list on first use;
// the lock guards the shared list against concurrent appends from other workers.
outputCoarseContent.AddOrUpdate(tradeBar.Time,
new List { coarseFundamental },
(time, list) =>
{
lock (list)
{
list.Add(coarseFundamental);
return list;
}
});
}
}
// Only securities that produced at least one row are reported to the caller.
if(coarseForSecurity.Count > 0)
{
result[sidContext.SID] = coarseForSecurity;
}
// Progress report every 1000 symbols.
if (symbolCount % 1000 == 0)
{
var elapsed = DateTime.UtcNow - startTime;
Log.Trace($"CoarseUniverseGeneratorProgram.Run(): Processed {symbolCount} in {elapsed:g} at {symbolCount / elapsed.TotalMinutes:F2} symbols/minute ");
}
});
// Ensure the destination folder exists before any file is written.
_destinationFolder.Create();
var startWriting = DateTime.UtcNow;
// Write one file per bar time, named from the time formatted with DateFormat.EightCharacter; rows are sorted by their text.
Parallel.ForEach(outputCoarseContent, coarseByDate =>
{
var filename = $"{coarseByDate.Key.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture)}.csv";
var filePath = Path.Combine(_destinationFolder.FullName, filename);
Log.Debug($"CoarseUniverseGeneratorProgram.Run(): Saving {filename} with {coarseByDate.Value.Count} entries.");
File.WriteAllLines(filePath, coarseByDate.Value.Select(x => CoarseFundamental.ToRow(x)).OrderBy(cr => cr));
var filesCount = Interlocked.Increment(ref coarseFilesGenerated);
// Progress report every 1000 files.
if (filesCount % 1000 == 0)
{
var elapsed = DateTime.UtcNow - startWriting;
Log.Trace($"CoarseUniverseGeneratorProgram.Run(): Processed {filesCount} in {elapsed:g} at {filesCount / elapsed.TotalSeconds:F2} files/second ");
}
});
dates = outputCoarseContent.Keys.OrderBy(x => x).ToArray();
// NOTE(review): dailyFilesNotFound is incremented above but is not included in this summary.
Log.Trace($"\n\nTotal of {coarseFilesGenerated} coarse files generated in {DateTime.UtcNow - startTime:g}:\n" +
$"\t => {filesRead} daily data files read.\n");
}
catch (Exception e)
{
// Any failure, including one thrown out of the parallel loops, is logged and reported through the return value instead of being rethrown.
Log.Error(e, $"CoarseUniverseGeneratorProgram.Run(): FAILED!");
success = false;
}
return success;
}
/// <summary>
/// Builds the coarse fundamental entry for one trade bar of one ticker.
/// </summary>
/// <param name="ticker">The ticker the trade bar belongs to.</param>
/// <param name="sidContext">The security identifier context supplying the SID.</param>
/// <param name="factorFile">The factor provider used for scaling factors; may be null, in which case both factors are 1.</param>
/// <param name="tradeBar">The daily trade bar to convert.</param>
/// <returns>A coarse fundamental entry carrying close, volume, dollar volume, factors and the fundamental data flag.</returns>
private static CoarseFundamental GenerateFactorFileRow(string ticker, SecurityIdentifierContext sidContext, CorporateFactorProvider factorFile, TradeBar tradeBar)
{
    var barDate = tradeBar.Time;

    // Without a factor provider, or without a scaling row, both factors fall back to 1.
    var scalingRow = factorFile?.GetScalingFactors(barDate);

    // Dollar volume is close times volume with the fractional part dropped.
    var tradedValue = tradeBar.Close * tradeBar.Volume;
    var truncatedDollarVolume = Math.Truncate((double)tradedValue);

    var normalizedPriceFactor = scalingRow?.PriceFactor.Normalize() ?? 1m;
    var normalizedSplitFactor = scalingRow?.SplitFactor.Normalize() ?? 1m;

    var fundamentalsAvailable = CheckFundamentalData(barDate, sidContext.SID);

    // sid,symbol,close,volume,dollar volume,has fundamental data,price factor,split factor
    return new CoarseFundamentalSource
    {
        Symbol = new Symbol(sidContext.SID, ticker),
        Value = tradeBar.Close.Normalize(),
        Time = barDate,
        VolumeSetter = decimal.ToInt64(tradeBar.Volume),
        DollarVolumeSetter = truncatedDollarVolume,
        PriceFactorSetter = normalizedPriceFactor,
        SplitFactorSetter = normalizedSplitFactor,
        HasFundamentalDataSetter = fundamentalsAvailable
    };
}
/// <summary>
/// Checks whether fundamental data is available for a security on a given date.
/// </summary>
/// <param name="date">The date to look up.</param>
/// <param name="sid">The security identifier to look up.</param>
/// <returns>True when the fundamental service returns a non-empty value for <see cref="HasFundamentalSource"/>; false when it returns null or an empty string.</returns>
private static bool CheckFundamentalData(DateTime date, SecurityIdentifier sid)
{
return !string.IsNullOrEmpty(FundamentalService.Get(date, sid, HasFundamentalSource));
}
/// <summary>
/// Reads a zipped daily data file and converts its contents into a list of trade bars.
/// </summary>
/// <param name="dailyFile">The zipped daily file to read.</param>
/// <returns>The trade bars in the order they were read from the file.</returns>
private static List ParseDailyFile(FileInfo dailyFile)
{
// Prices read from the file are multiplied by this factor, i.e. divided by 10000.
var scaleFactor = 1 / 10000m;
var output = new List();
using (var fileStream = dailyFile.OpenRead())
using (var stream = Compression.UnzipStreamToStreamReader(fileStream))
{
while (!stream.EndOfStream)
{
// Initializer members are evaluated top to bottom, so values are consumed in the order
// time, open, high, low, close, volume. Volume is the only numeric value left unscaled.
// presumably each GetDateTime()/GetDecimal() call consumes one delimited field from the reader - confirm against the stream helpers.
var tradeBar = new TradeBar
{
Time = stream.GetDateTime(),
Open = stream.GetDecimal() * scaleFactor,
High = stream.GetDecimal() * scaleFactor,
Low = stream.GetDecimal() * scaleFactor,
Close = stream.GetDecimal() * scaleFactor,
Volume = stream.GetDecimal()
};
output.Add(tradeBar);
}
}
return output;
}
/// <summary>
/// Lazily yields a security identifier context for every map file whose last mapped symbol is not excluded.
/// </summary>
/// <remarks>
/// This is an iterator method: nothing runs, including the trace message, until the result is enumerated.
/// </remarks>
/// <param name="mapFileResolver">The map file resolver to enumerate.</param>
/// <param name="exclusions">Mapped symbols to skip; matching uses the comparer the set was created with.</param>
/// <returns>One context per non-excluded map file, built with the configured market.</returns>
private IEnumerable PopulateSidContex(MapFileResolver mapFileResolver, HashSet exclusions)
{
Log.Trace("CoarseUniverseGeneratorProgram.PopulateSidContex(): Generating SID context from QuantQuote's map files.");
foreach (var mapFile in mapFileResolver)
{
// The exclusion test looks only at the mapped symbol of the map file's last row.
if (exclusions.Contains(mapFile.Last().MappedSymbol))
{
continue;
}
yield return new SecurityIdentifierContext(mapFile, _market);
}
}
}
}