summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py33
1 files changed, 23 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index be81025355..e74e0d2e91 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -371,7 +371,7 @@ def _make_generic_sql_bound(
 def _filter_results(
     lower_token: Optional[RoomStreamToken],
     upper_token: Optional[RoomStreamToken],
-    instance_name: str,
+    instance_name: Optional[str],
     topological_ordering: int,
     stream_ordering: int,
 ) -> bool:
@@ -384,8 +384,14 @@ def _filter_results(
     position maps, which we handle by fetching more than necessary from the DB
     and then filtering (rather than attempting to construct a complicated SQL
     query).
+
+    The `instance_name` arg is optional to handle historic rows, and is
+    interpreted as if it was "master".
     """
 
+    if instance_name is None:
+        instance_name = "master"
+
     event_historical_tuple = (
         topological_ordering,
         stream_ordering,
@@ -420,7 +426,7 @@ def _filter_results(
 def _filter_results_by_stream(
     lower_token: Optional[RoomStreamToken],
     upper_token: Optional[RoomStreamToken],
-    instance_name: str,
+    instance_name: Optional[str],
     stream_ordering: int,
 ) -> bool:
     """
@@ -436,7 +442,14 @@ def _filter_results_by_stream(
     position maps, which we handle by fetching more than necessary from the DB
     and then filtering (rather than attempting to construct a complicated SQL
     query).
+
+    The `instance_name` arg is optional to handle historic rows, and is
+    interpreted as if it was "master".
     """
+
+    if instance_name is None:
+        instance_name = "master"
+
     if lower_token:
         assert lower_token.topological is None
 
@@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 prev_sender,
             ) in txn:
                 assert room_id is not None
-                assert instance_name is not None
                 assert stream_ordering is not None
 
                 if _filter_results_by_stream(
@@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                         # Event
                         event_id=event_id,
                         event_pos=PersistedEventPosition(
-                            instance_name=instance_name,
+                            # If instance_name is null we default to "master"
+                            instance_name=instance_name or "master",
                             stream=stream_ordering,
                         ),
                         # When `s.event_id = null`, we won't be able to get respective
@@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                         prev_event_id=prev_event_id,
                         prev_event_pos=(
                             PersistedEventPosition(
-                                instance_name=prev_instance_name,
+                                # If instance_name is null we default to "master"
+                                instance_name=prev_instance_name or "master",
                                 stream=prev_stream_ordering,
                             )
-                            if (
-                                prev_instance_name is not None
-                                and prev_stream_ordering is not None
-                            )
+                            if (prev_stream_ordering is not None)
                             else None
                         ),
                         prev_membership=prev_membership,
@@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                     stream_ordering=stream_ordering,
                 ):
                     return event_id, PersistedEventPosition(
-                        instance_name, stream_ordering
+                        # If instance_name is null we default to "master"
+                        instance_name or "master",
+                        stream_ordering,
                     )
 
             return None