/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// Represents the data required for a data feed to process a single subscription
///
public class Subscription : IEnumerator
{
private bool _removedFromUniverse;
private readonly IEnumerator _enumerator;
///
/// The subcription requests associated with this subscription
///
internal List SubscriptionRequests { get; private set; }
///
/// Event fired when a new data point is available
///
public event EventHandler NewDataAvailable;
///
/// Gets the universe for this subscription
///
public IEnumerable Universes => SubscriptionRequests
.Where(x => x.Universe != null)
.Select(x => x.Universe);
///
/// Gets the security this subscription points to
///
public ISecurityPrice Security { get; init; }
///
/// Gets the configuration for this subscritions
///
public SubscriptionDataConfig Configuration { get; init; }
///
/// Gets the exchange time zone associated with this subscription
///
public DateTimeZone TimeZone { get; }
///
/// Gets the offset provider for time zone conversions to and from the data's local time
///
public TimeZoneOffsetProvider OffsetProvider { get; init; }
///
/// Gets the most current value from the subscription source
///
public decimal RealtimePrice { get; set; }
///
/// Gets true if this subscription is finished, false otherwise
///
public bool EndOfStream { get; private set; }
///
/// Gets true if this subscription is used in universe selection
///
public bool IsUniverseSelectionSubscription { get; }
///
/// Gets the start time of this subscription in UTC
///
public DateTime UtcStartTime { get; }
///
/// Gets the end time of this subscription in UTC
///
public DateTime UtcEndTime { get; }
///
/// Gets whether or not this subscription has been removed from its parent universe
///
public IReadOnlyRef RemovedFromUniverse { get; }
///
/// Initializes a new instance of the class with a universe
///
/// Specified for universe subscriptions
/// The subscription's data source
/// The offset provider used to convert data local times to utc
public Subscription(
SubscriptionRequest subscriptionRequest,
IEnumerator enumerator,
TimeZoneOffsetProvider timeZoneOffsetProvider)
{
SubscriptionRequests = new List { subscriptionRequest };
_enumerator = enumerator;
Security = subscriptionRequest.Security;
IsUniverseSelectionSubscription = subscriptionRequest.IsUniverseSubscription;
Configuration = subscriptionRequest.Configuration;
OffsetProvider = timeZoneOffsetProvider;
TimeZone = subscriptionRequest.Security.Exchange.TimeZone;
UtcStartTime = subscriptionRequest.StartTimeUtc;
UtcEndTime = subscriptionRequest.EndTimeUtc;
RemovedFromUniverse = Ref.CreateReadOnly(() => _removedFromUniverse);
}
///
/// Adds a for this subscription
///
/// The to add
public bool AddSubscriptionRequest(SubscriptionRequest subscriptionRequest)
{
if (IsUniverseSelectionSubscription
|| subscriptionRequest.IsUniverseSubscription)
{
throw new Exception("Subscription.AddSubscriptionRequest(): Universe selection" +
" subscriptions should not have more than 1 SubscriptionRequest");
}
// this shouldn't happen but just in case..
if (subscriptionRequest.Configuration != Configuration)
{
throw new Exception("Subscription.AddSubscriptionRequest(): Requesting to add" +
"a different SubscriptionDataConfig");
}
// Only allow one subscription request per universe
if (!Universes.Contains(subscriptionRequest.Universe))
{
SubscriptionRequests.Add(subscriptionRequest);
// TODO this might update the 'UtcStartTime' and 'UtcEndTime' of this subscription
return true;
}
return false;
}
///
/// Removes one or all from this subscription
///
/// Universe requesting to remove .
/// Default value, null, will remove all universes
/// True, if the subscription is empty and ready to be removed
public bool RemoveSubscriptionRequest(Universe universe = null)
{
// TODO this might update the 'UtcStartTime' and 'UtcEndTime' of this subscription
IEnumerable removedUniverses;
if (universe == null)
{
var subscriptionRequests = SubscriptionRequests;
SubscriptionRequests = new List();
removedUniverses = subscriptionRequests.Where(x => x.Universe != null)
.Select(x => x.Universe);
}
else
{
SubscriptionRequests.RemoveAll(x => x.Universe == universe);
removedUniverses = new[] {universe};
}
var emptySubscription = !SubscriptionRequests.Any();
if (emptySubscription)
{
// if the security is no longer a member of the universe, then mark the subscription properly
// universe may be null for internal currency conversion feeds
// TODO : Put currency feeds in their own internal universe
if (!removedUniverses.Any(x => x.Securities.ContainsKey(Configuration.Symbol)))
{
MarkAsRemovedFromUniverse();
}
}
return emptySubscription;
}
///
/// Advances the enumerator to the next element of the collection.
///
///
/// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
///
/// The collection was modified after the enumerator was created. 2
public virtual bool MoveNext()
{
if (EndOfStream)
{
return false;
}
var moveNext = _enumerator.MoveNext();
EndOfStream = !moveNext;
Current = _enumerator.Current;
return moveNext;
}
///
/// Sets the enumerator to its initial position, which is before the first element in the collection.
///
/// The collection was modified after the enumerator was created. 2
public void Reset()
{
_enumerator.Reset();
}
///
/// Gets the element in the collection at the current position of the enumerator.
///
///
/// The element in the collection at the current position of the enumerator.
///
public SubscriptionData Current { get; private set; }
///
/// Gets the current element in the collection.
///
///
/// The current element in the collection.
///
/// 2
object IEnumerator.Current => Current;
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
/// 2
public void Dispose()
{
EndOfStream = true;
_enumerator.DisposeSafely();
}
///
/// Mark this subscription as having been removed from the universe.
/// Data for this time step will be discarded.
///
public void MarkAsRemovedFromUniverse()
{
_removedFromUniverse = true;
}
///
/// Serves as a hash function for a particular type.
///
///
/// A hash code for the current .
///
/// 2
public override int GetHashCode()
{
return Configuration.GetHashCode();
}
/// Determines whether the specified object is equal to the current object.
/// The object to compare with the current object.
///
/// if the specified object is equal to the current object; otherwise, .
public override bool Equals(object obj)
{
var subscription = obj as Subscription;
if (subscription == null)
{
return false;
}
return subscription.Configuration.Equals(Configuration);
}
/// Returns a string that represents the current object.
/// A string that represents the current object.
/// 2
public override string ToString()
{
return Configuration.ToString();
}
///
/// Event invocator for the event
///
public void OnNewDataAvailable()
{
NewDataAvailable?.Invoke(this, EventArgs.Empty);
}
}
}