/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Collections.Generic; using System.Linq; using System.Threading; using Fasterflect; using QuantConnect.Algorithm; using QuantConnect.Configuration; using QuantConnect.Data; using QuantConnect.Data.Market; using QuantConnect.Data.UniverseSelection; using QuantConnect.Interfaces; using QuantConnect.Lean.Engine.DataFeeds; using QuantConnect.Lean.Engine.RealTime; using QuantConnect.Lean.Engine.Results; using QuantConnect.Lean.Engine.Server; using QuantConnect.Lean.Engine.TransactionHandlers; using QuantConnect.Logging; using QuantConnect.Orders; using QuantConnect.Packets; using QuantConnect.Securities; using QuantConnect.Securities.Option; using QuantConnect.Securities.Volatility; using QuantConnect.Util.RateLimit; namespace QuantConnect.Lean.Engine { /// /// Algorithm manager class executes the algorithm and generates and passes through the algorithm events. /// public class AlgorithmManager { private IAlgorithm _algorithm; private readonly object _lock; private readonly bool _liveMode; private bool _cancelRequested; private CancellationTokenSource _cancellationTokenSource; /// /// Publicly accessible algorithm status /// public AlgorithmStatus State => _algorithm?.Status ?? AlgorithmStatus.Running; /// /// Public access to the currently running algorithm id. /// public string AlgorithmId { get; private set; } /// /// Provides the isolator with a function for verifying that we're not spending too much time in each /// algorithm manager time loop /// public AlgorithmTimeLimitManager TimeLimit { get; } /// /// Quit state flag for the running algorithm. When true the user has requested the backtest stops through a Quit() method. /// /// public bool QuitState => State == AlgorithmStatus.Deleted; /// /// Gets the number of data points processed per second /// public long DataPoints { get; private set; } /// /// Gets the number of data points of algorithm history provider /// public int AlgorithmHistoryDataPoints => _algorithm?.HistoryProvider?.DataPointCount ?? 0; /// /// Initializes a new instance of the class /// /// True if we're running in live mode, false for backtest mode /// Provided by LEAN when creating a new algo manager. This is the job /// that the algo manager is about to execute. Research and other consumers can provide the /// default value of null public AlgorithmManager(bool liveMode, AlgorithmNodePacket job = null) { AlgorithmId = ""; _liveMode = liveMode; _lock = new object(); // initialize the time limit manager TimeLimit = new AlgorithmTimeLimitManager( CreateTokenBucket(job?.Controls?.TrainingLimits), TimeSpan.FromMinutes(Config.GetDouble("algorithm-manager-time-loop-maximum", 20)) ); } /// /// Launch the algorithm manager to run this strategy /// /// Algorithm job /// Algorithm instance /// Instance which implements . Used to stream the data /// Transaction manager object /// Result handler object /// Realtime processing object /// ILeanManager implementation that is updated periodically with the IAlgorithm instance /// Cancellation token source to monitor /// Modify with caution public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationTokenSource cancellationTokenSource) { //Initialize: DataPoints = 0; _algorithm = algorithm; var token = cancellationTokenSource.Token; _cancellationTokenSource = cancellationTokenSource; var backtestMode = (job.Type == PacketType.BacktestNode); var methodInvokers = new Dictionary(); var marginCallFrequency = TimeSpan.FromMinutes(5); var nextMarginCallTime = DateTime.MinValue; var nextSecurityModelScan = algorithm.UtcTime.RoundDown(Time.OneHour) + Time.OneHour; var time = algorithm.StartDate.Date; var pendingDelistings = new List(); var splitWarnings = new List(); //Initialize Properties: AlgorithmId = job.AlgorithmId; //Go through the subscription types and create invokers to trigger the event handlers for each custom type: foreach (var config in algorithm.SubscriptionManager.Subscriptions) { //If type is a custom feed, check for a dedicated event handler if (config.IsCustomData) { //Get the matching method for this event handler - e.g. public void OnData(Quandl data) { .. } var genericMethod = (algorithm.GetType()).GetMethod("OnData", new[] { config.Type }); //If we already have this Type-handler then don't add it to invokers again. if (methodInvokers.ContainsKey(config.Type)) continue; if (genericMethod != null) { methodInvokers.Add(config.Type, genericMethod.DelegateForCallMethod()); } } } // Schedule a daily event for sampling at midnight every night algorithm.Schedule.On("Daily Sampling", algorithm.Schedule.DateRules.EveryDay(), algorithm.Schedule.TimeRules.Midnight, () => { results.Sample(algorithm.UtcTime); }); //Loop over the queues: get a data collection, then pass them all into relevent methods in the algorithm. Log.Trace($"AlgorithmManager.Run(): Begin DataStream - Start: {algorithm.StartDate} Stop: {algorithm.EndDate} Time: {algorithm.Time} Warmup: {algorithm.IsWarmingUp}"); foreach (var timeSlice in Stream(algorithm, synchronizer, results, token)) { // reset our timer on each loop TimeLimit.StartNewTimeStep(); //Check this backtest is still running: if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null) { Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}"); break; } //Execute with TimeLimit Monitor: if (token.IsCancellationRequested) { Log.Error($"AlgorithmManager.Run(): CancellationRequestion at {timeSlice.Time.ToStringInvariant()}"); return; } // Update the ILeanManager leanManager.Update(); time = timeSlice.Time; DataPoints += timeSlice.DataPointCount; if (backtestMode && algorithm.Portfolio.TotalPortfolioValue <= 0) { var logMessage = "AlgorithmManager.Run(): Portfolio value is less than or equal to zero, stopping algorithm."; Log.Error(logMessage); results.SystemDebugMessage(logMessage); break; } // If backtesting/warmup, we need to check if there are realtime events in the past // which didn't fire because at the scheduled times there was no data (i.e. markets closed) // and fire them with the correct date/time. realtime.ScanPastEvents(time); // will scan registered consolidators for which we've past the expected scan call. // In live mode we want to round down to the second, so we don't scan too far into the future: // The time slice might carry the data needed to complete a current consolidated bar but the // time slice time might be slightly ahead (a few milliseconds or even ticks) because in live we // use DateTime.UtcNow. So we don't want to scan past the data time so that the consolidators can // complete the current bar. var pastConsolidatorsScanTime = _liveMode ? time.RoundDown(Time.OneSecond) : time; algorithm.SubscriptionManager.ScanPastConsolidators(pastConsolidatorsScanTime, algorithm); //Set the algorithm and real time handler's time algorithm.SetDateTime(time); // the time pulse are just to advance algorithm time, lets shortcut the loop here if (timeSlice.IsTimePulse) { continue; } // Update the current slice before firing scheduled events or any other task algorithm.SetCurrentSlice(timeSlice.Slice); if (timeSlice.SecurityChanges != SecurityChanges.None) { algorithm.ProcessSecurityChanges(timeSlice.SecurityChanges); leanManager.OnSecuritiesChanged(timeSlice.SecurityChanges); realtime.OnSecuritiesChanged(timeSlice.SecurityChanges); results.OnSecuritiesChanged(timeSlice.SecurityChanges); } //Update the securities properties: first before calling user code to avoid issues with data foreach (var update in timeSlice.SecuritiesUpdateData) { var security = update.Target; security.Update(update.Data, update.DataType, update.ContainsFillForwardData); // Send market price updates to the TradeBuilder algorithm.TradeBuilder.SetMarketPrice(security.Symbol, security.Price); } // TODO: potentially push into a scheduled event if (time >= nextSecurityModelScan) { foreach (var security in algorithm.Securities.Values) { security.MarginInterestRateModel.ApplyMarginInterestRate(new MarginInterestRateParameters(security, time)); // perform check for settlement of unsettled funds security.SettlementModel.Scan(new ScanSettlementModelParameters(algorithm.Portfolio, security, time)); } nextSecurityModelScan = time.RoundDown(Time.OneHour) + Time.OneHour; } //Update the securities properties with any universe data if (timeSlice.UniverseData.Count > 0) { foreach (var dataCollection in timeSlice.UniverseData.Values) { if (!dataCollection.ShouldCacheToSecurity()) continue; foreach (var data in dataCollection.Data) { if (algorithm.Securities.TryGetValue(data.Symbol, out var security)) { security.Cache.StoreData(new[] { data }, data.GetType()); } } } } // poke each cash object to update from the recent security data foreach (var cash in algorithm.Portfolio.CashBook.Values.Where(x => x.CurrencyConversion != null)) { cash.Update(); } // security prices got updated algorithm.Portfolio.InvalidateTotalPortfolioValue(); if (timeSlice.Slice.SymbolChangedEvents.Count != 0) { try { algorithm.OnSymbolChangedEvents(timeSlice.Slice.SymbolChangedEvents); } catch (Exception err) { algorithm.SetRuntimeError(err, "OnSymbolChangedEvents"); return; } foreach (var symbol in timeSlice.Slice.SymbolChangedEvents.Keys) { // cancel all orders for the old symbol foreach (var ticket in transactions.GetOpenOrderTickets(x => x.Symbol == symbol)) { ticket.Cancel("Open order cancelled on symbol changed event"); } } } // process fill models on the updated data before entering algorithm, applies to all non-market orders transactions.ProcessSynchronousEvents(); // fire real time events after we've updated based on the new data realtime.SetTime(timeSlice.Time); // process split warnings for options ProcessSplitSymbols(algorithm, splitWarnings, pendingDelistings); //Check if the user's signalled Quit: loop over data until day changes. if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null) { Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}"); break; } if (algorithm.RunTimeError != null) { Log.Error($"AlgorithmManager.Run(): Stopping, encountered a runtime error at {algorithm.UtcTime} UTC."); return; } // perform margin calls, in live mode we can also use realtime to emit these if (time >= nextMarginCallTime || (_liveMode && nextMarginCallTime > DateTime.UtcNow)) { // determine if there are possible margin call orders to be executed bool issueMarginCallWarning; var marginCallOrders = algorithm.Portfolio.MarginCallModel.GetMarginCallOrders(out issueMarginCallWarning); var executedTicketsCount = 0; if (marginCallOrders.Count != 0) { var executingMarginCall = false; try { if (marginCallOrders.All(order => algorithm.Portfolio.Securities[order.Symbol].Exchange.ExchangeOpen)) { // tell the algorithm we're about to issue the margin call algorithm.OnMarginCall(marginCallOrders); // execute the margin call orders var executedTickets = algorithm.Portfolio.MarginCallModel.ExecuteMarginCall(marginCallOrders); executedTicketsCount = executedTickets.Count; foreach (var ticket in executedTickets) { algorithm.Error($"{algorithm.Time.ToStringInvariant()} - Executed MarginCallOrder: {ticket.Symbol} - " + $"Quantity: {ticket.Quantity.ToStringInvariant()} @ {ticket.AverageFillPrice.ToStringInvariant()}" ); } } } catch (Exception err) { algorithm.SetRuntimeError(err, executingMarginCall ? "Portfolio.MarginCallModel.ExecuteMarginCall" : "OnMarginCall"); return; } } // we didn't perform a margin call, but got the warning flag back, so issue the warning to the algorithm if (executedTicketsCount == 0 && issueMarginCallWarning) { try { algorithm.OnMarginCallWarning(); } catch (Exception err) { algorithm.SetRuntimeError(err, "OnMarginCallWarning"); return; } } nextMarginCallTime = time + marginCallFrequency; } // before we call any events, let the algorithm know about universe changes if (timeSlice.SecurityChanges != SecurityChanges.None) { try { var algorithmSecurityChanges = new SecurityChanges(timeSlice.SecurityChanges) { // by default for user code we want to filter out custom securities FilterCustomSecurities = true, // by default for user code we want to filter out internal securities FilterInternalSecurities = true }; algorithm.OnSecuritiesChanged(algorithmSecurityChanges); algorithm.OnFrameworkSecuritiesChanged(algorithmSecurityChanges); } catch (Exception err) { algorithm.SetRuntimeError(err, "OnSecuritiesChanged"); return; } } // apply dividends HandleDividends(timeSlice, algorithm, _liveMode); // apply splits HandleSplits(timeSlice, algorithm, _liveMode); //Update registered consolidators for this symbol index try { if (timeSlice.ConsolidatorUpdateData.Count > 0) { var timeKeeper = algorithm.TimeKeeper; foreach (var update in timeSlice.ConsolidatorUpdateData) { var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime; var consolidators = update.Target.Consolidators; foreach (var consolidator in consolidators) { foreach (var dataPoint in update.Data) { consolidator.Update(dataPoint); } // scan for time after we've pumped all the data through for this consolidator consolidator.Scan(localTime); } } } } catch (Exception err) { algorithm.SetRuntimeError(err, "Consolidators update"); return; } // fire custom event handlers foreach (var update in timeSlice.CustomData) { MethodInvoker methodInvoker; if (!methodInvokers.TryGetValue(update.DataType, out methodInvoker)) { continue; } try { foreach (var dataPoint in update.Data) { if (update.DataType.IsInstanceOfType(dataPoint)) { methodInvoker(algorithm, dataPoint); } } } catch (Exception err) { algorithm.SetRuntimeError(err, "Custom Data"); return; } } try { if (timeSlice.Slice.Splits.Count != 0) { algorithm.OnSplits(timeSlice.Slice.Splits); } } catch (Exception err) { algorithm.SetRuntimeError(err, "OnSplits"); return; } try { if (timeSlice.Slice.Dividends.Count != 0) { algorithm.OnDividends(timeSlice.Slice.Dividends); } } catch (Exception err) { algorithm.SetRuntimeError(err, "OnDividends"); return; } try { if (timeSlice.Slice.Delistings.Count != 0) { algorithm.OnDelistings(timeSlice.Slice.Delistings); } } catch (Exception err) { algorithm.SetRuntimeError(err, "OnDelistings"); return; } // Only track pending delistings in non-live mode. if (!algorithm.LiveMode) { // Keep this up to date even though we don't process delistings here anymore foreach (var delisting in timeSlice.Slice.Delistings.Values) { if (delisting.Type == DelistingType.Warning) { // Store our delistings warnings because they are still used by ProcessSplitSymbols above pendingDelistings.Add(delisting); } else { // If we have an actual delisting event, remove it from pending delistings var index = pendingDelistings.FindIndex(x => x.Symbol == delisting.Symbol); if (index != -1) { pendingDelistings.RemoveAt(index); } } } } // run split logic after firing split events HandleSplitSymbols(timeSlice.Slice.Splits, splitWarnings); try { if (timeSlice.Slice.HasData) { // EVENT HANDLER v3.0 -- all data in a single event algorithm.OnData(algorithm.CurrentSlice); } // always turn the crank on this method to ensure universe selection models function properly on day changes w/out data algorithm.OnFrameworkData(timeSlice.Slice); } catch (Exception err) { algorithm.SetRuntimeError(err, "OnData"); return; } //If its the historical/paper trading models, wait until market orders have been "filled" // Manually trigger the event handler to prevent thread switch. transactions.ProcessSynchronousEvents(); // Process any required events of the results handler such as sampling assets, equity, or stock prices. results.ProcessSynchronousEvents(); // poke the algorithm at the end of each time step algorithm.OnEndOfTimeStep(); } // End of ForEach feed.Bridge.GetConsumingEnumerable // stop timing the loops TimeLimit.StopEnforcingTimeLimit(); //Stream over:: Send the final packet and fire final events: Log.Trace("AlgorithmManager.Run(): Firing On End Of Algorithm..."); try { algorithm.OnEndOfAlgorithm(); } catch (Exception err) { algorithm.SetRuntimeError(err, "OnEndOfAlgorithm"); return; } // Process any required events of the results handler such as sampling assets, equity, or stock prices. results.ProcessSynchronousEvents(forceProcess: true); //Liquidate Holdings for Calculations: if (_algorithm.Status == AlgorithmStatus.Liquidated && _liveMode) { Log.Trace("AlgorithmManager.Run(): Liquidating algorithm holdings..."); algorithm.Liquidate(); results.LogMessage("Algorithm Liquidated"); results.SendStatusUpdate(AlgorithmStatus.Liquidated); } //Manually stopped the algorithm if (_algorithm.Status == AlgorithmStatus.Stopped) { Log.Trace("AlgorithmManager.Run(): Stopping algorithm..."); results.LogMessage("Algorithm Stopped"); results.SendStatusUpdate(AlgorithmStatus.Stopped); } //Backtest deleted. if (_algorithm.Status == AlgorithmStatus.Deleted) { Log.Trace("AlgorithmManager.Run(): Deleting algorithm..."); results.DebugMessage("Algorithm Id:(" + job.AlgorithmId + ") Deleted by request."); results.SendStatusUpdate(AlgorithmStatus.Deleted); } //Algorithm finished, send regardless of commands: results.SendStatusUpdate(AlgorithmStatus.Completed); SetStatus(AlgorithmStatus.Completed); //Take final samples: results.Sample(time); } // End of Run(); /// /// Set the quit state. /// public void SetStatus(AlgorithmStatus state) { lock (_lock) { //We don't want anyone else to set our internal state to "Running". //This is controlled by the algorithm private variable only. //Algorithm could be null after it's initialized and they call Run on us if (state != AlgorithmStatus.Running && _algorithm != null) { _algorithm.SetStatus(state); } if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested && !_cancelRequested) { if (state == AlgorithmStatus.Deleted) { _cancelRequested = true; // if the algorithm was deleted, let's give the algorithm a few seconds to shutdown and cancel it out _cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5)); } else if (state == AlgorithmStatus.Stopped) { _cancelRequested = true; // if the algorithm was stopped, let's give the algorithm a few seconds to shutdown and cancel it out _cancellationTokenSource.CancelAfter(TimeSpan.FromMinutes(1)); } } } } private IEnumerable Stream(IAlgorithm algorithm, ISynchronizer synchronizer, IResultHandler results, CancellationToken cancellationToken) { var nextWarmupStatusTime = DateTime.MinValue; var warmingUp = algorithm.IsWarmingUp; var warmingUpPercent = 0; var logSubscriptionCountFlag = false; if (warmingUp) { nextWarmupStatusTime = DateTime.UtcNow.AddSeconds(1); algorithm.Debug("Algorithm starting warm up..."); results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}"); } else { results.SendStatusUpdate(AlgorithmStatus.Running); // let's be polite, and call warmup finished even though there was no warmup period and avoid algorithms having to handle it instead. // we trigger this callback here and not internally in the algorithm so that we can go through python if required algorithm.OnWarmupFinished(); } // bellow we compare with slice.Time which is in UTC var startTimeTicks = algorithm.UtcTime.Ticks; var warmupEndTicks = algorithm.StartDate.ConvertToUtc(algorithm.TimeZone).Ticks; // fulfilling history requirements of volatility models in live mode if (algorithm.LiveMode) { warmupEndTicks = DateTime.UtcNow.Ticks; ProcessVolatilityHistoryRequirements(algorithm, _liveMode); } foreach (var timeSlice in synchronizer.StreamData(cancellationToken)) { if (algorithm.IsWarmingUp) { var now = DateTime.UtcNow; if (now > nextWarmupStatusTime) { // send some status to the user letting them know we're done history, but still warming up, // catching up to real time data nextWarmupStatusTime = now.AddSeconds(2); var newPercent = (int)(100 * (timeSlice.Time.Ticks - startTimeTicks) / (double)(warmupEndTicks - startTimeTicks)); // if there isn't any progress don't send the same update many times if (newPercent != warmingUpPercent) { warmingUpPercent = newPercent; algorithm.Debug($"Processing algorithm warm-up request {warmingUpPercent}%..."); results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}"); } } if (!logSubscriptionCountFlag) { Log.Trace($"AlgorithmManager.Stream(): Subscriptions count before warm up: {algorithm.SubscriptionManager.Count}"); logSubscriptionCountFlag = true; } } else if (warmingUp) { // warmup finished, send an update warmingUp = false; // we trigger this callback here and not internally in the algorithm so that we can go through python if required algorithm.OnWarmupFinished(); algorithm.Debug("Algorithm finished warming up."); Log.Trace($"AlgorithmManager.Stream(): Subscriptions count after warm up: {algorithm.SubscriptionManager.Count}"); results.SendStatusUpdate(AlgorithmStatus.Running, "100"); } yield return timeSlice; } } /// /// Helper method used to process securities volatility history requirements /// /// Implemented as static to facilitate testing /// The algorithm instance /// Whether the algorithm is in live mode public static void ProcessVolatilityHistoryRequirements(IAlgorithm algorithm, bool liveMode) { Log.Trace("ProcessVolatilityHistoryRequirements(): Updating volatility models with historical data..."); foreach (var security in algorithm.Securities.Values) { security.VolatilityModel.WarmUp(algorithm.HistoryProvider, algorithm.SubscriptionManager, security, algorithm.UtcTime, algorithm.TimeZone, liveMode); } Log.Trace("ProcessVolatilityHistoryRequirements(): finished."); } /// /// Helper method to apply a split to an algorithm instance /// public static void HandleSplits(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode) { foreach (var split in timeSlice.Slice.Splits.Values) { try { // only process split occurred events (ignore warnings) if (split.Type != SplitType.SplitOccurred) { continue; } if (liveMode && algorithm.IsWarmingUp) { // skip past split during live warmup, the algorithms position already reflects them Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Split during live warmup: {split}"); continue; } if (Log.DebuggingEnabled) { Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Split for {split.Symbol}"); } Security security = null; if (algorithm.Securities.TryGetValue(split.Symbol, out security) && liveMode) { Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}"); } var mode = algorithm.SubscriptionManager.SubscriptionDataConfigService .GetSubscriptionDataConfigs(split.Symbol) .DataNormalizationMode(); // apply the split event to the portfolio algorithm.Portfolio.ApplySplit(split, security, liveMode, mode); // apply the split event to the trade builder algorithm.TradeBuilder.ApplySplit(split, liveMode, mode); // apply the split event to the security volatility model ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode); if (liveMode && security != null) { Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}"); } // apply the split to open orders as well in raw mode, all other modes are split adjusted if (liveMode || mode == DataNormalizationMode.Raw) { // in live mode we always want to have our order match the order at the brokerage, so apply the split to the orders var openOrders = algorithm.Transactions.GetOpenOrderTickets(ticket => ticket.Symbol == split.Symbol); algorithm.BrokerageModel.ApplySplit(openOrders.ToList(), split); } } catch (Exception err) { algorithm.SetRuntimeError(err, "Split event"); return; } } } /// /// Helper method to apply a dividend to an algorithm instance /// public static void HandleDividends(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode) { foreach (var dividend in timeSlice.Slice.Dividends.Values) { if (liveMode && algorithm.IsWarmingUp) { // skip past dividends during live warmup, the algorithms position already reflects them Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Dividend during live warmup: {dividend}"); continue; } if (Log.DebuggingEnabled) { Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Dividend: {dividend}"); } Security security = null; if (algorithm.Securities.TryGetValue(dividend.Symbol, out security) && liveMode) { Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Dividend: {dividend}. " + $"Security Holdings: {security.Holdings.Quantity} Account Currency Holdings: " + $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}"); } var mode = algorithm.SubscriptionManager.SubscriptionDataConfigService .GetSubscriptionDataConfigs(dividend.Symbol) .DataNormalizationMode(); // apply the dividend event to the portfolio algorithm.Portfolio.ApplyDividend(dividend, liveMode, mode); // apply the dividend event to the security volatility model ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode); if (liveMode && security != null) { Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Dividend: {dividend}. Security " + $"Holdings: {security.Holdings.Quantity} Account Currency Holdings: " + $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}"); } } } /// /// Keeps track of split warnings so we can later liquidate option contracts /// private void HandleSplitSymbols(Splits newSplits, List splitWarnings) { foreach (var split in newSplits.Values) { if (split.Type != SplitType.Warning) { if (Log.DebuggingEnabled) { Log.Debug($"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split occurred: Split Factor: {split} Reference Price: {split.ReferencePrice}"); } continue; } if (Log.DebuggingEnabled) { Log.Debug($"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split warning: {split}"); } if (!splitWarnings.Any(x => x.Symbol == split.Symbol && x.Type == SplitType.Warning)) { splitWarnings.Add(split); } } } /// /// Liquidate option contact holdings who's underlying security has split /// private void ProcessSplitSymbols(IAlgorithm algorithm, List splitWarnings, List pendingDelistings) { // NOTE: This method assumes option contracts have the same core trading hours as their underlying contract // This is a small performance optimization to prevent scanning every contract on every time step, // instead we scan just the underlyings, thereby reducing the time footprint of this methods by a factor // of N, the number of derivative subscriptions for (int i = splitWarnings.Count - 1; i >= 0; i--) { var split = splitWarnings[i]; var security = algorithm.Securities[split.Symbol]; if (!security.IsTradable && !algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)) { Log.Debug($"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - Removing split warning for {security.Symbol}"); // remove the warning from out list splitWarnings.RemoveAt(i); // Since we are storing the split warnings for a loop // we need to check if the security was removed. // When removed, it will be marked as non tradable but just in case // we expect it not to be an active security either continue; } var nextMarketClose = security.Exchange.Hours.GetNextMarketClose(security.LocalTime, false); // determine the latest possible time we can submit a MOC order var configs = algorithm.SubscriptionManager.SubscriptionDataConfigService .GetSubscriptionDataConfigs(security.Symbol); if (configs.Count == 0) { // should never happen at this point, if it does let's give some extra info throw new Exception( $"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - No subscriptions found for {security.Symbol}" + $", IsTradable: {security.IsTradable}" + $", Active: {algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)}"); } var latestMarketOnCloseTimeRoundedDownByResolution = nextMarketClose.Subtract(MarketOnCloseOrder.SubmissionTimeBuffer) .RoundDownInTimeZone(configs.GetHighestResolution().ToTimeSpan(), security.Exchange.TimeZone, configs.First().DataTimeZone); // we don't need to do anyhing until the market closes if (security.LocalTime < latestMarketOnCloseTimeRoundedDownByResolution) continue; // fetch all option derivatives of the underlying with holdings (excluding the canonical security) var derivatives = algorithm.Securities.Values.Where(potentialDerivate => potentialDerivate.Symbol.SecurityType.IsOption() && potentialDerivate.Symbol.Underlying == security.Symbol && !potentialDerivate.Symbol.Underlying.IsCanonical() && potentialDerivate.HoldStock ); foreach (var derivative in derivatives) { var optionContractSymbol = derivative.Symbol; var optionContractSecurity = (Option)derivative; if (pendingDelistings.Any(x => x.Symbol == optionContractSymbol && x.Time.Date == optionContractSecurity.LocalTime.Date)) { // if the option is going to be delisted today we skip sending the market on close order continue; } // close any open orders algorithm.Transactions.CancelOpenOrders(optionContractSymbol, "Canceled due to impending split. Separate MarketOnClose order submitted to liquidate position."); var request = new SubmitOrderRequest(OrderType.MarketOnClose, optionContractSecurity.Type, optionContractSymbol, -optionContractSecurity.Holdings.Quantity, 0, 0, algorithm.UtcTime, "Liquidated due to impending split. Option splits are not currently supported." ); // send MOC order to liquidate option contract holdings algorithm.Transactions.AddOrder(request); // mark option contract as not tradable optionContractSecurity.IsTradable = false; algorithm.Debug($"MarketOnClose order submitted for option contract '{optionContractSymbol}' due to impending {split.Symbol.Value} split event. " + "Option splits are not currently supported."); } // remove the warning from out list splitWarnings.RemoveAt(i); } } /// /// Warms up the security's volatility model in the case of a split or dividend to avoid discontinuities when data is raw or in live mode /// private static void ApplySplitOrDividendToVolatilityModel(IAlgorithm algorithm, Security security, bool liveMode, DataNormalizationMode dataNormalizationMode) { if (security.Type == SecurityType.Equity && (liveMode || dataNormalizationMode == DataNormalizationMode.Raw)) { security?.VolatilityModel.WarmUp(algorithm.HistoryProvider, algorithm.SubscriptionManager, security, algorithm.UtcTime, algorithm.TimeZone, liveMode, dataNormalizationMode); } } /// /// Constructs the correct instance per the provided controls. /// The provided controls will be null when /// private static ITokenBucket CreateTokenBucket(LeakyBucketControlParameters controls) { if (controls == null) { // this will only be null when the AlgorithmManager is being initialized outside of LEAN // for example, in unit tests that don't provide a job package as well as from Research // in each of the above cases, it seems best to not enforce the leaky bucket restrictions return TokenBucket.Null; } Log.Trace("AlgorithmManager.CreateTokenBucket(): Initializing LeakyBucket: " + $"Capacity: {controls.Capacity} " + $"RefillAmount: {controls.RefillAmount} " + $"TimeInterval: {controls.TimeIntervalMinutes}" ); // these parameters view 'minutes' as the resource being rate limited. the capacity is the total // number of minutes available for burst operations and after controls.TimeIntervalMinutes time // has passed, we'll add controls.RefillAmount to the 'minutes' available, maxing at controls.Capacity return new LeakyBucket( controls.Capacity, controls.RefillAmount, TimeSpan.FromMinutes(controls.TimeIntervalMinutes) ); } } }