/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
namespace QuantConnect.Data
{
/// <summary>
/// Counts the number of subscribers for each channel (channel name, <see cref="Symbol"/>) pair.
/// The concrete implementation is only asked to subscribe when a channel gets its first
/// subscriber and to unsubscribe when its last subscriber leaves.
/// </summary>
public abstract class DataQueueHandlerSubscriptionManager : IDisposable
{
    /// <summary>
    /// Subscriber counter, keyed by channel
    /// </summary>
    protected ConcurrentDictionary<Channel, int> SubscribersByChannel { get; init; } = new ConcurrentDictionary<Channel, int>();

    /// <summary>
    /// Increments the number of subscribers for the channel described by <paramref name="dataConfig"/>.
    /// If the channel is not tracked yet, the implementation is asked to subscribe and the
    /// channel is recorded only when that call returns true.
    /// </summary>
    /// <param name="dataConfig">defines the subscription configuration data.</param>
    /// <remarks>Any exception is logged and then rethrown.</remarks>
    public void Subscribe(SubscriptionDataConfig dataConfig)
    {
        try
        {
            var channel = GetChannel(dataConfig);

            // Compare-and-swap loop: TryUpdate fails when the stored count changed between the
            // read and the write, so re-read and retry instead of silently dropping the increment.
            while (SubscribersByChannel.TryGetValue(channel, out var count))
            {
                if (SubscribersByChannel.TryUpdate(channel, count + 1, count))
                {
                    return;
                }
            }

            if (Subscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
            {
                // Registers the first subscriber. If another caller registered the channel in the
                // meantime, count this subscriber on top of it rather than resetting the count to 1.
                SubscribersByChannel.AddOrUpdate(channel, 1, (_, existing) => existing + 1);
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception);
            throw;
        }
    }

    /// <summary>
    /// Decrements the number of subscribers for the channel described by <paramref name="dataConfig"/>.
    /// When the count is not greater than one, the implementation is asked to unsubscribe and the
    /// channel is removed only when that call returns true. Does nothing if the channel is not tracked.
    /// </summary>
    /// <param name="dataConfig">defines the subscription configuration data.</param>
    /// <remarks>Any exception is logged and then rethrown.</remarks>
    public void Unsubscribe(SubscriptionDataConfig dataConfig)
    {
        try
        {
            var channel = GetChannel(dataConfig);

            while (SubscribersByChannel.TryGetValue(channel, out var count))
            {
                if (count > 1)
                {
                    if (SubscribersByChannel.TryUpdate(channel, count - 1, count))
                    {
                        return;
                    }

                    // The count changed under us; re-read it so the decrement is not lost.
                    continue;
                }

                if (Unsubscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
                {
                    SubscribersByChannel.TryRemove(channel, out _);
                }

                return;
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception);
            throw;
        }
    }

    /// <summary>
    /// Returns subscribed symbols
    /// </summary>
    /// <returns>The distinct <see cref="Symbol"/> values of all tracked channels</returns>
    public IEnumerable<Symbol> GetSubscribedSymbols()
    {
        return SubscribersByChannel.Keys
            .Select(c => c.Symbol)
            .Distinct();
    }

    /// <summary>
    /// Retrieves the unique <see cref="Symbol"/> instances that are currently subscribed for a specific <see cref="TickType"/>.
    /// </summary>
    /// <param name="tickType">The type of tick data to filter subscriptions by.</param>
    /// <returns>
    /// The distinct symbols of the tracked channels whose name matches, ignoring case, the channel
    /// name that <see cref="ChannelNameFromTickType"/> returns for <paramref name="tickType"/>.
    /// </returns>
    public IEnumerable<Symbol> GetSubscribedSymbols(TickType tickType)
    {
        var channelName = ChannelNameFromTickType(tickType);
#pragma warning disable CA1309
        return SubscribersByChannel.Keys.Where(x => x.Name.Equals(channelName, StringComparison.InvariantCultureIgnoreCase))
#pragma warning restore CA1309
            .Select(c => c.Symbol)
            .Distinct();
    }

    /// <summary>
    /// Checks if there is existing subscriber for current channel
    /// </summary>
    /// <param name="symbol">Symbol</param>
    /// <param name="tickType">Type of tick data</param>
    /// <returns>return true if there is one subscriber at least; otherwise false</returns>
    public bool IsSubscribed(Symbol symbol, TickType tickType)
    {
        return SubscribersByChannel.ContainsKey(GetChannel(
            symbol,
            tickType));
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// The base implementation does nothing.
    /// </summary>
    public virtual void Dispose()
    {
    }

    /// <summary>
    /// Describes the way the implementation performs a subscription
    /// </summary>
    /// <param name="symbols">Symbols to subscribe</param>
    /// <param name="tickType">Type of tick data</param>
    /// <returns>Returns true if subscribed; otherwise false</returns>
    protected abstract bool Subscribe(IEnumerable<Symbol> symbols, TickType tickType);

    /// <summary>
    /// Describes the way the implementation performs an unsubscription
    /// </summary>
    /// <param name="symbols">Symbols to unsubscribe</param>
    /// <param name="tickType">Type of tick data</param>
    /// <returns>Returns true if unsubscribed; otherwise false</returns>
    protected abstract bool Unsubscribe(IEnumerable<Symbol> symbols, TickType tickType);

    /// <summary>
    /// Maps a tick type to the name of the channel (e.g. the brokerage's socket/api channel)
    /// </summary>
    /// <param name="tickType">Type of tick data</param>
    /// <returns>The channel name used to build the <see cref="Channel"/> key</returns>
    protected abstract string ChannelNameFromTickType(TickType tickType);

    /// <summary>
    /// Builds the channel key for the symbol and tick type of <paramref name="dataConfig"/>
    /// </summary>
    private Channel GetChannel(SubscriptionDataConfig dataConfig) => GetChannel(dataConfig.Symbol, dataConfig.TickType);

    /// <summary>
    /// Builds the channel key from the channel name of <paramref name="tickType"/> and <paramref name="symbol"/>
    /// </summary>
    private Channel GetChannel(Symbol symbol, TickType tickType)
    {
        return new Channel(
            ChannelNameFromTickType(tickType),
            symbol);
    }
}
}