diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 084e33ca6a..f36b358b45 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -19,7 +19,6 @@ from ._base import BaseHandler
import logging
-
logger = logging.getLogger(__name__)
@@ -54,3 +53,46 @@ class AdminHandler(BaseHandler):
}
defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_users(self):
+ """Function to reterive a list of users in users table.
+
+ Args:
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+ """
+ ret = yield self.store.get_users()
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_users_paginate(self, order, start, limit):
+ """Function to reterive a paginated list of users from
+ users list. This will return a json object, which contains
+ list of users and the total number of users in users table.
+
+ Args:
+ order (str): column name to order the select by this column
+ start (int): start number to begin the query from
+ limit (int): number of rows to reterive
+ Returns:
+ defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+ """
+ ret = yield self.store.get_users_paginate(order, start, limit)
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def search_users(self, term):
+ """Function to search users list for one or more users with
+ the matched term.
+
+ Args:
+ term (str): search term
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+ """
+ ret = yield self.store.search_users(term)
+
+ defer.returnValue(ret)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 8cb47ac417..e859b3165f 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,11 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.util import stringutils
from synapse.util.async import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id, RoomStreamToken
from twisted.internet import defer
@@ -35,10 +35,11 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler()
self.federation_sender = hs.get_federation_sender()
self.federation = hs.get_replication_layer()
- self._remote_edue_linearizer = Linearizer(name="remote_device_list")
+
+ self._edu_updater = DeviceListEduUpdater(hs, self)
self.federation.register_edu_handler(
- "m.device_list_update", self._incoming_device_list_update,
+ "m.device_list_update", self._edu_updater.incoming_device_list_update,
)
self.federation.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
@@ -246,30 +247,51 @@ class DeviceHandler(BaseHandler):
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+ stream_ordering = RoomStreamToken.parse_stream_token(
+ from_token.room_key).stream
+
possibly_changed = set(changed)
for room_id in rooms_changed:
- # Fetch the current state at the time.
- stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
-
+ # Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering
)
- prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
- except:
- prev_state_ids = {}
+ except errors.StoreError:
+ # we have purged the stream_ordering index since the stream
+ # ordering: treat it the same as a new room
+ event_ids = []
current_state_ids = yield self.state.get_current_state_ids(room_id)
+ # special-case for an empty prev state: include all members
+ # in the changed list
+ if not event_ids:
+ for key, event_id in current_state_ids.iteritems():
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ continue
+
+ # mapping from event_id -> state_dict
+ prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
- if etype == EventTypes.Member:
- prev_event_id = prev_state_ids.get(key, None)
+ if etype != EventTypes.Member:
+ continue
+
+ # check if this member has changed since any of the extremities
+ # at the stream_ordering, and add them to the list if so.
+ for state_dict in prev_state_ids.values():
+ prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key)
+ break
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
@@ -279,13 +301,69 @@ class DeviceHandler(BaseHandler):
# and those that actually still share a room with the user
defer.returnValue(users_who_share_room & possibly_changed)
- @measure_func("_incoming_device_list_update")
@defer.inlineCallbacks
- def _incoming_device_list_update(self, origin, edu_content):
- user_id = edu_content["user_id"]
- device_id = edu_content["device_id"]
- stream_id = edu_content["stream_id"]
- prev_ids = edu_content.get("prev_id", [])
+ def on_federation_query_user_devices(self, user_id):
+ stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
+ defer.returnValue({
+ "user_id": user_id,
+ "stream_id": stream_id,
+ "devices": devices,
+ })
+
+ @defer.inlineCallbacks
+ def user_left_room(self, user, room_id):
+ user_id = user.to_string()
+ rooms = yield self.store.get_rooms_for_user(user_id)
+ if not rooms:
+ # We no longer share rooms with this user, so we'll no longer
+ # receive device updates. Mark this in DB.
+ yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+
+def _update_device_from_client_ips(device, client_ips):
+ ip = client_ips.get((device["user_id"], device["device_id"]), {})
+ device.update({
+ "last_seen_ts": ip.get("last_seen"),
+ "last_seen_ip": ip.get("ip"),
+ })
+
+
+class DeviceListEduUpdater(object):
+ "Handles incoming device list updates from federation and updates the DB"
+
+ def __init__(self, hs, device_handler):
+ self.store = hs.get_datastore()
+ self.federation = hs.get_replication_layer()
+ self.clock = hs.get_clock()
+ self.device_handler = device_handler
+
+ self._remote_edu_linearizer = Linearizer(name="remote_device_list")
+
+ # user_id -> list of updates waiting to be handled.
+ self._pending_updates = {}
+
+ # Recently seen stream ids. We don't bother keeping these in the DB,
+ # but they're useful to have them about to reduce the number of spurious
+ # resyncs.
+ self._seen_updates = ExpiringCache(
+ cache_name="device_update_edu",
+ clock=self.clock,
+ max_len=10000,
+ expiry_ms=30 * 60 * 1000,
+ iterable=True,
+ )
+
+ @defer.inlineCallbacks
+ def incoming_device_list_update(self, origin, edu_content):
+ """Called on incoming device list update from federation. Responsible
+ for parsing the EDU and adding to pending updates list.
+ """
+
+ user_id = edu_content.pop("user_id")
+ device_id = edu_content.pop("device_id")
+ stream_id = str(edu_content.pop("stream_id")) # They may come as ints
+ prev_ids = edu_content.pop("prev_id", [])
+ prev_ids = [str(p) for p in prev_ids] # They may come as ints
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
@@ -298,20 +376,28 @@ class DeviceHandler(BaseHandler):
# probably won't get any further updates.
return
- with (yield self._remote_edue_linearizer.queue(user_id)):
- # If the prev id matches whats in our cache table, then we don't need
- # to resync the users device list, otherwise we do.
- resync = True
- if len(prev_ids) == 1:
- extremity = yield self.store.get_device_list_last_stream_id_for_remote(
- user_id
- )
- logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
- if str(extremity) == str(prev_ids[0]):
- resync = False
+ self._pending_updates.setdefault(user_id, []).append(
+ (device_id, stream_id, prev_ids, edu_content)
+ )
+
+ yield self._handle_device_updates(user_id)
+
+ @measure_func("_incoming_device_list_update")
+ @defer.inlineCallbacks
+ def _handle_device_updates(self, user_id):
+ "Actually handle pending updates."
+
+ with (yield self._remote_edu_linearizer.queue(user_id)):
+ pending_updates = self._pending_updates.pop(user_id, [])
+ if not pending_updates:
+ # This can happen since we batch updates
+ return
+
+ resync = yield self._need_to_do_resync(user_id, pending_updates)
if resync:
# Fetch all devices for the user.
+ origin = get_domain_from_id(user_id)
result = yield self.federation.query_user_devices(origin, user_id)
stream_id = result["stream_id"]
devices = result["devices"]
@@ -319,40 +405,50 @@ class DeviceHandler(BaseHandler):
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
- yield self.notify_device_update(user_id, device_ids)
+ yield self.device_handler.notify_device_update(user_id, device_ids)
else:
# Simply update the single device, since we know that is the only
# change (becuase of the single prev_id matching the current cache)
- content = dict(edu_content)
- for key in ("user_id", "device_id", "stream_id", "prev_ids"):
- content.pop(key, None)
- yield self.store.update_remote_device_list_cache_entry(
- user_id, device_id, content, stream_id,
+ for device_id, stream_id, prev_ids, content in pending_updates:
+ yield self.store.update_remote_device_list_cache_entry(
+ user_id, device_id, content, stream_id,
+ )
+
+ yield self.device_handler.notify_device_update(
+ user_id, [device_id for device_id, _, _, _ in pending_updates]
)
- yield self.notify_device_update(user_id, [device_id])
- @defer.inlineCallbacks
- def on_federation_query_user_devices(self, user_id):
- stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
- defer.returnValue({
- "user_id": user_id,
- "stream_id": stream_id,
- "devices": devices,
- })
+ self._seen_updates.setdefault(user_id, set()).update(
+ stream_id for _, stream_id, _, _ in pending_updates
+ )
@defer.inlineCallbacks
- def user_left_room(self, user, room_id):
- user_id = user.to_string()
- rooms = yield self.store.get_rooms_for_user(user_id)
- if not rooms:
- # We no longer share rooms with this user, so we'll no longer
- # receive device updates. Mark this in DB.
- yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
+ def _need_to_do_resync(self, user_id, updates):
+ """Given a list of updates for a user figure out if we need to do a full
+ resync, or whether we have enough data that we can just apply the delta.
+ """
+ seen_updates = self._seen_updates.get(user_id, set())
+ extremity = yield self.store.get_device_list_last_stream_id_for_remote(
+ user_id
+ )
-def _update_device_from_client_ips(device, client_ips):
- ip = client_ips.get((device["user_id"], device["device_id"]), {})
- device.update({
- "last_seen_ts": ip.get("last_seen"),
- "last_seen_ip": ip.get("ip"),
- })
+ stream_id_in_updates = set() # stream_ids in updates list
+ for _, stream_id, prev_ids, _ in updates:
+ if not prev_ids:
+ # We always do a resync if there are no previous IDs
+ defer.returnValue(True)
+
+ for prev_id in prev_ids:
+ if prev_id == extremity:
+ continue
+ elif prev_id in seen_updates:
+ continue
+ elif prev_id in stream_id_in_updates:
+ continue
+ else:
+ defer.returnValue(True)
+
+ stream_id_in_updates.add(stream_id)
+
+ defer.returnValue(False)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 996bfd0e23..ed0fa51e7f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1096,7 +1096,7 @@ class FederationHandler(BaseHandler):
if prev_id != event.event_id:
results[(event.type, event.state_key)] = prev_id
else:
- del results[(event.type, event.state_key)]
+ results.pop((event.type, event.state_key), None)
defer.returnValue(results.values())
else:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fdfce2a88c..da610e430f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -531,7 +531,7 @@ class PresenceHandler(object):
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
- states.update({state.user_id: state for state in res})
+ states.update(res)
missing = [user_id for user_id, state in states.items() if not state]
if missing:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b2806555cf..2052d6d05f 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -719,7 +719,9 @@ class RoomMemberHandler(BaseHandler):
)
membership = member.membership if member else None
- if membership is not None and membership != Membership.LEAVE:
+ if membership is not None and membership not in [
+ Membership.LEAVE, Membership.BAN
+ ]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d7dcd1ce5b..5572cb883f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -609,14 +609,14 @@ class SyncHandler(object):
deleted = yield self.store.delete_messages_for_device(
user_id, device_id, since_stream_id
)
- logger.info("Deleted %d to-device messages up to %d",
- deleted, since_stream_id)
+ logger.debug("Deleted %d to-device messages up to %d",
+ deleted, since_stream_id)
messages, stream_id = yield self.store.get_new_messages_for_device(
user_id, device_id, since_stream_id, now_token.to_device_key
)
- logger.info(
+ logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
len(messages), since_stream_id, stream_id, now_token.to_device_key
)
|