diff --git a/docs/reference/kombu.transport.nats.rst b/docs/reference/kombu.transport.nats.rst new file mode 100644 index 0000000000..2341614b15 --- /dev/null +++ b/docs/reference/kombu.transport.nats.rst @@ -0,0 +1,31 @@ +================================================ + NATS Transport - ``kombu.transport.nats`` +================================================ + +.. currentmodule:: kombu.transport.nats + +.. automodule:: kombu.transport.nats + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: + + Message + ------- + + .. autoclass:: Message + :members: + :undoc-members: diff --git a/examples/nats_receive.py b/examples/nats_receive.py new file mode 100644 index 0000000000..9ed104f068 --- /dev/null +++ b/examples/nats_receive.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import sys +from pprint import pformat + +from kombu import Connection, Consumer, Exchange, Queue, eventloop + +LOCAL_SERVER = "localhost" +DEMO_SERVER = "demo.nats.io" + +server = LOCAL_SERVER +use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo" +if use_demo_server: + server = DEMO_SERVER + + +exchange = Exchange("exchange", "direct", durable=False) +msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages") + + +def pretty(obj): + return pformat(obj, indent=4) + + +def process_msg(body, message): + print(f"Received message: {body!r}") + print(f" properties:\n{pretty(message.properties)}") + print(f" delivery_info:\n{pretty(message.delivery_info)}") + message.ack() + + +with Connection(f"nats://{server}:4222") as connection: + with Consumer(connection, msg_queue, callbacks=[process_msg]) as consumer: + for msg in eventloop(connection): + pass diff --git a/examples/nats_send.py b/examples/nats_send.py new file mode 100644 index 0000000000..e798ec927c --- /dev/null +++ b/examples/nats_send.py @@ -0,0 +1,30 @@ +from 
__future__ import annotations + +import sys + +from nats.js.api import StorageType + +from kombu import Connection, Exchange, Queue + +LOCAL_SERVER = "localhost" +DEMO_SERVER = "demo.nats.io" + +server = LOCAL_SERVER +use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo" +if use_demo_server: + server = DEMO_SERVER + + +exchange = Exchange("exchange", "direct", durable=False) +msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages") + + +with Connection(f"nats://{server}:4222", transport_options={ + "stream_config": { + "storage": StorageType.FILE, + } +}) as conn: + producer = conn.Producer() + producer.publish( + "hello world", exchange=exchange, routing_key="messages", declare=[msg_queue] + ) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 180a27b4b6..1a122f34bc 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -45,6 +45,7 @@ def supports_librabbitmq() -> bool | None: 'azureservicebus': 'kombu.transport.azureservicebus:Transport', 'pyro': 'kombu.transport.pyro:Transport', 'gcpubsub': 'kombu.transport.gcpubsub:Transport', + 'nats': 'kombu.transport.nats:Transport', } _transport_cache = {} diff --git a/kombu/transport/nats.py b/kombu/transport/nats.py new file mode 100644 index 0000000000..096e6250f2 --- /dev/null +++ b/kombu/transport/nats.py @@ -0,0 +1,499 @@ +"""NATS JetStream transport module for Kombu. + +NATS JetStream transport using nats-py library. + +**References** + +- https://github.com/nats-io/nats.py +- https://docs.nats.io/nats-concepts/jetstream + +Features +======== +* Type: Virtual +* Supports Direct: Yes +* Supports Topic: Yes +* Supports Fanout: Yes +* Supports Priority: No +* Supports TTL: Yes + +Connection String +================= +Connection string has the following format: + +.. 
code-block:: + + nats://[USER:PASSWORD@]NATS_ADDRESS[:PORT] + +Transport Options +================= +* ``connection_wait_time_seconds`` - Time in seconds to wait for connection + to succeed. Default ``5`` +* ``wait_time_seconds`` - Time in seconds to wait to receive messages. + Default ``5`` +* ``stream_config`` - Stream configuration. Must be a dict whose key-value pairs + correspond with attributes in the NATS JetStream stream configuration. +* ``consumer_config`` - Consumer configuration. Must be a dict whose key-value pairs + correspond with attributes in the NATS JetStream consumer configuration. +""" + +from __future__ import annotations + +import asyncio +from queue import Empty + +from kombu.transport import virtual +from kombu.utils import cached_property +from kombu.utils.encoding import str_to_bytes +from kombu.utils.json import dumps, loads + +try: + import nats.aio.client + import nats.aio.errors + import nats.errors + import nats.js.errors + from nats.aio.client import Client + from nats.js.api import (AckPolicy, ConsumerConfig, DeliverPolicy, + DiscardPolicy, RetentionPolicy, StorageType, + StreamConfig) + from nats.js.client import JetStreamContext + + NATS_CONNECTION_ERRORS = ( + nats.aio.errors.ErrConnectionClosed, + nats.aio.errors.ErrTimeout, + nats.aio.errors.ErrNoServers, + ) + NATS_CHANNEL_ERRORS = (nats.js.errors.NotFoundError,) + +except ImportError: + Client = None + NATS_CONNECTION_ERRORS = NATS_CHANNEL_ERRORS = () + +from kombu.log import get_logger + +logger = get_logger(__name__) + +DEFAULT_PORT = 4222 +DEFAULT_HOST = "localhost" + +_event_loop: asyncio.AbstractEventLoop | None = None + + +def get_event_loop() -> asyncio.AbstractEventLoop: + """Get or create the global event loop.""" + global _event_loop + if _event_loop is None: + _event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_event_loop) + return _event_loop + + +class Message(virtual.Message): + """Message object.""" + + def __init__(self, payload, channel=None, 
**kwargs):
+        self.subject = payload["subject"]
+        # Bound coroutine methods of the underlying JetStream message,
+        # kept so the channel can ack/nak/term this message later.
+        self.nats_ack = payload["ack"]
+        self.nats_nak = payload["nak"]
+        self.nats_term = payload["term"]
+        super().__init__(payload, channel=channel, **kwargs)
+
+
+class QoS(virtual.QoS):
+    """Quality of Service guarantees."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Per-instance map of delivery_tag -> message.  A class-level dict
+        # here would be shared by every channel, letting one channel ack
+        # or reject another channel's in-flight messages.
+        self._not_yet_acked = {}
+
+    def can_consume(self):
+        """Return true if the channel can be consumed from."""
+        return not self.prefetch_count or len(self._not_yet_acked) < self.prefetch_count
+
+    def can_consume_max_estimate(self):
+        """Return remaining prefetch capacity (at least 1 when unbounded)."""
+        if self.prefetch_count:
+            return self.prefetch_count - len(self._not_yet_acked)
+        return 1
+
+    def append(self, message, delivery_tag):
+        """Track *message* as delivered but not yet acknowledged."""
+        self._not_yet_acked[delivery_tag] = message
+
+    def get(self, delivery_tag):
+        """Return the not-yet-acked message for *delivery_tag*."""
+        return self._not_yet_acked[delivery_tag]
+
+    def ack(self, delivery_tag):
+        """Acknowledge a message by delivery tag (no-op if unknown)."""
+        if delivery_tag not in self._not_yet_acked:
+            return
+        message = self._not_yet_acked.pop(delivery_tag)
+        self.channel.ack_msg(message)
+
+    def reject(self, delivery_tag, requeue=False):
+        """Reject a message by delivery tag."""
+        if delivery_tag not in self._not_yet_acked:
+            return
+        message = self._not_yet_acked.pop(delivery_tag)
+        if requeue:
+            self.channel.nak_msg(message)
+        else:
+            self.channel.term_msg(message)
+
+    def restore_unacked_once(self, stderr=None):
+        # JetStream redelivers unacknowledged messages itself;
+        # nothing to restore on shutdown.
+        pass
+
+
+class Channel(virtual.Channel):
+    """NATS JetStream Channel."""
+
+    QoS = QoS
+    Message = Message
+
+    default_wait_time_seconds = 5
+    default_connection_wait_time_seconds = 5
+
+    def __init__(self, *args, **kwargs):
+        if Client is None:
+            raise ImportError("nats-py is not installed")
+
+        super().__init__(*args, **kwargs)
+
+        port = self.connection.client.port or self.connection.default_port
+        host = self.connection.client.hostname or DEFAULT_HOST
+
+        logger.debug("Host: %s Port: %s", host, port)
+
+        self._nats_client: Client | None = None
+        self._js: JetStreamContext | None = None
+        # Names of streams/consumers this channel has already created.
+        # _consumers must exist before _ensure_consumer() first runs;
+        # it was previously never initialized -> AttributeError.
+        self._streams = set()
+        self._consumers = set()
+
+        # Evaluate connection
+        self.client
+
+    def _get_stream_name(self, queue):
+        """Get the stream name for a 
queue.""" + return f"STREAM_{queue}" + + def _get_consumer_name(self, queue): + """Get the consumer name for a queue.""" + return f"CONSUMER_{queue}" + + def _ensure_stream(self, queue): + """Ensure a stream exists for the queue.""" + stream_name = self._get_stream_name(queue) + if stream_name in self._streams: + return + + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + # First try to get stream info with a shorter timeout + try: + loop = get_event_loop() + loop.run_until_complete( + asyncio.wait_for( + self._js.stream_info(stream_name), + timeout=1.0 # Use a shorter timeout for the check + ) + ) + self._streams.add(stream_name) + return + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): + # Stream doesn't exist or timed out, we'll create it + pass + + # Create the stream with a longer timeout + stream_config = StreamConfig( + name=stream_name, + subjects=[queue], + retention=RetentionPolicy.WORK_QUEUE, + max_consumers=-1, + max_msgs_per_subject=-1, + max_msgs=-1, + max_bytes=-1, + max_age=0, + max_msg_size=-1, + storage=StorageType.MEMORY, + discard=DiscardPolicy.OLD, + num_replicas=1, + duplicate_window=120.0, # 2 minutes in seconds + allow_direct=True, # for debugging with nats cli + ) + + # Update with user-provided config + user_cfg = self.options.get("stream_config") or {} + + try: + loop = get_event_loop() + loop.run_until_complete( + asyncio.wait_for( + self._js.add_stream(stream_config, **user_cfg), + timeout=5.0 # Use a longer timeout for creation + ) + ) + self._streams.add(stream_name) + except nats.errors.TimeoutError: + # If we timeout creating the stream, check if it was actually created + try: + loop.run_until_complete( + asyncio.wait_for( + self._js.stream_info(stream_name), + timeout=1.0 + ) + ) + self._streams.add(stream_name) + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): + raise RuntimeError(f"Failed to create stream {stream_name}") + + def _ensure_consumer(self, queue): + 
"""Ensure a consumer exists for the queue.""" + consumer_name = self._get_consumer_name(queue) + if consumer_name in self._consumers: + return + + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + name = self._get_stream_name(queue) + + consumer_config = ConsumerConfig( + durable_name=consumer_name, + deliver_policy=DeliverPolicy.ALL, + ack_policy=AckPolicy.EXPLICIT, + filter_subject=queue, + ) + + # Update with user-provided config + user_cfg = self.options.get("consumer_config") or {} + + try: + loop = get_event_loop() + loop.run_until_complete( + asyncio.wait_for( + self._js.add_consumer(name, consumer_config, **user_cfg), + timeout=5.0 # Use a longer timeout for consumer creation + ) + ) + self._consumers.add(consumer_name) + except nats.errors.TimeoutError: + # If we timeout creating the consumer, check if it was actually created + try: + loop.run_until_complete( + asyncio.wait_for( + self._js.consumer_info(name, consumer_name), + timeout=1.0 + ) + ) + self._consumers.add(consumer_name) + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): + raise RuntimeError(f"Failed to create consumer {consumer_name} for stream {name}") + + def _put(self, queue, message, **kwargs): + """Put a message on a queue.""" + self._ensure_stream(queue) + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + subject = queue + get_event_loop().run_until_complete( + self._js.publish(subject, str_to_bytes(dumps(message))) + ) + + def _get(self, queue, **kwargs): + """Get a message from a queue.""" + self._ensure_stream(queue) + self._ensure_consumer(queue) + + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + try: + pull_sub = get_event_loop().run_until_complete( + self._js.pull_subscribe( + queue, + self._get_consumer_name(queue), + stream=self._get_stream_name(queue), + ) + ) + msg = get_event_loop().run_until_complete( + pull_sub.fetch(1, timeout=self.wait_time_seconds) + )[0] 
+ + body = loads(msg.data.decode()) + body["subject"] = msg.subject + body["ack"] = msg.ack + body["nak"] = msg.nak + body["term"] = msg.term + return body + except (IndexError, nats.errors.TimeoutError): + pass + raise Empty() + + def _delete(self, queue, *args, **kwargs): + """Delete a queue.""" + stream_name = self._get_stream_name(queue) + if stream_name in self._streams: + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + try: + get_event_loop().run_until_complete(self._js.delete_stream(stream_name)) + self._streams.remove(stream_name) + except (nats.js.errors.NotFoundError): + pass + + def _size(self, queue): + """Return the number of messages in a queue.""" + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + try: + info = get_event_loop().run_until_complete( + self._js.stream_info(self._get_stream_name(queue)) + ) + return info.state.messages + except nats.js.errors.NotFoundError: + return 0 + + def _new_queue(self, queue, **kwargs): + """Declare a new queue.""" + self._ensure_stream(queue) + return queue + + def _has_queue(self, queue, **kwargs): + """Check if a queue exists.""" + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + try: + get_event_loop().run_until_complete( + self._js.stream_info(self._get_stream_name(queue)) + ) + return True + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): + return False + + def _open(self): + """Open a new connection to NATS.""" + if self._nats_client is None: + self._nats_client = Client() + if self._nats_client is None: + raise RuntimeError("Failed to create NATS client") + + get_event_loop().run_until_complete( + self._nats_client.connect( + f"nats://{self.conninfo.hostname}:{self.conninfo.port or DEFAULT_PORT}", + user=self.conninfo.userid, + password=self.conninfo.password, + connect_timeout=self.connection_wait_time_seconds, + ) + ) + self._js = self._nats_client.jetstream() + return self._nats_client 
+ + @cached_property + def client(self): + """Get the NATS client.""" + return self._open() + + @property + def options(self): + """Get the transport options.""" + return self.connection.client.transport_options + + @property + def conninfo(self): + """Get the connection info.""" + return self.connection.client + + @cached_property + def wait_time_seconds(self): + """Get the wait time in seconds.""" + return float( + self.options.get("wait_time_seconds", self.default_wait_time_seconds) + ) + + @cached_property + def connection_wait_time_seconds(self): + """Get the connection wait time in seconds.""" + return float( + self.options.get( + "connection_wait_time_seconds", + self.default_connection_wait_time_seconds, + ) + ) + + def close(self): + """Close the channel.""" + if self._nats_client is not None: + loop = get_event_loop() + loop.run_until_complete(self._nats_client.drain()) + loop.run_until_complete(self._nats_client.close()) + self._nats_client = None + self._js = None + + # Cancel any pending tasks + loop = get_event_loop() + for task in asyncio.all_tasks(loop): + if not task.done(): + task.cancel() + try: + loop.run_until_complete(task) + except asyncio.CancelledError: + pass + + def ack_msg(self, msg): + get_event_loop().run_until_complete(msg.nats_ack()) + + def nak_msg(self, msg): + get_event_loop().run_until_complete(msg.nats_nak()) + + def term_msg(self, msg): + get_event_loop().run_until_complete(msg.nats_term()) + + +class Transport(virtual.Transport): + """NATS JetStream Transport.""" + + Channel = Channel + + default_port = DEFAULT_PORT + + driver_type = "nats" + driver_name = "nats" + + connection_errors = NATS_CONNECTION_ERRORS + channel_errors = NATS_CHANNEL_ERRORS + + def __init__(self, client, **kwargs): + if Client is None: + raise ImportError("nats-py is not installed") + super().__init__(client, **kwargs) + + def drain_events(self, connection, **kwargs): + return super().drain_events(connection, **kwargs) + + def driver_version(self): + 
"""Get the NATS driver version.""" + return nats.aio.client.__version__ + + def establish_connection(self): + """Establish a connection to NATS.""" + return super().establish_connection() + + def close_connection(self, connection): + """Close the connection to NATS.""" + return super().close_connection(connection) + + def verify_connection(self, connection): + """Verify the connection works.""" + port = connection.client.port or self.default_port + host = connection.client.hostname or DEFAULT_HOST + + logger.debug("Verify NATS connection to nats://%s:%s", host, port) + + client = Client() + try: + loop = get_event_loop() + loop.run_until_complete(client.connect(f"nats://{host}:{port}")) + loop.run_until_complete(client.close()) + return True + except ValueError: + pass + + return False diff --git a/requirements/extras/nats.txt b/requirements/extras/nats.txt new file mode 100644 index 0000000000..f92a4733f0 --- /dev/null +++ b/requirements/extras/nats.txt @@ -0,0 +1 @@ +nats-py[nkeys]>=2.9.0,<3.0.0 diff --git a/t/integration/docker/Dockerfile.nats b/t/integration/docker/Dockerfile.nats new file mode 100644 index 0000000000..c47451b12d --- /dev/null +++ b/t/integration/docker/Dockerfile.nats @@ -0,0 +1,2 @@ +FROM nats:2.11-alpine +CMD ["nats-server", "--jetstream", "--http_port", "8222", "--debug"] diff --git a/t/integration/test_nats.py b/t/integration/test_nats.py new file mode 100644 index 0000000000..9a928bd731 --- /dev/null +++ b/t/integration/test_nats.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from kombu import Connection +from kombu.transport import nats + +pytest.importorskip('nats') + + +@pytest.mark.env('nats') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_Channel: + def setup_method(self): + self.connection = self.create_connection() + self.channel = self.connection.default_channel + + def create_connection(self, **kwargs): + return 
Connection(transport=nats.Transport, connect_timeout=60, **kwargs) + + def teardown_method(self): + self.connection.close() + + def test_get_returns_message(self): + message = {'body': 'test message'} + self.channel._put('test_queue', message) + result = self.channel._get('test_queue') + assert result['body'] == 'test message' + + def test_delete_removes_queue(self): + self.channel._put('test_queue', {'body': 'test'}) + self.channel._delete('test_queue') + assert not self.channel._has_queue('test_queue') + + def test_size_returns_queue_size(self): + self.channel._put('test_queue', {'body': 'test1'}) + self.channel._put('test_queue', {'body': 'test2'}) + assert self.channel._size('test_queue') == 2 + + def test_new_queue_creates_queue(self): + queue = self.channel._new_queue('test_queue') + assert queue == 'test_queue' + assert self.channel._has_queue('test_queue') + + def test_has_queue_returns_true_for_existing_queue(self): + self.channel._new_queue('test_queue') + assert self.channel._has_queue('test_queue') + + def test_has_queue_returns_false_for_nonexistent_queue(self): + assert not self.channel._has_queue('nonexistent_queue') + + def test_ack_msg_acknowledges_message(self): + message = AsyncMock() + self.channel.ack_msg(message) + message.nats_ack.assert_called_once() + + def test_nak_msg_negatively_acknowledges_message(self): + message = AsyncMock() + self.channel.nak_msg(message) + message.nats_nak.assert_called_once() + + def test_term_msg_terminates_message(self): + message = AsyncMock() + self.channel.term_msg(message) + message.nats_term.assert_called_once() + + def test_custom_stream_config(self): + stream_config = { + 'max_msgs': 1000, + 'max_bytes': 1024 * 1024, + 'max_age': 3600, + } + conn = self.create_connection(transport_options={'stream_config': stream_config}) + channel = conn.default_channel + assert channel.options['stream_config'] == stream_config + + def test_custom_consumer_config(self): + consumer_config = { + 'ack_policy': 'explicit', + 
'deliver_policy': 'all', + } + conn = self.create_connection(transport_options={'consumer_config': consumer_config}) + channel = conn.default_channel + assert channel.options['consumer_config'] == consumer_config + + def test_custom_wait_time(self): + wait_time = 10 + conn = self.create_connection(transport_options={'wait_time_seconds': wait_time}) + channel = conn.default_channel + assert channel.wait_time_seconds == wait_time + + def test_custom_connection_wait_time(self): + wait_time = 15 + conn = self.create_connection(transport_options={'connection_wait_time_seconds': wait_time}) + channel = conn.default_channel + assert channel.connection_wait_time_seconds == wait_time + + +@pytest.mark.env('nats') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_Transport: + def setup_method(self): + self.client = Mock() + self.client.transport_options = {} + self.transport = nats.Transport(self.client) + + def test_driver_version(self): + assert self.transport.driver_version() + + @patch('kombu.transport.nats.Client') + def test_verify_connection(self, mock_client_class): + mock_client = AsyncMock() + mock_client_class.return_value = mock_client + mock_client.connect.return_value = None + mock_client.close.return_value = None + + connection = Mock() + connection.client.port = None + connection.client.hostname = None + + assert self.transport.verify_connection(connection) + mock_client.connect.assert_called_once() + mock_client.close.assert_called_once() + + @patch('kombu.transport.nats.Client') + def test_verify_connection_fails(self, mock_client_class): + mock_client = AsyncMock() + mock_client_class.return_value = mock_client + mock_client.connect.side_effect = ValueError + + connection = Mock() + connection.client.port = None + connection.client.hostname = None + + assert not self.transport.verify_connection(connection) + mock_client.connect.assert_called_once() + + def test_connection_errors(self): + assert self.transport.connection_errors == 
nats.NATS_CONNECTION_ERRORS + + def test_channel_errors(self): + assert self.transport.channel_errors == nats.NATS_CHANNEL_ERRORS + + def test_default_port(self): + assert self.transport.default_port == nats.DEFAULT_PORT + + def test_driver_type(self): + assert self.transport.driver_type == 'nats' + + def test_driver_name(self): + assert self.transport.driver_name == 'nats' diff --git a/tox.ini b/tox.ini index 780bd79f93..f183e79e85 100644 --- a/tox.ini +++ b/tox.ini @@ -5,6 +5,7 @@ envlist = {pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14}-linux-integration-redis {pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14}-linux-integration-mongodb {pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14}-linux-integration-kafka + {pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14}-linux-integration-nats flake8 apicheck pydocstyle @@ -33,11 +34,13 @@ setenv = passenv = DISTUTILS_USE_SDK deps= - -r{toxinidir}/requirements/dev.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/default.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/test.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/test-ci.txt apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux,3.14-linux,3.14t-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt + apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux,3.14-linux,3.14t-linux: -r{toxinidir}/requirements/extras/nats.txt + + integration-nats: -r{toxinidir}/requirements/extras/nats.txt apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt flake8,pydocstyle,mypy: -r{toxinidir}/requirements/pkgutils.txt integration: -r{toxinidir}/requirements/test-integration.txt @@ -48,6 +51,7 @@ commands = integration-redis: pytest -xv -E redis t/integration -n auto --reruns 2 --reruns-delay 1 {posargs} integration-mongodb: pytest -xv -E mongodb t/integration -n auto --reruns 2 --reruns-delay 1 {posargs} integration-kafka: pytest -xv -E kafka t/integration -n auto --reruns 2 --reruns-delay 1 
{posargs}
+    integration-nats: pytest -xv -E nats t/integration -n auto --reruns 2 --reruns-delay 1 {posargs}
 
 basepython =
     pypy3: pypy3
@@ -67,6 +71,7 @@ docker =
     integration-mongodb: mongodb
     integration-kafka: zookeeper
     integration-kafka: kafka
+    integration-nats: nats
 
 dockerenv =
     PYAMQP_INTEGRATION_INSTANCE=1
@@ -126,6 +131,16 @@ environment =
     KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
     ALLOW_PLAINTEXT_LISTENER=yes
 
+[docker:nats]
+dockerfile = {toxinidir}/t/integration/docker/Dockerfile.nats
+command = nats-server --jetstream --http_port 8222 --debug
+ports = 4222:4222/tcp
+healthcheck_cmd = wget http://localhost:8222/healthz -q -S -O -
+healthcheck_interval = 5
+healthcheck_timeout = 10
+healthcheck_retries = 3
+healthcheck_start_period = 3
+
 [testenv:apicheck]
 commands = pip install --upgrade -r{toxinidir}/requirements/dev.txt
     sphinx-build -j2 -b apicheck -d {envtmpdir}/doctrees docs docs/_build/apicheck
@@ -170,4 +185,5 @@ commands =
         3.13-linux-integration-redis,\
         3.13-linux-integration-mongodb,\
-        3.13-linux-integration-kafka \
+        3.13-linux-integration-kafka,\
+        3.13-linux-integration-nats \
         -p -o -- --exitfirst {posargs}