summary refs log tree commit diff
path: root/synapse/storage/group_server.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-07-10 14:53:19 +0100
committerErik Johnston <erik@matrix.org>2017-07-20 16:36:42 +0100
commitc544188ee3644c85a97a3c4e09e63ad4e3c6f0cc (patch)
tree6e8f40f857d42827b2fef0c1f6e150480ff870f3 /synapse/storage/group_server.py
parentMerge pull request #2374 from matrix-org/erikj/group_server_local (diff)
downloadsynapse-c544188ee3644c85a97a3c4e09e63ad4e3c6f0cc.tar.xz
Add groups to sync stream
Diffstat (limited to 'synapse/storage/group_server.py')
-rw-r--r--synapse/storage/group_server.py68
1 files changed, 63 insertions, 5 deletions
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index a2e7aa47d8..45f0a4c599 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore):
             remote_attestation (dict): If remote group then store the remote
                 attestation from the group, else None.
         """
-        def _register_user_group_membership_txn(txn):
+        def _register_user_group_membership_txn(txn, next_id):
             # TODO: Upsert?
             self._simple_delete_txn(
                 txn,
@@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore):
                 },
             )
 
+            self._simple_insert_txn(
+                txn,
+                table="local_group_updates",
+                values={
+                    "stream_id": next_id,
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "type": "membership",
+                    "content": json.dumps({"membership": membership, "content": content}),
+                }
+            )
+            self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
+
             # TODO: Insert profile to ensure it comes down stream if its a join.
 
             if membership == "join":
@@ -840,10 +853,11 @@ class GroupServerStore(SQLBaseStore):
                     },
                 )
 
-        yield self.runInteraction(
-            "register_user_group_membership",
-            _register_user_group_membership_txn,
-        )
+        with self._group_updates_id_gen.get_next() as next_id:
+            yield self.runInteraction(
+                "register_user_group_membership",
+                _register_user_group_membership_txn, next_id,
+            )
 
     @defer.inlineCallbacks
     def create_group(self, group_id, user_id, name, avatar_url, short_description,
@@ -937,3 +951,47 @@ class GroupServerStore(SQLBaseStore):
             retcol="group_id",
             desc="get_joined_groups",
         )
+
+    def get_all_groups_for_user(self, user_id, now_token):
+        def _get_all_groups_for_user_txn(txn):
+            sql = """
+                SELECT group_id, type, membership, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND membership != 'leave'
+                    AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, now_token,))
+            return self.cursor_to_dict(txn)
+        return self.runInteraction(
+            "get_all_groups_for_user", _get_all_groups_for_user_txn,
+        )
+
+    def get_groups_changes_for_user(self, user_id, from_token, to_token):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_entity_changed(
+            user_id, from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_groups_changes_for_user_txn(txn):
+            sql = """
+                SELECT group_id, membership, type, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, from_token, to_token,))
+            return [{
+                "group_id": group_id,
+                "membership": membership,
+                "type": gtype,
+                "content": json.loads(content_json),
+            } for group_id, membership, gtype, content_json in txn]
+        return self.runInteraction(
+            "get_groups_changes_for_user", _get_groups_changes_for_user_txn,
+        )
+
+    def get_group_stream_token(self):
+        return self._group_updates_id_gen.get_current_token()