/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using System.Threading;
using NodaTime;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Implementation of the <see cref="ISynchronizer"/> interface which provides the mechanism
/// to stream data to the algorithm
/// </summary>
public class Synchronizer : ISynchronizer, IDataFeedTimeProvider, IDisposable
{
    // Algorithm time zone; assigned in PostInitialize() because the algorithm's
    // time zone is only final once the algorithm has been initialized.
    private DateTimeZone _dateTimeZone;

    /// <summary>
    /// The algorithm instance
    /// </summary>
    protected IAlgorithm Algorithm { get; set; }

    /// <summary>
    /// The subscription manager
    /// </summary>
    protected IDataFeedSubscriptionManager SubscriptionManager { get; set; }

    /// <summary>
    /// The subscription synchronizer
    /// </summary>
    protected SubscriptionSynchronizer SubscriptionSynchronizer { get; set; }

    /// <summary>
    /// The time slice factory
    /// </summary>
    protected TimeSliceFactory TimeSliceFactory { get; set; }

    /// <summary>
    /// Continuous UTC time provider, only valid for live trading.
    /// This base implementation always returns <c>null</c>; derived classes may override it.
    /// </summary>
    public virtual ITimeProvider TimeProvider => null;

    /// <summary>
    /// Time provider which returns current UTC frontier time
    /// </summary>
    public ITimeProvider FrontierTimeProvider => SubscriptionSynchronizer;

    /// <summary>
    /// Initializes the instance of the Synchronizer class
    /// </summary>
    /// <param name="algorithm">The algorithm instance that will consume the streamed data</param>
    /// <param name="dataFeedSubscriptionManager">The subscription manager providing the data feed
    /// subscriptions and the universe selection instance</param>
    public virtual void Initialize(
        IAlgorithm algorithm,
        IDataFeedSubscriptionManager dataFeedSubscriptionManager)
    {
        SubscriptionManager = dataFeedSubscriptionManager;
        Algorithm = algorithm;
        SubscriptionSynchronizer = new SubscriptionSynchronizer(
            SubscriptionManager.UniverseSelection);
    }

    /// <summary>
    /// Returns an enumerable which provides the data to stream to the algorithm
    /// </summary>
    /// <param name="cancellationToken">Token used to stop streaming; it is checked before each
    /// pull from the underlying synchronizer and again after each pull</param>
    /// <returns>A lazily evaluated sequence of time slices. The sequence ends when the underlying
    /// enumerator ends, yields <c>null</c> or throws (the error is reported through
    /// <c>Algorithm.SetRuntimeError</c>), when cancellation is requested, or when time stops advancing</returns>
    public virtual IEnumerable<TimeSlice> StreamData(CancellationToken cancellationToken)
    {
        PostInitialize();

        // GetTimeProvider() will call GetInitialFrontierTime() which
        // will consume added subscriptions so we need to do this after initialization
        SubscriptionSynchronizer.SetTimeProvider(GetTimeProvider());

        var previousEmitTime = DateTime.MaxValue;

        var enumerator = SubscriptionSynchronizer
            .Sync(SubscriptionManager.DataFeedSubscriptions, cancellationToken)
            .GetEnumerator();

        var previousWasTimePulse = false;
        // this is a just in case flag to stop looping if time does not advance
        var retried = false;

        // The try/finally guarantees the underlying enumerator is released even when the consumer
        // stops enumerating early (disposing this iterator at a 'yield return') or when an
        // exception escapes the loop body; code placed after the loop would be skipped in both cases.
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                TimeSlice timeSlice;
                try
                {
                    if (!enumerator.MoveNext())
                    {
                        // the enumerator ended
                        break;
                    }

                    timeSlice = enumerator.Current;
                }
                catch (Exception err)
                {
                    // notify the algorithm about the error, so it can be reported to the user
                    Algorithm.SetRuntimeError(err, "Synchronizer");
                    break;
                }

                // check for cancellation
                if (timeSlice == null || cancellationToken.IsCancellationRequested) break;

                if (timeSlice.IsTimePulse && Algorithm.UtcTime == timeSlice.Time)
                {
                    previousWasTimePulse = timeSlice.IsTimePulse;
                    // skip time pulse when algorithms already at that time
                    continue;
                }

                // SubscriptionFrontierTimeProvider will return twice the same time if there are no more subscriptions or if Subscription.Current is null
                if (timeSlice.Time != previousEmitTime || previousWasTimePulse || timeSlice.UniverseData.Count != 0)
                {
                    previousEmitTime = timeSlice.Time;
                    previousWasTimePulse = timeSlice.IsTimePulse;
                    // if we emitted, clear retry flag
                    retried = false;
                    yield return timeSlice;
                }
                else
                {
                    // if the slice has data lets retry just once more... this could happen
                    // with subscriptions added after initialize using algorithm.AddSecurity() API,
                    // where the subscription start time is the current time loop (but should just happen once)
                    if (!timeSlice.Slice.HasData || retried)
                    {
                        // there's no more data to pull off, we're done (frontier is max value and no security changes)
                        break;
                    }

                    retried = true;
                }
            }
        }
        finally
        {
            enumerator.DisposeSafely();
        }

        // only reached when the loop above finished on its own
        Log.Trace("Synchronizer.GetEnumerator(): Exited thread.");
    }

    /// <summary>
    /// Performs additional initialization steps after algorithm initialization
    /// </summary>
    /// <remarks>
    /// Subscribes to the synchronizer's <c>SubscriptionFinished</c> event so finished subscriptions
    /// are removed from the subscription manager, then creates the <see cref="TimeSliceFactory"/>
    /// from the algorithm's time zone and hands it to the synchronizer.
    /// </remarks>
    protected virtual void PostInitialize()
    {
        SubscriptionSynchronizer.SubscriptionFinished += (sender, subscription) =>
        {
            SubscriptionManager.RemoveSubscription(subscription.Configuration);
            if (Log.DebuggingEnabled)
            {
                Log.Debug("Synchronizer.SubscriptionFinished(): Finished subscription:" +
                    $"{subscription.Configuration} at {FrontierTimeProvider.GetUtcNow()} UTC");
            }
        };

        // this is set after the algorithm initializes
        _dateTimeZone = Algorithm.TimeZone;
        TimeSliceFactory = new TimeSliceFactory(_dateTimeZone);
        SubscriptionSynchronizer.SetTimeSliceFactory(TimeSliceFactory);
    }

    /// <summary>
    /// Gets the <see cref="ITimeProvider"/> to use. This implementation returns a
    /// <see cref="SubscriptionFrontierTimeProvider"/> seeded with the initial frontier time;
    /// derived classes may override it to supply a different provider.
    /// </summary>
    /// <returns>The <see cref="ITimeProvider"/> to use</returns>
    protected virtual ITimeProvider GetTimeProvider()
    {
        return new SubscriptionFrontierTimeProvider(GetInitialFrontierTime(), SubscriptionManager);
    }

    /// <summary>
    /// Computes the initial UTC frontier time: the earliest <c>EmitTimeUtc</c> among the
    /// subscriptions that currently have a data point, or the algorithm's current time
    /// converted to UTC when no subscription has one.
    /// </summary>
    private DateTime GetInitialFrontierTime()
    {
        var frontier = DateTime.MaxValue;
        foreach (var subscription in SubscriptionManager.DataFeedSubscriptions)
        {
            var current = subscription.Current;
            if (current == null)
            {
                // nothing available on this subscription yet, it can't influence the frontier
                continue;
            }

            // the initial frontier is the smallest UTC emit time across all subscriptions
            if (current.EmitTimeUtc < frontier)
            {
                frontier = current.EmitTimeUtc;
            }
        }

        if (frontier == DateTime.MaxValue)
        {
            // here we use Time and not StartDate because Time will be before the start during warmup period.
            frontier = Algorithm.Time.ConvertToUtc(_dateTimeZone);
        }

        return frontier;
    }

    /// <summary>
    /// Free resources. This base implementation does nothing; it is virtual so derived
    /// classes can release what they own.
    /// </summary>
    public virtual void Dispose()
    {
    }
}
}