/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Newtonsoft.Json;
using QuantConnect.Interfaces;
using QuantConnect.Util;
namespace QuantConnect.Data
{
///
/// Monitors data requests and reports on missing data
///
public class DataMonitor : IDataMonitor
{
private bool _exited;
private TextWriter _succeededDataRequestsWriter;
private TextWriter _failedDataRequestsWriter;
private long _succeededDataRequestsCount;
private long _failedDataRequestsCount;
private long _succeededUniverseDataRequestsCount;
private long _failedUniverseDataRequestsCount;
private readonly List _requestRates = new();
private long _prevRequestsCount;
private DateTime _lastRequestRateCalculationTime;
private Thread _requestRateCalculationThread;
private CancellationTokenSource _cancellationTokenSource;
private readonly string _succeededDataRequestsFileName;
private readonly string _failedDataRequestsFileName;
private readonly string _resultsDestinationFolder;
private readonly object _threadLock = new();
///
/// Initializes a new instance of the class
///
public DataMonitor()
{
_resultsDestinationFolder = Globals.ResultsDestinationFolder;
_succeededDataRequestsFileName = GetFilePath("succeeded-data-requests.txt");
_failedDataRequestsFileName = GetFilePath("failed-data-requests.txt");
}
///
/// Event handler for the event
///
public void OnNewDataRequest(object sender, DataProviderNewDataRequestEventArgs e)
{
if (_exited)
{
return;
}
Initialize();
if (e.Path.Contains("map_files", StringComparison.OrdinalIgnoreCase) ||
e.Path.Contains("factor_files", StringComparison.OrdinalIgnoreCase))
{
return;
}
var path = StripDataFolder(e.Path);
var isUniverseData = path.Contains("coarse", StringComparison.OrdinalIgnoreCase) ||
path.Contains("universe", StringComparison.OrdinalIgnoreCase);
if (e.Succeeded)
{
WriteLineToFile(_succeededDataRequestsWriter, path, _succeededDataRequestsFileName);
Interlocked.Increment(ref _succeededDataRequestsCount);
if (isUniverseData)
{
Interlocked.Increment(ref _succeededUniverseDataRequestsCount);
}
}
else
{
WriteLineToFile(_failedDataRequestsWriter, path, _failedDataRequestsFileName);
Interlocked.Increment(ref _failedDataRequestsCount);
if (isUniverseData)
{
Interlocked.Increment(ref _failedUniverseDataRequestsCount);
}
if (Logging.Log.DebuggingEnabled)
{
Logging.Log.Debug($"DataMonitor.OnNewDataRequest(): Data from {path} could not be fetched, error: {e.ErrorMessage}");
}
}
}
///
/// Terminates the data monitor generating a final report
///
public void Exit()
{
if (_exited || _requestRateCalculationThread == null)
{
return;
}
_exited = true;
_requestRateCalculationThread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
_succeededDataRequestsWriter?.Close();
_failedDataRequestsWriter?.Close();
StoreDataMonitorReport(GenerateReport());
_succeededDataRequestsWriter.DisposeSafely();
_failedDataRequestsWriter.DisposeSafely();
_cancellationTokenSource.DisposeSafely();
}
///
/// Disposes this object
///
public void Dispose()
{
Exit();
}
///
/// Strips the given data folder path
///
protected virtual string StripDataFolder(string path)
{
if (path.StartsWith(Globals.DataFolder, StringComparison.OrdinalIgnoreCase))
{
return path.Substring(Globals.DataFolder.Length);
}
return path;
}
///
/// Initializes the instance
///
private void Initialize()
{
if (_requestRateCalculationThread != null)
{
return;
}
lock (_threadLock)
{
if (_requestRateCalculationThread != null)
{
return;
}
// we create the files on demand
_succeededDataRequestsWriter = OpenStream(_succeededDataRequestsFileName);
_failedDataRequestsWriter = OpenStream(_failedDataRequestsFileName);
_cancellationTokenSource = new CancellationTokenSource();
_requestRateCalculationThread = new Thread(() =>
{
while (!_cancellationTokenSource.Token.WaitHandle.WaitOne(3000))
{
ComputeFileRequestFrequency();
}
})
{ IsBackground = true };
_requestRateCalculationThread.Start();
}
}
private DataMonitorReport GenerateReport()
{
var report = new DataMonitorReport(_succeededDataRequestsCount,
_failedDataRequestsCount,
_succeededUniverseDataRequestsCount,
_failedUniverseDataRequestsCount,
_requestRates);
Logging.Log.Trace($"DataMonitor.GenerateReport():{Environment.NewLine}" +
$"DATA USAGE:: Total data requests {report.TotalRequestsCount}{Environment.NewLine}" +
$"DATA USAGE:: Succeeded data requests {report.SucceededDataRequestsCount}{Environment.NewLine}" +
$"DATA USAGE:: Failed data requests {report.FailedDataRequestsCount}{Environment.NewLine}" +
$"DATA USAGE:: Failed data requests percentage {report.FailedDataRequestsPercentage}%{Environment.NewLine}" +
$"DATA USAGE:: Total universe data requests {report.TotalUniverseDataRequestsCount}{Environment.NewLine}" +
$"DATA USAGE:: Succeeded universe data requests {report.SucceededUniverseDataRequestsCount}{Environment.NewLine}" +
$"DATA USAGE:: Failed universe data requests {report.FailedUniverseDataRequestsCount}{Environment.NewLine}" +
$"DATA USAGE:: Failed universe data requests percentage {report.FailedUniverseDataRequestsPercentage}%");
return report;
}
private void ComputeFileRequestFrequency()
{
var requestsCount = _succeededDataRequestsCount + _failedDataRequestsCount;
if (_lastRequestRateCalculationTime == default)
{
// First time we calculate the request rate.
// We don't have a previous value to compare to so we just store the current value.
_lastRequestRateCalculationTime = DateTime.UtcNow;
_prevRequestsCount = requestsCount;
return;
}
var requestsCountDelta = requestsCount - _prevRequestsCount;
var now = DateTime.UtcNow;
var timeDelta = now - _lastRequestRateCalculationTime;
_requestRates.Add(Math.Round(requestsCountDelta / timeDelta.TotalSeconds));
_prevRequestsCount = requestsCount;
_lastRequestRateCalculationTime = now;
}
///
/// Stores the data monitor report
///
/// The data monitor report to be stored
private void StoreDataMonitorReport(DataMonitorReport report)
{
if (report == null)
{
return;
}
var path = GetFilePath("data-monitor-report.json");
var data = JsonConvert.SerializeObject(report, Formatting.None);
File.WriteAllText(path, data);
}
private string GetFilePath(string filename)
{
var baseFilename = Path.GetFileNameWithoutExtension(filename);
var timestamp = DateTime.UtcNow.ToStringInvariant("yyyyMMddHHmmssfff");
var extension = Path.GetExtension(filename);
return Path.Combine(_resultsDestinationFolder, $"{baseFilename}-{timestamp}{extension}");
}
private static TextWriter OpenStream(string filename)
{
var writer = new StreamWriter(filename);
return TextWriter.Synchronized(writer);
}
private static void WriteLineToFile(TextWriter writer, string line, string filename)
{
try
{
writer.WriteLine(line);
}
catch (IOException exception)
{
Logging.Log.Error($"DataMonitor.OnNewDataRequest(): Failed to write to file {filename}: {exception.Message}");
}
}
}
}