/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using QuantConnect.Data; using QuantConnect.Interfaces; using QuantConnect.Lean.Engine.DataFeeds; using QuantConnect.Logging; using QuantConnect.Packets; using QuantConnect.Tests.Common.Data; using QuantConnect.Util; namespace QuantConnect.Tests.Engine.DataFeeds { /// /// Provides an implementation of that can be specified /// via a function /// public class FuncDataQueueHandler : IDataQueueHandler { private readonly HashSet _subscriptions; private readonly CancellationTokenSource _cancellationTokenSource; private readonly AggregationManager _aggregationManager; private readonly DataQueueHandlerSubscriptionManager _subscriptionManager; /// /// Gets the subscriptions configurations currently being managed by the queue handler /// public List SubscriptionDataConfigs { get { lock (_subscriptions) return _subscriptions.ToList(); } } /// /// Gets the subscriptions Symbols currently being managed by the queue handler /// public List Subscriptions => _subscriptionManager.GetSubscribedSymbols().ToList(); /// /// Returns whether the data provider is connected /// /// true if the data provider is connected public bool IsConnected => true; /// /// Initializes a new instance of the class /// /// The functional implementation to get ticks function /// The time provider to use public FuncDataQueueHandler(Func> getNextTicksFunction, ITimeProvider timeProvider, IAlgorithmSettings algorithmSettings) { _subscriptions = new HashSet(); _cancellationTokenSource = new CancellationTokenSource(); _aggregationManager = new TestAggregationManager(timeProvider); _aggregationManager.Initialize(new DataAggregatorInitializeParameters() { AlgorithmSettings = algorithmSettings }); _subscriptionManager = new FakeDataQueuehandlerSubscriptionManager((t) => "quote-trade"); Task.Factory.StartNew(() => { while (!_cancellationTokenSource.IsCancellationRequested) { var emitted = false; try { foreach (var baseData in getNextTicksFunction(this)) { if (_cancellationTokenSource.IsCancellationRequested) { break; } emitted = true; _aggregationManager.Update(baseData); } } catch (Exception exception) { if (exception is ObjectDisposedException) { return; } Log.Error(exception); } if (!emitted) { Thread.Sleep(50); } else { Thread.Sleep(10); } } }, TaskCreationOptions.LongRunning); } /// /// Sets the job we're subscribing for /// /// Job we're subscribing for public void SetJob(LiveNodePacket job) { } /// /// Adds the specified symbols to the subscription /// /// defines the parameters to subscribe to a data feed /// handler to be fired on new data available /// The new enumerator for this subscription request public IEnumerator Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler) { var enumerator = _aggregationManager.Add(dataConfig, newDataAvailableHandler); lock (_subscriptions) { _subscriptions.Add(dataConfig); _subscriptionManager.Subscribe(dataConfig); } return enumerator; } /// /// Removes the specified configuration /// /// The data config to remove public void Unsubscribe(SubscriptionDataConfig dataConfig) { lock (_subscriptions) { _subscriptions.Remove(dataConfig); _subscriptionManager.Subscribe(dataConfig); } _aggregationManager.Remove(dataConfig); } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { if (!_cancellationTokenSource.IsCancellationRequested) { _cancellationTokenSource.Cancel(); } _aggregationManager.DisposeSafely(); _cancellationTokenSource.DisposeSafely(); } private class TestAggregationManager : AggregationManager { public TestAggregationManager(ITimeProvider timeProvider) { TimeProvider = timeProvider; } } } }