diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3cbfc2d780..e919a8f9ed 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -12,16 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import random
from collections import namedtuple
from typing import TYPE_CHECKING, List, Set, Tuple
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
+from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
-from synapse.types import UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -167,20 +167,25 @@ class FollowerTypingHandler:
now_typing = set(row.user_ids)
self._room_typing[row.room_id] = row.user_ids
- run_as_background_process(
- "_handle_change_in_typing",
- self._handle_change_in_typing,
- row.room_id,
- prev_typing,
- now_typing,
- )
+ if self.federation:
+ run_as_background_process(
+ "_send_changes_in_typing_to_remotes",
+ self._send_changes_in_typing_to_remotes,
+ row.room_id,
+ prev_typing,
+ now_typing,
+ )
- async def _handle_change_in_typing(
+ async def _send_changes_in_typing_to_remotes(
self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
):
"""Process a change in typing of a room from replication, sending EDUs
for any local users.
"""
+
+ if not self.federation:
+ return
+
for user_id in now_typing - prev_typing:
if self.is_mine_id(user_id):
await self._push_remote(RoomMember(room_id, user_id), True)
@@ -371,7 +376,7 @@ class TypingWriterHandler(FollowerTypingHandler):
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
- function to get further updatees.
+ function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""
@@ -430,6 +435,33 @@ class TypingNotificationEventSource:
"content": {"user_ids": list(typing)},
}
+ async def get_new_events_as(
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new typing events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
+ with Measure(self.clock, "typing.get_new_events_as"):
+ from_key = int(from_key)
+ handler = self.get_typing_handler()
+
+ events = []
+ for room_id in handler._room_serials.keys():
+ if handler._room_serials[room_id] <= from_key:
+ continue
+ if not await service.matches_user_in_member_list(
+ room_id, handler.store
+ ):
+ continue
+
+ events.append(self._make_event_for(room_id))
+
+ return (events, handler._latest_room_serial)
+
async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
|