/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Linq;
using QuantConnect.Data;
using QuantConnect.Data.Market;
using QuantConnect.Data.UniverseSelection;
using QuantConnect.Interfaces;
using QuantConnect.Securities;
namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators.Factories
{
/// <summary>
/// Provides an implementation of <see cref="ISubscriptionEnumeratorFactory"/> to emit
/// ticks based on <see cref="ITimeTriggeredUniverse.GetTriggerTimes"/>, allowing universe
/// selection to fire at planned times.
/// </summary>
public class TimeTriggeredUniverseSubscriptionEnumeratorFactory : ISubscriptionEnumeratorFactory
{
    private readonly ITimeProvider _timeProvider;
    private readonly ITimeTriggeredUniverse _universe;
    private readonly MarketHoursDatabase _marketHoursDatabase;

    /// <summary>
    /// Initializes a new instance of the <see cref="TimeTriggeredUniverseSubscriptionEnumeratorFactory"/> class
    /// </summary>
    /// <param name="universe">The user defined universe</param>
    /// <param name="marketHoursDatabase">The market hours database</param>
    /// <param name="timeProvider">The time provider</param>
    public TimeTriggeredUniverseSubscriptionEnumeratorFactory(ITimeTriggeredUniverse universe, MarketHoursDatabase marketHoursDatabase, ITimeProvider timeProvider)
    {
        _universe = universe;
        _timeProvider = timeProvider;
        _marketHoursDatabase = marketHoursDatabase;
    }

    /// <summary>
    /// Creates an enumerator to read the specified request
    /// </summary>
    /// <param name="request">The subscription request to be read</param>
    /// <param name="dataProvider">Provider used to get data when it is not present on disk</param>
    /// <returns>An enumerator reading the subscription request</returns>
    public IEnumerator<BaseData> CreateEnumerator(SubscriptionRequest request, IDataProvider dataProvider)
    {
        // one tick per trigger time reported by the universe for the requested range
        var triggerTicks = (IEnumerator<BaseData>) _universe.GetTriggerTimes(request.StartTimeUtc, request.EndTimeUtc, _marketHoursDatabase)
            .Select(x => new Tick { Time = x, Symbol = request.Configuration.Symbol })
            .GetEnumerator();

        var universe = request.Universe as UserDefinedUniverse;
        if (universe == null)
        {
            // not a user defined universe, there is nothing to inject on collection changes
            return triggerTicks;
        }

        // keep a typed reference so the handler below does not need to cast a captured local
        var injectionEnumerator = new InjectionEnumerator(triggerTicks);

        // Trigger universe selection when security added/removed after Initialize
        // NOTE(review): this handler is never unsubscribed in this class - confirm the universe does not outlive the enumerator
        universe.CollectionChanged += (sender, args) =>
        {
            // If it is an add we will set time 1 tick ahead to properly sync data
            // with next timeslice, avoid emitting now twice, if it is a remove then we will set time to now
            // we do the same in the 'DataManager' when handling FF resolution changes
            IList items;
            DateTime time;
            switch (args.Action)
            {
                case NotifyCollectionChangedAction.Add:
                    items = args.NewItems;
                    time = _timeProvider.GetUtcNow().AddTicks(1);
                    break;

                case NotifyCollectionChangedAction.Remove:
                    items = args.OldItems;
                    time = _timeProvider.GetUtcNow();
                    break;

                default:
                    // any other action does not trigger a selection
                    return;
            }

            // Check that we have our items and time
            if (items == null || time == DateTime.MinValue) return;

            // only the first symbol of the change set is used to build the injected data point
            var symbol = items.OfType<Symbol>().FirstOrDefault();
            if (symbol == null) return;

            // the data point time should always be in exchange timezone
            time = time.ConvertFromUtc(request.Configuration.ExchangeTimeZone);

            injectionEnumerator.InjectDataPoint(new BaseDataCollection(time, symbol));
        };

        return injectionEnumerator;
    }

    /// <summary>
    /// Enumerator wrapper that forwards to an underlying enumerator but allows a single
    /// pending data point to be injected, which is then served by the next call to <see cref="MoveNext"/>
    /// </summary>
    private class InjectionEnumerator : IEnumerator<BaseData>
    {
        // set by InjectDataPoint, cleared by the MoveNext call that serves the injected point
        private volatile bool _wasInjected;
        // also used as the lock object shared by InjectDataPoint and MoveNext
        private readonly IEnumerator<BaseData> _underlyingEnumerator;

        /// <summary>
        /// Gets the injected data point if one is pending, else the last value read from the underlying enumerator
        /// </summary>
        public BaseData Current { get; private set; }

        object IEnumerator.Current => Current;

        /// <summary>
        /// Initializes a new instance of the <see cref="InjectionEnumerator"/> class
        /// </summary>
        /// <param name="underlyingEnumerator">The enumerator to wrap</param>
        public InjectionEnumerator(IEnumerator<BaseData> underlyingEnumerator)
        {
            _underlyingEnumerator = underlyingEnumerator;
        }

        /// <summary>
        /// Sets <see cref="Current"/> to the given data point and flags it as pending. A second
        /// injection before the next <see cref="MoveNext"/> call overwrites the first one.
        /// </summary>
        /// <param name="baseData">The data point to be emitted next</param>
        public void InjectDataPoint(BaseData baseData)
        {
            // we use a lock because the main algorithm thread is the one injecting and the base exchange is the thread pulling MoveNext()
            lock (_underlyingEnumerator)
            {
                _wasInjected = true;
                Current = baseData;
            }
        }

        /// <summary>
        /// Disposes the underlying enumerator
        /// </summary>
        public void Dispose()
        {
            _underlyingEnumerator.Dispose();
        }

        /// <summary>
        /// Serves the pending injected data point if there is one, else advances the underlying enumerator
        /// </summary>
        /// <returns>Always true</returns>
        public bool MoveNext()
        {
            lock (_underlyingEnumerator)
            {
                if (_wasInjected)
                {
                    // Current already holds the injected data point, leave the underlying enumerator untouched
                    _wasInjected = false;
                    return true;
                }

                // NOTE(review): the result of the underlying MoveNext is ignored and true is always returned,
                // presumably so this enumerator never ends and later injections can still be emitted - confirm
                _underlyingEnumerator.MoveNext();
                Current = _underlyingEnumerator.Current;
                return true;
            }
        }

        /// <summary>
        /// Resets the underlying enumerator
        /// </summary>
        public void Reset()
        {
            _underlyingEnumerator.Reset();
        }
    }
}
}