summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2016-01-04 14:02:50 +0000
committerMark Haines <mjark@negativecurvature.net>2016-01-04 14:02:50 +0000
commitf35f8d06ea58e2d0cdccd82924c7a44fd93f4c38 (patch)
treedc5312558565f8ac01264be21d388e563a5c8c58 /synapse/handlers/message.py
parentAdded info abou Martin Giess' auto-deployment process with vagrant/ansible (diff)
parentBump changelog and version for v0.12.0 (diff)
downloadsynapse-f35f8d06ea58e2d0cdccd82924c7a44fd93f4c38.tar.xz
Merge remote-tracking branch 'origin/release-v0.12.0' v0.12.0
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py112
1 files changed, 91 insertions, 21 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2e7d0d7f82..a1bed9b0dc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,15 +22,22 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.types import UserID, RoomStreamToken, StreamToken
 
 from ._base import BaseHandler
 
+from canonicaljson import encode_canonical_json
+
 import logging
 
 logger = logging.getLogger(__name__)
 
 
+def collect_presencelike_data(distributor, user, content):
+    return distributor.fire("collect_presencelike_data", user, content)
+
+
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -39,6 +46,7 @@ class MessageHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()
+        self.snapshot_cache = SnapshotCache()
 
     @defer.inlineCallbacks
     def get_message(self, msg_id=None, room_id=None, sender_id=None,
@@ -195,10 +203,8 @@ class MessageHandler(BaseHandler):
             if membership == Membership.JOIN:
                 joinee = UserID.from_string(builder.state_key)
                 # If event doesn't include a display name, add one.
-                yield self.distributor.fire(
-                    "collect_presencelike_data",
-                    joinee,
-                    builder.content
+                yield collect_presencelike_data(
+                    self.distributor, joinee, builder.content
                 )
 
         if token_id is not None:
@@ -211,6 +217,16 @@ class MessageHandler(BaseHandler):
             builder=builder,
         )
 
+        if event.is_state():
+            prev_state = context.current_state.get((event.type, event.state_key))
+            if prev_state and event.user_id == prev_state.user_id:
+                prev_content = encode_canonical_json(prev_state.content)
+                next_content = encode_canonical_json(event.content)
+                if prev_content == next_content:
+                    # Duplicate suppression for state updates with same sender
+                    # and content.
+                    defer.returnValue(prev_state)
+
         if event.type == EventTypes.Member:
             member_handler = self.hs.get_handlers().room_member_handler
             yield member_handler.change_membership(event, context, is_guest=is_guest)
@@ -312,7 +328,6 @@ class MessageHandler(BaseHandler):
             [serialize_event(c, now) for c in room_state.values()]
         )
 
-    @defer.inlineCallbacks
     def snapshot_all_rooms(self, user_id=None, pagin_config=None,
                            as_client_event=True, include_archived=False):
         """Retrieve a snapshot of all rooms the user is invited or has joined.
@@ -332,6 +347,28 @@ class MessageHandler(BaseHandler):
             is joined on, may return a "messages" key with messages, depending
             on the specified PaginationConfig.
         """
+        key = (
+            user_id,
+            pagin_config.from_token,
+            pagin_config.to_token,
+            pagin_config.direction,
+            pagin_config.limit,
+            as_client_event,
+            include_archived,
+        )
+        now_ms = self.clock.time_msec()
+        result = self.snapshot_cache.get(now_ms, key)
+        if result is not None:
+            return result
+
+        return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms(
+            user_id, pagin_config, as_client_event, include_archived
+        ))
+
+    @defer.inlineCallbacks
+    def _snapshot_all_rooms(self, user_id=None, pagin_config=None,
+                            as_client_event=True, include_archived=False):
+
         memberships = [Membership.INVITE, Membership.JOIN]
         if include_archived:
             memberships.append(Membership.LEAVE)
@@ -359,6 +396,10 @@ class MessageHandler(BaseHandler):
 
         tags_by_room = yield self.store.get_tags_for_user(user_id)
 
+        account_data, account_data_by_room = (
+            yield self.store.get_account_data_for_user(user_id)
+        )
+
         public_room_ids = yield self.store.get_public_room_ids()
 
         limit = pagin_config.limit
@@ -436,14 +477,22 @@ class MessageHandler(BaseHandler):
                     for c in current_state.values()
                 ]
 
-                account_data = []
+                account_data_events = []
                 tags = tags_by_room.get(event.room_id)
                 if tags:
-                    account_data.append({
+                    account_data_events.append({
                         "type": "m.tag",
                         "content": {"tags": tags},
                     })
-                d["account_data"] = account_data
+
+                account_data = account_data_by_room.get(event.room_id, {})
+                for account_data_type, content in account_data.items():
+                    account_data_events.append({
+                        "type": account_data_type,
+                        "content": content,
+                    })
+
+                d["account_data"] = account_data_events
             except:
                 logger.exception("Failed to get snapshot")
 
@@ -456,9 +505,17 @@ class MessageHandler(BaseHandler):
                 consumeErrors=True
             ).addErrback(unwrapFirstError)
 
+        account_data_events = []
+        for account_data_type, content in account_data.items():
+            account_data_events.append({
+                "type": account_data_type,
+                "content": content,
+            })
+
         ret = {
             "rooms": rooms_ret,
             "presence": presence,
+            "account_data": account_data_events,
             "receipts": receipt,
             "end": now_token.to_string(),
         }
@@ -498,14 +555,22 @@ class MessageHandler(BaseHandler):
                 user_id, room_id, pagin_config, membership, member_event_id, is_guest
             )
 
-        account_data = []
+        account_data_events = []
         tags = yield self.store.get_tags_for_room(user_id, room_id)
         if tags:
-            account_data.append({
+            account_data_events.append({
                 "type": "m.tag",
                 "content": {"tags": tags},
             })
-        result["account_data"] = account_data
+
+        account_data = yield self.store.get_account_data_for_room(user_id, room_id)
+        for account_data_type, content in account_data.items():
+            account_data_events.append({
+                "type": account_data_type,
+                "content": content,
+            })
+
+        result["account_data"] = account_data_events
 
         defer.returnValue(result)
 
@@ -588,23 +653,28 @@ class MessageHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_presence():
-            states = {}
-            if not is_guest:
-                states = yield presence_handler.get_states(
-                    target_users=[UserID.from_string(m.user_id) for m in room_members],
-                    auth_user=auth_user,
-                    as_event=True,
-                    check_auth=False,
-                )
+            states = yield presence_handler.get_states(
+                target_users=[UserID.from_string(m.user_id) for m in room_members],
+                auth_user=auth_user,
+                as_event=True,
+                check_auth=False,
+            )
 
             defer.returnValue(states.values())
 
-        receipts_handler = self.hs.get_handlers().receipts_handler
+        @defer.inlineCallbacks
+        def get_receipts():
+            receipts_handler = self.hs.get_handlers().receipts_handler
+            receipts = yield receipts_handler.get_receipts_for_room(
+                room_id,
+                now_token.receipt_key
+            )
+            defer.returnValue(receipts)
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
             [
                 get_presence(),
-                receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
+                get_receipts(),
                 self.store.get_recent_events_for_room(
                     room_id,
                     limit=limit,