/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using QuantConnect.Data;
using QuantConnect.Util;
using System.Collections;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using System.Collections.Generic;
namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators
{
///
/// Enumerator that will subscribe through the provided data queue handler and refresh the subscription if any mapping occurs
///
public class LiveSubscriptionEnumerator : IEnumerator
{
private BaseData _current;
private readonly Symbol _requestedSymbol;
private SubscriptionDataConfig _currentConfig;
private IEnumerator _previousEnumerator;
private IEnumerator _underlyingEnumerator;
///
/// The current data object instance
///
public BaseData Current => _current;
///
/// The current data object instance
///
object IEnumerator.Current => Current;
///
/// Creates a new instance
///
public LiveSubscriptionEnumerator(SubscriptionDataConfig dataConfig, IDataQueueHandler dataQueueHandler, EventHandler handler, Func isExpired)
{
_requestedSymbol = dataConfig.Symbol;
_underlyingEnumerator = dataQueueHandler.SubscribeWithMapping(dataConfig, handler, isExpired, out _currentConfig);
// for any mapping event we will re subscribe
dataConfig.NewSymbol += (_, _) =>
{
dataQueueHandler.Unsubscribe(_currentConfig);
_previousEnumerator = _underlyingEnumerator;
var oldSymbol = _currentConfig.Symbol;
_underlyingEnumerator = dataQueueHandler.SubscribeWithMapping(dataConfig, handler, isExpired, out _currentConfig);
Log.Trace($"LiveSubscriptionEnumerator({_requestedSymbol}): " +
$"resubscribing old: '{oldSymbol.Value}' new '{_currentConfig.Symbol.Value}'");
};
}
///
/// Advances the enumerator to the next element.
///
public bool MoveNext()
{
if (_previousEnumerator != null)
{
// if previous is set we dispose of it here since we are the consumers of it
_previousEnumerator.DisposeSafely();
_previousEnumerator = null;
}
var result = _underlyingEnumerator.MoveNext();
if (result)
{
_current = _underlyingEnumerator.Current;
}
else
{
_current = null;
}
if (_current != null && _current.Symbol != _requestedSymbol)
{
// if we've done some mapping at this layer let's clone the underlying and set the requested symbol,
// don't trust the IDQH implementations for data uniqueness, since the configuration could be shared
_current = _current.Clone();
_current.Symbol = _requestedSymbol;
}
return result;
}
///
/// Reset the IEnumeration
///
public void Reset()
{
_underlyingEnumerator.Reset();
}
///
/// Disposes of the used enumerators
///
public void Dispose()
{
_previousEnumerator.DisposeSafely();
_underlyingEnumerator.DisposeSafely();
}
}
}