Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion rabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pkg/errors"
"github.com/relistan/go-director"
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
Expand All @@ -33,6 +34,14 @@ const (
Producer Mode = 2
)

var (
// Used for identifying consumer
DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8]

// Used for identifying producer
DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8]
)

// IRabbit is the interface that the `rabbit` library implements. It's here as
// convenience.
type IRabbit interface {
Expand Down Expand Up @@ -116,6 +125,12 @@ type Options struct {

// Whether to automatically acknowledge consumed message(s)
AutoAck bool

// Used for identifying consumer
ConsumerTag string

// Used as a property to identify producer
AppID string
}

// ConsumeError will be passed down the error channel if/when `f()` func runs
Expand Down Expand Up @@ -193,6 +208,14 @@ func ValidateOptions(opts *Options) error {
opts.RetryReconnectSec = DefaultRetryReconnectSec
}

if opts.AppID == "" {
opts.AppID = DefaultAppID
}

if opts.ConsumerTag == "" {
opts.ConsumerTag = DefaultConsumerTag
}

validModes := []Mode{Both, Producer, Consumer}

var found bool
Expand Down Expand Up @@ -344,6 +367,7 @@ func (r *Rabbit) Publish(ctx context.Context, routingKey string, body []byte) er
if err := r.ProducerServerChannel.Publish(r.Options.ExchangeName, routingKey, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
Body: body,
AppId: r.Options.AppID,
}); err != nil {
return err
}
Expand Down Expand Up @@ -479,7 +503,7 @@ func (r *Rabbit) newConsumerChannel() error {

deliveryChannel, err := serverChannel.Consume(
r.Options.QueueName,
"",
r.Options.ConsumerTag,
r.Options.AutoAck,
r.Options.QueueExclusive,
false,
Expand Down
27 changes: 21 additions & 6 deletions rabbit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ var _ = Describe("Rabbit", func() {
for _, msg := range receivedMessages {
Expect(msg.Exchange).To(Equal(opts.ExchangeName))
Expect(msg.RoutingKey).To(Equal(opts.RoutingKey))
Expect(msg.ConsumerTag).To(Equal(opts.ConsumerTag))

data = append(data, string(msg.Body))
}
Expand Down Expand Up @@ -444,7 +445,7 @@ var _ = Describe("Rabbit", func() {
Describe("Publish", func() {
Context("happy path", func() {
It("correctly publishes message", func() {
var receivedMessage []byte
var receivedMessage *amqp.Delivery

go func() {
var err error
Expand All @@ -463,7 +464,8 @@ var _ = Describe("Rabbit", func() {
// Give our consumer some time to receive the message
time.Sleep(100 * time.Millisecond)

Expect(receivedMessage).To(Equal(testMessage))
Expect(receivedMessage.Body).To(Equal(testMessage))
Expect(receivedMessage.AppId).To(Equal(opts.AppID))
})

When("Mode is Consumer", func() {
Expand All @@ -486,7 +488,7 @@ var _ = Describe("Rabbit", func() {
It("will generate a new server channel", func() {
r.ProducerServerChannel = nil

var receivedMessage []byte
var receivedMessage *amqp.Delivery

go func() {
var err error
Expand All @@ -505,7 +507,7 @@ var _ = Describe("Rabbit", func() {
// Give our consumer some time to receive the message
time.Sleep(100 * time.Millisecond)

Expect(receivedMessage).To(Equal(testMessage))
Expect(receivedMessage.Body).To(Equal(testMessage))
})
})
})
Expand Down Expand Up @@ -655,6 +657,17 @@ var _ = Describe("Rabbit", func() {
Expect(err).ToNot(HaveOccurred())
Expect(opts.RetryReconnectSec).To(Equal(DefaultRetryReconnectSec))
})

It("sets AppID and ConsumerTag to default if unset", func() {
opts.AppID = ""
opts.ConsumerTag = ""

err := ValidateOptions(opts)

Expect(err).ToNot(HaveOccurred())
Expect(opts.ConsumerTag).To(ContainSubstring("c-rabbit-"))
Expect(opts.AppID).To(ContainSubstring("p-rabbit-"))
})
})
})
})
Expand All @@ -678,6 +691,8 @@ func generateOptions() *Options {
QueueDurable: false,
QueueExclusive: false,
QueueAutoDelete: true,
AppID: "rabbit-test-producer",
ConsumerTag: "rabbit-test-consumer",
}
}

Expand Down Expand Up @@ -718,7 +733,7 @@ func publishMessages(ch *amqp.Channel, opts *Options, messages []string) error {
return nil
}

func receiveMessage(ch *amqp.Channel, opts *Options) ([]byte, error) {
func receiveMessage(ch *amqp.Channel, opts *Options) (*amqp.Delivery, error) {
tmpQueueName := "rabbit-receiveMessages-" + uuid.NewV4().String()

if _, err := ch.QueueDeclare(
Expand All @@ -744,7 +759,7 @@ func receiveMessage(ch *amqp.Channel, opts *Options) ([]byte, error) {
select {
case m := <-deliveryChan:
logrus.Debug("Test: received message in receiveMessage()")
return m.Body, nil
return &m, nil
case <-time.After(5 * time.Second):
logrus.Debug("Test: timed out waiting for message in receiveMessage()")
return nil, errors.New("timed out")
Expand Down