/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

using System;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using QuantConnect.Data.UniverseSelection;
using System.Linq;

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Instance base class that will provide methods for creating new <see cref="TimeSlice"/>
    /// </summary>
    public class TimeSliceFactory
    {
        private readonly DateTimeZone _timeZone;

        // performance: these collections are not always used so keep a reference to an empty
        // instance to use and avoid unnecessary constructors and allocations
        private readonly List<UpdateData<ISecurityPrice>> _emptyCustom = new List<UpdateData<ISecurityPrice>>();
        private readonly TradeBars _emptyTradeBars = new TradeBars();
        private readonly QuoteBars _emptyQuoteBars = new QuoteBars();
        private readonly Ticks _emptyTicks = new Ticks();
        private readonly Splits _emptySplits = new Splits();
        private readonly Dividends _emptyDividends = new Dividends();
        private readonly Delistings _emptyDelistings = new Delistings();
        private readonly OptionChains _emptyOptionChains = new OptionChains();
        private readonly FuturesChains _emptyFuturesChains = new FuturesChains();
        private readonly SymbolChangedEvents _emptySymbolChangedEvents = new SymbolChangedEvents();
        private readonly MarginInterestRates _emptyMarginInterestRates = new MarginInterestRates();

        /// <summary>
        /// Creates a new instance
        /// </summary>
        /// <param name="timeZone">The time zone required for computing algorithm and slice time</param>
        public TimeSliceFactory(DateTimeZone timeZone)
        {
            _timeZone = timeZone;
        }

        /// <summary>
        /// Creates a new empty <see cref="TimeSlice"/> to be used as a time pulse
        /// </summary>
        /// <remarks>The objective of this method is to standardize the time pulse creation</remarks>
        /// <param name="utcDateTime">The UTC frontier date time</param>
        /// <returns>A new <see cref="TimeSlice"/> time pulse</returns>
        public TimeSlice CreateTimePulse(DateTime utcDateTime)
        {
            // setting all data collections to null, this time slice shouldn't be used
            // for its data, we want to see fireworks if someone tries
            return new TimeSlice(utcDateTime,
                0,
                null,
                null,
                null,
                null,
                null,
                SecurityChanges.None,
                null,
                isTimePulse: true);
        }

        /// <summary>
        /// Creates a new <see cref="TimeSlice"/> for the specified time using the specified data
        /// </summary>
        /// <param name="utcDateTime">The UTC frontier date time</param>
        /// <param name="data">The data in this <see cref="TimeSlice"/></param>
        /// <param name="changes">The new changes that are seen in this time slice as a result of universe selection</param>
        /// <param name="universeData">The universe data for this time slice; its data points are included
        /// in the total data point count and it is handed over to the resulting <see cref="TimeSlice"/></param>
        /// <returns>A new <see cref="TimeSlice"/> containing the specified data</returns>
        public TimeSlice Create(DateTime utcDateTime,
            List<DataFeedPacket> data,
            SecurityChanges changes,
            Dictionary<Universe, BaseDataCollection> universeData)
        {
            int count = 0;
            var security = new List<UpdateData<ISecurityPrice>>(data.Count);
            List<UpdateData<ISecurityPrice>> custom = null;
            var consolidator = new List<UpdateData<SubscriptionDataConfig>>(data.Count);
            var allDataForAlgorithm = new List<BaseData>(data.Count);
            var optionUnderlyingUpdates = new Dictionary<Symbol, BaseData>();

            Split split;
            Dividend dividend;
            Delisting delisting;
            SymbolChangedEvent symbolChange;
            MarginInterestRate marginInterestRate;

            // we need to be able to reference the slice being created in order to define the
            // evaluation of option price models, so we define a 'future' that can be referenced
            // in the option price model evaluation delegates for each contract
            Slice slice = null;
            var sliceFuture = new Lazy<Slice>(() => slice);

            var algorithmTime = utcDateTime.ConvertFromUtc(_timeZone);

            // the typed collections are created lazily, only once the first data point of each kind shows up
            TradeBars tradeBars = null;
            QuoteBars quoteBars = null;
            Ticks ticks = null;
            Splits splits = null;
            Dividends dividends = null;
            Delistings delistings = null;
            OptionChains optionChains = null;
            FuturesChains futuresChains = null;
            SymbolChangedEvents symbolChanges = null;
            MarginInterestRates marginInterestRates = null;

            UpdateEmptyCollections(algorithmTime);

            if (universeData.Count > 0)
            {
                // count universe data
                foreach (var kvp in universeData)
                {
                    count += kvp.Value.Data.Count;
                }
            }

            // ensure we read equity data before option data, so we can set the current underlying price
            foreach (var packet in data)
            {
                // filter out packets for removed subscriptions
                if (packet.IsSubscriptionRemoved)
                {
                    continue;
                }

                var list = packet.Data;
                var symbol = packet.Configuration.Symbol;

                if (list.Count == 0)
                {
                    continue;
                }

                // keep count of all data points
                if (list.Count == 1 && list[0] is BaseDataCollection collection)
                {
                    var baseDataCollectionCount = collection.Data.Count;
                    if (baseDataCollectionCount == 0)
                    {
                        continue;
                    }
                    count += baseDataCollectionCount;
                }
                else
                {
                    count += list.Count;
                }

                if (!packet.Configuration.IsInternalFeed && packet.Configuration.IsCustomData)
                {
                    if (custom == null)
                    {
                        custom = new List<UpdateData<ISecurityPrice>>(1);
                    }
                    // This is all the custom data
                    custom.Add(new UpdateData<ISecurityPrice>(packet.Security, packet.Configuration.Type, list, packet.Configuration.IsInternalFeed));
                }

                var securityUpdate = new List<BaseData>(list.Count);
                var consolidatorUpdate = new List<BaseData>(list.Count);
                var containsFillForwardData = false;
                for (var i = 0; i < list.Count; i++)
                {
                    var baseData = list[i];
                    if (!packet.Configuration.IsInternalFeed)
                    {
                        // this is all the data that goes into the algorithm
                        allDataForAlgorithm.Add(baseData);
                    }

                    containsFillForwardData |= baseData.IsFillForward;

                    // don't add internal feed data to ticks/bars objects
                    if (baseData.DataType != MarketDataType.Auxiliary)
                    {
                        var tick = baseData as Tick;

                        if (!packet.Configuration.IsInternalFeed)
                        {
                            // populate data dictionaries
                            switch (baseData.DataType)
                            {
                                case MarketDataType.Tick:
                                    if (ticks == null)
                                    {
                                        ticks = new Ticks(algorithmTime);
                                    }
                                    ticks.Add(baseData.Symbol, (Tick)baseData);
                                    break;

                                case MarketDataType.TradeBar:
                                    if (tradeBars == null)
                                    {
                                        tradeBars = new TradeBars(algorithmTime);
                                    }

                                    var newTradeBar = (TradeBar)baseData;
                                    TradeBar existingTradeBar;
                                    // if we have an existing bar keep the highest resolution one
                                    // e.g Hour and Minute resolution subscriptions for the same symbol
                                    // see CustomUniverseWithBenchmarkRegressionAlgorithm
                                    if (!tradeBars.TryGetValue(baseData.Symbol, out existingTradeBar)
                                        || existingTradeBar.Period > newTradeBar.Period)
                                    {
                                        tradeBars[baseData.Symbol] = newTradeBar;
                                    }
                                    break;

                                case MarketDataType.QuoteBar:
                                    if (quoteBars == null)
                                    {
                                        quoteBars = new QuoteBars(algorithmTime);
                                    }

                                    var newQuoteBar = (QuoteBar)baseData;
                                    QuoteBar existingQuoteBar;
                                    // if we have an existing bar keep the highest resolution one
                                    // e.g Hour and Minute resolution subscriptions for the same symbol
                                    // see CustomUniverseWithBenchmarkRegressionAlgorithm
                                    if (!quoteBars.TryGetValue(baseData.Symbol, out existingQuoteBar)
                                        || existingQuoteBar.Period > newQuoteBar.Period)
                                    {
                                        quoteBars[baseData.Symbol] = newQuoteBar;
                                    }
                                    break;

                                case MarketDataType.OptionChain:
                                    if (optionChains == null)
                                    {
                                        optionChains = new OptionChains(algorithmTime);
                                    }
                                    optionChains[baseData.Symbol] = (OptionChain)baseData;
                                    break;

                                case MarketDataType.FuturesChain:
                                    if (futuresChains == null)
                                    {
                                        futuresChains = new FuturesChains(algorithmTime);
                                    }
                                    futuresChains[baseData.Symbol] = (FuturesChain)baseData;
                                    break;
                            }

                            // this is data used to update consolidators
                            // do not add it if it is a Suspicious tick
                            if (tick == null || !tick.Suspicious)
                            {
                                consolidatorUpdate.Add(baseData);
                            }
                        }

                        // special handling of options data to build the option chain
                        if (symbol.SecurityType.IsOption() && baseData.Symbol.SecurityType.IsOption())
                        {
                            // internal feeds, like open interest, will not create the chain but will update it if it exists
                            // this is because the open interest could arrive at some closed market hours in which there is no other data and we don't
                            // want to generate a chain object in this case
                            if (optionChains == null && !packet.Configuration.IsInternalFeed)
                            {
                                optionChains = new OptionChains(algorithmTime);
                            }

                            if (optionChains != null)
                            {
                                if (baseData.DataType == MarketDataType.OptionChain)
                                {
                                    optionChains[baseData.Symbol] = (OptionChain)baseData;
                                }
                                else if (!HandleOptionData(algorithmTime, baseData, optionChains, packet.Security, sliceFuture, optionUnderlyingUpdates))
                                {
                                    // the data point was not added to a chain, skip the security update for it
                                    continue;
                                }
                            }
                        }

                        // special handling of futures data to build the futures chain. Don't push canonical continuous contract
                        // We don't push internal feeds because it could be a continuous mapping future not part of the requested chain
                        if (symbol.SecurityType == SecurityType.Future && !symbol.IsCanonical() && baseData.Symbol.SecurityType == SecurityType.Future)
                        {
                            if (futuresChains == null && !packet.Configuration.IsInternalFeed)
                            {
                                futuresChains = new FuturesChains(algorithmTime);
                            }

                            if (futuresChains != null)
                            {
                                if (baseData.DataType == MarketDataType.FuturesChain)
                                {
                                    futuresChains[baseData.Symbol] = (FuturesChain)baseData;
                                }
                                else if (!HandleFuturesData(algorithmTime, baseData, futuresChains, packet.Security, packet.Configuration))
                                {
                                    // the data point was not added to a chain, skip the security update for it
                                    continue;
                                }
                            }
                        }

                        // this is the data used set market prices
                        // do not add it if it is a Suspicious tick
                        if (tick != null && tick.Suspicious)
                        {
                            continue;
                        }

                        securityUpdate.Add(baseData);

                        // option underlying security update
                        if (!packet.Configuration.IsInternalFeed)
                        {
                            optionUnderlyingUpdates[symbol] = baseData;
                        }
                    }
                    // We emit aux data for non internal subscriptions only, except for delistings which are required in case
                    // of holdings in the algorithm that may require liquidation, or just for marking the security as delisted and not tradable
                    else if ((delisting = baseData as Delisting) != null || !packet.Configuration.IsInternalFeed)
                    {
                        // include checks for various aux types so we don't have to construct the dictionaries in Slice
                        if (delisting != null)
                        {
                            if (delistings == null)
                            {
                                delistings = new Delistings(algorithmTime);
                            }
                            delistings[symbol] = delisting;
                        }
                        else if ((dividend = baseData as Dividend) != null)
                        {
                            if (dividends == null)
                            {
                                dividends = new Dividends(algorithmTime);
                            }
                            dividends[symbol] = dividend;
                        }
                        else if ((split = baseData as Split) != null)
                        {
                            if (splits == null)
                            {
                                splits = new Splits(algorithmTime);
                            }
                            splits[symbol] = split;
                        }
                        else if ((symbolChange = baseData as SymbolChangedEvent) != null)
                        {
                            if (symbolChanges == null)
                            {
                                symbolChanges = new SymbolChangedEvents(algorithmTime);
                            }
                            // symbol changes is keyed by the requested symbol
                            symbolChanges[packet.Configuration.Symbol] = symbolChange;
                        }
                        else if ((marginInterestRate = baseData as MarginInterestRate) != null)
                        {
                            if (marginInterestRates == null)
                            {
                                marginInterestRates = new MarginInterestRates(algorithmTime);
                            }
                            marginInterestRates[packet.Configuration.Symbol] = marginInterestRate;
                        }

                        // let's make it available to the user through the cache
                        security.Add(new UpdateData<ISecurityPrice>(packet.Security, baseData.GetType(), new List<BaseData> { baseData }, packet.Configuration.IsInternalFeed, baseData.IsFillForward));
                    }
                }

                if (securityUpdate.Count > 0)
                {
                    security.Add(new UpdateData<ISecurityPrice>(packet.Security, packet.Configuration.Type, securityUpdate, packet.Configuration.IsInternalFeed, containsFillForwardData));
                }
                if (consolidatorUpdate.Count > 0)
                {
                    consolidator.Add(new UpdateData<SubscriptionDataConfig>(packet.Configuration, packet.Configuration.Type, consolidatorUpdate, packet.Configuration.IsInternalFeed, containsFillForwardData));
                }
            }

            // collections that never received data fall back to the shared empty instances
            slice = new Slice(algorithmTime,
                allDataForAlgorithm,
                tradeBars ?? _emptyTradeBars,
                quoteBars ?? _emptyQuoteBars,
                ticks ?? _emptyTicks,
                optionChains ?? _emptyOptionChains,
                futuresChains ?? _emptyFuturesChains,
                splits ?? _emptySplits,
                dividends ?? _emptyDividends,
                delistings ?? _emptyDelistings,
                symbolChanges ?? _emptySymbolChangedEvents,
                marginInterestRates ?? _emptyMarginInterestRates,
                utcDateTime,
                allDataForAlgorithm.Count > 0);

            return new TimeSlice(utcDateTime, count, slice, data, security, consolidator, custom ?? _emptyCustom, changes, universeData);
        }

        /// <summary>
        /// Clears the shared empty collections and stamps them with the provided algorithm time
        /// </summary>
        /// <param name="algorithmTime">The algorithm time to set on each empty collection</param>
        private void UpdateEmptyCollections(DateTime algorithmTime)
        {
            // just in case
            _emptyTradeBars.Clear();
            _emptyQuoteBars.Clear();
            _emptyTicks.Clear();
            _emptySplits.Clear();
            _emptyDividends.Clear();
            _emptyDelistings.Clear();
            _emptyOptionChains.Clear();
            _emptyFuturesChains.Clear();
            _emptySymbolChangedEvents.Clear();
            _emptyMarginInterestRates.Clear();

#pragma warning disable 0618 // DataDictionary.Time is deprecated, ignore until removed entirely
            _emptyTradeBars.Time
                = _emptyQuoteBars.Time
                = _emptyTicks.Time
                = _emptySplits.Time
                = _emptyDividends.Time
                = _emptyDelistings.Time
                = _emptyOptionChains.Time
                = _emptyFuturesChains.Time
                = _emptySymbolChangedEvents.Time
                = _emptyMarginInterestRates.Time = algorithmTime;
#pragma warning restore 0618
        }

        /// <summary>
        /// Adds the given option data point to the option chain of its canonical symbol,
        /// creating the chain and the contract entry when they do not exist yet
        /// </summary>
        /// <param name="algorithmTime">The algorithm time, used when creating a new chain and in error logs</param>
        /// <param name="baseData">The option data point to handle</param>
        /// <param name="optionChains">The option chains collection to update</param>
        /// <param name="security">The security associated with the data point</param>
        /// <param name="sliceFuture">Lazy reference to the slice being created, used by the option price model delegate</param>
        /// <param name="optionUnderlyingUpdates">The underlying data seen so far in the current time slice, keyed by symbol</param>
        /// <returns>True if the data point was added to the chain; false if the underlying or its data is missing,
        /// or if the data point is a <see cref="BaseDataCollection"/></returns>
        private bool HandleOptionData(DateTime algorithmTime, BaseData baseData, OptionChains optionChains, ISecurityPrice security, Lazy<Slice> sliceFuture, IReadOnlyDictionary<Symbol, BaseData> optionUnderlyingUpdates)
        {
            var symbol = baseData.Symbol;

            OptionChain chain;
            var canonical = symbol.Canonical;
            if (!optionChains.TryGetValue(canonical, out chain))
            {
                chain = new OptionChain(canonical, algorithmTime);
                optionChains[canonical] = chain;
            }

            // set the underlying current data point in the option chain
            var option = security as IOptionPrice;
            if (option != null)
            {
                if (option.Underlying == null)
                {
                    Log.Error($"TimeSlice.HandleOptionData(): {algorithmTime}: Option underlying is null");
                    return false;
                }

                // prefer the underlying data of the current time slice, else use the last known data
                BaseData underlyingData;
                if (!optionUnderlyingUpdates.TryGetValue(option.Underlying.Symbol, out underlyingData))
                {
                    underlyingData = option.Underlying.GetLastData();
                }

                if (underlyingData == null)
                {
                    Log.Error($"TimeSlice.HandleOptionData(): {algorithmTime}: Option underlying GetLastData returned null");
                    return false;
                }
                chain.Underlying = underlyingData;
            }

            var universeData = baseData as BaseDataCollection;
            if (universeData != null)
            {
                if (universeData.Underlying != null)
                {
                    // refresh the contracts already in the chain with the chain's underlying data
                    foreach (var addedContract in chain.Contracts)
                    {
                        addedContract.Value.Update(chain.Underlying);
                    }
                }
                foreach (var contractSymbol in universeData.FilteredContracts ?? Enumerable.Empty<Symbol>())
                {
                    chain.FilteredContracts.Add(contractSymbol);
                }
                return false;
            }

            if (!chain.Contracts.TryGetValue(baseData.Symbol, out var contract))
            {
                contract = OptionContract.Create(baseData, security, chain.Underlying);

                chain.Contracts[baseData.Symbol] = contract;

                if (option != null)
                {
                    contract.SetOptionPriceModel(() => option.EvaluatePriceModel(sliceFuture.Value, contract));
                }
            }

            contract.Update(baseData);

            chain.AddData(baseData);
            return true;
        }

        /// <summary>
        /// Adds the given futures data point to the futures chain of its canonical symbol.
        /// Internal feeds never create a chain or a contract, they only update existing ones
        /// </summary>
        /// <param name="algorithmTime">The algorithm time, used when creating a new chain</param>
        /// <param name="baseData">The futures data point to handle</param>
        /// <param name="futuresChains">The futures chains collection to update</param>
        /// <param name="security">The security associated with the data point</param>
        /// <param name="configuration">The subscription configuration the data point belongs to</param>
        /// <returns>True if the data point was added to the chain; false if it came from an internal feed without
        /// an existing chain or contract, or if the data point is a <see cref="BaseDataCollection"/></returns>
        private bool HandleFuturesData(DateTime algorithmTime, BaseData baseData, FuturesChains futuresChains, ISecurityPrice security, SubscriptionDataConfig configuration)
        {
            var symbol = baseData.Symbol;

            FuturesChain chain;
            var canonical = symbol.Canonical;
            if (!futuresChains.TryGetValue(canonical, out chain))
            {
                // We don't create a chain for internal feeds, this data might belong to a continuous mapping future
                if (configuration.IsInternalFeed)
                {
                    return false;
                }

                chain = new FuturesChain(canonical, algorithmTime);
                futuresChains[canonical] = chain;
            }

            var universeData = baseData as BaseDataCollection;
            if (universeData != null)
            {
                foreach (var contractSymbol in universeData.FilteredContracts ?? Enumerable.Empty<Symbol>())
                {
                    chain.FilteredContracts.Add(contractSymbol);
                }
                return false;
            }

            if (!chain.Contracts.TryGetValue(baseData.Symbol, out var contract))
            {
                // We don't create a contract for internal feeds, this data might belong to a continuous mapping future
                if (configuration.IsInternalFeed)
                {
                    return false;
                }

                contract = new FuturesContract(baseData.Symbol);
                chain.Contracts[baseData.Symbol] = contract;
            }

            contract.Update(baseData);

            chain.AddData(baseData);
            return true;
        }
    }
}