diff --git a/synapse/notifier.py b/synapse/notifier.py
index 16f19c938e..12cd84b27b 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -25,6 +25,7 @@ from typing import (
Set,
Tuple,
TypeVar,
+ Union,
)
from prometheus_client import Counter
@@ -41,7 +42,7 @@ from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
-from synapse.types import Collection, StreamToken, UserID
+from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -111,7 +112,9 @@ class _NotifierUserStream:
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
- def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
+ def notify(
+ self, stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int,
+ ):
"""Notify any listeners for this user of a new event from an
event source.
Args:
@@ -294,7 +297,12 @@ class Notifier:
rooms.add(event.room_id)
if users or rooms:
- self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms)
+ self.on_new_event(
+ "room_key",
+ RoomStreamToken(None, max_room_stream_id),
+ users=users,
+ rooms=rooms,
+ )
self._on_updated_room_token(max_room_stream_id)
def _on_updated_room_token(self, max_room_stream_id: int):
@@ -329,7 +337,7 @@ class Notifier:
def on_new_event(
self,
stream_key: str,
- new_token: int,
+ new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
rooms: Collection[str] = [],
):
|