summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2022-03-10 10:36:13 -0500
committerGitHub <noreply@github.com>2022-03-10 15:36:13 +0000
commitea27528b5d177dcfc5a4e38b463baeace916dc8e (patch)
tree4fc7ebe8c997256b8744de2f4f29c72f020b50c3
parentUpdates to the Room DAG concepts development document (#12179) (diff)
downloadsynapse-ea27528b5d177dcfc5a4e38b463baeace916dc8e.tar.xz
Support stable identifiers for MSC3440: Threading (#12151)
The unstable identifiers are still supported if the experimental configuration
flag is enabled. The unstable identifiers will be removed in a future release.
-rw-r--r--changelog.d/12151.feature1
-rw-r--r--synapse/api/constants.py4
-rw-r--r--synapse/api/filtering.py23
-rw-r--r--synapse/events/utils.py9
-rw-r--r--synapse/handlers/message.py5
-rw-r--r--synapse/rest/client/versions.py1
-rw-r--r--synapse/server.py2
-rw-r--r--synapse/storage/databases/main/events.py5
-rw-r--r--synapse/storage/databases/main/relations.py77
-rw-r--r--synapse/storage/databases/main/stream.py18
-rw-r--r--tests/rest/client/test_relations.py7
-rw-r--r--tests/rest/client/test_rooms.py18
-rw-r--r--tests/storage/test_stream.py20
13 files changed, 109 insertions, 81 deletions
diff --git a/changelog.d/12151.feature b/changelog.d/12151.feature
new file mode 100644
index 0000000000..18432b2da9
--- /dev/null
+++ b/changelog.d/12151.feature
@@ -0,0 +1 @@
+Support the stable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440): threads.
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 36ace7c613..b0c08a074d 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -178,7 +178,9 @@ class RelationTypes:
     ANNOTATION: Final = "m.annotation"
     REPLACE: Final = "m.replace"
     REFERENCE: Final = "m.reference"
-    THREAD: Final = "io.element.thread"
+    THREAD: Final = "m.thread"
+    # TODO Remove this in Synapse >= v1.57.0.
+    UNSTABLE_THREAD: Final = "io.element.thread"
 
 
 class LimitBlockingTypes:
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index cb532d7238..27e97d6f37 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -88,7 +88,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
         "org.matrix.labels": {"type": "array", "items": {"type": "string"}},
         "org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
         # MSC3440, filtering by event relations.
+        "related_by_senders": {"type": "array", "items": {"type": "string"}},
         "io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
+        "related_by_rel_types": {"type": "array", "items": {"type": "string"}},
         "io.element.relation_types": {"type": "array", "items": {"type": "string"}},
     },
 }
@@ -318,19 +320,18 @@ class Filter:
         self.labels = filter_json.get("org.matrix.labels", None)
         self.not_labels = filter_json.get("org.matrix.not_labels", [])
 
-        # Ideally these would be rejected at the endpoint if they were provided
-        # and not supported, but that would involve modifying the JSON schema
-        # based on the homeserver configuration.
+        self.related_by_senders = self.filter_json.get("related_by_senders", None)
+        self.related_by_rel_types = self.filter_json.get("related_by_rel_types", None)
+
+        # Fallback to the unstable prefix if the stable version is not given.
         if hs.config.experimental.msc3440_enabled:
-            self.relation_senders = self.filter_json.get(
+            self.related_by_senders = self.related_by_senders or self.filter_json.get(
                 "io.element.relation_senders", None
             )
-            self.relation_types = self.filter_json.get(
-                "io.element.relation_types", None
+            self.related_by_rel_types = (
+                self.related_by_rel_types
+                or self.filter_json.get("io.element.relation_types", None)
             )
-        else:
-            self.relation_senders = None
-            self.relation_types = None
 
     def filters_all_types(self) -> bool:
         return "*" in self.not_types
@@ -461,7 +462,7 @@ class Filter:
         event_ids = [event.event_id for event in events if isinstance(event, EventBase)]  # type: ignore[attr-defined]
         event_ids_to_keep = set(
             await self._store.events_have_relations(
-                event_ids, self.relation_senders, self.relation_types
+                event_ids, self.related_by_senders, self.related_by_rel_types
             )
         )
 
@@ -474,7 +475,7 @@ class Filter:
     async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
         result = [event for event in events if self._check(event)]
 
-        if self.relation_senders or self.relation_types:
+        if self.related_by_senders or self.related_by_rel_types:
             return await self._check_event_relations(result)
 
         return result
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index ee34cb46e4..b2a237c1e0 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -38,6 +38,7 @@ from synapse.util.frozenutils import unfreeze
 from . import EventBase
 
 if TYPE_CHECKING:
+    from synapse.server import HomeServer
     from synapse.storage.databases.main.relations import BundledAggregations
 
 
@@ -395,6 +396,9 @@ class EventClientSerializer:
     clients.
     """
 
+    def __init__(self, hs: "HomeServer"):
+        self._msc3440_enabled = hs.config.experimental.msc3440_enabled
+
     def serialize_event(
         self,
         event: Union[JsonDict, EventBase],
@@ -515,11 +519,14 @@ class EventClientSerializer:
                     thread.latest_event, serialized_latest_event, thread.latest_edit
                 )
 
-            serialized_aggregations[RelationTypes.THREAD] = {
+            thread_summary = {
                 "latest_event": serialized_latest_event,
                 "count": thread.count,
                 "current_user_participated": thread.current_user_participated,
             }
+            serialized_aggregations[RelationTypes.THREAD] = thread_summary
+            if self._msc3440_enabled:
+                serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary
 
         # Include the bundled aggregations in the event.
         if serialized_aggregations:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0799ec9a84..f9544fe7fb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1079,7 +1079,10 @@ class EventCreationHandler:
                 raise SynapseError(400, "Can't send same reaction twice")
 
         # Don't attempt to start a thread if the parent event is a relation.
-        elif relation_type == RelationTypes.THREAD:
+        elif (
+            relation_type == RelationTypes.THREAD
+            or relation_type == RelationTypes.UNSTABLE_THREAD
+        ):
             if await self.store.event_includes_relation(relates_to):
                 raise SynapseError(
                     400, "Cannot start threads from an event with a relation"
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 2e5d0e4e22..9a65aa4843 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -101,6 +101,7 @@ class VersionsRestServlet(RestServlet):
                     "org.matrix.msc3030": self.config.experimental.msc3030_enabled,
                     # Adds support for thread relations, per MSC3440.
                     "org.matrix.msc3440": self.config.experimental.msc3440_enabled,
+                    "org.matrix.msc3440.stable": True,  # TODO: remove when "v1.3" is added above
                 },
             },
         )
diff --git a/synapse/server.py b/synapse/server.py
index 1270abb5a3..7741ff29dc 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -754,7 +754,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_event_client_serializer(self) -> EventClientSerializer:
-        return EventClientSerializer()
+        return EventClientSerializer(self)
 
     @cache_in_self
     def get_password_policy_handler(self) -> PasswordPolicyHandler:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1a322882bf..1f60aef180 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1814,7 +1814,10 @@ class PersistEventsStore:
         if rel_type == RelationTypes.REPLACE:
             txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
-        if rel_type == RelationTypes.THREAD:
+        if (
+            rel_type == RelationTypes.THREAD
+            or rel_type == RelationTypes.UNSTABLE_THREAD
+        ):
             txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
             # It should be safe to only invalidate the cache if the user has not
             # previously participated in the thread, but that's difficult (and
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index be1500092b..c4869d64e6 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -508,7 +508,7 @@ class RelationsWorkerStore(SQLBaseStore):
                         AND parent.room_id = child.room_id
                     WHERE
                         %s
-                        AND relation_type = ?
+                        AND %s
                     ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC
                 """
             else:
@@ -523,16 +523,22 @@ class RelationsWorkerStore(SQLBaseStore):
                         AND parent.room_id = child.room_id
                     WHERE
                         %s
-                        AND relation_type = ?
+                        AND %s
                     ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
                 """
 
             clause, args = make_in_list_sql_clause(
                 txn.database_engine, "relates_to_id", event_ids
             )
-            args.append(RelationTypes.THREAD)
 
-            txn.execute(sql % (clause,), args)
+            if self._msc3440_enabled:
+                relations_clause = "(relation_type = ? OR relation_type = ?)"
+                args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+            else:
+                relations_clause = "relation_type = ?"
+                args.append(RelationTypes.THREAD)
+
+            txn.execute(sql % (clause, relations_clause), args)
             latest_event_ids = {}
             for parent_event_id, child_event_id in txn:
                 # Only consider the latest threaded reply (by topological ordering).
@@ -552,7 +558,7 @@ class RelationsWorkerStore(SQLBaseStore):
                     AND parent.room_id = child.room_id
                 WHERE
                     %s
-                    AND relation_type = ?
+                    AND %s
                 GROUP BY parent.event_id
             """
 
@@ -561,9 +567,15 @@ class RelationsWorkerStore(SQLBaseStore):
             clause, args = make_in_list_sql_clause(
                 txn.database_engine, "relates_to_id", latest_event_ids.keys()
             )
-            args.append(RelationTypes.THREAD)
 
-            txn.execute(sql % (clause,), args)
+            if self._msc3440_enabled:
+                relations_clause = "(relation_type = ? OR relation_type = ?)"
+                args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+            else:
+                relations_clause = "relation_type = ?"
+                args.append(RelationTypes.THREAD)
+
+            txn.execute(sql % (clause, relations_clause), args)
             counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
 
             return counts, latest_event_ids
@@ -626,16 +638,24 @@ class RelationsWorkerStore(SQLBaseStore):
                     AND parent.room_id = child.room_id
                 WHERE
                     %s
-                    AND relation_type = ?
+                    AND %s
                     AND child.sender = ?
             """
 
             clause, args = make_in_list_sql_clause(
                 txn.database_engine, "relates_to_id", event_ids
             )
-            args.extend((RelationTypes.THREAD, user_id))
 
-            txn.execute(sql % (clause,), args)
+            if self._msc3440_enabled:
+                relations_clause = "(relation_type = ? OR relation_type = ?)"
+                args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+            else:
+                relations_clause = "relation_type = ?"
+                args.append(RelationTypes.THREAD)
+
+            args.append(user_id)
+
+            txn.execute(sql % (clause, relations_clause), args)
             return {row[0] for row in txn.fetchall()}
 
         participated_threads = await self.db_pool.runInteraction(
@@ -834,26 +854,23 @@ class RelationsWorkerStore(SQLBaseStore):
             results.setdefault(event_id, BundledAggregations()).replace = edit
 
         # Fetch thread summaries.
-        if self._msc3440_enabled:
-            summaries = await self._get_thread_summaries(events_by_id.keys())
-            # Only fetch participated for a limited selection based on what had
-            # summaries.
-            participated = await self._get_threads_participated(
-                summaries.keys(), user_id
-            )
-            for event_id, summary in summaries.items():
-                if summary:
-                    thread_count, latest_thread_event, edit = summary
-                    results.setdefault(
-                        event_id, BundledAggregations()
-                    ).thread = _ThreadAggregation(
-                        latest_event=latest_thread_event,
-                        latest_edit=edit,
-                        count=thread_count,
-                        # If there's a thread summary it must also exist in the
-                        # participated dictionary.
-                        current_user_participated=participated[event_id],
-                    )
+        summaries = await self._get_thread_summaries(events_by_id.keys())
+        # Only fetch participated for a limited selection based on what had
+        # summaries.
+        participated = await self._get_threads_participated(summaries.keys(), user_id)
+        for event_id, summary in summaries.items():
+            if summary:
+                thread_count, latest_thread_event, edit = summary
+                results.setdefault(
+                    event_id, BundledAggregations()
+                ).thread = _ThreadAggregation(
+                    latest_event=latest_thread_event,
+                    latest_edit=edit,
+                    count=thread_count,
+                    # If there's a thread summary it must also exist in the
+                    # participated dictionary.
+                    current_user_participated=participated[event_id],
+                )
 
         return results
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index a898f847e7..39e1efe373 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -325,21 +325,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
         args.extend(event_filter.labels)
 
     # Filter on relation_senders / relation types from the joined tables.
-    if event_filter.relation_senders:
+    if event_filter.related_by_senders:
         clauses.append(
             "(%s)"
             % " OR ".join(
-                "related_event.sender = ?" for _ in event_filter.relation_senders
+                "related_event.sender = ?" for _ in event_filter.related_by_senders
             )
         )
-        args.extend(event_filter.relation_senders)
+        args.extend(event_filter.related_by_senders)
 
-    if event_filter.relation_types:
+    if event_filter.related_by_rel_types:
         clauses.append(
             "(%s)"
-            % " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
+            % " OR ".join(
+                "relation_type = ?" for _ in event_filter.related_by_rel_types
+            )
         )
-        args.extend(event_filter.relation_types)
+        args.extend(event_filter.related_by_rel_types)
 
     return " AND ".join(clauses), args
 
@@ -1203,7 +1205,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         # If there is a filter on relation_senders and relation_types join to the
         # relations table.
         if event_filter and (
-            event_filter.relation_senders or event_filter.relation_types
+            event_filter.related_by_senders or event_filter.related_by_rel_types
         ):
             # Filtering by relations could cause the same event to appear multiple
             # times (since there's no limit on the number of relations to an event).
@@ -1211,7 +1213,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             join_clause += """
                 LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
             """
-            if event_filter.relation_senders:
+            if event_filter.related_by_senders:
                 join_clause += """
                     LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
                 """
diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py
index f9ae6e663f..0cbe6c0cf7 100644
--- a/tests/rest/client/test_relations.py
+++ b/tests/rest/client/test_relations.py
@@ -547,9 +547,7 @@ class RelationsTestCase(BaseRelationsTestCase):
         )
         self.assertEqual(400, channel.code, channel.json_body)
 
-    @unittest.override_config(
-        {"experimental_features": {"msc3440_enabled": True, "msc3666_enabled": True}}
-    )
+    @unittest.override_config({"experimental_features": {"msc3666_enabled": True}})
     def test_bundled_aggregations(self) -> None:
         """
         Test that annotations, references, and threads get correctly bundled.
@@ -758,7 +756,6 @@ class RelationsTestCase(BaseRelationsTestCase):
             },
         )
 
-    @unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
     def test_ignore_invalid_room(self) -> None:
         """Test that we ignore invalid relations over federation."""
         # Create another room and send a message in it.
@@ -1065,7 +1062,6 @@ class RelationsTestCase(BaseRelationsTestCase):
             {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
         )
 
-    @unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
     def test_edit_thread(self) -> None:
         """Test that editing a thread works."""
 
@@ -1383,7 +1379,6 @@ class RelationRedactionTestCase(BaseRelationsTestCase):
         chunk = self._get_aggregations()
         self.assertEqual(chunk, [{"type": "m.reaction", "key": "a", "count": 1}])
 
-    @unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
     def test_redact_relation_thread(self) -> None:
         """
         Test that thread replies are properly handled after the thread reply redacted.
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 37866ee330..3a9617d6da 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -2141,21 +2141,19 @@ class RelationsTestCase(unittest.HomeserverTestCase):
 
     def test_filter_relation_senders(self) -> None:
         # Messages which second user reacted to.
-        filter = {"io.element.relation_senders": [self.second_user_id]}
+        filter = {"related_by_senders": [self.second_user_id]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0]["event_id"], self.event_id_1)
 
         # Messages which third user reacted to.
-        filter = {"io.element.relation_senders": [self.third_user_id]}
+        filter = {"related_by_senders": [self.third_user_id]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0]["event_id"], self.event_id_2)
 
         # Messages which either user reacted to.
-        filter = {
-            "io.element.relation_senders": [self.second_user_id, self.third_user_id]
-        }
+        filter = {"related_by_senders": [self.second_user_id, self.third_user_id]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 2, chunk)
         self.assertCountEqual(
@@ -2164,20 +2162,20 @@ class RelationsTestCase(unittest.HomeserverTestCase):
 
     def test_filter_relation_type(self) -> None:
         # Messages which have annotations.
-        filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
+        filter = {"related_by_rel_types": [RelationTypes.ANNOTATION]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0]["event_id"], self.event_id_1)
 
         # Messages which have references.
-        filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
+        filter = {"related_by_rel_types": [RelationTypes.REFERENCE]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0]["event_id"], self.event_id_2)
 
         # Messages which have either annotations or references.
         filter = {
-            "io.element.relation_types": [
+            "related_by_rel_types": [
                 RelationTypes.ANNOTATION,
                 RelationTypes.REFERENCE,
             ]
@@ -2191,8 +2189,8 @@ class RelationsTestCase(unittest.HomeserverTestCase):
     def test_filter_relation_senders_and_type(self) -> None:
         # Messages which second user reacted to.
         filter = {
-            "io.element.relation_senders": [self.second_user_id],
-            "io.element.relation_types": [RelationTypes.ANNOTATION],
+            "related_by_senders": [self.second_user_id],
+            "related_by_rel_types": [RelationTypes.ANNOTATION],
         }
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index 6a1cf33054..eaa0d7d749 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -129,21 +129,19 @@ class PaginationTestCase(HomeserverTestCase):
 
     def test_filter_relation_senders(self):
         # Messages which second user reacted to.
-        filter = {"io.element.relation_senders": [self.second_user_id]}
+        filter = {"related_by_senders": [self.second_user_id]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0].event_id, self.event_id_1)
 
         # Messages which third user reacted to.
-        filter = {"io.element.relation_senders": [self.third_user_id]}
+        filter = {"related_by_senders": [self.third_user_id]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0].event_id, self.event_id_2)
 
         # Messages which either user reacted to.
-        filter = {
-            "io.element.relation_senders": [self.second_user_id, self.third_user_id]
-        }
+        filter = {"related_by_senders": [self.second_user_id, self.third_user_id]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 2, chunk)
         self.assertCountEqual(
@@ -152,20 +150,20 @@ class PaginationTestCase(HomeserverTestCase):
 
     def test_filter_relation_type(self):
         # Messages which have annotations.
-        filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
+        filter = {"related_by_rel_types": [RelationTypes.ANNOTATION]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0].event_id, self.event_id_1)
 
         # Messages which have references.
-        filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
+        filter = {"related_by_rel_types": [RelationTypes.REFERENCE]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0].event_id, self.event_id_2)
 
         # Messages which have either annotations or references.
         filter = {
-            "io.element.relation_types": [
+            "related_by_rel_types": [
                 RelationTypes.ANNOTATION,
                 RelationTypes.REFERENCE,
             ]
@@ -179,8 +177,8 @@ class PaginationTestCase(HomeserverTestCase):
     def test_filter_relation_senders_and_type(self):
         # Messages which second user reacted to.
         filter = {
-            "io.element.relation_senders": [self.second_user_id],
-            "io.element.relation_types": [RelationTypes.ANNOTATION],
+            "related_by_senders": [self.second_user_id],
+            "related_by_rel_types": [RelationTypes.ANNOTATION],
         }
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
@@ -201,7 +199,7 @@ class PaginationTestCase(HomeserverTestCase):
             tok=self.second_tok,
         )
 
-        filter = {"io.element.relation_senders": [self.second_user_id]}
+        filter = {"related_by_senders": [self.second_user_id]}
         chunk = self._filter_messages(filter)
         self.assertEqual(len(chunk), 1, chunk)
         self.assertEqual(chunk[0].event_id, self.event_id_1)