在这个例子中,我们将为 Golang 创建一个 RabbitMQ 包。然后我们将在我们的生产者/发布者和消费者/订阅者应用程序中使用它。该包带有下面列出的重要功能,所以你不需要担心自己处理这些问题。


package rabbitmq
import "time"
// ConnectionConfig carries the segments of the AMQP URI, the reconnection
// policy and the optional TLS material used when connecting to the broker.
type ConnectionConfig struct {
	// Name is used to name the connection. It provides visual cues to which
	// connection belongs to which application in Management Plugin. Optional.
	Name string
	// Schema segment of the AMQP URI string. Required.
	Schema string
	// Username segment of the AMQP URI string. Optional.
	Username string
	// Password segment of the AMQP URI string. Optional.
	Password string
	// Host segment of the AMQP URI string. Required.
	Host string
	// Port segment of the AMQP URI string. Required.
	Port string
	// VHost segment of the AMQP URI string. Optional.
	VHost string
	// ReconAttempt defines the maximum amount of reconnection attempts.
	// If set to `0` attempts will be infinite. Optional.
	ReconAttempt int
	// ReconInterval defines the equal interval between each reconnection
	// attempt. Required.
	ReconInterval time.Duration
	// CACert represents Certificate Authority (CA) certificate. Optional.
	CACert []byte
	// ClientCert represents Client certificate. Optional.
	ClientCert []byte
	// ClientKey represents Client key. Optional.
	ClientKey []byte
}
package rabbitmq
// ExchangeConfig describes an exchange to declare. It is used by both
// producer and consumer applications.
type ExchangeConfig struct {
	// Name defines the name of the exchange. Required.
	Name string
	// Type defines the type of the exchange. Required.
	Type string
}
package rabbitmq
// QueueConfig describes a queue to declare and bind. It is dedicated to
// consumer applications, not the producers.
type QueueConfig struct {
	// Name defines the name of the queue. Required.
	Name string
	// Binding defines the relationship between an exchange and a queue.
	// Optional.
	// It is often referred to as the "routing key". Not required for the
	// `fanout` exchange types.
	Binding string
	// Exchange defines the name of the exchange that needs to be used for
	// message consumption. Required.
	// This must match the producer config value.
	Exchange string
	// Mode defines what type of queue shall be used. Required.
	// This has an impact on the performance. Prefer `lazy` over `default`
	// unless you have a very reasonable case. Read the reference below before
	// taking a decision.
	// Ref: https://www.rabbitmq.com/lazy-queues.html
	Mode string
	// DLX is dedicated to the "dead-lettered" messages and represents the name
	// of an exchange that was declared previously. Optional.
	// If the consumer does not/will never require a DLX feature, skip this
	// option. Declaring a DLX late and attaching it programmatically to an
	// existing queue is not possible. You can achieve this manually, but that
	// is not always wise as it is a very error-prone and tedious job.
	// If the consumer requires a DLX feature, set up an exchange and queue
	// beforehand, then use its name here. A DLX does not require its own
	// consumer upfront. One can be delivered once you know how to handle
	// "dead-lettered" messages.
	DLX string
}
package rabbitmq
// LogLevel names the severity of a package level log entry.
type LogLevel string

// Severity levels carried by `Log.Level`. Every constant is explicitly typed:
// in a const block a line with its own `= value` does not inherit the type of
// the previous line, so omitting the type would yield untyped string constants.
const (
	LevelDebug   LogLevel = "debug"
	LevelInfo    LogLevel = "info"
	LevelWarning LogLevel = "warning"
	LevelError   LogLevel = "error"
)
// Log is used to help select what level logs the application wants to log or
// ignore. Logs are streamed via the `Server.logChan` field which is an optional
// argument. It is provided when calling the `NewServer()` method.
type Log struct {
	Level   LogLevel
	Message string
package rabbitmq
// ProducerConfig describes how and where a producer publishes its messages.
type ProducerConfig struct {
	// Name is used to reserve channels per producer. Hence the reason, each
	// producer must have a unique name. Required.
	Name string
	// ExchangeName defines the name of the exchange that needs to be used for
	// message publishing. Required.
	// This must match the consumer config value.
	ExchangeName string
	// RoutingKey is defined on the message. Optional.
	// When the message is published, it ends up in a queue whose binding key
	// matches the routing key. Not required for the `fanout` exchange types.
	RoutingKey string
}
// Producer must be implemented by all producers.
type Producer interface {
	// Produce publishes `message` identified by `messageID`. The meaning of
	// `data` is left to the implementation.
	Produce(messageID string, message []byte, data interface{}) error
}
package rabbitmq
import "github.com/streadway/amqp"
// ConsumerConfig describes a consumer and how many workers serve it.
type ConsumerConfig struct {
	// Name is used to name the consumer workers. It provides visual cues to
	// which channel is used by which consumer/worker in Management Plugin. It
	// also appears in the logs. Required.
	Name string
	// WorkerCount defines how many workers are run for the consumer.
	// Required.
	// This has a high impact on the performance. The performance also has a
	// direct relationship with the `PrefetchCount` option. If you have a fairly
	// busy queue, avoid setting it to `1`. Also avoid setting it too high
	// because the more workers, the more work the broker has to do to keep
	// track of them. Read the reference below before taking a decision.
	// Ref: https://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
	// 1 -> n receiving rate vs consumer count / prefetch count
	WorkerCount int
	// PrefetchCount defines how many messages should be delivered to a
	// consumer before acknowledgments are received. Optional.
	// This has a high impact on the performance. The performance also has a
	// direct relationship with the `WorkerCount` option above. Unless you have
	// a fairly quiet queue, avoid setting it to `1`. Read the reference below
	// before taking a decision.
	// Ref: https://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
	// n -> 0 sending bytes rate vs number of producers, for various message sizes
	// 1 -> n receiving rate vs consumer count / prefetch count
	PrefetchCount int
}
// All consumers must implement this interface.
type Consumer interface {
	Config() ConsumerConfig
	Consume(messages <-chan amqp.Delivery, workerID int)
package rabbitmq
import (
type Server struct {
	mutex     *sync.RWMutex
	conn      *amqp.Connection
	config    ConnectionConfig
	logChan   chan Log
	consumers []Consumer
	channels  map[string]*amqp.Channel
// New returns `Server` pointer type with an live AMQP connection attached to
// it.
// The optional `logChan` argument helps you get back package level logs. If it
// is to be utilised, you must use an unbuffered `Log` channel and read from it
// right after creating it. Failing to read will prevent the reconnection
// feature from establishing a new connection and possible unexpected issues.
func NewServer(config ConnectionConfig, logChan chan Log) (*Server, error) {
	if config.ReconInterval == 0 {
		return nil, fmt.Errorf("reconnection interval must be above 0")
	srv := &Server{
		mutex:    &sync.RWMutex{},
		config:   config,
		logChan:  logChan,
		channels: make(map[string]*amqp.Channel),
	if err := srv.connect(); err != nil {
		return nil, err
	return srv, nil
// Shutdown closes the AMQP connection.
func (s *Server) Shutdown() error {
	if s.conn != nil {
		if err := s.conn.Close(); err != nil {
			return fmt.Errorf("shutdown: %w", err)
	return nil
// Setup declares all the necessary components of the broker that are needed
// for producers and consumers: every given exchange first, then every given
// queue together with its binding. It works on a temporary channel and stops
// at the first failure, returning the wrapped error.
//
// NOTE(review): the argument lists of `ExchangeDeclare`, `QueueDeclare` and
// `QueueBind`, and the closing braces, are missing from this listing -
// restore them from the original source before building.
func (s *Server) Setup(exchanges []ExchangeConfig, queues []QueueConfig) error {
	chn, err := s.conn.Channel()
	if err != nil {
		return fmt.Errorf("setup: %w", err)
	// The channel is only needed for the declarations below.
	defer chn.Close()
	for _, exchange := range exchanges {
		if err := chn.ExchangeDeclare(
		); err != nil {
			return fmt.Errorf("setup: exchange declare: %w", err)
	for _, queue := range queues {
		// The queue mode is always sent; the dead-letter exchange only when
		// one is configured.
		args := amqp.Table{"x-queue-mode": queue.Mode}
		if queue.DLX != "" {
			args["x-dead-letter-exchange"] = queue.DLX
		if _, err := chn.QueueDeclare(
		); err != nil {
			return fmt.Errorf("setup: queue declare: %w", err)
		if err := chn.QueueBind(
		); err != nil {
			return fmt.Errorf("setup: queue bind: %w", err)
	return nil
// RegisterConsumers first registers all consumers and then runs their workers.
func (s *Server) RegisterConsumers(consumers []Consumer) error {
	s.consumers = consumers
	if err := s.runConsumerWorkers(s.consumers); err != nil {
		return fmt.Errorf("register consumers: %w", err)
	return nil
// PublishOnNewChannel publishes a message on a new channel.
// Every time this method is called a new channel is opened and closed right
// after the use. This has a negative impact on the application performance.
// The advantage of using a new channel for each publishing is that, it allows
// message delivery confirmation. It is possible for published message to not
// reach the exchange, queue or the server for any reason. The lack of an error
// on the publishing does not necessarily mean that the server has received the
// published message either.
// The disadvantage is obviously a new channel is created for each publishing
// and closed right after the use. This will result in considerably slower
// operations and higher usage of system resources such as high channel churn.
// The disadvantage becomes reality if it was used by fairly busy producers.
// If the message delivery confirmation is a "must have" feature for your use
// case you have no other choice but use this method. Otherwise always prefer


// the `PublishOnReservedChannel()` method.
// Ref: https://www.rabbitmq.com/channels.html
// High Channel Churn
func (s *Server) PublishOnNewChannel(publishing amqp.Publishing, config ProducerConfig) error {
	defer s.mutex.RUnlock()
	chn, err := s.conn.Channel()
	if err != nil {
		return fmt.Errorf("publish on new channel: get channel: %w", err)
	defer chn.Close()
	if err := chn.Confirm(false); err != nil {
		return fmt.Errorf("publish on new channel: confirm mode: %w", err)
	err = chn.Publish(config.ExchangeName, config.RoutingKey, true, false, publishing)
	if err != nil {
		return fmt.Errorf("publish on new channel: publish: %w", err)
	select {
	case ntf := <-chn.NotifyPublish(make(chan amqp.Confirmation, 1)):
		if !ntf.Ack {
			return errors.New("publish on new channel: failed to confirm publishing")
	case <-chn.NotifyReturn(make(chan amqp.Return)):
		return errors.New("publish on new channel: failed to route publishing")
	return nil
// PublishOnReservedChannel publishes a message on previously reserved channel
// on behalf of the producers.
// The reserved channels are not closed as they are meant to be long-lived and
// reused for multiple publishing. This has a positive impact on the application
// performance.
// The advantage of using a reserved channel is that, each producer uses its own
// reserved channel for each publishing. This will result in considerably
// faster operations and less usage of system resources such as low channel
// churn.
// The disadvantage is that, it will not allow message delivery confirmation. If
// you want to what we mean by the message delivery confirmation, please read
// the `PublishOnNewChannel()` method.
// If the message delivery confirmation is not important for your use case,
// always prefer this method over `PublishOnNewChannel()` method.
// Ref: https://www.rabbitmq.com/channels.html
// High Channel Churn
func (s *Server) PublishOnReservedChannel(publishing amqp.Publishing, config ProducerConfig) error {
	chn, err := s.reservedChannel(config.Name)
	if err != nil {
		return fmt.Errorf("publish on reserved channel: %w", err)
	err = chn.Publish(config.ExchangeName, config.RoutingKey, false, false, publishing)
	if err != nil {
		return fmt.Errorf("publish on reserved channel: publish: %w", err)
	return nil
// reservedChannel returns an existing channel for a producer.
// If the given producer name does not yet have an channel exist in the reserved
// channel pool, a new channel is created and reserved for later use.
// All the reserved channels have a channel listeners `producerChannelListener`
// attached to them so if the channel is closed for any given reason, the
// listener calls this method in order to recreate one.
func (s *Server) reservedChannel(producerName string) (*amqp.Channel, error) {
	defer s.mutex.Unlock()
	if chn, ok := s.channels[producerName]; ok {
		return chn, nil
	chn, err := s.conn.Channel()
	if err != nil {
		return nil, fmt.Errorf("reserved channel: get channel: %w", err)
	s.channels[producerName] = chn
	go s.producerChannelListener(chn, producerName)
	return chn, nil
// runConsumerWorkers runs all the workers that are linked to the given
// consumers.
// Each individual consumer gets its own dedicated channel and this channel is
// shared between all its workers. e.g., there are two consumers and each has
// two workers attached to it. We would have a total of two open channels in
// the broker. Given that the workers are run as goroutines and the goroutines
// are not threads, there is no point in using a channel per worker.
// Every channel has a `consumerChannelListener` attached to it so if the
// channel is closed for any given reason, the listener calls
// `runConsumerWorkers()` in order to rerun all the workers of the consumer.
// An error is returned when a channel cannot be opened or configured, or when
// a consumer asks for fewer than one worker.
// Ref: https://www.rabbitmq.com/api-guide.html
// Channels and Concurrency Considerations (Thread Safety)
//
// NOTE(review): the remaining `Consume` arguments, the log call wrappers
// around the `Level`/`Message` pairs and the closing braces are missing from
// this listing - restore them from the original source before building.
func (s *Server) runConsumerWorkers(consumers []Consumer) error {
	op := "run consumer workers"
	for _, consumer := range consumers {
		// Per-iteration copy for the goroutines started below.
		consumer := consumer
		config := consumer.Config()
		chn, err := s.conn.Channel()
		if err != nil {
			return fmt.Errorf("%s: get channel: %s: %w", op, config.Name, err)
		if err := chn.Qos(config.PrefetchCount, 0, false); err != nil {
			return fmt.Errorf("%s: qos channel: %s: %w", op, config.Name, err)
		// NOTE(review): the worker count is validated only after the channel
		// has been opened; on this error path the channel is not closed in the
		// visible code.
		if config.WorkerCount < 1 {
			return fmt.Errorf("%s: insufficient worker count: %s-%d/0", op, config.Name, config.WorkerCount)
		go s.consumerChannelListener(chn, consumer)
		// Worker identifiers are 1-based.
		for i := 1; i <= config.WorkerCount; i++ {
			// Per-iteration copy for the goroutine below.
			i := i
			go func() {
				messages, err := chn.Consume(
					fmt.Sprintf("%s (%d/%d)", config.Name, i, config.WorkerCount),
				if err != nil {
						Level: LevelError,
						Message: fmt.Sprintf(
							"%s: consume channel: %s-%d/%d: %v", op, config.Name, i, config.WorkerCount, err,
					Level:   LevelInfo,
					Message: fmt.Sprintf("%s: run %s-%d/%d", op, config.Name, i, config.WorkerCount),
				// The "stop" entry below is logged after `Consume` returns.
				consumer.Consume(messages, i)
					Level:   LevelInfo,
					Message: fmt.Sprintf("%s: stop %s-%d/%d", op, config.Name, i, config.WorkerCount),
	return nil
// connect establishes a new connection based on the required schema.
func (s *Server) connect() error {
	if s.config.Schema == "amqp" {
		return s.connectAMQP()
	return s.connectAMQPS()
// connectAMQP establishes a new AMQP connection. It is called once at the
// application bootstrap.
// However, this method will also be called by the `connectionListener()` method
// behind the scenes as many times as required when the connection goes down.
// Hence the reason why it is also responsible for rerunning the consumer
// workers. Otherwise, workers would not be up and running after reconnection.
// On success a new `connectionListener()` is started for the new connection.
//
// NOTE(review): the lock call matching the deferred unlock, the URL format
// arguments, the `amqp.Config` literal and the closing braces/parentheses are
// missing from this listing - restore them from the original source before
// building.
func (s *Server) connectAMQP() error {
	defer s.mutex.Unlock()
	var (
		err error
		url string
	// The URL takes one of two forms depending on whether any credentials are
	// configured.
	if s.config.Username != "" || s.config.Password != "" {
		url = fmt.Sprintf(
	} else {
		url = fmt.Sprintf(
	s.conn, err = amqp.DialConfig(url,
			// The configured name is sent as the `connection_name` property.
			Properties: amqp.Table{"connection_name": s.config.Name},
	if err != nil {
		return fmt.Errorf("connect amqp: %w", err)
	if err := s.runConsumerWorkers(s.consumers); err != nil {
		return fmt.Errorf("connect amqp: %w", err)
	go s.connectionListener()
	return nil
// connectAMQPS establishes a new AMQPS (TLS) connection. It is called once at
// the application bootstrap.
// However, this method will also be called by the `connectionListener()` method
// behind the scenes as many times as required when the connection goes down.
// Hence the reason why it is also responsible for rerunning the consumer
// workers. Otherwise, workers would not be up and running after reconnection.
// On success a new `connectionListener()` is started for the new connection.
//
// NOTE(review): the lock call matching the deferred unlock, the statement
// that fills the CA pool, the URL format arguments, the `amqp.Config` literal
// and the closing braces/parentheses are missing from this listing - restore
// them from the original source before building.
func (s *Server) connectAMQPS() error {
	defer s.mutex.Unlock()
	tlsCnf := &tls.Config{}
	if s.config.CACert != nil {
		tlsCnf.RootCAs = x509.NewCertPool()
	// NOTE(review): in the visible code a key pair that fails to parse is not
	// reported; the client certificate is simply not attached.
	if cert, err := tls.X509KeyPair(s.config.ClientCert, s.config.ClientKey); err == nil {
		tlsCnf.Certificates = append(tlsCnf.Certificates, cert)
	var (
		err error
		url string
	// The URL takes one of two forms depending on whether any credentials are
	// configured.
	if s.config.Username != "" || s.config.Password != "" {
		url = fmt.Sprintf(
	} else {
		url = fmt.Sprintf(
	s.conn, err = amqp.DialConfig(url,
			// The configured name is sent as the `connection_name` property.
			Properties:      amqp.Table{"connection_name": s.config.Name},
			TLSClientConfig: tlsCnf,
	if err != nil {
		return fmt.Errorf("connect amqps: %w", err)
	if err := s.runConsumerWorkers(s.consumers); err != nil {
		return fmt.Errorf("connect amqps: %w", err)
	go s.connectionListener()
	return nil
// connectionListener listens on the closed connection notifications and
// attempts to reestablish a new connection by calling the `connect()` method.
// However, if the connection was closed explicitly, nothing shall be done.
// Total reconnection attempts and intervals are configured within the
// `ConnectionConfig` struct. For the infinite attempts, the `ReconAttempt`
// option must be set to `0`.
func (s *Server) connectionListener() {
	err := <-s.conn.NotifyClose(make(chan *amqp.Error))
	if err != nil {
		op := "connection listener"
			Level:   LevelWarning,
			Message: fmt.Sprintf("%s: closed: %v", op, err),
		ticker := time.NewTicker(s.config.ReconInterval)
		defer ticker.Stop()
		var i int
		for range ticker.C {
				Level:   LevelDebug,
				Message: fmt.Sprintf("%s: reconnection attempt: %d/%d", op, i, s.config.ReconAttempt),
			if err := s.connect(); err == nil {
					Level:   LevelInfo,
					Message: fmt.Sprintf("%s: reconnected: %d/%d", op, i, s.config.ReconAttempt),
			if i == s.config.ReconAttempt {
					Level:   LevelError,
					Message: fmt.Sprintf("%s: reconnection failed: %d/%d", op, i, s.config.ReconAttempt),
		Level:   LevelInfo,
		Message: fmt.Sprintf("connection listener: explicetly closed the connection"),
// producerChannelListener listens on the closed reserved channel notifications
// and removes the channel from the pool.
// Once removed, the very first call to the `PublishOnReservedChannel()` method
// will help recreate a new channel.
func (s *Server) producerChannelListener(chn *amqp.Channel, producerName string) {
	err := <-chn.NotifyClose(make(chan *amqp.Error))
	if err != nil

			Level:   LevelWarning,
			Message: fmt.Sprintf("producer channel listener: closed: %s: %v", producerName, err),
	} else {
			Level:   LevelWarning,
			Message: fmt.Sprintf("producer channel listener: closed: %s", producerName),
	delete(s.channels, producerName)
// consumerChannelListener listens on the closed consumer channel notifications
// and reruns the consumer workers with `runConsumerWorkers()` method. However,
// if the connection was closed explicitly, nothing shall be done.
func (s *Server) consumerChannelListener(chn *amqp.Channel, consumer Consumer) {
	err := <-chn.NotifyClose(make(chan *amqp.Error))
	if err != nil && err.Code == amqp.ConnectionForced {
	if err != nil {
			Level:   LevelWarning,
			Message: fmt.Sprintf("consumer channel listener: closed: %s: %v", consumer.Config().Name, err),
	} else {
			Level:   LevelWarning,
			Message: fmt.Sprintf("consumer channel listener: closed: %s", consumer.Config().Name),
	if err := s.runConsumerWorkers([]Consumer{consumer}); err != nil {
			Level:   LevelError,
			Message: fmt.Sprintf("consumer channel listener: %v", err),
// log sends log messages to the log channel if not nil.
func (s *Server) log(log Log) {
	if s.logChan == nil {
	s.logChan <- log
version: "3.4"
    image: "rabbitmq:3.8.3-management-alpine"
      - "5672:5672"
      - "15672:15672"


srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{
    Name:          "striker",
    Schema:        "amqp",
    Username:      "", // RABBITMQ_DEFAULT_USER
    Password:      "", // RABBITMQ_DEFAULT_PASS
    Host:          "localhost",
    Port:          "5672",
    VHost:         "", // RABBITMQ_DEFAULT_VHOST
    ReconAttempt:  300,
    ReconInterval: time.Second,
    CACert:        nil,
    ClientCert:    nil,
    ClientKey:     nil,
version: "3.4"
    image: "rabbitmq:3.8.3-management-alpine"
      - "5671:5671"
      - "15671:15671"
      RABBITMQ_SSL_CACERTFILE: "/cert/ca_certificate.pem"
      RABBITMQ_SSL_CERTFILE: "/cert/server_certificate.pem"
      RABBITMQ_SSL_KEYFILE: "/cert/server_key.pem"
      RABBITMQ_SSL_VERIFY: "verify_peer"
      - "./cert:/cert"


caCert, err := ioutil.ReadFile("./cert/ca_certificate.pem")
if err != nil {
clientCert, err := ioutil.ReadFile("./cert/client_certificate.pem")
if err != nil {
clientKey, err := ioutil.ReadFile("./cert/client_key.pem")
if err != nil {
srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{
    Name:         "striker",
    Schema:       "amqps",
    Username:      "", // RABBITMQ_DEFAULT_USER
    Password:      "", // RABBITMQ_DEFAULT_PASS
    Host:          "localhost",
    Port:          "5671",
    VHost:         "", // RABBITMQ_DEFAULT_VHOST
    ReconAttempt:  300,
    ReconInterval: time.Second,
    CACert:        caCert,
    ClientCert:    clientCert,
    ClientKey:     clientKey,


package producer
import (
type Morientes struct {
	rabbitmq *rabbitmq.Server
	config   rabbitmq.ProducerConfig
func NewMorientes(rabbitmq *rabbitmq.Server, config rabbitmq.ProducerConfig) Morientes {
	return Morientes{rabbitmq: rabbitmq, config: config}
func (m Morientes) Produce(messageID string, message []byte, data interface{}) error {
	// if err := m.rabbitmq.PublishOnReservedChannel(amqp.Publishing{
	if err := m.rabbitmq.PublishOnNewChannel(amqp.Publishing{
		DeliveryMode:    amqp.Persistent,
		ContentType:     "text/plain",
		ContentEncoding: "utf-8",
		MessageId:       messageID,
		Body:            message,
	}, m.config); err != nil {
		return err
	return nil


package main import ( "log" "time" "striker/producer" "striker/rabbitmq" func main() { log.Println("running striker ...") // RabbitMQ ---------------------------------------------------------------- exchanges := []rabbitmq.ExchangeConfig{ Name: "penalty", Type: "direct", logChan := make(chan rabbitmq.Log) go func() { log.Println("watching logs ...") for l := range logChan { log.Printf("%+v\n", l) srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{ Name: "striker", Schema: "amqp", Username: "", Password: "", Host: "localhost", Port: "5672", VHost: "", ReconAttempt: 300, ReconInterval: time.Second, CACert: nil, ClientCert: nil, ClientKey: nil, }, logChan) if err != nil { log.Fatalln(err) defer srv.Shutdown() if err := srv.Setup(exchanges, nil); err != nil { log.Fatalln(err) // Producers --------------------------------------------------------------- morientes := producer.NewMorientes(srv, rabbitmq.ProducerConfig{ Name: "morientes", ExchangeName: "penalty", RoutingKey: "spain", // ....


package consumer
import (
type Casillas struct {
	config rabbitmq.ConsumerConfig
func NewCasillas(config rabbitmq.ConsumerConfig) Casillas {
	return Casillas{config: config}
func (c Casillas) Config() rabbitmq.ConsumerConfig {
	return c.config
func (c Casillas) Consume(messages <-chan amqp.Delivery, id int) {
	for message := range messages {
		// Do the work ...
		log.Printf("[%d] %s consumed: %s\n", id, c.config.Name, string(message.Body))
		if err := message.Ack(false); err != nil {
			log.Printf("consume: ack message: %v\n", err)


package main
import (
func main() {
	log.Println("running keeper ...")
	// Consumers ---------------------------------------------------------------
	casillas := consumer.NewCasillas(rabbitmq.ConsumerConfig{
		Name:          "casillas",
		WorkerCount:   1,
		PrefetchCount: 3,
	// RabbitMQ ----------------------------------------------------------------
	exchanges := []rabbitmq.ExchangeConfig{
			Name: "penalty",
			Type: "direct",
	queues := []rabbitmq.QueueConfig{
			Name:     "casillas",
			Binding:  "spain",
			Exchange: "penalty",
			Mode:     "lazy",
	logChan := make(chan rabbitmq.Log)
	go func() {
		log.Println("watching logs ...")
		for l := range logChan {
			log.Printf("%+v\n", l)
	srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{
		Name:          "keeper",
		Schema:        "amqp",
		Username:      "",
		Password:      "",
		Host:          "localhost",
		Port:          "5672",
		VHost:         "",
		ReconAttempt:  300,
		ReconInterval: time.Second,
		CACert:        nil,
		ClientCert:    nil,
		ClientKey:     nil,
	}, logChan)
	if err != nil {
	defer srv.Shutdown()
	if err := srv.Setup(exchanges, queues); err != nil {
	if err := srv.RegisterConsumers([]rabbitmq.Consumer{casillas}); err != nil {
	select {}
