diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-06-02 16:14:19 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-06-02 16:14:19 +0100 |
commit | aa3b6baa5500b3a8a9da0f4173e6dbc89a46a0c9 (patch) | |
tree | 78c1153710fec654a28c06fb524b9e96158b9240 | |
parent | Merge branch 'develop' into markjh/synchrotron (diff) | |
download | synapse-aa3b6baa5500b3a8a9da0f4173e6dbc89a46a0c9.tar.xz |
Add typing support
-rw-r--r-- | synapse/app/synchrotron.py | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c1338e8e36..9d31732960 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -210,8 +210,24 @@ class SynchrotronPresence(object): class SynchrotronTyping(object): - _latest_room_serial = 0 - _room_serials = () + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing class SynchrotronApplicationService(object): @@ -294,6 +310,7 @@ class SynchrotronServer(HomeServer): clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() @@ -318,6 +335,7 @@ class SynchrotronServer(HomeServer): while True: try: args = store.stream_positions() + args.update(typing_handler.stream_positions()) args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) logger.error("FENRIS %r", result) @@ -328,6 +346,7 @@ class SynchrotronServer(HomeServer): now_ms + store.BROKEN_CACHE_EXPIRY_MS ) yield store.process_replication(result) + typing_handler.process_replication(result) presence_handler.process_replication(result) notify(result) except: @@ -337,6 +356,9 @@ class SynchrotronServer(HomeServer): def build_presence_handler(self): return SynchrotronPresence(self) + def build_typing_handler(self): + return SynchrotronTyping(self) + def setup(config_options): try: @@ -360,7 +382,6 @@ def setup(config_options): config=config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, - typing_handler=SynchrotronTyping(), application_service_handler=SynchrotronApplicationService(), ) |