/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using QuantConnect.Data;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Packets;
using QuantConnect.Util;
namespace QuantConnect.Brokerages
{
///
/// Provides a default implementation o that will forward
/// messages as follows:
/// Information -> IResultHandler.Debug
/// Warning -> IResultHandler.Error && IApi.SendUserEmail
/// Error -> IResultHandler.Error && IAlgorithm.RunTimeError
///
public class DefaultBrokerageMessageHandler : IBrokerageMessageHandler
{
private static readonly TimeSpan DefaultOpenThreshold = TimeSpan.FromMinutes(5);
private static readonly TimeSpan DefaultInitialDelay = TimeSpan.FromMinutes(15);
private volatile bool _connected;
private readonly IAlgorithm _algorithm;
private readonly TimeSpan _openThreshold;
private readonly TimeSpan _initialDelay;
private CancellationTokenSource _cancellationTokenSource;
///
/// Initializes a new instance of the class
///
/// The running algorithm
///
/// Defines how long before market open to re-check for brokerage reconnect message
public DefaultBrokerageMessageHandler(IAlgorithm algorithm, TimeSpan? initialDelay = null, TimeSpan? openThreshold = null)
: this(algorithm, null, null, initialDelay, openThreshold)
{
}
///
/// Initializes a new instance of the class
///
/// The running algorithm
/// The job that produced the algorithm
/// The api for the algorithm
///
/// Defines how long before market open to re-check for brokerage reconnect message
public DefaultBrokerageMessageHandler(IAlgorithm algorithm, AlgorithmNodePacket job, IApi api, TimeSpan? initialDelay = null, TimeSpan? openThreshold = null)
{
_algorithm = algorithm;
_connected = true;
_openThreshold = openThreshold ?? DefaultOpenThreshold;
_initialDelay = initialDelay ?? DefaultInitialDelay;
}
///
/// Handles the message
///
/// The message to be handled
public void HandleMessage(BrokerageMessageEvent message)
{
// based on message type dispatch to result handler
switch (message.Type)
{
case BrokerageMessageType.Information:
_algorithm.Debug(Messages.DefaultBrokerageMessageHandler.BrokerageInfo(message));
break;
case BrokerageMessageType.Warning:
_algorithm.Error(Messages.DefaultBrokerageMessageHandler.BrokerageWarning(message));
break;
case BrokerageMessageType.Error:
// unexpected error, we need to close down shop
_algorithm.SetRuntimeError(new Exception(message.Message),
Messages.DefaultBrokerageMessageHandler.BrokerageErrorContext);
break;
case BrokerageMessageType.Disconnect:
_connected = false;
Log.Trace(Messages.DefaultBrokerageMessageHandler.Disconnected);
// check to see if any non-custom security exchanges are open within the next x minutes
var open = (from kvp in _algorithm.Securities
let security = kvp.Value
where security.Type != SecurityType.Base
let exchange = security.Exchange
let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
where exchange.IsOpenDuringBar(
localTime,
localTime + _openThreshold,
_algorithm.SubscriptionManager.SubscriptionDataConfigService
.GetSubscriptionDataConfigs(security.Symbol)
.IsExtendedMarketHours())
select security).Any();
// if any are open then we need to kill the algorithm
if (open)
{
Log.Trace(Messages.DefaultBrokerageMessageHandler.DisconnectedWhenExchangesAreOpen(_initialDelay));
// wait 15 minutes before killing algorithm
StartCheckReconnected(_initialDelay, message);
}
else
{
Log.Trace(Messages.DefaultBrokerageMessageHandler.DisconnectedWhenExchangesAreClosed);
// if they aren't open, we'll need to check again a little bit before markets open
DateTime nextMarketOpenUtc;
if (_algorithm.Securities.Count != 0)
{
nextMarketOpenUtc = (from kvp in _algorithm.Securities
let security = kvp.Value
where security.Type != SecurityType.Base
let exchange = security.Exchange
let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
let marketOpen = exchange.Hours.GetNextMarketOpen(localTime,
_algorithm.SubscriptionManager.SubscriptionDataConfigService
.GetSubscriptionDataConfigs(security.Symbol)
.IsExtendedMarketHours())
let marketOpenUtc = marketOpen.ConvertToUtc(exchange.TimeZone)
select marketOpenUtc).Min();
}
else
{
// if we have no securities just make next market open an hour from now
nextMarketOpenUtc = DateTime.UtcNow.AddHours(1);
}
var timeUntilNextMarketOpen = nextMarketOpenUtc - DateTime.UtcNow - _openThreshold;
Log.Trace(Messages.DefaultBrokerageMessageHandler.TimeUntilNextMarketOpen(timeUntilNextMarketOpen));
// wake up 5 minutes before market open and check if we've reconnected
StartCheckReconnected(timeUntilNextMarketOpen, message);
}
break;
case BrokerageMessageType.Reconnect:
_connected = true;
Log.Trace(Messages.DefaultBrokerageMessageHandler.Reconnected);
if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
}
break;
}
}
///
/// Handles a new order placed manually in the brokerage side
///
/// The new order event
/// Whether the order should be added to the transaction handler
public bool HandleOrder(NewBrokerageOrderNotificationEventArgs eventArgs)
{
return false;
}
private void StartCheckReconnected(TimeSpan delay, BrokerageMessageEvent message)
{
_cancellationTokenSource.DisposeSafely();
_cancellationTokenSource = new CancellationTokenSource(delay);
Task.Run(() =>
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
Thread.Sleep(TimeSpan.FromMinutes(1));
}
CheckReconnected(message);
}, _cancellationTokenSource.Token);
}
private void CheckReconnected(BrokerageMessageEvent message)
{
if (!_connected)
{
Log.Error(Messages.DefaultBrokerageMessageHandler.StillDisconnected);
_algorithm.SetRuntimeError(new Exception(message.Message),
Messages.DefaultBrokerageMessageHandler.BrokerageDisconnectedShutDownContext);
}
}
}
}