/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

using System;
using System.Linq;
using System.Threading;
using QuantConnect.Util;
using QuantConnect.Logging;
using QuantConnect.Packets;
using QuantConnect.Interfaces;
using QuantConnect.Scheduling;
using QuantConnect.Securities;
using QuantConnect.Lean.Engine.Results;

namespace QuantConnect.Lean.Engine.RealTime
{
    /// <summary>
    /// Live trading realtime event processing.
    /// </summary>
    public class LiveTradingRealTimeHandler : BacktestingRealTimeHandler
    {
        // Thread running the Run() loop; created lazily by SetTime once warm up is over
        private Thread _realTimeThread;

        // Signals the Run() loop to stop; cancelled and disposed in Exit()
        private readonly CancellationTokenSource _cancellationTokenSource = new();

        /// <summary>
        /// Gets the current market hours database instance
        /// </summary>
        protected MarketHoursDatabase MarketHoursDatabase { get; set; } = MarketHoursDatabase.FromDataFolder();

        /// <summary>
        /// Gets the current symbol properties database instance
        /// </summary>
        protected SymbolPropertiesDatabase SymbolPropertiesDatabase { get; set; } = SymbolPropertiesDatabase.FromDataFolder();

        /// <summary>
        /// Gets the time provider
        /// </summary>
        /// <remarks>
        /// This should be fixed to RealTimeHandler, but made a protected property for testing purposes
        /// </remarks>
        protected virtual ITimeProvider TimeProvider { get; } = RealTimeProvider.Instance;

        /// <summary>
        /// Boolean flag indicating thread state.
        /// Set to true when the realtime loop starts and back to false when it exits.
        /// </summary>
        public override bool IsActive { get; protected set; }

        /// <summary>
        /// Initializes the real time handler for the specified algorithm and job.
        /// After the base setup it registers a recurring scheduled event that refreshes the
        /// market hours and symbol properties databases.
        /// </summary>
        /// <param name="algorithm">The algorithm instance, forwarded to the base setup</param>
        /// <param name="job">The job packet, forwarded to the base setup</param>
        /// <param name="resultHandler">The result handler, forwarded to the base setup</param>
        /// <param name="api">The api instance, forwarded to the base setup</param>
        /// <param name="isolatorLimitProvider">The isolator limit provider, forwarded to the base setup</param>
        public override void Setup(IAlgorithm algorithm, AlgorithmNodePacket job, IResultHandler resultHandler, IApi api, IIsolatorLimitResultProvider isolatorLimitProvider)
        {
            base.Setup(algorithm, job, resultHandler, api, isolatorLimitProvider);

            var utcNow = TimeProvider.GetUtcNow();

            // set up a scheduled event to refresh market hours and symbol properties every certain period of time,
            // the range starts at today's UTC midnight so trigger times already in the past are filtered out
            var times = Time.DateTimeRange(utcNow.Date, Time.EndOfTime, Algorithm.Settings.DatabasesRefreshPeriod)
                .Where(date => date > utcNow);

            Add(new ScheduledEvent("RefreshMarketHoursAndSymbolProperties", times, (name, triggerTime) =>
            {
                ResetMarketHoursDatabase();
                ResetSymbolPropertiesDatabase();
            }));
        }

        /// <summary>
        /// Gets the timeout the scheduled task time monitor should use
        /// </summary>
        /// <returns>The timeout value, fixed to 500 for live trading</returns>
        protected override int GetTimeMonitorTimeout()
        {
            return 500;
        }

        /// <summary>
        /// Execute the live realtime event thread monitoring.
        /// It scans every second monitoring for an event trigger.
        /// </summary>
        private void Run()
        {
            IsActive = true;

            // continue thread until cancellation is requested
            while (!_cancellationTokenSource.IsCancellationRequested)
            {
                var time = TimeProvider.GetUtcNow();

                // pause until the next second
                var nextSecond = time.RoundUp(TimeSpan.FromSeconds(1));
                var delay = Convert.ToInt32((nextSecond - time).TotalMilliseconds);
                Thread.Sleep(delay < 0 ? 1 : delay);

                // poke each event to see if it should fire, we order by unique id to be deterministic
                foreach (var kvp in ScheduledEvents.OrderBySafe(pair => pair.Value))
                {
                    var scheduledEvent = kvp.Key;
                    try
                    {
                        IsolatorLimitProvider.Consume(scheduledEvent, time, TimeMonitor);
                    }
                    catch (Exception exception)
                    {
                        // report the failure as an algorithm runtime error; the loop keeps poking the remaining events
                        Algorithm.SetRuntimeError(exception, $"Scheduled event: '{scheduledEvent.Name}' at {time}");
                    }
                }
            }

            IsActive = false;
            Log.Trace($"LiveTradingRealTimeHandler.Run(): Exiting thread... Exit triggered: {_cancellationTokenSource.IsCancellationRequested}");
        }

        /// <summary>
        /// Set the current time. While the algorithm is warming up the time is forwarded to the
        /// base handler; afterwards the first call starts the realtime thread and later calls do nothing.
        /// </summary>
        /// <param name="time">The current time, only used while the algorithm is warming up</param>
        public override void SetTime(DateTime time)
        {
            if (Algorithm.IsWarmingUp)
            {
                base.SetTime(time);
            }
            else if (_realTimeThread == null)
            {
                // in live mode we use current time for our time keeping
                // this method is used by backtesting to set time based on the data
                _realTimeThread = new Thread(Run) { IsBackground = true, Name = "RealTime Thread" };
                _realTimeThread.Start(); // RealTime scan time for time based events
            }
        }

        /// <summary>
        /// Scan for past events that didn't fire because there was no data at the scheduled time.
        /// </summary>
        /// <param name="time">Current time.</param>
        public override void ScanPastEvents(DateTime time)
        {
            if (Algorithm.IsWarmingUp)
            {
                base.ScanPastEvents(time);
            }
            // in live mode we use current time for our time keeping
            // this method is used by backtesting to scan for past events based on the data
        }

        /// <summary>
        /// Stop the real time thread
        /// </summary>
        public override void Exit()
        {
            _realTimeThread.StopSafely(TimeSpan.FromMinutes(1), _cancellationTokenSource);
            _cancellationTokenSource.DisposeSafely();
            base.Exit();
        }

        /// <summary>
        /// Resets the market hours database, forcing a reload when reused.
        /// Called in tests where multiple algorithms are run sequentially,
        /// and we need to guarantee that every test starts with the same environment.
        /// </summary>
        protected virtual void ResetMarketHoursDatabase()
        {
            MarketHoursDatabase.UpdateDataFolderDatabase();
            Log.Trace("LiveTradingRealTimeHandler.ResetMarketHoursDatabase(): Updated market hours database.");
        }

        /// <summary>
        /// Resets the symbol properties database, forcing a reload when reused.
        /// </summary>
        protected virtual void ResetSymbolPropertiesDatabase()
        {
            SymbolPropertiesDatabase.UpdateDataFolderDatabase();
            Log.Trace("LiveTradingRealTimeHandler.ResetSymbolPropertiesDatabase(): Updated symbol properties database.");
        }
    }
}