/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using QuantConnect.Data; using QuantConnect.Util; namespace QuantConnect.Lean.Engine.DataFeeds { /// /// Provides a collection for holding subscriptions. /// public class SubscriptionCollection : IEnumerable { private readonly ConcurrentDictionary _subscriptions; private bool _sortingSubscriptionRequired; private bool _frozen; private readonly Ref _fillForwardResolution; // some asset types (options, futures, crypto) have multiple subscriptions for different tick types, // we keep a sorted list of subscriptions so we can return them in a deterministic order private List _subscriptionsByTickType; /// /// Event fired when the fill forward resolution changes /// public event EventHandler FillForwardResolutionChanged; /// /// Initializes a new instance of the class /// public SubscriptionCollection() { _subscriptions = new ConcurrentDictionary(); _subscriptionsByTickType = new List(); var ffres = Time.OneMinute; _fillForwardResolution = Ref.Create(() => ffres, res => ffres = res); } /// /// Checks the collection for the specified subscription configuration /// /// The subscription configuration to check for /// True if a subscription with the specified configuration is found in this collection, false otherwise public bool Contains(SubscriptionDataConfig configuration) { return _subscriptions.ContainsKey(configuration); } /// /// Attempts to add the specified subscription to the collection. If another subscription /// exists with the same configuration then it won't be added. /// /// The subscription to add /// True if the subscription is successfully added, false otherwise public bool TryAdd(Subscription subscription) { if (_subscriptions.TryAdd(subscription.Configuration, subscription)) { UpdateFillForwardResolution(FillForwardResolutionOperation.AfterAdd, subscription.Configuration); _sortingSubscriptionRequired = true; return true; } return false; } /// /// Attempts to retrieve the subscription with the specified configuration /// /// The subscription's configuration /// The subscription matching the configuration, null if not found /// True if the subscription is successfully retrieved, false otherwise public bool TryGetValue(SubscriptionDataConfig configuration, out Subscription subscription) { return _subscriptions.TryGetValue(configuration, out subscription); } /// /// Attempts to remove the subscription with the specified configuraton from the collection. /// /// The configuration of the subscription to remove /// The removed subscription, null if not found. /// True if the subscription is successfully removed, false otherwise public bool TryRemove(SubscriptionDataConfig configuration, out Subscription subscription) { if (_subscriptions.TryRemove(configuration, out subscription)) { // for user friendlyness only look at removals triggerd by the user not those that are due to a data feed ending because of no more data, // let's try to respect the users original FF enumerator request if (!subscription.EndOfStream) { UpdateFillForwardResolution(FillForwardResolutionOperation.AfterRemove, configuration); } _sortingSubscriptionRequired = true; return true; } return false; } /// /// Returns an enumerator that iterates through the collection. /// /// /// An enumerator that can be used to iterate through the collection. /// public IEnumerator GetEnumerator() { SortSubscriptions(); return _subscriptionsByTickType.GetEnumerator(); } /// /// Returns an enumerator that iterates through a collection. /// /// /// An object that can be used to iterate through the collection. /// IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } /// /// Gets and updates the fill forward resolution by checking specified subscription configurations and /// selecting the smallest resoluton not equal to tick /// public Ref UpdateAndGetFillForwardResolution(SubscriptionDataConfig configuration = null) { if (configuration != null) { UpdateFillForwardResolution(FillForwardResolutionOperation.BeforeAdd, configuration); } return _fillForwardResolution; } /// /// Will disable or enable fill forward resolution updates /// public void FreezeFillForwardResolution(bool freeze) { _frozen = freeze; } /// /// Helper method to validate a configuration to be included in the fill forward calculation /// private static bool ValidateFillForwardResolution(SubscriptionDataConfig configuration) { return !configuration.IsInternalFeed && configuration.Resolution != Resolution.Tick; } /// /// Gets and updates the fill forward resolution by checking specified subscription configurations and /// selecting the smallest resoluton not equal to tick /// private void UpdateFillForwardResolution(FillForwardResolutionOperation operation, SubscriptionDataConfig configuration) { if(_frozen) { return; } // Due to performance implications let's be jealous in updating the _fillForwardResolution if (ValidateFillForwardResolution(configuration) && ( ((FillForwardResolutionOperation.BeforeAdd == operation || FillForwardResolutionOperation.AfterAdd == operation) && configuration.Increment != _fillForwardResolution.Value) // check if the new Increment is different || (operation == FillForwardResolutionOperation.AfterRemove // We are removing && configuration.Increment == _fillForwardResolution.Value // True: We are removing the resolution we were using // False: there is at least another one equal, no need to update, but we only look at those valid configuration which are the ones which set the FF resolution && _subscriptions.Keys.All(x => !ValidateFillForwardResolution(x) || x.Resolution != configuration.Resolution))) ) { var configurations = (operation == FillForwardResolutionOperation.BeforeAdd) ? _subscriptions.Keys.Concat(new[] { configuration }) : _subscriptions.Keys; var eventArgs = new FillForwardResolutionChangedEvent { Old = _fillForwardResolution.Value }; _fillForwardResolution.Value = configurations.Where(ValidateFillForwardResolution) .Select(x => x.Resolution) .Distinct() .DefaultIfEmpty(Resolution.Minute) .Min().ToTimeSpan(); if (_fillForwardResolution.Value != eventArgs.Old) { eventArgs.New = _fillForwardResolution.Value; // notify consumers if any FillForwardResolutionChanged?.Invoke(this, eventArgs); } } } /// /// Sorts subscriptions so that equity subscriptions are enumerated before option /// securities to ensure the underlying data is available when we process the options data /// private void SortSubscriptions() { if (_sortingSubscriptionRequired) { _sortingSubscriptionRequired = false; // it's important that we enumerate underlying securities before derivatives to this end, // we order by security type so that equity subscriptions are enumerated before option // securities to ensure the underlying data is available when we process the options data _subscriptionsByTickType = _subscriptions .Select(x => x.Value) .OrderBy(x => x.Configuration.SecurityType) .ThenBy(x => x.Configuration.TickType) .ThenBy(x => x.Configuration.Symbol) .ToList(); } } private enum FillForwardResolutionOperation { AfterRemove, BeforeAdd, AfterAdd } } }