using MessagePack;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace ChatRoomContract;
/// <summary>
/// RabbitMQ chat room client
/// </summary>
/// <remarks>
/// Every call is sent as an <c>RPCMessage</c> through a single channel created in the
/// constructor. Calls that return a value block the calling thread until a response with
/// a matching correlation ID is delivered to the client queue.
/// NOTE(review): RabbitMQ channels are documented as not safe for concurrent use, so a
/// single instance should not be called from several threads at once — confirm against callers.
/// </remarks>
public class ChatRoomClient : IChatRoomService, IDisposable
{
    /// <summary>
    /// Exchange name
    /// </summary>
    private readonly string exchangeName;

    /// <summary>
    /// Client queue name
    /// </summary>
    private readonly string clientQueueName;

    /// <summary>
    /// Server queue name
    /// </summary>
    private readonly string serverQueueName;

    /// <summary>
    /// RabbitMQ channel
    /// </summary>
    private readonly IModel rmqChannel;

    /// <summary>
    /// Chat Room client constructor. Declares the exchange and the client queue and binds
    /// the client queue to the exchange using the queue name as the routing key.
    /// </summary>
    /// <param name="connection">RabbitMQ connection</param>
    /// <param name="exchangeName">Exchange name</param>
    /// <param name="clientQueueName">Client queue name</param>
    /// <param name="serverQueueName">Server queue name</param>
    /// <exception cref="ArgumentNullException">Any argument is null</exception>
    public ChatRoomClient(IConnection connection, string exchangeName, string clientQueueName, string serverQueueName)
    {
        ArgumentNullException.ThrowIfNull(connection);
        ArgumentNullException.ThrowIfNull(exchangeName);
        ArgumentNullException.ThrowIfNull(clientQueueName);
        ArgumentNullException.ThrowIfNull(serverQueueName);

        this.exchangeName = exchangeName;
        this.clientQueueName = clientQueueName;
        this.serverQueueName = serverQueueName;

        rmqChannel = connection.CreateModel();
        rmqChannel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
        rmqChannel.QueueDeclare(queue: clientQueueName, durable: false, exclusive: true, autoDelete: false, arguments: null);
        rmqChannel.QueueBind(queue: clientQueueName, exchange: exchangeName, routingKey: clientQueueName, arguments: null);
    }

    /// <summary>
    /// Releases the RabbitMQ channel created by the constructor.
    /// The connection passed to the constructor is not touched.
    /// </summary>
    public void Dispose()
    {
        rmqChannel.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Send rpc message without waiting for a response
    /// </summary>
    /// <param name="method">Method name</param>
    /// <param name="args">Serialized arguments, null when the method takes none</param>
    /// <param name="correlationId">Optional correlation ID, a new GUID is generated when omitted</param>
    private void CallVoid(string method, byte[]? args, string? correlationId = null)
    {
        var requestProps = rmqChannel.CreateBasicProperties();
        requestProps.CorrelationId = correlationId ?? Guid.NewGuid().ToString();
        requestProps.ReplyTo = clientQueueName;

        var msg = new RPCMessage
        {
            isResponse = false,
            method = method,
            args = args
        };

        rmqChannel.BasicPublish(
            exchange: exchangeName,
            routingKey: serverQueueName,
            basicProperties: requestProps,
            body: MessagePackSerializer.Serialize(msg)
        );
    }

    /// <summary>
    /// Send a rpc message and wait for a response
    /// </summary>
    /// <remarks>
    /// Blocks without a timeout until a matching response arrives.
    /// </remarks>
    /// <typeparam name="ResultType">Result type</typeparam>
    /// <param name="method">Method name</param>
    /// <param name="args">Serialized arguments, null when the method takes none</param>
    /// <returns>Result, or the default value of the result type when the response carries no arguments</returns>
    /// <exception cref="InvalidOperationException">The response could not be deserialized</exception>
    private ResultType Call<ResultType>(string method, byte[]? args)
    {
        var correlationId = Guid.NewGuid().ToString();
        var isResultReady = false;
        ResultType result = default!;
        Exception? failure = null;

        //disposed when the method exits, the receiver no longer touches it by then
        using var resultReadySignal = new AutoResetEvent(false);

        //ensure contents of variables set in main thread, are loadable by receiver thread
        Thread.MemoryBarrier();

        //create response message consumer
        var consumer = new EventingBasicConsumer(rmqChannel);
        consumer.Received += (_, delivery) =>
        {
            //ensure contents of variables set in main thread are loaded into this thread
            Thread.MemoryBarrier();

            //ignore deliveries after completion and deliveries that belong to other calls
            if (isResultReady || delivery.BasicProperties.CorrelationId != correlationId)
            {
                return;
            }

            try
            {
                var msg = MessagePackSerializer.Deserialize<RPCMessage>(delivery.Body);
                if (!msg.isResponse || msg.method != method)
                {
                    return;
                }

                if (msg.args != null)
                {
                    result = MessagePackSerializer.Deserialize<ResultType>(msg.args);
                }
            }
            catch (Exception ex)
            {
                //hand the failure to the waiting thread instead of leaving it blocked forever
                failure = ex;
            }

            //indicate result has been received, ensure it is loadable by main thread
            isResultReady = true;
            Thread.MemoryBarrier();

            //signal main thread that result has been received
            resultReadySignal.Set();
        };

        //attach message consumer to the response queue; autoAck is true, so deliveries
        //that do not match this call are acknowledged and dropped by the handler above
        var consumerTag = rmqChannel.BasicConsume(clientQueueName, true, consumer);
        try
        {
            CallVoid(method, args, correlationId);

            //wait for the result to be ready
            resultReadySignal.WaitOne();

            //ensure contents of variables set by the receiver are loaded into this thread
            Thread.MemoryBarrier();
        }
        finally
        {
            //stop the handler from using the signal once this method exits
            isResultReady = true;
            Thread.MemoryBarrier();

            //detach message consumer from the response queue, even when sending failed
            rmqChannel.BasicCancel(consumerTag);
        }

        if (failure is not null)
        {
            throw new InvalidOperationException($"Failed to process the response to RPC call '{method}'.", failure);
        }

        return result;
    }

    /// <summary>
    /// Approve a message
    /// </summary>
    /// <param name="messageId">Message ID</param>
    public void ApproveMessage(int messageId)
    {
        CallVoid(
            nameof(IChatRoomService.ApproveMessage),
            MessagePackSerializer.Serialize(messageId)
        );
    }

    /// <summary>
    /// Get timestamp until when the client is blocked
    /// </summary>
    /// <param name="clientId">Client ID</param>
    /// <returns>Optional datetime object</returns>
    public DateTime? GetBlockedUntil(int clientId)
    {
        return Call<DateTime?>(
            nameof(IChatRoomService.GetBlockedUntil),
            MessagePackSerializer.Serialize(clientId)
        );
    }

    /// <summary>
    /// Get the next message which hasn't been approved or rejected
    /// </summary>
    /// <returns>Message object. Returns null if there is no message</returns>
    public Message? GetNewMessage()
    {
        return Call<Message?>(
            nameof(IChatRoomService.GetNewMessage),
            null
        );
    }

    /// <summary>
    /// Get number of strikes a participant has
    /// </summary>
    /// <param name="clientId">Client ID</param>
    /// <returns>Number of strikes</returns>
    public int GetStrikes(int clientId)
    {
        return Call<int>(
            nameof(IChatRoomService.GetStrikes),
            MessagePackSerializer.Serialize(clientId)
        );
    }

    /// <summary>
    /// Register client with a name
    /// </summary>
    /// <param name="name">Name of client, can be duplicate between clients</param>
    /// <returns>Client ID</returns>
    public int RegisterClient(string name)
    {
        return Call<int>(
            nameof(IChatRoomService.RegisterClient),
            MessagePackSerializer.Serialize(name)
        );
    }

    /// <summary>
    /// Reject a message
    /// </summary>
    /// <param name="messageId">Message ID</param>
    public void RejectMessage(int messageId)
    {
        CallVoid(
            nameof(IChatRoomService.RejectMessage),
            MessagePackSerializer.Serialize(messageId)
        );
    }

    /// <summary>
    /// Send a message, will be given to a moderator to be approved
    /// </summary>
    /// <param name="clientId">Client ID</param>
    /// <param name="contents">Message contents</param>
    /// <param name="needsToBeCensored">Does this message need to be censored?</param>
    /// <returns>Was sending successful, can fail if user is blocked</returns>
    public bool SendMessage(int clientId, string contents, bool needsToBeCensored)
    {
        var args = new SendMessageArgs
        {
            clientId = clientId,
            contents = contents,
            needsToBeCensored = needsToBeCensored
        };

        return Call<bool>(
            nameof(IChatRoomService.SendMessage),
            MessagePackSerializer.Serialize(args)
        );
    }
}