/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using Fasterflect;
using Newtonsoft.Json;
using QuantConnect.Brokerages;
using QuantConnect.Configuration;
using QuantConnect.Data;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Packets;
using QuantConnect.Python;
using QuantConnect.Util;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;

namespace QuantConnect.Queues
{
    /// <summary>
    /// Implementation of local/desktop job request: builds the next job packet from configuration values.
    /// </summary>
    public class JobQueue : IJobQueueHandler
    {
        // Writer taken from System.Console.Out; inside this class the name 'Console' refers to this field,
        // which is why AcknowledgeJob uses the fully qualified System.Console to read input.
        private static readonly TextWriter Console = System.Console.Out;

        // The type name of the QuantConnect.Brokerages.Paper.PaperBrokerage
        private const string PaperBrokerageTypeName = "PaperBrokerage";
        private const string DefaultHistoryProvider = "SubscriptionDataReaderHistoryProvider";
        private const string DefaultDataQueueHandler = "LiveDataQueue";
        private const string DefaultDataChannelProvider = "DataChannelProvider";
        private static readonly string Channel = Config.Get("data-channel");
        private readonly string AlgorithmTypeName = Config.Get("algorithm-type-name");
        private Language? _language;

        /// <summary>
        /// Language of the algorithm. Read from the "algorithm-language" configuration value when it is set,
        /// otherwise derived from the file extension of the algorithm location. The result is cached.
        /// This property is protected for testing purposes
        /// </summary>
        /// <exception cref="ArgumentException">
        /// The language is not configured and the algorithm extension is neither ".dll" nor ".py"
        /// </exception>
        protected Language Language
        {
            get
            {
                if (_language == null)
                {
                    string algorithmLanguage = Config.Get("algorithm-language");
                    if (string.IsNullOrEmpty(algorithmLanguage))
                    {
                        // Invariant casing: the extension is compared as an identifier, not as culture-specific text
                        var extension = Path.GetExtension(AlgorithmLocation).ToLowerInvariant();
                        switch (extension)
                        {
                            case ".dll":
                                _language = Language.CSharp;
                                break;
                            case ".py":
                                _language = Language.Python;
                                break;
                            default:
                                throw new ArgumentException($"Unknown extension, algorithm extension was {extension}");
                        }
                    }
                    else
                    {
                        _language = (Language)Enum.Parse(typeof(Language), algorithmLanguage, ignoreCase: true);
                    }
                }

                return (Language)_language;
            }
        }

        /// <summary>
        /// Physical location of Algorithm DLL.
        /// </summary>
        /// <remarks>We expect this dll to be copied into the output directory</remarks>
        private string AlgorithmLocation { get; } = Config.Get("algorithm-location", "QuantConnect.Algorithm.CSharp.dll");

        /// <summary>
        /// Initialize the job queue: initializes the given API with the global user id, user token and data folder.
        /// </summary>
        /// <param name="api">The API instance to initialize</param>
        /// <param name="messagingHandler">The messaging handler; not used by this implementation</param>
        public void Initialize(IApi api, IMessagingHandler messagingHandler)
        {
            api.Initialize(Globals.UserId, Globals.UserToken, Globals.DataFolder);
        }

        /// <summary>
        /// Gets Brokerage Factory for provided IDQH
        /// </summary>
        /// <param name="dataQueueHandler">
        /// Name of the data queue handler; matched, ignoring case, against the end of the full name of the exported types
        /// </param>
        /// <returns>An Instance of Brokerage Factory if possible, otherwise null</returns>
        public static IBrokerageFactory GetFactoryFromDataQueueHandler(string dataQueueHandler)
        {
            var dataQueueHandlerType = Composer.Instance.GetExportedTypes<IBrokerage>()
                .FirstOrDefault(x =>
                    x.FullName != null &&
                    x.FullName.EndsWith(dataQueueHandler, StringComparison.InvariantCultureIgnoreCase) &&
                    x.HasAttribute(typeof(BrokerageFactoryAttribute)));

            if (dataQueueHandlerType == null)
            {
                return null;
            }

            var attribute = dataQueueHandlerType.GetCustomAttribute<BrokerageFactoryAttribute>();

            // Cast to the interface this method returns rather than to a concrete base class
            return (IBrokerageFactory)Activator.CreateInstance(attribute.Type);
        }

        /// <summary>
        /// Desktop/Local Get Next Task - Get task from the Algorithm folder of VS Solution.
        /// </summary>
        /// <param name="algorithmPath">Set to the configured algorithm location</param>
        /// <returns>
        /// A <see cref="LiveNodePacket"/> when <see cref="Globals.LiveMode"/> is set,
        /// otherwise a <see cref="BacktestNodePacket"/>
        /// </returns>
        public AlgorithmNodePacket NextJob(out string algorithmPath)
        {
            algorithmPath = GetAlgorithmLocation();
            Log.Trace($"JobQueue.NextJob(): Selected {algorithmPath}");

            var parameters = ReadParameters();
            var controls = CreateControls();
            var algorithmId = Config.Get("algorithm-id", AlgorithmTypeName);

            //If this isn't a backtesting mode/request, attempt a live job.
            if (Globals.LiveMode)
            {
                return CreateLiveJob(algorithmId, parameters, controls);
            }

            //Default run a backtesting job.
            return CreateBacktestJob(algorithmId, parameters, controls);
        }

        /// <summary>
        /// Reads the algorithm parameters from the "parameters" configuration value.
        /// </summary>
        /// <returns>The deserialized parameters, or an empty dictionary when none are configured</returns>
        private static Dictionary<string, string> ReadParameters()
        {
            // check for parameters in the config
            var parametersConfigString = Config.Get("parameters");
            if (string.IsNullOrEmpty(parametersConfigString))
            {
                return new Dictionary<string, string>();
            }

            // Deserializing a literal JSON null yields null; never hand a null dictionary to the packet
            return JsonConvert.DeserializeObject<Dictionary<string, string>>(parametersConfigString)
                ?? new Dictionary<string, string>();
        }

        /// <summary>
        /// Builds the job controls from configuration, using the same defaults for live and backtest jobs.
        /// </summary>
        private static Controls CreateControls()
        {
            return new Controls()
            {
                MinuteLimit = Config.GetInt("symbol-minute-limit", 10000),
                SecondLimit = Config.GetInt("symbol-second-limit", 10000),
                TickLimit = Config.GetInt("symbol-tick-limit", 10000),
                RamAllocation = int.MaxValue,
                MaximumDataPointsPerChartSeries = Config.GetInt("maximum-data-points-per-chart-series", 1000000),
                MaximumChartSeries = Config.GetInt("maximum-chart-series", 30),
                StorageLimit = Config.GetValue("storage-limit", 10737418240L),
                StorageFileCount = Config.GetInt("storage-file-count", 10000),
                StorageAccess = Config.GetValue("storage-permissions", new Packets.StoragePermissions())
            };
        }

        /// <summary>
        /// Builds the live job packet and gathers the brokerage data of the configured brokerage plus the
        /// brokerage data of every data queue handler and brokerage based history provider.
        /// </summary>
        private LiveNodePacket CreateLiveJob(string algorithmId, Dictionary<string, string> parameters, Controls controls)
        {
            var dataHandlers = Config.Get("data-queue-handler", DefaultDataQueueHandler);
            var liveJob = new LiveNodePacket
            {
                Type = PacketType.LiveNode,
                Algorithm = File.ReadAllBytes(AlgorithmLocation),
                Brokerage = Config.Get("live-mode-brokerage", PaperBrokerageTypeName),
                HistoryProvider = Config.Get("history-provider", DefaultHistoryProvider),
                DataQueueHandler = dataHandlers,
                DataChannelProvider = Config.Get("data-channel-provider", DefaultDataChannelProvider),
                Channel = Channel,
                UserToken = Globals.UserToken,
                UserId = Globals.UserId,
                ProjectId = Globals.ProjectId,
                OrganizationId = Globals.OrganizationID,
                Version = Globals.Version,
                DeployId = algorithmId,
                Parameters = parameters,
                Language = Language,
                Controls = controls,
                PythonVirtualEnvironment = Config.Get("python-venv"),
                DeploymentTarget = DeploymentTarget.LocalPlatform,
            };

            Type brokerageName = null;
            try
            {
                // import the brokerage data for the configured brokerage
                var brokerageFactory = Composer.Instance.Single<IBrokerageFactory>(factory => factory.BrokerageType.MatchesTypeName(liveJob.Brokerage));
                brokerageName = brokerageFactory.BrokerageType;
                liveJob.BrokerageData = brokerageFactory.BrokerageData;
            }
            catch (Exception err)
            {
                // Deliberately best effort: the failure is logged and the job is still returned
                Log.Error(err, $"Error resolving BrokerageData for live job for brokerage {liveJob.Brokerage}");
            }

            // Entries for which no brokerage name is produced are dropped
            var brokerageBasedHistoryProvider = liveJob.HistoryProvider.DeserializeList().Select(x =>
            {
                HistoryExtensions.TryGetBrokerageName(x, out var historyBrokerageName);
                return historyBrokerageName;
            }).Where(x => x != null);

            foreach (var dataHandlerName in dataHandlers.DeserializeList().Concat(brokerageBasedHistoryProvider).Distinct())
            {
                var brokerageFactoryForDataHandler = GetFactoryFromDataQueueHandler(dataHandlerName);
                if (brokerageFactoryForDataHandler == null)
                {
                    Log.Trace($"JobQueue.NextJob(): Not able to fetch brokerage factory with name: {dataHandlerName}");
                    continue;
                }

                if (brokerageFactoryForDataHandler.BrokerageType == brokerageName)
                {
                    //Don't need to add brokerageData again if added by brokerage
                    continue;
                }

                foreach (var data in brokerageFactoryForDataHandler.BrokerageData)
                {
                    if (data.Key == "live-holdings" || data.Key == "live-cash-balance")
                    {
                        //live holdings & cash balance not required for data handler
                        continue;
                    }

                    liveJob.BrokerageData.TryAdd(data.Key, data.Value);
                }
            }

            return liveJob;
        }

        /// <summary>
        /// Builds the backtest job packet, including the out of sample settings and, when configured,
        /// the optimization id.
        /// </summary>
        private BacktestNodePacket CreateBacktestJob(string algorithmId, Dictionary<string, string> parameters, Controls controls)
        {
            var optimizationId = Config.Get("optimization-id");

            var backtestJob = new BacktestNodePacket(0, 0, "", new byte[] { }, Config.Get("backtest-name", "local"))
            {
                Type = PacketType.BacktestNode,
                Algorithm = File.ReadAllBytes(AlgorithmLocation),
                HistoryProvider = Config.Get("history-provider", DefaultHistoryProvider),
                Channel = Channel,
                UserToken = Globals.UserToken,
                UserId = Globals.UserId,
                ProjectId = Globals.ProjectId,
                OrganizationId = Globals.OrganizationID,
                Version = Globals.Version,
                BacktestId = algorithmId,
                Language = Language,
                Parameters = parameters,
                Controls = controls,
                PythonVirtualEnvironment = Config.Get("python-venv"),
                DeploymentTarget = DeploymentTarget.LocalPlatform,
            };

            var outOfSampleMaxEndDate = Config.Get("out-of-sample-max-end-date");
            if (!string.IsNullOrEmpty(outOfSampleMaxEndDate))
            {
                backtestJob.OutOfSampleMaxEndDate = Time.ParseDate(outOfSampleMaxEndDate);
            }
            backtestJob.OutOfSampleDays = Config.GetInt("out-of-sample-days");

            // Only set optimization id when backtest is for optimization
            if (!optimizationId.IsNullOrEmpty())
            {
                backtestJob.OptimizationId = optimizationId;
            }

            return backtestJob;
        }

        /// <summary>
        /// Get the algorithm location for client side backtests.
        /// </summary>
        /// <returns>The configured algorithm location</returns>
        /// <exception cref="FileNotFoundException">The language is Python and the algorithm file does not exist</exception>
        private string GetAlgorithmLocation()
        {
            if (Language == Language.Python)
            {
                if (!File.Exists(AlgorithmLocation))
                {
                    throw new FileNotFoundException($"JobQueue.TryCreatePythonAlgorithm(): Unable to find py file: {AlgorithmLocation}");
                }

                // Add this directory to our Python Path so it may be imported properly
                var pythonFile = new FileInfo(AlgorithmLocation);
                PythonInitializer.AddAlgorithmLocationPath(pythonFile.Directory.FullName);
            }

            return AlgorithmLocation;
        }

        /// <summary>
        /// Desktop/Local acknowledge the task processed. Nothing to do.
        /// </summary>
        /// <param name="job">The processed job; not used by this implementation</param>
        public void AcknowledgeJob(AlgorithmNodePacket job)
        {
            // Make the console window pause so we can read log output before exiting and killing the application completely
            Console.WriteLine("Engine.Main(): Analysis Complete.");

            // closing automatically is useful for optimization, we don't want to leave open all the ended lean instances
            if (!Config.GetBool("close-automatically"))
            {
                Console.WriteLine("Engine.Main(): Press any key to continue.");
                System.Console.Read();
            }
        }
    }
}