/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Newtonsoft.Json; using QuantConnect.Brokerages; using QuantConnect.Configuration; using QuantConnect.Data.UniverseSelection; using QuantConnect.Interfaces; using QuantConnect.Logging; using QuantConnect.Notifications; using QuantConnect.Orders; using QuantConnect.Packets; using QuantConnect.Securities; using QuantConnect.Securities.Positions; using QuantConnect.Statistics; using QuantConnect.Util; namespace QuantConnect.Lean.Engine.Results { /// /// Live trading result handler implementation passes the messages to the QC live trading interface. /// /// Live trading result handler is quite busy. It sends constant price updates, equity updates and order/holdings updates. public class LiveTradingResultHandler : BaseResultsHandler, IResultHandler { // Required properties for the cloud app. private LiveNodePacket _job; //Update loop: private DateTime _nextUpdate; private DateTime _nextChartsUpdate; private DateTime _nextChartTrimming; private DateTime _nextLogStoreUpdate; private DateTime _nextStatisticsUpdate; private DateTime _nextInsightStoreUpdate; private DateTime _currentUtcDate; private readonly TimeSpan _storeInsightPeriod; private DateTime _nextPortfolioMarginUpdate; private DateTime _previousPortfolioMarginUpdate; private readonly TimeSpan _samplePortfolioPeriod; private readonly Chart _intradayPortfolioState = new(PortfolioMarginKey); /// /// The earliest time of next dump to the status file /// private DateTime _nextStatusUpdate; //Log Message Store: private DateTime _nextSample; private IApi _api; private readonly CancellationTokenSource _cancellationTokenSource; private readonly int _streamedChartLimit; private readonly int _streamedChartGroupSize; private bool _sampleChartAlways; private bool _userExchangeIsOpen; private ReferenceWrapper _portfolioValue; private ReferenceWrapper _benchmarkValue; private DateTime _lastChartSampleLogicCheck; private readonly Dictionary _exchangeHours; /// /// Creates a new instance /// public LiveTradingResultHandler() { _exchangeHours = new Dictionary(); _cancellationTokenSource = new CancellationTokenSource(); ResamplePeriod = TimeSpan.FromSeconds(2); NotificationPeriod = TimeSpan.FromSeconds(1); _samplePortfolioPeriod = _storeInsightPeriod = TimeSpan.FromMinutes(10); _streamedChartLimit = Config.GetInt("streamed-chart-limit", 12); _streamedChartGroupSize = Config.GetInt("streamed-chart-group-size", 3); _portfolioValue = new ReferenceWrapper(0); _benchmarkValue = new ReferenceWrapper(0); } /// /// Initialize the result handler with this result packet. /// /// DTO parameters class to initialize a result handler public override void Initialize(ResultHandlerInitializeParameters parameters) { _api = parameters.Api; _job = (LiveNodePacket)parameters.Job; if (_job == null) throw new Exception("LiveResultHandler.Constructor(): Submitted Job type invalid."); var utcNow = DateTime.UtcNow; _currentUtcDate = utcNow.Date; _nextPortfolioMarginUpdate = utcNow.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod); base.Initialize(parameters); } /// /// Live trading result handler thread. /// protected override void Run() { // give the algorithm time to initialize, else we will log an error right away ExitEvent.WaitOne(3000); // -> 1. Run Primary Sender Loop: Continually process messages from queue as soon as they arrive. while (!(ExitTriggered && Messages.IsEmpty)) { try { //1. Process Simple Messages in Queue Packet packet; if (Messages.TryDequeue(out packet)) { MessagingHandler.Send(packet); } //2. Update the packet scanner: Update(); if (Messages.IsEmpty) { // prevent thread lock/tight loop when there's no work to be done ExitEvent.WaitOne(Time.GetSecondUnevenWait(1000)); } } catch (Exception err) { Log.Error(err); } } // While !End. Log.Trace("LiveTradingResultHandler.Run(): Ending Thread..."); } // End Run(); /// /// Every so often send an update to the browser with the current state of the algorithm. /// private void Update() { //Error checks if the algorithm & threads have not loaded yet, or are closing down. if (Algorithm?.Transactions == null || TransactionHandler.Orders == null || !Algorithm.GetLocked()) { Log.Debug("LiveTradingResultHandler.Update(): Algorithm not yet initialized."); ExitEvent.WaitOne(1000); return; } if (ExitTriggered) { return; } var utcNow = DateTime.UtcNow; if (utcNow > _nextUpdate) { try { Dictionary deltaOrders; { var stopwatch = Stopwatch.StartNew(); deltaOrders = GetDeltaOrders(LastDeltaOrderPosition, shouldStop: orderCount => stopwatch.ElapsedMilliseconds > 15); } var deltaOrderEvents = TransactionHandler.OrderEvents.Skip(LastDeltaOrderEventsPosition).Take(50).ToList(); LastDeltaOrderEventsPosition += deltaOrderEvents.Count; //Create and send back the changes in chart since the algorithm started. var deltaCharts = new Dictionary(); Log.Debug("LiveTradingResultHandler.Update(): Build delta charts"); var performanceCharts = new Dictionary(); lock (ChartLock) { //Get the updates since the last chart foreach (var chart in Charts) { var chartUpdates = chart.Value.GetUpdates(); // we only want to stream charts that have new updates if (!chartUpdates.IsEmpty()) { // remove directory pathing characters from chart names var safeName = chart.Value.Name.Replace('/', '-'); DictionarySafeAdd(deltaCharts, safeName, chartUpdates, "deltaCharts"); } if (AlgorithmPerformanceCharts.Contains(chart.Key)) { performanceCharts[chart.Key] = chart.Value.Clone(); } if (chartUpdates.Name == PortfolioMarginKey) { PortfolioMarginChart.RemoveSinglePointSeries(chartUpdates); } } } Log.Debug("LiveTradingResultHandler.Update(): End build delta charts"); //Profit loss changes, get the banner statistics, summary information on the performance for the headers. var serverStatistics = GetServerStatistics(utcNow); var holdings = GetHoldings(Algorithm.Securities.Values, Algorithm.SubscriptionManager.SubscriptionDataConfigService); //Add the algorithm statistics first. Log.Debug("LiveTradingResultHandler.Update(): Build run time stats"); var summary = GenerateStatisticsResults(performanceCharts).Summary; var runtimeStatistics = GetAlgorithmRuntimeStatistics(summary); Log.Debug("LiveTradingResultHandler.Update(): End build run time stats"); // since we're sending multiple packets, let's do it async and forget about it // chart data can get big so let's break them up into groups var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings, Algorithm.Portfolio.CashBook, runtimeStatistics, serverStatistics, deltaOrderEvents); foreach (var liveResultPacket in splitPackets) { MessagingHandler.Send(liveResultPacket); } //Send full packet to storage. if (utcNow > _nextChartsUpdate) { Log.Debug("LiveTradingResultHandler.Update(): Pre-store result"); var chartComplete = new Dictionary(); lock (ChartLock) { foreach (var chart in Charts) { // remove directory pathing characters from chart names var safeName = chart.Value.Name.Replace('/', '-'); DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(), "chartComplete"); } } var orderEvents = GetOrderEventsToStore(); var deltaStatistics = new Dictionary(); var orders = new Dictionary(TransactionHandler.Orders); var complete = new LiveResultPacket(_job, new LiveResult(new LiveResultParameters(chartComplete, orders, Algorithm.Transactions.TransactionRecord, holdings, Algorithm.Portfolio.CashBook, deltaStatistics, runtimeStatistics, orderEvents, serverStatistics, state: GetAlgorithmState()))); StoreResult(complete); _nextChartsUpdate = DateTime.UtcNow.Add(ChartUpdateInterval); Log.Debug("LiveTradingResultHandler.Update(): End-store result"); } // Upload the logs every 1-2 minutes; this can be a heavy operation depending on amount of live logging and should probably be done asynchronously. if (utcNow > _nextLogStoreUpdate) { List logs; Log.Debug("LiveTradingResultHandler.Update(): Storing log..."); lock (LogStore) { // we need a new container instance so we can store the logs outside the lock logs = new List(LogStore); LogStore.Clear(); } SaveLogs(AlgorithmId, logs); _nextLogStoreUpdate = DateTime.UtcNow.AddMinutes(2); Log.Debug("LiveTradingResultHandler.Update(): Finished storing log"); } // Every minute send usage statistics: if (utcNow > _nextStatisticsUpdate) { try { _api.SendStatistics( _job.AlgorithmId, Algorithm.Portfolio.TotalUnrealizedProfit, Algorithm.Portfolio.TotalFees, Algorithm.Portfolio.TotalNetProfit, Algorithm.Portfolio.TotalHoldingsValue, Algorithm.Portfolio.TotalPortfolioValue, GetNetReturn(), Algorithm.Portfolio.TotalSaleVolume, TotalTradesCount(), 0); } catch (Exception err) { Log.Error(err, "Error sending statistics:"); } _nextStatisticsUpdate = utcNow.AddMinutes(1); } if (utcNow > _nextStatusUpdate) { var chartComplete = new Dictionary(); lock (ChartLock) { foreach (var chart in Charts) { // remove directory pathing characters from chart names var safeName = chart.Value.Name.Replace('/', '-'); DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(), "chartComplete"); } } StoreStatusFile( runtimeStatistics, // only store holdings we are invested in holdings.Where(pair => pair.Value.Quantity != 0).ToDictionary(pair => pair.Key, pair => pair.Value), chartComplete, GetAlgorithmState(), new SortedDictionary(Algorithm.Transactions.TransactionRecord), serverStatistics); SetNextStatusUpdate(); } if (_currentUtcDate != utcNow.Date) { StoreOrderEvents(_currentUtcDate, GetOrderEventsToStore()); // start storing in a new date file _currentUtcDate = utcNow.Date; } if (utcNow > _nextChartTrimming) { Log.Debug("LiveTradingResultHandler.Update(): Trimming charts"); var timeLimitUtc = utcNow.AddDays(-2); lock (ChartLock) { foreach (var chart in Charts) { foreach (var series in chart.Value.Series) { // trim data that's older than 2 days series.Value.Values = (from v in series.Value.Values where v.Time > timeLimitUtc select v).ToList(); } } } _nextChartTrimming = DateTime.UtcNow.AddMinutes(10); Log.Debug("LiveTradingResultHandler.Update(): Finished trimming charts"); } if (utcNow > _nextInsightStoreUpdate) { StoreInsights(); _nextInsightStoreUpdate = DateTime.UtcNow.Add(_storeInsightPeriod); } } catch (Exception err) { Log.Error(err, "LiveTradingResultHandler().Update(): ", true); } //Set the new update time after we've finished processing. // The processing can takes time depending on how large the packets are. _nextUpdate = DateTime.UtcNow.Add(MainUpdateInterval); } // End Update Charts: } /// /// Assigns the next earliest status update time /// protected virtual void SetNextStatusUpdate() { // Update the status json file every X _nextStatusUpdate = DateTime.UtcNow.AddMinutes(10); } /// /// Stores the order events /// /// The utc date associated with these order events /// The order events to store protected override void StoreOrderEvents(DateTime utcTime, List orderEvents) { if (orderEvents.Count <= 0) { return; } var filename = $"{AlgorithmId}-{utcTime:yyyy-MM-dd}-order-events.json"; var path = GetResultsPath(filename); var data = JsonConvert.SerializeObject(orderEvents, Formatting.None, SerializerSettings); File.WriteAllText(path, data); } /// /// Gets the order events generated in '_currentUtcDate' /// private List GetOrderEventsToStore() { return TransactionHandler.OrderEvents.Where(orderEvent => orderEvent.UtcTime >= _currentUtcDate).ToList(); } /// /// Will store the complete status of the algorithm in a single json file /// /// Will sample charts every 12 hours, 2 data points per day at maximum, /// to reduce file size private void StoreStatusFile(SortedDictionary runtimeStatistics, Dictionary holdings, Dictionary chartComplete, Dictionary algorithmState, SortedDictionary profitLoss, Dictionary serverStatistics = null, StatisticsResults statistics = null) { try { Log.Debug("LiveTradingResultHandler.Update(): status update start..."); if (statistics == null) { statistics = GenerateStatisticsResults(chartComplete, profitLoss); } // sample the entire charts with a 12 hours resolution var dailySampler = new SeriesSampler(TimeSpan.FromHours(12)); chartComplete = dailySampler.SampleCharts(chartComplete, Time.Start, Time.EndOfTime); if (chartComplete.TryGetValue(PortfolioMarginKey, out var marginChart)) { PortfolioMarginChart.RemoveSinglePointSeries(marginChart); } var result = new LiveResult(new LiveResultParameters(chartComplete, new Dictionary(TransactionHandler.Orders), Algorithm?.Transactions.TransactionRecord ?? new(), holdings, Algorithm?.Portfolio.CashBook ?? new(), statistics: statistics.Summary, runtimeStatistics: runtimeStatistics, orderEvents: null, // we stored order events separately serverStatistics: serverStatistics, state: algorithmState)); SaveResults($"{AlgorithmId}.json", result); Log.Debug("LiveTradingResultHandler.Update(): status update end."); } catch (Exception err) { Log.Error(err, "Error storing status update"); } } /// /// Run over all the data and break it into smaller packets to ensure they all arrive at the terminal /// private IEnumerable SplitPackets(Dictionary deltaCharts, Dictionary deltaOrders, Dictionary holdings, CashBook cashbook, SortedDictionary runtimeStatistics, Dictionary serverStatistics, List deltaOrderEvents) { // break the charts into groups var current = new Dictionary(); var chartPackets = new List(); // First add send charts // Loop through all the charts, add them to packets to be sent. // Group three charts per packet foreach (var deltaChart in deltaCharts.Values) { current.Add(deltaChart.Name, deltaChart); if (current.Count >= _streamedChartGroupSize) { // Add the micro packet to transport. chartPackets.Add(new LiveResultPacket(_job, new LiveResult { Charts = current })); // Reset the carrier variable. current = new Dictionary(); if (chartPackets.Count * _streamedChartGroupSize >= _streamedChartLimit) { // stream a maximum number of charts break; } } } // Add whatever is left over here too // unless it is a wildcard subscription if (current.Count > 0) { chartPackets.Add(new LiveResultPacket(_job, new LiveResult { Charts = current })); } // these are easier to split up, not as big as the chart objects var packets = new[] { new LiveResultPacket(_job, new LiveResult { Holdings = holdings, CashBook = cashbook}), new LiveResultPacket(_job, new LiveResult { RuntimeStatistics = runtimeStatistics, ServerStatistics = serverStatistics }) }; var result = packets.Concat(chartPackets); // only send order and order event packet if there is actually any update if (deltaOrders.Count > 0 || deltaOrderEvents.Count > 0) { result = result.Concat(new[] { new LiveResultPacket(_job, new LiveResult { Orders = deltaOrders, OrderEvents = deltaOrderEvents }) }); } return result; } /// /// Send a live trading debug message to the live console. /// /// Message we'd like shown in console. /// When there are already 500 messages in the queue it stops adding new messages. public void DebugMessage(string message) { if (Messages.Count > 500) return; //if too many in the queue already skip the logging. Messages.Enqueue(new DebugPacket(_job.ProjectId, AlgorithmId, CompileId, message)); AddToLogStore(message); } /// /// Send a live trading system debug message to the live console. /// /// Message we'd like shown in console. public void SystemDebugMessage(string message) { Messages.Enqueue(new SystemDebugPacket(_job.ProjectId, AlgorithmId, CompileId, message)); AddToLogStore(message); } /// /// Log string messages and send them to the console. /// /// String message wed like logged. /// When there are already 500 messages in the queue it stops adding new messages. public void LogMessage(string message) { //Send the logging messages out immediately for live trading: if (Messages.Count > 500) return; Messages.Enqueue(new LogPacket(AlgorithmId, message)); AddToLogStore(message); } /// /// Save an algorithm message to the log store. Uses a different timestamped method of adding messaging to interweve debug and logging messages. /// /// String message to send to browser. protected override void AddToLogStore(string message) { Log.Debug("LiveTradingResultHandler.AddToLogStore(): Adding"); base.AddToLogStore(DateTime.Now.ToStringInvariant(DateFormat.UI) + " " + message); Log.Debug("LiveTradingResultHandler.AddToLogStore(): Finished adding"); } /// /// Send an error message back to the browser console and highlight it read. /// /// Message we'd like shown in console. /// Stacktrace to show in the console. public void ErrorMessage(string message, string stacktrace = "") { if (Messages.Count > 500) return; Messages.Enqueue(new HandledErrorPacket(AlgorithmId, message, stacktrace)); AddToLogStore(message + (!string.IsNullOrEmpty(stacktrace) ? ": StackTrace: " + stacktrace : string.Empty)); } /// /// Send a list of secutity types that the algorithm trades to the browser to show the market clock - is this market open or closed! /// /// List of security types public void SecurityType(List types) { var packet = new SecurityTypesPacket { Types = types }; Messages.Enqueue(packet); } /// /// Send a runtime error back to the users browser and highlight it red. /// /// Runtime error message /// Associated error stack trace. public virtual void RuntimeError(string message, string stacktrace = "") { Messages.Enqueue(new RuntimeErrorPacket(_job.UserId, AlgorithmId, message, stacktrace)); AddToLogStore(message + (!string.IsNullOrEmpty(stacktrace) ? ": StackTrace: " + stacktrace : string.Empty)); SetAlgorithmState(message, stacktrace); } /// /// Process brokerage message events /// /// The brokerage message event public virtual void BrokerageMessage(BrokerageMessageEvent brokerageMessageEvent) { // NOP } /// /// Add a sample to the chart specified by the chartName, and seriesName. /// /// String chart name to place the sample. /// Series name for the chart. /// Series chart index - which chart should this series belong /// Series type for the chart. /// Value for the chart sample. /// Unit for the chart axis /// Sample can be used to create new charts or sample equity - daily performance. protected override void Sample(string chartName, string seriesName, int seriesIndex, SeriesType seriesType, ISeriesPoint value, string unit = "$") { // Sampling during warming up period skews statistics if (Algorithm.IsWarmingUp) { return; } Log.Debug("LiveTradingResultHandler.Sample(): Sampling " + chartName + "." + seriesName); lock (ChartLock) { //Add a copy locally: if (!Charts.TryGetValue(chartName, out var chart)) { Charts.AddOrUpdate(chartName, new Chart(chartName)); chart = Charts[chartName]; } //Add the sample to our chart: if (!chart.Series.TryGetValue(seriesName, out var series)) { series = BaseSeries.Create(seriesType, seriesName, seriesIndex, unit); chart.Series.Add(seriesName, series); } //Add our value: series.Values.Add(value); } Log.Debug("LiveTradingResultHandler.Sample(): Done sampling " + chartName + "." + seriesName); } /// /// Add a range of samples from the users algorithms to the end of our current list. /// /// Chart updates since the last request. /// protected void SampleRange(IEnumerable updates) { Log.Debug("LiveTradingResultHandler.SampleRange(): Begin sampling"); lock (ChartLock) { foreach (var update in updates) { //Create the chart if it doesn't exist already: Chart chart; if (!Charts.TryGetValue(update.Name, out chart)) { chart = new Chart(update.Name); Charts.AddOrUpdate(update.Name, chart); } //Add these samples to this chart. foreach (BaseSeries series in update.Series.Values) { if (series.Values.Count > 0) { var thisSeries = chart.TryAddAndGetSeries(series.Name, series, forceAddNew: false); if (series.SeriesType == SeriesType.Pie) { var dataPoint = series.ConsolidateChartPoints(); if (dataPoint != null) { thisSeries.AddPoint(dataPoint); } } else { //We already have this record, so just the new samples to the end: thisSeries.Values.AddRange(series.Values); } } } } } Log.Debug("LiveTradingResultHandler.SampleRange(): Finished sampling"); } /// /// Set the algorithm of the result handler after its been initialized. /// /// Algorithm object matching IAlgorithm interface /// Algorithm starting capital for statistics calculations public virtual void SetAlgorithm(IAlgorithm algorithm, decimal startingPortfolioValue) { Algorithm = algorithm; Algorithm.SetStatisticsService(this); DailyPortfolioValue = StartingPortfolioValue = startingPortfolioValue; _portfolioValue = new ReferenceWrapper(startingPortfolioValue); CumulativeMaxPortfolioValue = StartingPortfolioValue; AlgorithmCurrencySymbol = Currencies.GetCurrencySymbol(Algorithm.AccountCurrency); var types = new List(); foreach (var kvp in Algorithm.Securities) { var security = kvp.Value; if (!types.Contains(security.Type)) types.Add(security.Type); } SecurityType(types); // we need to forward Console.Write messages to the algorithm's Debug function var debug = new FuncTextWriter(algorithm.Debug); var error = new FuncTextWriter(algorithm.Error); Console.SetOut(debug); Console.SetError(error); UpdateAlgorithmStatus(); // Wire algorithm name and tags updates algorithm.NameUpdated += (sender, name) => AlgorithmNameUpdated(name); algorithm.TagsUpdated += (sender, tags) => AlgorithmTagsUpdated(tags); } /// /// Send a algorithm status update to the user of the algorithms running state. /// /// Status enum of the algorithm. /// Optional string message describing reason for status change. public void SendStatusUpdate(AlgorithmStatus status, string message = "") { Log.Trace($"LiveTradingResultHandler.SendStatusUpdate(): status: '{status}'. {(string.IsNullOrEmpty(message) ? string.Empty : " " + message)}"); var packet = new AlgorithmStatusPacket(_job.AlgorithmId, _job.ProjectId, status, message); Messages.Enqueue(packet); } /// /// Set a dynamic runtime statistic to show in the (live) algorithm header /// /// Runtime headline statistic name /// Runtime headline statistic value public void RuntimeStatistic(string key, string value) { Log.Debug("LiveTradingResultHandler.RuntimeStatistic(): Begin setting statistic"); lock (RuntimeStatistics) { if (!RuntimeStatistics.ContainsKey(key)) { RuntimeStatistics.Add(key, value); } RuntimeStatistics[key] = value; } Log.Debug("LiveTradingResultHandler.RuntimeStatistic(): End setting statistic"); } /// /// Send a final analysis result back to the IDE. /// protected void SendFinalResult() { Log.Trace("LiveTradingResultHandler.SendFinalResult(): Starting..."); try { var endTime = DateTime.UtcNow; var endState = GetAlgorithmState(endTime); LiveResultPacket result; // could happen if algorithm failed to init if (Algorithm != null) { //Convert local dictionary: var charts = new Dictionary(); lock (ChartLock) { foreach (var kvp in Charts) { charts.Add(kvp.Key, kvp.Value.Clone()); } } var orders = new Dictionary(TransactionHandler.Orders); var profitLoss = new SortedDictionary(Algorithm.Transactions.TransactionRecord); var holdings = GetHoldings(Algorithm.Securities.Values, Algorithm.SubscriptionManager.SubscriptionDataConfigService, onlyInvested: true); var statisticsResults = GenerateStatisticsResults(charts, profitLoss); var runtime = GetAlgorithmRuntimeStatistics(statisticsResults.Summary); StoreStatusFile(runtime, holdings, charts, endState, profitLoss, statistics: statisticsResults); //Create a packet: result = new LiveResultPacket(_job, new LiveResult(new LiveResultParameters(charts, orders, profitLoss, new Dictionary(), Algorithm.Portfolio.CashBook, statisticsResults.Summary, runtime, GetOrderEventsToStore(), algorithmConfiguration: AlgorithmConfiguration.Create(Algorithm, null), state: endState))); } else { StoreStatusFile(new(), new(), new(), endState, new()); result = LiveResultPacket.CreateEmpty(_job); result.Results.State = endState; } StoreInsights(); //Store to S3: StoreResult(result); Log.Trace("LiveTradingResultHandler.SendFinalResult(): Finished storing results. Start sending..."); //Truncate packet to fit within 32kb: result.Results = new LiveResult(); //Send the truncated packet: MessagingHandler.Send(result); } catch (Exception err) { Log.Error(err); } Log.Trace("LiveTradingResultHandler.SendFinalResult(): Ended"); } /// /// Process the log entries and save it to permanent storage /// /// Id that will be incorporated into the algorithm log name /// Log list /// Returns the location of the logs public override string SaveLogs(string id, List logs) { try { var logLines = logs.Select(x => x.Message); var filename = $"{id}-log.txt"; var path = GetResultsPath(filename); File.AppendAllLines(path, logLines); return path; } catch (Exception err) { Log.Error(err); } return ""; } /// /// Save the snapshot of the total results to storage. /// /// Packet to store. protected override void StoreResult(Packet packet) { try { Log.Debug("LiveTradingResultHandler.StoreResult(): Begin store result sampling"); // Make sure this is the right type of packet: if (packet.Type != PacketType.LiveResult) return; // Port to packet format: var live = packet as LiveResultPacket; if (live != null) { if (live.Results.OrderEvents != null) { // we store order events separately StoreOrderEvents(_currentUtcDate, live.Results.OrderEvents); // lets null the orders events so that they aren't stored again and generate a giant file live.Results.OrderEvents = null; } // we need to down sample var start = DateTime.UtcNow.Date; var stop = start.AddDays(1); // truncate to just today, we don't need more than this for anyone Truncate(live.Results, start, stop); var highResolutionCharts = new Dictionary(live.Results.Charts); // minute resolution data, save today var minuteSampler = new SeriesSampler(TimeSpan.FromMinutes(1)); var minuteCharts = minuteSampler.SampleCharts(live.Results.Charts, start, stop); // swap out our charts with the sampled data minuteCharts.Remove(PortfolioMarginKey); live.Results.Charts = minuteCharts; SaveResults(CreateKey("minute"), live.Results); // 10 minute resolution data, save today var tenminuteSampler = new SeriesSampler(TimeSpan.FromMinutes(10)); var tenminuteCharts = tenminuteSampler.SampleCharts(live.Results.Charts, start, stop); lock (_intradayPortfolioState) { var clone = _intradayPortfolioState.Clone(); PortfolioMarginChart.RemoveSinglePointSeries(clone); tenminuteCharts[PortfolioMarginKey] = clone; } live.Results.Charts = tenminuteCharts; SaveResults(CreateKey("10minute"), live.Results); // high resolution data, we only want to save an hour highResolutionCharts.Remove(PortfolioMarginKey); live.Results.Charts = highResolutionCharts; start = DateTime.UtcNow.RoundDown(TimeSpan.FromHours(1)); stop = DateTime.UtcNow.RoundUp(TimeSpan.FromHours(1)); Truncate(live.Results, start, stop); foreach (var name in live.Results.Charts.Keys) { var result = new LiveResult { Orders = new Dictionary(live.Results.Orders), Holdings = new Dictionary(live.Results.Holdings), Charts = new Dictionary { { name, live.Results.Charts[name] } } }; SaveResults(CreateKey("second_" + CreateSafeChartName(name), "yyyy-MM-dd-HH"), result); } } else { Log.Error("LiveResultHandler.StoreResult(): Result Null."); } Log.Debug("LiveTradingResultHandler.StoreResult(): End store result sampling"); } catch (Exception err) { Log.Error(err); } } /// /// New order event for the algorithm /// /// New event details public override void OrderEvent(OrderEvent newEvent) { var brokerIds = string.Empty; var order = TransactionHandler.GetOrderById(newEvent.OrderId); if (order != null && order.BrokerId.Count > 0) brokerIds = string.Join(", ", order.BrokerId); //Send the message to frontend as packet: Log.Trace("LiveTradingResultHandler.OrderEvent(): " + newEvent + " BrokerId: " + brokerIds, true); Messages.Enqueue(new OrderEventPacket(AlgorithmId, newEvent)); var message = "New Order Event: " + newEvent; DebugMessage(message); } /// /// Terminate the result thread and apply any required exit procedures like sending final results /// public override void Exit() { if (!ExitTriggered) { _cancellationTokenSource.Cancel(); if (Algorithm != null) { // first process synchronous events so we add any new message or log ProcessSynchronousEvents(true); } // Set exit flag, update task will send any message before stopping ExitTriggered = true; ExitEvent.Set(); lock (LogStore) { SaveLogs(AlgorithmId, LogStore); LogStore.Clear(); } StopUpdateRunner(); SendFinalResult(); base.Exit(); _cancellationTokenSource.DisposeSafely(); } } /// /// Truncates the chart and order data in the result packet to within the specified time frame /// private static void Truncate(LiveResult result, DateTime start, DateTime stop) { //Log.Trace("LiveTradingResultHandler.Truncate: Start: " + start.ToString("u") + " Stop : " + stop.ToString("u")); //Log.Trace("LiveTradingResultHandler.Truncate: Truncate Delta: " + (unixDateStop - unixDateStart) + " Incoming Points: " + result.Charts["Strategy Equity"].Series["Equity"].Values.Count); var charts = new Dictionary(); foreach (var kvp in result.Charts) { var chart = kvp.Value; var newChart = new Chart(chart.Name); charts.Add(kvp.Key, newChart); foreach (var series in chart.Series.Values) { var newSeries = series.Clone(empty: true); newSeries.Values.AddRange(series.Values.Where(chartPoint => chartPoint.Time >= start && chartPoint.Time <= stop)); newChart.AddSeries(newSeries); } } result.Charts = charts; result.Orders = result.Orders.Values.Where(x => (x.Time >= start && x.Time <= stop) || (x.LastFillTime != null && x.LastFillTime >= start && x.LastFillTime <= stop) || (x.LastUpdateTime != null && x.LastUpdateTime >= start && x.LastUpdateTime <= stop) ).ToDictionary(x => x.Id); //Log.Trace("LiveTradingResultHandler.Truncate: Truncate Outgoing: " + result.Charts["Strategy Equity"].Series["Equity"].Values.Count); } private string CreateKey(string suffix, string dateFormat = "yyyy-MM-dd") { return $"{AlgorithmId}-{DateTime.UtcNow.ToStringInvariant(dateFormat)}_{suffix}.json"; } /// /// Escape the chartname so that it can be saved to a file system /// /// The name of a chart /// The name of the chart will all escape all characters except RFC 2396 unreserved characters protected virtual string CreateSafeChartName(string chartName) { return Uri.EscapeDataString(chartName); } /// /// Process the synchronous result events, sampling and message reading. /// This method is triggered from the algorithm manager thread. /// /// Prime candidate for putting into a base class. Is identical across all result handlers. public virtual void ProcessSynchronousEvents(bool forceProcess = false) { var time = DateTime.UtcNow; // Check to see if we should update stored portfolio values UpdatePortfolioValue(time, forceProcess); // Update the equity bar UpdateAlgorithmEquity(); if (time > _nextPortfolioMarginUpdate || forceProcess) { _nextPortfolioMarginUpdate = time.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod); var newState = PortfolioState.Create(Algorithm.Portfolio, time, GetPortfolioValue()); lock (_intradayPortfolioState) { if (_previousPortfolioMarginUpdate.Date != time.Date) { // we crossed into a new day _previousPortfolioMarginUpdate = time.Date; _intradayPortfolioState.Series.Clear(); } if (newState != null) { PortfolioMarginChart.AddSample(_intradayPortfolioState, newState, MapFileProvider, time); } } } if (time > _nextSample || forceProcess) { Log.Debug("LiveTradingResultHandler.ProcessSynchronousEvents(): Enter"); //Set next sample time: 4000 samples per backtest _nextSample = time.Add(ResamplePeriod); // Check to see if we should update stored bench values UpdateBenchmarkValue(time, forceProcess); //Sample the portfolio value over time for chart. SampleEquity(time); //Also add the user samples / plots to the result handler tracking: SampleRange(Algorithm.GetChartUpdates(true)); } ProcessAlgorithmLogs(messageQueueLimit: 500); //Set the running statistics: foreach (var pair in Algorithm.RuntimeStatistics) { RuntimeStatistic(pair.Key, pair.Value); } //Send all the notification messages but timeout within a second, or if this is a force process, wait till its done. var timeout = DateTime.UtcNow.AddSeconds(1); while (!Algorithm.Notify.Messages.IsEmpty && (DateTime.UtcNow < timeout || forceProcess)) { Notification message; if (Algorithm.Notify.Messages.TryDequeue(out message)) { //Process the notification messages: Log.Trace("LiveTradingResultHandler.ProcessSynchronousEvents(): Processing Notification..."); try { MessagingHandler.SendNotification(message); } catch (Exception err) { Algorithm.Debug(err.Message); Log.Error(err, "Sending notification: " + message.GetType().FullName); } } } Log.Debug("LiveTradingResultHandler.ProcessSynchronousEvents(): Exit"); } /// /// Event fired each time that we add/remove securities from the data feed. /// On Security change we re determine when should we sample charts, if the user added Crypto, Forex or an extended market hours subscription /// we will always sample charts. Else, we will keep the exchange per market to query later on demand /// public override void OnSecuritiesChanged(SecurityChanges changes) { if (_sampleChartAlways) { return; } foreach (var securityChange in changes.AddedSecurities) { var symbol = securityChange.Symbol; if (symbol.SecurityType == QuantConnect.SecurityType.Base) { // ignore custom data continue; } // if the user added Crypto, Forex, Daily or an extended market hours subscription just sample always, one way trip. _sampleChartAlways = symbol.SecurityType == QuantConnect.SecurityType.Crypto || symbol.SecurityType == QuantConnect.SecurityType.Forex || Algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol) .Any(config => config.ExtendedMarketHours || config.Resolution == Resolution.Daily); if (_sampleChartAlways) { // we set it once to true return; } if (!_exchangeHours.ContainsKey(securityChange.Symbol.ID.Market)) { // per market we keep track of the exchange hours _exchangeHours[securityChange.Symbol.ID.Market] = securityChange.Exchange.Hours; } } } /// /// Samples portfolio equity, benchmark, and daily performance /// /// Current UTC time in the AlgorithmManager loop public void Sample(DateTime time) { // Force an update for our values before doing our daily sample UpdatePortfolioValue(time); UpdateBenchmarkValue(time); base.Sample(time); } /// /// Gets the current portfolio value /// /// Useful so that live trading implementation can freeze the returned value if there is no user exchange open /// so we ignore extended market hours updates protected override decimal GetPortfolioValue() { return _portfolioValue.Value; } /// /// Gets the current benchmark value /// /// Useful so that live trading implementation can freeze the returned value if there is no user exchange open /// so we ignore extended market hours updates /// Time to resolve benchmark value at protected override decimal GetBenchmarkValue(DateTime time) { return _benchmarkValue.Value; } /// /// True if user exchange are open and we should update portfolio and benchmark value /// /// Useful so that live trading implementation can freeze the returned value if there is no user exchange open /// so we ignore extended market hours updates [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool UserExchangeIsOpen(DateTime utcDateTime) { if (_sampleChartAlways || _exchangeHours.Count == 0) { return true; } if (_lastChartSampleLogicCheck.Day == utcDateTime.Day && _lastChartSampleLogicCheck.Hour == utcDateTime.Hour && _lastChartSampleLogicCheck.Minute == utcDateTime.Minute) { // we cache the value for a minute return _userExchangeIsOpen; } _lastChartSampleLogicCheck = utcDateTime; foreach (var exchangeHour in _exchangeHours.Values) { if (exchangeHour.IsOpen(utcDateTime.ConvertFromUtc(exchangeHour.TimeZone), false)) { // one of the users exchanges is open _userExchangeIsOpen = true; return true; } } // no user exchange is open _userExchangeIsOpen = false; return false; } private static void DictionarySafeAdd(Dictionary dictionary, string key, T value, string dictionaryName) { if (!dictionary.TryAdd(key, value)) { Log.Error($"LiveTradingResultHandler.DictionarySafeAdd(): dictionary {dictionaryName} already contains key {key}"); } } /// /// Will launch a task which will call the API and update the algorithm status every minute /// private void UpdateAlgorithmStatus() { if (!ExitTriggered && !_cancellationTokenSource.IsCancellationRequested) // just in case { // wait until after we're warmed up to start sending running status each minute if (!Algorithm.IsWarmingUp) { _api.SetAlgorithmStatus(_job.AlgorithmId, AlgorithmStatus.Running); } Task.Delay(TimeSpan.FromMinutes(1), _cancellationTokenSource.Token).ContinueWith(_ => UpdateAlgorithmStatus()); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private void UpdateBenchmarkValue(DateTime time, bool force = false) { if (force || UserExchangeIsOpen(time)) { _benchmarkValue = new ReferenceWrapper(base.GetBenchmarkValue(time)); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private void UpdatePortfolioValue(DateTime time, bool force = false) { if (force || UserExchangeIsOpen(time)) { _portfolioValue = new ReferenceWrapper(base.GetPortfolioValue()); } } /// /// Helper method to fetch the algorithm holdings /// public static Dictionary GetHoldings(IEnumerable securities, ISubscriptionDataConfigService subscriptionDataConfigService, bool onlyInvested = false) { var holdings = new Dictionary(); foreach (var security in securities // If we are invested we send it always, if not, we send non internal, non canonical and tradable securities. When securities are removed they are marked as non tradable. .Where(s => s.Invested || !onlyInvested && (!s.IsInternalFeed() && s.IsTradable && !s.Symbol.IsCanonical() // Continuous futures are different because it's mapped securities are internal and the continuous contract is canonical and non tradable but we want to send them anyways // but we don't want to sent non canonical, non tradable futures, these would be the future chain assets, or continuous mapped contracts that have been removed || s.Symbol.SecurityType == QuantConnect.SecurityType.Future && (s.IsTradable || s.Symbol.IsCanonical() && subscriptionDataConfigService.GetSubscriptionDataConfigs(s.Symbol).Any()))) .OrderBy(x => x.Symbol.Value)) { DictionarySafeAdd(holdings, security.Symbol.ID.ToString(), new Holding(security), "holdings"); } return holdings; } /// /// Calculates and gets the current statistics for the algorithm /// /// The current statistics public StatisticsResults StatisticsResults() { return GenerateStatisticsResults(); } /// /// Sets or updates a custom summary statistic /// /// The statistic name /// The statistic value public void SetSummaryStatistic(string name, string value) { SummaryStatistic(name, value); } /// /// Handles updates to the algorithm's name /// /// The new name public virtual void AlgorithmNameUpdated(string name) { Messages.Enqueue(new AlgorithmNameUpdatePacket(AlgorithmId, name)); } /// /// Handles updates to the algorithm's tags /// /// The new tags public virtual void AlgorithmTagsUpdated(HashSet tags) { Messages.Enqueue(new AlgorithmTagsUpdatePacket(AlgorithmId, tags)); } } }