/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using QuantConnect.Data.Market;
using QuantConnect.Data.UniverseSelection;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// Provides the ability to synchronize subscriptions into time slices
///
public class SubscriptionSynchronizer : ISubscriptionSynchronizer, ITimeProvider
{
private readonly UniverseSelection _universeSelection;
private TimeSliceFactory _timeSliceFactory;
private ITimeProvider _timeProvider;
private ManualTimeProvider _frontierTimeProvider;
///
/// Event fired when a is finished
///
public event EventHandler SubscriptionFinished;
///
/// Initializes a new instance of the class
///
/// The universe selection instance used to handle universe
/// selection subscription output
/// A time slice for the specified frontier time
public SubscriptionSynchronizer(UniverseSelection universeSelection)
{
_universeSelection = universeSelection;
}
///
/// Sets the time provider. If already set will throw.
///
/// The time provider, used to obtain the current frontier UTC value
public void SetTimeProvider(ITimeProvider timeProvider)
{
if (_timeProvider != null)
{
throw new Exception("SubscriptionSynchronizer.SetTimeProvider(): can only be called once");
}
_timeProvider = timeProvider;
_frontierTimeProvider = new ManualTimeProvider(_timeProvider.GetUtcNow());
}
///
/// Sets the instance to use
///
/// Used to create the new
public void SetTimeSliceFactory(TimeSliceFactory timeSliceFactory)
{
if (_timeSliceFactory != null)
{
throw new Exception("SubscriptionSynchronizer.SetTimeSliceFactory(): can only be called once");
}
_timeSliceFactory = timeSliceFactory;
}
///
/// Syncs the specified subscriptions. The frontier time used for synchronization is
/// managed internally and dependent upon previous synchronization operations.
///
/// The subscriptions to sync
/// The cancellation token to stop enumeration
public IEnumerable Sync(IEnumerable subscriptions,
CancellationToken cancellationToken)
{
var delayedSubscriptionFinished = new Queue();
while (!cancellationToken.IsCancellationRequested)
{
var changes = SecurityChanges.None;
var data = new List(1);
// NOTE: Tight coupling in UniverseSelection.ApplyUniverseSelection
Dictionary universeData = null; // lazy construction for performance
var universeDataForTimeSliceCreate = new Dictionary();
var frontierUtc = _timeProvider.GetUtcNow();
_frontierTimeProvider.SetCurrentTimeUtc(frontierUtc);
SecurityChanges newChanges;
do
{
newChanges = SecurityChanges.None;
foreach (var subscription in subscriptions)
{
if (subscription.EndOfStream)
{
OnSubscriptionFinished(subscription);
continue;
}
// prime if needed
if (subscription.Current == null)
{
if (!subscription.MoveNext())
{
OnSubscriptionFinished(subscription);
continue;
}
}
DataFeedPacket packet = null;
while (subscription.Current != null && subscription.Current.EmitTimeUtc <= frontierUtc)
{
if (packet == null)
{
// for performance, lets be selfish about creating a new instance
packet = new DataFeedPacket(
subscription.Security,
subscription.Configuration,
subscription.RemovedFromUniverse
);
}
// If our subscription is a universe, and we get a delisting event emitted for it, then
// the universe itself should be unselected and removed, because the Symbol that the
// universe is based on has been delisted. Doing the disposal here allows us to
// process the delisting at this point in time before emitting out to the algorithm.
// This is very useful for universes that can be delisted, such as ETF constituent
// universes (e.g. for ETF constituent universes, since the ETF itself is used to create
// the universe Symbol (and set as its underlying), once the ETF is delisted, the
// universe should cease to exist, since there are no more constituents of that ETF).
if (subscription.Current.Data.DataType == MarketDataType.Auxiliary && subscription.Current.Data is Delisting delisting)
{
if(subscription.IsUniverseSelectionSubscription)
{
subscription.Universes.Single().Dispose();
}
else if(delisting.Type == DelistingType.Delisted)
{
changes += _universeSelection.HandleDelisting(subscription.Current.Data, subscription.Configuration.IsInternalFeed);
}
}
packet.Add(subscription.Current.Data);
if (!subscription.MoveNext())
{
delayedSubscriptionFinished.Enqueue(subscription);
break;
}
}
if (packet?.Count > 0)
{
// we have new universe data to select based on, store the subscription data until the end
if (!subscription.IsUniverseSelectionSubscription)
{
data.Add(packet);
}
else
{
// assume that if the first item is a base data collection then the enumerator handled the aggregation,
// otherwise, load all the the data into a new collection instance
var packetBaseDataCollection = packet.Data[0] as BaseDataCollection;
var packetData = packetBaseDataCollection == null
? packet.Data
: packetBaseDataCollection.Data;
BaseDataCollection collection;
if (universeData != null
&& universeData.TryGetValue(subscription.Universes.Single(), out collection))
{
collection.AddRange(packetData);
}
else
{
collection = new BaseDataCollection(frontierUtc, frontierUtc, subscription.Configuration.Symbol, packetData, packetBaseDataCollection?.Underlying, packetBaseDataCollection?.FilteredContracts);
if (universeData == null)
{
universeData = new Dictionary();
}
universeData[subscription.Universes.Single()] = collection;
}
}
}
if (subscription.IsUniverseSelectionSubscription
&& subscription.Universes.Single().DisposeRequested)
{
var universe = subscription.Universes.Single();
// check if a universe selection isn't already scheduled for this disposed universe
if (universeData == null || !universeData.ContainsKey(universe))
{
if (universeData == null)
{
universeData = new Dictionary();
}
// we force trigger one last universe selection for this disposed universe, so it deselects all subscriptions it added
universeData[universe] = new BaseDataCollection(frontierUtc, subscription.Configuration.Symbol);
}
// we need to do this after all usages of subscription.Universes
OnSubscriptionFinished(subscription);
}
}
if (universeData != null && universeData.Count > 0)
{
// if we are going to perform universe selection we emit an empty
// time pulse to align algorithm time with current frontier
yield return _timeSliceFactory.CreateTimePulse(frontierUtc);
// trigger the smalled resolution first, so that FF res get's set once from the start correctly
// while at it, let's make it determininstic and sort by universe sid later
foreach (var kvp in universeData.OrderBy(x => x.Key.Configuration.Resolution).ThenBy(x => x.Key.Symbol.ID))
{
var universe = kvp.Key;
var baseDataCollection = kvp.Value;
universeDataForTimeSliceCreate[universe] = baseDataCollection;
newChanges += _universeSelection.ApplyUniverseSelection(universe, frontierUtc, baseDataCollection);
}
universeData.Clear();
}
changes += newChanges;
}
while (newChanges != SecurityChanges.None
|| _universeSelection.AddPendingInternalDataFeeds(frontierUtc));
var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);
while (delayedSubscriptionFinished.Count > 0)
{
// these subscriptions added valid data to the packet
// we need to trigger OnSubscriptionFinished after we create the TimeSlice
// else it will drop the data
var subscription = delayedSubscriptionFinished.Dequeue();
OnSubscriptionFinished(subscription);
}
yield return timeSlice;
}
}
///
/// Event invocator for the event
///
protected virtual void OnSubscriptionFinished(Subscription subscription)
{
SubscriptionFinished?.Invoke(this, subscription);
}
///
/// Returns the current UTC frontier time
///
public DateTime GetUtcNow()
{
return _frontierTimeProvider.GetUtcNow();
}
}
}