/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using QuantConnect.Data;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// Provides a collection for holding subscriptions.
///
public class SubscriptionCollection : IEnumerable
{
private readonly ConcurrentDictionary _subscriptions;
private bool _sortingSubscriptionRequired;
private bool _frozen;
private readonly Ref _fillForwardResolution;
// some asset types (options, futures, crypto) have multiple subscriptions for different tick types,
// we keep a sorted list of subscriptions so we can return them in a deterministic order
private List _subscriptionsByTickType;
///
/// Event fired when the fill forward resolution changes
///
public event EventHandler FillForwardResolutionChanged;
///
/// Initializes a new instance of the class
///
public SubscriptionCollection()
{
_subscriptions = new ConcurrentDictionary();
_subscriptionsByTickType = new List();
var ffres = Time.OneMinute;
_fillForwardResolution = Ref.Create(() => ffres, res => ffres = res);
}
///
/// Checks the collection for the specified subscription configuration
///
/// The subscription configuration to check for
/// True if a subscription with the specified configuration is found in this collection, false otherwise
public bool Contains(SubscriptionDataConfig configuration)
{
return _subscriptions.ContainsKey(configuration);
}
///
/// Attempts to add the specified subscription to the collection. If another subscription
/// exists with the same configuration then it won't be added.
///
/// The subscription to add
/// True if the subscription is successfully added, false otherwise
public bool TryAdd(Subscription subscription)
{
if (_subscriptions.TryAdd(subscription.Configuration, subscription))
{
UpdateFillForwardResolution(FillForwardResolutionOperation.AfterAdd, subscription.Configuration);
_sortingSubscriptionRequired = true;
return true;
}
return false;
}
///
/// Attempts to retrieve the subscription with the specified configuration
///
/// The subscription's configuration
/// The subscription matching the configuration, null if not found
/// True if the subscription is successfully retrieved, false otherwise
public bool TryGetValue(SubscriptionDataConfig configuration, out Subscription subscription)
{
return _subscriptions.TryGetValue(configuration, out subscription);
}
///
/// Attempts to remove the subscription with the specified configuraton from the collection.
///
/// The configuration of the subscription to remove
/// The removed subscription, null if not found.
/// True if the subscription is successfully removed, false otherwise
public bool TryRemove(SubscriptionDataConfig configuration, out Subscription subscription)
{
if (_subscriptions.TryRemove(configuration, out subscription))
{
// for user friendlyness only look at removals triggerd by the user not those that are due to a data feed ending because of no more data,
// let's try to respect the users original FF enumerator request
if (!subscription.EndOfStream)
{
UpdateFillForwardResolution(FillForwardResolutionOperation.AfterRemove, configuration);
}
_sortingSubscriptionRequired = true;
return true;
}
return false;
}
///
/// Returns an enumerator that iterates through the collection.
///
///
/// An enumerator that can be used to iterate through the collection.
///
public IEnumerator GetEnumerator()
{
SortSubscriptions();
return _subscriptionsByTickType.GetEnumerator();
}
///
/// Returns an enumerator that iterates through a collection.
///
///
/// An object that can be used to iterate through the collection.
///
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
///
/// Gets and updates the fill forward resolution by checking specified subscription configurations and
/// selecting the smallest resoluton not equal to tick
///
public Ref UpdateAndGetFillForwardResolution(SubscriptionDataConfig configuration = null)
{
if (configuration != null)
{
UpdateFillForwardResolution(FillForwardResolutionOperation.BeforeAdd, configuration);
}
return _fillForwardResolution;
}
///
/// Will disable or enable fill forward resolution updates
///
public void FreezeFillForwardResolution(bool freeze)
{
_frozen = freeze;
}
///
/// Helper method to validate a configuration to be included in the fill forward calculation
///
private static bool ValidateFillForwardResolution(SubscriptionDataConfig configuration)
{
return !configuration.IsInternalFeed && configuration.Resolution != Resolution.Tick;
}
///
/// Gets and updates the fill forward resolution by checking specified subscription configurations and
/// selecting the smallest resoluton not equal to tick
///
private void UpdateFillForwardResolution(FillForwardResolutionOperation operation, SubscriptionDataConfig configuration)
{
if(_frozen)
{
return;
}
// Due to performance implications let's be jealous in updating the _fillForwardResolution
if (ValidateFillForwardResolution(configuration) &&
(
((FillForwardResolutionOperation.BeforeAdd == operation || FillForwardResolutionOperation.AfterAdd == operation)
&& configuration.Increment != _fillForwardResolution.Value) // check if the new Increment is different
||
(operation == FillForwardResolutionOperation.AfterRemove // We are removing
&& configuration.Increment == _fillForwardResolution.Value // True: We are removing the resolution we were using
// False: there is at least another one equal, no need to update, but we only look at those valid configuration which are the ones which set the FF resolution
&& _subscriptions.Keys.All(x => !ValidateFillForwardResolution(x) || x.Resolution != configuration.Resolution)))
)
{
var configurations = (operation == FillForwardResolutionOperation.BeforeAdd)
? _subscriptions.Keys.Concat(new[] { configuration }) : _subscriptions.Keys;
var eventArgs = new FillForwardResolutionChangedEvent { Old = _fillForwardResolution.Value };
_fillForwardResolution.Value = configurations.Where(ValidateFillForwardResolution)
.Select(x => x.Resolution)
.Distinct()
.DefaultIfEmpty(Resolution.Minute)
.Min().ToTimeSpan();
if (_fillForwardResolution.Value != eventArgs.Old)
{
eventArgs.New = _fillForwardResolution.Value;
// notify consumers if any
FillForwardResolutionChanged?.Invoke(this, eventArgs);
}
}
}
///
/// Sorts subscriptions so that equity subscriptions are enumerated before option
/// securities to ensure the underlying data is available when we process the options data
///
private void SortSubscriptions()
{
if (_sortingSubscriptionRequired)
{
_sortingSubscriptionRequired = false;
// it's important that we enumerate underlying securities before derivatives to this end,
// we order by security type so that equity subscriptions are enumerated before option
// securities to ensure the underlying data is available when we process the options data
_subscriptionsByTickType = _subscriptions
.Select(x => x.Value)
.OrderBy(x => x.Configuration.SecurityType)
.ThenBy(x => x.Configuration.TickType)
.ThenBy(x => x.Configuration.Symbol)
.ToList();
}
}
private enum FillForwardResolutionOperation
{
AfterRemove,
BeforeAdd,
AfterAdd
}
}
}