summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/config/tls.py2
-rw-r--r--synapse/handlers/admin.py44
-rw-r--r--synapse/handlers/auth.py80
-rw-r--r--synapse/handlers/device.py46
-rw-r--r--synapse/handlers/presence.py46
-rw-r--r--synapse/handlers/register.py10
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/handlers/sync.py3
-rw-r--r--synapse/notifier.py1
-rw-r--r--synapse/push/mailer.py4
-rw-r--r--synapse/replication/slave/storage/account_data.py6
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/replication/slave/storage/presence.py4
-rw-r--r--synapse/rest/client/transactions.py14
-rw-r--r--synapse/rest/client/v1/admin.py220
-rw-r--r--synapse/rest/client/v1/login.py5
-rw-r--r--synapse/rest/client/v1/profile.py6
-rw-r--r--synapse/rest/client/v2_alpha/keys.py2
-rw-r--r--synapse/rest/client/v2_alpha/register.py3
-rw-r--r--synapse/server.py6
-rw-r--r--synapse/storage/__init__.py76
-rw-r--r--synapse/storage/_base.py159
-rw-r--r--synapse/storage/end_to_end_keys.py2
-rw-r--r--synapse/storage/events.py15
-rw-r--r--synapse/storage/presence.py14
-rw-r--r--synapse/storage/roommember.py21
-rw-r--r--synapse/util/caches/descriptors.py5
30 files changed, 683 insertions, 124 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 498ded38c0..da8ef90a77 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.18.7"
+__version__ = "0.19.1"
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index a8123cddcb..ca23c9c460 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -43,9 +43,6 @@ class JoinRules(object):
 
 class LoginType(object):
     PASSWORD = u"m.login.password"
-    OAUTH = u"m.login.oauth2"
-    EMAIL_CODE = u"m.login.email.code"
-    EMAIL_URL = u"m.login.email.url"
     EMAIL_IDENTITY = u"m.login.email.identity"
     RECAPTCHA = u"m.login.recaptcha"
     DUMMY = u"m.login.dummy"
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index b3fb408cfd..3f29595256 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -87,6 +87,10 @@ class SynchrotronSlavedStore(
         RoomMemberStore.__dict__["who_forgot_in_room"]
     )
 
+    did_forget = (
+        RoomMemberStore.__dict__["did_forget"]
+    )
+
     # XXX: This is a bit broken because we don't persist the accepted list in a
     # way that can be replicated. This means that we don't have a way to
     # invalidate the cache correctly.
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 3c58d2de17..e081840a83 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -95,7 +95,7 @@ class TlsConfig(Config):
         # make HTTPS requests to this server will check that the TLS
         # certificates returned by this server match one of the fingerprints.
         #
-        # Synapse automatically adds its the fingerprint of its own certificate
+        # Synapse automatically adds the fingerprint of its own certificate
         # to the list. So if federation traffic is handle directly by synapse
         # then no modification to the list is required.
         #
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 084e33ca6a..f36b358b45 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -19,7 +19,6 @@ from ._base import BaseHandler
 
 import logging
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -54,3 +53,46 @@ class AdminHandler(BaseHandler):
         }
 
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_users(self):
+        """Function to reterive a list of users in users table.
+
+        Args:
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        ret = yield self.store.get_users()
+
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_users_paginate(self, order, start, limit):
+        """Function to reterive a paginated list of users from
+        users list. This will return a json object, which contains
+        list of users and the total number of users in users table.
+
+        Args:
+            order (str): column name to order the select by this column
+            start (int): start number to begin the query from
+            limit (int): number of rows to reterive
+        Returns:
+            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+        """
+        ret = yield self.store.get_users_paginate(order, start, limit)
+
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def search_users(self, term):
+        """Function to search users list for one or more users with
+        the matched term.
+
+        Args:
+            term (str): search term
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        ret = yield self.store.search_users(term)
+
+        defer.returnValue(ret)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 221d7ea7a2..fffba34383 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -65,6 +65,7 @@ class AuthHandler(BaseHandler):
 
         self.hs = hs  # FIXME better possibility to access registrationHandler later?
         self.device_handler = hs.get_device_handler()
+        self.macaroon_gen = hs.get_macaroon_generator()
 
     @defer.inlineCallbacks
     def check_auth(self, flows, clientdict, clientip):
@@ -529,37 +530,11 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def issue_access_token(self, user_id, device_id=None):
-        access_token = self.generate_access_token(user_id)
+        access_token = self.macaroon_gen.generate_access_token(user_id)
         yield self.store.add_access_token_to_user(user_id, access_token,
                                                   device_id)
         defer.returnValue(access_token)
 
-    def generate_access_token(self, user_id, extra_caveats=None):
-        extra_caveats = extra_caveats or []
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = access")
-        # Include a nonce, to make sure that each login gets a different
-        # access token.
-        macaroon.add_first_party_caveat("nonce = %s" % (
-            stringutils.random_string_with_symbols(16),
-        ))
-        for caveat in extra_caveats:
-            macaroon.add_first_party_caveat(caveat)
-        return macaroon.serialize()
-
-    def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = login")
-        now = self.hs.get_clock().time_msec()
-        expiry = now + duration_in_ms
-        macaroon.add_first_party_caveat("time < %d" % (expiry,))
-        return macaroon.serialize()
-
-    def generate_delete_pusher_token(self, user_id):
-        macaroon = self._generate_base_macaroon(user_id)
-        macaroon.add_first_party_caveat("type = delete_pusher")
-        return macaroon.serialize()
-
     def validate_short_term_login_token_and_get_user_id(self, login_token):
         auth_api = self.hs.get_auth()
         try:
@@ -570,15 +545,6 @@ class AuthHandler(BaseHandler):
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
 
-    def _generate_base_macaroon(self, user_id):
-        macaroon = pymacaroons.Macaroon(
-            location=self.hs.config.server_name,
-            identifier="key",
-            key=self.hs.config.macaroon_secret_key)
-        macaroon.add_first_party_caveat("gen = 1")
-        macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
-        return macaroon
-
     @defer.inlineCallbacks
     def set_password(self, user_id, newpassword, requester=None):
         password_hash = self.hash(newpassword)
@@ -673,6 +639,48 @@ class AuthHandler(BaseHandler):
             return False
 
 
+class MacaroonGeneartor(object):
+    def __init__(self, hs):
+        self.clock = hs.get_clock()
+        self.server_name = hs.config.server_name
+        self.macaroon_secret_key = hs.config.macaroon_secret_key
+
+    def generate_access_token(self, user_id, extra_caveats=None):
+        extra_caveats = extra_caveats or []
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = access")
+        # Include a nonce, to make sure that each login gets a different
+        # access token.
+        macaroon.add_first_party_caveat("nonce = %s" % (
+            stringutils.random_string_with_symbols(16),
+        ))
+        for caveat in extra_caveats:
+            macaroon.add_first_party_caveat(caveat)
+        return macaroon.serialize()
+
+    def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = login")
+        now = self.clock.time_msec()
+        expiry = now + duration_in_ms
+        macaroon.add_first_party_caveat("time < %d" % (expiry,))
+        return macaroon.serialize()
+
+    def generate_delete_pusher_token(self, user_id):
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = delete_pusher")
+        return macaroon.serialize()
+
+    def _generate_base_macaroon(self, user_id):
+        macaroon = pymacaroons.Macaroon(
+            location=self.server_name,
+            identifier="key",
+            key=self.macaroon_secret_key)
+        macaroon.add_first_party_caveat("gen = 1")
+        macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
+        return macaroon
+
+
 class _AccountHandler(object):
     """A proxy object that gets passed to password auth providers so they
     can register new users etc if necessary.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 815410969c..8cb47ac417 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,7 +17,8 @@ from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
-from synapse.types import get_domain_from_id
+from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id, RoomStreamToken
 from twisted.internet import defer
 from ._base import BaseHandler
 
@@ -193,25 +194,28 @@ class DeviceHandler(BaseHandler):
             else:
                 raise
 
+    @measure_func("notify_device_update")
     @defer.inlineCallbacks
     def notify_device_update(self, user_id, device_ids):
         """Notify that a user's device(s) has changed. Pokes the notifier, and
         remote servers if the user is local.
         """
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = [r.room_id for r in rooms]
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
 
         hosts = set()
         if self.hs.is_mine_id(user_id):
-            for room_id in room_ids:
-                users = yield self.store.get_users_in_room(room_id)
-                hosts.update(get_domain_from_id(u) for u in users)
+            hosts.update(get_domain_from_id(u) for u in users_who_share_room)
             hosts.discard(self.server_name)
 
         position = yield self.store.add_device_change_to_streams(
             user_id, device_ids, list(hosts)
         )
 
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        room_ids = [r.room_id for r in rooms]
+
         yield self.notifier.on_new_event(
             "device_list_key", position, rooms=room_ids,
         )
@@ -221,6 +225,7 @@ class DeviceHandler(BaseHandler):
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
 
+    @measure_func("device.get_user_ids_changed")
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -243,15 +248,15 @@ class DeviceHandler(BaseHandler):
 
         possibly_changed = set(changed)
         for room_id in rooms_changed:
-            # Fetch (an approximation) of the current state at the time.
-            event_rows, token = yield self.store.get_recent_event_ids_for_room(
-                room_id, end_token=from_token.room_key, limit=1,
-            )
+            # Fetch  the current state at the time.
+            stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
 
-            if event_rows:
-                last_event_id = event_rows[-1]["event_id"]
-                prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id)
-            else:
+            try:
+                event_ids = yield self.store.get_forward_extremeties_for_room(
+                    room_id, stream_ordering=stream_ordering
+                )
+                prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+            except:
                 prev_state_ids = {}
 
             current_state_ids = yield self.state.get_current_state_ids(room_id)
@@ -266,14 +271,15 @@ class DeviceHandler(BaseHandler):
                     if not prev_event_id or prev_event_id != event_id:
                         possibly_changed.add(state_key)
 
-        user_ids_changed = set()
-        for other_user_id in possibly_changed:
-            other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-            if room_ids.intersection(e.room_id for e in other_rooms):
-                user_ids_changed.add(other_user_id)
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
 
-        defer.returnValue(user_ids_changed)
+        # Take the intersection of the users whose devices may have changed
+        # and those that actually still share a room with the user
+        defer.returnValue(users_who_share_room & possibly_changed)
 
+    @measure_func("_incoming_device_list_update")
     @defer.inlineCallbacks
     def _incoming_device_list_update(self, origin, edu_content):
         user_id = edu_content["user_id"]
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9982ae0fed..da610e430f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -531,7 +531,7 @@ class PresenceHandler(object):
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = yield self.store.get_presence_for_users(missing)
-            states.update({state.user_id: state for state in res})
+            states.update(res)
 
             missing = [user_id for user_id, state in states.items() if not state]
             if missing:
@@ -1011,7 +1011,7 @@ class PresenceEventSource(object):
     @defer.inlineCallbacks
     @log_function
     def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
-                       **kwargs):
+                       explicit_room_id=None, **kwargs):
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1028,22 +1028,24 @@ class PresenceEventSource(object):
             user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
-            room_ids = room_ids or []
 
             presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
 
-            if not room_ids:
-                rooms = yield self.store.get_rooms_for_user(user_id)
-                room_ids = set(e.room_id for e in rooms)
-            else:
-                room_ids = set(room_ids)
-
             max_token = self.store.get_current_presence_token()
 
             plist = yield self.store.get_presence_list_accepted(user.localpart)
-            friends = set(row["observed_user_id"] for row in plist)
-            friends.add(user_id)  # So that we receive our own presence
+            users_interested_in = set(row["observed_user_id"] for row in plist)
+            users_interested_in.add(user_id)  # So that we receive our own presence
+
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
+            users_interested_in.update(users_who_share_room)
+
+            if explicit_room_id:
+                user_ids = yield self.store.get_users_in_room(explicit_room_id)
+                users_interested_in.update(user_ids)
 
             user_ids_changed = set()
             changed = None
@@ -1055,35 +1057,19 @@ class PresenceEventSource(object):
                 # work out if we share a room or they're in our presence list
                 get_updates_counter.inc("stream")
                 for other_user_id in changed:
-                    if other_user_id in friends:
+                    if other_user_id in users_interested_in:
                         user_ids_changed.add(other_user_id)
-                        continue
-                    other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-                    if room_ids.intersection(e.room_id for e in other_rooms):
-                        user_ids_changed.add(other_user_id)
-                        continue
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
                 get_updates_counter.inc("full")
 
-                user_ids_to_check = set()
-                for room_id in room_ids:
-                    users = yield self.store.get_users_in_room(room_id)
-                    user_ids_to_check.update(users)
-
-                user_ids_to_check.update(friends)
-
-                # Always include yourself. Only really matters for when the user is
-                # not in any rooms, but still.
-                user_ids_to_check.add(user_id)
-
                 if from_key:
                     user_ids_changed = stream_change_cache.get_entities_changed(
-                        user_ids_to_check, from_key,
+                        users_interested_in, from_key,
                     )
                 else:
-                    user_ids_changed = user_ids_to_check
+                    user_ids_changed = users_interested_in
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 286f0cef0a..03c6a85fc6 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -40,6 +40,8 @@ class RegistrationHandler(BaseHandler):
 
         self._next_generated_user_id = None
 
+        self.macaroon_gen = hs.get_macaroon_generator()
+
     @defer.inlineCallbacks
     def check_username(self, localpart, guest_access_token=None,
                        assigned_user_id=None):
@@ -143,7 +145,7 @@ class RegistrationHandler(BaseHandler):
 
             token = None
             if generate_token:
-                token = self.auth_handler().generate_access_token(user_id)
+                token = self.macaroon_gen.generate_access_token(user_id)
             yield self.store.register(
                 user_id=user_id,
                 token=token,
@@ -167,7 +169,7 @@ class RegistrationHandler(BaseHandler):
                 user_id = user.to_string()
                 yield self.check_user_id_not_appservice_exclusive(user_id)
                 if generate_token:
-                    token = self.auth_handler().generate_access_token(user_id)
+                    token = self.macaroon_gen.generate_access_token(user_id)
                 try:
                     yield self.store.register(
                         user_id=user_id,
@@ -254,7 +256,7 @@ class RegistrationHandler(BaseHandler):
         user_id = user.to_string()
 
         yield self.check_user_id_not_appservice_exclusive(user_id)
-        token = self.auth_handler().generate_access_token(user_id)
+        token = self.macaroon_gen.generate_access_token(user_id)
         try:
             yield self.store.register(
                 user_id=user_id,
@@ -399,7 +401,7 @@ class RegistrationHandler(BaseHandler):
 
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
-        token = self.auth_handler().generate_access_token(user_id)
+        token = self.macaroon_gen.generate_access_token(user_id)
 
         if need_register:
             yield self.store.register(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5f18007e90..7e7671c9a2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -437,6 +437,7 @@ class RoomEventSource(object):
             limit,
             room_ids,
             is_guest,
+            explicit_room_id=None,
     ):
         # We just ignore the key for now.
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9a44de3f33..d7dcd1ce5b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -16,7 +16,7 @@
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
 from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
@@ -561,6 +561,7 @@ class SyncHandler(object):
             next_batch=sync_result_builder.now_token,
         ))
 
+    @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
     def _generate_sync_entry_for_device_list(self, sync_result_builder):
         user_id = sync_result_builder.sync_config.user.to_string()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index acbd4bb5ae..8051a7a842 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -378,6 +378,7 @@ class Notifier(object):
                     limit=limit,
                     is_guest=is_peeking,
                     room_ids=room_ids,
+                    explicit_room_id=explicit_room_id,
                 )
 
                 if name == "room":
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index ce2d31fb98..62d794f22b 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -81,7 +81,7 @@ class Mailer(object):
     def __init__(self, hs, app_name):
         self.hs = hs
         self.store = self.hs.get_datastore()
-        self.auth_handler = self.hs.get_auth_handler()
+        self.macaroon_gen = self.hs.get_macaroon_generator()
         self.state_handler = self.hs.get_state_handler()
         loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
         self.app_name = app_name
@@ -466,7 +466,7 @@ class Mailer(object):
 
     def make_unsubscribe_link(self, user_id, app_id, email_address):
         params = {
-            "access_token": self.auth_handler.generate_delete_pusher_token(user_id),
+            "access_token": self.macaroon_gen.generate_delete_pusher_token(user_id),
             "app_id": app_id,
             "pushkey": email_address,
         }
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 735c03c7eb..77c64722c7 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -46,6 +46,12 @@ class SlavedAccountDataStore(BaseSlavedStore):
     )
 
     get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+    get_tags_for_room = (
+        DataStore.get_tags_for_room.__func__
+    )
+    get_account_data_for_room = (
+        DataStore.get_account_data_for_room.__func__
+    )
 
     get_updated_tags = DataStore.get_updated_tags.__func__
     get_updated_account_data_for_user = (
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 15a025a019..d72ff6055c 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -73,6 +73,9 @@ class SlavedEventStore(BaseSlavedStore):
     # to reach inside the __dict__ to extract them.
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
     get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+    get_users_who_share_room_with_user = (
+        RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
+    )
     get_latest_event_ids_in_room = EventFederationStore.__dict__[
         "get_latest_event_ids_in_room"
     ]
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 703f4a49bf..40f6c9a386 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
 
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.storage import DataStore
+from synapse.storage.presence import PresenceStore
 
 
 class SlavedPresenceStore(BaseSlavedStore):
@@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore):
 
     _get_active_presence = DataStore._get_active_presence.__func__
     take_presence_startup_info = DataStore.take_presence_startup_info.__func__
-    get_presence_for_users = DataStore.get_presence_for_users.__func__
+    _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
+    get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
 
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py
index efa77b8c51..fceca2edeb 100644
--- a/synapse/rest/client/transactions.py
+++ b/synapse/rest/client/transactions.py
@@ -87,9 +87,17 @@ class HttpTransactionCache(object):
 
         deferred = fn(*args, **kwargs)
 
-        # We don't add an errback to the raw deferred, so we ask ObservableDeferred
-        # to swallow the error. This is fine as the error will still be reported
-        # to the observers.
+        # if the request fails with a Twisted failure, remove it
+        # from the transaction map. This is done to ensure that we don't
+        # cache transient errors like rate-limiting errors, etc.
+        def remove_from_map(err):
+            self.transactions.pop(txn_key, None)
+            return err
+        deferred.addErrback(remove_from_map)
+
+        # We don't add any other errbacks to the raw deferred, so we ask
+        # ObservableDeferred to swallow the error. This is fine as the error will
+        # still be reported to the observers.
         observable = ObservableDeferred(deferred, consumeErrors=True)
         self.transactions[txn_key] = (observable, self.clock.time_msec())
         return observable.observe()
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index af21661d7c..29fcd72375 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.types import UserID
+from synapse.http.servlet import parse_json_object_from_request
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -25,6 +26,34 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+class UsersRestServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/admin/users/(?P<user_id>[^/]*)")
+
+    def __init__(self, hs):
+        super(UsersRestServlet, self).__init__(hs)
+        self.handlers = hs.get_handlers()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, user_id):
+        target_user = UserID.from_string(user_id)
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        # To allow all users to get the users list
+        # if not is_admin and target_user != auth_user:
+        #     raise AuthError(403, "You are not a server admin")
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        ret = yield self.handlers.admin_handler.get_users()
+
+        defer.returnValue((200, ret))
+
+
 class WhoisRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
 
@@ -128,8 +157,199 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
         defer.returnValue((200, {}))
 
 
+class ResetPasswordRestServlet(ClientV1RestServlet):
+    """Post request to allow an administrator reset password for a user.
+    This need a user have a administrator access in Synapse.
+        Example:
+            http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
+            @user:to_reset_password?access_token=admin_access_token
+        JsonBodyToSend:
+            {
+                "new_password": "secret"
+            }
+        Returns:
+            200 OK with empty object if success otherwise an error.
+        """
+    PATTERNS = client_path_patterns("/admin/reset_password/(?P<target_user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        super(ResetPasswordRestServlet, self).__init__(hs)
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.auth_handler = hs.get_auth_handler()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, target_user_id):
+        """Post request to allow an administrator reset password for a user.
+        This need a user have a administrator access in Synapse.
+        """
+        UserID.from_string(target_user_id)
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        params = parse_json_object_from_request(request)
+        new_password = params['new_password']
+        if not new_password:
+            raise SynapseError(400, "Missing 'new_password' arg")
+
+        logger.info("new_password: %r", new_password)
+
+        yield self.auth_handler.set_password(
+            target_user_id, new_password, requester
+        )
+        defer.returnValue((200, {}))
+
+
+class GetUsersPaginatedRestServlet(ClientV1RestServlet):
+    """Get request to get specific number of users from Synapse.
+    This need a user have a administrator access in Synapse.
+        Example:
+            http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
+            @admin:user?access_token=admin_access_token&start=0&limit=10
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+        """
+    PATTERNS = client_path_patterns("/admin/users_paginate/(?P<target_user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        super(GetUsersPaginatedRestServlet, self).__init__(hs)
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, target_user_id):
+        """Get request to get specific number of users from Synapse.
+        This need a user have a administrator access in Synapse.
+        """
+        target_user = UserID.from_string(target_user_id)
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        # To allow all users to get the users list
+        # if not is_admin and target_user != auth_user:
+        #     raise AuthError(403, "You are not a server admin")
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        order = "name"  # order by name in user table
+        start = request.args.get("start")[0]
+        limit = request.args.get("limit")[0]
+        if not limit:
+            raise SynapseError(400, "Missing 'limit' arg")
+        if not start:
+            raise SynapseError(400, "Missing 'start' arg")
+        logger.info("limit: %s, start: %s", limit, start)
+
+        ret = yield self.handlers.admin_handler.get_users_paginate(
+            order, start, limit
+        )
+        defer.returnValue((200, ret))
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, target_user_id):
+        """Post request to get specific number of users from Synapse..
+        This need a user have a administrator access in Synapse.
+        Example:
+            http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
+            @admin:user?access_token=admin_access_token
+        JsonBodyToSend:
+            {
+                "start": "0",
+                "limit": "10
+            }
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+        """
+        UserID.from_string(target_user_id)
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        order = "name"  # order by name in user table
+        params = parse_json_object_from_request(request)
+        limit = params['limit']
+        start = params['start']
+        if not limit:
+            raise SynapseError(400, "Missing 'limit' arg")
+        if not start:
+            raise SynapseError(400, "Missing 'start' arg")
+        logger.info("limit: %s, start: %s", limit, start)
+
+        ret = yield self.handlers.admin_handler.get_users_paginate(
+            order, start, limit
+        )
+        defer.returnValue((200, ret))
+
+
+class SearchUsersRestServlet(ClientV1RestServlet):
+    """Get request to search user table for specific users according to
+    search term.
+    This need a user have a administrator access in Synapse.
+        Example:
+            http://localhost:8008/_matrix/client/api/v1/admin/search_users/
+            @admin:user?access_token=admin_access_token&term=alice
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+    """
+    PATTERNS = client_path_patterns("/admin/search_users/(?P<target_user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        super(SearchUsersRestServlet, self).__init__(hs)
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, target_user_id):
+        """Get request to search user table for specific users according to
+        search term.
+        This need a user have a administrator access in Synapse.
+        """
+        target_user = UserID.from_string(target_user_id)
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        # To allow all users to get the users list
+        # if not is_admin and target_user != auth_user:
+        #     raise AuthError(403, "You are not a server admin")
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        term = request.args.get("term")[0]
+        if not term:
+            raise SynapseError(400, "Missing 'term' arg")
+
+        logger.info("term: %s ", term)
+
+        ret = yield self.handlers.admin_handler.search_users(
+            term
+        )
+        defer.returnValue((200, ret))
+
+
 def register_servlets(hs, http_server):
     WhoisRestServlet(hs).register(http_server)
     PurgeMediaCacheRestServlet(hs).register(http_server)
     DeactivateAccountRestServlet(hs).register(http_server)
     PurgeHistoryRestServlet(hs).register(http_server)
+    UsersRestServlet(hs).register(http_server)
+    ResetPasswordRestServlet(hs).register(http_server)
+    GetUsersPaginatedRestServlet(hs).register(http_server)
+    SearchUsersRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 0c9cdff3b8..72057f1b0c 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -330,6 +330,7 @@ class CasTicketServlet(ClientV1RestServlet):
         self.cas_required_attributes = hs.config.cas_required_attributes
         self.auth_handler = hs.get_auth_handler()
         self.handlers = hs.get_handlers()
+        self.macaroon_gen = hs.get_macaroon_generator()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
@@ -368,7 +369,9 @@ class CasTicketServlet(ClientV1RestServlet):
                 yield self.handlers.registration_handler.register(localpart=user)
             )
 
-        login_token = auth_handler.generate_short_term_login_token(registered_user_id)
+        login_token = self.macaroon_gen.generate_short_term_login_token(
+            registered_user_id
+        )
         redirect_url = self.add_login_token_to_redirect_url(client_redirect_url,
                                                             login_token)
         request.redirect(redirect_url)
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index 355e82474b..1a5045c9ec 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -46,6 +46,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
     def on_PUT(self, request, user_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         user = UserID.from_string(user_id)
+        is_admin = yield self.auth.is_server_admin(requester.user)
 
         content = parse_json_object_from_request(request)
 
@@ -55,7 +56,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
             defer.returnValue((400, "Unable to parse name"))
 
         yield self.handlers.profile_handler.set_displayname(
-            user, requester, new_name)
+            user, requester, new_name, is_admin)
 
         defer.returnValue((200, {}))
 
@@ -88,6 +89,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
     def on_PUT(self, request, user_id):
         requester = yield self.auth.get_user_by_req(request)
         user = UserID.from_string(user_id)
+        is_admin = yield self.auth.is_server_admin(requester.user)
 
         content = parse_json_object_from_request(request)
         try:
@@ -96,7 +98,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
             defer.returnValue((400, "Unable to parse name"))
 
         yield self.handlers.profile_handler.set_avatar_url(
-            user, requester, new_name)
+            user, requester, new_name, is_admin)
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index f99b53530a..6a3cfe84f8 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -193,7 +193,7 @@ class KeyChangesServlet(RestServlet):
         )
 
         defer.returnValue((200, {
-            "changed": changed
+            "changed": list(changed),
         }))
 
 
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 3e7a285e10..ccca5a12d5 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -96,6 +96,7 @@ class RegisterRestServlet(RestServlet):
         self.registration_handler = hs.get_handlers().registration_handler
         self.identity_handler = hs.get_handlers().identity_handler
         self.device_handler = hs.get_device_handler()
+        self.macaroon_gen = hs.get_macaroon_generator()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -436,7 +437,7 @@ class RegisterRestServlet(RestServlet):
             user_id, device_id, initial_display_name
         )
 
-        access_token = self.auth_handler.generate_access_token(
+        access_token = self.macaroon_gen.generate_access_token(
             user_id, ["guest = true"]
         )
         defer.returnValue((200, {
diff --git a/synapse/server.py b/synapse/server.py
index 0bfb411269..c577032041 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -37,7 +37,7 @@ from synapse.federation.transport.client import TransportLayerClient
 from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
 from synapse.handlers.appservice import ApplicationServicesHandler
-from synapse.handlers.auth import AuthHandler
+from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
 from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
@@ -131,6 +131,7 @@ class HomeServer(object):
         'federation_transport_client',
         'federation_sender',
         'receipts_handler',
+        'macaroon_generator',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -213,6 +214,9 @@ class HomeServer(object):
     def build_auth_handler(self):
         return AuthHandler(self)
 
+    def build_macaroon_generator(self):
+        return MacaroonGeneartor(self)
+
     def build_device_handler(self):
         return DeviceHandler(self)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b9968debe5..d604e7668f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -297,6 +297,82 @@ class DataStore(RoomMemberStore, RoomStore,
             desc="get_user_ip_and_agents",
         )
 
+    def get_users(self):
+        """Function to reterive a list of users in users table.
+
+        Args:
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        return self._simple_select_list(
+            table="users",
+            keyvalues={},
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin"
+            ],
+            desc="get_users",
+        )
+
+    def get_users_paginate(self, order, start, limit):
+        """Function to reterive a paginated list of users from
+        users list. This will return a json object, which contains
+        list of users and the total number of users in users table.
+
+        Args:
+            order (str): column name to order the select by this column
+            start (int): start number to begin the query from
+            limit (int): number of rows to reterive
+        Returns:
+            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+        """
+        is_guest = 0
+        i_start = (int)(start)
+        i_limit = (int)(limit)
+        return self.get_user_list_paginate(
+            table="users",
+            keyvalues={
+                "is_guest": is_guest
+            },
+            pagevalues=[
+                order,
+                i_limit,
+                i_start
+            ],
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin"
+            ],
+            desc="get_users_paginate",
+        )
+
+    def search_users(self, term):
+        """Function to search users list for one or more users with
+        the matched term.
+
+        Args:
+            term (str): search term
+            col (str): column to query term should be matched to
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        return self._simple_search_list(
+            table="users",
+            term=term,
+            col="name",
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin"
+            ],
+            desc="search_users",
+        )
+
 
 def are_all_users_on_domain(txn, database_engine, domain):
     sql = database_engine.convert_param_style(
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 05374682fd..b0dc391190 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -934,6 +934,165 @@ class SQLBaseStore(object):
         else:
             return 0
 
+    def _simple_select_list_paginate(self, table, keyvalues, pagevalues, retcols,
+                                     desc="_simple_select_list_paginate"):
+        """Executes a SELECT query on the named table with start and limit,
+        of row numbers, which may return zero or number of rows from start to limit,
+        returning the result as a list of dicts.
+
+        Args:
+            table (str): the table name
+            keyvalues (dict[str, Any] | None):
+                column names and values to select the rows with, or None to not
+                apply a WHERE clause.
+            retcols (iterable[str]): the names of the columns to return
+            order (str): order the select by this column
+            start (int): start number to begin the query from
+            limit (int): number of rows to reterive
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        return self.runInteraction(
+            desc,
+            self._simple_select_list_paginate_txn,
+            table, keyvalues, pagevalues, retcols
+        )
+
+    @classmethod
+    def _simple_select_list_paginate_txn(cls, txn, table, keyvalues, pagevalues, retcols):
+        """Executes a SELECT query on the named table with start and limit,
+        of row numbers, which may return zero or number of rows from start to limit,
+        returning the result as a list of dicts.
+
+        Args:
+            txn : Transaction object
+            table (str): the table name
+            keyvalues (dict[str, T] | None):
+                column names and values to select the rows with, or None to not
+                apply a WHERE clause.
+            pagevalues ([]):
+                order (str): order the select by this column
+                start (int): start number to begin the query from
+                limit (int): number of rows to reterive
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+
+        """
+        if keyvalues:
+            sql = "SELECT %s FROM %s WHERE %s ORDER BY %s" % (
+                ", ".join(retcols),
+                table,
+                " AND ".join("%s = ?" % (k,) for k in keyvalues),
+                " ? ASC LIMIT ? OFFSET ?"
+            )
+            txn.execute(sql, keyvalues.values() + pagevalues)
+        else:
+            sql = "SELECT %s FROM %s ORDER BY %s" % (
+                ", ".join(retcols),
+                table,
+                " ? ASC LIMIT ? OFFSET ?"
+            )
+            txn.execute(sql, pagevalues)
+
+        return cls.cursor_to_dict(txn)
+
+    @defer.inlineCallbacks
+    def get_user_list_paginate(self, table, keyvalues, pagevalues, retcols,
+                               desc="get_user_list_paginate"):
+        """Get a list of users from start row to a limit number of rows. This will
+        return a json object with users and total number of users in users list.
+
+        Args:
+            table (str): the table name
+            keyvalues (dict[str, Any] | None):
+                column names and values to select the rows with, or None to not
+                apply a WHERE clause.
+            pagevalues ([]):
+                order (str): order the select by this column
+                start (int): start number to begin the query from
+                limit (int): number of rows to reterive
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+        """
+        users = yield self.runInteraction(
+            desc,
+            self._simple_select_list_paginate_txn,
+            table, keyvalues, pagevalues, retcols
+        )
+        count = yield self.runInteraction(
+            desc,
+            self.get_user_count_txn
+        )
+        retval = {
+            "users": users,
+            "total": count
+        }
+        defer.returnValue(retval)
+
+    def get_user_count_txn(self, txn):
+        """Get a total number of registerd users in the users list.
+
+        Args:
+            txn : Transaction object
+        Returns:
+            defer.Deferred: resolves to int
+        """
+        sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
+        txn.execute(sql_count)
+        count = txn.fetchone()[0]
+        defer.returnValue(count)
+
+    def _simple_search_list(self, table, term, col, retcols,
+                            desc="_simple_search_list"):
+        """Executes a SELECT query on the named table, which may return zero or
+        more rows, returning the result as a list of dicts.
+
+        Args:
+            table (str): the table name
+            term (str | None):
+                term for searching the table matched to a column.
+            col (str): column to query term should be matched to
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]] or None
+        """
+
+        return self.runInteraction(
+            desc,
+            self._simple_search_list_txn,
+            table, term, col, retcols
+        )
+
+    @classmethod
+    def _simple_search_list_txn(cls, txn, table, term, col, retcols):
+        """Executes a SELECT query on the named table, which may return zero or
+        more rows, returning the result as a list of dicts.
+
+        Args:
+            txn : Transaction object
+            table (str): the table name
+            term (str | None):
+                term for searching the table matched to a column.
+            col (str): column to query term should be matched to
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]] or None
+        """
+        if term:
+            sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (
+                ", ".join(retcols),
+                table,
+                col
+            )
+            termvalues = ["%%" + term + "%%"]
+            txn.execute(sql, termvalues)
+        else:
+            return 0
+
+        return cls.cursor_to_dict(txn)
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2040e022fa..b9f1365f92 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -93,7 +93,7 @@ class EndToEndKeyStore(SQLBaseStore):
             query_clause = "user_id = ?"
             query_params.append(user_id)
 
-            if device_id:
+            if device_id is not None:
                 query_clause += " AND device_id = ?"
                 query_params.append(device_id)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8659f605a5..c88f689d3a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -302,7 +302,7 @@ class EventsStore(SQLBaseStore):
                                 room_id
                             )
                             new_latest_event_ids = yield self._calculate_new_extremeties(
-                                room_id, [ev for ev, _ in ev_ctx_rm]
+                                room_id, ev_ctx_rm, latest_event_ids
                             )
 
                             if new_latest_event_ids == set(latest_event_ids):
@@ -329,27 +329,24 @@ class EventsStore(SQLBaseStore):
                 persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
-    def _calculate_new_extremeties(self, room_id, events):
+    def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
         persist.
 
         Assumes that we are only persisting events for one room at a time.
         """
-        latest_event_ids = yield self.get_latest_event_ids_in_room(
-            room_id
-        )
         new_latest_event_ids = set(latest_event_ids)
         # First, add all the new events to the list
         new_latest_event_ids.update(
-            event.event_id for event in events
-            if not event.internal_metadata.is_outlier()
+            event.event_id for event, ctx in event_contexts
+            if not event.internal_metadata.is_outlier() and not ctx.rejected
         )
         # Now remove all events that are referenced by the to-be-added events
         new_latest_event_ids.difference_update(
             e_id
-            for event in events
+            for event, ctx in event_contexts
             for e_id, _ in event.prev_events
-            if not event.internal_metadata.is_outlier()
+            if not event.internal_metadata.is_outlier() and not ctx.rejected
         )
 
         # And finally remove any events that are referenced by previously added
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 7460f98a1f..4d1590d2b4 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -15,7 +15,7 @@
 
 from ._base import SQLBaseStore
 from synapse.api.constants import PresenceState
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
 from collections import namedtuple
 from twisted.internet import defer
@@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore):
                 self.presence_stream_cache.entity_has_changed,
                 state.user_id, stream_id,
             )
+            self._invalidate_cache_and_stream(
+                txn, self._get_presence_for_user, (state.user_id,)
+            )
 
         # Actually insert new rows
         self._simple_insert_many_txn(
@@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore):
             "get_all_presence_updates", get_all_presence_updates_txn
         )
 
-    @defer.inlineCallbacks
+    @cached()
+    def _get_presence_for_user(self, user_id):
+        raise NotImplementedError()
+
+    @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids",
+                num_args=1, inlineCallbacks=True)
     def get_presence_for_users(self, user_ids):
         rows = yield self._simple_select_many_batch(
             table="presence_stream",
@@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore):
         for row in rows:
             row["currently_active"] = bool(row["currently_active"])
 
-        defer.returnValue([UserPresenceState(**row) for row in rows])
+        defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
 
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ee800d074f..545d3d3a99 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -129,7 +129,7 @@ class RoomMemberStore(SQLBaseStore):
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
-    @cached(max_entries=100000, iterable=True)
+    @cached(max_entries=500000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
 
@@ -274,12 +274,29 @@ class RoomMemberStore(SQLBaseStore):
 
         return rows
 
-    @cached(max_entries=5000)
+    @cached(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
         return self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
 
+    @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
+    def get_users_who_share_room_with_user(self, user_id, cache_context):
+        """Returns the set of users who share a room with `user_id`
+        """
+        rooms = yield self.get_rooms_for_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+
+        user_who_share_room = set()
+        for room in rooms:
+            user_ids = yield self.get_users_in_room(
+                room.room_id, on_invalidate=cache_context.invalidate,
+            )
+            user_who_share_room.update(user_ids)
+
+        defer.returnValue(user_who_share_room)
+
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""
         def f(txn):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 675bfd5feb..998de70d29 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -478,6 +478,11 @@ class CacheListDescriptor(object):
 
 
 class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
+    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
+    # which namedtuple does for us (i.e. two _CacheContext are the same if
+    # their caches and keys match). This is important in particular to
+    # dedupe when we add callbacks to lru cache nodes, otherwise the number
+    # of callbacks would grow.
     def invalidate(self):
         self.cache.invalidate(self.key)