/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using QuantConnect.Util;
using QuantConnect.Logging;
using QuantConnect.Configuration;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using QuantConnect.Optimizer.Objectives;
using QuantConnect.Optimizer.Parameters;
using QuantConnect.Optimizer.Strategies;
namespace QuantConnect.Optimizer
{
/// <summary>
/// Base Lean optimizer class in charge of handling an optimization job packet
/// </summary>
public abstract class LeanOptimizer : IDisposable
{
// Minimum number of seconds between two non-forced status updates,
// read from the "optimization-update-interval" setting with 10 as fallback.
private readonly int _optimizationUpdateInterval = Config.GetInt("optimization-update-interval", 10);

// UTC time this instance was created; runtime statistics are measured from it. Never reassigned.
private readonly DateTime _startedAt = DateTime.UtcNow;

// UTC time of the last status update that was sent.
private DateTime _lastUpdate;

// Backtest counters, always incremented through Interlocked.
private int _failedBacktest;
private int _completedBacktest;

// Set once by Dispose().
private volatile bool _disposed;

/// <summary>
/// The total completed backtests count, failed ones included
/// </summary>
protected int CompletedBacktests => _failedBacktest + _completedBacktest;

/// <summary>
/// Lock to update optimization status
/// </summary>
private readonly object _statusLock = new object();

/// <summary>
/// The current optimization status
/// </summary>
protected OptimizationStatus Status { get; private set; } = OptimizationStatus.New;

/// <summary>
/// The optimization target
/// </summary>
protected Target OptimizationTarget { get; }

/// <summary>
/// Collection holding <see cref="ParameterSet"/> for each backtest id we are waiting to finish
/// </summary>
/// <remarks>The instance is also used as the monitor guarding launch, result and clean up handling</remarks>
protected ConcurrentDictionary<string, ParameterSet> RunningParameterSetForBacktest { get; init; }

/// <summary>
/// Collection holding <see cref="ParameterSet"/> for each backtest we are waiting to launch
/// </summary>
/// <remarks>We can't launch 1 million backtests at the same time</remarks>
protected ConcurrentQueue<ParameterSet> PendingParameterSet { get; init; }

/// <summary>
/// The optimization strategy being used
/// </summary>
protected IOptimizationStrategy Strategy { get; init; }

/// <summary>
/// The optimization packet
/// </summary>
protected OptimizationNodePacket NodePacket { get; init; }

/// <summary>
/// Indicates whether optimizer was disposed
/// </summary>
protected bool Disposed => _disposed;

/// <summary>
/// Event triggered when the optimization work ended
/// </summary>
public event EventHandler<OptimizationResult> Ended;
/// <summary>
/// Creates a new instance
/// </summary>
/// <param name="nodePacket">The optimization node packet to handle</param>
/// <exception cref="ArgumentNullException">The packet is null</exception>
/// <exception cref="ArgumentException">
/// The packet has no parameter to optimize, no optimization target, or names a strategy type that cannot be resolved
/// </exception>
protected LeanOptimizer(OptimizationNodePacket nodePacket)
{
    if (nodePacket == null)
    {
        // fail with a descriptive exception instead of a NullReferenceException on the first member access
        throw new ArgumentNullException(nameof(nodePacket));
    }

    if (nodePacket.OptimizationParameters.IsNullOrEmpty())
    {
        throw new ArgumentException("Cannot start an optimization job with no parameter to optimize");
    }

    if (string.IsNullOrEmpty(nodePacket.Criterion?.Target))
    {
        throw new ArgumentException("Cannot start an optimization job with no target to optimize");
    }

    NodePacket = nodePacket;
    OptimizationTarget = NodePacket.Criterion;
    OptimizationTarget.Reached += (s, e) =>
    {
        // we've reached the optimization target
        TriggerOnEndEvent();
    };

    // Type.GetType returns null for a name it cannot resolve; report which name was requested
    // rather than letting Activator.CreateInstance fail on a null type
    var strategyType = Type.GetType(NodePacket.OptimizationStrategy);
    if (strategyType == null)
    {
        throw new ArgumentException(
            $"Cannot start an optimization job: optimization strategy type '{NodePacket.OptimizationStrategy}' could not be resolved");
    }
    Strategy = (IOptimizationStrategy)Activator.CreateInstance(strategyType);

    RunningParameterSetForBacktest = new ConcurrentDictionary<string, ParameterSet>();
    PendingParameterSet = new ConcurrentQueue<ParameterSet>();

    Strategy.Initialize(OptimizationTarget, nodePacket.Constraints, NodePacket.OptimizationParameters, NodePacket.OptimizationStrategySettings);

    // every parameter set produced by the strategy is handed to the launcher
    Strategy.NewParameterSet += (s, parameterSet) =>
    {
        if (parameterSet == null)
        {
            // shouldn't happen
            Log.Error($"Strategy.NewParameterSet({GetLogDetails()}): generated a null {nameof(ParameterSet)} instance");
            return;
        }
        LaunchLeanForParameterSet(parameterSet);
    };
}
/// <summary>
/// Starts the optimization
/// </summary>
/// <exception cref="InvalidOperationException">No backtest is running after the initial push to the strategy</exception>
public virtual void Start()
{
    lock (RunningParameterSetForBacktest)
    {
        // the strategy is expected to answer the initial result by raising NewParameterSet
        Strategy.PushNewResults(OptimizationResult.Initial);

        // nothing running after the initial push means we have failed to start
        if (RunningParameterSetForBacktest.IsEmpty)
        {
            SetOptimizationStatus(OptimizationStatus.Aborted);
            throw new InvalidOperationException($"LeanOptimizer.Start({GetLogDetails()}): failed to start");
        }

        var waitingOn = RunningParameterSetForBacktest.Count + PendingParameterSet.Count;
        Log.Trace($"LeanOptimizer.Start({GetLogDetails()}): start ended. Waiting on {waitingOn} backtests");
    }

    SetOptimizationStatus(OptimizationStatus.Running);
    ProcessUpdate(forceSend: true);
}
/// <summary>
/// Triggers the optimization job end event
/// </summary>
/// <remarks>Does nothing once the optimizer has been disposed</remarks>
protected virtual void TriggerOnEndEvent()
{
    if (_disposed)
    {
        return;
    }

    SetOptimizationStatus(OptimizationStatus.Completed);

    var solution = Strategy.Solution;
    if (solution == null)
    {
        Log.Trace($"LeanOptimizer.TriggerOnEndEvent({GetLogDetails()}): Optimization has ended. Result was not reached");
    }
    else
    {
        var constraint = string.Empty;
        if (NodePacket.Constraints != null)
        {
            constraint = $"Constraints: ({string.Join(",", NodePacket.Constraints)})";
        }

        var reached = $"Result for {OptimizationTarget}: was reached using ParameterSet: ({solution.ParameterSet}) backtestId '{solution.BacktestId}'. ";
        Log.Trace($"LeanOptimizer.TriggerOnEndEvent({GetLogDetails()}): Optimization has ended. " + reached + constraint);
    }

    // clean up first so the forced update below reports the final runtime stats
    CleanUpRunningInstance();
    ProcessUpdate(forceSend: true);

    Ended?.Invoke(this, solution);
}
/// <summary>
/// Handles starting Lean for a given parameter set
/// </summary>
/// <param name="parameterSet">The parameter set for the backtest to run</param>
/// <param name="backtestName">The backtest name to use</param>
/// <returns>
/// The new unique backtest id. The launcher in this class treats a null or empty id
/// as a backtest that could not be queued and counts it as failed.
/// </returns>
protected abstract string RunLean(ParameterSet parameterSet, string backtestName);
/// <summary>
/// Gets the name to give to a new backtest
/// </summary>
/// <param name="parameterSet">The parameter set the backtest will run with; not used by this base implementation</param>
/// <returns>The constant name "OptimizationBacktest"</returns>
protected virtual string GetBacktestName(ParameterSet parameterSet) => "OptimizationBacktest";
/// <summary>
/// Handles a new backtest json result matching a requested backtest id
/// </summary>
/// <param name="jsonBacktestResult">The backtest json result; null or empty marks the backtest as failed</param>
/// <param name="backtestId">The associated backtest id</param>
protected virtual void NewResult(string jsonBacktestResult, string backtestId)
{
    // the lock closes the race between the launcher registering a new backtest id and the result
    // for that id arriving before it is even in 'RunningParameterSetForBacktest'
    lock (RunningParameterSetForBacktest)
    {
        if (!RunningParameterSetForBacktest.TryRemove(backtestId, out var finishedParameterSet))
        {
            Interlocked.Increment(ref _failedBacktest);
            Log.Error(
                $"LeanOptimizer.NewResult({GetLogDetails()}): Optimization compute job with id '{backtestId}' was not found");
            return;
        }

        // a slot was freed, so launch one queued parameter set if any;
        // done before 'Strategy.PushNewResults' so FIFO is respected
        if (PendingParameterSet.TryDequeue(out var queuedParameterSet))
        {
            LaunchLeanForParameterSet(queuedParameterSet);
        }

        var succeeded = !string.IsNullOrEmpty(jsonBacktestResult);
        if (succeeded)
        {
            Interlocked.Increment(ref _completedBacktest);
        }
        else
        {
            Interlocked.Increment(ref _failedBacktest);
            Log.Error(
                $"LeanOptimizer.NewResult({GetLogDetails()}): Got null/empty backtest result for backtest id '{backtestId}'");
        }

        // always notify the strategy, a failed backtest is reported with a null json result
        var result = new OptimizationResult(succeeded ? jsonBacktestResult : null, finishedParameterSet, backtestId);
        Strategy.PushNewResults(result);

        // the strategy could have added more backtests while handling the result
        if (RunningParameterSetForBacktest.IsEmpty)
        {
            TriggerOnEndEvent();
        }
        else
        {
            ProcessUpdate();
        }
    }
}
/// <summary>
/// Disposes of any resources: flags the optimizer as disposed and cleans up
/// every pending or running backtest. Calls after the first one do nothing.
/// </summary>
public virtual void Dispose()
{
    if (!_disposed)
    {
        _disposed = true;
        CleanUpRunningInstance();
    }
}
/// <summary>
/// Returns the total backtest estimate currently reported by the optimization strategy
/// </summary>
/// <returns>The value of <c>Strategy.GetTotalBacktestEstimate()</c></returns>
public int GetCurrentEstimate() => Strategy.GetTotalBacktestEstimate();
/// <summary>
/// Get the current runtime statistics
/// </summary>
/// <returns>
/// A dictionary with the entries "Completed", "Failed", "Running", "In Queue",
/// "Average Length" and "Total Runtime", all formatted as strings
/// </returns>
public Dictionary<string, string> GetRuntimeStatistics()
{
    // read each counter exactly once so every entry below is derived from the same snapshot
    var completedCount = Volatile.Read(ref _completedBacktest);
    var failedCount = Volatile.Read(ref _failedBacktest);
    var totalEndedCount = completedCount + failedCount;

    var runtime = DateTime.UtcNow - _startedAt;
    var averageLength = totalEndedCount > 0
        ? new TimeSpan(runtime.Ticks / totalEndedCount)
        : TimeSpan.Zero;

    // NOTE(review): 'hh' is the hours component of the TimeSpan (0-23), so values of one day
    // or more wrap around. Format kept unchanged because consumers may depend on it.
    const string timeFormat = @"hh\:mm\:ss";

    return new Dictionary<string, string>
    {
        { "Completed", completedCount.ToString(CultureInfo.InvariantCulture) },
        { "Failed", failedCount.ToString(CultureInfo.InvariantCulture) },
        { "Running", RunningParameterSetForBacktest.Count.ToString(CultureInfo.InvariantCulture) },
        { "In Queue", PendingParameterSet.Count.ToString(CultureInfo.InvariantCulture) },
        { "Average Length", averageLength.ToString(timeFormat, CultureInfo.InvariantCulture) },
        { "Total Runtime", runtime.ToString(timeFormat, CultureInfo.InvariantCulture) }
    };
}
/// <summary>
/// Helper method building the identifying prefix used in log messages
/// </summary>
/// <returns>
/// Only the optimization id when the packet user id is 0, otherwise the user id,
/// project id, optimization id and current status
/// </returns>
protected string GetLogDetails() =>
    NodePacket.UserId != 0
        ? $"UI {NodePacket.UserId} PID {NodePacket.ProjectId} OID {NodePacket.OptimizationId} S {Status}"
        : $"OID {NodePacket.OptimizationId}";
/// <summary>
/// Handles stopping Lean process
/// </summary>
/// <param name="backtestId">Specified backtest id</param>
/// <remarks>Called during clean up for every backtest still registered as running</remarks>
protected abstract void AbortLean(string backtestId);
/// <summary>
/// Sends an update of the current optimization status to the user
/// </summary>
/// <remarks>Exceptions thrown by an implementation are caught and logged by the caller in this class</remarks>
protected abstract void SendUpdate();
/// <summary>
/// Sets the current optimization status
/// </summary>
/// <param name="optimizationStatus">The new optimization status</param>
/// <remarks>The request is ignored once the status is aborted or completed</remarks>
protected virtual void SetOptimizationStatus(OptimizationStatus optimizationStatus)
{
    lock (_statusLock)
    {
        // we never come back from an aborted/ended status
        var alreadyFinal = Status == OptimizationStatus.Aborted || Status == OptimizationStatus.Completed;
        if (alreadyFinal)
        {
            return;
        }

        Status = optimizationStatus;
    }
}
/// <summary>
/// Clean up any pending or running lean instance
/// </summary>
/// <remarks>
/// Queued parameter sets are dropped. Every backtest still registered as running is removed,
/// counted as failed and aborted. Aborting is best effort: a failure is logged and the
/// remaining backtests are still processed.
/// </remarks>
private void CleanUpRunningInstance()
{
    PendingParameterSet.Clear();

    lock (RunningParameterSetForBacktest)
    {
        // ConcurrentDictionary.Keys is a snapshot, so removing entries while iterating is safe
        foreach (var backtestId in RunningParameterSetForBacktest.Keys)
        {
            if (!RunningParameterSetForBacktest.TryRemove(backtestId, out _))
            {
                continue;
            }

            Interlocked.Increment(ref _failedBacktest);
            try
            {
                AbortLean(backtestId);
            }
            catch (Exception e)
            {
                // never let one failed abort stop the clean up, but leave a trace of it
                Log.Error(e, $"LeanOptimizer.CleanUpRunningInstance({GetLogDetails()}): failed to abort backtest '{backtestId}'");
            }
        }
    }
}
/// <summary>
/// Will determine if it's right time to trigger an update call
/// </summary>
/// <param name="forceSend">True will force send, skipping interval, useful on start and end</param>
private void ProcessUpdate(bool forceSend = false)
{
    // while Start() is still creating backtests the status is New: only a forced update goes out,
    // we don't want to send partial/multiple updates
    var stillStarting = Status == OptimizationStatus.New;
    if (stillStarting && !forceSend)
    {
        return;
    }

    try
    {
        var utcNow = DateTime.UtcNow;
        var shouldSend = forceSend || (utcNow - _lastUpdate > TimeSpan.FromSeconds(_optimizationUpdateInterval));
        if (!shouldSend)
        {
            return;
        }

        _lastUpdate = utcNow;
        Log.Debug($"LeanOptimizer.ProcessUpdate({GetLogDetails()}): start sending update...");
        SendUpdate();
        Log.Debug($"LeanOptimizer.ProcessUpdate({GetLogDetails()}): finished sending update successfully.");
    }
    catch (Exception e)
    {
        // a failed update must not interrupt the optimization
        Log.Error(e, "Failed to send status update");
    }
}
/// <summary>
/// Launches a Lean backtest for the given parameter set, or queues it when the
/// maximum number of concurrent backtests has been reached
/// </summary>
/// <param name="parameterSet">The parameter set to run</param>
/// <remarks>
/// Does nothing once the optimizer is disposed, completed or aborted. A null or empty backtest id
/// returned by <see cref="RunLean"/> is counted as a failed backtest and reported to the strategy
/// with a null json result. Exceptions are logged and not rethrown.
/// </remarks>
private void LaunchLeanForParameterSet(ParameterSet parameterSet)
{
    bool HasEnded() => _disposed || Status == OptimizationStatus.Completed || Status == OptimizationStatus.Aborted;

    // cheap early exit without taking the lock
    if (HasEnded())
    {
        return;
    }

    lock (RunningParameterSetForBacktest)
    {
        // check again under the lock: the optimization may have ended or been disposed (and cleaned up)
        // while we were waiting, and a backtest launched now would never be aborted
        if (HasEnded())
        {
            return;
        }

        if (NodePacket.MaximumConcurrentBacktests != 0 && RunningParameterSetForBacktest.Count >= NodePacket.MaximumConcurrentBacktests)
        {
            // we hit the limit on the concurrent backtests
            PendingParameterSet.Enqueue(parameterSet);
            return;
        }

        try
        {
            var backtestName = GetBacktestName(parameterSet);
            var backtestId = RunLean(parameterSet, backtestName);

            if (!string.IsNullOrEmpty(backtestId))
            {
                Log.Trace($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): launched backtest '{backtestId}' with parameters '{parameterSet}'");
                RunningParameterSetForBacktest.TryAdd(backtestId, parameterSet);
            }
            else
            {
                Interlocked.Increment(ref _failedBacktest);
                // always notify the strategy
                Strategy.PushNewResults(new OptimizationResult(null, parameterSet, backtestId));
                Log.Error($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): Initial/null optimization compute job could not be placed into the queue");
            }

            ProcessUpdate();
        }
        catch (Exception ex)
        {
            // pass the exception itself so the stack trace is not lost
            Log.Error(ex, $"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): Error encountered while placing optimization message into the queue");
        }
    }
}
}
}