diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 7c1bb8cfe4..98e346d48e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -400,7 +400,7 @@ class RoomMemberHandler(BaseHandler):
@defer.inlineCallbacks
def get_inviter(self, user_id, room_id):
- invite = yield self.store.get_room_member(user_id=user_id, room_id=room_id)
+ invite = yield self.store.get_inviter(user_id=user_id, room_id=room_id)
if invite:
defer.returnValue(UserID.from_string(invite.sender))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c4dc3b3d51..5d299a1132 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -367,7 +367,8 @@ class EventsStore(SQLBaseStore):
event
for event, _ in events_and_contexts
if event.type == EventTypes.Member
- ]
+ ],
+ backfilled=backfilled,
)
def event_dict(event):
@@ -485,14 +486,8 @@ class EventsStore(SQLBaseStore):
return
for event, _ in state_events_and_contexts:
- if (not event.internal_metadata.is_invite_from_remote()
- and event.internal_metadata.is_outlier()):
- # Outlier events generally shouldn't clobber the current state.
- # However invites from remote severs for rooms we aren't in
- # are a bit special: they don't come with any associated
- # state so are technically an outlier, however all the
- # client-facing code assumes that they are in the current
- # state table so we insert the event anyway.
+ if event.internal_metadata.is_outlier():
+ # Outlier events shouldn't clobber the current state.
continue
if context.rejected:
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 3f29aad1e8..4099387ba7 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 30
+SCHEMA_VERSION = 31
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 430b49c12e..4c026b33ae 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -36,7 +36,7 @@ RoomsForUser = namedtuple(
class RoomMemberStore(SQLBaseStore):
- def _store_room_members_txn(self, txn, events):
+ def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
"""
self._simple_insert_many_txn(
@@ -62,6 +62,41 @@ class RoomMemberStore(SQLBaseStore):
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
)
+ txn.call_after(
+ self.get_invited_rooms_for_user.invalidate, (event.state_key,)
+ )
+
+ is_mine = self.hs.is_mine_id(event.state_key)
+ is_new_state = not backfilled and (
+ not event.internal_metadata.is_outlier()
+ or event.internal_metadata.is_invite_from_remote()
+ )
+ if is_new_state and is_mine:
+ if event.membership == Membership.INVITE:
+ self._simple_insert_txn(
+ txn,
+ table="invites",
+ values={
+ "event_id": event.event_id,
+ "invitee": event.state_key,
+ "inviter": event.sender,
+ "room_id": event.room_id,
+ "stream_id": event.internal_metadata.stream_ordering,
+ }
+ )
+ else:
+ sql = (
+ "UPDATE invites SET stream_id = ?, replaced_by = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ txn.execute(sql, (
+ event.internal_metadata.stream_ordering,
+ event.event_id,
+ event.room_id,
+ event.state_key,
+ ))
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -127,6 +162,14 @@ class RoomMemberStore(SQLBaseStore):
user_id, [Membership.INVITE]
)
+ @defer.inlineCallbacks
+ def get_inviter(self, user_id, room_id):
+ invites = yield self.get_invited_rooms_for_user(user_id)
+ for invite in invites:
+ if invite.room_id == room_id:
+ defer.returnValue(invite)
+ defer.returnValue(None)
+
def get_leave_and_ban_events_for_user(self, user_id):
""" Get all the leave events for a user
Args:
@@ -163,29 +206,55 @@ class RoomMemberStore(SQLBaseStore):
def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
membership_list):
- where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
- " OR ".join(["membership = ?" for _ in membership_list]),
- )
- args = [user_id]
- args.extend(membership_list)
+ do_invite = Membership.INVITE in membership_list
+ membership_list = [m for m in membership_list if m != Membership.INVITE]
- sql = (
- "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
- " FROM current_state_events as c"
- " INNER JOIN room_memberships as m"
- " ON m.event_id = c.event_id"
- " INNER JOIN events as e"
- " ON e.event_id = c.event_id"
- " AND m.room_id = c.room_id"
- " AND m.user_id = c.state_key"
- " WHERE %s"
- ) % (where_clause,)
+ results = []
+ if membership_list:
+ where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
+ " OR ".join(["membership = ?" for _ in membership_list]),
+ )
+
+ args = [user_id]
+ args.extend(membership_list)
+
+ sql = (
+ "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+ " FROM current_state_events as c"
+ " INNER JOIN room_memberships as m"
+ " ON m.event_id = c.event_id"
+ " INNER JOIN events as e"
+ " ON e.event_id = c.event_id"
+ " AND m.room_id = c.room_id"
+ " AND m.user_id = c.state_key"
+ " WHERE %s"
+ ) % (where_clause,)
+
+ txn.execute(sql, args)
+ results = [
+ RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+ ]
+
+ if do_invite:
+ sql = (
+ "SELECT i.room_id, inviter, i.event_id, e.stream_ordering"
+ " FROM invites as i"
+ " INNER JOIN events as e USING (event_id)"
+ " WHERE invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ txn.execute(sql, (user_id,))
+ results.extend(RoomsForUser(
+ room_id=r["room_id"],
+ sender=r["inviter"],
+ event_id=r["event_id"],
+ stream_ordering=r["stream_ordering"],
+ membership=Membership.INVITE,
+ ) for r in self.cursor_to_dict(txn))
- txn.execute(sql, args)
- return [
- RoomsForUser(**r) for r in self.cursor_to_dict(txn)
- ]
+ return results
@cached(max_entries=5000)
def get_joined_hosts_for_room(self, room_id):
diff --git a/synapse/storage/schema/delta/31/invites.sql b/synapse/storage/schema/delta/31/invites.sql
new file mode 100644
index 0000000000..4f6fb9ea63
--- /dev/null
+++ b/synapse/storage/schema/delta/31/invites.sql
@@ -0,0 +1,28 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE invites(
+ stream_id BIGINT NOT NULL,
+ inviter TEXT NOT NULL,
+ invitee TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ locally_rejected TEXT,
+ replaced_by TEXT
+);
+
+CREATE INDEX invites_id ON invites(stream_id);
+CREATE INDEX invites_for_user_idx ON invites(invitee, locally_rejected, replaced_by, room_id);
|