/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Util;

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Represents the data required for a data feed to process a single subscription
    /// </summary>
    /// <remarks>
    /// The subscription wraps an underlying enumerator of <see cref="SubscriptionData"/> and
    /// tracks the <see cref="SubscriptionRequest"/> instances that keep it alive.
    /// Equality and hashing are delegated to <see cref="Configuration"/>.
    /// </remarks>
    public class Subscription : IEnumerator<SubscriptionData>
    {
        // backing flag exposed read-only through RemovedFromUniverse
        private bool _removedFromUniverse;
        private readonly IEnumerator<SubscriptionData> _enumerator;

        /// <summary>
        /// The subscription requests associated with this subscription
        /// </summary>
        internal List<SubscriptionRequest> SubscriptionRequests { get; private set; }

        /// <summary>
        /// Event fired when a new data point is available
        /// </summary>
        public event EventHandler NewDataAvailable;

        /// <summary>
        /// Gets the universe for this subscription
        /// </summary>
        /// <remarks>
        /// Requests without a universe are skipped. The sequence is evaluated lazily
        /// against <see cref="SubscriptionRequests"/> on every enumeration.
        /// </remarks>
        public IEnumerable<Universe> Universes => SubscriptionRequests
            .Where(x => x.Universe != null)
            .Select(x => x.Universe);

        /// <summary>
        /// Gets the security this subscription points to
        /// </summary>
        public ISecurityPrice Security { get; init; }

        /// <summary>
        /// Gets the configuration for this subscription
        /// </summary>
        public SubscriptionDataConfig Configuration { get; init; }

        /// <summary>
        /// Gets the exchange time zone associated with this subscription
        /// </summary>
        public DateTimeZone TimeZone { get; }

        /// <summary>
        /// Gets the offset provider for time zone conversions to and from the data's local time
        /// </summary>
        public TimeZoneOffsetProvider OffsetProvider { get; init; }

        /// <summary>
        /// Gets or sets the most current value from the subscription source
        /// </summary>
        public decimal RealtimePrice { get; set; }

        /// <summary>
        /// Gets true if this subscription is finished, false otherwise
        /// </summary>
        public bool EndOfStream { get; private set; }

        /// <summary>
        /// Gets true if this subscription is used in universe selection
        /// </summary>
        public bool IsUniverseSelectionSubscription { get; }

        /// <summary>
        /// Gets the start time of this subscription in UTC
        /// </summary>
        public DateTime UtcStartTime { get; }

        /// <summary>
        /// Gets the end time of this subscription in UTC
        /// </summary>
        public DateTime UtcEndTime { get; }

        /// <summary>
        /// Gets whether or not this subscription has been removed from its parent universe
        /// </summary>
        public IReadOnlyRef<bool> RemovedFromUniverse { get; }

        /// <summary>
        /// Initializes a new instance of the <see cref="Subscription"/> class with a universe
        /// </summary>
        /// <param name="subscriptionRequest">Specified for universe subscriptions</param>
        /// <param name="enumerator">The subscription's data source</param>
        /// <param name="timeZoneOffsetProvider">The offset provider used to convert data local times to utc</param>
        public Subscription(
            SubscriptionRequest subscriptionRequest,
            IEnumerator<SubscriptionData> enumerator,
            TimeZoneOffsetProvider timeZoneOffsetProvider)
        {
            SubscriptionRequests = new List<SubscriptionRequest> { subscriptionRequest };
            _enumerator = enumerator;
            Security = subscriptionRequest.Security;
            IsUniverseSelectionSubscription = subscriptionRequest.IsUniverseSubscription;
            Configuration = subscriptionRequest.Configuration;
            OffsetProvider = timeZoneOffsetProvider;
            TimeZone = subscriptionRequest.Security.Exchange.TimeZone;
            UtcStartTime = subscriptionRequest.StartTimeUtc;
            UtcEndTime = subscriptionRequest.EndTimeUtc;

            // the delegate reads the field on demand, so later calls to
            // MarkAsRemovedFromUniverse() are visible through this reference
            RemovedFromUniverse = Ref.CreateReadOnly(() => _removedFromUniverse);
        }

        /// <summary>
        /// Adds a <see cref="SubscriptionRequest"/> for this subscription
        /// </summary>
        /// <param name="subscriptionRequest">The <see cref="SubscriptionRequest"/> to add</param>
        /// <returns>
        /// True if the request was added; false if a request for the same universe
        /// is already registered
        /// </returns>
        /// <exception cref="Exception">
        /// Either side is a universe selection subscription, or the request carries a
        /// different <see cref="SubscriptionDataConfig"/> than this subscription
        /// </exception>
        public bool AddSubscriptionRequest(SubscriptionRequest subscriptionRequest)
        {
            if (IsUniverseSelectionSubscription
                || subscriptionRequest.IsUniverseSubscription)
            {
                throw new Exception("Subscription.AddSubscriptionRequest(): Universe selection" +
                    " subscriptions should not have more than 1 SubscriptionRequest");
            }

            // this shouldn't happen but just in case..
            if (subscriptionRequest.Configuration != Configuration)
            {
                throw new Exception("Subscription.AddSubscriptionRequest(): Requesting to add" +
                    " a different SubscriptionDataConfig");
            }

            // Only allow one subscription request per universe
            if (!Universes.Contains(subscriptionRequest.Universe))
            {
                SubscriptionRequests.Add(subscriptionRequest);
                // TODO this might update the 'UtcStartTime' and 'UtcEndTime' of this subscription
                return true;
            }

            return false;
        }

        /// <summary>
        /// Removes one or all <see cref="SubscriptionRequest"/> from this subscription
        /// </summary>
        /// <param name="universe">Universe requesting to remove <see cref="SubscriptionRequest"/>.
        /// Default value, null, will remove all universes</param>
        /// <returns>True, if the subscription is empty and ready to be removed</returns>
        public bool RemoveSubscriptionRequest(Universe universe = null)
        {
            // TODO this might update the 'UtcStartTime' and 'UtcEndTime' of this subscription
            IEnumerable<Universe> removedUniverses;
            if (universe == null)
            {
                // swap in a fresh list and keep the old one to inspect what was removed
                var subscriptionRequests = SubscriptionRequests;
                SubscriptionRequests = new List<SubscriptionRequest>();
                removedUniverses = subscriptionRequests.Where(x => x.Universe != null)
                    .Select(x => x.Universe);
            }
            else
            {
                SubscriptionRequests.RemoveAll(x => x.Universe == universe);
                removedUniverses = new[] { universe };
            }

            var emptySubscription = SubscriptionRequests.Count == 0;
            if (emptySubscription)
            {
                // if the security is no longer a member of the universe, then mark the subscription properly
                // universe may be null for internal currency conversion feeds
                // TODO : Put currency feeds in their own internal universe
                if (!removedUniverses.Any(x => x.Securities.ContainsKey(Configuration.Symbol)))
                {
                    MarkAsRemovedFromUniverse();
                }
            }

            return emptySubscription;
        }

        /// <summary>
        /// Advances the enumerator to the next element of the collection.
        /// </summary>
        /// <remarks>
        /// Once the underlying enumerator reports the end, <see cref="EndOfStream"/> is set and
        /// every later call returns false without touching the underlying enumerator.
        /// <see cref="Current"/> is refreshed from the underlying enumerator on every call that
        /// reaches it, including the one that returns false.
        /// </remarks>
        /// <returns>
        /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
        /// </returns>
        /// <exception cref="InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
        public virtual bool MoveNext()
        {
            if (EndOfStream)
            {
                return false;
            }

            var moveNext = _enumerator.MoveNext();
            EndOfStream = !moveNext;
            Current = _enumerator.Current;
            return moveNext;
        }

        /// <summary>
        /// Sets the enumerator to its initial position, which is before the first element in the collection.
        /// </summary>
        /// <remarks>
        /// Only the underlying enumerator is reset; <see cref="EndOfStream"/> and
        /// <see cref="Current"/> are left as they are.
        /// </remarks>
        /// <exception cref="InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
        public void Reset()
        {
            _enumerator.Reset();
        }

        /// <summary>
        /// Gets the element in the collection at the current position of the enumerator.
        /// </summary>
        /// <returns>
        /// The element in the collection at the current position of the enumerator.
        /// </returns>
        public SubscriptionData Current { get; private set; }

        /// <summary>
        /// Gets the current element in the collection.
        /// </summary>
        /// <returns>
        /// The current element in the collection.
        /// </returns>
        /// <filterpriority>2</filterpriority>
        object IEnumerator.Current => Current;

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        /// <remarks>
        /// Marks the stream as ended before disposing the underlying enumerator, so any
        /// later <see cref="MoveNext"/> call returns false.
        /// </remarks>
        /// <filterpriority>2</filterpriority>
        public void Dispose()
        {
            EndOfStream = true;
            _enumerator.DisposeSafely();
        }

        /// <summary>
        /// Mark this subscription as having been removed from the universe.
        /// Data for this time step will be discarded.
        /// </summary>
        public void MarkAsRemovedFromUniverse()
        {
            _removedFromUniverse = true;
        }

        /// <summary>
        /// Serves as a hash function for a particular type.
        /// </summary>
        /// <returns>
        /// A hash code for the current <see cref="Subscription"/>, taken from its <see cref="Configuration"/>.
        /// </returns>
        /// <filterpriority>2</filterpriority>
        public override int GetHashCode()
        {
            return Configuration.GetHashCode();
        }

        /// <summary>Determines whether the specified object is equal to the current object.</summary>
        /// <param name="obj">The object to compare with the current object. </param>
        /// <returns>
        /// <see langword="true" /> if the specified object is equal to the current object; otherwise, <see langword="false" />.</returns>
        public override bool Equals(object obj)
        {
            // a type pattern cannot be affected by user-defined equality operators
            if (obj is not Subscription subscription)
            {
                return false;
            }

            return subscription.Configuration.Equals(Configuration);
        }

        /// <summary>Returns a string that represents the current object.</summary>
        /// <returns>A string that represents the current object.</returns>
        /// <filterpriority>2</filterpriority>
        public override string ToString()
        {
            return Configuration.ToString();
        }

        /// <summary>
        /// Event invocator for the <see cref="NewDataAvailable"/> event
        /// </summary>
        public void OnNewDataAvailable()
        {
            NewDataAvailable?.Invoke(this, EventArgs.Empty);
        }
    }
}