summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorkaiyou <pierre@jaury.eu>2018-04-08 17:56:44 +0200
committerkaiyou <pierre@jaury.eu>2018-04-08 17:56:44 +0200
commita13b7860c6570ad1bb9003e94ad67c761f0cf312 (patch)
treed08b2c50d1c1a6e570c59396660cd83836aa9f14 /synapse/replication
parentMerge remote-tracking branch 'upstream/master' into feat-dockerfile (diff)
parentUpdate README.rst (diff)
downloadsynapse-a13b7860c6570ad1bb9003e94ad67c761f0cf312.tar.xz
Merge remote-tracking branch 'upstream/master' into feat-dockerfile
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/__init__.py30
-rw-r--r--synapse/replication/http/membership.py334
-rw-r--r--synapse/replication/http/send_event.py166
-rw-r--r--synapse/replication/slave/storage/account_data.py47
-rw-r--r--synapse/replication/slave/storage/appservice.py35
-rw-r--r--synapse/replication/slave/storage/directory.py8
-rw-r--r--synapse/replication/slave/storage/events.py150
-rw-r--r--synapse/replication/slave/storage/profile.py21
-rw-r--r--synapse/replication/slave/storage/push_rule.py24
-rw-r--r--synapse/replication/slave/storage/pushers.py12
-rw-r--r--synapse/replication/slave/storage/receipts.py36
-rw-r--r--synapse/replication/slave/storage/registration.py18
-rw-r--r--synapse/replication/slave/storage/room.py21
-rw-r--r--synapse/replication/tcp/commands.py16
-rw-r--r--synapse/replication/tcp/protocol.py19
15 files changed, 645 insertions, 292 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
new file mode 100644
index 0000000000..1d7a607529
--- /dev/null
+++ b/synapse/replication/http/__init__.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import JsonResource
+from synapse.replication.http import membership, send_event
+
+
+REPLICATION_PREFIX = "/_synapse/replication"
+
+
+class ReplicationRestResource(JsonResource):
+    def __init__(self, hs):
+        JsonResource.__init__(self, hs, canonical_json=False)
+        self.register_servlets(hs)
+
+    def register_servlets(self, hs):
+        send_event.register_servlets(hs, self)
+        membership.register_servlets(hs, self)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
new file mode 100644
index 0000000000..e66c4e881f
--- /dev/null
+++ b/synapse/replication/http/membership.py
@@ -0,0 +1,334 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, MatrixCodeMessageException
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import Requester, UserID
+from synapse.util.distributor import user_left_room, user_joined_room
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def remote_join(client, host, port, requester, remote_room_hosts,
+                room_id, user_id, content):
+    """Ask the master to do a remote join for the given user to the given room
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        remote_room_hosts (list[str]): Servers to try and join via
+        room_id (str)
+        user_id (str)
+        content (dict): The event content to use for the join event
+
+    Returns:
+        Deferred
+    """
+    uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "remote_room_hosts": remote_room_hosts,
+        "room_id": room_id,
+        "user_id": user_id,
+        "content": content,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def remote_reject_invite(client, host, port, requester, remote_room_hosts,
+                         room_id, user_id):
+    """Ask master to reject the invite for the user and room.
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        remote_room_hosts (list[str]): Servers to try and reject via
+        room_id (str)
+        user_id (str)
+
+    Returns:
+        Deferred
+    """
+    uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "remote_room_hosts": remote_room_hosts,
+        "room_id": room_id,
+        "user_id": user_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def get_or_register_3pid_guest(client, host, port, requester,
+                               medium, address, inviter_user_id):
+    """Ask the master to get/create a guest account for given 3PID.
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        medium (str)
+        address (str)
+        inviter_user_id (str): The user ID who is trying to invite the
+            3PID
+
+    Returns:
+        Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+        3PID guest account.
+    """
+
+    uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "medium": medium,
+        "address": address,
+        "inviter_user_id": inviter_user_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def notify_user_membership_change(client, host, port, user_id, room_id, change):
+    """Notify master that a user has joined or left the room
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication.
+        user_id (str)
+        room_id (str)
+        change (str): Either "join" or "left"
+
+    Returns:
+        Deferred
+    """
+    assert change in ("joined", "left")
+
+    uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
+
+    payload = {
+        "user_id": user_id,
+        "room_id": room_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+class ReplicationRemoteJoinRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
+
+    def __init__(self, hs):
+        super(ReplicationRemoteJoinRestServlet, self).__init__()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        remote_room_hosts = content["remote_room_hosts"]
+        room_id = content["room_id"]
+        user_id = content["user_id"]
+        event_content = content["content"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "remote_join: %s into room: %s",
+            user_id, room_id,
+        )
+
+        yield self.federation_handler.do_invite_join(
+            remote_room_hosts,
+            room_id,
+            user_id,
+            event_content,
+        )
+
+        defer.returnValue((200, {}))
+
+
+class ReplicationRemoteRejectInviteRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
+
+    def __init__(self, hs):
+        super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        remote_room_hosts = content["remote_room_hosts"]
+        room_id = content["room_id"]
+        user_id = content["user_id"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "remote_reject_invite: %s out of room: %s",
+            user_id, room_id,
+        )
+
+        try:
+            event = yield self.federation_handler.do_remotely_reject_invite(
+                remote_room_hosts,
+                room_id,
+                user_id,
+            )
+            ret = event.get_pdu_json()
+        except Exception as e:
+            # if we were unable to reject the exception, just mark
+            # it as rejected on our end and plough ahead.
+            #
+            # The 'except' clause is very broad, but we need to
+            # capture everything from DNS failures upwards
+            #
+            logger.warn("Failed to reject invite: %s", e)
+
+            yield self.store.locally_reject_invite(
+                user_id, room_id
+            )
+            ret = {}
+
+        defer.returnValue((200, ret))
+
+
+class ReplicationRegister3PIDGuestRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
+
+    def __init__(self, hs):
+        super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
+
+        self.registeration_handler = hs.get_handlers().registration_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        medium = content["medium"]
+        address = content["address"]
+        inviter_user_id = content["inviter_user_id"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info("get_or_register_3pid_guest: %r", content)
+
+        ret = yield self.registeration_handler.get_or_register_3pid_guest(
+            medium, address, inviter_user_id,
+        )
+
+        defer.returnValue((200, ret))
+
+
+class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
+
+    def __init__(self, hs):
+        super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
+
+        self.registeration_handler = hs.get_handlers().registration_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.distributor = hs.get_distributor()
+
+    def on_POST(self, request, change):
+        content = parse_json_object_from_request(request)
+
+        user_id = content["user_id"]
+        room_id = content["room_id"]
+
+        logger.info("user membership change: %s in %s", user_id, room_id)
+
+        user = UserID.from_string(user_id)
+
+        if change == "joined":
+            user_joined_room(self.distributor, user, room_id)
+        elif change == "left":
+            user_left_room(self.distributor, user, room_id)
+        else:
+            raise Exception("Unrecognized change: %r", change)
+
+        return (200, {})
+
+
+def register_servlets(hs, http_server):
+    ReplicationRemoteJoinRestServlet(hs).register(http_server)
+    ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
+    ReplicationRegister3PIDGuestRestServlet(hs).register(http_server)
+    ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
new file mode 100644
index 0000000000..bbe2f967b7
--- /dev/null
+++ b/synapse/replication/http/send_event.py
@@ -0,0 +1,166 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import (
+    SynapseError, MatrixCodeMessageException, CodeMessageException,
+)
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.util.async import sleep
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.metrics import Measure
+from synapse.types import Requester, UserID
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def send_event_to_master(client, host, port, requester, event, context,
+                         ratelimit, extra_users):
+    """Send event to be handled on the master
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        event (FrozenEvent)
+        context (EventContext)
+        ratelimit (bool)
+        extra_users (list(UserID)): Any extra users to notify about event
+    """
+    uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
+        host, port, event.event_id,
+    )
+
+    payload = {
+        "event": event.get_pdu_json(),
+        "internal_metadata": event.internal_metadata.get_dict(),
+        "rejected_reason": event.rejected_reason,
+        "context": context.serialize(event),
+        "requester": requester.serialize(),
+        "ratelimit": ratelimit,
+        "extra_users": [u.to_string() for u in extra_users],
+    }
+
+    try:
+        # We keep retrying the same request for timeouts. This is so that we
+        # have a good idea that the request has either succeeded or failed on
+        # the master, and so whether we should clean up or not.
+        while True:
+            try:
+                result = yield client.put_json(uri, payload)
+                break
+            except CodeMessageException as e:
+                if e.code != 504:
+                    raise
+
+            logger.warn("send_event request timed out")
+
+            # If we timed out we probably don't need to worry about backing
+            # off too much, but lets just wait a little anyway.
+            yield sleep(1)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+class ReplicationSendEventRestServlet(RestServlet):
+    """Handles events newly created on workers, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/send_event/:event_id
+
+        {
+            "event": { .. serialized event .. },
+            "internal_metadata": { .. serialized internal_metadata .. },
+            "rejected_reason": ..,   // The event.rejected_reason field
+            "context": { .. serialized event context .. },
+            "requester": { .. serialized requester .. },
+            "ratelimit": true,
+            "extra_users": [],
+        }
+    """
+    PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
+
+    def __init__(self, hs):
+        super(ReplicationSendEventRestServlet, self).__init__()
+
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        # The responses are tiny, so we may as well cache them for a while
+        self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
+
+    def on_PUT(self, request, event_id):
+        result = self.response_cache.get(event_id)
+        if not result:
+            result = self.response_cache.set(
+                event_id,
+                self._handle_request(request)
+            )
+        else:
+            logger.warn("Returning cached response")
+        return make_deferred_yieldable(result)
+
+    @preserve_fn
+    @defer.inlineCallbacks
+    def _handle_request(self, request):
+        with Measure(self.clock, "repl_send_event_parse"):
+            content = parse_json_object_from_request(request)
+
+            event_dict = content["event"]
+            internal_metadata = content["internal_metadata"]
+            rejected_reason = content["rejected_reason"]
+            event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+            requester = Requester.deserialize(self.store, content["requester"])
+            context = yield EventContext.deserialize(self.store, content["context"])
+
+            ratelimit = content["ratelimit"]
+            extra_users = [UserID.from_string(u) for u in content["extra_users"]]
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "Got event to send with ID: %s into room: %s",
+            event.event_id, event.room_id,
+        )
+
+        yield self.event_creation_handler.persist_and_notify_client_event(
+            requester, event, context,
+            ratelimit=ratelimit,
+            extra_users=extra_users,
+        )
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    ReplicationSendEventRestServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index efbd87918e..d9ba6d69b1 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,50 +14,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.account_data import AccountDataStore
-from synapse.storage.tags import TagsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.account_data import AccountDataWorkerStore
+from synapse.storage.tags import TagsWorkerStore
 
 
-class SlavedAccountDataStore(BaseSlavedStore):
+class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedAccountDataStore, self).__init__(db_conn, hs)
         self._account_data_id_gen = SlavedIdTracker(
             db_conn, "account_data_max_stream_id", "stream_id",
         )
-        self._account_data_stream_cache = StreamChangeCache(
-            "AccountDataAndTagsChangeCache",
-            self._account_data_id_gen.get_current_token(),
-        )
-
-    get_account_data_for_user = (
-        AccountDataStore.__dict__["get_account_data_for_user"]
-    )
-
-    get_global_account_data_by_type_for_users = (
-        AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
-    )
 
-    get_global_account_data_by_type_for_user = (
-        AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
-    )
-
-    get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
-    get_tags_for_room = (
-        DataStore.get_tags_for_room.__func__
-    )
-    get_account_data_for_room = (
-        DataStore.get_account_data_for_room.__func__
-    )
-
-    get_updated_tags = DataStore.get_updated_tags.__func__
-    get_updated_account_data_for_user = (
-        DataStore.get_updated_account_data_for_user.__func__
-    )
+        super(SlavedAccountDataStore, self).__init__(db_conn, hs)
 
     def get_max_account_data_stream_id(self):
         return self._account_data_id_gen.get_current_token()
@@ -85,6 +56,10 @@ class SlavedAccountDataStore(BaseSlavedStore):
                         (row.data_type, row.user_id,)
                     )
                 self.get_account_data_for_user.invalidate((row.user_id,))
+                self.get_account_data_for_room.invalidate((row.user_id, row.room_id,))
+                self.get_account_data_for_room_and_type.invalidate(
+                    (row.user_id, row.room_id, row.data_type,),
+                )
                 self._account_data_stream_cache.entity_has_changed(
                     row.user_id, token
                 )
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 0d3f31a50c..8cae3076f4 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,33 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.config.appservice import load_appservices
-from synapse.storage.appservice import _make_exclusive_regex
+from synapse.storage.appservice import (
+    ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
+)
 
 
-class SlavedApplicationServiceStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedApplicationServiceStore, self).__init__(db_conn, hs)
-        self.services_cache = load_appservices(
-            hs.config.server_name,
-            hs.config.app_service_config_files
-        )
-        self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
-
-    get_app_service_by_token = DataStore.get_app_service_by_token.__func__
-    get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
-    get_app_services = DataStore.get_app_services.__func__
-    get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__
-    create_appservice_txn = DataStore.create_appservice_txn.__func__
-    get_appservices_by_state = DataStore.get_appservices_by_state.__func__
-    get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__
-    _get_last_txn = DataStore._get_last_txn.__func__
-    complete_appservice_txn = DataStore.complete_appservice_txn.__func__
-    get_appservice_state = DataStore.get_appservice_state.__func__
-    set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
-    set_appservice_state = DataStore.set_appservice_state.__func__
-    get_if_app_services_interested_in_user = (
-        DataStore.get_if_app_services_interested_in_user.__func__
-    )
+class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
+                                    ApplicationServiceWorkerStore):
+    pass
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 7301d885f2..6deecd3963 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -14,10 +14,8 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage.directory import DirectoryStore
+from synapse.storage.directory import DirectoryWorkerStore
 
 
-class DirectoryStore(BaseSlavedStore):
-    get_aliases_for_room = DirectoryStore.__dict__[
-        "get_aliases_for_room"
-    ]
+class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 29d7296b43..b1f64ef0d8 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,13 +16,13 @@
 import logging
 
 from synapse.api.constants import EventTypes
-from synapse.storage import DataStore
-from synapse.storage.event_federation import EventFederationStore
-from synapse.storage.event_push_actions import EventPushActionsStore
-from synapse.storage.roommember import RoomMemberStore
-from synapse.storage.state import StateGroupReadStore
-from synapse.storage.stream import StreamStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.event_federation import EventFederationWorkerStore
+from synapse.storage.event_push_actions import EventPushActionsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
+from synapse.storage.stream import StreamWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
@@ -37,138 +38,33 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(StateGroupReadStore, BaseSlavedStore):
+class SlavedEventStore(EventFederationWorkerStore,
+                       RoomMemberWorkerStore,
+                       EventPushActionsWorkerStore,
+                       StreamWorkerStore,
+                       EventsWorkerStore,
+                       StateGroupWorkerStore,
+                       SignatureWorkerStore,
+                       BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedEventStore, self).__init__(db_conn, hs)
         self._stream_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering",
         )
         self._backfill_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering", step=-1
         )
-        events_max = self._stream_id_gen.get_current_token()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
-            db_conn, "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max,
-        )
 
-        self.stream_ordering_month_ago = 0
-        self._stream_order_on_start = self.get_room_max_stream_ordering()
+        super(SlavedEventStore, self).__init__(db_conn, hs)
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
-    get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
-    get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
-    get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
-    get_users_who_share_room_with_user = (
-        RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
-    )
-    get_latest_event_ids_in_room = EventFederationStore.__dict__[
-        "get_latest_event_ids_in_room"
-    ]
-    get_invited_rooms_for_user = RoomMemberStore.__dict__[
-        "get_invited_rooms_for_user"
-    ]
-    get_unread_event_push_actions_by_room_for_user = (
-        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
-    )
-    _get_unread_counts_by_receipt_txn = (
-        DataStore._get_unread_counts_by_receipt_txn.__func__
-    )
-    _get_unread_counts_by_pos_txn = (
-        DataStore._get_unread_counts_by_pos_txn.__func__
-    )
-    get_recent_event_ids_for_room = (
-        StreamStore.__dict__["get_recent_event_ids_for_room"]
-    )
-    _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
-    has_room_changed_since = DataStore.has_room_changed_since.__func__
-
-    get_unread_push_actions_for_user_in_range_for_http = (
-        DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
-    )
-    get_unread_push_actions_for_user_in_range_for_email = (
-        DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
-    )
-    get_push_action_users_in_range = (
-        DataStore.get_push_action_users_in_range.__func__
-    )
-    get_event = DataStore.get_event.__func__
-    get_events = DataStore.get_events.__func__
-    get_rooms_for_user_where_membership_is = (
-        DataStore.get_rooms_for_user_where_membership_is.__func__
-    )
-    get_membership_changes_for_user = (
-        DataStore.get_membership_changes_for_user.__func__
-    )
-    get_room_events_max_id = DataStore.get_room_events_max_id.__func__
-    get_room_events_stream_for_room = (
-        DataStore.get_room_events_stream_for_room.__func__
-    )
-    get_events_around = DataStore.get_events_around.__func__
-    get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
-    get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
-    _get_joined_users_from_context = (
-        RoomMemberStore.__dict__["_get_joined_users_from_context"]
-    )
-
-    get_joined_hosts = DataStore.get_joined_hosts.__func__
-    _get_joined_hosts = RoomMemberStore.__dict__["_get_joined_hosts"]
-
-    get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
-    get_room_events_stream_for_rooms = (
-        DataStore.get_room_events_stream_for_rooms.__func__
-    )
-    is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
-    get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
-    _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
-    _get_events = DataStore._get_events.__func__
-    _get_events_from_cache = DataStore._get_events_from_cache.__func__
-
-    _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
-    _enqueue_events = DataStore._enqueue_events.__func__
-    _do_fetch = DataStore._do_fetch.__func__
-    _fetch_event_rows = DataStore._fetch_event_rows.__func__
-    _get_event_from_row = DataStore._get_event_from_row.__func__
-    _get_rooms_for_user_where_membership_is_txn = (
-        DataStore._get_rooms_for_user_where_membership_is_txn.__func__
-    )
-    _get_events_around_txn = DataStore._get_events_around_txn.__func__
-
-    get_backfill_events = DataStore.get_backfill_events.__func__
-    _get_backfill_events = DataStore._get_backfill_events.__func__
-    get_missing_events = DataStore.get_missing_events.__func__
-    _get_missing_events = DataStore._get_missing_events.__func__
-
-    get_auth_chain = DataStore.get_auth_chain.__func__
-    get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
-    _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
-    get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
-
-    get_forward_extremeties_for_room = (
-        DataStore.get_forward_extremeties_for_room.__func__
-    )
-    _get_forward_extremeties_for_room = (
-        EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
-    )
-
-    get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
-
-    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
-    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
new file mode 100644
index 0000000000..46c28d4171
--- /dev/null
+++ b/synapse/replication/slave/storage/profile.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.storage.profile import ProfileWorkerStore
+
+
+class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 83e880fdd2..bb2c40b6e3 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,29 +16,15 @@
 
 from .events import SlavedEventStore
 from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage import DataStore
-from synapse.storage.push_rule import PushRuleStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.push_rule import PushRulesWorkerStore
 
 
-class SlavedPushRuleStore(SlavedEventStore):
+class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
     def __init__(self, db_conn, hs):
-        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
         self._push_rules_stream_id_gen = SlavedIdTracker(
             db_conn, "push_rules_stream", "stream_id",
         )
-        self.push_rules_stream_cache = StreamChangeCache(
-            "PushRulesStreamChangeCache",
-            self._push_rules_stream_id_gen.get_current_token(),
-        )
-
-    get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"]
-    get_push_rules_enabled_for_user = (
-        PushRuleStore.__dict__["get_push_rules_enabled_for_user"]
-    )
-    have_push_rules_changed_for_user = (
-        DataStore.have_push_rules_changed_for_user.__func__
-    )
+        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
 
     def get_push_rules_stream_token(self):
         return (
@@ -45,6 +32,9 @@ class SlavedPushRuleStore(SlavedEventStore):
             self._stream_id_gen.get_current_token(),
         )
 
+    def get_max_push_rules_stream_id(self):
+        return self._push_rules_stream_id_gen.get_current_token()
+
     def stream_positions(self):
         result = super(SlavedPushRuleStore, self).stream_positions()
         result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 4e8d68ece9..a7cd5a7291 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,10 +17,10 @@
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage import DataStore
+from synapse.storage.pusher import PusherWorkerStore
 
 
-class SlavedPusherStore(BaseSlavedStore):
+class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
         super(SlavedPusherStore, self).__init__(db_conn, hs)
@@ -28,13 +29,6 @@ class SlavedPusherStore(BaseSlavedStore):
             extra_tables=[("deleted_pushers", "stream_id")],
         )
 
-    get_all_pushers = DataStore.get_all_pushers.__func__
-    get_pushers_by = DataStore.get_pushers_by.__func__
-    get_pushers_by_app_id_and_pushkey = (
-        DataStore.get_pushers_by_app_id_and_pushkey.__func__
-    )
-    _decode_pushers_rows = DataStore._decode_pushers_rows.__func__
-
     def stream_positions(self):
         result = super(SlavedPusherStore, self).stream_positions()
         result["pushers"] = self._pushers_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index b371574ece..1647072f65 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,9 +17,7 @@
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage import DataStore
-from synapse.storage.receipts import ReceiptsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.receipts import ReceiptsWorkerStore
 
 # So, um, we want to borrow a load of functions intended for reading from
 # a DataStore, but we don't want to take functions that either write to the
@@ -29,36 +28,19 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedReceiptsStore(BaseSlavedStore):
+class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
-
+        # We instantiate this first as the ReceiptsWorkerStore constructor
+        # needs to be able to call get_max_receipt_stream_id
         self._receipts_id_gen = SlavedIdTracker(
             db_conn, "receipts_linearized", "stream_id"
         )
 
-        self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
-        )
-
-    get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
-    get_linearized_receipts_for_room = (
-        ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
-    )
-    _get_linearized_receipts_for_rooms = (
-        ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
-    )
-    get_last_receipt_event_id_for_user = (
-        ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
-    )
-
-    get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
-    get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
 
-    get_linearized_receipts_for_rooms = (
-        DataStore.get_linearized_receipts_for_rooms.__func__
-    )
+    def get_max_receipt_stream_id(self):
+        return self._receipts_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(SlavedReceiptsStore, self).stream_positions()
@@ -71,6 +53,8 @@ class SlavedReceiptsStore(BaseSlavedStore):
         self.get_last_receipt_event_id_for_user.invalidate(
             (user_id, room_id, receipt_type)
         )
+        self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id)
+        self.get_receipts_for_room.invalidate((room_id, receipt_type))
 
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name == "receipts":
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index e27c7332d2..7323bf0f1e 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -14,20 +14,8 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.registration import RegistrationStore
+from synapse.storage.registration import RegistrationWorkerStore
 
 
-class SlavedRegistrationStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedRegistrationStore, self).__init__(db_conn, hs)
-
-    # TODO: use the cached version and invalidate deleted tokens
-    get_user_by_access_token = RegistrationStore.__dict__[
-        "get_user_by_access_token"
-    ]
-
-    _query_for_auth = DataStore._query_for_auth.__func__
-    get_user_by_id = RegistrationStore.__dict__[
-        "get_user_by_id"
-    ]
+class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index f510384033..5ae1670157 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -14,32 +14,19 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
+from synapse.storage.room import RoomWorkerStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 
-class RoomStore(BaseSlavedStore):
+class RoomStore(RoomWorkerStore, BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(RoomStore, self).__init__(db_conn, hs)
         self._public_room_id_gen = SlavedIdTracker(
             db_conn, "public_room_list_stream", "stream_id"
         )
 
-    get_public_room_ids = DataStore.get_public_room_ids.__func__
-    get_current_public_room_stream_id = (
-        DataStore.get_current_public_room_stream_id.__func__
-    )
-    get_public_room_ids_at_stream_id = (
-        RoomStore.__dict__["get_public_room_ids_at_stream_id"]
-    )
-    get_public_room_ids_at_stream_id_txn = (
-        DataStore.get_public_room_ids_at_stream_id_txn.__func__
-    )
-    get_published_at_stream_id_txn = (
-        DataStore.get_published_at_stream_id_txn.__func__
-    )
-    get_public_room_changes = DataStore.get_public_room_changes.__func__
+    def get_current_public_room_stream_id(self):
+        return self._public_room_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(RoomStore, self).stream_positions()
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 9633404f73..0005ad5879 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,7 +19,7 @@ allowed to be sent by which side.
 """
 
 import logging
-import simplejson as json
+import simplejson
 
 
 logger = logging.getLogger(__name__)
@@ -100,14 +100,14 @@ class RdataCommand(Command):
         return cls(
             stream_name,
             None if token == "batch" else int(token),
-            json.loads(row_json)
+            simplejson.loads(row_json)
         )
 
     def to_line(self):
         return " ".join((
             self.stream_name,
             str(self.token) if self.token is not None else "batch",
-            json.dumps(self.row),
+            simplejson.dumps(self.row, namedtuple_as_object=False),
         ))
 
 
@@ -298,10 +298,12 @@ class InvalidateCacheCommand(Command):
     def from_line(cls, line):
         cache_func, keys_json = line.split(" ", 1)
 
-        return cls(cache_func, json.loads(keys_json))
+        return cls(cache_func, simplejson.loads(keys_json))
 
     def to_line(self):
-        return " ".join((self.cache_func, json.dumps(self.keys)))
+        return " ".join((
+            self.cache_func, simplejson.dumps(self.keys, namedtuple_as_object=False)
+        ))
 
 
 class UserIpCommand(Command):
@@ -325,14 +327,14 @@ class UserIpCommand(Command):
     def from_line(cls, line):
         user_id, jsn = line.split(" ", 1)
 
-        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+        access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
 
         return cls(
             user_id, access_token, ip, user_agent, device_id, last_seen
         )
 
     def to_line(self):
-        return self.user_id + " " + json.dumps((
+        return self.user_id + " " + simplejson.dumps((
             self.access_token, self.ip, self.user_agent, self.device_id,
             self.last_seen,
         ))
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index d59503b905..0a9a290af4 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             self.send_error("Wrong remote")
 
     def on_RDATA(self, cmd):
+        stream_name = cmd.stream_name
+        inbound_rdata_count.inc(stream_name)
+
         try:
-            row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
+            row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
         except Exception:
             logger.exception(
                 "[%s] Failed to parse RDATA: %r %r",
-                self.id(), cmd.stream_name, cmd.row
+                self.id(), stream_name, cmd.row
             )
             raise
 
         if cmd.token is None:
             # I.e. this is part of a batch of updates for this stream. Batch
             # until we get an update for the stream with a non None token
-            self.pending_batches.setdefault(cmd.stream_name, []).append(row)
+            self.pending_batches.setdefault(stream_name, []).append(row)
         else:
             # Check if this is the last of a batch of updates
-            rows = self.pending_batches.pop(cmd.stream_name, [])
+            rows = self.pending_batches.pop(stream_name, [])
             rows.append(row)
 
-            self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
+            self.handler.on_rdata(stream_name, cmd.token, rows)
 
     def on_POSITION(self, cmd):
         self.handler.on_position(cmd.stream_name, cmd.token)
@@ -644,3 +647,9 @@ metrics.register_callback(
     },
     labels=["command", "name", "conn_id"],
 )
+
+# number of updates received for each RDATA stream
+inbound_rdata_count = metrics.register_counter(
+    "inbound_rdata_count",
+    labels=["stream_name"],
+)