/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using QuantConnect.Logging;
using QuantConnect.Util;
namespace QuantConnect.Brokerages
{
///
/// A default implementation of
/// which signals disconnection if no data is received for a given time span
/// and attempts to reconnect automatically.
///
public class DefaultConnectionHandler : IConnectionHandler
{
private CancellationTokenSource _cancellationTokenSource;
private Thread _connectionMonitorThread;
private bool _isEnabled;
private readonly object _lockerConnectionMonitor = new object();
private volatile bool _connectionLost;
private DateTime _lastDataReceivedTime;
///
/// Event that fires when a connection loss is detected
///
public event EventHandler ConnectionLost;
///
/// Event that fires when a lost connection is restored
///
public event EventHandler ConnectionRestored;
///
/// Event that fires when a reconnection attempt is required
///
public event EventHandler ReconnectRequested;
///
/// The elapsed time with no received data after which a connection loss is reported
///
public TimeSpan MaximumIdleTimeSpan { get; set; } = TimeSpan.FromSeconds(5);
///
/// The minimum time in seconds to wait before attempting to reconnect
///
public int MinimumSecondsForNextReconnectionAttempt { get; set; } = 1;
///
/// The maximum time in seconds to wait before attempting to reconnect
///
public int MaximumSecondsForNextReconnectionAttempt { get; set; } = 60;
///
/// The unique Id for the connection
///
public string ConnectionId { get; private set; }
///
/// Returns true if the connection has been lost
///
public bool IsConnectionLost => _connectionLost;
///
/// Initializes the connection handler
///
/// The connection id
public void Initialize(string connectionId)
{
ConnectionId = connectionId;
using var waitHandle = new ManualResetEvent(false);
_cancellationTokenSource = new CancellationTokenSource();
_connectionMonitorThread = new Thread(() =>
{
waitHandle.Set();
var nextReconnectionAttemptUtcTime = DateTime.UtcNow;
var nextReconnectionAttemptSeconds = MinimumSecondsForNextReconnectionAttempt;
lock (_lockerConnectionMonitor)
{
_lastDataReceivedTime = DateTime.UtcNow;
}
try
{
while (!_cancellationTokenSource.IsCancellationRequested
&& !_cancellationTokenSource.Token.WaitHandle.WaitOne(Time.GetSecondUnevenWait(1000)))
{
if (!_isEnabled) continue;
try
{
TimeSpan elapsed;
lock (_lockerConnectionMonitor)
{
elapsed = DateTime.UtcNow - _lastDataReceivedTime;
}
if (!_connectionLost && elapsed > MaximumIdleTimeSpan)
{
_connectionLost = true;
nextReconnectionAttemptUtcTime = DateTime.UtcNow.AddSeconds(nextReconnectionAttemptSeconds);
OnConnectionLost();
}
else if (_connectionLost)
{
if (elapsed <= MaximumIdleTimeSpan)
{
_connectionLost = false;
nextReconnectionAttemptSeconds = MinimumSecondsForNextReconnectionAttempt;
OnConnectionRestored();
}
else
{
if (DateTime.UtcNow > nextReconnectionAttemptUtcTime)
{
// double the interval between attempts (capped to 1 minute)
nextReconnectionAttemptSeconds = Math.Min(nextReconnectionAttemptSeconds * 2, MaximumSecondsForNextReconnectionAttempt);
nextReconnectionAttemptUtcTime = DateTime.UtcNow.AddSeconds(nextReconnectionAttemptSeconds);
OnReconnectRequested();
}
}
}
}
catch (Exception exception)
{
Log.Error($"Error in DefaultConnectionHandler: {exception}");
}
}
}
catch (Exception exception)
{
Log.Error(exception);
}
}) { IsBackground = true };
_connectionMonitorThread.Start();
waitHandle.WaitOne();
}
///
/// Enables/disables monitoring of the connection
///
/// True to enable monitoring, false otherwise
public void EnableMonitoring(bool isEnabled)
{
// if we are switching to enabled, initialize the last data received time
if (!_isEnabled && isEnabled)
{
KeepAlive(DateTime.UtcNow);
}
_isEnabled = isEnabled;
}
///
/// Notifies the connection handler that new data was received
///
/// The UTC timestamp of the last data point received
public void KeepAlive(DateTime lastDataReceivedTime)
{
lock (_lockerConnectionMonitor)
{
_lastDataReceivedTime = lastDataReceivedTime;
}
}
///
/// Event invocator for the event
///
protected virtual void OnConnectionLost()
{
Log.Error("DefaultConnectionHandler.OnConnectionLost(): WebSocket connection lost.");
ConnectionLost?.Invoke(this, EventArgs.Empty);
}
///
/// Event invocator for the event
///
protected virtual void OnConnectionRestored()
{
Log.Trace("DefaultConnectionHandler.OnConnectionRestored(): WebSocket connection restored.");
ConnectionRestored?.Invoke(this, EventArgs.Empty);
}
///
/// Event invocator for the event
///
protected virtual void OnReconnectRequested()
{
ReconnectRequested?.Invoke(this, EventArgs.Empty);
}
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
public void Dispose()
{
_isEnabled = false;
// request and wait for thread to stop
_connectionMonitorThread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
_cancellationTokenSource?.DisposeSafely();
}
}
}