| author | Erik Johnston <erik@matrix.org> | 2024-07-19 13:31:06 +0100 |
|---|---|---|
| committer | Erik Johnston <erik@matrix.org> | 2024-07-19 13:31:06 +0100 |
| commit | 01f1dca71092391a8a9e8398002c0fcf9d434d17 (patch) | |
| tree | f66d799b08f3f8e59787b30200a01dfb9d340f22 | |
| parent | Fix linting in tests (diff) | |
| parent | Merge remote-tracking branch 'origin/develop' into erikj/ss_tokens (diff) | |
| download | synapse-01f1dca71092391a8a9e8398002c0fcf9d434d17.tar.xz | |
Merge branch 'erikj/ss_tokens' into erikj/ss_room_store
| -rw-r--r-- | changelog.d/17458.misc | 1 |
| -rw-r--r-- | poetry.lock | 10 |
| -rw-r--r-- | synapse/handlers/sliding_sync.py | 34 |
| -rw-r--r-- | synapse/types/__init__.py | 18 |

4 files changed, 38 insertions, 25 deletions
diff --git a/changelog.d/17458.misc b/changelog.d/17458.misc
new file mode 100644
index 0000000000..09cce15d0d
--- /dev/null
+++ b/changelog.d/17458.misc
@@ -0,0 +1 @@
+Speed up generating sliding sync responses.
diff --git a/poetry.lock b/poetry.lock
index b3e45972f1..2bfcb59cf2 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2430,13 +2430,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
 
 [[package]]
 name = "sentry-sdk"
-version = "2.6.0"
+version = "2.8.0"
 description = "Python client for Sentry (https://sentry.io)"
 optional = true
 python-versions = ">=3.6"
 files = [
-    {file = "sentry_sdk-2.6.0-py2.py3-none-any.whl", hash = "sha256:422b91cb49378b97e7e8d0e8d5a1069df23689d45262b86f54988a7db264e874"},
-    {file = "sentry_sdk-2.6.0.tar.gz", hash = "sha256:65cc07e9c6995c5e316109f138570b32da3bd7ff8d0d0ee4aaf2628c3dd8127d"},
+    {file = "sentry_sdk-2.8.0-py2.py3-none-any.whl", hash = "sha256:6051562d2cfa8087bb8b4b8b79dc44690f8a054762a29c07e22588b1f619bfb5"},
+    {file = "sentry_sdk-2.8.0.tar.gz", hash = "sha256:aa4314f877d9cd9add5a0c9ba18e3f27f99f7de835ce36bd150e48a41c7c646f"},
 ]
 
 [package.dependencies]
@@ -2466,7 +2466,7 @@ langchain = ["langchain (>=0.0.210)"]
 loguru = ["loguru (>=0.5)"]
 openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"]
 opentelemetry = ["opentelemetry-distro (>=0.35b0)"]
-opentelemetry-experimental = ["opentelemetry-distro (>=0.40b0,<1.0)", "opentelemetry-instrumentation-aiohttp-client (>=0.40b0,<1.0)", "opentelemetry-instrumentation-django (>=0.40b0,<1.0)", "opentelemetry-instrumentation-fastapi (>=0.40b0,<1.0)", "opentelemetry-instrumentation-flask (>=0.40b0,<1.0)", "opentelemetry-instrumentation-requests (>=0.40b0,<1.0)", "opentelemetry-instrumentation-sqlite3 (>=0.40b0,<1.0)", "opentelemetry-instrumentation-urllib (>=0.40b0,<1.0)"]
+opentelemetry-experimental = ["opentelemetry-instrumentation-aio-pika (==0.46b0)", "opentelemetry-instrumentation-aiohttp-client (==0.46b0)", "opentelemetry-instrumentation-aiopg (==0.46b0)", "opentelemetry-instrumentation-asgi (==0.46b0)", "opentelemetry-instrumentation-asyncio (==0.46b0)", "opentelemetry-instrumentation-asyncpg (==0.46b0)", "opentelemetry-instrumentation-aws-lambda (==0.46b0)", "opentelemetry-instrumentation-boto (==0.46b0)", "opentelemetry-instrumentation-boto3sqs (==0.46b0)", "opentelemetry-instrumentation-botocore (==0.46b0)", "opentelemetry-instrumentation-cassandra (==0.46b0)", "opentelemetry-instrumentation-celery (==0.46b0)", "opentelemetry-instrumentation-confluent-kafka (==0.46b0)", "opentelemetry-instrumentation-dbapi (==0.46b0)", "opentelemetry-instrumentation-django (==0.46b0)", "opentelemetry-instrumentation-elasticsearch (==0.46b0)", "opentelemetry-instrumentation-falcon (==0.46b0)", "opentelemetry-instrumentation-fastapi (==0.46b0)", "opentelemetry-instrumentation-flask (==0.46b0)", "opentelemetry-instrumentation-grpc (==0.46b0)", "opentelemetry-instrumentation-httpx (==0.46b0)", "opentelemetry-instrumentation-jinja2 (==0.46b0)", "opentelemetry-instrumentation-kafka-python (==0.46b0)", "opentelemetry-instrumentation-logging (==0.46b0)", "opentelemetry-instrumentation-mysql (==0.46b0)", "opentelemetry-instrumentation-mysqlclient (==0.46b0)", "opentelemetry-instrumentation-pika (==0.46b0)", "opentelemetry-instrumentation-psycopg (==0.46b0)", "opentelemetry-instrumentation-psycopg2 (==0.46b0)", "opentelemetry-instrumentation-pymemcache (==0.46b0)", "opentelemetry-instrumentation-pymongo (==0.46b0)", "opentelemetry-instrumentation-pymysql (==0.46b0)", "opentelemetry-instrumentation-pyramid (==0.46b0)", "opentelemetry-instrumentation-redis (==0.46b0)", "opentelemetry-instrumentation-remoulade (==0.46b0)", "opentelemetry-instrumentation-requests (==0.46b0)", "opentelemetry-instrumentation-sklearn (==0.46b0)", "opentelemetry-instrumentation-sqlalchemy (==0.46b0)", "opentelemetry-instrumentation-sqlite3 (==0.46b0)", "opentelemetry-instrumentation-starlette (==0.46b0)", "opentelemetry-instrumentation-system-metrics (==0.46b0)", "opentelemetry-instrumentation-threading (==0.46b0)", "opentelemetry-instrumentation-tornado (==0.46b0)", "opentelemetry-instrumentation-tortoiseorm (==0.46b0)", "opentelemetry-instrumentation-urllib (==0.46b0)", "opentelemetry-instrumentation-urllib3 (==0.46b0)", "opentelemetry-instrumentation-wsgi (==0.46b0)"]
 pure-eval = ["asttokens", "executing", "pure-eval"]
 pymongo = ["pymongo (>=3.1)"]
 pyspark = ["pyspark (>=2.4.4)"]
@@ -2476,7 +2476,7 @@ sanic = ["sanic (>=0.8)"]
 sqlalchemy = ["sqlalchemy (>=1.2)"]
 starlette = ["starlette (>=0.19.1)"]
 starlite = ["starlite (>=1.48)"]
-tornado = ["tornado (>=5)"]
+tornado = ["tornado (>=6)"]
 
 [[package]]
 name = "service-identity"
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index aef5bf7975..1238592917 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -30,6 +30,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
+from synapse.logging.opentracing import start_active_span, tag_args, trace
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
@@ -47,6 +48,7 @@ from synapse.types import (
 )
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 from synapse.types.state import StateFilter
+from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -385,7 +387,7 @@ class SlidingSyncHandler:
             # this returns false, it means we timed out waiting, and we should
             # just return an empty response.
             before_wait_ts = self.clock.time_msec()
-            if not await self.notifier.wait_for_stream_token(from_token.stream_token):
+            if not await self.notifier.wait_for_stream_token(from_token.stream):
                 logger.warning(
                     "Timed out waiting for worker to catch up. Returning empty response"
                 )
@@ -423,7 +425,7 @@ class SlidingSyncHandler:
                 sync_config.user.to_string(),
                 timeout_ms,
                 current_sync_callback,
-                from_token=from_token.stream_token,
+                from_token=from_token.stream,
             )
 
         return result
@@ -470,7 +472,7 @@ class SlidingSyncHandler:
             await self.get_room_membership_for_user_at_to_token(
                 user=sync_config.user,
                 to_token=to_token,
-                from_token=from_token.stream_token if from_token else None,
+                from_token=from_token.stream if from_token else None,
             )
         )
 
@@ -603,11 +605,14 @@ class SlidingSyncHandler:
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
-        for room_id, room_sync_config in relevant_room_map.items():
+
+        @trace
+        @tag_args
+        async def handle_room(room_id: str) -> None:
             room_sync_result = await self.get_room_sync_data(
                 sync_config=sync_config,
                 room_id=room_id,
-                room_sync_config=room_sync_config,
+                room_sync_config=relevant_room_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
                     room_id
                 ],
@@ -617,6 +622,9 @@ class SlidingSyncHandler:
 
             rooms[room_id] = room_sync_result
 
+        with start_active_span("sliding_sync.generate_room_entries"):
+            await concurrently_execute(handle_room, relevant_room_map, 10)
+
         extensions = await self.get_extensions_response(
             sync_config=sync_config, to_token=to_token
         )
@@ -630,7 +638,7 @@ class SlidingSyncHandler:
                 unsent_room_ids=[],
             )
         elif from_token:
-            connection_token = from_token.connection_token
+            connection_token = from_token.connection
         else:
             # Initial sync without a `from_token` starts a `0`
             connection_token = 0
@@ -1406,11 +1414,11 @@ class SlidingSyncHandler:
         if from_token and not room_membership_for_user_at_to_token.newly_joined:
             room_status = await self.connection_store.have_sent_room(
                 sync_config=sync_config,
-                connection_token=from_token.connection_token,
+                connection_token=from_token.connection,
                 room_id=room_id,
             )
             if room_status.status == HaveSentRoomFlag.LIVE:
-                from_bound = from_token.stream_token.room_key
+                from_bound = from_token.stream.room_key
                 initial = False
             elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
                 assert room_status.last_token is not None
@@ -1520,9 +1528,7 @@ class SlidingSyncHandler:
                         instance_name=timeline_event.internal_metadata.instance_name,
                         stream=timeline_event.internal_metadata.stream_ordering,
                     )
-                    if persisted_position.persisted_after(
-                        from_token.stream_token.room_key
-                    ):
+                    if persisted_position.persisted_after(from_token.stream.room_key):
                         num_live += 1
                     else:
                         # Since we're iterating over the timeline events in
@@ -2020,7 +2026,7 @@ class SlidingSyncConnectionStore:
         """
         prev_connection_token = 0
         if from_token is not None:
-            prev_connection_token = from_token.connection_token
+            prev_connection_token = from_token.connection
 
         # If there are no changes then this is a noop.
         if not sent_room_ids and not unsent_room_ids:
@@ -2056,7 +2062,7 @@ class SlidingSyncConnectionStore:
 
         # Work out the new state for unsent rooms that were `LIVE`.
         if from_token:
-            new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+            new_unsent_state = HaveSentRoom.previously(from_token.stream.room_key)
         else:
             new_unsent_state = HAVE_SENT_ROOM_NEVER
 
@@ -2095,7 +2101,7 @@ class SlidingSyncConnectionStore:
         sync_statuses = {
             connection_token: room_statuses
             for connection_token, room_statuses in sync_statuses.items()
-            if connection_token == from_token.connection_token
+            if connection_token == from_token.connection
         }
         if sync_statuses:
             self._connections[conn_key] = sync_statuses
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 23ac1842f8..a0956febc0 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1145,10 +1145,16 @@ class SlidingSyncStreamToken:
 
     This then looks something like:
         5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
+
+    Attributes:
+        stream: Token representing the position of all the standard
+            streams.
+        connection: Token used by sliding sync to track updates to any
+            per-connection state stored by Synapse.
     """
 
-    stream_token: StreamToken
-    connection_token: int
+    stream: StreamToken
+    connection: int
 
     @staticmethod
     @cancellable
@@ -1160,8 +1166,8 @@ class SlidingSyncStreamToken:
             stream_token = await StreamToken.from_string(store, stream_token_str)
 
             return SlidingSyncStreamToken(
-                stream_token=stream_token,
-                connection_token=connection_token,
+                stream=stream_token,
+                connection=connection_token,
             )
         except CancelledError:
             raise
@@ -1170,8 +1176,8 @@ class SlidingSyncStreamToken:
 
     async def to_string(self, store: "DataStore") -> str:
         """Serializes the token to a string"""
-        stream_token_str = await self.stream_token.to_string(store)
-        return f"{self.connection_token}/{stream_token_str}"
+        stream_token_str = await self.stream.to_string(store)
+        return f"{self.connection}/{stream_token_str}"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
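The `synapse/types/__init__.py` hunks only rename `SlidingSyncStreamToken.stream_token`/`connection_token` to `stream`/`connection`; the serialized form stays `<connection>/<stream>`, as in the `5/s2633508_...` example from the docstring. Below is a minimal, self-contained sketch of that round trip; it is a hypothetical simplification, since the real class resolves the stream part asynchronously through the database via `StreamToken.from_string`/`to_string`.

```python
# Sketch only: the real SlidingSyncStreamToken is an attrs class whose
# `stream` attribute is a full StreamToken, not a plain string.
from dataclasses import dataclass


@dataclass(frozen=True)
class SimpleSlidingSyncToken:
    stream: str      # serialized stream token, e.g. "s2633508_17_338_..."
    connection: int  # per-connection counter tracked by sliding sync

    def to_string(self) -> str:
        # Same "<connection>/<stream>" shape as SlidingSyncStreamToken.to_string
        return f"{self.connection}/{self.stream}"

    @classmethod
    def from_string(cls, value: str) -> "SimpleSlidingSyncToken":
        # Split on the first "/" so the stream part may itself contain "_" etc.
        connection_str, _, stream_str = value.partition("/")
        return cls(stream=stream_str, connection=int(connection_str))


token = SimpleSlidingSyncToken.from_string(
    "5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379"
)
assert token.connection == 5
assert token.to_string().startswith("5/s2633508")
```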
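The other substantive change, in `synapse/handlers/sliding_sync.py`, replaces the sequential per-room `for` loop with `concurrently_execute(handle_room, relevant_room_map, 10)`, running the room callback over all relevant rooms with bounded concurrency inside a tracing span. The sketch below is a rough illustration of that pattern using a semaphore-limited `asyncio.gather`; it is not Synapse's actual `concurrently_execute` implementation, and `handle_room` here is a stand-in for the real callback.

```python
# Illustration of bounded-concurrency execution over a collection of items,
# in the spirit of concurrently_execute(handle_room, relevant_room_map, 10).
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def concurrently_execute_sketch(
    func: Callable[[T], Awaitable[None]],
    args: Iterable[T],
    limit: int,
) -> None:
    # At most `limit` invocations of `func` are in flight at any time.
    semaphore = asyncio.Semaphore(limit)

    async def run_one(arg: T) -> None:
        async with semaphore:
            await func(arg)

    await asyncio.gather(*(run_one(arg) for arg in args))


async def main() -> None:
    async def handle_room(room_id: str) -> None:
        await asyncio.sleep(0)  # stand-in for get_room_sync_data(...)
        print("handled", room_id)

    await concurrently_execute_sketch(handle_room, ["!a:x", "!b:x", "!c:x"], 10)


asyncio.run(main())
```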