summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-06-02 16:14:19 +0100
committerMark Haines <mark.haines@matrix.org>2016-06-02 16:14:19 +0100
commitaa3b6baa5500b3a8a9da0f4173e6dbc89a46a0c9 (patch)
tree78c1153710fec654a28c06fb524b9e96158b9240
parentMerge branch 'develop' into markjh/synchrotron (diff)
downloadsynapse-aa3b6baa5500b3a8a9da0f4173e6dbc89a46a0c9.tar.xz
Add typing support
-rw-r--r--synapse/app/synchrotron.py27
1 files changed, 24 insertions, 3 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index c1338e8e36..9d31732960 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -210,8 +210,24 @@ class SynchrotronPresence(object):
 
 
 class SynchrotronTyping(object):
-    _latest_room_serial = 0
-    _room_serials = ()
+    def __init__(self, hs):
+        self._latest_room_serial = 0
+        self._room_serials = {}
+        self._room_typing = {}
+
+    def stream_positions(self):
+        return {"typing": self._latest_room_serial}
+
+    def process_replication(self, result):
+        stream = result.get("typing")
+        if stream:
+            self._latest_room_serial = int(stream["position"])
+
+            for row in stream["rows"]:
+                position, room_id, typing_json = row
+                typing = json.loads(typing_json)
+                self._room_serials[room_id] = position
+                self._room_typing[room_id] = typing
 
 
 class SynchrotronApplicationService(object):
@@ -294,6 +310,7 @@ class SynchrotronServer(HomeServer):
         clock = self.get_clock()
         notifier = self.get_notifier()
         presence_handler = self.get_presence_handler()
+        typing_handler = self.get_typing_handler()
 
         def expire_broken_caches():
             store.who_forgot_in_room.invalidate_all()
@@ -318,6 +335,7 @@ class SynchrotronServer(HomeServer):
         while True:
             try:
                 args = store.stream_positions()
+                args.update(typing_handler.stream_positions())
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
                 logger.error("FENRIS %r", result)
@@ -328,6 +346,7 @@ class SynchrotronServer(HomeServer):
                         now_ms + store.BROKEN_CACHE_EXPIRY_MS
                     )
                 yield store.process_replication(result)
+                typing_handler.process_replication(result)
                 presence_handler.process_replication(result)
                 notify(result)
             except:
@@ -337,6 +356,9 @@ class SynchrotronServer(HomeServer):
     def build_presence_handler(self):
         return SynchrotronPresence(self)
 
+    def build_typing_handler(self):
+        return SynchrotronTyping(self)
+
 
 def setup(config_options):
     try:
@@ -360,7 +382,6 @@ def setup(config_options):
         config=config,
         version_string=get_version_string("Synapse", synapse),
         database_engine=database_engine,
-        typing_handler=SynchrotronTyping(),
         application_service_handler=SynchrotronApplicationService(),
     )