/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Threading; using QuantConnect.Util; using QuantConnect.Data; using QuantConnect.Logging; using QuantConnect.Interfaces; using System.Collections.Generic; using System.Collections.Concurrent; namespace QuantConnect.Lean.Engine.DataFeeds { /// /// Provides a means of distributing output from enumerators from a dedicated separate thread /// public class BaseDataExchange { private Thread _thread; private uint _sleepInterval = 1; private Func _isFatalError; private readonly CancellationTokenSource _cancellationTokenSource; private readonly string _name; private ManualResetEventSlim _manualResetEventSlim; private ConcurrentDictionary _enumerators; /// /// Gets or sets how long this thread will sleep when no data is available /// public uint SleepInterval { get => _sleepInterval; set { if (value == 0) { throw new ArgumentException("Sleep interval should be bigger than 0"); } _sleepInterval = value; } } /// /// Gets a name for this exchange /// public string Name { get { return _name; } } /// /// Initializes a new instance of the /// /// A name for this exchange public BaseDataExchange(string name) { _name = name; _isFatalError = x => false; _cancellationTokenSource = new CancellationTokenSource(); _manualResetEventSlim = new ManualResetEventSlim(false); _enumerators = new ConcurrentDictionary(); } /// /// Adds the enumerator to this exchange. If it has already been added /// then it will remain registered in the exchange only once /// /// The handler to use when this symbol's data is encountered public void AddEnumerator(EnumeratorHandler handler) { _enumerators[handler.Symbol] = handler; _manualResetEventSlim.Set(); } /// /// Adds the enumerator to this exchange. If it has already been added /// then it will remain registered in the exchange only once /// /// A unique symbol used to identify this enumerator /// The enumerator to be added /// Function used to determine if move next should be called on this /// enumerator, defaults to always returning true /// Delegate called when the enumerator move next returns false /// Handler for data if HandlesData=true public void AddEnumerator(Symbol symbol, IEnumerator enumerator, Func shouldMoveNext = null, Action enumeratorFinished = null, Action handleData = null) { var enumeratorHandler = new EnumeratorHandler(symbol, enumerator, shouldMoveNext, handleData); if (enumeratorFinished != null) { enumeratorHandler.EnumeratorFinished += (sender, args) => enumeratorFinished(args); } AddEnumerator(enumeratorHandler); } /// /// Sets the specified function as the error handler. This function /// returns true if it is a fatal error and queue consumption should /// cease. /// /// The error handling function to use when an /// error is encountered during queue consumption. Returns true if queue /// consumption should be stopped, returns false if queue consumption should /// continue public void SetErrorHandler(Func isFatalError) { // default to false; _isFatalError = isFatalError ?? (x => false); } /// /// Removes and returns enumerator handler with the specified symbol. /// The removed handler is returned, null if not found /// public EnumeratorHandler RemoveEnumerator(Symbol symbol) { EnumeratorHandler handler; if (_enumerators.TryRemove(symbol, out handler)) { handler.OnEnumeratorFinished(); handler.Enumerator.Dispose(); } return handler; } /// /// Begins consumption of the wrapped on /// a separate thread /// public void Start() { var manualEvent = new ManualResetEventSlim(false); _thread = new Thread(() => { manualEvent.Set(); Log.Trace($"BaseDataExchange({Name}) Starting..."); ConsumeEnumerators(); }) { IsBackground = true, Name = Name }; _thread.Start(); manualEvent.Wait(); manualEvent.DisposeSafely(); } /// /// Ends consumption of the wrapped /// public void Stop() { _thread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource); } /// Entry point for queue consumption /// This function only returns after is called or the token is cancelled private void ConsumeEnumerators() { while (!_cancellationTokenSource.Token.IsCancellationRequested) { try { // call move next each enumerator and invoke the appropriate handlers _manualResetEventSlim.Reset(); var handled = false; foreach (var kvp in _enumerators) { if (_cancellationTokenSource.Token.IsCancellationRequested) { Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Exiting..."); return; } var enumeratorHandler = kvp.Value; var enumerator = enumeratorHandler.Enumerator; // check to see if we should advance this enumerator if (!enumeratorHandler.ShouldMoveNext()) continue; if (!enumerator.MoveNext()) { enumeratorHandler.OnEnumeratorFinished(); enumeratorHandler.Enumerator.Dispose(); _enumerators.TryRemove(enumeratorHandler.Symbol, out enumeratorHandler); continue; } if (enumerator.Current == null) continue; handled = true; enumeratorHandler.HandleData(enumerator.Current); } if (!handled) { // if we didn't handle anything on this past iteration, take a nap // wait until we timeout, we are cancelled or there is a new enumerator added _manualResetEventSlim.Wait(Time.GetSecondUnevenWait((int)_sleepInterval), _cancellationTokenSource.Token); } } catch (OperationCanceledException) { // thrown by the event watcher } catch (Exception err) { Log.Error(err); if (_isFatalError(err)) { Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Fatal error encountered. Exiting..."); return; } } } Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Exiting..."); } /// /// Handler used to manage a single enumerator's move next/end of stream behavior /// public class EnumeratorHandler { private readonly Func _shouldMoveNext; private readonly Action _handleData; /// /// Event fired when MoveNext returns false /// public event EventHandler EnumeratorFinished; /// /// A unique symbol used to identify this enumerator /// public Symbol Symbol { get; init; } /// /// The enumerator this handler handles /// public IEnumerator Enumerator { get; init; } /// /// Initializes a new instance of the class /// /// The symbol to identify this enumerator /// The enumeator this handler handles /// Predicate function used to determine if we should call move next /// on the symbol's enumerator /// Handler for data if HandlesData=true public EnumeratorHandler(Symbol symbol, IEnumerator enumerator, Func shouldMoveNext = null, Action handleData = null) { Symbol = symbol; Enumerator = enumerator; _handleData = handleData; _shouldMoveNext = shouldMoveNext ?? (() => true); } /// /// Event invocator for the event /// public void OnEnumeratorFinished() { EnumeratorFinished?.Invoke(this, this); } /// /// Returns true if this enumerator should move next /// public bool ShouldMoveNext() { return _shouldMoveNext(); } /// /// Handles the specified data. /// /// The data to be handled public void HandleData(BaseData data) { _handleData?.Invoke(data); } } } }