/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using QuantConnect.AlgorithmFactory.Python.Wrappers;
using QuantConnect.Brokerages;
using QuantConnect.Configuration;
using QuantConnect.Data;
using QuantConnect.Data.Auxiliary;
using QuantConnect.Exceptions;
using QuantConnect.Interfaces;
using QuantConnect.Lean.Engine.DataFeeds;
using QuantConnect.Lean.Engine.HistoricalData;
using QuantConnect.Lean.Engine.Setup;
using QuantConnect.Logging;
using QuantConnect.Orders;
using QuantConnect.Packets;
using QuantConnect.Securities;
using QuantConnect.Util;
using static QuantConnect.StringExtensions;
namespace QuantConnect.Lean.Engine
{
///
/// LEAN ALGORITHMIC TRADING ENGINE: ENTRY POINT.
///
/// The engine loads new tasks, create the algorithms and threads, and sends them
/// to Algorithm Manager to be executed. It is the primary operating loop.
///
public class Engine
{
private bool _historyStartDateLimitedWarningEmitted;
private bool _historyNumericalPrecisionLimitedWarningEmitted;
private readonly bool _liveMode;
private readonly Task _marketHoursDatabaseTask;
///
/// Gets the configured system handlers for this engine instance
///
public LeanEngineSystemHandlers SystemHandlers { get; }
///
/// Gets the configured algorithm handlers for this engine instance
///
public LeanEngineAlgorithmHandlers AlgorithmHandlers { get; }
///
/// Initializes a new instance of the class using the specified handlers
///
/// The system handlers for controlling acquisition of jobs, messaging, and api calls
/// The algorithm handlers for managing algorithm initialization, data, results, transaction, and real time events
/// True when running in live mode, false otherwise
public Engine(LeanEngineSystemHandlers systemHandlers, LeanEngineAlgorithmHandlers algorithmHandlers, bool liveMode)
{
_liveMode = liveMode;
SystemHandlers = systemHandlers;
AlgorithmHandlers = algorithmHandlers;
_marketHoursDatabaseTask = Task.Run(StaticInitializations);
}
///
/// Runs a single backtest/live job from the job queue
///
/// The algorithm job to be processed
/// The algorithm manager instance
/// The path to the algorithm's assembly
/// The worker thread instance
public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemblyPath, WorkerThread workerThread)
{
var algorithm = default(IAlgorithm);
var algorithmManager = manager;
try
{
Log.Trace($"Engine.Run(): Resource limits '{job.Controls.CpuAllocation}' CPUs. {job.Controls.RamAllocation} MB RAM.");
TextSubscriptionDataSourceReader.SetCacheSize((int) (job.RamAllocation * 0.4));
//Reset thread holders.
var initializeComplete = false;
//-> Initialize messaging system
SystemHandlers.Notify.SetAuthentication(job);
//-> Set the result handler type for this algorithm job, and launch the associated result thread.
AlgorithmHandlers.Results.Initialize(new (job, SystemHandlers.Notify, SystemHandlers.Api, AlgorithmHandlers.Transactions, AlgorithmHandlers.MapFileProvider));
IBrokerage brokerage = null;
DataManager dataManager = null;
var synchronizer = _liveMode ? new LiveSynchronizer() : new Synchronizer();
try
{
// we get the mhdb before creating the algorithm instance,
// since the algorithm constructor will use it
var marketHoursDatabase = _marketHoursDatabaseTask.Result;
AlgorithmHandlers.Setup.WorkerThread = workerThread;
// Save algorithm to cache, load algorithm instance:
algorithm = AlgorithmHandlers.Setup.CreateAlgorithmInstance(job, assemblyPath);
algorithm.ProjectId = job.ProjectId;
// Set algorithm in ILeanManager
SystemHandlers.LeanManager.SetAlgorithm(algorithm);
// initialize the object store
AlgorithmHandlers.ObjectStore.Initialize(job.UserId, job.ProjectId, job.UserToken, job.Controls);
// initialize the data permission manager
AlgorithmHandlers.DataPermissionsManager.Initialize(job);
// notify the user of any errors w/ object store persistence
AlgorithmHandlers.ObjectStore.ErrorRaised += (sender, args) => algorithm.Debug($"ObjectStore Persistence Error: {args.Error.Message}");
// set the order processor on the transaction manager,needs to be done before initializing the brokerage which might start using it
algorithm.Transactions.SetOrderProcessor(AlgorithmHandlers.Transactions);
// Initialize the brokerage
IBrokerageFactory factory;
brokerage = AlgorithmHandlers.Setup.CreateBrokerage(job, algorithm, out factory);
// forward brokerage message events to the result handler
brokerage.Message += (_, e) => AlgorithmHandlers.Results.BrokerageMessage(e);
var symbolPropertiesDatabase = SymbolPropertiesDatabase.FromDataFolder();
var mapFilePrimaryExchangeProvider = new MapFilePrimaryExchangeProvider(AlgorithmHandlers.MapFileProvider);
var registeredTypesProvider = new RegisteredSecurityDataTypesProvider();
var securityService = new SecurityService(algorithm.Portfolio.CashBook,
marketHoursDatabase,
symbolPropertiesDatabase,
algorithm,
registeredTypesProvider,
new SecurityCacheProvider(algorithm.Portfolio),
mapFilePrimaryExchangeProvider,
algorithm);
algorithm.Securities.SetSecurityService(securityService);
dataManager = new DataManager(AlgorithmHandlers.DataFeed,
new UniverseSelection(
algorithm,
securityService,
AlgorithmHandlers.DataPermissionsManager,
AlgorithmHandlers.DataProvider),
algorithm,
algorithm.TimeKeeper,
marketHoursDatabase,
_liveMode,
registeredTypesProvider,
AlgorithmHandlers.DataPermissionsManager);
algorithm.SubscriptionManager.SetDataManager(dataManager);
synchronizer.Initialize(algorithm, dataManager);
// Set the algorithm's object store before initializing the data feed, which might use it
algorithm.SetObjectStore(AlgorithmHandlers.ObjectStore);
// Initialize the data feed before we initialize so he can intercept added securities/universes via events
AlgorithmHandlers.DataFeed.Initialize(
algorithm,
job,
AlgorithmHandlers.Results,
AlgorithmHandlers.MapFileProvider,
AlgorithmHandlers.FactorFileProvider,
AlgorithmHandlers.DataProvider,
dataManager,
(IDataFeedTimeProvider) synchronizer,
AlgorithmHandlers.DataPermissionsManager.DataChannelProvider);
// set the history provider before setting up the algorithm
var historyProvider = GetHistoryProvider();
historyProvider.SetBrokerage(brokerage);
historyProvider.Initialize(
new HistoryProviderInitializeParameters(
job,
SystemHandlers.Api,
AlgorithmHandlers.DataProvider,
AlgorithmHandlers.DataCacheProvider,
AlgorithmHandlers.MapFileProvider,
AlgorithmHandlers.FactorFileProvider,
progress =>
{
// send progress updates to the result handler only during initialization
if (!algorithm.GetLocked() || algorithm.IsWarmingUp)
{
AlgorithmHandlers.Results.SendStatusUpdate(AlgorithmStatus.History,
Invariant($"Processing history {progress}%..."));
}
},
// disable parallel history requests for live trading
parallelHistoryRequestsEnabled: !_liveMode,
dataPermissionManager: AlgorithmHandlers.DataPermissionsManager,
objectStore: algorithm.ObjectStore,
algorithmSettings: algorithm.Settings
)
);
historyProvider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
historyProvider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
historyProvider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
Composer.Instance.AddPart(historyProvider);
algorithm.HistoryProvider = historyProvider;
// initialize the default brokerage message handler
algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job, SystemHandlers.Api);
var brokerageDataQueueHandlers = Composer.Instance.GetParts().OfType()
// In backtesting, brokerages can be used as data downloaders (BrokerageDataDownloader)
// and are added to the composer as IBrokerage
.Concat(Composer.Instance.GetParts())
.Where(x => !ReferenceEquals(brokerage, x));
foreach (var x in new[] { brokerage }.Concat(brokerageDataQueueHandlers))
{
x.Message += (sender, message) =>
{
algorithm.BrokerageMessageHandler.HandleMessage(message);
if (algorithm.GetLocked())
{
// fire brokerage message events
algorithm.OnBrokerageMessage(message);
switch (message.Type)
{
case BrokerageMessageType.Disconnect:
algorithm.OnBrokerageDisconnect();
break;
case BrokerageMessageType.Reconnect:
algorithm.OnBrokerageReconnect();
break;
}
}
};
}
//Initialize the internal state of algorithm and job: executes the algorithm.Initialize() method.
initializeComplete = AlgorithmHandlers.Setup.Setup(new SetupHandlerParameters(dataManager.UniverseSelection, algorithm,
brokerage, job, AlgorithmHandlers.Results, AlgorithmHandlers.Transactions, AlgorithmHandlers.RealTime,
AlgorithmHandlers.DataCacheProvider, AlgorithmHandlers.MapFileProvider));
// set this again now that we've actually added securities
AlgorithmHandlers.Results.SetAlgorithm(algorithm, AlgorithmHandlers.Setup.StartingPortfolioValue);
//If there are any reasons it failed, pass these back to the IDE.
if (!initializeComplete || AlgorithmHandlers.Setup.Errors.Count > 0)
{
initializeComplete = false;
//Get all the error messages: internal in algorithm and external in setup handler.
var errorMessage = string.Join(",", algorithm.ErrorMessages);
string stackTrace = "";
errorMessage += string.Join(",", AlgorithmHandlers.Setup.Errors.Select(e =>
{
var message = e.Message;
if (e.InnerException != null)
{
var interpreter = StackExceptionInterpreter.Instance.Value;
var err = interpreter.Interpret(e.InnerException);
var stackMessage = interpreter.GetExceptionMessageHeader(err);
message += stackMessage;
stackTrace += stackMessage;
}
return message;
}));
Log.Error("Engine.Run(): " + errorMessage);
AlgorithmHandlers.Results.RuntimeError(errorMessage, stackTrace);
SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, errorMessage);
}
}
catch (Exception err)
{
Log.Error(err);
// for python we don't add the ugly pythonNet stack trace
var stackTrace = job.Language == Language.Python ? err.Message : err.ToString();
var runtimeMessage = "Algorithm.Initialize() Error: " + err.Message + " Stack Trace: " + stackTrace;
AlgorithmHandlers.Results.RuntimeError(runtimeMessage, stackTrace);
SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, runtimeMessage);
}
var historyProviderName = algorithm?.HistoryProvider != null ? algorithm.HistoryProvider.GetType().FullName : string.Empty;
// log the job endpoints
Log.Trace($"JOB HANDLERS:{Environment.NewLine}" +
$" DataFeed: {AlgorithmHandlers.DataFeed.GetType().FullName}{Environment.NewLine}" +
$" Setup: {AlgorithmHandlers.Setup.GetType().FullName}{Environment.NewLine}" +
$" RealTime: {AlgorithmHandlers.RealTime.GetType().FullName}{Environment.NewLine}" +
$" Results: {AlgorithmHandlers.Results.GetType().FullName}{Environment.NewLine}" +
$" Transactions: {AlgorithmHandlers.Transactions.GetType().FullName}{Environment.NewLine}" +
$" Object Store: {AlgorithmHandlers.ObjectStore.GetType().FullName}{Environment.NewLine}" +
$" History Provider: {historyProviderName}{Environment.NewLine}" +
$" Brokerage: {brokerage?.GetType().FullName}{Environment.NewLine}" +
$" Data Provider: {AlgorithmHandlers.DataProvider.GetType().FullName}{Environment.NewLine}");
//-> Using the job + initialization: load the designated handlers:
if (initializeComplete)
{
// notify the LEAN manager that the algorithm is initialized and starting
SystemHandlers.LeanManager.OnAlgorithmStart();
//-> Reset the backtest stopwatch; we're now running the algorithm.
var startTime = DateTime.UtcNow;
//Set algorithm as locked; set it to live mode if we're trading live, and set it to locked for no further updates.
algorithm.SetAlgorithmId(job.AlgorithmId);
algorithm.SetLocked();
//Load the associated handlers for transaction and realtime events:
AlgorithmHandlers.Transactions.Initialize(algorithm, brokerage, AlgorithmHandlers.Results);
try
{
AlgorithmHandlers.RealTime.Setup(algorithm, job, AlgorithmHandlers.Results, SystemHandlers.Api, algorithmManager.TimeLimit);
// Result manager scanning message queue: (started earlier)
AlgorithmHandlers.Results.DebugMessage(
$"Launching analysis for {job.AlgorithmId} with LEAN Engine v{Globals.Version}");
//Create a new engine isolator class
var isolator = new Isolator();
// Execute the Algorithm Code:
var complete = isolator.ExecuteWithTimeLimit(AlgorithmHandlers.Setup.MaximumRuntime, algorithmManager.TimeLimit.IsWithinLimit, () =>
{
try
{
//Run Algorithm Job:
// -> Using this Data Feed,
// -> Send Orders to this TransactionHandler,
// -> Send Results to ResultHandler.
algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationTokenSource);
}
catch (Exception err)
{
algorithm.SetRuntimeError(err, "AlgorithmManager.Run");
return;
}
Log.Trace("Engine.Run(): Exiting Algorithm Manager");
}, job.Controls.RamAllocation, workerThread:workerThread, sleepIntervalMillis: algorithm.LiveMode ? 10000 : 1000);
if (!complete)
{
Log.Error("Engine.Main(): Failed to complete in time: " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F"));
throw new Exception("Failed to complete algorithm within " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F")
+ " seconds. Please make it run faster.");
}
}
catch (Exception err)
{
//Error running the user algorithm: purge datafeed, send error messages, set algorithm status to failed.
algorithm.SetRuntimeError(err, "Engine Isolator");
}
// Algorithm runtime error:
if (algorithm.RunTimeError != null)
{
HandleAlgorithmError(job, algorithm.RunTimeError);
}
// notify the LEAN manager that the algorithm has finished
SystemHandlers.LeanManager.OnAlgorithmEnd();
try
{
var csvTransactionsFileName = Config.Get("transaction-log");
if (!string.IsNullOrEmpty(csvTransactionsFileName))
{
SaveListOfTrades(AlgorithmHandlers.Transactions, csvTransactionsFileName);
}
if (!_liveMode)
{
//Diagnostics Completed, Send Result Packet:
var totalSeconds = (DateTime.UtcNow - startTime).TotalSeconds;
var dataPoints = algorithmManager.DataPoints + algorithm.HistoryProvider.DataPointCount;
var kps = dataPoints / (double) 1000 / totalSeconds;
AlgorithmHandlers.Results.DebugMessage($"Algorithm Id:({job.AlgorithmId}) completed in {totalSeconds:F2} seconds at {kps:F0}k data points per second. Processing total of {dataPoints:N0} data points.");
}
}
catch (Exception err)
{
Log.Error(err, "Error sending analysis results");
}
//Before we return, send terminate commands to close up the threads
AlgorithmHandlers.Transactions.Exit();
AlgorithmHandlers.RealTime.Exit();
dataManager?.RemoveAllSubscriptions();
workerThread?.Dispose();
}
synchronizer.DisposeSafely();
// Close data feed, alphas. Could be running even if algorithm initialization failed
AlgorithmHandlers.DataFeed.Exit();
//Close result handler:
AlgorithmHandlers.Results.Exit();
//Wait for the threads to complete:
var millisecondInterval = 10;
var millisecondTotalWait = 0;
while ((AlgorithmHandlers.Results.IsActive
|| (AlgorithmHandlers.Transactions != null && AlgorithmHandlers.Transactions.IsActive)
|| (AlgorithmHandlers.DataFeed != null && AlgorithmHandlers.DataFeed.IsActive)
|| (AlgorithmHandlers.RealTime != null && AlgorithmHandlers.RealTime.IsActive))
&& millisecondTotalWait < 30*1000)
{
Thread.Sleep(millisecondInterval);
if (millisecondTotalWait % (millisecondInterval * 10) == 0)
{
Log.Trace("Waiting for threads to exit...");
}
millisecondTotalWait += millisecondInterval;
}
if (brokerage != null)
{
Log.Trace("Engine.Run(): Disconnecting from brokerage...");
brokerage.Disconnect();
brokerage.Dispose();
}
if (AlgorithmHandlers.Setup != null)
{
Log.Trace("Engine.Run(): Disposing of setup handler...");
AlgorithmHandlers.Setup.Dispose();
}
Log.Trace("Engine.Main(): Analysis Completed and Results Posted.");
}
catch (Exception err)
{
Log.Error(err, "Error running algorithm");
}
finally
{
//No matter what for live mode; make sure we've set algorithm status in the API for "not running" conditions:
if (_liveMode && algorithmManager.State != AlgorithmStatus.Running && algorithmManager.State != AlgorithmStatus.RuntimeError)
SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, algorithmManager.State);
AlgorithmHandlers.Results.Exit();
AlgorithmHandlers.DataFeed.Exit();
AlgorithmHandlers.Transactions.Exit();
AlgorithmHandlers.RealTime.Exit();
AlgorithmHandlers.DataMonitor.Exit();
(algorithm as AlgorithmPythonWrapper)?.DisposeSafely();
}
}
///
/// Handle an error in the algorithm.Run method.
///
/// Job we're processing
/// Error from algorithm stack
private void HandleAlgorithmError(AlgorithmNodePacket job, Exception err)
{
AlgorithmHandlers.DataFeed?.Exit();
if (AlgorithmHandlers.Results != null)
{
var message = $"Runtime Error: {err.Message}";
Log.Trace("Engine.Run(): Sending runtime error to user...");
AlgorithmHandlers.Results.LogMessage(message);
// for python we don't add the ugly pythonNet stack trace
var stackTrace = job.Language == Language.Python ? err.Message : err.ToString();
AlgorithmHandlers.Results.RuntimeError(message, stackTrace);
SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, $"{message} Stack Trace: {stackTrace}");
}
}
///
/// Load the history provider from the Composer
///
private HistoryProviderManager GetHistoryProvider()
{
var provider = new HistoryProviderManager();
provider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
provider.NumericalPrecisionLimited += (sender, args) =>
{
if (!_historyNumericalPrecisionLimitedWarningEmitted)
{
_historyNumericalPrecisionLimitedWarningEmitted = true;
AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if there are numerical precision errors in the factor files.");
}
};
provider.StartDateLimited += (sender, args) =>
{
if (!_historyStartDateLimitedWarningEmitted)
{
_historyStartDateLimitedWarningEmitted = true;
AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if it is before the first known date for the symbol.");
}
};
provider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
provider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
return provider;
}
///
/// Save a list of trades to disk for a given path
///
/// Transactions list via an OrderProvider
/// File path to create
private static void SaveListOfTrades(IOrderProvider transactions, string csvFileName)
{
var orders = transactions.GetOrders(x => x.Status.IsFill());
var path = Path.GetDirectoryName(csvFileName);
if (path != null && !Directory.Exists(path))
Directory.CreateDirectory(path);
using (var writer = new StreamWriter(csvFileName))
{
foreach (var order in orders)
{
var line = Invariant($"{order.Time.ToStringInvariant("yyyy-MM-dd HH:mm:ss")},") +
Invariant($"{order.Symbol.Value},{order.Direction},{order.Quantity},{order.Price}");
writer.WriteLine(line);
}
}
}
///
/// Initialize slow static variables
///
[MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
private static MarketHoursDatabase StaticInitializations()
{
SymbolPropertiesDatabase.FromDataFolder();
// This is slow because it create all static timezones
var nyTime = TimeZones.NewYork;
// slow because if goes to disk and parses json
return MarketHoursDatabase.FromDataFolder();
}
} // End Algorithm Node Core Thread
} // End Namespace