summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-21 11:28:37 +0000
committerErik Johnston <erik@matrix.org>2016-11-21 11:33:08 +0000
commit7c9cdb22453d1a442e5c280149aeeff4d46da215 (patch)
tree1434dca32f57320810b5e70314db521f5cc7a338
parentHandle sending events and device messages over federation (diff)
downloadsynapse-7c9cdb22453d1a442e5c280149aeeff4d46da215.tar.xz
Store federation stream positions in the database
-rw-r--r--synapse/app/federation_sender.py38
-rw-r--r--synapse/federation/transaction_queue.py21
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/storage/_base.py18
-rw-r--r--synapse/storage/schema/delta/39/federation_out_position.sql22
-rw-r--r--synapse/storage/stream.py16
6 files changed, 94 insertions, 24 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 32113c175c..6678667c35 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
         replication_url = self.config.worker_replication_url
-        send_handler = self._get_send_handler()
+        send_handler = FederationSenderHandler(self)
+
+        send_handler.on_start()
 
         while True:
             try:
                 args = store.stream_positions()
-                args.update(send_handler.stream_positions())
+                args.update((yield send_handler.stream_positions()))
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
                 yield store.process_replication(result)
-                send_handler.process_replication(result)
+                yield send_handler.process_replication(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(30)
 
-    def _get_send_handler(self):
-        try:
-            return self._send_handler
-        except AttributeError:
-            self._send_handler = FederationSenderHandler(self)
-            return self._send_handler
-
 
 def start(config_options):
     try:
@@ -221,22 +216,29 @@ def start(config_options):
 
 class FederationSenderHandler(object):
     def __init__(self, hs):
+        self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
 
-        self._latest_room_serial = -1
         self._room_serials = {}
         self._room_typing = {}
 
+    def on_start(self):
+        # There may be some events that are persisted but haven't been sent,
+        # so send them now.
+        self.federation_sender.notify_new_events(
+            self.store.get_room_max_stream_ordering()
+        )
+
+    @defer.inlineCallbacks
     def stream_positions(self):
-        # We must update this token from the response of the previous
-        # sync. In particular, the stream id may "reset" back to zero/a low
-        # value which we *must* use for the next replication request.
-        return {"federation": self._latest_room_serial}
+        stream_id = yield self.store.get_federation_out_pos("federation")
+        defer.returnValue({"federation": stream_id})
 
+    @defer.inlineCallbacks
     def process_replication(self, result):
         fed_stream = result.get("federation")
         if fed_stream:
-            self._latest_room_serial = int(fed_stream["position"])
+            latest_id = int(fed_stream["position"])
 
             presence_to_send = {}
             keyed_edus = {}
@@ -296,6 +298,10 @@ class FederationSenderHandler(object):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
+            yield self.store.update_federation_out_pos(
+                "federation", latest_id
+            )
+
         event_stream = result.get("events")
         if event_stream:
             latest_pos = event_stream["position"]
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index aa664beead..1b0ea070c2 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -106,7 +106,7 @@ class TransactionQueue(object):
         self._order = 1
 
         self._is_processing = False
-        self._last_token = 0
+        self._last_poked_id = -1
 
     def can_send_to(self, destination):
         """Can we send messages to the given server?
@@ -130,17 +130,22 @@ class TransactionQueue(object):
 
     @defer.inlineCallbacks
     def notify_new_events(self, current_id):
+        self._last_poked_id = max(current_id, self._last_poked_id)
+
         if self._is_processing:
             return
 
         try:
             self._is_processing = True
             while True:
-                self._last_token, events = yield self.store.get_all_new_events_stream(
-                    self._last_token, current_id, limit=20,
+                last_token = yield self.store.get_federation_out_pos("events")
+                next_token, events = yield self.store.get_all_new_events_stream(
+                    last_token, self._last_poked_id, limit=20,
                 )
 
-                if not events:
+                logger.debug("Handling %s -> %s", last_token, next_token)
+
+                if not events and next_token >= self._last_poked_id:
                     break
 
                 for event in events:
@@ -151,7 +156,15 @@ class TransactionQueue(object):
                     destinations = [
                         get_domain_from_id(user_id) for user_id in users_in_room
                     ]
+
+                    logger.debug("Sending %s to %r", event, destinations)
+
                     self.send_pdu(event, destinations)
+
+                yield self.store.update_federation_out_pos(
+                    "events", next_token
+                )
+
         finally:
             self._is_processing = False
 
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index ef8713b55d..64f18bbb3e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -187,6 +187,9 @@ class SlavedEventStore(BaseSlavedStore):
 
     get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
 
+    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
+    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d828d6ee1d..d3686b9690 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -561,12 +561,17 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+        if keyvalues:
+            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
+            "SELECT %(retcol)s FROM %(table)s %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
-            "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+            "where": where,
         }
 
         txn.execute(sql, keyvalues.values())
@@ -744,10 +749,15 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
-        update_sql = "UPDATE %s SET %s WHERE %s" % (
+        if keyvalues:
+            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
+        update_sql = "UPDATE %s SET %s %s" % (
             table,
             ", ".join("%s = ?" % (k,) for k in updatevalues),
-            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+            where,
         )
 
         txn.execute(
diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql
new file mode 100644
index 0000000000..edbd8e132f
--- /dev/null
+++ b/synapse/storage/schema/delta/39/federation_out_position.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE federation_stream_position(
+     type TEXT NOT NULL,
+     stream_id INTEGER NOT NULL
+ );
+
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index f34cb78f9a..7fa63b58a7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -796,3 +796,19 @@ class StreamStore(SQLBaseStore):
         events = yield self._get_events(event_ids)
 
         defer.returnValue((upper_bound, events))
+
+    def get_federation_out_pos(self, typ):
+        return self._simple_select_one_onecol(
+            table="federation_stream_position",
+            retcol="stream_id",
+            keyvalues={"type": typ},
+            desc="get_federation_out_pos"
+        )
+
+    def update_federation_out_pos(self, typ, stream_id):
+        return self._simple_update_one(
+            table="federation_stream_position",
+            keyvalues={"type": typ},
+            updatevalues={"stream_id": stream_id},
+            desc="update_federation_out_pos",
+        )