diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/admin.py | 44 | ||||
-rw-r--r-- | synapse/handlers/device.py | 208 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 2 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 4 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 6 |
6 files changed, 203 insertions, 63 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 084e33ca6a..f36b358b45 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -19,7 +19,6 @@ from ._base import BaseHandler import logging - logger = logging.getLogger(__name__) @@ -54,3 +53,46 @@ class AdminHandler(BaseHandler): } defer.returnValue(ret) + + @defer.inlineCallbacks + def get_users(self): + """Function to reterive a list of users in users table. + + Args: + Returns: + defer.Deferred: resolves to list[dict[str, Any]] + """ + ret = yield self.store.get_users() + + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_users_paginate(self, order, start, limit): + """Function to reterive a paginated list of users from + users list. This will return a json object, which contains + list of users and the total number of users in users table. + + Args: + order (str): column name to order the select by this column + start (int): start number to begin the query from + limit (int): number of rows to reterive + Returns: + defer.Deferred: resolves to json object {list[dict[str, Any]], count} + """ + ret = yield self.store.get_users_paginate(order, start, limit) + + defer.returnValue(ret) + + @defer.inlineCallbacks + def search_users(self, term): + """Function to search users list for one or more users with + the matched term. + + Args: + term (str): search term + Returns: + defer.Deferred: resolves to list[dict[str, Any]] + """ + ret = yield self.store.search_users(term) + + defer.returnValue(ret) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 8cb47ac417..e859b3165f 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -12,11 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from synapse.api import errors from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer @@ -35,10 +35,11 @@ class DeviceHandler(BaseHandler): self.state = hs.get_state_handler() self.federation_sender = hs.get_federation_sender() self.federation = hs.get_replication_layer() - self._remote_edue_linearizer = Linearizer(name="remote_device_list") + + self._edu_updater = DeviceListEduUpdater(hs, self) self.federation.register_edu_handler( - "m.device_list_update", self._incoming_device_list_update, + "m.device_list_update", self._edu_updater.incoming_device_list_update, ) self.federation.register_query_handler( "user_devices", self.on_federation_query_user_devices, @@ -246,30 +247,51 @@ class DeviceHandler(BaseHandler): # Then work out if any users have since joined rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + stream_ordering = RoomStreamToken.parse_stream_token( + from_token.room_key).stream + possibly_changed = set(changed) for room_id in rooms_changed: - # Fetch the current state at the time. - stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key) - + # Fetch the current state at the time. try: event_ids = yield self.store.get_forward_extremeties_for_room( room_id, stream_ordering=stream_ordering ) - prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) - except: - prev_state_ids = {} + except errors.StoreError: + # we have purged the stream_ordering index since the stream + # ordering: treat it the same as a new room + event_ids = [] current_state_ids = yield self.state.get_current_state_ids(room_id) + # special-case for an empty prev state: include all members + # in the changed list + if not event_ids: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + continue + + # mapping from event_id -> state_dict + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. for key, event_id in current_state_ids.iteritems(): etype, state_key = key - if etype == EventTypes.Member: - prev_event_id = prev_state_ids.get(key, None) + if etype != EventTypes.Member: + continue + + # check if this member has changed since any of the extremities + # at the stream_ordering, and add them to the list if so. + for state_dict in prev_state_ids.values(): + prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: possibly_changed.add(state_key) + break users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id @@ -279,13 +301,69 @@ class DeviceHandler(BaseHandler): # and those that actually still share a room with the user defer.returnValue(users_who_share_room & possibly_changed) - @measure_func("_incoming_device_list_update") @defer.inlineCallbacks - def _incoming_device_list_update(self, origin, edu_content): - user_id = edu_content["user_id"] - device_id = edu_content["device_id"] - stream_id = edu_content["stream_id"] - prev_ids = edu_content.get("prev_id", []) + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + defer.returnValue({ + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + }) + + @defer.inlineCallbacks + def user_left_room(self, user, room_id): + user_id = user.to_string() + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We no longer share rooms with this user, so we'll no longer + # receive device updates. Mark this in DB. + yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) + + +def _update_device_from_client_ips(device, client_ips): + ip = client_ips.get((device["user_id"], device["device_id"]), {}) + device.update({ + "last_seen_ts": ip.get("last_seen"), + "last_seen_ip": ip.get("ip"), + }) + + +class DeviceListEduUpdater(object): + "Handles incoming device list updates from federation and updates the DB" + + def __init__(self, hs, device_handler): + self.store = hs.get_datastore() + self.federation = hs.get_replication_layer() + self.clock = hs.get_clock() + self.device_handler = device_handler + + self._remote_edu_linearizer = Linearizer(name="remote_device_list") + + # user_id -> list of updates waiting to be handled. + self._pending_updates = {} + + # Recently seen stream ids. We don't bother keeping these in the DB, + # but they're useful to have them about to reduce the number of spurious + # resyncs. + self._seen_updates = ExpiringCache( + cache_name="device_update_edu", + clock=self.clock, + max_len=10000, + expiry_ms=30 * 60 * 1000, + iterable=True, + ) + + @defer.inlineCallbacks + def incoming_device_list_update(self, origin, edu_content): + """Called on incoming device list update from federation. Responsible + for parsing the EDU and adding to pending updates list. + """ + + user_id = edu_content.pop("user_id") + device_id = edu_content.pop("device_id") + stream_id = str(edu_content.pop("stream_id")) # They may come as ints + prev_ids = edu_content.pop("prev_id", []) + prev_ids = [str(p) for p in prev_ids] # They may come as ints if get_domain_from_id(user_id) != origin: # TODO: Raise? @@ -298,20 +376,28 @@ class DeviceHandler(BaseHandler): # probably won't get any further updates. return - with (yield self._remote_edue_linearizer.queue(user_id)): - # If the prev id matches whats in our cache table, then we don't need - # to resync the users device list, otherwise we do. - resync = True - if len(prev_ids) == 1: - extremity = yield self.store.get_device_list_last_stream_id_for_remote( - user_id - ) - logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) - if str(extremity) == str(prev_ids[0]): - resync = False + self._pending_updates.setdefault(user_id, []).append( + (device_id, stream_id, prev_ids, edu_content) + ) + + yield self._handle_device_updates(user_id) + + @measure_func("_incoming_device_list_update") + @defer.inlineCallbacks + def _handle_device_updates(self, user_id): + "Actually handle pending updates." + + with (yield self._remote_edu_linearizer.queue(user_id)): + pending_updates = self._pending_updates.pop(user_id, []) + if not pending_updates: + # This can happen since we batch updates + return + + resync = yield self._need_to_do_resync(user_id, pending_updates) if resync: # Fetch all devices for the user. + origin = get_domain_from_id(user_id) result = yield self.federation.query_user_devices(origin, user_id) stream_id = result["stream_id"] devices = result["devices"] @@ -319,40 +405,50 @@ class DeviceHandler(BaseHandler): user_id, devices, stream_id, ) device_ids = [device["device_id"] for device in devices] - yield self.notify_device_update(user_id, device_ids) + yield self.device_handler.notify_device_update(user_id, device_ids) else: # Simply update the single device, since we know that is the only # change (becuase of the single prev_id matching the current cache) - content = dict(edu_content) - for key in ("user_id", "device_id", "stream_id", "prev_ids"): - content.pop(key, None) - yield self.store.update_remote_device_list_cache_entry( - user_id, device_id, content, stream_id, + for device_id, stream_id, prev_ids, content in pending_updates: + yield self.store.update_remote_device_list_cache_entry( + user_id, device_id, content, stream_id, + ) + + yield self.device_handler.notify_device_update( + user_id, [device_id for device_id, _, _, _ in pending_updates] ) - yield self.notify_device_update(user_id, [device_id]) - @defer.inlineCallbacks - def on_federation_query_user_devices(self, user_id): - stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - defer.returnValue({ - "user_id": user_id, - "stream_id": stream_id, - "devices": devices, - }) + self._seen_updates.setdefault(user_id, set()).update( + stream_id for _, stream_id, _, _ in pending_updates + ) @defer.inlineCallbacks - def user_left_room(self, user, room_id): - user_id = user.to_string() - rooms = yield self.store.get_rooms_for_user(user_id) - if not rooms: - # We no longer share rooms with this user, so we'll no longer - # receive device updates. Mark this in DB. - yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) + def _need_to_do_resync(self, user_id, updates): + """Given a list of updates for a user figure out if we need to do a full + resync, or whether we have enough data that we can just apply the delta. + """ + seen_updates = self._seen_updates.get(user_id, set()) + extremity = yield self.store.get_device_list_last_stream_id_for_remote( + user_id + ) -def _update_device_from_client_ips(device, client_ips): - ip = client_ips.get((device["user_id"], device["device_id"]), {}) - device.update({ - "last_seen_ts": ip.get("last_seen"), - "last_seen_ip": ip.get("ip"), - }) + stream_id_in_updates = set() # stream_ids in updates list + for _, stream_id, prev_ids, _ in updates: + if not prev_ids: + # We always do a resync if there are no previous IDs + defer.returnValue(True) + + for prev_id in prev_ids: + if prev_id == extremity: + continue + elif prev_id in seen_updates: + continue + elif prev_id in stream_id_in_updates: + continue + else: + defer.returnValue(True) + + stream_id_in_updates.add(stream_id) + + defer.returnValue(False) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 996bfd0e23..ed0fa51e7f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1096,7 +1096,7 @@ class FederationHandler(BaseHandler): if prev_id != event.event_id: results[(event.type, event.state_key)] = prev_id else: - del results[(event.type, event.state_key)] + results.pop((event.type, event.state_key), None) defer.returnValue(results.values()) else: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fdfce2a88c..da610e430f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -531,7 +531,7 @@ class PresenceHandler(object): # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) - states.update({state.user_id: state for state in res}) + states.update(res) missing = [user_id for user_id, state in states.items() if not state] if missing: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b2806555cf..2052d6d05f 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -719,7 +719,9 @@ class RoomMemberHandler(BaseHandler): ) membership = member.membership if member else None - if membership is not None and membership != Membership.LEAVE: + if membership is not None and membership not in [ + Membership.LEAVE, Membership.BAN + ]: raise SynapseError(400, "User %s in room %s" % ( user_id, room_id )) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d7dcd1ce5b..5572cb883f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -609,14 +609,14 @@ class SyncHandler(object): deleted = yield self.store.delete_messages_for_device( user_id, device_id, since_stream_id ) - logger.info("Deleted %d to-device messages up to %d", - deleted, since_stream_id) + logger.debug("Deleted %d to-device messages up to %d", + deleted, since_stream_id) messages, stream_id = yield self.store.get_new_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key ) - logger.info( + logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", len(messages), since_stream_id, stream_id, now_token.to_device_key ) |