/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;
using Fasterflect;
using QuantConnect.AlgorithmFactory;
using QuantConnect.Brokerages;
using QuantConnect.Configuration;
using QuantConnect.Data;
using QuantConnect.Data.Market;
using QuantConnect.Interfaces;
using QuantConnect.Lean.Engine.DataFeeds;
using QuantConnect.Lean.Engine.Results;
using QuantConnect.Lean.Engine.TransactionHandlers;
using QuantConnect.Logging;
using QuantConnect.Packets;
using QuantConnect.Securities;
using QuantConnect.Util;
namespace QuantConnect.Lean.Engine.Setup
{
/// <summary>
/// Defines a set up handler that initializes the algorithm instance using values retrieved from the user's brokerage account
/// </summary>
public class BrokerageSetupHandler : ISetupHandler
{
/// <summary>
/// Max allocation limit configuration variable name.
/// Looked up as a key in the live job's brokerage data during setup.
/// </summary>
public static string MaxAllocationLimitConfig = "max-allocation-limit";
/// <summary>
/// The worker thread instance the setup handler should use.
/// It is handed to the algorithm loader when creating the algorithm instance.
/// </summary>
public WorkerThread WorkerThread { get; set; }
/// <summary>
/// Any errors from the initialization are stored here.
/// Setup reports success only while this list is empty.
/// </summary>
public List Errors { get; set; }
/// <summary>
/// Get the maximum runtime for this algorithm job.
/// The constructor sets this to 10 * 365 days.
/// </summary>
public TimeSpan MaximumRuntime { get; }
/// <summary>
/// Algorithm starting capital for statistics calculations.
/// Assigned from the portfolio's total value at the end of a successful setup.
/// </summary>
public decimal StartingPortfolioValue { get; private set; }
/// <summary>
/// Start date for analysis loops to search for data.
/// Assigned at the end of a successful setup.
/// </summary>
public DateTime StartingDate { get; private set; }
/// <summary>
/// Maximum number of orders for the algorithm run -- applicable for backtests only.
/// The constructor sets this to int.MaxValue.
/// </summary>
public int MaxOrders { get; }
// brokerage factory resolved in CreateBrokerage; Setup reads the brokerage model from it and Dispose releases it
private IBrokerageFactory _factory;
// brokerage created by PreloadDataQueueHandler when the data queue handler uses a different factory;
// Dispose disconnects and disposes it
private IBrokerage _dataQueueHandlerBrokerage;
/// <summary>
/// Initializes a new <see cref="BrokerageSetupHandler"/> with an empty error list,
/// a maximum runtime of 10 * 365 days and no upper bound on the number of orders
/// </summary>
public BrokerageSetupHandler()
{
    MaxOrders = int.MaxValue;
    MaximumRuntime = TimeSpan.FromDays(365 * 10);
    Errors = new List();
}
/// <summary>
/// Create a new instance of an algorithm from a physical dll path.
/// </summary>
/// <param name="algorithmNodePacket">Details of the task required</param>
/// <param name="assemblyPath">The path to the assembly's location</param>
/// <returns>A new instance of IAlgorithm, or throws an exception if there was an error</returns>
/// <exception cref="AlgorithmSetupException">Thrown when the loader reports it could not create the instance</exception>
public IAlgorithm CreateAlgorithmInstance(AlgorithmNodePacket algorithmNodePacket, string assemblyPath)
{
    // load time is bounded by BaseSetupHandler.AlgorithmCreationTimeout; the type to instantiate is picked by
    // SingleOrAlgorithmTypeName using the 'algorithm-type-name' config entry (the algorithm id is passed as the
    // second argument to Config.Get, presumably as the default)
    var algorithmLoader = new Loader(
        false,
        algorithmNodePacket.Language,
        BaseSetupHandler.AlgorithmCreationTimeout,
        typeNames => typeNames.SingleOrAlgorithmTypeName(Config.Get("algorithm-type-name", algorithmNodePacket.AlgorithmId)),
        WorkerThread);

    if (algorithmLoader.TryCreateAlgorithmInstanceWithIsolator(assemblyPath, algorithmNodePacket.RamAllocation, out IAlgorithm instance, out string failure))
    {
        return instance;
    }

    throw new AlgorithmSetupException($"During the algorithm initialization, the following exception has occurred: {failure}");
}
/// <summary>
/// Creates the brokerage as specified by the job packet
/// </summary>
/// <param name="algorithmNodePacket">Job packet; must be a <see cref="LiveNodePacket"/></param>
/// <param name="uninitializedAlgorithm">The algorithm instance before Initialize has been called</param>
/// <param name="factory">The brokerage factory that was resolved for the job's brokerage</param>
/// <returns>The brokerage instance, or throws if error creating instance</returns>
/// <exception cref="ArgumentException">Thrown when the packet is not a live node packet</exception>
public IBrokerage CreateBrokerage(AlgorithmNodePacket algorithmNodePacket, IAlgorithm uninitializedAlgorithm, out IBrokerageFactory factory)
{
    if (algorithmNodePacket is not LiveNodePacket liveJob)
    {
        throw new ArgumentException("BrokerageSetupHandler.CreateBrokerage requires a live node packet");
    }

    Log.Trace($"BrokerageSetupHandler.CreateBrokerage(): creating brokerage '{liveJob.Brokerage}'");

    // resolve the factory whose brokerage type matches the name in the live job packet; keep it on the
    // instance as well, since Setup and Dispose use it later
    factory = _factory = Composer.Instance.Single(candidate => candidate.BrokerageType.MatchesTypeName(liveJob.Brokerage));

    // the data queue handler is preloaded before the trading brokerage itself is created
    PreloadDataQueueHandler(liveJob, uninitializedAlgorithm, factory);

    return _factory.CreateBrokerage(liveJob, uninitializedAlgorithm);
}
/// <summary>
/// Primary entry point to setup a new algorithm.
/// Connects to the brokerage, runs the algorithm's initialization under a time limit, then loads
/// cash balances, open orders and holdings from the brokerage into the algorithm.
/// Failures are recorded in <see cref="Errors"/> rather than thrown.
/// </summary>
/// <param name="parameters">The parameters object to use</param>
/// <returns>True on successfully setting up the algorithm state, or false on error.</returns>
public bool Setup(SetupHandlerParameters parameters)
{
var algorithm = parameters.Algorithm;
var brokerage = parameters.Brokerage;
// verify we were given the correct job packet type
var liveJob = parameters.AlgorithmNodePacket as LiveNodePacket;
if (liveJob == null)
{
AddInitializationError("BrokerageSetupHandler requires a LiveNodePacket");
return false;
}
algorithm.Name = liveJob.GetAlgorithmName();
// verify the brokerage was specified
if (string.IsNullOrWhiteSpace(liveJob.Brokerage))
{
AddInitializationError("A brokerage must be specified");
return false;
}
// shared setup steps implemented in BaseSetupHandler (not visible in this file)
BaseSetupHandler.Setup(parameters);
// attach to the message event to relay brokerage specific initialization messages
// only messages of type Error are turned into initialization errors
EventHandler brokerageOnMessage = (sender, args) =>
{
if (args.Type == BrokerageMessageType.Error)
{
AddInitializationError($"Brokerage Error Code: {args.Code} - {args.Message}");
}
};
try
{
// let the world know what we're doing since logging in can take a minute
parameters.ResultHandler.SendStatusUpdate(AlgorithmStatus.LoggingIn, "Logging into brokerage...");
brokerage.Message += brokerageOnMessage;
Log.Trace("BrokerageSetupHandler.Setup(): Connecting to brokerage...");
try
{
// this can fail for various reasons, such as already being logged in somewhere else
brokerage.Connect();
}
catch (Exception err)
{
Log.Error(err);
AddInitializationError(
$"Error connecting to brokerage: {err.Message}. " +
"This may be caused by incorrect login credentials or an unsupported account type.", err);
return false;
}
if (!brokerage.IsConnected)
{
// if we're reporting that we're not connected, bail
AddInitializationError("Unable to connect to brokerage.");
return false;
}
// the message reports the brokerage's base currency, falling back to the algorithm's when the brokerage reports none
var message = $"{brokerage.Name} account base currency: {brokerage.AccountBaseCurrency ?? algorithm.AccountCurrency}";
var accountCurrency = brokerage.AccountBaseCurrency;
// when the job carries a max allocation limit the account currency is forced to USD,
// regardless of what the brokerage reported
if (liveJob.BrokerageData.ContainsKey(MaxAllocationLimitConfig))
{
accountCurrency = Currencies.USD;
message += ". Allocation limited, will use 'USD' account currency";
}
Log.Trace($"BrokerageSetupHandler.Setup(): {message}");
algorithm.Debug(message);
// only touch the algorithm's account currency when we have one and it differs
if (accountCurrency != null && accountCurrency != algorithm.AccountCurrency)
{
algorithm.SetAccountCurrency(accountCurrency);
}
Log.Trace("BrokerageSetupHandler.Setup(): Initializing algorithm...");
parameters.ResultHandler.SendStatusUpdate(AlgorithmStatus.Initializing, "Initializing algorithm...");
//Execute the initialize code:
// the callback runs through the isolator with a 300 second limit; any exception it raises is
// recorded as an initialization error instead of escaping the callback
var controls = liveJob.Controls;
var isolator = new Isolator();
var initializeComplete = isolator.ExecuteWithTimeLimit(TimeSpan.FromSeconds(300), () =>
{
try
{
//Set the default brokerage model before initialize
// _factory is only assigned in CreateBrokerage, so that method must have run on this instance first
algorithm.SetBrokerageModel(_factory.GetBrokerageModel(algorithm.Transactions));
//Margin calls are disabled by default in live mode
algorithm.Portfolio.MarginCallModel = MarginCallModel.Null;
//Set our parameters
algorithm.SetParameters(liveJob.Parameters);
algorithm.SetAvailableDataTypes(BaseSetupHandler.GetConfiguredDataFeeds());
//Algorithm is live, not backtesting:
algorithm.SetAlgorithmMode(liveJob.AlgorithmMode);
//Initialize the algorithm's starting date
algorithm.SetDateTime(DateTime.UtcNow);
//Set the source impl for the event scheduling
algorithm.Schedule.SetEventSchedule(parameters.RealTimeHandler);
// reuse the option chain provider registered with the composer, or create a caching live one and register it
var optionChainProvider = Composer.Instance.GetPart();
if (optionChainProvider == null)
{
var baseOptionChainProvider = new LiveOptionChainProvider();
baseOptionChainProvider.Initialize(new(parameters.MapFileProvider, algorithm.HistoryProvider));
optionChainProvider = new CachingOptionChainProvider(baseOptionChainProvider);
Composer.Instance.AddPart(optionChainProvider);
}
// set the option chain provider
algorithm.SetOptionChainProvider(optionChainProvider);
// same approach for the future chain provider
var futureChainProvider = Composer.Instance.GetPart();
if (futureChainProvider == null)
{
var baseFutureChainProvider = new LiveFutureChainProvider();
baseFutureChainProvider.Initialize(new(parameters.MapFileProvider, algorithm.HistoryProvider));
futureChainProvider = new CachingFutureChainProvider(baseFutureChainProvider);
Composer.Instance.AddPart(futureChainProvider);
}
// set the future chain provider
algorithm.SetFutureChainProvider(futureChainProvider);
//Initialise the algorithm, get the required data:
algorithm.Initialize();
// for every brokerage other than "PaperBrokerage" the cash amounts set during Initialize are discarded
if (liveJob.Brokerage != "PaperBrokerage")
{
//Zero the CashBook - we'll populate directly from brokerage
foreach (var kvp in algorithm.Portfolio.CashBook)
{
kvp.Value.SetAmount(0);
}
}
}
catch (Exception err)
{
AddInitializationError(err.ToString(), err);
}
}, controls.RamAllocation,
sleepIntervalMillis: 100); // entire system is waiting on this, so be as fast as possible
// errors may come from the callback above or from the brokerage message handler
if (Errors.Count != 0)
{
// if we already got an error just exit right away
return false;
}
if (!initializeComplete)
{
AddInitializationError("Initialization timed out.");
return false;
}
// both loaders record their own initialization error before returning false
if (!LoadCashBalance(brokerage, algorithm))
{
return false;
}
if (!LoadExistingHoldingsAndOrders(brokerage, algorithm, parameters))
{
return false;
}
// after algorithm was initialized, should set trading days per year for our great portfolio statistics
BaseSetupHandler.SetBrokerageTradingDayPerYear(algorithm);
// the data aggregator is optional: it is only initialized when the composer has one
var dataAggregator = Composer.Instance.GetPart();
dataAggregator?.Initialize(new () { AlgorithmSettings = algorithm.Settings });
//Finalize Initialization
algorithm.PostInitialize();
BaseSetupHandler.SetupCurrencyConversions(algorithm, parameters.UniverseSelection);
if (algorithm.Portfolio.TotalPortfolioValue == 0)
{
algorithm.Debug("Warning: No cash balances or holdings were found in the brokerage account.");
}
string maxCashLimitStr;
if (liveJob.BrokerageData.TryGetValue(MaxAllocationLimitConfig, out maxCashLimitStr))
{
var maxCashLimit = decimal.Parse(maxCashLimitStr, NumberStyles.Any, CultureInfo.InvariantCulture);
// If allocation exceeded by more than $10,000; block deployment
// the exception thrown here is caught by the outer catch below and recorded as an initialization error
if (algorithm.Portfolio.TotalPortfolioValue > (maxCashLimit + 10000m))
{
var exceptionMessage = $"TotalPortfolioValue '{algorithm.Portfolio.TotalPortfolioValue}' exceeds allocation limit '{maxCashLimit}'";
algorithm.Debug(exceptionMessage);
throw new ArgumentException(exceptionMessage);
}
}
//Set the starting portfolio value for the strategy to calculate performance:
StartingPortfolioValue = algorithm.Portfolio.TotalPortfolioValue;
// NOTE(review): local time is used here while the algorithm's date time above was set from DateTime.UtcNow - confirm this is intended
StartingDate = DateTime.Now;
}
catch (Exception err)
{
AddInitializationError(err.ToString(), err);
}
finally
{
// runs on every exit path of the try block, including the early returns, so the handler never stays attached
if (brokerage != null)
{
brokerage.Message -= brokerageOnMessage;
}
}
return Errors.Count == 0;
}
/// <summary>
/// Copies every cash balance reported by the brokerage into the algorithm's portfolio
/// </summary>
/// <param name="brokerage">Brokerage to query for cash balances</param>
/// <param name="algorithm">Algorithm whose portfolio receives the balances</param>
/// <returns>True when all balances were applied; false when fetching or applying them threw,
/// in which case an initialization error has been recorded</returns>
private bool LoadCashBalance(IBrokerage brokerage, IAlgorithm algorithm)
{
    Log.Trace("BrokerageSetupHandler.Setup(): Fetching cash balance from brokerage...");

    var loaded = false;
    try
    {
        // one portfolio cash entry per currency returned by the brokerage
        foreach (var balance in brokerage.GetCashBalance())
        {
            Log.Trace($"BrokerageSetupHandler.Setup(): Setting {balance.Currency} cash to {balance.Amount}");
            algorithm.Portfolio.SetCash(balance.Currency, balance.Amount, 0);
        }

        loaded = true;
    }
    catch (Exception exception)
    {
        Log.Error(exception);
        AddInitializationError("Error getting cash balance from brokerage: " + exception.Message, exception);
    }

    return loaded;
}
/// <summary>
/// Loads existing holdings and orders from the brokerage into the algorithm.
/// Open orders are loaded first, then account holdings.
/// </summary>
/// <param name="brokerage">Brokerage to query for open orders and account holdings</param>
/// <param name="algorithm">Algorithm that receives the securities, holdings and prices</param>
/// <param name="parameters">Setup parameters; supplies the result and transaction handlers</param>
/// <returns>True when both steps completed; false when either threw, in which case an
/// initialization error has been recorded</returns>
protected bool LoadExistingHoldingsAndOrders(IBrokerage brokerage, IAlgorithm algorithm, SetupHandlerParameters parameters)
{
    Log.Trace("BrokerageSetupHandler.Setup(): Fetching open orders from brokerage...");
    try
    {
        GetOpenOrders(algorithm, parameters.ResultHandler, parameters.TransactionHandler, brokerage);
    }
    catch (Exception ordersFailure)
    {
        Log.Error(ordersFailure);
        AddInitializationError("Error getting open orders from brokerage: " + ordersFailure.Message, ordersFailure);
        return false;
    }

    Log.Trace("BrokerageSetupHandler.Setup(): Fetching holdings from brokerage...");
    try
    {
        // one timestamp is shared by every holding processed below
        var snapshotUtc = DateTime.UtcNow;

        // descending security type order; per the original author this adds options first so that raw data
        // normalization mode is set on the equity underlyings
        var sortedHoldings = brokerage.GetAccountHoldings().OrderByDescending(h => h.Type);
        foreach (var position in sortedHoldings)
        {
            Log.Trace("BrokerageSetupHandler.Setup(): Has existing holding: " + position);

            // holdings whose security cannot be obtained or added are skipped
            if (!GetOrAddUnrequestedSecurity(algorithm, position.Symbol, position.Type, out Security security))
            {
                continue;
            }

            var exchangeTime = snapshotUtc.ConvertFromUtc(security.Exchange.TimeZone);
            security.Holdings.SetHoldings(position.AveragePrice, position.Quantity);

            if (position.MarketPrice == 0)
            {
                // the brokerage gave no price: fall back to the last known price, if there is one
                position.MarketPrice = algorithm.GetLastKnownPrice(security)?.Price ?? 0;
            }

            if (position.MarketPrice == 0)
            {
                // still no price available, leave the security's market price untouched
                continue;
            }

            // flat bar carrying the single known price, stamped with the exchange-local time
            var priceBar = new TradeBar
            {
                Time = exchangeTime,
                Open = position.MarketPrice,
                High = position.MarketPrice,
                Low = position.MarketPrice,
                Close = position.MarketPrice,
                Volume = 0,
                Symbol = position.Symbol,
                DataType = MarketDataType.TradeBar
            };
            security.SetMarketPrice(priceBar);
        }
    }
    catch (Exception holdingsFailure)
    {
        Log.Error(holdingsFailure);
        AddInitializationError("Error getting account holdings from brokerage: " + holdingsFailure.Message, holdingsFailure);
        return false;
    }

    return true;
}
/// <summary>
/// Forwards to the algorithm's GetOrAddUnrequestedSecurity and records an initialization error
/// naming <paramref name="securityType"/> when the algorithm invokes the error callback
/// </summary>
/// <param name="algorithm">Algorithm to get the security from or add it to</param>
/// <param name="symbol">Symbol of the security</param>
/// <param name="securityType">Security type reported in the error message</param>
/// <param name="security">The security produced by the algorithm call</param>
/// <returns>The result of the algorithm call</returns>
private bool GetOrAddUnrequestedSecurity(IAlgorithm algorithm, Symbol symbol, SecurityType securityType, out Security security)
{
    return algorithm.GetOrAddUnrequestedSecurity(symbol, out security, onError: supported =>
    {
        var offending = "Found unsupported security type in existing brokerage holdings: " + securityType + ". ";
        var hint = "QuantConnect currently supports the following security types: " + string.Join(",", supported);
        AddInitializationError(offending + hint);
    });
}
/// <summary>
/// Get the open orders from a brokerage and add them to the transaction handler
/// </summary>
/// <param name="algorithm">Algorithm instance</param>
/// <param name="resultHandler">The configured result handler</param>
/// <param name="transactionHandler">The configured transaction handler</param>
/// <param name="brokerage">Brokerage to read the open orders from</param>
protected void GetOpenOrders(IAlgorithm algorithm, IResultHandler resultHandler, ITransactionHandler transactionHandler, IBrokerage brokerage)
{
    // descending security type order; per the original author this adds options first so that raw data
    // normalization mode is set on the equity underlyings
    var prioritizedOrders = brokerage.GetOpenOrders().OrderByDescending(o => o.SecurityType);
    foreach (var openOrder in prioritizedOrders)
    {
        // orders whose security cannot be obtained or added are skipped
        if (GetOrAddUnrequestedSecurity(algorithm, openOrder.Symbol, openOrder.SecurityType, out Security security))
        {
            transactionHandler.AddOpenOrder(openOrder, algorithm);
            openOrder.PriceCurrency = security?.SymbolProperties.QuoteCurrency;

            Log.Trace($"BrokerageSetupHandler.Setup(): Has open order: {openOrder}");
            resultHandler.DebugMessage($"BrokerageSetupHandler.Setup(): Open order detected. Creating order tickets for open order {openOrder.Symbol.Value} with quantity {openOrder.Quantity}. Beware that this order ticket may not accurately reflect the quantity of the order if the open order is partially filled.");
        }
    }
}
/// <summary>
/// Wraps the message in an <see cref="AlgorithmSetupException"/> and appends it to the Errors list
/// </summary>
/// <param name="message">The error message to be added</param>
/// <param name="inner">The inner exception being wrapped</param>
private void AddInitializationError(string message, Exception inner = null)
{
    var wrapped = new AlgorithmSetupException(
        "During the algorithm initialization, the following exception has occurred: " + message,
        inner);
    Errors.Add(wrapped);
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// Disposes the brokerage factory, then either the preloaded data queue handler brokerage or,
/// when none was preloaded, the data queue handler found in the composer.
/// </summary>
public void Dispose()
{
    _factory?.DisposeSafely();

    if (_dataQueueHandlerBrokerage == null)
    {
        // nothing was preloaded by this handler, fall back to whatever the composer holds
        var composedHandler = Composer.Instance.GetPart();
        if (composedHandler == null)
        {
            Log.Trace("BrokerageSetupHandler.Setup(): did not find any data queue handler to dispose");
        }
        else
        {
            Log.Trace($"BrokerageSetupHandler.Setup(): Found data queue handler to dispose: {composedHandler.GetType()}");
            composedHandler.DisposeSafely();
        }

        return;
    }

    // close the connection opened during preload before disposing
    if (_dataQueueHandlerBrokerage.IsConnected)
    {
        _dataQueueHandlerBrokerage.Disconnect();
    }

    _dataQueueHandlerBrokerage.DisposeSafely();
}
/// <summary>
/// Creates and connects a separate brokerage to act as data queue handler when the job's data queue
/// handler type is tied, through <see cref="BrokerageFactoryAttribute"/>, to a factory other than
/// the trading brokerage's factory. Does nothing when no data queue handler name is configured,
/// when no matching type is found, or when the factories are the same.
/// </summary>
/// <param name="liveJob">Live job packet; its brokerage data is extended with the other factory's entries</param>
/// <param name="algorithm">Algorithm instance passed on to the factory when creating the brokerage</param>
/// <param name="factory">The factory already resolved for the trading brokerage</param>
private void PreloadDataQueueHandler(LiveNodePacket liveJob, IAlgorithm algorithm, IBrokerageFactory factory)
{
    var dataQueueHandlerName = liveJob.DataQueueHandler;
    if (string.IsNullOrEmpty(dataQueueHandlerName))
    {
        // a null name would make EndsWith throw and an empty name is a suffix of every type name,
        // which would select an arbitrary attributed type; either way there is nothing to preload
        return;
    }

    // preload the data queue handler using custom BrokerageFactory attribute;
    // type names are identifiers, so the suffix match is ordinal rather than culture-sensitive
    var dataQueueHandlerType = Assembly.GetAssembly(typeof(Brokerage))
        .GetTypes()
        .FirstOrDefault(x =>
            x.FullName != null &&
            x.FullName.EndsWith(dataQueueHandlerName, StringComparison.Ordinal) &&
            x.HasAttribute(typeof(BrokerageFactoryAttribute)));
    if (dataQueueHandlerType == null)
    {
        return;
    }

    var attribute = dataQueueHandlerType.GetCustomAttribute<BrokerageFactoryAttribute>();

    // only load the data queue handler if the factory is different from our brokerage factory
    if (attribute.Type == factory.GetType())
    {
        return;
    }

    // NOTE(review): this factory instance is never disposed in this file - confirm whether it needs to be
    var brokerageFactory = (BrokerageFactory)Activator.CreateInstance(attribute.Type);

    // copy the brokerage data (usually credentials) without overwriting entries the job already has
    foreach (var kvp in brokerageFactory.BrokerageData)
    {
        if (!liveJob.BrokerageData.ContainsKey(kvp.Key))
        {
            liveJob.BrokerageData.Add(kvp.Key, kvp.Value);
        }
    }

    // create the data queue handler brokerage; it is kept on the instance so Dispose can release it
    // (presumably the factory registers it with the composer - not visible here)
    _dataQueueHandlerBrokerage = brokerageFactory.CreateBrokerage(liveJob, algorithm);

    // open connection for subscriptions
    _dataQueueHandlerBrokerage.Connect();
}
}
}