/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using QuantConnect.Data.UniverseSelection;
using System.Linq;
namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Instance base class that will provide methods for creating new <see cref="TimeSlice"/>
/// </summary>
public class TimeSliceFactory
{
// Time zone used by Create to convert the UTC frontier time into the algorithm (slice) time
private readonly DateTimeZone _timeZone;
// performance: these collections are not always used so keep a reference to an empty
// instance to use and avoid unnecessary constructors and allocations.
// Create substitutes one of these (via ??) whenever it did not build a collection of that kind,
// and UpdateEmptyCollections clears and re-stamps them at the start of every Create call.
// NOTE(review): the List declaration right below has no generic type arguments as written, which is
// not valid C#; they look lost in extraction -- confirm against the repository before building.
private readonly List> _emptyCustom = new List>();
private readonly TradeBars _emptyTradeBars = new TradeBars();
private readonly QuoteBars _emptyQuoteBars = new QuoteBars();
private readonly Ticks _emptyTicks = new Ticks();
private readonly Splits _emptySplits = new Splits();
private readonly Dividends _emptyDividends = new Dividends();
private readonly Delistings _emptyDelistings = new Delistings();
private readonly OptionChains _emptyOptionChains = new OptionChains();
private readonly FuturesChains _emptyFuturesChains = new FuturesChains();
private readonly SymbolChangedEvents _emptySymbolChangedEvents = new SymbolChangedEvents();
private readonly MarginInterestRates _emptyMarginInterestRates = new MarginInterestRates();
/// <summary>
/// Initializes a new <see cref="TimeSliceFactory"/> instance
/// </summary>
/// <param name="timeZone">The time zone required for computing algorithm and slice time</param>
public TimeSliceFactory(DateTimeZone timeZone) => _timeZone = timeZone;
/// <summary>
/// Builds an empty <see cref="TimeSlice"/> whose only purpose is to act as a time pulse
/// </summary>
/// <remarks>Routing every time pulse through this method keeps their creation standardized</remarks>
/// <param name="utcDateTime">The UTC frontier date time</param>
/// <returns>A new <see cref="TimeSlice"/> flagged as a time pulse</returns>
public TimeSlice CreateTimePulse(DateTime utcDateTime)
{
    // every data collection is deliberately null: a time pulse must never be read
    // for its data, and we want to see fireworks if someone tries
    var timePulse = new TimeSlice(
        utcDateTime, 0, null, null, null, null, null, SecurityChanges.None, null, isTimePulse: true);
    return timePulse;
}
/// <summary>
/// Creates a new <see cref="TimeSlice"/> for the specified time using the specified data
/// </summary>
/// <remarks>
/// Walks every packet once and, per data point, fills the typed collections that make up the
/// <see cref="Slice"/> (bars, ticks, chains, auxiliary events) while also building the security,
/// consolidator and custom data update lists carried by the returned <see cref="TimeSlice"/>.
/// Typed collections are only allocated when a matching data point shows up; otherwise the shared
/// empty instances of this factory are used.
/// </remarks>
/// <param name="utcDateTime">The UTC frontier date time</param>
/// <param name="data">The data in this <see cref="TimeSlice"/></param>
/// <param name="changes">The new changes that are seen in this time slice as a result of universe selection</param>
/// <param name="universeData">Universe selection data; the item count of each entry's Data is added to the
/// reported data point count and the dictionary itself is handed to the returned <see cref="TimeSlice"/></param>
/// <returns>A new <see cref="TimeSlice"/> containing the specified data</returns>
public TimeSlice Create(DateTime utcDateTime,
List data,
SecurityChanges changes,
Dictionary universeData)
{
// NOTE(review): several declarations in this method (the 'data' and 'universeData' parameters, the
// List/Dictionary/Lazy locals) appear without generic type arguments and are not valid C# as written;
// they look lost in extraction -- confirm against the repository.
// running total of data points reported on the resulting time slice
int count = 0;
// updates applied to securities (market prices and auxiliary data made available through the cache)
var security = new List>(data.Count);
// custom data updates, only allocated once a non internal custom data packet is found
List> custom = null;
// updates used to feed consolidators
var consolidator = new List>(data.Count);
// every non internal data point, handed to the Slice
var allDataForAlgorithm = new List(data.Count);
// latest non auxiliary, non internal data point seen so far per subscription symbol; HandleOptionData
// looks the option underlying up in here before falling back to the underlying's last data
var optionUnderlyingUpdates = new Dictionary();
// scratch variables reused by the 'as' checks in the auxiliary data branch below
Split split;
Dividend dividend;
Delisting delisting;
SymbolChangedEvent symbolChange;
MarginInterestRate marginInterestRate;
// we need to be able to reference the slice being created in order to define the
// evaluation of option price models, so we define a 'future' that can be referenced
// in the option price model evaluation delegates for each contract
Slice slice = null;
var sliceFuture = new Lazy(() => slice);
var algorithmTime = utcDateTime.ConvertFromUtc(_timeZone);
// typed collections stay null until the first matching data point arrives
TradeBars tradeBars = null;
QuoteBars quoteBars = null;
Ticks ticks = null;
Splits splits = null;
Dividends dividends = null;
Delistings delistings = null;
OptionChains optionChains = null;
FuturesChains futuresChains = null;
SymbolChangedEvents symbolChanges = null;
MarginInterestRates marginInterestRates = null;
// reset the shared empty collections and stamp them with this slice's algorithm time
UpdateEmptyCollections(algorithmTime);
if (universeData.Count > 0)
{
// count universe data
foreach (var kvp in universeData)
{
count += kvp.Value.Data.Count;
}
}
// ensure we read equity data before option data, so we can set the current underlying price
// NOTE(review): packets are processed in the order given and nothing here sorts them, so this
// presumably relies on the caller placing an underlying's packet ahead of its options -- confirm
foreach (var packet in data)
{
// filter out packets for removed subscriptions
if (packet.IsSubscriptionRemoved)
{
continue;
}
var list = packet.Data;
// the symbol of the subscription, which can differ from the symbol of the individual data points
var symbol = packet.Configuration.Symbol;
if (list.Count == 0) continue;
// keep count of all data points
if (list.Count == 1 && list[0] is BaseDataCollection)
{
// a single collection counts as the number of items it holds; an empty one skips the whole packet
var baseDataCollectionCount = ((BaseDataCollection)list[0]).Data.Count;
if (baseDataCollectionCount == 0)
{
continue;
}
count += baseDataCollectionCount;
}
else
{
count += list.Count;
}
if (!packet.Configuration.IsInternalFeed && packet.Configuration.IsCustomData)
{
if (custom == null)
{
custom = new List>(1);
}
// This is all the custom data
custom.Add(new UpdateData(packet.Security, packet.Configuration.Type, list, packet.Configuration.IsInternalFeed));
}
var securityUpdate = new List(list.Count);
var consolidatorUpdate = new List(list.Count);
// true as soon as any data point of this packet is fill forward
var containsFillForwardData = false;
for (var i = 0; i < list.Count; i++)
{
var baseData = list[i];
if (!packet.Configuration.IsInternalFeed)
{
// this is all the data that goes into the algorithm
allDataForAlgorithm.Add(baseData);
}
containsFillForwardData |= baseData.IsFillForward;
// don't add internal feed data to ticks/bars objects
if (baseData.DataType != MarketDataType.Auxiliary)
{
// null when the data point is not a tick; used for the Suspicious checks below
var tick = baseData as Tick;
if (!packet.Configuration.IsInternalFeed)
{
// populate data dictionaries
switch (baseData.DataType)
{
case MarketDataType.Tick:
if (ticks == null)
{
ticks = new Ticks(algorithmTime);
}
ticks.Add(baseData.Symbol, (Tick)baseData);
break;
case MarketDataType.TradeBar:
if (tradeBars == null)
{
tradeBars = new TradeBars(algorithmTime);
}
var newTradeBar = (TradeBar)baseData;
TradeBar existingTradeBar;
// if we have an existing bar keep the highest resolution one
// e.g Hour and Minute resolution subscriptions for the same symbol
// see CustomUniverseWithBenchmarkRegressionAlgorithm
if (!tradeBars.TryGetValue(baseData.Symbol, out existingTradeBar)
|| existingTradeBar.Period > newTradeBar.Period)
{
tradeBars[baseData.Symbol] = newTradeBar;
}
break;
case MarketDataType.QuoteBar:
if (quoteBars == null)
{
quoteBars = new QuoteBars(algorithmTime);
}
var newQuoteBar = (QuoteBar)baseData;
QuoteBar existingQuoteBar;
// if we have an existing bar keep the highest resolution one
// e.g Hour and Minute resolution subscriptions for the same symbol
// see CustomUniverseWithBenchmarkRegressionAlgorithm
if (!quoteBars.TryGetValue(baseData.Symbol, out existingQuoteBar)
|| existingQuoteBar.Period > newQuoteBar.Period)
{
quoteBars[baseData.Symbol] = newQuoteBar;
}
break;
case MarketDataType.OptionChain:
if (optionChains == null)
{
optionChains = new OptionChains(algorithmTime);
}
optionChains[baseData.Symbol] = (OptionChain)baseData;
break;
case MarketDataType.FuturesChain:
if (futuresChains == null)
{
futuresChains = new FuturesChains(algorithmTime);
}
futuresChains[baseData.Symbol] = (FuturesChain)baseData;
break;
}
// this is data used to update consolidators
// do not add it if it is a Suspicious tick
if (tick == null || !tick.Suspicious)
{
consolidatorUpdate.Add(baseData);
}
}
// special handling of options data to build the option chain
// (both the subscription symbol and the data point symbol have to be options)
if (symbol.SecurityType.IsOption() && baseData.Symbol.SecurityType.IsOption())
{
// internal feeds, like open interest, will not create the chain but will update it if it exists
// this is because the open interest could arrive at some closed market hours in which there is no other data and we don't
// want to generate a chain object in this case
if (optionChains == null && !packet.Configuration.IsInternalFeed)
{
optionChains = new OptionChains(algorithmTime);
}
if (optionChains != null)
{
if (baseData.DataType == MarketDataType.OptionChain)
{
optionChains[baseData.Symbol] = (OptionChain)baseData;
}
else if (!HandleOptionData(algorithmTime, baseData, optionChains, packet.Security, sliceFuture, optionUnderlyingUpdates))
{
// the data point was not applied to a contract: skip the security and underlying updates
// below (it may already have been queued for consolidators above)
continue;
}
}
}
// special handling of futures data to build the futures chain. Don't push canonical continuous contract
// We don't push internal feeds because it could be a continuous mapping future not part of the requested chain
if (symbol.SecurityType == SecurityType.Future && !symbol.IsCanonical() && baseData.Symbol.SecurityType == SecurityType.Future)
{
if (futuresChains == null && !packet.Configuration.IsInternalFeed)
{
futuresChains = new FuturesChains(algorithmTime);
}
if (futuresChains != null)
{
if (baseData.DataType == MarketDataType.FuturesChain)
{
futuresChains[baseData.Symbol] = (FuturesChain)baseData;
}
else if (!HandleFuturesData(algorithmTime, baseData, futuresChains, packet.Security, packet.Configuration))
{
// the data point was not applied to a contract: skip the security and underlying updates below
continue;
}
}
}
// this is the data used set market prices
// do not add it if it is a Suspicious tick
if (tick != null && tick.Suspicious) continue;
securityUpdate.Add(baseData);
// option underlying security update
if (!packet.Configuration.IsInternalFeed)
{
// keyed by the subscription symbol; a later data point of the same packet overwrites an earlier one
optionUnderlyingUpdates[symbol] = baseData;
}
}
// We emit aux data for non internal subscriptions only, except for delistings which are required in case
// of holdings in the algorithm that may require liquidation, or just for marking the security as delisted and not tradable
// (the assignment inside the condition also leaves 'delisting' null for every non delisting data point)
else if ((delisting = baseData as Delisting) != null || !packet.Configuration.IsInternalFeed)
{
// include checks for various aux types so we don't have to construct the dictionaries in Slice
if (delisting != null)
{
if (delistings == null)
{
delistings = new Delistings(algorithmTime);
}
delistings[symbol] = delisting;
}
else if ((dividend = baseData as Dividend) != null)
{
if (dividends == null)
{
dividends = new Dividends(algorithmTime);
}
dividends[symbol] = dividend;
}
else if ((split = baseData as Split) != null)
{
if (splits == null)
{
splits = new Splits(algorithmTime);
}
splits[symbol] = split;
}
else if ((symbolChange = baseData as SymbolChangedEvent) != null)
{
if (symbolChanges == null)
{
symbolChanges = new SymbolChangedEvents(algorithmTime);
}
// symbol changes is keyed by the requested symbol
symbolChanges[packet.Configuration.Symbol] = symbolChange;
}
else if ((marginInterestRate = baseData as MarginInterestRate) != null)
{
if (marginInterestRates == null)
{
marginInterestRates = new MarginInterestRates(algorithmTime);
}
marginInterestRates[packet.Configuration.Symbol] = marginInterestRate;
}
// let's make it available to the user through the cache
// (each auxiliary data point gets its own single item update, typed by its runtime type)
security.Add(new UpdateData(packet.Security, baseData.GetType(), new List { baseData }, packet.Configuration.IsInternalFeed, baseData.IsFillForward));
}
}
// only emit updates for packets that actually contributed data
if (securityUpdate.Count > 0)
{
security.Add(new UpdateData(packet.Security, packet.Configuration.Type, securityUpdate, packet.Configuration.IsInternalFeed, containsFillForwardData));
}
if (consolidatorUpdate.Count > 0)
{
consolidator.Add(new UpdateData(packet.Configuration, packet.Configuration.Type, consolidatorUpdate, packet.Configuration.IsInternalFeed, containsFillForwardData));
}
}
// assigning 'slice' here is what makes sliceFuture resolve to the finished slice when an option
// price model delegate is eventually evaluated; collections never created fall back to the shared empty ones
slice = new Slice(algorithmTime, allDataForAlgorithm, tradeBars ?? _emptyTradeBars, quoteBars ?? _emptyQuoteBars, ticks ?? _emptyTicks, optionChains ?? _emptyOptionChains, futuresChains ?? _emptyFuturesChains, splits ?? _emptySplits, dividends ?? _emptyDividends, delistings ?? _emptyDelistings, symbolChanges ?? _emptySymbolChangedEvents, marginInterestRates ?? _emptyMarginInterestRates, utcDateTime, allDataForAlgorithm.Count > 0);
return new TimeSlice(utcDateTime, count, slice, data, security, consolidator, custom ?? _emptyCustom, changes, universeData);
}
/// <summary>
/// Empties each of the shared placeholder collections and stamps it with the given time
/// </summary>
/// <param name="algorithmTime">The time assigned to every shared empty collection</param>
private void UpdateEmptyCollections(DateTime algorithmTime)
{
    // the Clear() calls are just in case: these instances are only meant to be used while empty
#pragma warning disable 0618 // DataDictionary.Time is deprecated, ignore until removed entirely
    _emptyTradeBars.Clear();
    _emptyTradeBars.Time = algorithmTime;

    _emptyQuoteBars.Clear();
    _emptyQuoteBars.Time = algorithmTime;

    _emptyTicks.Clear();
    _emptyTicks.Time = algorithmTime;

    _emptySplits.Clear();
    _emptySplits.Time = algorithmTime;

    _emptyDividends.Clear();
    _emptyDividends.Time = algorithmTime;

    _emptyDelistings.Clear();
    _emptyDelistings.Time = algorithmTime;

    _emptyOptionChains.Clear();
    _emptyOptionChains.Time = algorithmTime;

    _emptyFuturesChains.Clear();
    _emptyFuturesChains.Time = algorithmTime;

    _emptySymbolChangedEvents.Clear();
    _emptySymbolChangedEvents.Time = algorithmTime;

    _emptyMarginInterestRates.Clear();
    _emptyMarginInterestRates.Time = algorithmTime;
#pragma warning restore 0618
}
/// <summary>
/// Folds an option data point into the option chain of its canonical symbol
/// </summary>
/// <param name="algorithmTime">The algorithm time, used when a new chain is created and in error logs</param>
/// <param name="baseData">The option data point to process</param>
/// <param name="optionChains">The chains of the slice being built; a chain is added for the canonical symbol when missing</param>
/// <param name="security">The security of the packet; when it is an <see cref="IOptionPrice"/> it supplies the underlying and the price model</param>
/// <param name="sliceFuture">Lazy reference to the slice being built, read only when a contract's price model is evaluated</param>
/// <param name="optionUnderlyingUpdates">Data points by symbol, consulted for the underlying before falling back to its last data</param>
/// <returns>True when the data point was applied to a contract of the chain; false when the underlying or its
/// data is missing (an error is logged) or when the data point is a <see cref="BaseDataCollection"/></returns>
private bool HandleOptionData(DateTime algorithmTime, BaseData baseData, OptionChains optionChains, ISecurityPrice security, Lazy sliceFuture, IReadOnlyDictionary optionUnderlyingUpdates)
{
    // NOTE(review): 'Lazy' and 'IReadOnlyDictionary' appear without generic type arguments in this
    // signature; they look lost in extraction -- confirm against the repository.
    var contractSymbol = baseData.Symbol;
    var canonicalSymbol = contractSymbol.Canonical;

    // fetch the chain of the canonical symbol, creating and registering it on first use
    if (!optionChains.TryGetValue(canonicalSymbol, out OptionChain optionChain))
    {
        optionChain = new OptionChain(canonicalSymbol, algorithmTime);
        optionChains[canonicalSymbol] = optionChain;
    }

    // set the underlying current data point in the option chain
    var optionPrice = security as IOptionPrice;
    if (optionPrice != null)
    {
        var underlying = optionPrice.Underlying;
        if (underlying == null)
        {
            Log.Error($"TimeSlice.HandleOptionData(): {algorithmTime}: Option underlying is null");
            return false;
        }

        // prefer the data point found in the supplied updates, otherwise use the underlying's last data
        BaseData underlyingData = optionUnderlyingUpdates.TryGetValue(underlying.Symbol, out BaseData updatedUnderlying)
            ? updatedUnderlying
            : underlying.GetLastData();
        if (underlyingData == null)
        {
            Log.Error($"TimeSlice.HandleOptionData(): {algorithmTime}: Option underlying GetLastData returned null");
            return false;
        }

        optionChain.Underlying = underlyingData;
    }

    // a collection carries no contract data of its own: refresh the contracts already in the chain
    // and record the filtered contracts, then report that nothing was applied to a contract
    if (baseData is BaseDataCollection collection)
    {
        if (collection.Underlying != null)
        {
            foreach (var existing in optionChain.Contracts)
            {
                existing.Value.Update(optionChain.Underlying);
            }
        }

        var filteredContracts = collection.FilteredContracts;
        if (filteredContracts != null)
        {
            foreach (var filteredSymbol in filteredContracts)
            {
                optionChain.FilteredContracts.Add(filteredSymbol);
            }
        }

        return false;
    }

    // first data point for this contract: create it and, when possible, hook up its price model
    if (!optionChain.Contracts.TryGetValue(contractSymbol, out var optionContract))
    {
        optionContract = OptionContract.Create(baseData, security, optionChain.Underlying);
        optionChain.Contracts[contractSymbol] = optionContract;
        if (optionPrice != null)
        {
            // evaluated lazily, by which time sliceFuture resolves to the finished slice
            optionContract.SetOptionPriceModel(() => optionPrice.EvaluatePriceModel(sliceFuture.Value, optionContract));
        }
    }

    optionContract.Update(baseData);
    optionChain.AddData(baseData);
    return true;
}
/// <summary>
/// Folds a futures data point into the futures chain of its canonical symbol
/// </summary>
/// <param name="algorithmTime">The algorithm time, used when a new chain is created</param>
/// <param name="baseData">The futures data point to process</param>
/// <param name="futuresChains">The chains of the slice being built; a chain is added for the canonical symbol when missing</param>
/// <param name="security">The security of the packet (not used by this method)</param>
/// <param name="configuration">The subscription configuration, used to tell internal feeds apart</param>
/// <returns>True when the data point was applied to a contract of the chain; false when it is a
/// <see cref="BaseDataCollection"/> or when an internal feed has no existing chain or contract to update</returns>
private bool HandleFuturesData(DateTime algorithmTime, BaseData baseData, FuturesChains futuresChains, ISecurityPrice security, SubscriptionDataConfig configuration)
{
    var contractSymbol = baseData.Symbol;
    var canonicalSymbol = contractSymbol.Canonical;

    if (!futuresChains.TryGetValue(canonicalSymbol, out FuturesChain futuresChain))
    {
        // We don't create a chain for internal feeds, this data might belong to a continuous mapping future
        if (configuration.IsInternalFeed)
        {
            return false;
        }

        futuresChain = new FuturesChain(canonicalSymbol, algorithmTime);
        futuresChains[canonicalSymbol] = futuresChain;
    }

    // a collection only contributes its filtered contracts, it is never applied to a contract
    if (baseData is BaseDataCollection collection)
    {
        var filteredContracts = collection.FilteredContracts;
        if (filteredContracts != null)
        {
            foreach (var filteredSymbol in filteredContracts)
            {
                futuresChain.FilteredContracts.Add(filteredSymbol);
            }
        }

        return false;
    }

    if (!futuresChain.Contracts.TryGetValue(contractSymbol, out var futuresContract))
    {
        // We don't create a contract for internal feeds, this data might belong to a continuous mapping future
        if (configuration.IsInternalFeed)
        {
            return false;
        }

        futuresContract = new FuturesContract(contractSymbol);
        futuresChain.Contracts[contractSymbol] = futuresContract;
    }

    futuresContract.Update(baseData);
    futuresChain.AddData(baseData);
    return true;
}
}
}