/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using Newtonsoft.Json; using QuantConnect.Data; using QuantConnect.Logging; using RestSharp; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; namespace QuantConnect.Brokerages { /// /// Provides shared brokerage websockets implementation /// public abstract class BaseWebsocketsBrokerage : Brokerage { private const int ConnectionTimeout = 30000; /// /// True if the current brokerage is already initialized /// protected bool IsInitialized { get; set; } /// /// The websockets client instance /// protected IWebSocket WebSocket { get; set; } /// /// The rest client instance /// protected IRestClient RestClient { get; set; } /// /// standard json parsing settings /// protected JsonSerializerSettings JsonSettings { get; set; } /// /// A list of currently active orders /// public ConcurrentDictionary CachedOrderIDs { get; set; } /// /// The api secret /// protected string ApiSecret { get; set; } /// /// The api key /// protected string ApiKey { get; set; } /// /// Count subscribers for each (symbol, tickType) combination /// protected DataQueueHandlerSubscriptionManager SubscriptionManager { get; set; } /// /// Initialize the instance of this class /// /// The web socket base url /// instance of websockets client /// instance of rest client /// api key /// api secret protected void Initialize(string wssUrl, IWebSocket websocket, IRestClient restClient, string apiKey, string apiSecret) { if (IsInitialized) { return; } IsInitialized = true; JsonSettings = new JsonSerializerSettings { FloatParseHandling = FloatParseHandling.Decimal }; CachedOrderIDs = new ConcurrentDictionary(); WebSocket = websocket; WebSocket.Initialize(wssUrl); WebSocket.Message += OnMessage; WebSocket.Open += (sender, args) => { Log.Trace($"BaseWebsocketsBrokerage(): WebSocket.Open. Subscribing"); Subscribe(GetSubscribed()); }; RestClient = restClient; ApiSecret = apiSecret; ApiKey = apiKey; } /// /// Creates an instance of a websockets brokerage /// /// Name of brokerage protected BaseWebsocketsBrokerage(string name) : base(name) { } /// /// Handles websocket received messages /// /// /// protected abstract void OnMessage(object sender, WebSocketMessage e); /// /// Creates wss connection, monitors for disconnection and re-connects when necessary /// public override void Connect() { if (IsConnected) return; Log.Trace("BaseWebSocketsBrokerage.Connect(): Connecting..."); ConnectSync(); } /// /// Handles the creation of websocket subscriptions /// /// protected abstract bool Subscribe(IEnumerable symbols); /// /// Gets a list of current subscriptions /// /// protected virtual IEnumerable GetSubscribed() { return SubscriptionManager?.GetSubscribedSymbols() ?? Enumerable.Empty(); } /// /// Start websocket connect /// protected void ConnectSync() { var resetEvent = new ManualResetEvent(false); EventHandler triggerEvent = (o, args) => resetEvent.Set(); WebSocket.Open += triggerEvent; WebSocket.Connect(); if (!resetEvent.WaitOne(ConnectionTimeout)) { throw new TimeoutException("Websockets connection timeout."); } WebSocket.Open -= triggerEvent; } } }