/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Threading;
using QuantConnect.Logging;
namespace QuantConnect.Util
{
///
/// This worker tread is required to guarantee all python operations are
/// executed by the same thread, to enable complete debugging functionality.
/// We don't use the main thread, to avoid any chance of blocking the process
///
public class WorkerThread : IDisposable
{
private readonly BlockingCollection _blockingCollection;
private readonly CancellationTokenSource _threadCancellationTokenSource;
private readonly Thread _workerThread;
///
/// The worker thread instance
///
public static WorkerThread Instance = new WorkerThread();
///
/// Will be set when the worker thread finishes a work item
///
public AutoResetEvent FinishedWorkItem { get; }
///
/// Creates a new instance, which internally launches a new worker thread
///
///
protected WorkerThread()
{
_threadCancellationTokenSource = new CancellationTokenSource();
FinishedWorkItem = new AutoResetEvent(false);
_blockingCollection = new BlockingCollection();
_workerThread = new Thread(() =>
{
try
{
foreach (var action in _blockingCollection.GetConsumingEnumerable(_threadCancellationTokenSource.Token))
{
FinishedWorkItem.Reset();
try
{
action();
}
catch (Exception exception)
{
Log.Error(exception, "WorkerThread(): exception thrown when running task");
}
FinishedWorkItem.Set();
}
}
catch (OperationCanceledException)
{
// pass, when the token gets cancelled
}
})
{
IsBackground = true,
Name = "Isolator Thread",
Priority = ThreadPriority.Highest
};
_workerThread.Start();
}
///
/// Adds a new item of work
///
/// The work item to add
public void Add(Action action)
{
_blockingCollection.Add(action);
}
///
/// Disposes the worker thread.
///
/// Note that the worker thread is a background thread,
/// so it won't block the process from terminating even if not disposed
public virtual void Dispose()
{
try
{
_blockingCollection.CompleteAdding();
_workerThread.StopSafely(TimeSpan.FromMilliseconds(50), _threadCancellationTokenSource);
_threadCancellationTokenSource.DisposeSafely();
}
catch (Exception exception)
{
Log.Error(exception);
}
}
}
}