Ad

How To Implement Single-producer Multi-consumer With Aioredis Pub/sub

- 1 answer

I have the web app. That app has endpoint to push some object data to redis channel.
And another endpoint handles websocket connection, where that data is fetched from channel and send to client via ws.

When i connect via ws, messages gets only first connected client.

How to read messages from redis channel with multiple clients and not create a new subscription?

Websocket handler.
Here i subscribe to channel, save it to app (init_tram_channel). Then run job where i listen channel and send messages(run_tram_listening).

@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    tram_id = int(request.match_info['tram_id'])
    channel_name = f'tram_{tram_id}'

    await init_tram_channel(channel_name, request.app)
    tram_job = await run_tram_listening(
        request=request,
        ws=ws,
        channel=request.app['tram_producers'][channel_name]
    )

    request.app['websockets'].add(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                    break
            if msg.type == aiohttp.WSMsgType.ERROR:
                logging.error(f'ws connection was closed with exception {ws.exception()}')
            else:
                await asyncio.sleep(0.005)
    except asyncio.CancelledError:
        pass
    finally:
        await tram_job.close()
        request.app['websockets'].discard(ws)

    return ws

Subscribing and saving channel.
Every channel is related to unique object, and in order not to create many channels that related to the same object, i save only one to app. app['tram_producers'] is dict.

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = channel

Running coro for channel listening. I run it via aiojobs:

async def run_tram_listening(
        request: web.Request,
        ws: web.WebSocketResponse,
        channel: Channel
):
    """
    :return: aiojobs._job.Job object
    """
    listen_redis_job = await spawn(
        request,
        _read_tram_subscription(
            ws,
            channel
        )
    )
    return listen_redis_job

Coro where i listen and send messages:

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        channel: Channel
):
    try:
        async for msg in channel.iter():
            tram_data = msg.decode()
            await ws.send_json(tram_data)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)
Ad

Answer

The following code has been found in some aioredis github issue (I've adopted it to my task).

class TramProducer:
    def __init__(self, channel: aioredis.Channel):
        self._future = None
        self._channel = channel

    def __aiter__(self):
        return self

    def __anext__(self):
        return asyncio.shield(self._get_message())

    async def _get_message(self):
        if self._future:
            return await self._future

        self._future = asyncio.get_event_loop().create_future()
        message = await self._channel.get_json()
        future, self._future = self._future, None
        future.set_result(message)
        return message

So, how it works? TramProducer wraps the way we get messages.
As said @Messa

message is received from one Redis subscription only once.

So only one client of TramProducer is retrieving messages from redis, while other clients are waiting for future result that will be set after receiving message from channel.

If self._future initialized it means that somebody is waiting for message from redis, so we will just wait for self._future result.

TramProducer usage (i've taken an example from my question):

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        tram_producer: TramProducer
):
    try:
        async for msg in tram_producer:
            await ws.send_json(msg)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)

TramProducer initialization:

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = TramProducer(channel)

I think it maybe helpfull for somebody.
Full project here https://gitlab.com/tram-emulator/tram-server

Ad
source: stackoverflow.com
Ad