diff options
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index be81025355..e74e0d2e91 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -371,7 +371,7 @@ def _make_generic_sql_bound( def _filter_results( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], topological_ordering: int, stream_ordering: int, ) -> bool: @@ -384,8 +384,14 @@ def _filter_results( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + if instance_name is None: + instance_name = "master" + event_historical_tuple = ( topological_ordering, stream_ordering, @@ -420,7 +426,7 @@ def _filter_results( def _filter_results_by_stream( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], stream_ordering: int, ) -> bool: """ @@ -436,7 +442,14 @@ def _filter_results_by_stream( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + + if instance_name is None: + instance_name = "master" + if lower_token: assert lower_token.topological is None @@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): prev_sender, ) in txn: assert room_id is not None - assert instance_name is not None assert stream_ordering is not None if _filter_results_by_stream( @@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Event event_id=event_id, event_pos=PersistedEventPosition( - instance_name=instance_name, + # If instance_name is null we default to "master" + instance_name=instance_name or "master", stream=stream_ordering, ), # When `s.event_id = null`, we won't be able to get respective @@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): prev_event_id=prev_event_id, prev_event_pos=( PersistedEventPosition( - instance_name=prev_instance_name, + # If instance_name is null we default to "master" + instance_name=prev_instance_name or "master", stream=prev_stream_ordering, ) - if ( - prev_instance_name is not None - and prev_stream_ordering is not None - ) + if (prev_stream_ordering is not None) else None ), prev_membership=prev_membership, @@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): stream_ordering=stream_ordering, ): return event_id, PersistedEventPosition( - instance_name, stream_ordering + # If instance_name is null we default to "master" + instance_name or "master", + stream_ordering, ) return None |