author    Richard van der Hoff <1389908+richvdh@users.noreply.github.com>  2020-05-26 11:41:38 +0100
committer GitHub <noreply@github.com>  2020-05-26 11:41:38 +0100
commit    00db90f4098e400306b3f25f14ba7a302d6f7ee8 (patch)
tree      2aa4dd466b14e2a705065e492c833196bb2c4bdd
parent    Simplify reap_monthly_active_users (#7558) (diff)
download  synapse-00db90f4098e400306b3f25f14ba7a302d6f7ee8.tar.xz
Fix recording of federation stream token (#7564)
A couple of changes of significance:

 * remove the `_last_ack < federation_position` condition, so that
   updates will still be correctly processed after restart

 * Correctly wire up send_federation_ack to the right class (see the sketch below).
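
For orientation, here is a condensed sketch of the corrected `FederationSenderHandler.update_token` flow after this change (the full patch follows): the stream position is now persisted and ACKed on every update, and the ACK is sent via `hs.get_tcp_replication()` rather than through a `replication_client` attribute.

    async def update_token(self, token):
        try:
            self.federation_position = token

            # serialise updates to the persisted position
            with (await self._fed_position_linearizer.queue(None)):
                # record how far we have processed in the federation stream ...
                await self.store.update_federation_out_pos(
                    "federation", self.federation_position
                )

                # ... and unconditionally ACK the token back to the master so
                # that it can drop its in-memory queues (the old
                # `_last_ack < federation_position` gate is gone).
                self._hs.get_tcp_replication().send_federation_ack(
                    self.federation_position
                )
                self._last_ack = self.federation_position
        except Exception:
            logger.exception("Error updating federation stream position")
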
-rw-r--r--  changelog.d/7565.bugfix                    1
-rw-r--r--  synapse/app/generic_worker.py             68
-rw-r--r--  tests/replication/test_federation_ack.py  71
3 files changed, 116 insertions, 24 deletions
diff --git a/changelog.d/7565.bugfix b/changelog.d/7565.bugfix
new file mode 100644
index 0000000000..35fabb9a0b
--- /dev/null
+++ b/changelog.d/7565.bugfix
@@ -0,0 +1 @@
+Fix exception `'GenericWorkerReplicationHandler' object has no attribute 'send_federation_ack'`, introduced in v1.13.0.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 2906b93f6a..440341cb3c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -17,7 +17,7 @@
 import contextlib
 import logging
 import sys
-from typing import Dict, Iterable
+from typing import Dict, Iterable, Optional, Set
 
 from typing_extensions import ContextManager
 
@@ -677,10 +677,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         self.notify_pushers = hs.config.start_pushers
         self.pusher_pool = hs.get_pusherpool()
 
+        self.send_handler = None  # type: Optional[FederationSenderHandler]
         if hs.config.send_federation:
-            self.send_handler = FederationSenderHandler(hs, self)
-        else:
-            self.send_handler = None
+            self.send_handler = FederationSenderHandler(hs)
 
     async def on_rdata(self, stream_name, instance_name, token, rows):
         await super().on_rdata(stream_name, instance_name, token, rows)
@@ -718,7 +717,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
                 if entities:
                     self.notifier.on_new_event("to_device_key", token, users=entities)
             elif stream_name == DeviceListsStream.NAME:
-                all_room_ids = set()
+                all_room_ids = set()  # type: Set[str]
                 for row in rows:
                     if row.entity.startswith("@"):
                         room_ids = await self.store.get_rooms_for_user(row.entity)
@@ -769,24 +768,33 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
 
 
 class FederationSenderHandler(object):
-    """Processes the replication stream and forwards the appropriate entries
-    to the federation sender.
+    """Processes the fedration replication stream
+
+    This class is only instantiate on the worker responsible for sending outbound
+    federation transactions. It receives rows from the replication stream and forwards
+    the appropriate entries to the FederationSender class.
     """
 
-    def __init__(self, hs: GenericWorkerServer, replication_client):
+    def __init__(self, hs: GenericWorkerServer):
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
-        self.replication_client = replication_client
-
+        self._hs = hs
+
+        # if the worker is restarted, we want to pick up where we left off in
+        # the replication stream, so load the position from the database.
+        #
+        # XXX is this actually worthwhile? Whenever the master is restarted, we'll
+        # drop some rows anyway (which is mostly fine because we're only dropping
+        # typing and presence notifications). If the replication stream is
+        # unreliable, why do we do all this hoop-jumping to store the position in the
+        # database? See also https://github.com/matrix-org/synapse/issues/7535.
+        #
         self.federation_position = self.store.federation_out_pos_startup
-        self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
 
+        self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
         self._last_ack = self.federation_position
 
-        self._room_serials = {}
-        self._room_typing = {}
-
     def on_start(self):
         # There may be some events that are persisted but haven't been sent,
         # so send them now.
@@ -849,22 +857,34 @@ class FederationSenderHandler(object):
             await self.federation_sender.send_read_receipt(receipt_info)
 
     async def update_token(self, token):
+        """Update the record of where we have processed to in the federation stream.
+
+        Called after we have processed an update received over replication. Sends
+        a FEDERATION_ACK back to the master, and stores the token that we have processed
+        in `federation_stream_position` so that we can restart where we left off.
+        """
         try:
             self.federation_position = token
 
             # We linearize here to ensure we don't have races updating the token
+            #
+            # XXX this appears to be redundant, since the ReplicationCommandHandler
+            # has a linearizer which ensures that we only process one line of
+            # replication data at a time. Should we remove it, or is it doing useful
+            # service for robustness? Or could we replace it with an assertion that
+            # we're not being re-entered?
+
             with (await self._fed_position_linearizer.queue(None)):
-                if self._last_ack < self.federation_position:
-                    await self.store.update_federation_out_pos(
-                        "federation", self.federation_position
-                    )
+                await self.store.update_federation_out_pos(
+                    "federation", self.federation_position
+                )
 
-                    # We ACK this token over replication so that the master can drop
-                    # its in memory queues
-                    self.replication_client.send_federation_ack(
-                        self.federation_position
-                    )
-                    self._last_ack = self.federation_position
+                # We ACK this token over replication so that the master can drop
+                # its in memory queues
+                self._hs.get_tcp_replication().send_federation_ack(
+                    self.federation_position
+                )
+                self._last_ack = self.federation_position
         except Exception:
             logger.exception("Error updating federation stream position")
 
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
new file mode 100644
index 0000000000..5448d9f0dc
--- /dev/null
+++ b/tests/replication/test_federation_ack.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from synapse.app.generic_worker import GenericWorkerServer
+from synapse.replication.tcp.commands import FederationAckCommand
+from synapse.replication.tcp.protocol import AbstractConnection
+from synapse.replication.tcp.streams.federation import FederationStream
+
+from tests.unittest import HomeserverTestCase
+
+
+class FederationAckTestCase(HomeserverTestCase):
+    def default_config(self) -> dict:
+        config = super().default_config()
+        config["worker_app"] = "synapse.app.federation_sender"
+        config["send_federation"] = True
+        return config
+
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer)
+        return hs
+
+    def test_federation_ack_sent(self):
+        """A FEDERATION_ACK should be sent back after each RDATA federation
+
+        This test checks that the federation sender is correctly sending back
+        FEDERATION_ACK messages. The test works by spinning up a federation_sender
+        worker server, and then fishing out its ReplicationCommandHandler. We wire
+        the RCH up to a mock connection (so that we can observe the command being sent)
+        and then poke in an RDATA row.
+
+        XXX: it might be nice to do this by pretending to be a synapse master worker
+        (or a redis server), and having the worker connect to us via a mocked-up TCP
+        transport, rather than assuming that the implementation has a
+        ReplicationCommandHandler.
+        """
+        rch = self.hs.get_tcp_replication()
+
+        # wire up the ReplicationCommandHandler to a mock connection
+        mock_connection = mock.Mock(spec=AbstractConnection)
+        rch.new_connection(mock_connection)
+
+        # tell it it received an RDATA row
+        self.get_success(
+            rch.on_rdata(
+                "federation",
+                "master",
+                token=10,
+                rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
+            )
+        )
+
+        # now check that the FEDERATION_ACK was sent
+        mock_connection.send_command.assert_called_once()
+        cmd = mock_connection.send_command.call_args[0][0]
+        assert isinstance(cmd, FederationAckCommand)
+        self.assertEqual(cmd.token, 10)