/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Data.Market;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Lean.Engine.DataFeeds;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Securities;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.HistoricalData
{
///
/// Provides an abstract implementation of
/// which provides synchronization of multiple history results
///
public abstract class SynchronizingHistoryProvider : HistoryProviderBase
{
///
/// The market hours database
///
protected static readonly MarketHoursDatabase MarketHours = MarketHoursDatabase.FromDataFolder();
private int _dataPointCount;
///
/// The algorithm settings instance to use
///
public IAlgorithmSettings AlgorithmSettings { get; set; } = new AlgorithmSettings();
///
/// Gets the total number of data points emitted by this history provider
///
public override int DataPointCount => _dataPointCount;
///
/// Enumerates the subscriptions into slices
///
protected IEnumerable CreateSliceEnumerableFromSubscriptions(List subscriptions, DateTimeZone sliceTimeZone)
{
// required by TimeSlice.Create, but we don't need it's behavior
var frontier = DateTime.MinValue;
// never changes, there's no selection during a history request
var universeSelectionData = new Dictionary();
var timeSliceFactory = new TimeSliceFactory(sliceTimeZone);
while (true)
{
var earlyBirdTicks = long.MaxValue;
var data = new List();
foreach (var subscription in subscriptions.Where(subscription => !subscription.EndOfStream))
{
if (subscription.Current == null && !subscription.MoveNext())
{
// initial pump. We do it here and not when creating the subscriptions so
// that parallel workers can all start as fast as possible
continue;
}
DataFeedPacket packet = null;
while (subscription.Current.EmitTimeUtc <= frontier)
{
if (packet == null)
{
// for performance, lets be selfish about creating a new instance
packet = new DataFeedPacket(subscription.Security, subscription.Configuration);
// only add if we have data
data.Add(packet);
}
packet.Add(subscription.Current.Data);
Interlocked.Increment(ref _dataPointCount);
if (!subscription.MoveNext())
{
break;
}
}
// update our early bird ticks (next frontier time)
if (subscription.Current != null)
{
// take the earliest between the next piece of data or the next tz discontinuity
earlyBirdTicks = Math.Min(earlyBirdTicks, subscription.Current.EmitTimeUtc.Ticks);
}
}
if (data.Count != 0)
{
// reuse the slice construction code from TimeSlice.Create
yield return timeSliceFactory.Create(frontier, data, SecurityChanges.None, universeSelectionData).Slice;
}
// end of subscriptions, after we emit, else we might drop a data point
if (earlyBirdTicks == long.MaxValue) break;
frontier = new DateTime(Math.Max(earlyBirdTicks, frontier.Ticks), DateTimeKind.Utc);
}
// make sure we clean up after ourselves
foreach (var subscription in subscriptions)
{
subscription.Dispose();
}
}
///
/// Retrieves the appropriate based on the data type and symbol.
///
/// The default exchange instance.
/// The type of data being processed.
/// The security symbol.
/// The security exchange with appropriate market hours.
protected static SecurityExchange GetSecurityExchange(SecurityExchange exchange, Type dataType, Symbol symbol)
{
if (dataType == typeof(OpenInterest))
{
// Retrieve the original market hours, which include holidays and closed days.
var originalExchangeHours = MarketHours.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType);
// Use the original market hours to prevent fill-forwarding on non-trading hours.
return new SecurityExchange(originalExchangeHours);
}
return exchange;
}
///
/// Creates a subscription to process the history request
///
protected Subscription CreateSubscription(HistoryRequest request, IEnumerable history)
{
var config = request.ToSubscriptionDataConfig();
var security = new Security(
request.ExchangeHours,
config,
new Cash(Currencies.NullCurrency, 0, 1m),
SymbolProperties.GetDefault(Currencies.NullCurrency),
ErrorCurrencyConverter.Instance,
RegisteredSecurityDataTypesProvider.Null,
new SecurityCache()
);
var reader = history.GetEnumerator();
var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(AlgorithmSettings, request, config.Symbol, config.Increment);
if (useDailyStrictEndTimes)
{
reader = new StrictDailyEndTimesEnumerator(reader, request.ExchangeHours, request.StartTimeLocal);
}
// optionally apply fill forward behavior
if (request.FillForwardResolution.HasValue)
{
// FillForwardEnumerator expects these values in local times
var start = request.StartTimeUtc.ConvertFromUtc(request.ExchangeHours.TimeZone);
var end = request.EndTimeUtc.ConvertFromUtc(request.ExchangeHours.TimeZone);
// copy forward Bid/Ask bars for QuoteBars
if (request.DataType == typeof(QuoteBar))
{
reader = new QuoteBarFillForwardEnumerator(reader);
}
var readOnlyRef = Ref.CreateReadOnly(() => request.FillForwardResolution.Value.ToTimeSpan());
var exchange = GetSecurityExchange(security.Exchange, request.DataType, request.Symbol);
reader = new FillForwardEnumerator(reader, exchange, readOnlyRef, request.IncludeExtendedMarketHours, start, end, config.Increment, config.DataTimeZone, useDailyStrictEndTimes, request.DataType);
}
var subscriptionRequest = new SubscriptionRequest(false, null, security, config, request.StartTimeUtc, request.EndTimeUtc);
return SubscriptionUtils.Create(subscriptionRequest, reader, AlgorithmSettings.DailyPreciseEndTime);
}
}
}