diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2e8914be14..d2f25ae12a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,7 +25,7 @@ The methods that define policy are:
import abc
import logging
from contextlib import contextmanager
-from typing import Dict, Iterable, List, Set
+from typing import Dict, Iterable, List, Set, Tuple
from prometheus_client import Counter
from typing_extensions import ContextManager
@@ -773,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
return False
- async def get_all_presence_updates(self, last_id, current_id, limit):
+ async def get_all_presence_updates(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
@@ -785,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
- last_user_sync_ts(int)
- status_msg(int)
- currently_active(int)
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
"""
+
# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
- rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
+ rows = await self.store.get_all_presence_updates(
+ instance_name, last_id, current_id, limit
+ )
return rows
def notify_new_event(self):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..4330abb9f7 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,7 +15,7 @@
import logging
from collections import namedtuple
-from typing import List
+from typing import List, Tuple
from twisted.internet import defer
@@ -259,14 +259,31 @@ class TypingHandler(object):
)
async def get_all_typing_updates(
- self, last_id: int, current_id: int, limit: int
- ) -> List[dict]:
- """Get up to `limit` typing updates between the given tokens, earliest
- updates first.
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for typing replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
"""
if last_id == current_id:
- return []
+ return [], current_id, False
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
@@ -280,9 +297,16 @@ class TypingHandler(object):
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
- rows.append((serial, room_id, list(typing)))
+ rows.append((serial, [room_id, list(typing)]))
rows.sort()
- return rows[:limit]
+
+ limited = False
+ if len(rows) > limit:
+ rows = rows[:limit]
+ current_id = rows[-1][0]
+ limited = True
+
+ return rows, current_id, limited
def get_current_token(self):
return self._latest_room_serial
|