diff --git a/rabbit.go b/rabbit.go index ae01d3b..7aa389d 100644 --- a/rabbit.go +++ b/rabbit.go @@ -20,6 +20,7 @@ import ( "github.com/pkg/errors" "github.com/relistan/go-director" + uuid "github.com/satori/go.uuid" "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) @@ -33,6 +34,14 @@ const ( Producer Mode = 2 ) +var ( + // Used for identifying consumer + DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8] + + // Used for identifying producer + DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8] +) + // IRabbit is the interface that the `rabbit` library implements. It's here as // convenience. type IRabbit interface { @@ -116,6 +125,12 @@ type Options struct { // Whether to automatically acknowledge consumed message(s) AutoAck bool + + // Used for identifying consumer + ConsumerTag string + + // Used as a property to identify producer + AppID string } // ConsumeError will be passed down the error channel if/when `f()` func runs @@ -193,6 +208,14 @@ func ValidateOptions(opts *Options) error { opts.RetryReconnectSec = DefaultRetryReconnectSec } + if opts.AppID == "" { + opts.AppID = DefaultAppID + } + + if opts.ConsumerTag == "" { + opts.ConsumerTag = DefaultConsumerTag + } + validModes := []Mode{Both, Producer, Consumer} var found bool @@ -344,6 +367,7 @@ func (r *Rabbit) Publish(ctx context.Context, routingKey string, body []byte) er if err := r.ProducerServerChannel.Publish(r.Options.ExchangeName, routingKey, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, Body: body, + AppId: r.Options.AppID, }); err != nil { return err } @@ -479,7 +503,7 @@ func (r *Rabbit) newConsumerChannel() error { deliveryChannel, err := serverChannel.Consume( r.Options.QueueName, - "", + r.Options.ConsumerTag, r.Options.AutoAck, r.Options.QueueExclusive, false, diff --git a/rabbit_test.go b/rabbit_test.go index ddcdae7..e089288 100644 --- a/rabbit_test.go +++ b/rabbit_test.go @@ -191,6 +191,7 @@ var _ = Describe("Rabbit", func() { for _, msg := range receivedMessages { Expect(msg.Exchange).To(Equal(opts.ExchangeName)) Expect(msg.RoutingKey).To(Equal(opts.RoutingKey)) + Expect(msg.ConsumerTag).To(Equal(opts.ConsumerTag)) data = append(data, string(msg.Body)) } @@ -444,7 +445,7 @@ var _ = Describe("Rabbit", func() { Describe("Publish", func() { Context("happy path", func() { It("correctly publishes message", func() { - var receivedMessage []byte + var receivedMessage *amqp.Delivery go func() { var err error @@ -463,7 +464,8 @@ var _ = Describe("Rabbit", func() { // Give our consumer some time to receive the message time.Sleep(100 * time.Millisecond) - Expect(receivedMessage).To(Equal(testMessage)) + Expect(receivedMessage.Body).To(Equal(testMessage)) + Expect(receivedMessage.AppId).To(Equal(opts.AppID)) }) When("Mode is Consumer", func() { @@ -486,7 +488,7 @@ var _ = Describe("Rabbit", func() { It("will generate a new server channel", func() { r.ProducerServerChannel = nil - var receivedMessage []byte + var receivedMessage *amqp.Delivery go func() { var err error @@ -505,7 +507,7 @@ var _ = Describe("Rabbit", func() { // Give our consumer some time to receive the message time.Sleep(100 * time.Millisecond) - Expect(receivedMessage).To(Equal(testMessage)) + Expect(receivedMessage.Body).To(Equal(testMessage)) }) }) }) @@ -655,6 +657,17 @@ var _ = Describe("Rabbit", func() { Expect(err).ToNot(HaveOccurred()) Expect(opts.RetryReconnectSec).To(Equal(DefaultRetryReconnectSec)) }) + + It("sets AppID and ConsumerTag to default if unset", func() { + opts.AppID = "" + opts.ConsumerTag = "" + + err := ValidateOptions(opts) + + Expect(err).ToNot(HaveOccurred()) + Expect(opts.ConsumerTag).To(ContainSubstring("c-rabbit-")) + Expect(opts.AppID).To(ContainSubstring("p-rabbit-")) + }) }) }) }) @@ -678,6 +691,8 @@ func generateOptions() *Options { QueueDurable: false, QueueExclusive: false, QueueAutoDelete: true, + AppID: "rabbit-test-producer", + ConsumerTag: "rabbit-test-consumer", } } @@ -718,7 +733,7 @@ func publishMessages(ch *amqp.Channel, opts *Options, messages []string) error { return nil } -func receiveMessage(ch *amqp.Channel, opts *Options) ([]byte, error) { +func receiveMessage(ch *amqp.Channel, opts *Options) (*amqp.Delivery, error) { tmpQueueName := "rabbit-receiveMessages-" + uuid.NewV4().String() if _, err := ch.QueueDeclare( @@ -744,7 +759,7 @@ func receiveMessage(ch *amqp.Channel, opts *Options) ([]byte, error) { select { case m := <-deliveryChan: logrus.Debug("Test: received message in receiveMessage()") - return m.Body, nil + return &m, nil case <-time.After(5 * time.Second): logrus.Debug("Test: timed out waiting for message in receiveMessage()") return nil, errors.New("timed out")