diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3cbfc2d780..d3692842e3 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -12,16 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import random
from collections import namedtuple
from typing import TYPE_CHECKING, List, Set, Tuple
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
+from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
-from synapse.types import UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -430,6 +430,33 @@ class TypingNotificationEventSource:
"content": {"user_ids": list(typing)},
}
+ async def get_new_events_as(
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new typing events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
+ with Measure(self.clock, "typing.get_new_events_as"):
+ from_key = int(from_key)
+ handler = self.get_typing_handler()
+
+ events = []
+ for room_id in handler._room_serials.keys():
+ if handler._room_serials[room_id] <= from_key:
+ continue
+ if not await service.matches_user_in_member_list(
+ room_id, handler.store
+ ):
+ continue
+
+ events.append(self._make_event_for(room_id))
+
+ return (events, handler._latest_room_serial)
+
async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
|