/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using QuantConnect.Data;
using QuantConnect.Data.Auxiliary;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
using QuantConnect.Lean.Engine.DataFeeds.WorkScheduling;
using QuantConnect.Logging;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Utilities related to data <see cref="Subscription"/> creation
/// </summary>
public static class SubscriptionUtils
{
/// <summary>
/// Builds a <see cref="Subscription"/> that pulls its data straight from the supplied enumerator
/// </summary>
/// <param name="request">The subscription data request</param>
/// <param name="enumerator">The data enumerator stack; a null value yields an already ended subscription</param>
/// <param name="dailyStrictEndTimeEnabled">Daily strict end time flag, handed unchanged to the <see cref="SubscriptionDataEnumerator"/></param>
/// <returns>A new subscription instance ready to consume</returns>
public static Subscription Create(
    SubscriptionRequest request,
    IEnumerator enumerator,
    bool dailyStrictEndTimeEnabled)
{
    // nothing to enumerate: hand back a subscription that is already ended
    if (enumerator == null)
    {
        return GetEndedSubscription(request);
    }

    var marketHours = request.Security.Exchange.Hours;
    var offsetProvider = new TimeZoneOffsetProvider(
        request.Configuration.ExchangeTimeZone,
        request.StartTimeUtc,
        request.EndTimeUtc);

    // the same offset provider instance is shared by the wrapping enumerator and the subscription
    return new Subscription(
        request,
        new SubscriptionDataEnumerator(
            request.Configuration,
            marketHours,
            offsetProvider,
            enumerator,
            request.IsUniverseSubscription,
            dailyStrictEndTimeEnabled),
        offsetProvider);
}
/// <summary>
/// Sets up a new <see cref="Subscription"/> which will consume a blocking enqueueable enumerator
/// that will be fed by a producer function queued on the <see cref="WeightedWorkScheduler"/>
/// </summary>
/// <param name="request">The subscription data request</param>
/// <param name="enumerator">The data enumerator stack; when null an already ended subscription is returned</param>
/// <param name="factorFileProvider">The factor file provider, queried for the price scale factor</param>
/// <param name="enablePriceScale">Enables price factoring; only takes effect when the configuration's PricesShouldBeScaled() also returns true</param>
/// <param name="dailyStrictEndTimeEnabled">Daily strict end time flag, passed through to SubscriptionData.Create</param>
/// <returns>A new subscription instance ready to consume</returns>
public static Subscription CreateAndScheduleWorker(
SubscriptionRequest request,
IEnumerator enumerator,
IFactorFileProvider factorFileProvider,
bool enablePriceScale,
bool dailyStrictEndTimeEnabled)
{
// no enumerator to consume: return a subscription that is already ended
if(enumerator == null)
{
return GetEndedSubscription(request);
}
var exchangeHours = request.Security.Exchange.Hours;
// queue the producer below writes into; it is also the enumerator the subscription reads from
// NOTE(review): the 'true' argument presumably selects blocking mode - confirm against EnqueueableEnumerator
var enqueueable = new EnqueueableEnumerator(true);
var timeZoneOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);
var subscription = new Subscription(request, enqueueable, timeZoneOffsetProvider);
var config = subscription.Configuration;
// price scaling requires both the caller's flag and the configuration's consent
enablePriceScale = enablePriceScale && config.PricesShouldBeScaled();
// date of the last price scale factor refresh; DateTime.MinValue means it has not been initialized yet
var lastTradableDate = DateTime.MinValue;
// Producer handed to the work scheduler below: moves data from the source enumerator into the enqueueable.
// Returns true when it stopped because the batch limit was exceeded, and false once the enumerator
// is exhausted, the subscription was removed, or an exception was thrown.
Func produce = (workBatchSize) =>
{
try
{
var count = 0;
while (enumerator.MoveNext())
{
// subscription has been removed, no need to continue enumerating
if (enqueueable.HasFinished)
{
enumerator.DisposeSafely();
return false;
}
var data = enumerator.Current;
// Use our config filter to see if we should emit this
// This currently catches Auxiliary data that we don't want to emit
if (data != null && !config.ShouldEmitData(data, request.IsUniverseSubscription))
{
continue;
}
// In the event we have "Raw" configuration, we will force our subscription data
// to precalculate adjusted data. The data will still be emitted as raw, but
// if the config is changed at any point it can emit adjusted data as well
// See SubscriptionData.Create() and PrecalculatedSubscriptionData for more
var requestMode = config.DataNormalizationMode;
if (config.SecurityType == SecurityType.Equity)
{
requestMode = requestMode != DataNormalizationMode.Raw ? requestMode : DataNormalizationMode.Adjusted;
}
// NOTE(review): 'data' can still be null at this point (the null check above only guards the filter);
// presumably GetUpdatePriceScaleFrontier() and SubscriptionData.Create() tolerate null - confirm
var priceScaleFrontierDate = data.GetUpdatePriceScaleFrontier().Date;
// We update our price scale factor when the date changes for non fill forward bars or if we haven't initialized yet.
// We don't take into account auxiliary data because we don't scale it and because the underlying price data could be fill forwarded
if (enablePriceScale && priceScaleFrontierDate > lastTradableDate && data.DataType != MarketDataType.Auxiliary && (!data.IsFillForward || lastTradableDate == DateTime.MinValue))
{
var factorFile = factorFileProvider.Get(request.Configuration.Symbol);
lastTradableDate = priceScaleFrontierDate;
// the refreshed factor is stored on the request's configuration and read back when creating the subscription data below
request.Configuration.PriceScaleFactor = factorFile.GetPriceScale(lastTradableDate, requestMode, config.ContractDepthOffset, config.DataMappingMode);
}
// when price scaling is disabled a null factor is passed instead of the configuration's value
SubscriptionData subscriptionData = SubscriptionData.Create(dailyStrictEndTimeEnabled,
config,
exchangeHours,
subscription.OffsetProvider,
data,
requestMode,
enablePriceScale ? request.Configuration.PriceScaleFactor : null);
// drop the data into the back of the enqueueable
enqueueable.Enqueue(subscriptionData);
count++;
// stop executing if added more data than the work batch size, we don't want to fill the ram
// (the comparison is strict, so a single call enqueues up to workBatchSize + 1 items before returning)
if (count > workBatchSize)
{
return true;
}
}
}
catch (Exception exception)
{
// the exception is logged and not rethrown; execution falls through to the shutdown below
Log.Error(exception, $"Subscription worker task exception {request.Configuration}.");
}
// we made it here because MoveNext returned false or we exploded, stop the enqueueable
enqueueable.Stop();
// we have to dispose of the enumerator
enumerator.DisposeSafely();
return false;
};
// queue the producer together with a callback that reports the current weight of this work item
WeightedWorkScheduler.Instance.QueueWork(config.Symbol, produce,
// if the subscription finished we return 0, so the work is prioritized and gets removed
() =>
{
if (enqueueable.HasFinished)
{
return 0;
}
// otherwise the weight is the number of items currently held by the enqueueable
return enqueueable.Count;
}
);
return subscription;
}
/// <summary>
/// Produces a subscription that is already ended so the data worker doesn't blow up at runtime;
/// this can happen if there's no tradable date
/// </summary>
/// <param name="request">The subscription data request</param>
/// <returns>A subscription for the request that has already been disposed</returns>
private static Subscription GetEndedSubscription(SubscriptionRequest request)
{
    // no enumerator and no offset provider are supplied
    var endedSubscription = new Subscription(request, null, null);

    // disposing is what sets the subscription as ended
    endedSubscription.Dispose();

    return endedSubscription;
}
}
}