summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-19 13:31:06 +0100
committerErik Johnston <erik@matrix.org>2024-07-19 13:31:06 +0100
commit01f1dca71092391a8a9e8398002c0fcf9d434d17 (patch)
treef66d799b08f3f8e59787b30200a01dfb9d340f22
parentFix linting in tests (diff)
parentMerge remote-tracking branch 'origin/develop' into erikj/ss_tokens (diff)
downloadsynapse-01f1dca71092391a8a9e8398002c0fcf9d434d17.tar.xz
Merge branch 'erikj/ss_tokens' into erikj/ss_room_store
-rw-r--r--changelog.d/17458.misc1
-rw-r--r--poetry.lock10
-rw-r--r--synapse/handlers/sliding_sync.py34
-rw-r--r--synapse/types/__init__.py18
4 files changed, 38 insertions, 25 deletions
diff --git a/changelog.d/17458.misc b/changelog.d/17458.misc
new file mode 100644
index 0000000000..09cce15d0d
--- /dev/null
+++ b/changelog.d/17458.misc
@@ -0,0 +1 @@
+Speed up generating sliding sync responses.
diff --git a/poetry.lock b/poetry.lock
index b3e45972f1..2bfcb59cf2 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2430,13 +2430,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
 
 [[package]]
 name = "sentry-sdk"
-version = "2.6.0"
+version = "2.8.0"
 description = "Python client for Sentry (https://sentry.io)"
 optional = true
 python-versions = ">=3.6"
 files = [
-    {file = "sentry_sdk-2.6.0-py2.py3-none-any.whl", hash = "sha256:422b91cb49378b97e7e8d0e8d5a1069df23689d45262b86f54988a7db264e874"},
-    {file = "sentry_sdk-2.6.0.tar.gz", hash = "sha256:65cc07e9c6995c5e316109f138570b32da3bd7ff8d0d0ee4aaf2628c3dd8127d"},
+    {file = "sentry_sdk-2.8.0-py2.py3-none-any.whl", hash = "sha256:6051562d2cfa8087bb8b4b8b79dc44690f8a054762a29c07e22588b1f619bfb5"},
+    {file = "sentry_sdk-2.8.0.tar.gz", hash = "sha256:aa4314f877d9cd9add5a0c9ba18e3f27f99f7de835ce36bd150e48a41c7c646f"},
 ]
 
 [package.dependencies]
@@ -2466,7 +2466,7 @@ langchain = ["langchain (>=0.0.210)"]
 loguru = ["loguru (>=0.5)"]
 openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"]
 opentelemetry = ["opentelemetry-distro (>=0.35b0)"]
-opentelemetry-experimental = ["opentelemetry-distro (>=0.40b0,<1.0)", "opentelemetry-instrumentation-aiohttp-client (>=0.40b0,<1.0)", "opentelemetry-instrumentation-django (>=0.40b0,<1.0)", "opentelemetry-instrumentation-fastapi (>=0.40b0,<1.0)", "opentelemetry-instrumentation-flask (>=0.40b0,<1.0)", "opentelemetry-instrumentation-requests (>=0.40b0,<1.0)", "opentelemetry-instrumentation-sqlite3 (>=0.40b0,<1.0)", "opentelemetry-instrumentation-urllib (>=0.40b0,<1.0)"]
+opentelemetry-experimental = ["opentelemetry-instrumentation-aio-pika (==0.46b0)", "opentelemetry-instrumentation-aiohttp-client (==0.46b0)", "opentelemetry-instrumentation-aiopg (==0.46b0)", "opentelemetry-instrumentation-asgi (==0.46b0)", "opentelemetry-instrumentation-asyncio (==0.46b0)", "opentelemetry-instrumentation-asyncpg (==0.46b0)", "opentelemetry-instrumentation-aws-lambda (==0.46b0)", "opentelemetry-instrumentation-boto (==0.46b0)", "opentelemetry-instrumentation-boto3sqs (==0.46b0)", "opentelemetry-instrumentation-botocore (==0.46b0)", "opentelemetry-instrumentation-cassandra (==0.46b0)", "opentelemetry-instrumentation-celery (==0.46b0)", "opentelemetry-instrumentation-confluent-kafka (==0.46b0)", "opentelemetry-instrumentation-dbapi (==0.46b0)", "opentelemetry-instrumentation-django (==0.46b0)", "opentelemetry-instrumentation-elasticsearch (==0.46b0)", "opentelemetry-instrumentation-falcon (==0.46b0)", "opentelemetry-instrumentation-fastapi (==0.46b0)", "opentelemetry-instrumentation-flask (==0.46b0)", "opentelemetry-instrumentation-grpc (==0.46b0)", "opentelemetry-instrumentation-httpx (==0.46b0)", "opentelemetry-instrumentation-jinja2 (==0.46b0)", "opentelemetry-instrumentation-kafka-python (==0.46b0)", "opentelemetry-instrumentation-logging (==0.46b0)", "opentelemetry-instrumentation-mysql (==0.46b0)", "opentelemetry-instrumentation-mysqlclient (==0.46b0)", "opentelemetry-instrumentation-pika (==0.46b0)", "opentelemetry-instrumentation-psycopg (==0.46b0)", "opentelemetry-instrumentation-psycopg2 (==0.46b0)", "opentelemetry-instrumentation-pymemcache (==0.46b0)", "opentelemetry-instrumentation-pymongo (==0.46b0)", "opentelemetry-instrumentation-pymysql (==0.46b0)", "opentelemetry-instrumentation-pyramid (==0.46b0)", "opentelemetry-instrumentation-redis (==0.46b0)", "opentelemetry-instrumentation-remoulade (==0.46b0)", "opentelemetry-instrumentation-requests (==0.46b0)", "opentelemetry-instrumentation-sklearn (==0.46b0)", "opentelemetry-instrumentation-sqlalchemy (==0.46b0)", "opentelemetry-instrumentation-sqlite3 (==0.46b0)", "opentelemetry-instrumentation-starlette (==0.46b0)", "opentelemetry-instrumentation-system-metrics (==0.46b0)", "opentelemetry-instrumentation-threading (==0.46b0)", "opentelemetry-instrumentation-tornado (==0.46b0)", "opentelemetry-instrumentation-tortoiseorm (==0.46b0)", "opentelemetry-instrumentation-urllib (==0.46b0)", "opentelemetry-instrumentation-urllib3 (==0.46b0)", "opentelemetry-instrumentation-wsgi (==0.46b0)"]
 pure-eval = ["asttokens", "executing", "pure-eval"]
 pymongo = ["pymongo (>=3.1)"]
 pyspark = ["pyspark (>=2.4.4)"]
@@ -2476,7 +2476,7 @@ sanic = ["sanic (>=0.8)"]
 sqlalchemy = ["sqlalchemy (>=1.2)"]
 starlette = ["starlette (>=0.19.1)"]
 starlite = ["starlite (>=1.48)"]
-tornado = ["tornado (>=5)"]
+tornado = ["tornado (>=6)"]
 
 [[package]]
 name = "service-identity"
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index aef5bf7975..1238592917 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -30,6 +30,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
+from synapse.logging.opentracing import start_active_span, tag_args, trace
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
@@ -47,6 +48,7 @@ from synapse.types import (
 )
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 from synapse.types.state import StateFilter
+from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -385,7 +387,7 @@ class SlidingSyncHandler:
             # this returns false, it means we timed out waiting, and we should
             # just return an empty response.
             before_wait_ts = self.clock.time_msec()
-            if not await self.notifier.wait_for_stream_token(from_token.stream_token):
+            if not await self.notifier.wait_for_stream_token(from_token.stream):
                 logger.warning(
                     "Timed out waiting for worker to catch up. Returning empty response"
                 )
@@ -423,7 +425,7 @@ class SlidingSyncHandler:
                 sync_config.user.to_string(),
                 timeout_ms,
                 current_sync_callback,
-                from_token=from_token.stream_token,
+                from_token=from_token.stream,
             )
 
         return result
@@ -470,7 +472,7 @@ class SlidingSyncHandler:
                 await self.get_room_membership_for_user_at_to_token(
                     user=sync_config.user,
                     to_token=to_token,
-                    from_token=from_token.stream_token if from_token else None,
+                    from_token=from_token.stream if from_token else None,
                 )
             )
 
@@ -603,11 +605,14 @@ class SlidingSyncHandler:
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
-        for room_id, room_sync_config in relevant_room_map.items():
+
+        @trace
+        @tag_args
+        async def handle_room(room_id: str) -> None:
             room_sync_result = await self.get_room_sync_data(
                 sync_config=sync_config,
                 room_id=room_id,
-                room_sync_config=room_sync_config,
+                room_sync_config=relevant_room_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
                     room_id
                 ],
@@ -617,6 +622,9 @@ class SlidingSyncHandler:
 
             rooms[room_id] = room_sync_result
 
+        with start_active_span("sliding_sync.generate_room_entries"):
+            await concurrently_execute(handle_room, relevant_room_map, 10)
+
         extensions = await self.get_extensions_response(
             sync_config=sync_config, to_token=to_token
         )
@@ -630,7 +638,7 @@ class SlidingSyncHandler:
                 unsent_room_ids=[],
             )
         elif from_token:
-            connection_token = from_token.connection_token
+            connection_token = from_token.connection
         else:
             # Initial sync without a `from_token` starts a `0`
             connection_token = 0
@@ -1406,11 +1414,11 @@ class SlidingSyncHandler:
         if from_token and not room_membership_for_user_at_to_token.newly_joined:
             room_status = await self.connection_store.have_sent_room(
                 sync_config=sync_config,
-                connection_token=from_token.connection_token,
+                connection_token=from_token.connection,
                 room_id=room_id,
             )
             if room_status.status == HaveSentRoomFlag.LIVE:
-                from_bound = from_token.stream_token.room_key
+                from_bound = from_token.stream.room_key
                 initial = False
             elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
                 assert room_status.last_token is not None
@@ -1520,9 +1528,7 @@ class SlidingSyncHandler:
                         instance_name=timeline_event.internal_metadata.instance_name,
                         stream=timeline_event.internal_metadata.stream_ordering,
                     )
-                    if persisted_position.persisted_after(
-                        from_token.stream_token.room_key
-                    ):
+                    if persisted_position.persisted_after(from_token.stream.room_key):
                         num_live += 1
                     else:
                         # Since we're iterating over the timeline events in
@@ -2020,7 +2026,7 @@ class SlidingSyncConnectionStore:
         """
         prev_connection_token = 0
         if from_token is not None:
-            prev_connection_token = from_token.connection_token
+            prev_connection_token = from_token.connection
 
         # If there are no changes then this is a noop.
         if not sent_room_ids and not unsent_room_ids:
@@ -2056,7 +2062,7 @@ class SlidingSyncConnectionStore:
 
         # Work out the new state for unsent rooms that were `LIVE`.
         if from_token:
-            new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+            new_unsent_state = HaveSentRoom.previously(from_token.stream.room_key)
         else:
             new_unsent_state = HAVE_SENT_ROOM_NEVER
 
@@ -2095,7 +2101,7 @@ class SlidingSyncConnectionStore:
         sync_statuses = {
             connection_token: room_statuses
             for connection_token, room_statuses in sync_statuses.items()
-            if connection_token == from_token.connection_token
+            if connection_token == from_token.connection
         }
         if sync_statuses:
             self._connections[conn_key] = sync_statuses
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 23ac1842f8..a0956febc0 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1145,10 +1145,16 @@ class SlidingSyncStreamToken:
 
     This then looks something like:
         5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
+
+    Attributes:
+        stream: Token representing the position of all the standard
+            streams.
+        connection: Token used by sliding sync to track updates to any
+            per-connection state stored by Synapse.
     """
 
-    stream_token: StreamToken
-    connection_token: int
+    stream: StreamToken
+    connection: int
 
     @staticmethod
     @cancellable
@@ -1160,8 +1166,8 @@ class SlidingSyncStreamToken:
             stream_token = await StreamToken.from_string(store, stream_token_str)
 
             return SlidingSyncStreamToken(
-                stream_token=stream_token,
-                connection_token=connection_token,
+                stream=stream_token,
+                connection=connection_token,
             )
         except CancelledError:
             raise
@@ -1170,8 +1176,8 @@ class SlidingSyncStreamToken:
 
     async def to_string(self, store: "DataStore") -> str:
         """Serializes the token to a string"""
-        stream_token_str = await self.stream_token.to_string(store)
-        return f"{self.connection_token}/{stream_token_str}"
+        stream_token_str = await self.stream.to_string(store)
+        return f"{self.connection}/{stream_token_str}"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)