/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Threading;
using QuantConnect.Util;
using QuantConnect.Data;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using System.Collections.Concurrent;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// Provides a means of distributing output from enumerators from a dedicated separate thread
///
public class BaseDataExchange
{
private Thread _thread;
private uint _sleepInterval = 1;
private Func _isFatalError;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly string _name;
private ManualResetEventSlim _manualResetEventSlim;
private ConcurrentDictionary _enumerators;
///
/// Gets or sets how long this thread will sleep when no data is available
///
public uint SleepInterval
{
get => _sleepInterval;
set
{
if (value == 0)
{
throw new ArgumentException("Sleep interval should be bigger than 0");
}
_sleepInterval = value;
}
}
///
/// Gets a name for this exchange
///
public string Name
{
get { return _name; }
}
///
/// Initializes a new instance of the
///
/// A name for this exchange
public BaseDataExchange(string name)
{
_name = name;
_isFatalError = x => false;
_cancellationTokenSource = new CancellationTokenSource();
_manualResetEventSlim = new ManualResetEventSlim(false);
_enumerators = new ConcurrentDictionary();
}
///
/// Adds the enumerator to this exchange. If it has already been added
/// then it will remain registered in the exchange only once
///
/// The handler to use when this symbol's data is encountered
public void AddEnumerator(EnumeratorHandler handler)
{
_enumerators[handler.Symbol] = handler;
_manualResetEventSlim.Set();
}
///
/// Adds the enumerator to this exchange. If it has already been added
/// then it will remain registered in the exchange only once
///
/// A unique symbol used to identify this enumerator
/// The enumerator to be added
/// Function used to determine if move next should be called on this
/// enumerator, defaults to always returning true
/// Delegate called when the enumerator move next returns false
/// Handler for data if HandlesData=true
public void AddEnumerator(Symbol symbol, IEnumerator enumerator, Func shouldMoveNext = null, Action enumeratorFinished = null, Action handleData = null)
{
var enumeratorHandler = new EnumeratorHandler(symbol, enumerator, shouldMoveNext, handleData);
if (enumeratorFinished != null)
{
enumeratorHandler.EnumeratorFinished += (sender, args) => enumeratorFinished(args);
}
AddEnumerator(enumeratorHandler);
}
///
/// Sets the specified function as the error handler. This function
/// returns true if it is a fatal error and queue consumption should
/// cease.
///
/// The error handling function to use when an
/// error is encountered during queue consumption. Returns true if queue
/// consumption should be stopped, returns false if queue consumption should
/// continue
public void SetErrorHandler(Func isFatalError)
{
// default to false;
_isFatalError = isFatalError ?? (x => false);
}
///
/// Removes and returns enumerator handler with the specified symbol.
/// The removed handler is returned, null if not found
///
public EnumeratorHandler RemoveEnumerator(Symbol symbol)
{
EnumeratorHandler handler;
if (_enumerators.TryRemove(symbol, out handler))
{
handler.OnEnumeratorFinished();
handler.Enumerator.Dispose();
}
return handler;
}
///
/// Begins consumption of the wrapped on
/// a separate thread
///
public void Start()
{
var manualEvent = new ManualResetEventSlim(false);
_thread = new Thread(() =>
{
manualEvent.Set();
Log.Trace($"BaseDataExchange({Name}) Starting...");
ConsumeEnumerators();
}) { IsBackground = true, Name = Name };
_thread.Start();
manualEvent.Wait();
manualEvent.DisposeSafely();
}
///
/// Ends consumption of the wrapped
///
public void Stop()
{
_thread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
}
/// Entry point for queue consumption
/// This function only returns after is called or the token is cancelled
private void ConsumeEnumerators()
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
try
{
// call move next each enumerator and invoke the appropriate handlers
_manualResetEventSlim.Reset();
var handled = false;
foreach (var kvp in _enumerators)
{
if (_cancellationTokenSource.Token.IsCancellationRequested)
{
Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Exiting...");
return;
}
var enumeratorHandler = kvp.Value;
var enumerator = enumeratorHandler.Enumerator;
// check to see if we should advance this enumerator
if (!enumeratorHandler.ShouldMoveNext()) continue;
if (!enumerator.MoveNext())
{
enumeratorHandler.OnEnumeratorFinished();
enumeratorHandler.Enumerator.Dispose();
_enumerators.TryRemove(enumeratorHandler.Symbol, out enumeratorHandler);
continue;
}
if (enumerator.Current == null) continue;
handled = true;
enumeratorHandler.HandleData(enumerator.Current);
}
if (!handled)
{
// if we didn't handle anything on this past iteration, take a nap
// wait until we timeout, we are cancelled or there is a new enumerator added
_manualResetEventSlim.Wait(Time.GetSecondUnevenWait((int)_sleepInterval), _cancellationTokenSource.Token);
}
}
catch (OperationCanceledException)
{
// thrown by the event watcher
}
catch (Exception err)
{
Log.Error(err);
if (_isFatalError(err))
{
Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Fatal error encountered. Exiting...");
return;
}
}
}
Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Exiting...");
}
///
/// Handler used to manage a single enumerator's move next/end of stream behavior
///
public class EnumeratorHandler
{
private readonly Func _shouldMoveNext;
private readonly Action _handleData;
///
/// Event fired when MoveNext returns false
///
public event EventHandler EnumeratorFinished;
///
/// A unique symbol used to identify this enumerator
///
public Symbol Symbol { get; init; }
///
/// The enumerator this handler handles
///
public IEnumerator Enumerator { get; init; }
///
/// Initializes a new instance of the class
///
/// The symbol to identify this enumerator
/// The enumeator this handler handles
/// Predicate function used to determine if we should call move next
/// on the symbol's enumerator
/// Handler for data if HandlesData=true
public EnumeratorHandler(Symbol symbol, IEnumerator enumerator, Func shouldMoveNext = null, Action handleData = null)
{
Symbol = symbol;
Enumerator = enumerator;
_handleData = handleData;
_shouldMoveNext = shouldMoveNext ?? (() => true);
}
///
/// Event invocator for the event
///
public void OnEnumeratorFinished()
{
EnumeratorFinished?.Invoke(this, this);
}
///
/// Returns true if this enumerator should move next
///
public bool ShouldMoveNext()
{
return _shouldMoveNext();
}
///
/// Handles the specified data.
///
/// The data to be handled
public void HandleData(BaseData data)
{
_handleData?.Invoke(data);
}
}
}
}