/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using QuantConnect.Data;
using QuantConnect.Util;
using QuantConnect.Logging;
using QuantConnect.Packets;
using QuantConnect.Securities;
using QuantConnect.Interfaces;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using Timer = System.Timers.Timer;
using QuantConnect.Lean.Engine.HistoricalData;
namespace QuantConnect.Lean.Engine.DataFeeds.Queues
{
///
/// This is an implementation of used for testing.
///
public class FakeDataQueue : IDataQueueHandler, IDataQueueUniverseProvider
{
private int _count;
private readonly Random _random = new Random();
private int _dataPointsPerSecondPerSymbol;
private readonly Timer _timer;
private readonly IOptionChainProvider _optionChainProvider;
private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager;
private readonly IDataAggregator _aggregator;
private readonly MarketHoursDatabase _marketHoursDatabase;
private readonly Dictionary _symbolExchangeTimeZones;
///
/// Continuous UTC time provider
///
protected virtual ITimeProvider TimeProvider { get; } = RealTimeProvider.Instance;
///
/// Initializes a new instance of the class to randomly emit data for each symbol
///
public FakeDataQueue()
: this(Composer.Instance.GetExportedValueByTypeName(nameof(AggregationManager)))
{
}
///
/// Initializes a new instance of the class to randomly emit data for each symbol
///
public FakeDataQueue(IDataAggregator dataAggregator, int dataPointsPerSecondPerSymbol = 500000)
{
_aggregator = dataAggregator;
_dataPointsPerSecondPerSymbol = dataPointsPerSecondPerSymbol;
var mapFileProvider = Composer.Instance.GetPart();
var historyManager = (IHistoryProvider)Composer.Instance.GetPart();
if (historyManager == null)
{
historyManager = Composer.Instance.GetPart();
}
var optionChainProvider = new LiveOptionChainProvider();
optionChainProvider.Initialize(new(mapFileProvider, historyManager));
_optionChainProvider = optionChainProvider;
_marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
_symbolExchangeTimeZones = new Dictionary();
_subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager();
_subscriptionManager.SubscribeImpl += (s, t) => true;
_subscriptionManager.UnsubscribeImpl += (s, t) => true;
_timer = new Timer
{
AutoReset = false,
Enabled = true,
Interval = 1000,
};
var lastCount = 0;
var lastTime = DateTime.UtcNow;
_timer.Elapsed += (sender, args) =>
{
var elapsed = (DateTime.UtcNow - lastTime);
var ticksPerSecond = (_count - lastCount)/elapsed.TotalSeconds;
Log.Trace("TICKS PER SECOND:: " + ticksPerSecond.ToStringInvariant("000000.0") + " ITEMS IN QUEUE:: " + 0);
lastCount = _count;
lastTime = DateTime.UtcNow;
PopulateQueue();
try
{
_timer.Reset();
}
catch (ObjectDisposedException)
{
// pass
}
};
}
///
/// Subscribe to the specified configuration
///
/// defines the parameters to subscribe to a data feed
/// handler to be fired on new data available
/// The new enumerator for this subscription request
public IEnumerator Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
{
var enumerator = _aggregator.Add(dataConfig, newDataAvailableHandler);
_subscriptionManager.Subscribe(dataConfig);
return enumerator;
}
///
/// Sets the job we're subscribing for
///
/// Job we're subscribing for
public void SetJob(LiveNodePacket job)
{
}
///
/// Removes the specified configuration
///
/// Subscription config to be removed
public void Unsubscribe(SubscriptionDataConfig dataConfig)
{
_subscriptionManager.Unsubscribe(dataConfig);
_aggregator.Remove(dataConfig);
}
///
/// Returns whether the data provider is connected
///
/// true if the data provider is connected
public bool IsConnected => true;
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
public void Dispose()
{
_timer.Stop();
_timer.DisposeSafely();
}
///
/// Pumps a bunch of ticks into the queue
///
private void PopulateQueue()
{
var symbols = _subscriptionManager.GetSubscribedSymbols();
foreach (var symbol in symbols)
{
if (symbol.IsCanonical() || symbol.Contains("UNIVERSE"))
{
continue;
}
var offsetProvider = GetTimeZoneOffsetProvider(symbol);
var trades = SubscriptionManager.DefaultDataTypes()[symbol.SecurityType].Contains(TickType.Trade);
var quotes = SubscriptionManager.DefaultDataTypes()[symbol.SecurityType].Contains(TickType.Quote);
// emits 500k per second
for (var i = 0; i < _dataPointsPerSecondPerSymbol; i++)
{
var now = TimeProvider.GetUtcNow();
var exchangeTime = offsetProvider.ConvertFromUtc(now);
var lastTrade = 100 + (decimal)Math.Abs(Math.Sin(now.TimeOfDay.TotalMilliseconds));
if (trades)
{
_count++;
_aggregator.Update(new Tick
{
Time = exchangeTime,
Symbol = symbol,
Value = lastTrade,
TickType = TickType.Trade,
Quantity = _random.Next(10, (int)_timer.Interval)
});
}
if (quotes)
{
_count++;
var bidPrice = lastTrade * 0.95m;
var askPrice = lastTrade * 1.05m;
var bidSize = _random.Next(10, (int) _timer.Interval);
var askSize = _random.Next(10, (int)_timer.Interval);
_aggregator.Update(new Tick(exchangeTime, symbol, "", "", bidSize: bidSize, bidPrice: bidPrice, askPrice: askPrice, askSize: askSize));
}
}
}
}
private TimeZoneOffsetProvider GetTimeZoneOffsetProvider(Symbol symbol)
{
TimeZoneOffsetProvider offsetProvider;
if (!_symbolExchangeTimeZones.TryGetValue(symbol, out offsetProvider))
{
// read the exchange time zone from market-hours-database
var exchangeTimeZone = _marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType).TimeZone;
_symbolExchangeTimeZones[symbol] = offsetProvider = new TimeZoneOffsetProvider(exchangeTimeZone, TimeProvider.GetUtcNow(), Time.EndOfTime);
}
return offsetProvider;
}
///
/// Method returns a collection of Symbols that are available at the data source.
///
/// Symbol to lookup
/// Include expired contracts
/// Expected security currency(if any)
/// Enumerable of Symbols, that are associated with the provided Symbol
public IEnumerable LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
{
switch (symbol.SecurityType)
{
case SecurityType.Option:
case SecurityType.IndexOption:
case SecurityType.FutureOption:
foreach (var result in _optionChainProvider.GetOptionContractList(symbol, DateTime.UtcNow.Date))
{
yield return result;
}
break;
default:
break;
}
}
///
/// Checks if the FakeDataQueue can perform selection
///
public bool CanPerformSelection()
{
return true;
}
}
}