using RabbitMQ.Client; using Newtonsoft.Json; using RabbitMQ.Client.Events; using ChatRoomContract; using System.Text; using MessagePack; using System.Diagnostics; using NLog.LayoutRenderers.Wrappers; using NLog; namespace ChatRoom; internal class ChatRoomRabbitMQService { /// /// Logger for this class. /// private Logger log = LogManager.GetCurrentClassLogger(); /// /// Communications channel to RabbitMQ message broker. /// private IModel rmqChannel; /// /// Service logic. /// private ChatRoomLogic logic; public ChatRoomRabbitMQService(ChatRoomLogic logic, IConnection connection, string exchangeName, string serverQueueName) { this.logic = logic; //get channel, configure exchanges and request queue rmqChannel = connection.CreateModel(); rmqChannel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct); rmqChannel.QueueDeclare(queue: serverQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); rmqChannel.QueueBind(queue: serverQueueName, exchange: exchangeName, routingKey: serverQueueName, arguments: null); //connect to the queue as consumer //XXX: see https://www.rabbitmq.com/dotnet-api-guide.html#concurrency for threading issues var rmqConsumer = new EventingBasicConsumer(rmqChannel); rmqConsumer.Received += (consumer, delivery) => OnMessageReceived(((EventingBasicConsumer)consumer).Model, delivery); rmqChannel.BasicConsume(queue: serverQueueName, autoAck: true, consumer: rmqConsumer); } /// /// Is invoked to process messages received. /// /// Related communications channel. /// Message deliver data. private void OnMessageReceived(IModel channel, BasicDeliverEventArgs msgIn) { try { var msg = MessagePackSerializer.Deserialize(msgIn.Body); Debug.Assert(msg != null); if (msg.isResponse == true) { return; } bool hasResponse = false; byte[]? response = null; switch (msg.method) { case nameof(IChatRoomService.RegisterClient): { var name = MessagePackSerializer.Deserialize(msg.args); var clientId = logic.RegisterClient(name); response = MessagePackSerializer.Serialize(clientId); hasResponse = true; break; } case nameof(IChatRoomService.GetStrikes): { var clientId = MessagePackSerializer.Deserialize(msg.args); var strikes = logic.GetStrikes(clientId); response = MessagePackSerializer.Serialize(strikes); hasResponse = true; break; } case nameof(IChatRoomService.GetBlockedUntil): { var clientId = MessagePackSerializer.Deserialize(msg.args); var blockedUntil = logic.GetBlockedUntil(clientId); response = MessagePackSerializer.Serialize(blockedUntil); hasResponse = true; break; } case nameof(IChatRoomService.SendMessage): { var args = MessagePackSerializer.Deserialize(msg.args); var success = logic.SendMessage(args.clientId, args.contents, args.needsToBeCensored); response = MessagePackSerializer.Serialize(success); hasResponse = true; break; } case nameof(IChatRoomService.GetNewMessage): { var newMessage = logic.GetNewMessage(); response = MessagePackSerializer.Serialize(newMessage); hasResponse = true; break; } case nameof(IChatRoomService.RejectMessage): { var messageId = MessagePackSerializer.Deserialize(msg.args); logic.ApproveMessage(messageId); break; } case nameof(IChatRoomService.ApproveMessage): { var messageId = MessagePackSerializer.Deserialize(msg.args); logic.ApproveMessage(messageId); break; } default: { throw new Exception("Unknown RPC method"); } } if (hasResponse) { var responseMsg = new RPCMessage { isResponse = true, method = msg.method, args = response }; var properties = channel.CreateBasicProperties(); properties.CorrelationId = msgIn.BasicProperties.CorrelationId; channel.BasicPublish( exchange: msgIn.Exchange, routingKey: msgIn.BasicProperties.ReplyTo, basicProperties: properties, body: MessagePackSerializer.Serialize(responseMsg) ); } } catch (Exception e) { log.Error(e, "Unhandled exception caught when processing a message. The message is now lost."); } } }