/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace QuantConnect.Lean.Engine.DataFeeds.WorkScheduling
{
///
/// This singleton class will create a thread pool to processes work
/// that will be prioritized based on it's weight
///
/// The threads in the pool will take ownership of the
/// and not share it with another thread.
/// This is required because the data enumerator stack yields, which state
/// depends on the thread id
public class WeightedWorkScheduler : WorkScheduler
{
private static readonly Lazy _instance = new Lazy(() => new WeightedWorkScheduler());
///
/// This is the size of each work sprint
///
public const int WorkBatchSize = 50;
///
/// This is the maximum size a work item can weigh,
/// if reached, it will be ignored and not executed until its less
///
/// This is useful to limit RAM and CPU usage
public static int MaxWorkWeight;
private readonly ConcurrentQueue _newWork;
private readonly AutoResetEvent _newWorkEvent;
private Task _initializationTask;
private readonly List _workerQueues;
///
/// Singleton instance
///
public static WeightedWorkScheduler Instance => _instance.Value;
private WeightedWorkScheduler()
{
_newWork = new ConcurrentQueue();
_newWorkEvent = new AutoResetEvent(false);
_workerQueues = new List(WorkersCount);
_initializationTask = Task.Run(() =>
{
MaxWorkWeight = Configuration.Config.GetInt("data-feed-max-work-weight", 400);
Logging.Log.Trace($"WeightedWorkScheduler(): will use {WorkersCount} workers and MaxWorkWeight is {MaxWorkWeight}");
for (var i = 0; i < WorkersCount; i++)
{
var workQueue = new WeightedWorkQueue();
_workerQueues.Add(workQueue);
var thread = new Thread(() => workQueue.WorkerThread(_newWork, _newWorkEvent))
{
IsBackground = true,
Priority = workQueue.ThreadPriority,
Name = $"WeightedWorkThread{i}"
};
thread.Start();
}
});
}
///
/// Add a new work item to the queue
///
/// The symbol associated with this work
/// The work function to run
/// The weight function.
/// Work will be sorted in ascending order based on this weight
public override void QueueWork(Symbol symbol, Func workFunc, Func weightFunc)
{
_newWork.Enqueue(new WorkItem(workFunc, weightFunc));
_newWorkEvent.Set();
}
///
/// Execute the given action in all workers once
///
public void AddSingleCallForAll(Action action)
{
if (!_initializationTask.Wait(TimeSpan.FromSeconds(10)))
{
throw new TimeoutException("Timeout waiting for worker threads to start");
}
for (var i = 0; i < _workerQueues.Count; i++)
{
_workerQueues[i].AddSingleCall(action);
}
}
}
}