diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index a2e7aa47d8..45f0a4c599 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore):
remote_attestation (dict): If remote group then store the remote
attestation from the group, else None.
"""
- def _register_user_group_membership_txn(txn):
+ def _register_user_group_membership_txn(txn, next_id):
# TODO: Upsert?
self._simple_delete_txn(
txn,
@@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore):
},
)
+ self._simple_insert_txn(
+ txn,
+ table="local_group_updates",
+ values={
+ "stream_id": next_id,
+ "group_id": group_id,
+ "user_id": user_id,
+ "type": "membership",
+ "content": json.dumps({"membership": membership, "content": content}),
+ }
+ )
+ self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
+
# TODO: Insert profile to ensure it comes down stream if its a join.
if membership == "join":
@@ -840,10 +853,11 @@ class GroupServerStore(SQLBaseStore):
},
)
- yield self.runInteraction(
- "register_user_group_membership",
- _register_user_group_membership_txn,
- )
+ with self._group_updates_id_gen.get_next() as next_id:
+ yield self.runInteraction(
+ "register_user_group_membership",
+ _register_user_group_membership_txn, next_id,
+ )
@defer.inlineCallbacks
def create_group(self, group_id, user_id, name, avatar_url, short_description,
@@ -937,3 +951,47 @@ class GroupServerStore(SQLBaseStore):
retcol="group_id",
desc="get_joined_groups",
)
+
+ def get_all_groups_for_user(self, user_id, now_token):
+ def _get_all_groups_for_user_txn(txn):
+ sql = """
+ SELECT group_id, type, membership, u.content
+ FROM local_group_updates AS u
+ INNER JOIN local_group_membership USING (group_id, user_id)
+ WHERE user_id = ? AND membership != 'leave'
+ AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, now_token,))
+ return self.cursor_to_dict(txn)
+ return self.runInteraction(
+ "get_all_groups_for_user", _get_all_groups_for_user_txn,
+ )
+
+ def get_groups_changes_for_user(self, user_id, from_token, to_token):
+ from_token = int(from_token)
+ has_changed = self._group_updates_stream_cache.has_entity_changed(
+ user_id, from_token,
+ )
+ if not has_changed:
+ return []
+
+ def _get_groups_changes_for_user_txn(txn):
+ sql = """
+ SELECT group_id, membership, type, u.content
+ FROM local_group_updates AS u
+ INNER JOIN local_group_membership USING (group_id, user_id)
+ WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, from_token, to_token,))
+ return [{
+ "group_id": group_id,
+ "membership": membership,
+ "type": gtype,
+ "content": json.loads(content_json),
+ } for group_id, membership, gtype, content_json in txn]
+ return self.runInteraction(
+ "get_groups_changes_for_user", _get_groups_changes_for_user_txn,
+ )
+
+ def get_group_stream_token(self):
+ return self._group_updates_id_gen.get_current_token()
|