/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using QuantConnect.Util; using System.Collections.Generic; namespace QuantConnect.Lean.Engine.DataFeeds { /// /// Allows to setup a real time scheduled event, internally using a , /// that is guaranteed to trigger at or after the requested time, never before. /// /// This class is of value because could fire the /// event before time. public class RealTimeScheduleEventService : IDisposable { private readonly Thread _pulseThread; private readonly Queue _work; private readonly ManualResetEvent _event; private readonly CancellationTokenSource _tokenSource; /// /// Event fired when the scheduled time is past /// public event EventHandler NewEvent; /// /// Creates a new instance /// /// The time provider to use public RealTimeScheduleEventService(ITimeProvider timeProvider) { _tokenSource = new CancellationTokenSource(); _event = new ManualResetEvent(false); _work = new Queue(); _pulseThread = new Thread(() => { while (!_tokenSource.Token.IsCancellationRequested) { DateTime nextUtcScheduledEvent; lock (_work) { _work.TryDequeue(out nextUtcScheduledEvent); } if (nextUtcScheduledEvent == default) { _event.WaitOne(_tokenSource.Token); _event.Reset(); if (_tokenSource.Token.IsCancellationRequested) { return; } continue; } // testing has shown that it sometimes requires more than one loop var diff = nextUtcScheduledEvent - timeProvider.GetUtcNow(); while (diff.Ticks > 0) { _tokenSource.Token.WaitHandle.WaitOne(diff); diff = nextUtcScheduledEvent - timeProvider.GetUtcNow(); if (_tokenSource.Token.IsCancellationRequested) { return; } } NewEvent?.Invoke(this, EventArgs.Empty); } }) { IsBackground = true, Name = "RealTimeScheduleEventService" }; _pulseThread.Start(); } /// /// Schedules a new event /// /// The desired due time /// Current utc time /// Scheduling a new event will try to disable previous scheduled event, /// but it is not guaranteed. public void ScheduleEvent(TimeSpan dueTime, DateTime utcNow) { lock (_work) { _work.Enqueue(utcNow + dueTime); _event.Set(); } } /// /// Disposes of the underlying instance /// public void Dispose() { _pulseThread.StopSafely(TimeSpan.FromSeconds(1), _tokenSource); _tokenSource.DisposeSafely(); _event.DisposeSafely(); } } }