summary refs log tree commit diff
path: root/synapse/storage/group_server.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-07-21 11:05:39 +0100
committerGitHub <noreply@github.com>2017-07-21 11:05:39 +0100
commit96917d555240f52c98d698f95e2ff1e65c8b0b5d (patch)
treedf70bdc99fc2db0a229a42a55b1de2efda99e883 /synapse/storage/group_server.py
parentMerge pull request #2377 from matrix-org/erikj/group_profile_update (diff)
parentAdd notifier (diff)
downloadsynapse-96917d555240f52c98d698f95e2ff1e65c8b0b5d.tar.xz
Merge pull request #2378 from matrix-org/erikj/group_sync_support
Add groups to sync stream
Diffstat (limited to 'synapse/storage/group_server.py')
-rw-r--r--synapse/storage/group_server.py91
1 files changed, 86 insertions, 5 deletions
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index ce0f513e30..d42e215b26 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore):
             remote_attestation (dict): If remote group then store the remote
                 attestation from the group, else None.
         """
-        def _register_user_group_membership_txn(txn):
+        def _register_user_group_membership_txn(txn, next_id):
             # TODO: Upsert?
             self._simple_delete_txn(
                 txn,
@@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore):
                 },
             )
 
+            self._simple_insert_txn(
+                txn,
+                table="local_group_updates",
+                values={
+                    "stream_id": next_id,
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "type": "membership",
+                    "content": json.dumps({"membership": membership, "content": content}),
+                }
+            )
+            self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
+
             # TODO: Insert profile to ensure it comes down stream if its a join.
 
             if membership == "join":
@@ -840,10 +853,13 @@ class GroupServerStore(SQLBaseStore):
                     },
                 )
 
-        yield self.runInteraction(
-            "register_user_group_membership",
-            _register_user_group_membership_txn,
-        )
+            return next_id
+
+        with self._group_updates_id_gen.get_next() as next_id:
+            yield self.runInteraction(
+                "register_user_group_membership",
+                _register_user_group_membership_txn, next_id,
+            )
 
     @defer.inlineCallbacks
     def create_group(self, group_id, user_id, name, avatar_url, short_description,
@@ -948,3 +964,68 @@ class GroupServerStore(SQLBaseStore):
             retcol="group_id",
             desc="get_joined_groups",
         )
+
+    def get_all_groups_for_user(self, user_id, now_token):
+        def _get_all_groups_for_user_txn(txn):
+            sql = """
+                SELECT group_id, type, membership, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND membership != 'leave'
+                    AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, now_token,))
+            return self.cursor_to_dict(txn)
+        return self.runInteraction(
+            "get_all_groups_for_user", _get_all_groups_for_user_txn,
+        )
+
+    def get_groups_changes_for_user(self, user_id, from_token, to_token):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_entity_changed(
+            user_id, from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_groups_changes_for_user_txn(txn):
+            sql = """
+                SELECT group_id, membership, type, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, from_token, to_token,))
+            return [{
+                "group_id": group_id,
+                "membership": membership,
+                "type": gtype,
+                "content": json.loads(content_json),
+            } for group_id, membership, gtype, content_json in txn]
+        return self.runInteraction(
+            "get_groups_changes_for_user", _get_groups_changes_for_user_txn,
+        )
+
+    def get_all_groups_changes(self, from_token, to_token, limit):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
+            from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_all_groups_changes_txn(txn):
+            sql = """
+                SELECT stream_id, group_id, user_id, type, content
+                FROM local_group_updates
+                WHERE ? < stream_id AND stream_id <= ?
+                LIMIT ?
+            """
+            txn.execute(sql, (from_token, to_token, limit,))
+            return txn.fetchall()
+        return self.runInteraction(
+            "get_all_groups_changes", _get_all_groups_changes_txn,
+        )
+
+    def get_group_stream_token(self):
+        return self._group_updates_id_gen.get_current_token()