/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NodaTime;
using QuantConnect.Configuration;
using QuantConnect.Data;
using QuantConnect.Interfaces;
using QuantConnect.Lean.Engine.DataFeeds;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Logging;
using QuantConnect.Util;
using System;
using System.Collections.Generic;
using System.Linq;
using HistoryRequest = QuantConnect.Data.HistoryRequest;
namespace QuantConnect.Lean.Engine.HistoricalData
{
///
/// Provides an implementation of which
/// acts as a wrapper to use multiple history providers together
///
public class HistoryProviderManager : HistoryProviderBase
{
private IDataPermissionManager _dataPermissionManager;
private IBrokerage _brokerage;
private bool _initialized;
///
/// Collection of history providers being used
///
/// Protected for testing purposes
private List _historyProviders = new();
///
/// Gets the total number of data points emitted by this history provider
///
public override int DataPointCount => GetDataPointCount();
///
/// Sets the brokerage to be used for historical requests
///
/// The brokerage instance
public void SetBrokerage(IBrokerage brokerage)
{
_brokerage = brokerage;
}
///
/// Initializes this history provider to work for the specified job
///
/// The initialization parameters
public override void Initialize(HistoryProviderInitializeParameters parameters)
{
if (_initialized)
{
// let's make sure no one tries to change our parameters values
throw new InvalidOperationException("BrokerageHistoryProvider can only be initialized once");
}
_initialized = true;
var dataProvidersList = parameters.Job?.HistoryProvider.DeserializeList() ?? new List();
if (dataProvidersList.IsNullOrEmpty())
{
dataProvidersList.AddRange(Config.Get("history-provider", "SubscriptionDataReaderHistoryProvider").DeserializeList());
}
_dataPermissionManager = parameters.DataPermissionManager;
foreach (var historyProviderName in dataProvidersList)
{
IHistoryProvider historyProvider;
if (HistoryExtensions.TryGetBrokerageName(historyProviderName, out var brokerageName))
{
// we get the data queue handler if it already exists
var dataQueueHandler = Composer.Instance.GetPart((x) => x.GetType().Name == brokerageName);
if (dataQueueHandler == null)
{
// we need to create the brokerage/data queue handler
dataQueueHandler = Composer.Instance.GetExportedValueByTypeName(brokerageName);
// initialize it
dataQueueHandler.SetJob((Packets.LiveNodePacket)parameters.Job);
Log.Trace($"HistoryProviderManager.Initialize(): Created and wrapped '{brokerageName}' as '{typeof(BrokerageHistoryProvider).Name}'");
}
else
{
Log.Trace($"HistoryProviderManager.Initialize(): Wrapping '{brokerageName}' instance as '{typeof(BrokerageHistoryProvider).Name}'");
}
// wrap it
var brokerageHistoryProvider = new BrokerageHistoryProvider();
brokerageHistoryProvider.SetBrokerage((IBrokerage)dataQueueHandler);
historyProvider = brokerageHistoryProvider;
}
else
{
historyProvider = Composer.Instance.GetExportedValueByTypeName(historyProviderName);
if (historyProvider is BrokerageHistoryProvider)
{
(historyProvider as BrokerageHistoryProvider).SetBrokerage(_brokerage);
}
}
historyProvider.Initialize(parameters);
historyProvider.InvalidConfigurationDetected += (sender, args) => { OnInvalidConfigurationDetected(args); };
historyProvider.NumericalPrecisionLimited += (sender, args) => { OnNumericalPrecisionLimited(args); };
historyProvider.StartDateLimited += (sender, args) => { OnStartDateLimited(args); };
historyProvider.DownloadFailed += (sender, args) => { OnDownloadFailed(args); };
historyProvider.ReaderErrorDetected += (sender, args) => { OnReaderErrorDetected(args); };
_historyProviders.Add(historyProvider);
}
Log.Trace($"HistoryProviderManager.Initialize(): history providers [{string.Join(",", _historyProviders.Select(x => x.GetType().Name))}]");
}
///
/// Gets the history for the requested securities
///
/// The historical data requests
/// The time zone used when time stamping the slice instances
/// An enumerable of the slices of data covering the span specified in each request
public override IEnumerable GetHistory(IEnumerable requests, DateTimeZone sliceTimeZone)
{
List> historyEnumerators = new(_historyProviders.Count);
var historyRequets = new List();
foreach (var request in requests)
{
var config = request.ToSubscriptionDataConfig();
_dataPermissionManager?.AssertConfiguration(config, request.StartTimeLocal, request.EndTimeLocal);
historyRequets.Add(request);
}
foreach (var historyProvider in _historyProviders)
{
try
{
var history = historyProvider.GetHistory(historyRequets, sliceTimeZone);
if (history == null)
{
// doesn't support this history request, that's okay
continue;
}
historyEnumerators.Add(history.GetEnumerator());
}
catch (Exception e)
{
// ignore
}
}
using var synchronizer = new SynchronizingSliceEnumerator(historyEnumerators);
Slice latestMergeSlice = null;
while (synchronizer.MoveNext())
{
if (synchronizer.Current == null)
{
continue;
}
if (latestMergeSlice == null)
{
latestMergeSlice = synchronizer.Current;
continue;
}
if (synchronizer.Current.UtcTime > latestMergeSlice.UtcTime)
{
// a newer slice we emit the old and keep a reference of the new
// so in the next loop we merge if required
yield return latestMergeSlice;
latestMergeSlice = synchronizer.Current;
}
else
{
// a new slice with same time we merge them into 'latestMergeSlice'
latestMergeSlice.MergeSlice(synchronizer.Current);
}
}
if (latestMergeSlice != null)
{
yield return latestMergeSlice;
}
}
private int GetDataPointCount()
{
var dataPointCount = 0;
foreach (var historyProvider in _historyProviders)
{
dataPointCount += historyProvider.DataPointCount;
}
return dataPointCount;
}
}
}