-rw-r--r--  changelog.d/14810.bugfix                   1
-rw-r--r--  changelog.d/14817.bugfix                   1
-rw-r--r--  synapse/api/filtering.py                   3
-rw-r--r--  synapse/handlers/sync.py                   4
-rw-r--r--  synapse/storage/databases/main/stream.py  65
5 files changed, 61 insertions, 13 deletions
diff --git a/changelog.d/14810.bugfix b/changelog.d/14810.bugfix
new file mode 100644
index 0000000000..379bfccffa
--- /dev/null
+++ b/changelog.d/14810.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.75.0rc1 where device lists could be miscalculated with some sync filters.
diff --git a/changelog.d/14817.bugfix b/changelog.d/14817.bugfix
new file mode 100644
index 0000000000..bb5da79268
--- /dev/null
+++ b/changelog.d/14817.bugfix
@@ -0,0 +1 @@
+Fix race where calling `/members` or `/state` with an `at` parameter could fail for newly created rooms, when using multiple workers.
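[Annotator's note: for context, the race above is reached through the room state endpoints that accept an `at` parameter holding a sync token. A minimal sketch of such a request follows; the homeserver URL, room ID, and tokens are placeholders, not values from this commit.]

    # Hypothetical illustration of the affected call pattern (placeholders only).
    # /members resolves the membership list "as of" the supplied sync token, which
    # is what exercises get_last_event_in_room_before_stream_ordering below.
    import requests

    HOMESERVER = "https://example.org"      # placeholder
    ROOM_ID = "!newroom:example.org"        # placeholder
    ACCESS_TOKEN = "..."                    # placeholder
    sync_token = "s12345_..."               # `next_batch` from a previous /sync (truncated placeholder)

    resp = requests.get(
        f"{HOMESERVER}/_matrix/client/v3/rooms/{ROOM_ID}/members",
        params={"at": sync_token},
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
    )
    print(resp.status_code)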
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 2b5af264b4..4cf8f0cc8e 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -283,9 +283,6 @@ class FilterCollection:
             await self._room_filter.filter(events)
         )
 
-    def blocks_all_rooms(self) -> bool:
-        return self._room_filter.filters_all_rooms()
-
     def blocks_all_presence(self) -> bool:
         return (
             self._presence_filter.filters_all_types()
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6942e06c77..20ee2f203a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1793,10 +1793,6 @@ class SyncHandler:
             - newly_left_users
         """
 
-        # If the request doesn't care about rooms then nothing to do!
-        if sync_result_builder.sync_config.filter_collection.blocks_all_rooms():
-            return set(), set(), set(), set()
-
         since_token = sync_result_builder.since_token
 
         # 1. Start by fetching all ephemeral events in rooms we've joined (if required).
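[Annotator's note: the two removals above drop the shortcut that skipped the newly-joined/newly-left computation whenever the sync filter excluded every room; device list updates depend on that computation, which is how the miscalculation in the 14810 changelog arose. A sketch of a filter that excludes all rooms is shown below; the field names follow the Matrix filter format and this JSON is not part of the commit.]

    # Hypothetical sync filter that excludes every room. Before this change a
    # sync using such a filter returned early and skipped the room-change
    # computation; after it, the computation always runs.
    import json

    blocks_all_rooms_filter = {
        "room": {
            "rooms": [],  # an empty include-list: no rooms match
        }
    }
    print(json.dumps(blocks_all_rooms_filter, indent=2))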
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index cc27ec3804..63d8350530 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -801,13 +801,66 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             before this stream ordering.
         """
 
-        last_row = await self.get_room_event_before_stream_ordering(
-            room_id=room_id,
-            stream_ordering=end_token.stream,
+        def get_last_event_in_room_before_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[str]:
+            # We need to handle the fact that the stream tokens can be vector
+            # clocks. We do this by getting all rows between the minimum and
+            # maximum stream ordering in the token, plus one row less than the
+            # minimum stream ordering. We then filter the results against the
+            # token and return the first row that matches.
+
+            sql = """
+                SELECT * FROM (
+                    SELECT instance_name, stream_ordering, topological_ordering, event_id
+                    FROM events
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE room_id = ?
+                        AND ? < stream_ordering AND stream_ordering <= ?
+                        AND NOT outlier
+                        AND rejections.event_id IS NULL
+                    ORDER BY stream_ordering DESC
+                ) AS a
+                UNION
+                SELECT * FROM (
+                    SELECT instance_name, stream_ordering, topological_ordering, event_id
+                    FROM events
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE room_id = ?
+                        AND stream_ordering <= ?
+                        AND NOT outlier
+                        AND rejections.event_id IS NULL
+                    ORDER BY stream_ordering DESC
+                    LIMIT 1
+                ) AS b
+            """
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    end_token.stream,
+                    end_token.get_max_stream_pos(),
+                    room_id,
+                    end_token.stream,
+                ),
+            )
+
+            for instance_name, stream_ordering, topological_ordering, event_id in txn:
+                if _filter_results(
+                    lower_token=None,
+                    upper_token=end_token,
+                    instance_name=instance_name,
+                    topological_ordering=topological_ordering,
+                    stream_ordering=stream_ordering,
+                ):
+                    return event_id
+
+            return None
+
+        return await self.db_pool.runInteraction(
+            "get_last_event_in_room_before_stream_ordering",
+            get_last_event_in_room_before_stream_ordering_txn,
         )
-        if last_row:
-            return last_row[2]
-        return None
 
     async def get_current_room_stream_token_for_room_id(
         self, room_id: str
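[Annotator's note: the new transaction relies on `_filter_results` to decide whether a candidate row really lies at or before the (possibly vector-clock) end token. Below is a rough, self-contained sketch of that kind of check under the scheme the comment describes: a minimum stream position plus per-writer positions. The class and function names are invented for illustration and do not match Synapse's actual RoomStreamToken/_filter_results.]

    # Illustrative sketch only: a stream token acting as a vector clock over
    # multiple event-persister instances, and the check for whether an event
    # row lies at or before that token.
    from dataclasses import dataclass, field
    from typing import Dict


    @dataclass
    class VectorClockToken:
        # Position every writer is known to have reached.
        stream: int
        # Writers that have advanced past `stream`, with their own positions.
        instance_map: Dict[str, int] = field(default_factory=dict)

        def max_stream_pos(self) -> int:
            # Upper bound used to limit the scanned range, like the SQL above.
            return max(self.instance_map.values(), default=self.stream)


    def row_is_before_token(
        token: VectorClockToken, instance_name: str, stream_ordering: int
    ) -> bool:
        # Rows at or below the minimum are before the token for every writer;
        # rows in the gap between minimum and maximum only count if the token
        # records a high enough position for that specific writer.
        if stream_ordering <= token.stream:
            return True
        return stream_ordering <= token.instance_map.get(instance_name, token.stream)


    # Example: "master" has persisted up to 105, "persister-1" up to 103,
    # every other writer is only known up to 100.
    token = VectorClockToken(stream=100, instance_map={"master": 105, "persister-1": 103})
    assert row_is_before_token(token, "master", 104)            # inside master's window
    assert not row_is_before_token(token, "persister-2", 102)   # unknown writer, past the minimum

[Annotator's note: in the commit itself the candidate rows come from the UNION above: the first branch scans the window between the token's minimum and maximum stream orderings, and the second branch adds the single newest row at or below the minimum, so a match is still found when no event falls inside that window.]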