/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using QuantConnect.Interfaces;
using QuantConnect.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace QuantConnect.Data
{
    /// <summary>
    /// Counts the number of subscribers for each channel, where a channel is a
    /// (channel name, <see cref="Symbol"/>) pair.
    /// </summary>
    /// <remarks>
    /// Derived classes supply the actual subscribe/unsubscribe calls and the mapping from
    /// <see cref="TickType"/> to channel name. This base class only invokes them when the
    /// first subscriber of a channel arrives or the last one leaves.
    /// </remarks>
    public abstract class DataQueueHandlerSubscriptionManager : IDisposable
    {
        /// <summary>
        /// Subscriber counter, keyed by channel.
        /// </summary>
        protected ConcurrentDictionary<Channel, int> SubscribersByChannel { get; init; } = new ConcurrentDictionary<Channel, int>();

        /// <summary>
        /// Increments the number of subscribers for the channel described by <paramref name="dataConfig"/>.
        /// When the channel has no entry yet, the abstract subscribe implementation is called and the
        /// counter is set to 1 only if it reports success.
        /// </summary>
        /// <param name="dataConfig">Defines the subscription configuration data.</param>
        public void Subscribe(SubscriptionDataConfig dataConfig)
        {
            try
            {
                var channel = GetChannel(dataConfig);

                if (SubscribersByChannel.TryGetValue(channel, out var count))
                {
                    // NOTE(review): the TryUpdate result is ignored, so if another thread changes the
                    // counter between the read above and this update the increment is dropped.
                    // Confirm that callers serialize access to this method.
                    SubscribersByChannel.TryUpdate(channel, count + 1, count);
                    return;
                }

                // First subscriber for this channel: only start counting once the subscription succeeded.
                if (Subscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
                {
                    SubscribersByChannel.AddOrUpdate(channel, 1);
                }
            }
            catch (Exception exception)
            {
                // Log for diagnostics, then rethrow preserving the original stack trace.
                Log.Error(exception);
                throw;
            }
        }

        /// <summary>
        /// Decrements the number of subscribers for the channel described by <paramref name="dataConfig"/>.
        /// When the last subscriber leaves, the abstract unsubscribe implementation is called and the
        /// channel entry is removed only if it reports success. Unknown channels are ignored.
        /// </summary>
        /// <param name="dataConfig">Defines the subscription configuration data.</param>
        public void Unsubscribe(SubscriptionDataConfig dataConfig)
        {
            try
            {
                var channel = GetChannel(dataConfig);

                if (!SubscribersByChannel.TryGetValue(channel, out var count))
                {
                    // Nothing is tracked for this channel, so there is nothing to release.
                    return;
                }

                if (count > 1)
                {
                    // NOTE(review): as in Subscribe, a failed TryUpdate is silently ignored.
                    SubscribersByChannel.TryUpdate(channel, count - 1, count);
                    return;
                }

                // Last subscriber: keep the entry if the unsubscription did not succeed.
                if (Unsubscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
                {
                    SubscribersByChannel.TryRemove(channel, out _);
                }
            }
            catch (Exception exception)
            {
                // Log for diagnostics, then rethrow preserving the original stack trace.
                Log.Error(exception);
                throw;
            }
        }

        /// <summary>
        /// Returns the subscribed symbols across all channels.
        /// </summary>
        /// <returns>The distinct <see cref="Symbol"/> instances that currently have a counter entry.</returns>
        public IEnumerable<Symbol> GetSubscribedSymbols()
        {
            return SubscribersByChannel.Keys
                .Select(c => c.Symbol)
                .Distinct();
        }

        /// <summary>
        /// Retrieves the unique <see cref="Symbol"/> instances that are currently subscribed
        /// for a specific <see cref="TickType"/>.
        /// </summary>
        /// <param name="tickType">The type of tick data to filter subscriptions by.</param>
        /// <returns>
        /// The distinct <see cref="Symbol"/> objects whose channel name matches, ignoring case,
        /// the channel name mapped from <paramref name="tickType"/>.
        /// </returns>
        public IEnumerable<Symbol> GetSubscribedSymbols(TickType tickType)
        {
            var channelName = ChannelNameFromTickType(tickType);
#pragma warning disable CA1309
            return SubscribersByChannel.Keys.Where(x => x.Name.Equals(channelName, StringComparison.InvariantCultureIgnoreCase))
#pragma warning restore CA1309
                .Select(c => c.Symbol)
                .Distinct();
        }

        /// <summary>
        /// Checks whether a counter entry exists for the channel of the given symbol and tick type.
        /// </summary>
        /// <param name="symbol">Symbol</param>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>True if the channel has an entry in the subscriber counter; otherwise false.</returns>
        public bool IsSubscribed(Symbol symbol, TickType tickType)
        {
            return SubscribersByChannel.ContainsKey(GetChannel(symbol, tickType));
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// The base implementation does nothing.
        /// </summary>
        public virtual void Dispose()
        {
        }

        /// <summary>
        /// Performs the actual subscription; implemented by derived classes.
        /// </summary>
        /// <param name="symbols">Symbols to subscribe</param>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>Returns true if subscribed; otherwise false</returns>
        protected abstract bool Subscribe(IEnumerable<Symbol> symbols, TickType tickType);

        /// <summary>
        /// Performs the actual unsubscription; implemented by derived classes.
        /// </summary>
        /// <param name="symbols">Symbols to unsubscribe</param>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>Returns true if unsubscribed; otherwise false</returns>
        protected abstract bool Unsubscribe(IEnumerable<Symbol> symbols, TickType tickType);

        /// <summary>
        /// Maps a <see cref="TickType"/> to the channel name used as part of the counter key.
        /// </summary>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>The channel name for <paramref name="tickType"/>.</returns>
        protected abstract string ChannelNameFromTickType(TickType tickType);

        /// <summary>
        /// Builds the channel key for a subscription configuration from its symbol and tick type.
        /// </summary>
        private Channel GetChannel(SubscriptionDataConfig dataConfig) => GetChannel(dataConfig.Symbol, dataConfig.TickType);

        /// <summary>
        /// Builds the channel key from the channel name mapped from <paramref name="tickType"/> and the symbol.
        /// </summary>
        private Channel GetChannel(Symbol symbol, TickType tickType)
        {
            return new Channel(
                ChannelNameFromTickType(tickType),
                symbol);
        }
    }
}