/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Linq;
using System.Threading;
using QuantConnect.Data;
using QuantConnect.Util;
using QuantConnect.Logging;
using System.Collections.Generic;
using System.Collections.Concurrent;
namespace QuantConnect.Brokerages.LevelOneOrderBook
{
///
/// Manages subscriptions and real-time updates for multiple instances.
/// Facilitates routing of quote and trade data to a shared in a thread-safe manner.
///
public sealed class LevelOneServiceManager : IDisposable
{
///
/// The shared data aggregator that receives all tick updates from subscribed symbols.
///
private readonly IDataAggregator _dataAggregator;
///
/// Synchronization lock used to ensure thread safety during updates.
///
private readonly Lock _lock = new();
///
/// Maps subscribed symbols to their corresponding instances.
///
private readonly ConcurrentDictionary _levelOneServiceBySymbol = new();
///
/// Internal subscription manager used to delegate low-level subscribe/unsubscribe logic.
///
private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager;
///
/// Gets whether there are no active subscriptions.
///
public bool IsEmpty => _levelOneServiceBySymbol.IsEmpty;
///
/// Gets the number of currently subscribed symbols.
///
public int Count => _levelOneServiceBySymbol.Count;
///
/// Initializes a new instance of the class.
///
/// The aggregator to which all tick data will be published.
/// Delegate used to perform symbol subscription logic.
/// Delegate used to perform symbol unsubscription logic.
public LevelOneServiceManager(IDataAggregator dataAggregator, Func, TickType, bool> subscribeCallback, Func, TickType, bool> unsubscribeCallback)
{
_dataAggregator = dataAggregator;
_subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager()
{
SubscribeImpl = (symbols, tickType) => SubscribeCallbackWrapper(symbols, tickType, subscribeCallback),
UnsubscribeImpl = (symbols, tickType) => UnsubscribeCallbackWrapper(symbols, tickType, unsubscribeCallback)
};
}
///
/// Subscribes to the specified symbol based on the given .
///
/// The subscription configuration containing symbol and type information.
public void Subscribe(SubscriptionDataConfig dataConfig)
{
_subscriptionManager.Subscribe(dataConfig);
}
///
/// Unsubscribes from the specified symbol and removes its associated service instance.
///
/// The subscription configuration used for unsubscription.
public void Unsubscribe(SubscriptionDataConfig dataConfig)
{
_subscriptionManager.Unsubscribe(dataConfig);
}
///
/// Handles incoming quote data for a symbol.
/// Deduplicates updates and routes changes to the relevant instance.
///
/// The symbol for which quote data is received.
/// The UTC timestamp of the quote.
/// The bid price.
/// The size at the bid price.
/// The ask price.
/// The size at the ask price.
public void HandleQuote(Symbol symbol, DateTime? quoteDateTimeUtc, decimal? bidPrice, decimal? bidSize, decimal? askPrice, decimal? askSize)
{
if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
{
levelOneMarketData.UpdateQuote(quoteDateTimeUtc, bidPrice, bidSize, askPrice, askSize);
}
}
///
/// Handles incoming last trade data for a symbol and routes it to the corresponding instance.
///
/// The symbol for which trade data is received.
/// The UTC timestamp of the trade.
/// The trade size.
/// The trade price.
/// Optional sale condition string.
/// Optional exchange identifier.
public void HandleLastTrade(Symbol symbol, DateTime? tradeDateTimeUtc, decimal? lastQuantity, decimal? lastPrice, string saleCondition = "", string exchange = "")
{
if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
{
levelOneMarketData.UpdateLastTrade(tradeDateTimeUtc, lastQuantity, lastPrice, saleCondition, exchange);
}
}
///
/// Handles open interest updates for the specified symbol.
/// If the symbol is subscribed, forwards the open interest data to the corresponding
/// instance for publishing.
///
/// The trading symbol associated with the open interest update.
/// The UTC timestamp when the open interest value was observed.
/// The reported open interest value.
public void HandleOpenInterest(Symbol symbol, DateTime? openInterestDateTimeUtc, decimal? openInterest)
{
if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
{
levelOneMarketData.UpdateOpenInterest(openInterestDateTimeUtc, openInterest);
}
}
///
/// Sets the flag for the specified symbol,
/// controlling how zero-sized quote updates are handled for that symbol's market data stream.
///
/// The symbol whose quote update behavior should be configured.
///
/// If true, zero-sized bid or ask updates will be ignored for the given symbol,
/// preserving existing book values. If false, zero sizes will be applied as valid updates.
///
///
/// This is typically used to differentiate between real-time and delayed data feeds, where zero-size
/// updates in real-time may indicate incomplete data, but in delayed feeds may represent actual market states.
///
public void SetIgnoreZeroSizeUpdates(Symbol symbol, bool ignoreZeroSizeUpdates)
{
if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
{
levelOneMarketData.IgnoreZeroSizeUpdates = ignoreZeroSizeUpdates;
}
}
///
/// Returns subscribed symbols
///
/// list of currently subscribed
public IEnumerable GetSubscribedSymbols()
{
return _subscriptionManager.GetSubscribedSymbols();
}
///
/// Handles BaseData updates emitted by instances.
/// Forwards the BaseData to the shared data aggregator in a thread-safe manner.
///
/// The originator of the BaseData.
/// The BaseData event data.
private void BaseDataReceived(object _, BaseDataEventArgs eventData)
{
lock (_lock)
{
_dataAggregator.Update(eventData.BaseData);
}
}
///
/// Wraps the subscription delegate to attach symbol-specific handlers and track active Level 1 services.
///
/// The symbols to subscribe.
/// The tick type to subscribe for.
/// The original subscription logic delegate.
/// True if the subscription was successful; otherwise, false.
private bool SubscribeCallbackWrapper(IEnumerable symbols, TickType tickType, Func, TickType, bool> subscribeCallback)
{
if (subscribeCallback(symbols, tickType))
{
foreach (var symbol in symbols)
{
_levelOneServiceBySymbol[symbol] = new(symbol);
_levelOneServiceBySymbol[symbol].BaseDataReceived += BaseDataReceived;
}
return true;
}
Log.Error($"{nameof(LevelOneServiceManager)}.{nameof(SubscribeCallbackWrapper)}: Failed for symbols: {string.Join(", ", symbols.Select(s => s.Value))}");
return false;
}
///
/// Wraps the unsubscription delegate to detach symbol-specific handlers and remove Level 1 service tracking.
///
/// The symbols to unsubscribe.
/// The tick type to unsubscribe from.
/// The original unsubscription logic delegate.
/// True if the unsubscription was successful; otherwise, false.
private bool UnsubscribeCallbackWrapper(IEnumerable symbols, TickType tickType, Func, TickType, bool> unsubscribeCallback)
{
if (unsubscribeCallback(symbols, tickType))
{
foreach (var symbol in symbols)
{
if (_levelOneServiceBySymbol.TryRemove(symbol, out var levelOneService))
{
levelOneService.BaseDataReceived -= BaseDataReceived;
}
}
return true;
}
Log.Error($"{nameof(LevelOneServiceManager)}.{nameof(UnsubscribeCallbackWrapper)}: Failed for symbols: {string.Join(", ", symbols.Select(s => s.Value))}");
return true;
}
///
/// Attempts to retrieve the instance associated with the specified symbol.
///
/// The symbol whose market data instance is to be retrieved.
///
/// When this method returns, contains the instance associated with the symbol,
/// if the symbol is found; otherwise, null.
///
///
/// true if the symbol is found and the associated market data instance is returned;
/// otherwise, false. Logs an error if the symbol is not found.
///
private bool TryGetLevelOneMarketData(Symbol symbol, out LevelOneMarketData levelOneMarketData)
{
if (_levelOneServiceBySymbol.TryGetValue(symbol, out levelOneMarketData))
{
return true;
}
Log.Error($"{nameof(LevelOneServiceManager)}.{nameof(HandleLastTrade)}: Symbol {symbol} not found in {nameof(_levelOneServiceBySymbol)}. This could indicate an unexpected symbol or a missing initialization step.");
return false;
}
///
/// Releases all resources used by the .
///
public void Dispose()
{
_subscriptionManager.Dispose();
}
}
}