From 2f9eafdd369796d8b7731b24ab8cf6a98ad19e29 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Jul 2017 14:52:27 +0100 Subject: Add local group server support --- synapse/storage/__init__.py | 15 +++ synapse/storage/group_server.py | 152 +++++++++++++++++++++++ synapse/storage/schema/delta/43/group_server.sql | 28 +++++ 3 files changed, 195 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fdee9f1ad5..594566eb38 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._group_updates_id_gen = StreamIdGenerator( + db_conn, "local_group_updates", "stream_id", + ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) + _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + db_conn, "local_group_updates", + entity_column="user_id", + stream_column="stream_id", + max_value=self._group_updates_id_gen.get_current_token(), + limit=1000, + ) + self._group_updates_stream_cache = StreamChangeCache( + "_group_updates_stream_cache", min_group_updates_id, + prefilled_cache=_group_updates_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index e8a799d8c7..036549d437 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -756,6 +756,103 @@ class GroupServerStore(SQLBaseStore): desc="add_room_to_group", ) + @defer.inlineCallbacks + def register_user_group_membership(self, group_id, user_id, membership, + is_admin=False, content={}, + local_attestation=None, + remote_attestation=None, + ): + def _register_user_group_membership_txn(txn, next_id): + # TODO: Upsert? + self._simple_delete_txn( + txn, + table="local_group_membership", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_insert_txn( + txn, + table="local_group_membership", + values={ + "group_id": group_id, + "user_id": user_id, + "is_admin": is_admin, + "membership": membership, + "content": json.dumps(content), + }, + ) + self._simple_delete_txn( + txn, + table="local_group_updates", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + "type": "membership", + }, + ) + self._simple_insert_txn( + txn, + table="local_group_updates", + values={ + "stream_id": next_id, + "group_id": group_id, + "user_id": user_id, + "type": "membership", + "content": json.dumps({"membership": membership, "content": content}), + } + ) + self._group_updates_stream_cache.entity_has_changed(user_id, next_id) + + # TODO: Insert profile to ensuer it comes down stream if its a join. + + if membership == "join": + if local_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_renewals", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": local_attestation["valid_until_ms"], + } + ) + if remote_attestation: + self._simple_insert_txn( + txn, + table="group_attestations_remote", + values={ + "group_id": group_id, + "user_id": user_id, + "valid_until_ms": remote_attestation["valid_until_ms"], + "attestation": json.dumps(remote_attestation), + } + ) + else: + self._simple_delete_txn( + txn, + table="group_attestations_renewals", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + self._simple_delete_txn( + txn, + table="group_attestations_remote", + keyvalues={ + "group_id": group_id, + "user_id": user_id, + }, + ) + + with self._group_updates_id_gen.get_next() as next_id: + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, next_id, + ) + @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, long_description,): @@ -771,6 +868,61 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) + + def get_all_groups_for_user(self, user_id, now_token): + def _get_all_groups_for_user_txn(txn): + sql = """ + SELECT group_id, type, membership, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND membership != 'leave' + AND stream_id <= ? + """ + txn.execute(sql, (user_id, now_token,)) + return self.cursor_to_dict(txn) + return self.runInteraction( + "get_all_groups_for_user", _get_all_groups_for_user_txn, + ) + + def get_groups_changes_for_user(self, user_id, from_token, to_token): + from_token = int(from_token) + has_changed = self._group_updates_stream_cache.has_entity_changed( + user_id, from_token, + ) + if not has_changed: + return [] + + def _get_groups_changes_for_user_txn(txn): + sql = """ + SELECT group_id, membership, type, u.content + FROM local_group_updates AS u + INNER JOIN local_group_membership USING (group_id, user_id) + WHERE user_id = ? AND ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (user_id, from_token, to_token,)) + return [{ + "group_id": group_id, + "membership": membership, + "type": gtype, + "content": json.loads(content_json), + } for group_id, membership, gtype, content_json in txn] + return self.runInteraction( + "get_groups_changes_for_user", _get_groups_changes_for_user_txn, + ) + + def get_group_stream_token(self): + return self._group_updates_id_gen.get_current_token() + def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index 472aab0a78..e32db8b313 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -142,3 +142,31 @@ CREATE TABLE group_attestations_remote ( CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); + + +CREATE TABLE local_group_membership ( + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + is_admin BOOLEAN NOT NULL, + membership TEXT NOT NULL, + content TEXT NOT NULL +); + +CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); +CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); + + +CREATE TABLE local_group_updates ( + stream_id BIGINT NOT NULL, + group_id TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, + content TEXT NOT NULL +); + + +CREATE TABLE local_group_profiles ( + group_id TEXT NOT NULL, + name TEXT, + avatar_url TEXT +); -- cgit 1.5.1 From e96ee95a7e84e7d75ac57395bb64e1c3428596e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:33:16 +0100 Subject: Remove sync stuff --- synapse/storage/__init__.py | 15 ----------- synapse/storage/group_server.py | 55 ----------------------------------------- 2 files changed, 70 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 594566eb38..fdee9f1ad5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -136,9 +136,6 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) - self._group_updates_id_gen = StreamIdGenerator( - db_conn, "local_group_updates", "stream_id", - ) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( @@ -239,18 +236,6 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=curr_state_delta_prefill, ) - _group_updates_prefill, min_group_updates_id = self._get_cache_dict( - db_conn, "local_group_updates", - entity_column="user_id", - stream_column="stream_id", - max_value=self._group_updates_id_gen.get_current_token(), - limit=1000, - ) - self._group_updates_stream_cache = StreamChangeCache( - "_group_updates_stream_cache", min_group_updates_id, - prefilled_cache=_group_updates_prefill, - ) - cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 036549d437..2dcdcbfdfc 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -868,61 +868,6 @@ class GroupServerStore(SQLBaseStore): desc="create_group", ) - def get_joined_groups(self, user_id): - return self._simple_select_onecol( - table="local_group_membership", - keyvalues={ - "user_id": user_id, - "membership": "join", - }, - retcol="group_id", - desc="get_joined_groups", - ) - - def get_all_groups_for_user(self, user_id, now_token): - def _get_all_groups_for_user_txn(txn): - sql = """ - SELECT group_id, type, membership, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND membership != 'leave' - AND stream_id <= ? - """ - txn.execute(sql, (user_id, now_token,)) - return self.cursor_to_dict(txn) - return self.runInteraction( - "get_all_groups_for_user", _get_all_groups_for_user_txn, - ) - - def get_groups_changes_for_user(self, user_id, from_token, to_token): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_entity_changed( - user_id, from_token, - ) - if not has_changed: - return [] - - def _get_groups_changes_for_user_txn(txn): - sql = """ - SELECT group_id, membership, type, u.content - FROM local_group_updates AS u - INNER JOIN local_group_membership USING (group_id, user_id) - WHERE user_id = ? AND ? < stream_id AND stream_id <= ? - """ - txn.execute(sql, (user_id, from_token, to_token,)) - return [{ - "group_id": group_id, - "membership": membership, - "type": gtype, - "content": json.loads(content_json), - } for group_id, membership, gtype, content_json in txn] - return self.runInteraction( - "get_groups_changes_for_user", _get_groups_changes_for_user_txn, - ) - - def get_group_stream_token(self): - return self._group_updates_id_gen.get_current_token() - def get_attestations_need_renewals(self, valid_until_ms): """Get all attestations that need to be renewed until givent time """ -- cgit 1.5.1 From 45407301111e55d04f46957a824d73eab69796de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:36:17 +0100 Subject: Remove unused tables --- synapse/storage/schema/delta/43/group_server.sql | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index e32db8b313..f9e11c9146 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -163,10 +163,3 @@ CREATE TABLE local_group_updates ( type TEXT NOT NULL, content TEXT NOT NULL ); - - -CREATE TABLE local_group_profiles ( - group_id TEXT NOT NULL, - name TEXT, - avatar_url TEXT -); -- cgit 1.5.1 From 508460f24077da74d8a3d3ce891c0b55ebbce2e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 09:55:46 +0100 Subject: Remove sync stuff --- synapse/storage/group_server.py | 20 -------------------- synapse/storage/schema/delta/43/group_server.sql | 10 +--------- 2 files changed, 1 insertion(+), 29 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 2dcdcbfdfc..bff2324cc7 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -783,26 +783,6 @@ class GroupServerStore(SQLBaseStore): "content": json.dumps(content), }, ) - self._simple_delete_txn( - txn, - table="local_group_updates", - keyvalues={ - "group_id": group_id, - "user_id": user_id, - "type": "membership", - }, - ) - self._simple_insert_txn( - txn, - table="local_group_updates", - values={ - "stream_id": next_id, - "group_id": group_id, - "user_id": user_id, - "type": "membership", - "content": json.dumps({"membership": membership, "content": content}), - } - ) self._group_updates_stream_cache.entity_has_changed(user_id, next_id) # TODO: Insert profile to ensuer it comes down stream if its a join. diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql index f9e11c9146..e1fd47aa7f 100644 --- a/synapse/storage/schema/delta/43/group_server.sql +++ b/synapse/storage/schema/delta/43/group_server.sql @@ -144,6 +144,7 @@ CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_i CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); +-- The group membership for the HS's users CREATE TABLE local_group_membership ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -154,12 +155,3 @@ CREATE TABLE local_group_membership ( CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); - - -CREATE TABLE local_group_updates ( - stream_id BIGINT NOT NULL, - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - type TEXT NOT NULL, - content TEXT NOT NULL -); -- cgit 1.5.1 From 3e703eb04e1b30dc2bce03d3895ac79ac24a063d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 10:17:25 +0100 Subject: Comment --- synapse/storage/group_server.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index bff2324cc7..3c6ee7df68 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -762,6 +762,20 @@ class GroupServerStore(SQLBaseStore): local_attestation=None, remote_attestation=None, ): + """Registers that a local user is a member of a (local or remote) group. + + Args: + group_id (str) + user_id (str) + membership (str) + is_admin (bool) + content (dict): Content of the membership, e.g. includes the inviter + if the user has been invited. + local_attestation (dict): If remote group then store the fact that we + have given out an attestation, else None. + remote_attestation (dict): If remote group then store the remote + attestation from the group, else None. + """ def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self._simple_delete_txn( -- cgit 1.5.1 From 94ecd871a047707da5998f83440c039d064de8aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Jul 2017 16:38:54 +0100 Subject: Fix typos --- synapse/federation/transport/client.py | 4 ++-- synapse/handlers/groups_local.py | 5 +++-- synapse/storage/group_server.py | 25 +++++++++++++++++-------- 3 files changed, 22 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 500f3622a2..e4d84c06c1 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -495,7 +495,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_rooms(self, destination, group_id, requester_user_id): + def get_rooms_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/rooms" % (group_id,) return self.client.get_json( @@ -518,7 +518,7 @@ class TransportLayerClient(object): ) @log_function - def get_group_users(self, destination, group_id, requester_user_id): + def get_users_in_group(self, destination, group_id, requester_user_id): path = PREFIX + "/groups/%s/users" % (group_id,) return self.client.get_json( diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 7d7fc5d976..50f7fce885 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -159,7 +159,7 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) - res = yield self.transport_client.get_group_users( + res = yield self.transport_client.get_users_in_group( get_domain_from_id(group_id), group_id, requester_user_id, ) @@ -278,7 +278,8 @@ class GroupsLocalHandler(object): else: content["requester_user_id"] = requester_user_id res = yield self.transport_client.remove_user_from_group( - get_domain_from_id(group_id), group_id, user_id, content + get_domain_from_id(group_id), group_id, requester_user_id, + user_id, content, ) defer.returnValue(res) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3c6ee7df68..0a69e0f501 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore): remote_attestation (dict): If remote group then store the remote attestation from the group, else None. """ - def _register_user_group_membership_txn(txn, next_id): + def _register_user_group_membership_txn(txn): # TODO: Upsert? self._simple_delete_txn( txn, @@ -797,7 +797,6 @@ class GroupServerStore(SQLBaseStore): "content": json.dumps(content), }, ) - self._group_updates_stream_cache.entity_has_changed(user_id, next_id) # TODO: Insert profile to ensuer it comes down stream if its a join. @@ -820,7 +819,7 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, "user_id": user_id, "valid_until_ms": remote_attestation["valid_until_ms"], - "attestation": json.dumps(remote_attestation), + "attestation_json": json.dumps(remote_attestation), } ) else: @@ -841,11 +840,10 @@ class GroupServerStore(SQLBaseStore): }, ) - with self._group_updates_id_gen.get_next() as next_id: - yield self.runInteraction( - "register_user_group_membership", - _register_user_group_membership_txn, next_id, - ) + yield self.runInteraction( + "register_user_group_membership", + _register_user_group_membership_txn, + ) @defer.inlineCallbacks def create_group(self, group_id, user_id, name, avatar_url, short_description, @@ -928,3 +926,14 @@ class GroupServerStore(SQLBaseStore): defer.returnValue(json.loads(row["attestation_json"])) defer.returnValue(None) + + def get_joined_groups(self, user_id): + return self._simple_select_onecol( + table="local_group_membership", + keyvalues={ + "user_id": user_id, + "membership": "join", + }, + retcol="group_id", + desc="get_joined_groups", + ) -- cgit 1.5.1 From 57826d645bd62ab534dbcab8d66a98daec145459 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 Jul 2017 13:15:22 +0100 Subject: Fix typo --- synapse/storage/group_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 0a69e0f501..a2e7aa47d8 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -798,7 +798,7 @@ class GroupServerStore(SQLBaseStore): }, ) - # TODO: Insert profile to ensuer it comes down stream if its a join. + # TODO: Insert profile to ensure it comes down stream if its a join. if membership == "join": if local_attestation: -- cgit 1.5.1