summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-27 14:03:38 +0100
committerErik Johnston <erik@matrix.org>2017-03-30 11:48:35 +0100
commit24d35ab47bdec651706a221974424409d9ab036b (patch)
tree17231cca811506a14ab23094c0ffc2c7c621df95 /synapse/storage
parentUse txn.fetchall() so we can reuse txn (diff)
downloadsynapse-24d35ab47bdec651706a221974424409d9ab036b.tar.xz
Add new storage functions for new replication
The new replication protocol will keep all the streams separate, rather
than muxing multiple streams into one.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/devices.py6
-rw-r--r--synapse/storage/events.py88
-rw-r--r--synapse/storage/pusher.py42
3 files changed, 133 insertions, 3 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 53e36791d5..c8d5f5ba8b 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -533,7 +533,7 @@ class DeviceStore(SQLBaseStore):
         rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
         defer.returnValue(set(row[0] for row in rows))
 
-    def get_all_device_list_changes_for_remotes(self, from_key):
+    def get_all_device_list_changes_for_remotes(self, from_key, to_key):
         """Return a list of `(stream_id, user_id, destination)` which is the
         combined list of changes to devices, and which destinations need to be
         poked. `destination` may be None if no destinations need to be poked.
@@ -541,11 +541,11 @@ class DeviceStore(SQLBaseStore):
         sql = """
             SELECT stream_id, user_id, destination FROM device_lists_stream
             LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
-            WHERE stream_id > ?
+            WHERE ? < stream_id AND stream_id <= ?
         """
         return self._execute(
             "get_all_device_list_changes_for_remotes", None,
-            sql, from_key,
+            sql, from_key, to_key
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3f6833fad2..64fe937bdc 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1771,6 +1771,94 @@ class EventsStore(SQLBaseStore):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
 
+    def get_current_events_token(self):
+        """The current maximum token that events have reached"""
+        return self._stream_id_gen.get_current_token()
+
+    def get_all_new_forward_event_rows(self, last_id, current_id, limit):
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_new_forward_event_rows(txn):
+            sql = (
+                "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                " ORDER BY stream_ordering ASC"
+                " LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            new_event_updates = txn.fetchall()
+
+            if len(new_event_updates) == limit:
+                upper_bound = new_event_updates[-1][0]
+            else:
+                upper_bound = current_id
+
+            sql = (
+                "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? < event_stream_ordering"
+                " AND event_stream_ordering <= ?"
+                " ORDER BY event_stream_ordering DESC"
+            )
+            txn.execute(sql, (last_id, upper_bound))
+            new_event_updates.extend(txn)
+
+            return new_event_updates
+        return self.runInteraction(
+            "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+        )
+
+    def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_new_backfill_event_rows(txn):
+            sql = (
+                "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                " ORDER BY stream_ordering ASC"
+                " LIMIT ?"
+            )
+            txn.execute(sql, (-last_id, -current_id, limit))
+            new_event_updates = txn.fetchall()
+
+            if len(new_event_updates) == limit:
+                upper_bound = new_event_updates[-1][0]
+            else:
+                upper_bound = current_id
+
+            sql = (
+                "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? > event_stream_ordering"
+                " AND event_stream_ordering >= ?"
+                " ORDER BY event_stream_ordering DESC"
+            )
+            txn.execute(sql, (-last_id, -upper_bound))
+            new_event_updates.extend(txn.fetchall())
+
+            return new_event_updates
+        return self.runInteraction(
+            "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
+        )
+
     @cached(num_args=5, max_entries=10)
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 8cc9f0353b..715c8bef24 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
+    def get_all_updated_pushers_rows(self, last_id, current_id, limit):
+        """Get all the pushers that have changed between the given tokens.
+
+        Returns:
+            list(tuple): each tuple consists of:
+                stream_id (str)
+                user_id (str)
+                app_id (str)
+                pushkey (str)
+                was_deleted (bool): whether the pusher was added/updated (False)
+                    or deleted (True)
+        """
+
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_updated_pushers_rows_txn(txn):
+            sql = (
+                "SELECT id, user_name, app_id, pushkey"
+                " FROM pushers"
+                " WHERE ? < id AND id <= ?"
+                " ORDER BY id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            results = [list(row) + [False] for row in txn]
+
+            sql = (
+                "SELECT stream_id, user_id, app_id, pushkey"
+                " FROM deleted_pushers"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+
+            results.extend(list(row) + [True] for row in txn)
+            results.sort()  # Sort so that they're ordered by stream id
+
+            return results
+        return self.runInteraction(
+            "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
+        )
+
     @cachedInlineCallbacks(num_args=1, max_entries=15000)
     def get_if_user_has_pusher(self, user_id):
         # This only exists for the cachedList decorator