Source code for toxiccommon.exchange
# -*- coding: utf-8 -*-
# Copyright 2018, 2023 Juca Crispim <juca@poraodojuca.dev>
# This file is part of toxicbuild.
# toxicbuild is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# toxicbuild is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with toxicbuild. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import json
from uuid import uuid4
from aioamqp.exceptions import AmqpClosedConnection
import asyncamqp
from toxiccore.utils import LoggerMixin
from .coordination import Lock
class JsonAckMessage(asyncamqp.consumer.Message):
    """A consumed message whose body is decoded from json and that can be
    explicitly acknowledged or rejected."""

    def __init__(self, *args, **kwargs):
        """Builds the message and replaces the raw body by the
        deserialized json payload.

        All arguments are passed unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        raw_body = self.body.decode()
        self.body = json.loads(raw_body)

    async def acknowledge(self):
        """Sends an ack for this message through the message's channel."""
        tag = self.envelope.delivery_tag
        await self.channel.basic_client_ack(delivery_tag=tag)

    async def reject(self, requeue=True):
        """Rejects this message through the message's channel.

        :param requeue: Forwarded as ``requeue`` to ``basic_reject()``.
        """
        tag = self.envelope.delivery_tag
        await self.channel.basic_reject(tag, requeue=requeue)
class Consumer(asyncamqp.consumer.Consumer, LoggerMixin):
    """A consumer that closes its channel when its context is left and
    that can give up waiting for a message after a timeout."""

    async def __aexit__(self, *args, **kwargs):
        """Leaves the consumer context and then closes the consumer's
        channel."""
        await super().__aexit__(*args, **kwargs)
        await self.channel.close()

    async def fetch_message(self, cancel_on_timeout=True):
        """Fetches the next message from the consumer's local queue.

        If ``self.nowait`` is set and there is no message, the consumer
        is cancelled and ``None`` is returned. If ``self.timeout`` is set
        and there is no message, waits up to ``self.timeout``
        milliseconds for one to arrive.

        :param cancel_on_timeout: If true, the consumer is cancelled
          before the timeout exception is raised.
        :raises asyncamqp.exceptions.ConsumerTimeout: If no message
          arrived within ``self.timeout`` milliseconds.
        """
        if self.nowait and self.queue.empty():
            await self.cancel()
            return None

        if self.timeout and self.queue.empty():
            # Measure the timeout against the event loop clock. Counting
            # 1 ms sleeps overshoots the timeout because every sleep takes
            # longer than the requested millisecond.
            loop = asyncio.get_event_loop()
            deadline = loop.time() + self.timeout / 1000
            while self.queue.empty():
                if loop.time() >= deadline:
                    if cancel_on_timeout:
                        await self.cancel()
                    raise asyncamqp.exceptions.ConsumerTimeout(
                        'Could not get a message in {} ms'.format(
                            self.timeout))
                await asyncio.sleep(0.001)

        msg = await self.queue.get()
        self.message = self.MESSAGE_CLASS(*msg)
        return self.message
# Register the classes defined above on asyncamqp itself. These are
# module-level assignments that take effect when this module is imported.
# Presumably channels instantiate ``CONSUMER_CLASS`` when a consumer is
# created -- confirm against asyncamqp.
asyncamqp.channel.Channel.CONSUMER_CLASS = Consumer
# ``MESSAGE_CLASS`` is what ``fetch_message()`` uses to wrap each message.
asyncamqp.consumer.Consumer.MESSAGE_CLASS = JsonAckMessage
class AmqpConnection(LoggerMixin):
    """A connection for a broker. We can have many channels over
    one connection."""

    def __init__(self, **conn_kwargs):
        """Constructor for AmqpConnection.

        :param conn_kwargs: Kwargs used by ``asyncamqp.connect()``.
        """
        self.conn_kwargs = conn_kwargs
        self.transport = None
        self.protocol = None
        self._connected = False
        self._reconn_lock = None

    @property
    def reconn_lock(self):
        """The lock used to serialize reconnections. It is created on
        first access and reused afterwards."""
        if self._reconn_lock:
            return self._reconn_lock  # pragma no cover

        self._reconn_lock = Lock('amqp-reconn-lock')
        return self._reconn_lock

    async def connect(self, **conn_kwargs):
        """Connects to the Rabbitmq server.

        :param conn_kwargs: Named arguments passed to ``asyncamqp.connect()``.
          If none are given, the ones passed to the constructor (or to the
          previous successful ``connect()``) are used.
        """
        kw = conn_kwargs or self.conn_kwargs
        self.transport, self.protocol = await asyncamqp.connect(**kw)
        # Remember the arguments only after a successful connection so
        # ``reconnect()`` can reuse them.
        self.conn_kwargs = kw
        self._connected = True

    async def disconnect(self):
        """Disconnects from the Rabbitmq server.

        The transport is closed and the connection is marked as
        disconnected even if closing the protocol fails. In that case the
        original exception is still raised.
        """
        try:
            await self.protocol.close()
        finally:
            # Without the ``finally`` a failure in ``protocol.close()``
            # would leave the transport open and ``_connected`` as True.
            # these lines are covered, but coverage says they're not
            if self.transport is not None:  # pragma no cover
                self.transport.close()  # pragma no cover
            self._connected = False  # pragma no cover

    async def reconnect(self):
        """Disconnects and connects again while holding the write side of
        ``reconn_lock``.

        Errors raised while disconnecting are logged and ignored so that
        a new connection is attempted anyway.
        """
        async with await self.reconn_lock.acquire_write():
            try:
                await self.disconnect()
            except Exception as e:
                msg = 'Error disconnectig on reconnect... {}'.format(str(e))
                self.log(msg, level='error')

            await self.connect()
class Exchange(LoggerMixin):
    """A simple abstraction for an amqp exchange.
    """

    def __init__(self, name, connection, exchange_type, durable=False,
                 bind_publisher=True, exclusive_consumer_queue=False):
        """Constructor for :class:`~toxiccommon.exchange.Exchange`

        :param name: The exchange's name.
        :param connection: An instance of
          :class:`~toxiccommon.exchange.AmqpConnection`.
        :param exchange_type: The type of the exchange.
        :param durable: Indicates if the messages are stored to disk or not.
        :param bind_publisher: If true, a queue will be bound to the
          exchange when publishing stuff. It means that all messages published
          will be sent to a queue when it is published. Otherwise messages
          will be sent to a queue only when a consumer binds a queue to the
          exchange.
        :param exclusive_consumer_queue: Indicates if the consumer queue is
          exclusive to the consumer.
        """
        self.name = name
        self.connection = connection
        # Default queue name, derived from the exchange's name.
        self.queue_name = '{}_queue'.format(self.name)
        self.exchange_type = exchange_type
        self.durable = durable
        self.bind_publisher = bind_publisher
        self.exclusive_consumer_queue = exclusive_consumer_queue

        self.transport = None
        self.protocol = None
        self.channel = None
        self.exchange_declared = False
        # Result of the last ``queue_declare()`` done by this instance.
        self.queue_info = None

        self._store_consume_futures = False
        self._consume_futures = []
        # Routing keys bound through this instance.
        self._bound_rt = set()
        # Names of the queues already declared through this instance.
        self._declared_queues = set()
        self._exclusive_queues = {}

    async def declare(self, queue_name=None):
        """Declares the exchange and queue.

        Connects first if the connection is not connected yet. The queue
        is declared only if it was not declared by this instance before
        and consumer queues are not exclusive.

        :param queue_name: The name for the queue to be declared. If None,
          self.queue_name will be used."""
        if not self.connection._connected:
            await self.connection.connect()

        channel = await self._get_channel()
        try:
            if not queue_name:
                queue_name = self.queue_name

            self.exchange_declared = await channel.exchange_declare(
                self.name, self.exchange_type, durable=self.durable)

            if not self.is_declared(queue_name) and \
               not self.exclusive_consumer_queue:
                await self._declare_queue(queue_name, channel)
        finally:
            await channel.close()

    async def _declare_queue(self, queue_name, channel):
        """Declares a queue and records it as declared.

        :param queue_name: The name of the queue to declare.
        :param channel: The channel used to declare the queue.
        """
        # Exclusive consumer queues are auto-deleted and never durable.
        durable = self.durable and not self.exclusive_consumer_queue
        self.queue_info = await channel.queue_declare(
            queue_name, durable=durable,
            exclusive=self.exclusive_consumer_queue,
            auto_delete=self.exclusive_consumer_queue)
        self._declared_queues.add(queue_name)

    def is_declared(self, queue_name=None):
        """Indicates if a queue was already declared through this instance.

        :param queue_name: The name of the queue to check.
        """
        return queue_name in self._declared_queues

    def _get_queue_name_for_routing_key(self, routing_key, queue_name=None):
        """Returns the name of the queue used for a routing key.

        :param routing_key: The routing key. If not empty, it is appended
          to the queue name as ``<queue_name>-<routing_key>``.
        :param queue_name: The base queue name. If empty, self.queue_name
          is used.
        """
        queue_name = queue_name or self.queue_name
        if routing_key:
            queue_name = '{}-{}'.format(queue_name, routing_key)
        return queue_name

    async def bind(self, routing_key, queue_name=None, channel=None):
        """Binds the queue to the exchange.

        The queue is declared first if it was not declared by this
        instance yet.

        :param routing_key: Routing key to bind the queue.
        :param queue_name: The name of the queue to be bound. If not
          informed, self.queue_name will be used.
        :param channel: Optional channel to use in the communication. If
          None, a new one is opened for the operation and closed when it
          finishes. A channel supplied by the caller is left open."""
        own_channel = channel is None
        if own_channel:
            channel = await self._get_channel()

        try:
            queue_name = self._get_queue_name_for_routing_key(routing_key,
                                                              queue_name)
            if not self.is_declared(queue_name):
                await self._declare_queue(queue_name, channel)

            r = await channel.queue_bind(exchange_name=self.name,
                                         queue_name=queue_name,
                                         routing_key=routing_key)
            self._bound_rt.add(routing_key)
            return r
        finally:
            if own_channel:
                await channel.close()

    async def unbind(self, routing_key, channel=None):
        """Unbinds self.queue_name from the exchange.

        :param routing_key: The routing key of the binding to remove.
        :param channel: Optional channel to use in the communication. If
          None, a new one is used. The channel is always closed when the
          operation finishes, including one supplied by the caller.
        """
        # NOTE(review): ``bind()`` binds the queue named by
        # ``_get_queue_name_for_routing_key()`` while this unbinds
        # ``self.queue_name`` -- confirm this is intended.
        channel = channel or await self._get_channel()
        try:
            r = await channel.queue_unbind(exchange_name=self.name,
                                           queue_name=self.queue_name,
                                           routing_key=routing_key)
            # ``discard`` so a key that was not bound through this
            # instance does not raise after the broker already unbound it.
            self._bound_rt.discard(routing_key)
        finally:
            await channel.close()

        return r

    async def publish(self, message, routing_key=''):
        """Publishes a message to a Rabbitmq exchange

        :param message: The message that will be published in the
          exchange. Must be something that can be serialized into a json.
        :param routing_key: The routing key to publish the message."""
        channel = await self._get_channel()
        try:
            if self.bind_publisher:
                await self.bind(routing_key, channel=channel)

            message = json.dumps(message).encode()
            properties = {}
            if self.durable:
                # delivery_mode 2 is the AMQP persistent delivery mode.
                properties['delivery_mode'] = 2

            kw = {'payload': message, 'exchange_name': self.name,
                  'properties': properties, 'routing_key': routing_key}
            await channel.publish(**kw)
        finally:
            await channel.close()

    async def consume(self, wait_message=True, timeout=0,
                      routing_key='', no_ack=False):
        """Consumes a message from a Rabbitmq queue.

        When consumer queues are exclusive, a new uniquely named queue is
        declared and bound for this consumer. Otherwise, if a routing key
        is given, the queue for that routing key is bound. The channel
        opened here is not closed by this method.

        :param wait_message: Should we wait for new messages in the queue?
        :param timeout: Timeout for waiting messages.
        :param routing_key: The routing key used to bind the queue that
          will be consumed.
        :param no_ack: Forwarded to ``basic_consume()``. Presumably, when
          false, each message must be acknowledged by the consumer --
          confirm against asyncamqp.
        :returns: The consumer returned by ``basic_consume()``.
        """
        queue_name = self.queue_name
        channel = await self._get_channel()

        if self.exclusive_consumer_queue:
            queue_name = '{}-consumer-queue-{}'.format(self.name, str(uuid4()))
            await self.bind(routing_key, queue_name, channel)
            queue_name = self._get_queue_name_for_routing_key(routing_key,
                                                              queue_name)
        elif routing_key:
            queue_name = self._get_queue_name_for_routing_key(routing_key,
                                                              queue_name)
            await self.bind(routing_key, channel=channel)

        if self.durable:
            await channel.basic_qos(prefetch_count=1, prefetch_size=0,
                                    connection_global=False)

        consumer = await channel.basic_consume(queue_name=queue_name,
                                               no_ack=no_ack,
                                               wait_message=wait_message,
                                               timeout=timeout)
        # help on tests
        consumer.queue_name = queue_name
        return consumer

    async def get_queue_size(self, queue_name=None):
        """Returns the number of messages in a queue.

        The size is read from a passive ``queue_declare()``.

        :param queue_name: The name of the queue. If not informed,
          self.queue_name will be used.
        """
        if not queue_name:
            queue_name = self.queue_name

        channel = await self._get_channel()
        try:
            info = await channel.queue_declare(queue_name,
                                               durable=self.durable,
                                               passive=True,
                                               exclusive=False,
                                               auto_delete=False)
        finally:
            await channel.close()

        return int(info['message_count'])

    async def queue_delete(self, queue_name=None):
        """Deletes a queue. If not queue_name, `self.queue_name` will be
        used.

        :param queue_name: The name of the queue to be deleted."""
        queue_name = queue_name or self.queue_name
        # The channel is acquired outside the ``try`` so that a failure
        # here is not masked by an unbound ``channel`` in ``finally``.
        channel = await self._get_channel()
        try:
            await channel.queue_delete(queue_name)
            # Forget the queue so it gets declared again if needed.
            self._declared_queues.discard(queue_name)
        finally:
            await channel.close()

    async def _get_channel(self):
        """Returns a new channel from the connection. If the connection
        is closed, reconnects once and tries again."""
        try:
            channel = await self.connection.protocol.channel()
        except AmqpClosedConnection:
            await self.connection.reconnect()
            channel = await self.connection.protocol.channel()

        return channel