diff options
author | Erik Johnston <erik@matrix.org> | 2017-07-10 14:53:19 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-07-20 16:36:42 +0100 |
commit | c544188ee3644c85a97a3c4e09e63ad4e3c6f0cc (patch) | |
tree | 6e8f40f857d42827b2fef0c1f6e150480ff870f3 /synapse/storage | |
parent | Merge pull request #2374 from matrix-org/erikj/group_server_local (diff) | |
download | synapse-c544188ee3644c85a97a3c4e09e63ad4e3c6f0cc.tar.xz |
Add groups to sync stream
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 15 | ||||
-rw-r--r-- | synapse/storage/group_server.py | 68 | ||||
-rw-r--r-- | synapse/storage/schema/delta/43/group_server.sql | 9 |
3 files changed, 87 insertions, 5 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index a2e7aa47d8..45f0a4c599 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn): + def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self._simple_delete_txn( txn, @@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore): }, ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + # TODO: Insert profile to ensure it comes down stream if its a join. if membership == "join": @@ -840,10 +853,11 @@ class GroupServerStore(SQLBaseStore): }, ) - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, - ) + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -937,3 +951,47 @@ class GroupServerStore(SQLBaseStore): retcol="group_id", desc="get_joined_groups", ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index e1fd47aa7f..92f3339c94 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -155,3 +155,12 @@ CREATE TABLE local_group_membership ( CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); |