/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using System;
using System.Linq;
using System.Threading;
using QuantConnect.Data;
using QuantConnect.Util;
using QuantConnect.Logging;
using System.Collections.Generic;
using System.Collections.Concurrent;

namespace QuantConnect.Brokerages.LevelOneOrderBook
{
    /// <summary>
    /// Manages subscriptions and real-time updates for multiple <see cref="LevelOneMarketData"/> instances.
    /// Routes quote, trade and open interest updates to the per-symbol instance and forwards the data
    /// those instances emit to a shared <see cref="IDataAggregator"/> under a lock.
    /// </summary>
    public sealed class LevelOneServiceManager : IDisposable
    {
        /// <summary>
        /// The shared data aggregator that receives every <see cref="BaseData"/> emitted by the tracked symbols.
        /// </summary>
        private readonly IDataAggregator _dataAggregator;

        /// <summary>
        /// Serializes calls into <see cref="_dataAggregator"/>.
        /// </summary>
        private readonly Lock _lock = new();

        /// <summary>
        /// Maps each subscribed symbol to its <see cref="LevelOneMarketData"/> instance.
        /// </summary>
        private readonly ConcurrentDictionary<Symbol, LevelOneMarketData> _levelOneServiceBySymbol = new();

        /// <summary>
        /// Subscription manager that the public subscribe/unsubscribe calls are delegated to.
        /// </summary>
        private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager;

        /// <summary>
        /// Gets whether there are no tracked symbols.
        /// </summary>
        public bool IsEmpty => _levelOneServiceBySymbol.IsEmpty;

        /// <summary>
        /// Gets the number of currently tracked symbols.
        /// </summary>
        public int Count => _levelOneServiceBySymbol.Count;

        /// <summary>
        /// Initializes a new instance of the <see cref="LevelOneServiceManager"/> class.
        /// </summary>
        /// <param name="dataAggregator">The aggregator to which all emitted data will be published.</param>
        /// <param name="subscribeCallback">Delegate that performs the actual symbol subscription; returns true on success.</param>
        /// <param name="unsubscribeCallback">Delegate that performs the actual symbol unsubscription; returns true on success.</param>
        public LevelOneServiceManager(
            IDataAggregator dataAggregator,
            Func<IEnumerable<Symbol>, TickType, bool> subscribeCallback,
            Func<IEnumerable<Symbol>, TickType, bool> unsubscribeCallback)
        {
            _dataAggregator = dataAggregator;

            _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager()
            {
                SubscribeImpl = (symbols, tickType) => SubscribeCallbackWrapper(symbols, tickType, subscribeCallback),
                UnsubscribeImpl = (symbols, tickType) => UnsubscribeCallbackWrapper(symbols, tickType, unsubscribeCallback)
            };
        }

        /// <summary>
        /// Subscribes using the given <see cref="SubscriptionDataConfig"/> by delegating to the internal subscription manager.
        /// </summary>
        /// <param name="dataConfig">The subscription configuration to subscribe with.</param>
        public void Subscribe(SubscriptionDataConfig dataConfig)
        {
            _subscriptionManager.Subscribe(dataConfig);
        }

        /// <summary>
        /// Unsubscribes using the given <see cref="SubscriptionDataConfig"/> by delegating to the internal subscription manager.
        /// </summary>
        /// <param name="dataConfig">The subscription configuration to unsubscribe with.</param>
        public void Unsubscribe(SubscriptionDataConfig dataConfig)
        {
            _subscriptionManager.Unsubscribe(dataConfig);
        }

        /// <summary>
        /// Forwards incoming quote data to the <see cref="LevelOneMarketData"/> instance tracked for the symbol.
        /// If the symbol is not tracked, an error is logged and the update is dropped.
        /// Any filtering or deduplication of the values is left to <see cref="LevelOneMarketData"/>.
        /// </summary>
        /// <param name="symbol">The symbol for which quote data is received.</param>
        /// <param name="quoteDateTimeUtc">The UTC timestamp of the quote.</param>
        /// <param name="bidPrice">The bid price.</param>
        /// <param name="bidSize">The size at the bid price.</param>
        /// <param name="askPrice">The ask price.</param>
        /// <param name="askSize">The size at the ask price.</param>
        public void HandleQuote(Symbol symbol, DateTime? quoteDateTimeUtc, decimal? bidPrice, decimal? bidSize, decimal? askPrice, decimal? askSize)
        {
            if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
            {
                levelOneMarketData.UpdateQuote(quoteDateTimeUtc, bidPrice, bidSize, askPrice, askSize);
            }
        }

        /// <summary>
        /// Forwards incoming last trade data to the <see cref="LevelOneMarketData"/> instance tracked for the symbol.
        /// If the symbol is not tracked, an error is logged and the update is dropped.
        /// </summary>
        /// <param name="symbol">The symbol for which trade data is received.</param>
        /// <param name="tradeDateTimeUtc">The UTC timestamp of the trade.</param>
        /// <param name="lastQuantity">The trade size.</param>
        /// <param name="lastPrice">The trade price.</param>
        /// <param name="saleCondition">Optional sale condition string.</param>
        /// <param name="exchange">Optional exchange identifier.</param>
        public void HandleLastTrade(Symbol symbol, DateTime? tradeDateTimeUtc, decimal? lastQuantity, decimal? lastPrice, string saleCondition = "", string exchange = "")
        {
            if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
            {
                levelOneMarketData.UpdateLastTrade(tradeDateTimeUtc, lastQuantity, lastPrice, saleCondition, exchange);
            }
        }

        /// <summary>
        /// Forwards an open interest update to the <see cref="LevelOneMarketData"/> instance tracked for the symbol.
        /// If the symbol is not tracked, an error is logged and the update is dropped.
        /// </summary>
        /// <param name="symbol">The trading symbol associated with the open interest update.</param>
        /// <param name="openInterestDateTimeUtc">The UTC timestamp when the open interest value was observed.</param>
        /// <param name="openInterest">The reported open interest value.</param>
        public void HandleOpenInterest(Symbol symbol, DateTime? openInterestDateTimeUtc, decimal? openInterest)
        {
            if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
            {
                levelOneMarketData.UpdateOpenInterest(openInterestDateTimeUtc, openInterest);
            }
        }

        /// <summary>
        /// Sets the <see cref="LevelOneMarketData.IgnoreZeroSizeUpdates"/> flag for the specified symbol,
        /// controlling how zero-sized quote updates are handled for that symbol's market data stream.
        /// If the symbol is not tracked, an error is logged and nothing is changed.
        /// </summary>
        /// <param name="symbol">The symbol whose quote update behavior should be configured.</param>
        /// <param name="ignoreZeroSizeUpdates">
        /// If <c>true</c>, zero-sized bid or ask updates will be ignored for the given symbol,
        /// preserving existing book values. If <c>false</c>, zero sizes will be applied as valid updates.
        /// </param>
        /// <remarks>
        /// This is typically used to differentiate between real-time and delayed data feeds, where zero-size
        /// updates in real-time may indicate incomplete data, but in delayed feeds may represent actual market states.
        /// </remarks>
        public void SetIgnoreZeroSizeUpdates(Symbol symbol, bool ignoreZeroSizeUpdates)
        {
            if (TryGetLevelOneMarketData(symbol, out var levelOneMarketData))
            {
                levelOneMarketData.IgnoreZeroSizeUpdates = ignoreZeroSizeUpdates;
            }
        }

        /// <summary>
        /// Returns the subscribed symbols as reported by the internal subscription manager.
        /// </summary>
        /// <returns>The currently subscribed symbols.</returns>
        public IEnumerable<Symbol> GetSubscribedSymbols()
        {
            return _subscriptionManager.GetSubscribedSymbols();
        }

        /// <summary>
        /// Handles <see cref="BaseData"/> emitted by the tracked <see cref="LevelOneMarketData"/> instances
        /// and forwards it to the shared data aggregator while holding <see cref="_lock"/>.
        /// </summary>
        /// <param name="_">The originator of the event (unused).</param>
        /// <param name="eventData">The event payload carrying the <see cref="BaseData"/> to publish.</param>
        private void BaseDataReceived(object _, BaseDataEventArgs eventData)
        {
            lock (_lock)
            {
                _dataAggregator.Update(eventData.BaseData);
            }
        }

        /// <summary>
        /// Wraps the subscription delegate: on success, creates a <see cref="LevelOneMarketData"/> for every
        /// symbol, hooks its <c>BaseDataReceived</c> event and starts tracking it.
        /// </summary>
        /// <param name="symbols">The symbols to subscribe.</param>
        /// <param name="tickType">The tick type to subscribe for.</param>
        /// <param name="subscribeCallback">The original subscription logic delegate.</param>
        /// <returns>True if the subscription was successful; otherwise, false.</returns>
        private bool SubscribeCallbackWrapper(IEnumerable<Symbol> symbols, TickType tickType, Func<IEnumerable<Symbol>, TickType, bool> subscribeCallback)
        {
            if (subscribeCallback(symbols, tickType))
            {
                foreach (var symbol in symbols)
                {
                    // Attach the handler BEFORE publishing the instance into the dictionary: once it is
                    // visible, the Handle* methods may feed it from other threads, and a second indexer
                    // lookup could throw if the key were removed concurrently.
                    var levelOneService = new LevelOneMarketData(symbol);
                    levelOneService.BaseDataReceived += BaseDataReceived;
                    _levelOneServiceBySymbol[symbol] = levelOneService;
                }

                return true;
            }

            Log.Error($"{nameof(LevelOneServiceManager)}.{nameof(SubscribeCallbackWrapper)}: Failed for symbols: {string.Join(", ", symbols.Select(s => s.Value))}");
            return false;
        }

        /// <summary>
        /// Wraps the unsubscription delegate: on success, stops tracking every symbol and unhooks the
        /// <c>BaseDataReceived</c> handler from its <see cref="LevelOneMarketData"/> instance.
        /// </summary>
        /// <param name="symbols">The symbols to unsubscribe.</param>
        /// <param name="tickType">The tick type to unsubscribe from.</param>
        /// <param name="unsubscribeCallback">The original unsubscription logic delegate.</param>
        /// <returns>True if the unsubscription was successful; otherwise, false.</returns>
        private bool UnsubscribeCallbackWrapper(IEnumerable<Symbol> symbols, TickType tickType, Func<IEnumerable<Symbol>, TickType, bool> unsubscribeCallback)
        {
            if (unsubscribeCallback(symbols, tickType))
            {
                foreach (var symbol in symbols)
                {
                    if (_levelOneServiceBySymbol.TryRemove(symbol, out var levelOneService))
                    {
                        levelOneService.BaseDataReceived -= BaseDataReceived;
                    }
                }

                return true;
            }

            Log.Error($"{nameof(LevelOneServiceManager)}.{nameof(UnsubscribeCallbackWrapper)}: Failed for symbols: {string.Join(", ", symbols.Select(s => s.Value))}");

            // Report the failure to the caller, mirroring SubscribeCallbackWrapper; the services stay
            // tracked because the underlying unsubscription did not happen.
            return false;
        }

        /// <summary>
        /// Attempts to retrieve the <see cref="LevelOneMarketData"/> instance tracked for the specified symbol.
        /// </summary>
        /// <param name="symbol">The symbol whose market data instance is to be retrieved.</param>
        /// <param name="levelOneMarketData">
        /// When this method returns, contains the instance associated with the symbol,
        /// if the symbol is found; otherwise, <c>null</c>.
        /// </param>
        /// <returns>
        /// <c>true</c> if the symbol is found and the associated market data instance is returned;
        /// otherwise, <c>false</c>. Logs an error if the symbol is not found.
        /// </returns>
        private bool TryGetLevelOneMarketData(Symbol symbol, out LevelOneMarketData levelOneMarketData)
        {
            if (_levelOneServiceBySymbol.TryGetValue(symbol, out levelOneMarketData))
            {
                return true;
            }

            // This helper is shared by all Handle*/Set* entry points, so the log names the helper itself
            // rather than one particular caller.
            Log.Error($"{nameof(LevelOneServiceManager)}.{nameof(TryGetLevelOneMarketData)}: Symbol {symbol} not found in {nameof(_levelOneServiceBySymbol)}. This could indicate an unexpected symbol or a missing initialization step.");
            return false;
        }

        /// <summary>
        /// Releases the resources held by the <see cref="LevelOneServiceManager"/> by disposing the
        /// internal subscription manager.
        /// </summary>
        public void Dispose()
        {
            _subscriptionManager.Dispose();
        }
    }
}