/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using System.Threading;
using QuantConnect.Configuration;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Live implementation of <see cref="Synchronizer"/> which provides the mechanism to stream live data to the algorithm
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments (e.g. on the <c>StreamData</c> return type and the collections passed to
    /// <c>TimeSliceFactory.Create</c>) are not visible in this copy of the file; confirm them against the build.
    /// </remarks>
    public class LiveSynchronizer : Synchronizer
    {
        /// <summary>
        /// Consumer batching timeout in ms, read from the 'consumer-batching-timeout-ms' configuration key
        /// </summary>
        public static readonly int BatchingDelay = Config.GetInt("consumer-batching-timeout-ms");

        // time provider returned by GetTimeProvider(), assigned in Initialize()
        private ITimeProvider _timeProvider;
        // time provider handed to the subscription synchronizer, see Initialize() and PostInitialize()
        private LiveTimeProvider _frontierTimeProvider;
        // schedules the time pulse that wakes up StreamData() when no new data arrives
        private RealTimeScheduleEventService _realTimeScheduleEventService;
        // signaled by subscriptions with new data, by the time pulse and by Dispose()
        private readonly ManualResetEventSlim _newLiveDataEmitted = new ManualResetEventSlim(false);

        /// <summary>
        /// Continuous UTC time provider
        /// </summary>
        public override ITimeProvider TimeProvider => _timeProvider;

        /// <summary>
        /// Initializes the instance of the Synchronizer class
        /// </summary>
        /// <param name="algorithm">The algorithm instance, forwarded to the base initialization</param>
        /// <param name="dataFeedSubscriptionManager">The subscription manager whose added/removed subscriptions
        /// are hooked up to <see cref="OnSubscriptionNewDataAvailable"/></param>
        public override void Initialize(
            IAlgorithm algorithm,
            IDataFeedSubscriptionManager dataFeedSubscriptionManager)
        {
            base.Initialize(algorithm, dataFeedSubscriptionManager);

            // the time provider, is the real time provider
            _timeProvider = GetTimeProvider();
            _frontierTimeProvider = new LiveTimeProvider(realTime: TimeProvider);

            // the synchronizer will use our '_frontierTimeProvider' which initially during warmup will be using
            // the base time provider which is the subscription based time provider (like backtesting)
            // once warmup finishes it will start using the realtime provider
            SubscriptionSynchronizer.SetTimeProvider(_frontierTimeProvider);

            // attach event handlers to subscriptions
            dataFeedSubscriptionManager.SubscriptionAdded += (sender, subscription) =>
            {
                subscription.NewDataAvailable += OnSubscriptionNewDataAvailable;
            };
            dataFeedSubscriptionManager.SubscriptionRemoved += (sender, subscription) =>
            {
                subscription.NewDataAvailable -= OnSubscriptionNewDataAvailable;
            };

            _realTimeScheduleEventService = new RealTimeScheduleEventService(new RealTimeProvider());
            // this schedule event will be our time pulse
            _realTimeScheduleEventService.NewEvent += (sender, args) => _newLiveDataEmitted.Set();
        }

        /// <summary>
        /// Returns an enumerable which provides the data to stream to the algorithm
        /// </summary>
        /// <remarks>
        /// A time slice is emitted when it carries security changes, is a time pulse, has data, or when the
        /// frontier time reached the next emit time (one second after the last non time pulse emit).
        /// If the underlying enumerator throws, the error is reported through <c>Algorithm.SetRuntimeError</c>
        /// and a last empty time slice is emitted before the enumeration ends.
        /// The underlying enumerator is disposed when this enumeration ends, including when the consumer
        /// stops enumerating before the end.
        /// </remarks>
        /// <param name="cancellationToken">Token used to stop the enumeration; checked at the start of every loop
        /// and after each time slice is fetched</param>
        /// <returns>The time slices to stream to the algorithm</returns>
        public override IEnumerable StreamData(CancellationToken cancellationToken)
        {
            PostInitialize();

            var shouldSendExtraEmptyPacket = false;
            var nextEmit = DateTime.MinValue;
            var lastLoopStart = DateTime.UtcNow;

            var enumerator = SubscriptionSynchronizer
                .Sync(SubscriptionManager.DataFeedSubscriptions, cancellationToken)
                .GetEnumerator();

            // the 'finally' guarantees the enumerator is released even if the consumer stops enumerating early
            // or an unexpected exception escapes the loop; code placed after the loop would be skipped in both cases
            try
            {
                var previousWasTimePulse = false;
                while (!cancellationToken.IsCancellationRequested)
                {
                    var now = DateTime.UtcNow;
                    if (!previousWasTimePulse)
                    {
                        if (!_newLiveDataEmitted.IsSet
                            // we warmup as fast as we can even if no new data point is available
                            && !Algorithm.IsWarmingUp)
                        {
                            // if we just crossed into the next second let's loop again, we will flush any consolidator bar
                            // else we will wait to be notified by the subscriptions or our scheduled event service every second
                            if (lastLoopStart.Second == now.Second)
                            {
                                _realTimeScheduleEventService.ScheduleEvent(TimeSpan.FromMilliseconds(GetPulseDueTime(now)), now);
                                _newLiveDataEmitted.Wait();
                            }
                        }
                        _newLiveDataEmitted.Reset();
                    }

                    lastLoopStart = now;

                    TimeSlice timeSlice;
                    try
                    {
                        if (!enumerator.MoveNext())
                        {
                            // the enumerator ended
                            break;
                        }

                        timeSlice = enumerator.Current;
                    }
                    catch (Exception err)
                    {
                        // notify the algorithm about the error, so it can be reported to the user
                        Algorithm.SetRuntimeError(err, "LiveSynchronizer");
                        shouldSendExtraEmptyPacket = true;
                        break;
                    }

                    // check for cancellation
                    if (timeSlice == null || cancellationToken.IsCancellationRequested) break;

                    var frontierUtc = FrontierTimeProvider.GetUtcNow();
                    // emit on data or if we've elapsed a full second since last emit or there are security changes
                    if (timeSlice.SecurityChanges != SecurityChanges.None
                        || timeSlice.IsTimePulse
                        || timeSlice.Data.Count != 0
                        || frontierUtc >= nextEmit)
                    {
                        previousWasTimePulse = timeSlice.IsTimePulse;
                        yield return timeSlice;

                        // ignore if time pulse because we will emit a slice with the same time just after this one
                        if (!timeSlice.IsTimePulse)
                        {
                            // force emitting every second since the data feed is
                            // the heartbeat of the application
                            nextEmit = frontierUtc.RoundDown(Time.OneSecond).Add(Time.OneSecond);
                        }
                    }
                }

                if (shouldSendExtraEmptyPacket)
                {
                    // send last empty packet list before terminating,
                    // so the algorithm manager has a chance to detect the runtime error
                    // and exit showing the correct error instead of a timeout
                    nextEmit = FrontierTimeProvider.GetUtcNow().RoundDown(Time.OneSecond);
                    if (!cancellationToken.IsCancellationRequested)
                    {
                        var timeSlice = TimeSliceFactory.Create(
                            nextEmit,
                            new List(),
                            SecurityChanges.None,
                            new Dictionary());
                        yield return timeSlice;
                    }
                }
            }
            finally
            {
                enumerator.DisposeSafely();
                Log.Trace("LiveSynchronizer.GetEnumerator(): Exited thread.");
            }
        }

        /// <summary>
        /// Free resources
        /// </summary>
        public override void Dispose()
        {
            // signal first so a StreamData() loop blocked waiting for new data is released
            _newLiveDataEmitted.Set();
            // the event is a readonly field initialized inline, it can not be null here
            _newLiveDataEmitted.DisposeSafely();
            // the schedule service is only created in Initialize(), so it may still be null
            _realTimeScheduleEventService?.DisposeSafely();
        }

        /// <summary>
        /// Gets the <see cref="ITimeProvider"/> to use. This implementation returns the shared
        /// <see cref="RealTimeProvider"/> instance
        /// </summary>
        /// <returns>The <see cref="ITimeProvider"/> to use</returns>
        protected override ITimeProvider GetTimeProvider()
        {
            return RealTimeProvider.Instance;
        }

        /// <summary>
        /// Performs additional initialization steps after algorithm initialization
        /// </summary>
        protected override void PostInitialize()
        {
            base.PostInitialize();
            // note we explicitly ask the base class for its time provider, not our own override
            _frontierTimeProvider.Initialize(base.GetTimeProvider());
        }

        /// <summary>
        /// Will return the amount of milliseconds that are missing for the next time pulse
        /// </summary>
        /// <param name="now">The current time</param>
        /// <returns>The milliseconds left until the next second of <paramref name="now"/> starts,
        /// plus <see cref="BatchingDelay"/></returns>
        protected virtual int GetPulseDueTime(DateTime now)
        {
            // let's wait until the next second starts
            return 1000 - now.Millisecond + BatchingDelay;
        }

        /// <summary>
        /// Trigger new data event
        /// </summary>
        /// <param name="sender">Sender of the event</param>
        /// <param name="args">Event information</param>
        protected virtual void OnSubscriptionNewDataAvailable(object sender, EventArgs args)
        {
            _newLiveDataEmitted.Set();
        }
    }
}