/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Newtonsoft.Json;
using QuantConnect.Data;
using QuantConnect.Logging;
using RestSharp;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace QuantConnect.Brokerages
{
/// <summary>
/// Provides shared brokerage websockets implementation
/// </summary>
public abstract class BaseWebsocketsBrokerage : Brokerage
{
    // Maximum time, in milliseconds, that ConnectSync waits for the websocket Open event.
    private const int ConnectionTimeout = 30000;

    /// <summary>
    /// True if the current brokerage is already initialized
    /// </summary>
    protected bool IsInitialized { get; set; }

    /// <summary>
    /// The websockets client instance
    /// </summary>
    protected IWebSocket WebSocket { get; set; }

    /// <summary>
    /// The rest client instance
    /// </summary>
    protected IRestClient RestClient { get; set; }

    /// <summary>
    /// Standard json parsing settings
    /// </summary>
    protected JsonSerializerSettings JsonSettings { get; set; }

    /// <summary>
    /// A cache of currently active orders
    /// </summary>
    public ConcurrentDictionary CachedOrderIDs { get; set; }

    /// <summary>
    /// The api secret
    /// </summary>
    protected string ApiSecret { get; set; }

    /// <summary>
    /// The api key
    /// </summary>
    protected string ApiKey { get; set; }

    /// <summary>
    /// Count subscribers for each (symbol, tickType) combination
    /// </summary>
    /// <remarks>
    /// This class never assigns the property; <see cref="GetSubscribed"/> tolerates it being null.
    /// </remarks>
    protected DataQueueHandlerSubscriptionManager SubscriptionManager { get; set; }

    /// <summary>
    /// Initialize the instance of this class. Calls after the first one return immediately.
    /// </summary>
    /// <param name="wssUrl">The web socket base url</param>
    /// <param name="websocket">Instance of websockets client</param>
    /// <param name="restClient">Instance of rest client</param>
    /// <param name="apiKey">Api key</param>
    /// <param name="apiSecret">Api secret</param>
    protected void Initialize(string wssUrl, IWebSocket websocket, IRestClient restClient, string apiKey, string apiSecret)
    {
        if (IsInitialized)
        {
            return;
        }
        IsInitialized = true;

        // Parse JSON floating point numbers as decimal instead of double.
        JsonSettings = new JsonSerializerSettings { FloatParseHandling = FloatParseHandling.Decimal };
        CachedOrderIDs = new ConcurrentDictionary();

        WebSocket = websocket;
        WebSocket.Initialize(wssUrl);
        WebSocket.Message += OnMessage;

        // Every time the socket opens, request the currently tracked subscriptions again.
        WebSocket.Open += (sender, args) =>
        {
            Log.Trace("BaseWebsocketsBrokerage(): WebSocket.Open. Subscribing");
            Subscribe(GetSubscribed());
        };

        RestClient = restClient;
        ApiSecret = apiSecret;
        ApiKey = apiKey;
    }

    /// <summary>
    /// Creates an instance of a websockets brokerage
    /// </summary>
    /// <param name="name">Name of brokerage</param>
    protected BaseWebsocketsBrokerage(string name) : base(name)
    {
    }

    /// <summary>
    /// Handles websocket received messages
    /// </summary>
    /// <param name="sender">The source of the event</param>
    /// <param name="e">The received websocket message</param>
    protected abstract void OnMessage(object sender, WebSocketMessage e);

    /// <summary>
    /// Creates wss connection. Does nothing when the brokerage is already connected.
    /// </summary>
    public override void Connect()
    {
        if (IsConnected)
        {
            return;
        }

        Log.Trace("BaseWebSocketsBrokerage.Connect(): Connecting...");
        ConnectSync();
    }

    /// <summary>
    /// Handles the creation of websocket subscriptions
    /// </summary>
    /// <param name="symbols">The symbols to subscribe to</param>
    /// <returns>Implementation-defined result; this class does not inspect it</returns>
    protected abstract bool Subscribe(IEnumerable symbols);

    /// <summary>
    /// Gets a list of current subscriptions
    /// </summary>
    /// <returns>
    /// The symbols reported by <see cref="SubscriptionManager"/>, or an empty sequence when
    /// the manager (or its result) is null
    /// </returns>
    protected virtual IEnumerable GetSubscribed()
    {
        return SubscriptionManager?.GetSubscribedSymbols() ?? Enumerable.Empty();
    }

    /// <summary>
    /// Start websocket connect and block until the Open event fires or the timeout elapses
    /// </summary>
    /// <exception cref="TimeoutException">
    /// The Open event was not raised within <see cref="ConnectionTimeout"/> milliseconds
    /// </exception>
    protected void ConnectSync()
    {
        // Not disposed on purpose: an Open callback racing with the unsubscribe below could
        // still call Set(), which would throw on a disposed handle.
        var resetEvent = new ManualResetEvent(false);
        EventHandler triggerEvent = (o, args) => resetEvent.Set();
        WebSocket.Open += triggerEvent;
        try
        {
            WebSocket.Connect();
            if (!resetEvent.WaitOne(ConnectionTimeout))
            {
                throw new TimeoutException("Websockets connection timeout.");
            }
        }
        finally
        {
            // Always detach the temporary handler, including when Connect() throws or the wait
            // times out; otherwise each failed attempt leaves one more handler on the socket.
            WebSocket.Open -= triggerEvent;
        }
    }
}
}