diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index be81025355..e74e0d2e91 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -371,7 +371,7 @@ def _make_generic_sql_bound(
def _filter_results(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
topological_ordering: int,
stream_ordering: int,
) -> bool:
@@ -384,8 +384,14 @@ def _filter_results(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""
+ if instance_name is None:
+ instance_name = "master"
+
event_historical_tuple = (
topological_ordering,
stream_ordering,
@@ -420,7 +426,7 @@ def _filter_results(
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
stream_ordering: int,
) -> bool:
"""
@@ -436,7 +442,14 @@ def _filter_results_by_stream(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""
+
+ if instance_name is None:
+ instance_name = "master"
+
if lower_token:
assert lower_token.topological is None
@@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
prev_sender,
) in txn:
assert room_id is not None
- assert instance_name is not None
assert stream_ordering is not None
if _filter_results_by_stream(
@@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# Event
event_id=event_id,
event_pos=PersistedEventPosition(
- instance_name=instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=instance_name or "master",
stream=stream_ordering,
),
# When `s.event_id = null`, we won't be able to get respective
@@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
prev_event_id=prev_event_id,
prev_event_pos=(
PersistedEventPosition(
- instance_name=prev_instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=prev_instance_name or "master",
stream=prev_stream_ordering,
)
- if (
- prev_instance_name is not None
- and prev_stream_ordering is not None
- )
+ if (prev_stream_ordering is not None)
else None
),
prev_membership=prev_membership,
@@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
stream_ordering=stream_ordering,
):
return event_id, PersistedEventPosition(
- instance_name, stream_ordering
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
)
return None
|