/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using System;
using System.Net;
using System.Net.Sockets;
using Newtonsoft.Json;
using QuantConnect.Configuration;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Notifications;
using QuantConnect.Packets;
using NetMQ;
using NetMQ.Sockets;
using QuantConnect.Orders.Serialization;

namespace QuantConnect.Messaging
{
    /// <summary>
    /// Message handler that sends messages over tcp using NetMQ.
    /// </summary>
    public class StreamingMessageHandler : IMessagingHandler
    {
        private string _port;
        private PushSocket _server;
        private AlgorithmNodePacket _job;
        private OrderEventJsonConverter _orderEventJsonConverter;

        /// <summary>
        /// Gets or sets whether this messaging handler has any current subscribers.
        /// This is not used in this message handler. Messages are sent via tcp as they arrive.
        /// </summary>
        public bool HasSubscribers { get; set; }

        /// <summary>
        /// Initialize the messaging system: reads the "desktop-http-port" configuration value,
        /// verifies that the port can be opened, and creates the push socket on it.
        /// </summary>
        /// <param name="initializeParameters">The parameters required for initialization</param>
        /// <exception cref="Exception">Thrown by the port check when the configured port cannot be opened</exception>
        public void Initialize(MessagingHandlerInitializeParameters initializeParameters)
        {
            _port = Config.Get("desktop-http-port");
            CheckPort();

            // The leading '@' in a NetMQ address asks the socket to bind rather than connect.
            _server = new PushSocket("@tcp://*:" + _port);
        }

        /// <summary>
        /// Set the user communication channel. Stores the job, creates the order event
        /// converter for the job's algorithm id and transmits the job packet itself.
        /// </summary>
        /// <param name="job">The job packet describing the algorithm being run</param>
        public void SetAuthentication(AlgorithmNodePacket job)
        {
            _job = job;
            _orderEventJsonConverter = new OrderEventJsonConverter(job.AlgorithmId);
            Transmit(_job);
        }

        /// <summary>
        /// Send any notification with a base type of Notification.
        /// </summary>
        /// <param name="notification">The notification to be sent.</param>
        public void SendNotification(Notification notification)
        {
            if (!notification.CanSend())
            {
                Log.Error("Messaging.SendNotification(): Send not implemented for notification of type: " + notification.GetType().Name);
                return;
            }

            notification.Send();
        }

        /// <summary>
        /// Send all types of packets
        /// </summary>
        /// <param name="packet">Packet to send</param>
        public void Send(Packet packet)
        {
            Transmit(packet);
        }

        /// <summary>
        /// Serializes the packet to JSON and pushes it through the NetMQ socket
        /// as a single-frame message.
        /// </summary>
        /// <param name="packet">Packet to transmit</param>
        public void Transmit(Packet packet)
        {
            // The converter only exists after SetAuthentication has run; do not hand
            // Json.NET a null converter entry if a packet is sent before that.
            var payload = _orderEventJsonConverter != null
                ? JsonConvert.SerializeObject(packet, _orderEventJsonConverter)
                : JsonConvert.SerializeObject(packet);

            var message = new NetMQMessage();
            message.Append(payload);

            _server.SendMultipartMessage(message);
        }

        /// <summary>
        /// Check if port to be used by the desktop application is available by
        /// briefly opening a TCP listener on it.
        /// </summary>
        /// <exception cref="Exception">
        /// Thrown when the port value cannot be used or the listener cannot be started;
        /// the original failure is attached as the inner exception.
        /// </exception>
        private void CheckPort()
        {
            try
            {
                var tcpListener = new TcpListener(IPAddress.Any, _port.ToInt32());
                try
                {
                    tcpListener.Start();
                }
                finally
                {
                    // Always release the probe listener, even when Start() failed.
                    tcpListener.Stop();
                }
            }
            catch (Exception err)
            {
                // Keep the underlying cause so the real reason is not lost.
                throw new Exception(
                    "The port configured in config.json is either being used or blocked by a firewall. " +
                    "Please choose a new port or open the port in the firewall.",
                    err);
            }
        }

        /// <summary>
        /// Dispose any resources used before destruction. Releases the NetMQ push
        /// socket created in <see cref="Initialize"/>; safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_server != null)
            {
                _server.Dispose();
                _server = null;
            }
        }
    }
}