AmqpHandler.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. <?php
  2. /*
  3. * This file is part of the Monolog package.
  4. *
  5. * (c) Jordi Boggiano <j.boggiano@seld.be>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Monolog\Handler;
  11. use Monolog\Logger;
  12. use Monolog\Formatter\JsonFormatter;
  13. use PhpAmqpLib\Message\AMQPMessage;
  14. use PhpAmqpLib\Channel\AMQPChannel;
  15. use AMQPExchange;
  16. class AmqpHandler extends AbstractProcessingHandler
  17. {
  18. /**
  19. * @var AMQPExchange|AMQPChannel $exchange
  20. */
  21. protected $exchange;
  22. /**
  23. * @var string
  24. */
  25. protected $exchangeName;
  26. /**
  27. * @param AMQPExchange|AMQPChannel $exchange AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
  28. * @param string $exchangeName
  29. * @param int $level
  30. * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
  31. */
  32. public function __construct($exchange, $exchangeName = 'log', $level = Logger::DEBUG, $bubble = true)
  33. {
  34. if ($exchange instanceof AMQPExchange) {
  35. $exchange->setName($exchangeName);
  36. } elseif ($exchange instanceof AMQPChannel) {
  37. $this->exchangeName = $exchangeName;
  38. } else {
  39. throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
  40. }
  41. $this->exchange = $exchange;
  42. parent::__construct($level, $bubble);
  43. }
  44. /**
  45. * {@inheritDoc}
  46. */
  47. protected function write(array $record)
  48. {
  49. $data = $record["formatted"];
  50. $routingKey = $this->getRoutingKey($record);
  51. if ($this->exchange instanceof AMQPExchange) {
  52. $this->exchange->publish(
  53. $data,
  54. $routingKey,
  55. 0,
  56. array(
  57. 'delivery_mode' => 2,
  58. 'content_type' => 'application/json',
  59. )
  60. );
  61. } else {
  62. $this->exchange->basic_publish(
  63. $this->createAmqpMessage($data),
  64. $this->exchangeName,
  65. $routingKey
  66. );
  67. }
  68. }
  69. /**
  70. * {@inheritDoc}
  71. */
  72. public function handleBatch(array $records)
  73. {
  74. if ($this->exchange instanceof AMQPExchange) {
  75. parent::handleBatch($records);
  76. return;
  77. }
  78. foreach ($records as $record) {
  79. if (!$this->isHandling($record)) {
  80. continue;
  81. }
  82. $record = $this->processRecord($record);
  83. $data = $this->getFormatter()->format($record);
  84. $this->exchange->batch_basic_publish(
  85. $this->createAmqpMessage($data),
  86. $this->exchangeName,
  87. $this->getRoutingKey($record)
  88. );
  89. }
  90. $this->exchange->publish_batch();
  91. }
  92. /**
  93. * Gets the routing key for the AMQP exchange
  94. *
  95. * @param array $record
  96. * @return string
  97. */
  98. protected function getRoutingKey(array $record)
  99. {
  100. $routingKey = sprintf(
  101. '%s.%s',
  102. // TODO 2.0 remove substr call
  103. substr($record['level_name'], 0, 4),
  104. $record['channel']
  105. );
  106. return strtolower($routingKey);
  107. }
  108. /**
  109. * @param string $data
  110. * @return AMQPMessage
  111. */
  112. private function createAmqpMessage($data)
  113. {
  114. return new AMQPMessage(
  115. (string) $data,
  116. array(
  117. 'delivery_mode' => 2,
  118. 'content_type' => 'application/json',
  119. )
  120. );
  121. }
  122. /**
  123. * {@inheritDoc}
  124. */
  125. protected function getDefaultFormatter()
  126. {
  127. return new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);
  128. }
  129. }