summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py193
1 files changed, 78 insertions, 115 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 76c4cc54d1..e51a69074d 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -23,19 +23,19 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.federation.units import Edu
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.storage.presence import UserPresenceState
-from synapse.util.async import sleep
+from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -50,16 +50,36 @@ from daemonize import Daemonize
 import sys
 import logging
 import gc
-import ujson as json
 
 logger = logging.getLogger("synapse.app.appservice")
 
 
 class FederationSenderSlaveStore(
     SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore,
+    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
-    pass
+    def __init__(self, db_conn, hs):
+        super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
+
+        # We pull out the current federation stream position now so that we
+        # always have a known value for the federation position in memory so
+        # that we don't have to bounce via a deferred once when we start the
+        # replication streams.
+        self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
+
+    def _get_federation_out_pos(self, db_conn):
+        sql = (
+            "SELECT stream_id FROM federation_stream_position"
+            " WHERE type = ?"
+        )
+        sql = self.database_engine.convert_param_style(sql)
+
+        txn = db_conn.cursor()
+        txn.execute(sql, ("federation",))
+        rows = txn.fetchall()
+        txn.close()
+
+        return rows[0][0] if rows else -1
 
 
 class FederationSenderServer(HomeServer):
@@ -127,26 +147,27 @@ class FederationSenderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        send_handler = FederationSenderHandler(self)
-
-        send_handler.on_start()
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args.update((yield send_handler.stream_positions()))
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                yield send_handler.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return FederationSenderReplicationHandler(self)
+
+
+class FederationSenderReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
+        self.send_handler = FederationSenderHandler(hs, self)
+
+    def on_rdata(self, stream_name, token, rows):
+        super(FederationSenderReplicationHandler, self).on_rdata(
+            stream_name, token, rows
+        )
+        self.send_handler.process_replication_rows(stream_name, token, rows)
+
+    def get_streams_to_replicate(self):
+        args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+        args.update(self.send_handler.stream_positions())
+        return args
 
 
 def start(config_options):
@@ -205,7 +226,6 @@ def start(config_options):
             reactor.run()
 
     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
@@ -229,9 +249,15 @@ class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
-    def __init__(self, hs):
+    def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
+        self.replication_client = replication_client
+
+        self.federation_position = self.store.federation_out_pos_startup
+        self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
+
+        self._last_ack = self.federation_position
 
         self._room_serials = {}
         self._room_typing = {}
@@ -243,98 +269,35 @@ class FederationSenderHandler(object):
             self.store.get_room_max_stream_ordering()
         )
 
-    @defer.inlineCallbacks
     def stream_positions(self):
-        stream_id = yield self.store.get_federation_out_pos("federation")
-        defer.returnValue({
-            "federation": stream_id,
+        return {"federation": self.federation_position}
 
-            # Ack stuff we've "processed", this should only be called from
-            # one process.
-            "federation_ack": stream_id,
-        })
-
-    @defer.inlineCallbacks
-    def process_replication(self, result):
+    def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
-        fed_stream = result.get("federation")
-        if fed_stream:
-            latest_id = int(fed_stream["position"])
-
-            # The federation stream containis a bunch of different types of
-            # rows that need to be handled differently. We parse the rows, put
-            # them into the appropriate collection and then send them off.
-            presence_to_send = {}
-            keyed_edus = {}
-            edus = {}
-            failures = {}
-            device_destinations = set()
-
-            # Parse the rows in the stream
-            for row in fed_stream["rows"]:
-                position, typ, content_js = row
-                content = json.loads(content_js)
-
-                if typ == send_queue.PRESENCE_TYPE:
-                    destination = content["destination"]
-                    state = UserPresenceState.from_dict(content["state"])
-
-                    presence_to_send.setdefault(destination, []).append(state)
-                elif typ == send_queue.KEYED_EDU_TYPE:
-                    key = content["key"]
-                    edu = Edu(**content["edu"])
-
-                    keyed_edus.setdefault(
-                        edu.destination, {}
-                    )[(edu.destination, tuple(key))] = edu
-                elif typ == send_queue.EDU_TYPE:
-                    edu = Edu(**content)
-
-                    edus.setdefault(edu.destination, []).append(edu)
-                elif typ == send_queue.FAILURE_TYPE:
-                    destination = content["destination"]
-                    failure = content["failure"]
-
-                    failures.setdefault(destination, []).append(failure)
-                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
-                    device_destinations.add(content["destination"])
-                else:
-                    raise Exception("Unrecognised federation type: %r", typ)
-
-            # We've finished collecting, send everything off
-            for destination, states in presence_to_send.items():
-                self.federation_sender.send_presence(destination, states)
-
-            for destination, edu_map in keyed_edus.items():
-                for key, edu in edu_map.items():
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=key,
-                    )
-
-            for destination, edu_list in edus.items():
-                for edu in edu_list:
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=None,
-                    )
-
-            for destination, failure_list in failures.items():
-                for failure in failure_list:
-                    self.federation_sender.send_failure(destination, failure)
-
-            for destination in device_destinations:
-                self.federation_sender.send_device_messages(destination)
-
-            # Record where we are in the stream.
-            yield self.store.update_federation_out_pos(
-                "federation", latest_id
-            )
+        if stream_name == "federation":
+            send_queue.process_rows_for_federation(self.federation_sender, rows)
+            preserve_fn(self.update_token)(token)
 
         # We also need to poke the federation sender when new events happen
-        event_stream = result.get("events")
-        if event_stream:
-            latest_pos = event_stream["position"]
-            self.federation_sender.notify_new_events(latest_pos)
+        elif stream_name == "events":
+            self.federation_sender.notify_new_events(token)
+
+    @defer.inlineCallbacks
+    def update_token(self, token):
+        self.federation_position = token
+
+        # We linearize here to ensure we don't have races updating the token
+        with (yield self._fed_position_linearizer.queue(None)):
+            if self._last_ack < self.federation_position:
+                yield self.store.update_federation_out_pos(
+                    "federation", self.federation_position
+                )
+
+                # We ACK this token over replication so that the master can drop
+                # its in memory queues
+                self.replication_client.send_federation_ack(self.federation_position)
+                self._last_ack = self.federation_position
 
 
 if __name__ == '__main__':