/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using QuantConnect.Util;
using System.Collections.Generic;
namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Allows to setup a real time scheduled event, internally using a <see cref="Thread"/>,
/// that is guaranteed to trigger at or after the requested time, never before.
/// </summary>
/// <remarks>This class is of value because <see cref="Timer"/> could fire the
/// event before time.</remarks>
public class RealTimeScheduleEventService : IDisposable
{
    /// <summary>
    /// Longest timeout accepted by <see cref="WaitHandle.WaitOne(TimeSpan)"/>. Longer waits are
    /// performed in slices of at most this length to avoid an <see cref="ArgumentOutOfRangeException"/>.
    /// </summary>
    private static readonly TimeSpan MaxWaitTimeout = TimeSpan.FromMilliseconds(int.MaxValue);

    private readonly Thread _pulseThread;
    // pending due times (UTC), consumed in FIFO order by the pulse thread; also used as the lock object
    private readonly Queue<DateTime> _work;
    // signaled by ScheduleEvent to wake the pulse thread when it is idle
    private readonly ManualResetEvent _event;
    private readonly CancellationTokenSource _tokenSource;

    /// <summary>
    /// Event fired when the scheduled time is past
    /// </summary>
    public event EventHandler NewEvent;

    /// <summary>
    /// Creates a new instance and starts the background thread that waits for scheduled events
    /// </summary>
    /// <param name="timeProvider">The time provider to use</param>
    public RealTimeScheduleEventService(ITimeProvider timeProvider)
    {
        _tokenSource = new CancellationTokenSource();
        _event = new ManualResetEvent(false);
        _work = new Queue<DateTime>();

        // Capture the token once: the 'Token' getter throws ObjectDisposedException after the source
        // is disposed, while an already obtained token can still be polled for cancellation.
        var token = _tokenSource.Token;

        _pulseThread = new Thread(() =>
        {
            while (!token.IsCancellationRequested)
            {
                DateTime nextUtcScheduledEvent;
                bool hasWork;
                lock (_work)
                {
                    hasWork = _work.TryDequeue(out nextUtcScheduledEvent);
                }

                if (!hasWork)
                {
                    // nothing scheduled: sleep until ScheduleEvent signals us or we are cancelled
                    _event.WaitOne(token);
                    _event.Reset();
                    if (token.IsCancellationRequested)
                    {
                        return;
                    }
                    // always re-check the queue after waking up, so a signal raised between
                    // the wait returning and the reset above is never lost
                    continue;
                }

                // testing has shown that it sometimes requires more than one loop
                var diff = nextUtcScheduledEvent - timeProvider.GetUtcNow();
                while (diff.Ticks > 0)
                {
                    // clamp the timeout to the supported range; the loop covers the remainder
                    token.WaitHandle.WaitOne(diff > MaxWaitTimeout ? MaxWaitTimeout : diff);
                    if (token.IsCancellationRequested)
                    {
                        return;
                    }
                    diff = nextUtcScheduledEvent - timeProvider.GetUtcNow();
                }

                NewEvent?.Invoke(this, EventArgs.Empty);
            }
        }) { IsBackground = true, Name = "RealTimeScheduleEventService" };
        _pulseThread.Start();
    }

    /// <summary>
    /// Schedules a new event
    /// </summary>
    /// <param name="dueTime">The desired due time</param>
    /// <param name="utcNow">Current utc time</param>
    /// <remarks>The event time (<paramref name="utcNow"/> plus <paramref name="dueTime"/>) is queued
    /// behind any previously scheduled events. Events are handled one at a time in the order they were
    /// scheduled, so an event will not fire before the events queued ahead of it have fired.</remarks>
    public void ScheduleEvent(TimeSpan dueTime, DateTime utcNow)
    {
        lock (_work)
        {
            _work.Enqueue(utcNow + dueTime);
            // wake the pulse thread in case it is idle waiting for work
            _event.Set();
        }
    }

    /// <summary>
    /// Stops the pulse thread, waiting up to one second for it, and disposes of the
    /// cancellation token source and the wait handle
    /// </summary>
    public void Dispose()
    {
        _pulseThread.StopSafely(TimeSpan.FromSeconds(1), _tokenSource);
        _tokenSource.DisposeSafely();
        _event.DisposeSafely();
    }
}
}