/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Threading.Tasks; using QuantConnect.Logging; using QuantConnect.Util; namespace QuantConnect { /// /// Isolator class - create a new instance of the algorithm and ensure it doesn't /// exceed memory or time execution limits. /// public class Isolator { /// /// Algo cancellation controls - cancel source. /// public CancellationTokenSource CancellationTokenSource { get; private set; } /// /// Initializes a new instance of the class /// public Isolator() { CancellationTokenSource = new CancellationTokenSource(); } /// /// Execute a code block with a maximum limit on time and memory. /// /// Timeout in timespan /// Function used to determine if the codeBlock is within custom limits, such as with algorithm manager /// timing individual time loops, return a non-null and non-empty string with a message indicating the error/reason for stoppage /// Action codeblock to execute /// Maximum memory allocation, default 1024Mb /// Sleep interval between each check in ms /// The worker thread instance that will execute the provided action, if null /// will use a /// True if algorithm exited successfully, false if cancelled because it exceeded limits. public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func withinCustomLimits, Action codeBlock, long memoryCap = 1024, int sleepIntervalMillis = 1000, WorkerThread workerThread = null) { workerThread?.Add(codeBlock); var task = workerThread == null //Launch task ? Task.Factory.StartNew(codeBlock, CancellationTokenSource.Token) // wrapper task so we can reuse MonitorTask : Task.Factory.StartNew(() => workerThread.FinishedWorkItem.WaitOne(), CancellationTokenSource.Token); try { return MonitorTask(task, timeSpan, withinCustomLimits, memoryCap, sleepIntervalMillis); } catch (Exception) { if (!task.IsCompleted) { // lets free the wrapper task even if the worker thread didn't finish workerThread?.FinishedWorkItem.Set(); } throw; } } private bool MonitorTask(Task task, TimeSpan timeSpan, Func withinCustomLimits, long memoryCap = 1024, int sleepIntervalMillis = 1000) { // default to always within custom limits withinCustomLimits = withinCustomLimits ?? (() => new IsolatorLimitResult(TimeSpan.Zero, string.Empty)); var message = string.Empty; var emaPeriod = 60d; var memoryUsed = 0L; var utcNow = DateTime.UtcNow; var end = utcNow + timeSpan; var memoryLogger = utcNow + Time.OneMinute; var isolatorLimitResult = new IsolatorLimitResult(TimeSpan.Zero, string.Empty); //Convert to bytes memoryCap *= 1024 * 1024; var spikeLimit = memoryCap*2; if (memoryCap <= 0) { memoryCap = long.MaxValue; spikeLimit = long.MaxValue; } while (!task.IsCompleted && !CancellationTokenSource.IsCancellationRequested && utcNow < end) { // if over 80% allocation force GC then sample var sample = Convert.ToDouble(GC.GetTotalMemory(memoryUsed > memoryCap * 0.8)); // find the EMA of the memory used to prevent spikes killing stategy memoryUsed = Convert.ToInt64((emaPeriod-1)/emaPeriod * memoryUsed + (1/emaPeriod)*sample); // if the rolling EMA > cap; or the spike is more than 2x the allocation. if (memoryUsed > memoryCap || sample > spikeLimit) { message = Messages.Isolator.MemoryUsageMaxedOut(PrettyFormatRam(memoryCap), PrettyFormatRam((long)sample)); break; } if (utcNow > memoryLogger) { if (memoryUsed > memoryCap * 0.8) { Log.Error(Messages.Isolator.MemoryUsageOver80Percent(sample)); } Log.Trace("Isolator.ExecuteWithTimeLimit(): " + Messages.Isolator.MemoryUsageInfo( PrettyFormatRam(memoryUsed), PrettyFormatRam((long)sample), PrettyFormatRam(OS.ApplicationMemoryUsed * 1024 * 1024), isolatorLimitResult.CurrentTimeStepElapsed, (int)Math.Ceiling(OS.CpuUsage))); memoryLogger = utcNow.AddMinutes(1); } // check to see if we're within other custom limits defined by the caller isolatorLimitResult = withinCustomLimits(); if (!isolatorLimitResult.IsWithinCustomLimits) { message = isolatorLimitResult.ErrorMessage; break; } if (task.Wait(utcNow.GetSecondUnevenWait(sleepIntervalMillis))) { break; } utcNow = DateTime.UtcNow; } if (task.IsCompleted == false) { if (CancellationTokenSource.IsCancellationRequested) { Log.Trace($"Isolator.ExecuteWithTimeLimit(): Operation was canceled"); throw new OperationCanceledException("Operation was canceled"); } else if (string.IsNullOrEmpty(message)) { message = Messages.Isolator.MemoryUsageMonitorTaskTimedOut(timeSpan); Log.Trace($"Isolator.ExecuteWithTimeLimit(): {message}"); } } if (!string.IsNullOrEmpty(message)) { if (!CancellationTokenSource.IsCancellationRequested) { CancellationTokenSource.Cancel(); } Log.Error($"Security.ExecuteWithTimeLimit(): {message}"); throw new TimeoutException(message); } return task.IsCompleted; } /// /// Execute a code block with a maximum limit on time and memory. /// /// Timeout in timespan /// Action codeblock to execute /// Maximum memory allocation, default 1024Mb /// Sleep interval between each check in ms /// The worker thread instance that will execute the provided action, if null /// will use a /// True if algorithm exited successfully, false if cancelled because it exceeded limits. public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Action codeBlock, long memoryCap, int sleepIntervalMillis = 1000, WorkerThread workerThread = null) { return ExecuteWithTimeLimit(timeSpan, null, codeBlock, memoryCap, sleepIntervalMillis, workerThread); } /// /// Convert the bytes to a MB in double format for string display /// /// /// private static string PrettyFormatRam(long ramInBytes) { return Math.Round(Convert.ToDouble(ramInBytes/(1024*1024))).ToStringInvariant(); } } }