summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-08-03 17:51:46 -0700
committerAndrew Morgan <andrew@amorgan.xyz>2020-08-03 17:51:46 -0700
commit7f2e76bef9e216c4ed34c45d40e09126cb72c0df (patch)
treec6ecb4eb2c3e10321b82d6948e1f107d772d4c71 /synapse/replication/tcp/client.py
parentMerge commit '4876af06d' into anoa/dinsic_release_1_18_x (diff)
parent1.18.0rc1 (diff)
downloadsynapse-7f2e76bef9e216c4ed34c45d40e09126cb72c0df.tar.xz
Merge commit 'f88c48f3b' into anoa/dinsic_release_1_18_x
* commit 'f88c48f3b':
  1.18.0rc1
  Fix error reporting when using `opentracing.trace` (#7961)
  Fix typing replication not being handled on master (#7959)
  Remove hacky error handling for inlineDeferreds. (#7950)
  Convert tests/rest/admin/test_room.py to unix file endings (#7953)
  Support oEmbed for media previews. (#7920)
  Convert state resolution to async/await (#7942)
  Fix up types and comments that refer to Deferreds. (#7945)
  Do not convert async functions to Deferreds in the interactive_auth_handler (#7944)
  Convert more of the media code to async/await (#7873)
  Return an empty body for OPTIONS requests. (#7886)
  Downgrade warning on client disconnect to INFO (#7928)
  Convert presence handler helpers to async/await. (#7939)
  Update the auth providers to be async. (#7935)
  Put a cache on `/state_ids` (#7931)
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py8
1 files changed, 8 insertions, 0 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index 4985e40b1f..fcf8ebf1e7 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import ReconnectingClientFactory from synapse.api.constants import EventTypes from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol +from synapse.replication.tcp.streams import TypingStream from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, @@ -104,6 +105,7 @@ class ReplicationDataHandler: self._clock = hs.get_clock() self._streams = hs.get_replication_streams() self._instance_name = hs.get_instance_name() + self._typing_handler = hs.get_typing_handler() # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. @@ -127,6 +129,12 @@ class ReplicationDataHandler: """ self.store.process_replication_rows(stream_name, instance_name, token, rows) + if stream_name == TypingStream.NAME: + self._typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows] + ) + if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows.