diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 16 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 2 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 202 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 2 |
5 files changed, 219 insertions, 5 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 685792dbdc..dc5b6ef79d 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -32,6 +32,7 @@ from .appservice import ApplicationServicesHandler from .sync import SyncHandler from .auth import AuthHandler from .identity import IdentityHandler +from .receipts import ReceiptsHandler class Handlers(object): @@ -57,6 +58,7 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + self.receipts_handler = ReceiptsHandler(hs) asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( hs, asapi, AppServiceScheduler( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d8b117612d..9d6d4f0978 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -334,6 +334,11 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("presence"), None ) + receipt_stream = self.hs.get_event_sources().sources["receipt"] + receipt, _ = yield receipt_stream.get_pagination_rows( + user, pagination_config.get_source_config("receipt"), None + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -404,7 +409,8 @@ class MessageHandler(BaseHandler): ret = { "rooms": rooms_ret, "presence": presence, - "end": now_token.to_string() + "receipts": receipt, + "end": now_token.to_string(), } defer.returnValue(ret) @@ -465,9 +471,12 @@ class MessageHandler(BaseHandler): defer.returnValue([p for success, p in presence_defs if success]) - presence, (messages, token) = yield defer.gatherResults( + receipts_handler = self.hs.get_handlers().receipts_handler + + presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), + receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), self.store.get_recent_events_for_room( room_id, limit=limit, @@ -495,5 +504,6 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), }, "state": state, - "presence": presence + "presence": presence, + "receipts": receipts, }) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7c03198313..341a516da2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -992,7 +992,7 @@ class PresenceHandler(BaseHandler): room_ids([str]): List of room_ids to notify. """ with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "presence_key", self._user_cachemap_latest_serial, users_to_push, diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py new file mode 100644 index 0000000000..1925a48039 --- /dev/null +++ b/synapse/handlers/receipts.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.logcontext import PreserveLoggingContext + +import logging + + +logger = logging.getLogger(__name__) + + +class ReceiptsHandler(BaseHandler): + def __init__(self, hs): + super(ReceiptsHandler, self).__init__(hs) + + self.hs = hs + self.federation = hs.get_replication_layer() + self.federation.register_edu_handler( + "m.receipt", self._received_remote_receipt + ) + self.clock = self.hs.get_clock() + + self._receipt_cache = None + + @defer.inlineCallbacks + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + """Called when a client tells us a local user has read up to the given + event_id in the room. + """ + receipt = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + "data": { + "ts": int(self.clock.time_msec()), + } + } + + is_new = yield self._handle_new_receipts([receipt]) + + if is_new: + self._push_remotes([receipt]) + + @defer.inlineCallbacks + def _received_remote_receipt(self, origin, content): + """Called when we receive an EDU of type m.receipt from a remote HS. + """ + receipts = [ + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": user_values["event_ids"], + "data": user_values.get("data", {}), + } + for room_id, room_values in content.items() + for receipt_type, users in room_values.items() + for user_id, user_values in users.items() + ] + + yield self._handle_new_receipts(receipts) + + @defer.inlineCallbacks + def _handle_new_receipts(self, receipts): + """Takes a list of receipts, stores them and informs the notifier. + """ + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + res = yield self.store.insert_receipt( + room_id, receipt_type, user_id, event_ids, data + ) + + if not res: + # res will be None if this read receipt is 'old' + defer.returnValue(False) + + stream_id, max_persisted_id = res + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_persisted_id, rooms=[room_id] + ) + + defer.returnValue(True) + + @defer.inlineCallbacks + def _push_remotes(self, receipts): + """Given a list of receipts, works out which remote servers should be + poked and pokes them. + """ + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + remotedomains = set() + + rm_handler = self.hs.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=None, remotedomains=remotedomains + ) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } + } + }, + }, + ) + + @defer.inlineCallbacks + def get_receipts_for_room(self, room_id, to_key): + """Gets all receipts for a room, upto the given key. + """ + result = yield self.store.get_linearized_receipts_for_room( + room_id, None, to_key + ) + + if not result: + defer.returnValue([]) + + event = { + "type": "m.receipt", + "room_id": room_id, + "content": result, + } + + defer.returnValue([event]) + + +class ReceiptEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + to_key = yield self.get_current_key() + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, from_key, to_key + ) + + defer.returnValue((events, to_key)) + + def get_current_key(self, direction='f'): + return self.store.get_max_receipt_stream_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + to_key = int(config.from_key) + + if config.to_key: + from_key = int(config.to_key) + else: + from_key = None + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, from_key, to_key + ) + + defer.returnValue((events, to_key)) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a9895292c2..026bd2b9d4 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,7 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[room_id] ) |