diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 52d97dfbf3..39d2bee8da 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -43,7 +43,6 @@ from synapse.events.utils import prune_event
from synapse.util.retryutils import NotRetryingDestination
-from synapse.push.action_generator import ActionGenerator
from synapse.util.distributor import user_joined_room
from twisted.internet import defer
@@ -75,6 +74,7 @@ class FederationHandler(BaseHandler):
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
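+ # A single shared ActionGenerator, rather than constructing a new one
+ # for every event we handle.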
+ self.action_generator = hs.get_action_generator()
self.replication_layer.set_handler(self)
@@ -832,7 +832,11 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_event_auth(self, event_id):
- auth = yield self.store.get_auth_chain([event_id])
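+ # Fetch the auth chain of the event's auth events, including those
+ # events themselves (include_given=True), rather than the chain of
+ # the event itself.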
+ event = yield self.store.get_event(event_id)
+ auth = yield self.store.get_auth_chain(
+ [auth_id for auth_id, _ in event.auth_events],
+ include_given=True
+ )
for event in auth:
event.signatures.update(
@@ -1047,9 +1051,7 @@ class FederationHandler(BaseHandler):
yield user_joined_room(self.distributor, user, event.room_id)
state_ids = context.prev_state_ids.values()
- auth_chain = yield self.store.get_auth_chain(set(
- [event.event_id] + state_ids
- ))
+ auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(context.prev_state_ids.values())
@@ -1100,6 +1102,9 @@ class FederationHandler(BaseHandler):
user_id,
"leave"
)
+ # Mark as outlier as we don't have any state for this event; we're not
+ # even in the room.
+ event.internal_metadata.outlier = True
event = self._sign_event(event)
# Try the host that we successfully called /make_leave/ on first for
@@ -1389,8 +1394,7 @@ class FederationHandler(BaseHandler):
)
if not event.internal_metadata.is_outlier():
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
+ yield self.action_generator.handle_push_actions_for_event(
event, context
)
@@ -1599,7 +1603,11 @@ class FederationHandler(BaseHandler):
pass
# Now get the current auth_chain for the event.
- local_auth_chain = yield self.store.get_auth_chain([event_id])
+ event = yield self.store.get_event(event_id)
+ local_auth_chain = yield self.store.get_auth_chain(
+ [auth_id for auth_id, _ in event.auth_events],
+ include_given=True
+ )
# TODO: Check if we would now reject event_id. If so we need to tell
# everyone.
@@ -1792,7 +1800,9 @@ class FederationHandler(BaseHandler):
auth_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids
)
- local_auth_chain = yield self.store.get_auth_chain(auth_ids)
+ local_auth_chain = yield self.store.get_auth_chain(
+ auth_ids, include_given=True
+ )
try:
# 2. Get remote difference.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 196925edad..a04f634c5c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.push.action_generator import ActionGenerator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
@@ -54,6 +53,8 @@ class MessageHandler(BaseHandler):
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
+ self.action_generator = hs.get_action_generator()
+
@defer.inlineCallbacks
def purge_history(self, room_id, event_id):
event = yield self.store.get_event(event_id)
@@ -590,8 +591,7 @@ class MessageHandler(BaseHandler):
"Changing the room create event is forbidden",
)
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
+ yield self.action_generator.handle_push_actions_for_event(
event, context
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c0205da1a9..91c6c6be3c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -117,6 +117,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
"device_lists", # List of user_ids whose devices have chanegd
+ "device_one_time_keys_count", # Dict of algorithm to count for one time keys
+ # for this device
])):
__slots__ = []
@@ -550,6 +552,14 @@ class SyncHandler(object):
sync_result_builder
)
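+ # Report how many one-time keys the device has left, so the client
+ # knows when to upload more.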
+ device_id = sync_config.device_id
+ one_time_key_counts = {}
+ if device_id:
+ user_id = sync_config.user.to_string()
+ one_time_key_counts = yield self.store.count_e2e_one_time_keys(
+ user_id, device_id
+ )
+
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
@@ -558,6 +568,7 @@ class SyncHandler(object):
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
+ device_one_time_keys_count=one_time_key_counts,
next_batch=sync_result_builder.now_token,
))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3b7818af5c..82dedbbc99 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -89,7 +89,7 @@ class TypingHandler(object):
until = self._member_typing_until.get(member, None)
if not until or until <= now:
logger.info("Timing out typing for: %s", member.user_id)
- preserve_fn(self._stopped_typing)(member)
+ self._stopped_typing(member)
continue
# Check if we need to resend a keep alive over federation for this
@@ -147,7 +147,7 @@ class TypingHandler(object):
# No point sending another notification
defer.returnValue(None)
- yield self._push_update(
+ self._push_update(
member=member,
typing=True,
)
@@ -171,7 +171,7 @@ class TypingHandler(object):
member = RoomMember(room_id=room_id, user_id=target_user_id)
- yield self._stopped_typing(member)
+ self._stopped_typing(member)
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
@@ -180,7 +180,6 @@ class TypingHandler(object):
member = RoomMember(room_id=room_id, user_id=user_id)
yield self._stopped_typing(member)
- @defer.inlineCallbacks
def _stopped_typing(self, member):
if member.user_id not in self._room_typing.get(member.room_id, set()):
# No point
@@ -189,16 +188,15 @@ class TypingHandler(object):
self._member_typing_until.pop(member, None)
self._member_last_federation_poke.pop(member, None)
- yield self._push_update(
+ self._push_update(
member=member,
typing=False,
)
- @defer.inlineCallbacks
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
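+ # preserve_fn fires the federation send off in the background,
+ # rather than blocking the local update on it.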
- yield self._push_remote(member, typing)
+ preserve_fn(self._push_remote)(member, typing)
self._push_update_local(
member=member,
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
new file mode 100644
index 0000000000..f4451e5dfb
--- /dev/null
+++ b/synapse/handlers/user_directory.py
@@ -0,0 +1,446 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.storage.roommember import ProfileInfo
+from synapse.util.metrics import Measure
+
+
+logger = logging.getLogger(__name__)
+
+
+class UserDirectoyHandler(object):
+ """Handles querying of and keeping updated the user_directory.
+
+ N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
+
+ The user directory is filled with users that this server can see are joined to a
+ world_readable or publicly joinable room. We keep a database table up to date
+ by streaming changes to the current state and recalculating whether users should
+ be in the directory or not when necessary.
+
+ For each user in the directory we also store the room_id of a public room that
+ the user is joined to. This allows us to ignore history_visibility and join_rules
+ changes for that user in all other public rooms, as we know they'll still be in
+ at least one public room.
+ """
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
+ self.server_name = hs.hostname
+ self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
+
+ self.notifier.add_replication_callback(self.notify_new_event)
+
+ # When we start up for the first time we need to populate the user_directory.
+ # This is the set of user_ids we've already inserted during that initial fill.
+ self.initially_handled_users = set()
+ self.initially_handled_users_in_public = set()
+
+ # The current position in the current_state_delta stream
+ self.pos = None
+
+ # Guard to ensure we only process deltas one at a time
+ self._is_processing = False
+
+ # We kick this off so that we don't have to wait for a change before
+ # we start populating the user directory
+ self.clock.call_later(0, self.notify_new_event)
+
+ def search_users(self, search_term, limit):
+ """Searches for users in directory
+
+ Returns:
+ dict of the form::
+
+ {
+ "limited": <bool>, # whether there were more results or not
+ "results": [ # Ordered by best match first
+ {
+ "user_id": <user_id>,
+ "display_name": <display_name>,
+ "avatar_url": <avatar_url>
+ }
+ ]
+ }
+ """
+ return self.store.search_user_dir(search_term, limit)
+
+ @defer.inlineCallbacks
+ def notify_new_event(self):
+ """Called when there may be more deltas to process
+ """
+ if self._is_processing:
+ return
+
+ self._is_processing = True
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ # If self.pos is None it means we haven't fetched it from the DB yet
+ if self.pos is None:
+ self.pos = yield self.store.get_user_directory_stream_pos()
+
+ # If it's still None then we need to do the initial fill of the directory
+ if self.pos is None:
+ yield self._do_initial_spam()
+ self.pos = yield self.store.get_user_directory_stream_pos()
+
+ # Loop round handling deltas until we're up to date
+ while True:
+ with Measure(self.clock, "user_dir_delta"):
+ deltas = yield self.store.get_current_state_deltas(self.pos)
+ if not deltas:
+ return
+
+ yield self._handle_deltas(deltas)
+
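+ # Only advance the stream position once the deltas have been
+ # handled, so that a restart re-processes rather than skips them.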
+ self.pos = deltas[-1]["stream_id"]
+ yield self.store.update_user_directory_stream_pos(self.pos)
+
+ @defer.inlineCallbacks
+ def _do_initial_spam(self):
+ """Populates the user_directory from the current state of the DB, used
+ when synapse first starts with user_directory support
+ """
+ new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
+
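+ # We grab the position first so that anything which changes while
+ # we repopulate is picked up afterwards via the delta stream.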
+ # Delete any existing entries just in case there are any
+ yield self.store.delete_all_from_user_dir()
+
+ # We process by going through each existing room one at a time.
+ room_ids = yield self.store.get_all_rooms()
+
+ logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
+ num_processed_rooms = 1
+
+ for room_id in room_ids:
+ logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
+ yield self._handle_initial_room(room_id)
+ num_processed_rooms += 1
+
+ logger.info("Processed all rooms.")
+
+ self.initially_handled_users = None
+
+ yield self.store.update_user_directory_stream_pos(new_pos)
+
+ @defer.inlineCallbacks
+ def _handle_initial_room(self, room_id):
+ """Called when we initially fill out the user_directory, one room at a time
+ """
+ is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
+ if not is_in_room:
+ return
+
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
+
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+ unhandled_users = set(users_with_profile) - self.initially_handled_users
+
+ yield self.store.add_profiles_to_user_dir(
+ room_id, {
+ user_id: users_with_profile[user_id] for user_id in unhandled_users
+ }
+ )
+
+ self.initially_handled_users |= unhandled_users
+
+ if is_public:
+ yield self.store.add_users_to_public_room(
+ room_id,
+ user_ids=unhandled_users - self.initially_handled_users_in_public
+ )
+ self.initially_handled_users_in_public |= unhandled_users
+
+ @defer.inlineCallbacks
+ def _handle_deltas(self, deltas):
+ """Called with the state deltas to process
+ """
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ prev_event_id = delta["prev_event_id"]
+
+ logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+ # For join rule and visibility changes we need to check if the room
+ # may have become public or not and add/remove the users in said room
+ if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
+ yield self._handle_room_publicity_change(
+ room_id, prev_event_id, event_id, typ,
+ )
+ elif typ == EventTypes.Member:
+ change = yield self._get_key_change(
+ prev_event_id, event_id,
+ key_name="membership",
+ public_value=Membership.JOIN,
+ )
+
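+ # change is None => membership value unchanged (possibly a profile
+ # change), True => the user joined, False => the user left.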
+ if change is None:
+ # Handle any profile changes
+ yield self._handle_profile_change(
+ state_key, room_id, prev_event_id, event_id,
+ )
+ continue
+
+ if not change:
+ # Need to check if the server left the room entirely, if so
+ # we might need to remove all the users in that room
+ is_in_room = yield self.store.is_host_joined(
+ room_id, self.server_name,
+ )
+ if not is_in_room:
+ logger.debug("Server left room: %r", room_id)
+ # Fetch all the users that we marked as being in the user
+ # directory due to being in the room and then check if we
+ # need to remove those users or not
+ user_ids = yield self.store.get_users_in_dir_due_to_room(room_id)
+ for user_id in user_ids:
+ yield self._handle_remove_user(room_id, user_id)
+ # Carry on with the remaining deltas rather than returning,
+ # otherwise they would be skipped entirely.
+ continue
+ else:
+ logger.debug("Server is still in room: %r", room_id)
+
+ if change: # The user joined
+ event = yield self.store.get_event(event_id, allow_none=True)
+ profile = ProfileInfo(
+ avatar_url=event.content.get("avatar_url"),
+ display_name=event.content.get("displayname"),
+ )
+
+ yield self._handle_new_user(room_id, state_key, profile)
+ else: # The user left
+ yield self._handle_remove_user(room_id, state_key)
+ else:
+ logger.debug("Ignoring irrelevant type: %r", typ)
+
+ @defer.inlineCallbacks
+ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ):
+ """Handle a room having potentially changed from/to world_readable/publically
+ joinable.
+
+ Args:
+ room_id (str)
+ prev_event_id (str|None): The previous event before the state change
+ event_id (str|None): The new event after the state change
+ typ (str): Type of the event
+ """
+ logger.debug("Handling change for %s", typ)
+
+ if typ == EventTypes.RoomHistoryVisibility:
+ change = yield self._get_key_change(
+ prev_event_id, event_id,
+ key_name="history_visibility",
+ public_value="world_readable",
+ )
+ elif typ == EventTypes.JoinRules:
+ change = yield self._get_key_change(
+ prev_event_id, event_id,
+ key_name="join_rule",
+ public_value=JoinRules.PUBLIC,
+ )
+ else:
+ raise Exception("Invalid event type")
+
+ # If change is None there has been no change. True => the room became
+ # world_readable/public, False => it stopped being world_readable/public.
+ if change is None:
+ logger.debug("No change")
+ return
+
+ # There's been a change to or from being world readable.
+
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+ room_id
+ )
+
+ logger.debug("Change: %r, is_public: %r", change, is_public)
+
+ if change and not is_public:
+ # If we became world readable but room isn't currently public then
+ # we ignore the change
+ return
+ elif not change and is_public:
+ # If we stopped being world readable but are still public,
+ # ignore the change
+ return
+
+ if change:
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+ for user_id, profile in users_with_profile.iteritems():
+ yield self._handle_new_user(room_id, user_id, profile)
+ else:
+ users = yield self.store.get_users_in_public_due_to_room(room_id)
+ for user_id in users:
+ yield self._handle_remove_user(room_id, user_id)
+
+ @defer.inlineCallbacks
+ def _handle_new_user(self, room_id, user_id, profile):
+ """Called when we might need to add user to directory
+
+ Args:
+ room_id (str): room_id that user joined or started being public that
+ user_id (str)
+ """
+ logger.debug("Adding user to dir, %r", user_id)
+
+ row = yield self.store.get_user_in_directory(user_id)
+ if not row:
+ yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+ room_id
+ )
+
+ if not is_public:
+ return
+
+ row = yield self.store.get_user_in_public_room(user_id)
+ if not row:
+ yield self.store.add_users_to_public_room(room_id, [user_id])
+
+ @defer.inlineCallbacks
+ def _handle_remove_user(self, room_id, user_id):
+ """Called when we might need to remove user to directory
+
+ Args:
+ room_id (str): room_id that user left or stopped being public that
+ user_id (str)
+ """
+ logger.debug("Maybe removing user %r", user_id)
+
+ row = yield self.store.get_user_in_directory(user_id)
+ update_user_dir = row and row["room_id"] == room_id
+
+ row = yield self.store.get_user_in_public_room(user_id)
+ update_user_in_public = row and row["room_id"] == room_id
+
+ if not update_user_in_public and not update_user_dir:
+ return
+
+ # XXX: Make this faster?
+ rooms = yield self.store.get_rooms_for_user(user_id)
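+ # The user may still be in other rooms we can see; if so, repoint
+ # their directory entries at one of those rooms rather than
+ # removing them.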
+ for j_room_id in rooms:
+ if not update_user_in_public and not update_user_dir:
+ break
+
+ is_in_room = yield self.store.is_host_joined(
+ j_room_id, self.server_name,
+ )
+
+ if not is_in_room:
+ continue
+
+ if update_user_dir:
+ update_user_dir = False
+ yield self.store.update_user_in_user_dir(user_id, j_room_id)
+
+ if update_user_in_public:
+ is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+ j_room_id
+ )
+
+ if is_public:
+ yield self.store.update_user_in_public_user_list(user_id, j_room_id)
+ update_user_in_public = False
+
+ if update_user_dir:
+ yield self.store.remove_from_user_dir(user_id)
+ elif update_user_in_public:
+ yield self.store.remove_from_user_in_public_room(user_id)
+
+ @defer.inlineCallbacks
+ def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
+ """Check member event changes for any profile changes and update the
+ database if there are.
+ """
+ if not prev_event_id or not event_id:
+ return
+
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+ event = yield self.store.get_event(event_id, allow_none=True)
+
+ if not prev_event or not event:
+ return
+
+ if event.membership != Membership.JOIN:
+ return
+
+ prev_name = prev_event.content.get("displayname")
+ new_name = event.content.get("displayname")
+
+ prev_avatar = prev_event.content.get("avatar_url")
+ new_avatar = event.content.get("avatar_url")
+
+ if prev_name != new_name or prev_avatar != new_avatar:
+ yield self.store.update_profile_in_user_dir(
+ user_id, new_name, new_avatar, room_id,
+ )
+
+ @defer.inlineCallbacks
+ def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+ """Given two events check if the `key_name` field in content changed
+ from not matching `public_value` to doing so.
+
+ For example, check if `history_visibility` (`key_name`) changed from
+ `shared` to `world_readable` (`public_value`).
+
+ Returns:
+ None if the field in the events either both match `public_value`
+ or neither does, i.e. there has been no change.
+ True if it didn't match `public_value` but now does
+ False if it did match `public_value` but now doesn't
+ """
+ prev_event = None
+ event = None
+ if prev_event_id:
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+ if event_id:
+ event = yield self.store.get_event(event_id, allow_none=True)
+
+ if not event and not prev_event:
+ logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+ defer.returnValue(None)
+
+ prev_value = None
+ value = None
+
+ if prev_event:
+ prev_value = prev_event.content.get(key_name)
+
+ if event:
+ value = event.content.get(key_name)
+
+ logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+ if value == public_value and prev_value != public_value:
+ defer.returnValue(True)
+ elif value != public_value and prev_value == public_value:
+ defer.returnValue(False)
+ else:
+ defer.returnValue(None)