summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/federation.py39
-rw-r--r--synapse/handlers/room_member.py23
-rw-r--r--synapse/storage/__init__.py3
-rw-r--r--synapse/storage/events.py13
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/roommember.py144
-rw-r--r--synapse/storage/schema/delta/31/invites.sql42
7 files changed, 217 insertions, 49 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index adafd06b24..eb02f0e000 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -284,6 +284,9 @@ class FederationHandler(BaseHandler):
     def backfill(self, dest, room_id, limit, extremities=[]):
         """ Trigger a backfill request to `dest` for the given `room_id`
         """
+        if dest == self.server_name:
+            raise SynapseError(400, "Can't backfill from self.")
+
         if not extremities:
             extremities = yield self.store.get_oldest_events_in_room(room_id)
 
@@ -450,7 +453,7 @@ class FederationHandler(BaseHandler):
 
         likely_domains = [
             domain for domain, depth in curr_domains
-            if domain is not self.server_name
+            if domain != self.server_name
         ]
 
         @defer.inlineCallbacks
@@ -784,13 +787,19 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
-        origin, event = yield self._make_and_verify_event(
-            target_hosts,
-            room_id,
-            user_id,
-            "leave"
-        )
-        signed_event = self._sign_event(event)
+        try:
+            origin, event = yield self._make_and_verify_event(
+                target_hosts,
+                room_id,
+                user_id,
+                "leave"
+            )
+            signed_event = self._sign_event(event)
+        except SynapseError:
+            raise
+        except CodeMessageException as e:
+            logger.warn("Failed to reject invite: %s", e)
+            raise SynapseError(500, "Failed to reject invite")
 
         # Try the host we successfully got a response to /make_join/
         # request first.
@@ -800,10 +809,16 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        yield self.replication_layer.send_leave(
-            target_hosts,
-            signed_event
-        )
+        try:
+            yield self.replication_layer.send_leave(
+                target_hosts,
+                signed_event
+            )
+        except SynapseError:
+            raise
+        except CodeMessageException as e:
+            logger.warn("Failed to reject invite: %s", e)
+            raise SynapseError(500, "Failed to reject invite")
 
         context = yield self.state_handler.compute_event_context(event)
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2f788a4b1b..fe2315df8f 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -258,10 +258,20 @@ class RoomMemberHandler(BaseHandler):
                 else:
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    ret = yield self.reject_remote_invite(
-                        target.to_string(), room_id, remote_room_hosts
-                    )
-                    defer.returnValue(ret)
+
+                    try:
+                        ret = yield self.reject_remote_invite(
+                            target.to_string(), room_id, remote_room_hosts
+                        )
+                        defer.returnValue(ret)
+                    except SynapseError as e:
+                        logger.warn("Failed to reject invite: %s", e)
+
+                        yield self.store.locally_reject_invite(
+                            target.to_string(), room_id
+                        )
+
+                        defer.returnValue({})
 
         yield self._local_membership_update(
             requester=requester,
@@ -400,7 +410,10 @@ class RoomMemberHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_inviter(self, user_id, room_id):
-        invite = yield self.store.get_room_member(user_id=user_id, room_id=room_id)
+        invite = yield self.store.get_invite_for_user_in_room(
+            user_id=user_id,
+            room_id=room_id,
+        )
         if invite:
             defer.returnValue(UserID.from_string(invite.sender))
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 57863bba4d..07916b292d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -94,7 +94,8 @@ class DataStore(RoomMemberStore, RoomStore,
         )
 
         self._stream_id_gen = StreamIdGenerator(
-            db_conn, "events", "stream_ordering"
+            db_conn, "events", "stream_ordering",
+            extra_tables=[("local_invites", "stream_id")]
         )
         self._backfill_id_gen = StreamIdGenerator(
             db_conn, "events", "stream_ordering", step=-1
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c4dc3b3d51..5d299a1132 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -367,7 +367,8 @@ class EventsStore(SQLBaseStore):
                 event
                 for event, _ in events_and_contexts
                 if event.type == EventTypes.Member
-            ]
+            ],
+            backfilled=backfilled,
         )
 
         def event_dict(event):
@@ -485,14 +486,8 @@ class EventsStore(SQLBaseStore):
             return
 
         for event, _ in state_events_and_contexts:
-            if (not event.internal_metadata.is_invite_from_remote()
-                    and event.internal_metadata.is_outlier()):
-                # Outlier events generally shouldn't clobber the current state.
-                # However invites from remote severs for rooms we aren't in
-                # are a bit special: they don't come with any associated
-                # state so are technically an outlier, however all the
-                # client-facing code assumes that they are in the current
-                # state table so we insert the event anyway.
+            if event.internal_metadata.is_outlier():
+                # Outlier events shouldn't clobber the current state.
                 continue
 
             if context.rejected:
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 3f29aad1e8..4099387ba7 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 30
+SCHEMA_VERSION = 31
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 430b49c12e..66e7a40e3c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -36,7 +36,7 @@ RoomsForUser = namedtuple(
 
 class RoomMemberStore(SQLBaseStore):
 
-    def _store_room_members_txn(self, txn, events):
+    def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
         """
         self._simple_insert_many_txn(
@@ -62,6 +62,64 @@ class RoomMemberStore(SQLBaseStore):
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering
             )
+            txn.call_after(
+                self.get_invited_rooms_for_user.invalidate, (event.state_key,)
+            )
+
+            # We update the local_invites table only if the event is "current",
+            # i.e., its something that has just happened.
+            # The only current event that can also be an outlier is if its an
+            # invite that has come in across federation.
+            is_new_state = not backfilled and (
+                not event.internal_metadata.is_outlier()
+                or event.internal_metadata.is_invite_from_remote()
+            )
+            is_mine = self.hs.is_mine_id(event.state_key)
+            if is_new_state and is_mine:
+                if event.membership == Membership.INVITE:
+                    self._simple_insert_txn(
+                        txn,
+                        table="local_invites",
+                        values={
+                            "event_id": event.event_id,
+                            "invitee": event.state_key,
+                            "inviter": event.sender,
+                            "room_id": event.room_id,
+                            "stream_id": event.internal_metadata.stream_ordering,
+                        }
+                    )
+                else:
+                    sql = (
+                        "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
+                        " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+                        " AND replaced_by is NULL"
+                    )
+
+                    txn.execute(sql, (
+                        event.internal_metadata.stream_ordering,
+                        event.event_id,
+                        event.room_id,
+                        event.state_key,
+                    ))
+
+    @defer.inlineCallbacks
+    def locally_reject_invite(self, user_id, room_id):
+        sql = (
+            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+            " AND replaced_by is NULL"
+        )
+
+        def f(txn, stream_ordering):
+            txn.execute(sql, (
+                stream_ordering,
+                True,
+                room_id,
+                user_id,
+            ))
+
+        with self._stream_id_gen.get_next() as stream_ordering:
+            yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
@@ -127,6 +185,24 @@ class RoomMemberStore(SQLBaseStore):
             user_id, [Membership.INVITE]
         )
 
+    @defer.inlineCallbacks
+    def get_invite_for_user_in_room(self, user_id, room_id):
+        """Gets the invite for the given user and room
+
+        Args:
+            user_id (str)
+            room_id (str)
+
+        Returns:
+            Deferred: Resolves to either a RoomsForUser or None if no invite was
+                found.
+        """
+        invites = yield self.get_invited_rooms_for_user(user_id)
+        for invite in invites:
+            if invite.room_id == room_id:
+                defer.returnValue(invite)
+        defer.returnValue(None)
+
     def get_leave_and_ban_events_for_user(self, user_id):
         """ Get all the leave events for a user
         Args:
@@ -163,29 +239,55 @@ class RoomMemberStore(SQLBaseStore):
 
     def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
                                                     membership_list):
-        where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
-            " OR ".join(["membership = ?" for _ in membership_list]),
-        )
 
-        args = [user_id]
-        args.extend(membership_list)
+        do_invite = Membership.INVITE in membership_list
+        membership_list = [m for m in membership_list if m != Membership.INVITE]
 
-        sql = (
-            "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
-            " FROM current_state_events as c"
-            " INNER JOIN room_memberships as m"
-            " ON m.event_id = c.event_id"
-            " INNER JOIN events as e"
-            " ON e.event_id = c.event_id"
-            " AND m.room_id = c.room_id"
-            " AND m.user_id = c.state_key"
-            " WHERE %s"
-        ) % (where_clause,)
+        results = []
+        if membership_list:
+            where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
+                " OR ".join(["membership = ?" for _ in membership_list]),
+            )
+
+            args = [user_id]
+            args.extend(membership_list)
+
+            sql = (
+                "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+                " FROM current_state_events as c"
+                " INNER JOIN room_memberships as m"
+                " ON m.event_id = c.event_id"
+                " INNER JOIN events as e"
+                " ON e.event_id = c.event_id"
+                " AND m.room_id = c.room_id"
+                " AND m.user_id = c.state_key"
+                " WHERE %s"
+            ) % (where_clause,)
+
+            txn.execute(sql, args)
+            results = [
+                RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+            ]
+
+        if do_invite:
+            sql = (
+                "SELECT i.room_id, inviter, i.event_id, e.stream_ordering"
+                " FROM local_invites as i"
+                " INNER JOIN events as e USING (event_id)"
+                " WHERE invitee = ? AND locally_rejected is NULL"
+                " AND replaced_by is NULL"
+            )
+
+            txn.execute(sql, (user_id,))
+            results.extend(RoomsForUser(
+                room_id=r["room_id"],
+                sender=r["inviter"],
+                event_id=r["event_id"],
+                stream_ordering=r["stream_ordering"],
+                membership=Membership.INVITE,
+            ) for r in self.cursor_to_dict(txn))
 
-        txn.execute(sql, args)
-        return [
-            RoomsForUser(**r) for r in self.cursor_to_dict(txn)
-        ]
+        return results
 
     @cached(max_entries=5000)
     def get_joined_hosts_for_room(self, room_id):
diff --git a/synapse/storage/schema/delta/31/invites.sql b/synapse/storage/schema/delta/31/invites.sql
new file mode 100644
index 0000000000..2c57846d5a
--- /dev/null
+++ b/synapse/storage/schema/delta/31/invites.sql
@@ -0,0 +1,42 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE local_invites(
+    stream_id BIGINT NOT NULL,
+    inviter TEXT NOT NULL,
+    invitee TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    locally_rejected TEXT,
+    replaced_by TEXT
+);
+
+-- Insert all invites for local users into new `invites` table
+INSERT INTO local_invites SELECT
+        stream_ordering as stream_id,
+        sender as inviter,
+        state_key as invitee,
+        event_id,
+        room_id,
+        NULL as locally_rejected,
+        NULL as replaced_by
+    FROM events
+    NATURAL JOIN current_state_events
+    NATURAL JOIN room_memberships
+    WHERE membership = 'invite'  AND state_key IN (SELECT name FROM users);
+
+CREATE INDEX local_invites_id ON local_invites(stream_id);
+CREATE INDEX local_invites_for_user_idx ON local_invites(invitee, locally_rejected, replaced_by, room_id);