From 89fc09c3d11b3574919a311dbf68816116873abb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jan 2015 13:56:56 +0000 Subject: Bump version and changelog --- CHANGES.rst | 8 ++++++++ VERSION | 2 +- synapse/__init__.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 813ad364ea..3bd69367bb 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,11 @@ +Changes in synapse 0.6.1 (2015-01-07) +===================================== + + * Various optimization to improve performance of event sending and initial + sync. + * Media repository now includes a Content-Length header on media downloads. + * Improve quality of thumbnails by changing resizing algorithm. + Changes in synapse 0.6.0 (2014-12-16) ===================================== diff --git a/VERSION b/VERSION index a918a2aa18..ee6cdce3c2 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.0 +0.6.1 diff --git a/synapse/__init__.py b/synapse/__init__.py index 06167e3c1a..c3f1ac63be 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a synapse home server. """ -__version__ = "0.6.0" +__version__ = "0.6.1" -- cgit 1.4.1 From 72d8d1265b8316878d8d61ba8a7781c4ac0764d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jan 2015 14:16:38 +0000 Subject: Improve change log --- CHANGES.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 3bd69367bb..297ae914fd 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,8 +1,8 @@ Changes in synapse 0.6.1 (2015-01-07) ===================================== - * Various optimization to improve performance of event sending and initial - sync. + * Major optimizations to improve performance of initial sync and event sending + in large rooms (by up to 10x) * Media repository now includes a Content-Length header on media downloads. * Improve quality of thumbnails by changing resizing algorithm. -- cgit 1.4.1 From 9b8e348b15faba1469a93c7daa009e27ee377bc0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 7 Jan 2015 15:08:22 +0000 Subject: *cough* --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/__init__.py b/synapse/__init__.py index c3f1ac63be..9bfa09edfd 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" This is a reference implementation of a synapse home server. +""" This is a reference implementation of a Matrix home server. """ __version__ = "0.6.1" -- cgit 1.4.1 From 9cb4f75d53d99634e79e791de22cb7de718248d6 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 7 Jan 2015 15:16:31 +0000 Subject: SYN-154: Better error messages when joining an unknown room by ID. The simple fix doesn't work here because room creation also involves unknown room IDs. The check relies on the presence of m.room.create for rooms being created, whereas bogus room IDs have no state events at all. 
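A condensed sketch of the idea behind the SYN-154 commits in this series (an illustrative helper, not the actual implementation — the real logic ends up in RoomMemberHandler._do_join as shown in the hunks below): a room ID that no server knows about has no state events at all, whereas a room that has just been created locally already has an m.room.create event but no m.room.member events yet.

    # Illustrative only; assumes a context whose current_state is keyed by
    # (event_type, state_key) tuples, as in the patches below.
    def is_newly_created_local_room(context):
        has_members = any(
            etype == "m.room.member" for etype, _ in context.current_state.keys()
        )
        has_create = ("m.room.create", "") in context.current_state
        return (not has_members) and has_create

If neither this check nor a remote host / pending invite applies, the join is now rejected with a 404 "No known servers" instead of the previous opaque failure.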
--- synapse/api/auth.py | 11 ++++++++++- synapse/handlers/federation.py | 4 ++-- synapse/handlers/room.py | 8 +++++--- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e31482cfaa..8a3455ec54 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -98,7 +98,16 @@ class Auth(object): defer.returnValue(member) @defer.inlineCallbacks - def check_host_in_room(self, room_id, host): + def check_host_in_room(self, room_id, host, context=None): + if context: + # XXX: check_host_in_room should really return True for a new + # room created by this home server. There are no m.room.member + # join events yet so we need to check for the m.room.create event + # instead. + if (u"m.room.create", u"") in context.auth_events: + defer.returnValue(True) + return + curr_state = yield self.state.get_current_state(room_id) for event in curr_state: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d26975a88a..d0de6fd04d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -617,8 +617,8 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_backfill_request(self, origin, context, pdu_list, limit): - in_room = yield self.auth.check_host_in_room(context, origin) + def on_backfill_request(self, origin, room_id, pdu_list, limit): + in_room = yield self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 59719a1fae..3cb7e324fc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -423,12 +423,13 @@ class RoomMemberHandler(BaseHandler): is_host_in_room = yield self.auth.check_host_in_room( event.room_id, - self.hs.hostname + self.hs.hostname, + context=context ) if is_host_in_room: should_do_dance = False - elif room_host: + elif room_host: # TODO: Shouldn't this be remote_room_host? should_do_dance = True else: # TODO(markjh): get prev_state from snapshot @@ -442,7 +443,8 @@ class RoomMemberHandler(BaseHandler): should_do_dance = not self.hs.is_mine(inviter) room_host = inviter.domain else: - should_do_dance = False + # return the same error as join_room_alias does + raise SynapseError(404, "No known servers") if should_do_dance: handler = self.hs.get_handlers().federation_handler -- cgit 1.4.1 From 4c68460392ef032b156b8d006f4aec5496ceedcb Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 7 Jan 2015 16:09:00 +0000 Subject: SYN-154: Tweak how the m.room.create check is done. Don't perform the check in auth.is_host_in_room but instead do it in _do_join and also assert that there are no m.room.members in the room before doing so. --- synapse/api/auth.py | 11 +---------- synapse/handlers/room.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 8a3455ec54..e31482cfaa 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -98,16 +98,7 @@ class Auth(object): defer.returnValue(member) @defer.inlineCallbacks - def check_host_in_room(self, room_id, host, context=None): - if context: - # XXX: check_host_in_room should really return True for a new - # room created by this home server. There are no m.room.member - # join events yet so we need to check for the m.room.create event - # instead. 
- if (u"m.room.create", u"") in context.auth_events: - defer.returnValue(True) - return - + def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) for event in curr_state: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3cb7e324fc..16c6628292 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -423,9 +423,18 @@ class RoomMemberHandler(BaseHandler): is_host_in_room = yield self.auth.check_host_in_room( event.room_id, - self.hs.hostname, - context=context + self.hs.hostname ) + if not is_host_in_room: + # is *anyone* in the room? + room_member_keys = [ + v for (k,v) in context.current_state.keys() if k == "m.room.member" + ] + if len(room_member_keys) == 0: + # has the room been created so we can join it? + create_event = context.current_state.get(("m.room.create", "")) + if create_event: + is_host_in_room = True if is_host_in_room: should_do_dance = False -- cgit 1.4.1 From a09882de8378f143af79f97929bd1655cc7ac495 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 7 Jan 2015 16:12:14 +0000 Subject: Update tests --- tests/rest/test_rooms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py index 84fd730afc..8e65ff9a1c 100644 --- a/tests/rest/test_rooms.py +++ b/tests/rest/test_rooms.py @@ -294,7 +294,7 @@ class RoomPermissionsTestCase(RestTestCase): # set [invite/join/left] of self, set [invite/join/left] of other, # expect all 403s for usr in [self.user_id, self.rmcreator_id]: - yield self.join(room=room, user=usr, expect_code=403) + yield self.join(room=room, user=usr, expect_code=404) yield self.leave(room=room, user=usr, expect_code=403) @defer.inlineCallbacks -- cgit 1.4.1 From 333836ff9205a53934cf0c412b75916740e407b5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 7 Jan 2015 16:18:12 +0000 Subject: PEP8 and pyflakes warnings --- synapse/handlers/federation.py | 2 +- synapse/handlers/room.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d0de6fd04d..195f7c618a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -623,7 +623,7 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Host not in room.") events = yield self.store.get_backfill_events( - context, + room_id, pdu_list, limit ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 16c6628292..6d0db18e51 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -428,7 +428,9 @@ class RoomMemberHandler(BaseHandler): if not is_host_in_room: # is *anyone* in the room? room_member_keys = [ - v for (k,v) in context.current_state.keys() if k == "m.room.member" + v for (k, v) in context.current_state.keys() if ( + k == "m.room.member" + ) ] if len(room_member_keys) == 0: # has the room been created so we can join it? -- cgit 1.4.1 From 76e1565200dda04e4091be761c737042f9a15e67 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 7 Jan 2015 17:11:19 +0000 Subject: Change error message for missing pillow libs. 
--- synapse/media/v1/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/media/v1/__init__.py b/synapse/media/v1/__init__.py index 619999d268..d6c6690577 100644 --- a/synapse/media/v1/__init__.py +++ b/synapse/media/v1/__init__.py @@ -22,7 +22,8 @@ except IOError as e: if str(e).startswith("decoder jpeg not available"): raise Exception( "FATAL: jpeg codec not supported. Install pillow correctly! " - " 'sudo apt-get install libjpeg-dev' then 'pip install -I pillow'" + " 'sudo apt-get install libjpeg-dev' then 'pip uninstall pillow &&" + " pip install pillow --user'" ) except Exception: # any other exception is fine @@ -36,7 +37,8 @@ except IOError as e: if str(e).startswith("decoder zip not available"): raise Exception( "FATAL: zip codec not supported. Install pillow correctly! " - " 'sudo apt-get install libjpeg-dev' then 'pip install -I pillow'" + " 'sudo apt-get install libjpeg-dev' then 'pip uninstall pillow &&" + " pip install pillow --user'" ) except Exception: # any other exception is fine -- cgit 1.4.1 From 42507b0011a1285645206f5bd627809a8a6337e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jan 2015 17:25:28 +0000 Subject: Log server version on startup --- synapse/app/homeserver.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 5fec8da7ca..fba43aa2bf 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -39,6 +39,8 @@ from synapse.util.logcontext import LoggingContext from daemonize import Daemonize import twisted.manhole.telnet +import synapse + import logging import os import re @@ -199,6 +201,7 @@ def setup(): config.setup_logging() logger.info("Server hostname: %s", config.server_name) + logger.info("Server version: %s", synapse.__version__) if re.search(":[0-9]+$", config.server_name): domain_with_port = config.server_name -- cgit 1.4.1 From c9d2cecac9727ac3be8ad8cab21dc78b9dffe7a2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Jan 2015 09:41:11 +0000 Subject: SYN-231: User agent header broken --- synapse/http/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index e5d4939e2d..7793bab106 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -53,7 +53,7 @@ class SimpleHttpClient(object): uri.encode("ascii"), headers=Headers({ b"Content-Type": [b"application/x-www-form-urlencoded"], - b"User-Agent": AGENT_NAME, + b"User-Agent": [AGENT_NAME], }), bodyProducer=FileBodyProducer(StringIO(query_bytes)) ) @@ -89,7 +89,7 @@ class SimpleHttpClient(object): "GET", uri.encode("ascii"), headers=Headers({ - b"User-Agent": AGENT_NAME, + b"User-Agent": [AGENT_NAME], }) ) @@ -114,7 +114,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): bodyProducer=FileBodyProducer(StringIO(query_bytes)), headers=Headers({ b"Content-Type": [b"application/x-www-form-urlencoded"], - b"User-Agent": AGENT_NAME, + b"User-Agent": [AGENT_NAME], }) ) -- cgit 1.4.1 From 5a0e687d5cfbb01d817a2ea5a795d3fee7ca5083 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Jan 2015 09:42:23 +0000 Subject: Bump version --- VERSION | 2 +- synapse/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index ee6cdce3c2..8b9dea59dc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.1 +0.6.1a diff --git a/synapse/__init__.py b/synapse/__init__.py index c3f1ac63be..2195ad3177 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 
@@ """ This is a reference implementation of a synapse home server. """ -__version__ = "0.6.1" +__version__ = "0.6.1a" -- cgit 1.4.1 From d44dd47fbf7a2e4a9b253128e645ceb698ec274a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jan 2015 10:53:03 +0000 Subject: Add optional limit to graph script --- graph/graph2.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/graph/graph2.py b/graph/graph2.py index b9b8a562a0..6b551d42e5 100644 --- a/graph/graph2.py +++ b/graph/graph2.py @@ -23,14 +23,27 @@ import argparse from synapse.events import FrozenEvent -def make_graph(db_name, room_id, file_prefix): +def make_graph(db_name, room_id, file_prefix, limit): conn = sqlite3.connect(db_name) - c = conn.execute( - "SELECT json FROM event_json where room_id = ?", - (room_id,) + sql = ( + "SELECT json FROM event_json as j " + "INNER JOIN events as e ON e.event_id = j.event_id " + "WHERE j.room_id = ?" ) + args = [room_id] + + if limit: + sql += ( + " ORDER BY topological_ordering DESC, stream_ordering DESC " + "LIMIT ?" + ) + + args.append(limit) + + c = conn.execute(sql, args) + events = [FrozenEvent(json.loads(e[0])) for e in c.fetchall()] events.sort(key=lambda e: e.depth) @@ -128,11 +141,16 @@ if __name__ == "__main__": ) parser.add_argument( "-p", "--prefix", dest="prefix", - help="String to prefix output files with" + help="String to prefix output files with", + default="graph_output" + ) + parser.add_argument( + "-l", "--limit", + help="Only retrieve the last N events.", ) parser.add_argument('db') parser.add_argument('room') args = parser.parse_args() - make_graph(args.db, args.room, args.prefix) + make_graph(args.db, args.room, args.prefix, args.limit) -- cgit 1.4.1 From 5720ab59e03d6f5ab48c3be22e8957a8891ea56c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 8 Jan 2015 13:57:29 +0000 Subject: Add 'raw' query parameter to expose the event graph and signatures to savvy clients. 
--- synapse/events/utils.py | 17 +++++++++-------- synapse/handlers/events.py | 7 +++++-- synapse/handlers/message.py | 6 ++++-- synapse/rest/events.py | 5 ++++- synapse/rest/initial_sync.py | 5 ++++- synapse/server.py | 4 ++-- 6 files changed, 28 insertions(+), 16 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 4f4914467c..258dedb27c 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -89,7 +89,7 @@ def prune_event(event): return type(event)(allowed_fields) -def serialize_event(hs, e): +def serialize_event(hs, e, remove_data=True): # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e @@ -122,12 +122,13 @@ def serialize_event(hs, e): d["prev_content"] = e.unsigned["prev_content"] del d["unsigned"]["prev_content"] - del d["auth_events"] - del d["prev_events"] - del d["hashes"] - del d["signatures"] - d.pop("depth", None) - d.pop("unsigned", None) - d.pop("origin", None) + if remove_data: + del d["auth_events"] + del d["prev_events"] + del d["hashes"] + del d["signatures"] + d.pop("depth", None) + d.pop("unsigned", None) + d.pop("origin", None) return d diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 808219bd10..4e805606b8 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -46,7 +46,8 @@ class EventStreamHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_stream(self, auth_user_id, pagin_config, timeout=0): + def get_stream(self, auth_user_id, pagin_config, timeout=0, + trim_events=True): auth_user = self.hs.parse_userid(auth_user_id) try: @@ -78,7 +79,9 @@ class EventStreamHandler(BaseHandler): auth_user, room_ids, pagin_config, timeout ) - chunks = [self.hs.serialize_event(e) for e in events] + chunks = [ + self.hs.serialize_event(e, trim_events) for e in events + ] chunk = { "chunk": chunks, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7195de98b5..b2bbcfc6e2 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -211,7 +211,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, - feedback=False): + feedback=False, trim_events=True): """Retrieve a snapshot of all rooms the user is invited or has joined. 
This snapshot may include messages for all rooms where the user is @@ -280,7 +280,9 @@ class MessageHandler(BaseHandler): end_token = now_token.copy_and_replace("room_key", token[1]) d["messages"] = { - "chunk": [self.hs.serialize_event(m) for m in messages], + "chunk": [ + self.hs.serialize_event(m, trim_events) for m in messages + ], "start": start_token.to_string(), "end": end_token.to_string(), } diff --git a/synapse/rest/events.py b/synapse/rest/events.py index cf6d13f817..ac1a75a559 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -44,8 +44,11 @@ class EventStreamRestServlet(RestServlet): except ValueError: raise SynapseError(400, "timeout must be in milliseconds.") + trim_events = "raw" not in request.args + chunk = yield handler.get_stream( - auth_user.to_string(), pagin_config, timeout=timeout + auth_user.to_string(), pagin_config, timeout=timeout, + trim_events=trim_events ) except: logger.exception("Event stream failed") diff --git a/synapse/rest/initial_sync.py b/synapse/rest/initial_sync.py index a571589581..d2c0c63aa6 100644 --- a/synapse/rest/initial_sync.py +++ b/synapse/rest/initial_sync.py @@ -27,12 +27,15 @@ class InitialSyncRestServlet(RestServlet): def on_GET(self, request): user = yield self.auth.get_user_by_req(request) with_feedback = "feedback" in request.args + trim_events = "raw" not in request.args pagination_config = PaginationConfig.from_request(request) handler = self.handlers.message_handler content = yield handler.snapshot_all_rooms( user_id=user.to_string(), pagin_config=pagination_config, - feedback=with_feedback) + feedback=with_feedback, + trim_events=trim_events + ) defer.returnValue((200, content)) diff --git a/synapse/server.py b/synapse/server.py index c3bf46abbf..88161107af 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -149,8 +149,8 @@ class BaseHomeServer(object): object.""" return EventID.from_string(s) - def serialize_event(self, e): - return serialize_event(self, e) + def serialize_event(self, e, remove_data=True): + return serialize_event(self, e, remove_data) def get_ip_from_request(self, request): # May be an X-Forwarding-For header depending on config -- cgit 1.4.1 From 5940ec993bf75d5d05885544e811da88703f1800 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jan 2015 13:59:29 +0000 Subject: Add missing continuation indent. --- synapse/handlers/message.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b2bbcfc6e2..9b20e4f50e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -281,7 +281,8 @@ class MessageHandler(BaseHandler): d["messages"] = { "chunk": [ - self.hs.serialize_event(m, trim_events) for m in messages + self.hs.serialize_event(m, trim_events) + for m in messages ], "start": start_token.to_string(), "end": end_token.to_string(), -- cgit 1.4.1 From edb557b2ad98d3260caaba41ef2278b3eafc7e85 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 8 Jan 2015 14:27:04 +0000 Subject: Return the raw federation event rather than adding extra keys for federation data. 
--- synapse/events/utils.py | 25 ++++++++++++++++--------- synapse/handlers/events.py | 4 ++-- synapse/handlers/message.py | 5 +++-- synapse/rest/events.py | 4 ++-- synapse/rest/initial_sync.py | 4 ++-- synapse/server.py | 4 ++-- 6 files changed, 27 insertions(+), 19 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 258dedb27c..4687d96f26 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -89,13 +89,21 @@ def prune_event(event): return type(event)(allowed_fields) -def serialize_event(hs, e, remove_data=True): +def serialize_event(hs, e, client_event=True): # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e # Should this strip out None's? d = {k: v for k, v in e.get_dict().items()} + + if not client_event: + # set the age and keep all other keys + if "age_ts" in d["unsigned"]: + now = int(hs.get_clock().time_msec()) + d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] + return d + if "age_ts" in d["unsigned"]: now = int(hs.get_clock().time_msec()) d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] @@ -122,13 +130,12 @@ def serialize_event(hs, e, remove_data=True): d["prev_content"] = e.unsigned["prev_content"] del d["unsigned"]["prev_content"] - if remove_data: - del d["auth_events"] - del d["prev_events"] - del d["hashes"] - del d["signatures"] - d.pop("depth", None) - d.pop("unsigned", None) - d.pop("origin", None) + del d["auth_events"] + del d["prev_events"] + del d["hashes"] + del d["signatures"] + d.pop("depth", None) + d.pop("unsigned", None) + d.pop("origin", None) return d diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 4e805606b8..c9ade253dd 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -47,7 +47,7 @@ class EventStreamHandler(BaseHandler): @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, - trim_events=True): + as_client_event=True): auth_user = self.hs.parse_userid(auth_user_id) try: @@ -80,7 +80,7 @@ class EventStreamHandler(BaseHandler): ) chunks = [ - self.hs.serialize_event(e, trim_events) for e in events + self.hs.serialize_event(e, as_client_event) for e in events ] chunk = { diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9b20e4f50e..30f5a08b59 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -211,7 +211,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, - feedback=False, trim_events=True): + feedback=False, as_client_event=True): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is @@ -222,6 +222,7 @@ class MessageHandler(BaseHandler): pagin_config (synapse.api.streams.PaginationConfig): The pagination config used to determine how many messages *PER ROOM* to return. feedback (bool): True to get feedback along with these messages. + as_client_event (bool): True to get events in client-server format. Returns: A list of dicts with "room_id" and "membership" keys for all rooms the user is currently invited or joined in on. 
Rooms where the user @@ -281,7 +282,7 @@ class MessageHandler(BaseHandler): d["messages"] = { "chunk": [ - self.hs.serialize_event(m, trim_events) + self.hs.serialize_event(m, as_client_event) for m in messages ], "start": start_token.to_string(), diff --git a/synapse/rest/events.py b/synapse/rest/events.py index ac1a75a559..bedcb2bcc6 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -44,11 +44,11 @@ class EventStreamRestServlet(RestServlet): except ValueError: raise SynapseError(400, "timeout must be in milliseconds.") - trim_events = "raw" not in request.args + as_client_event = "raw" not in request.args chunk = yield handler.get_stream( auth_user.to_string(), pagin_config, timeout=timeout, - trim_events=trim_events + as_client_event=as_client_event ) except: logger.exception("Event stream failed") diff --git a/synapse/rest/initial_sync.py b/synapse/rest/initial_sync.py index d2c0c63aa6..b13d56b286 100644 --- a/synapse/rest/initial_sync.py +++ b/synapse/rest/initial_sync.py @@ -27,14 +27,14 @@ class InitialSyncRestServlet(RestServlet): def on_GET(self, request): user = yield self.auth.get_user_by_req(request) with_feedback = "feedback" in request.args - trim_events = "raw" not in request.args + as_client_event = "raw" not in request.args pagination_config = PaginationConfig.from_request(request) handler = self.handlers.message_handler content = yield handler.snapshot_all_rooms( user_id=user.to_string(), pagin_config=pagination_config, feedback=with_feedback, - trim_events=trim_events + as_client_event=as_client_event ) defer.returnValue((200, content)) diff --git a/synapse/server.py b/synapse/server.py index 88161107af..d861efd2fd 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -149,8 +149,8 @@ class BaseHomeServer(object): object.""" return EventID.from_string(s) - def serialize_event(self, e, remove_data=True): - return serialize_event(self, e, remove_data) + def serialize_event(self, e, as_client_event=True): + return serialize_event(self, e, as_client_event) def get_ip_from_request(self, request): # May be an X-Forwarding-For header depending on config -- cgit 1.4.1 From 379a653ae3e46bc27b8ad4bde9bb7c25d0e048f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jan 2015 14:32:53 +0000 Subject: Add better help message for --server-name config option. --- synapse/config/server.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/config/server.py b/synapse/config/server.py index 4f73c85466..31e44cc857 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -47,8 +47,12 @@ class ServerConfig(Config): def add_arguments(cls, parser): super(ServerConfig, cls).add_arguments(parser) server_group = parser.add_argument_group("server") - server_group.add_argument("-H", "--server-name", default="localhost", - help="The name of the server") + server_group.add_argument( + "-H", "--server-name", default="localhost", + help="The domain name of the server, with optional explicit port. " + "This is used by remote servers to connect to this server, " + "e.g. matrix.org, localhost:8080, etc." + ) server_group.add_argument("--signing-key-path", help="The signing key to sign messages with") server_group.add_argument("-p", "--bind-port", metavar="PORT", -- cgit 1.4.1 From b5924cae04e549b3e19addc9257b462627f3d334 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 8 Jan 2015 14:36:33 +0000 Subject: Add raw query param for scrollback. 
--- synapse/handlers/message.py | 7 +++++-- synapse/rest/room.py | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 30f5a08b59..f2a2f16933 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -67,7 +67,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False): + feedback=False, as_client_event=True): """Get messages in a room. Args: @@ -76,6 +76,7 @@ class MessageHandler(BaseHandler): pagin_config (synapse.api.streams.PaginationConfig): The pagination config rules to apply, if any. feedback (bool): True to get compressed feedback with the messages + as_client_event (bool): True to get events in client-server format. Returns: dict: Pagination API results """ @@ -99,7 +100,9 @@ class MessageHandler(BaseHandler): ) chunk = { - "chunk": [self.hs.serialize_event(e) for e in events], + "chunk": [ + self.hs.serialize_event(e, as_client_event) for e in events + ], "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), } diff --git a/synapse/rest/room.py b/synapse/rest/room.py index e40773758a..caafa959e6 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -314,12 +314,15 @@ class RoomMessageListRestServlet(RestServlet): request, default_limit=10, ) with_feedback = "feedback" in request.args + as_client_event = "raw" not in request.args handler = self.handlers.message_handler msgs = yield handler.get_messages( room_id=room_id, user_id=user.to_string(), pagin_config=pagination_config, - feedback=with_feedback) + feedback=with_feedback, + as_client_event=as_client_event + ) defer.returnValue((200, msgs)) -- cgit 1.4.1 From 7f83613733bc39a14b4eaff78313047d0fc50739 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 8 Jan 2015 15:11:22 +0000 Subject: make our JPEG thumbnail quality less horrifically ugly --- synapse/media/v1/thumbnailer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/media/v1/thumbnailer.py b/synapse/media/v1/thumbnailer.py index bc86efea8f..28404f2b7b 100644 --- a/synapse/media/v1/thumbnailer.py +++ b/synapse/media/v1/thumbnailer.py @@ -82,7 +82,7 @@ class Thumbnailer(object): def save_image(self, output_image, output_type, output_path): output_bytes_io = BytesIO() - output_image.save(output_bytes_io, self.FORMATS[output_type]) + output_image.save(output_bytes_io, self.FORMATS[output_type], quality=70) output_bytes = output_bytes_io.getvalue() with open(output_path, "wb") as output_file: output_file.write(output_bytes) -- cgit 1.4.1 From 9d0dcf2e3ca8b8c9cc8d87a451ed901f102dc2c6 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 8 Jan 2015 15:31:06 +0000 Subject: SYN-142: Rotate logs if logging to file. Fixed to a 4 file rotate with 100MB/file for now. 
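A note on the rotation parameters in the hunk below: RotatingFileHandler with maxBytes of 100 MB and backupCount=3 keeps the active log plus three rollovers, which is where the "4 file rotate with 100MB/file" in the commit message comes from. Assuming the configured log file is named homeserver.log (an example name, not taken from this patch), the on-disk result looks like:

    homeserver.log      # active log, rolled over once it reaches ~100 MB
    homeserver.log.1    # most recent rollover
    homeserver.log.2
    homeserver.log.3    # oldest rollover, removed at the next rotation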
--- synapse/config/logger.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 15383b3184..f9568ebd21 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -66,7 +66,10 @@ class LoggingConfig(Config): formatter = logging.Formatter(log_format) if self.log_file: - handler = logging.FileHandler(self.log_file) + # TODO: Customisable file size / backup count + handler = logging.handlers.RotatingFileHandler( + self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3 + ) else: handler = logging.StreamHandler() handler.setFormatter(formatter) -- cgit 1.4.1 From 63403aa7a57704cde86344b48390d16b1d74b035 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Jan 2015 17:07:28 +0000 Subject: Check the existance and versions of necessary modules when starting synapse, log which modules are used --- synapse/app/homeserver.py | 5 +++ synapse/python_dependencies.py | 80 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 synapse/python_dependencies.py diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fba43aa2bf..43b5c26144 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,6 +18,8 @@ from synapse.storage import prepare_database, UpgradeDatabaseException from synapse.server import HomeServer +from synapse.python_dependencies import check_requirements + from twisted.internet import reactor from twisted.enterprise import adbapi from twisted.web.resource import Resource @@ -200,6 +202,8 @@ def setup(): config.setup_logging() + check_requirements() + logger.info("Server hostname: %s", config.server_name) logger.info("Server version: %s", synapse.__version__) @@ -280,6 +284,7 @@ def run(): def main(): with LoggingContext("main"): + check_requirements() setup() diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py new file mode 100644 index 0000000000..b1fae991e0 --- /dev/null +++ b/synapse/python_dependencies.py @@ -0,0 +1,80 @@ +import logging +from distutils.version import LooseVersion + +logger = logging.getLogger(__name__) + +REQUIREMENTS = { + "syutil==0.0.2": ["syutil"], + "matrix_angular_sdk==0.6.0": ["syweb==0.6.0"], + "Twisted>=14.0.0": ["twisted>=14.0.0"], + "service_identity>=1.0.0": ["service_identity>=1.0.0"], + "pyopenssl>=0.14": ["OpenSSL>=0.14"], + "pyyaml": ["yaml"], + "pyasn1": ["pyasn1"], + "pynacl": ["nacl"], + "daemonize": ["daemonize"], + "py-bcrypt": ["bcrypt"], + "frozendict>=0.4": ["frozendict"], + "pillow": ["PIL"], +} + + +class MissingRequirementError(Exception): + pass + + +def check_requirements(): + """Checks that all the modules needed by synapse have been correctly + installed and are at the correct version""" + for dependency, module_requirements in REQUIREMENTS.items(): + for module_requirement in module_requirements: + if ">=" in module_requirement: + module_name, required_version = module_requirement.split(">=") + version_test = ">=" + elif "==" in module_requirement: + module_name, required_version = module_requirement.split("==") + version_test = "==" + else: + module_name = module_requirement + version_test = None + + try: + module = __import__(module_name) + except ImportError: + logging.exception( + "Can't import %r which is part of %r", + module_name, dependency + ) + raise MissingRequirementError( + "Can't import %r which is part of %r" + % (module_name, dependency) + ) + version = getattr(module, "__version__", None) + file_path = getattr(module, 
"__file__", None) + logger.info( + "Using %r version %r from %r to satisfy %r", + module_name, version, file_path, dependency + ) + + if version_test == ">=": + if version is None: + raise MissingRequirementError( + "Version of %r isn't set as __version__ of module %r" + % (dependency, module_name) + ) + if LooseVersion(version) < LooseVersion(required_version): + raise MissingRequirementError( + "Version of %r in %r is too old. %r < %r" + % (dependency, file_path, version, required_version) + ) + elif version_test == "==": + if version is None: + raise MissingRequirementError( + "Version of %r isn't set as __version__ of module %r" + % (dependency, module_name) + ) + if LooseVersion(version) != LooseVersion(required_version): + raise MissingRequirementError( + "Unexpected version of %r in %r. %r != %r" + % (dependency, file_path, version, required_version) + ) -- cgit 1.4.1 From 80e89772e2d531941bf4403ecf3d539557763985 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 8 Jan 2015 20:36:34 +0000 Subject: spell out that local libs may need to be explicitly given priority --- README.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.rst b/README.rst index 92b94bcd7d..2be201178f 100644 --- a/README.rst +++ b/README.rst @@ -108,6 +108,15 @@ To install the synapse homeserver run:: This installs synapse, along with the libraries it uses, into ``$HOME/.local/lib/`` on Linux or ``$HOME/Library/Python/2.7/lib/`` on OSX. +Your python may not give priority to locally installed libraries over system +libraries, in which case you must add your local packages to your python path:: + + $ # on Linux: + $ export PYTHONPATH=$HOME/.local/lib/python2.7/site-packages + + $ # on OSX: + $ export PYTHONPATH=$HOME/Library/Python/2.7/lib/python2.7/site-packages + For reliable VoIP calls to be routed via this homeserver, you MUST configure a TURN server. See docs/turn-howto.rst for details. -- cgit 1.4.1 From 28db5dde4c37ec69449995de40c02b7f4c532746 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 8 Jan 2015 20:38:55 +0000 Subject: oops --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 2be201178f..326f1d9cc1 100644 --- a/README.rst +++ b/README.rst @@ -115,7 +115,7 @@ libraries, in which case you must add your local packages to your python path:: $ export PYTHONPATH=$HOME/.local/lib/python2.7/site-packages $ # on OSX: - $ export PYTHONPATH=$HOME/Library/Python/2.7/lib/python2.7/site-packages + $ export PYTHONPATH=$HOME/Library/Python/2.7/lib/python/site-packages For reliable VoIP calls to be routed via this homeserver, you MUST configure a TURN server. See docs/turn-howto.rst for details. 
-- cgit 1.4.1 From bfb198a6eb0d1c0b2c73e88b8420549f84ebd626 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 9 Jan 2015 18:14:05 +0000 Subject: don't clobber pythonpath --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 326f1d9cc1..768da3df64 100644 --- a/README.rst +++ b/README.rst @@ -112,10 +112,10 @@ Your python may not give priority to locally installed libraries over system libraries, in which case you must add your local packages to your python path:: $ # on Linux: - $ export PYTHONPATH=$HOME/.local/lib/python2.7/site-packages + $ export PYTHONPATH=$HOME/.local/lib/python2.7/site-packages:$PYTHONPATH $ # on OSX: - $ export PYTHONPATH=$HOME/Library/Python/2.7/lib/python/site-packages + $ export PYTHONPATH=$HOME/Library/Python/2.7/lib/python/site-packages:$PYTHONPATH For reliable VoIP calls to be routed via this homeserver, you MUST configure a TURN server. See docs/turn-howto.rst for details. -- cgit 1.4.1 From d8fcc4e00a05252d4402a834d4b8ef66784de62b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 12 Jan 2015 14:30:54 +0000 Subject: Add copyrighter script for sql --- scripts/copyrighter-sql.pl | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100755 scripts/copyrighter-sql.pl diff --git a/scripts/copyrighter-sql.pl b/scripts/copyrighter-sql.pl new file mode 100755 index 0000000000..890e51e587 --- /dev/null +++ b/scripts/copyrighter-sql.pl @@ -0,0 +1,33 @@ +#!/usr/bin/perl -pi +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +$copyright = < Date: Mon, 12 Jan 2015 17:38:30 +0000 Subject: SYN-178: Fix off by one. --- synapse/storage/stream.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index bedc3c6c52..744c821dfe 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -284,8 +284,12 @@ class StreamStore(SQLBaseStore): rows.reverse() # As we selected with reverse ordering if rows: - topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] + # XXX: Always subtract 1 since the start token always goes + # backwards (parity with paginate_room_events). It isn't + # obvious that this is correct; we should clarify the algorithm + # used here. 
+ topo = rows[0]["topological_ordering"] - 1 + toke = rows[0]["stream_ordering"] - 1 start_token = "t%s-%s" % (topo, toke) token = (start_token, end_token) -- cgit 1.4.1 From 968dc988f9008b15348705c52992100dcabf206f Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 12 Jan 2015 18:01:33 +0000 Subject: Check that setting typing notification still works after explicit timeout - SYN-230 --- tests/handlers/test_typing.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 0d4b368a43..6a498b23a4 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -352,3 +352,29 @@ class TypingNotificationsTestCase(unittest.TestCase): }}, ] ) + + # SYN-230 - see if we can still set after timeout + + yield self.handler.started_typing( + target_user=self.u_apple, + auth_user=self.u_apple, + room_id=self.room_id, + timeout=10000, + ) + + self.on_new_user_event.assert_has_calls([ + call(rooms=[self.room_id]), + ]) + self.on_new_user_event.reset_mock() + + self.assertEquals(self.event_source.get_current_key(), 3) + self.assertEquals( + self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + [ + {"type": "m.typing", + "room_id": self.room_id, + "content": { + "user_ids": [self.u_apple.to_string()], + }}, + ] + ) -- cgit 1.4.1 From db72a07ef52dd3a911978df9a13f23febdcc00ce Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 12 Jan 2015 18:16:27 +0000 Subject: Don't make @unittest.DEBUG print the huge amount of verbosity generated by the synapse.storage loggers --- tests/unittest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unittest.py b/tests/unittest.py index a9c0e05541..fe26b7574f 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -69,6 +69,8 @@ class TestCase(unittest.TestCase): return ret logging.getLogger().setLevel(level) + # Don't set SQL logging + logging.getLogger("synapse.storage").setLevel(old_level) return orig() def assertObjectHasAttributes(self, attrs, obj): -- cgit 1.4.1 From 67d8305aea65d52abe4ce1c40bf78fdab3dc6471 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 12 Jan 2015 18:22:00 +0000 Subject: Make typing notification timeouts print a (debug) logging message --- synapse/handlers/typing.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index ab698b36e1..15039ff0da 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -83,9 +83,15 @@ class TypingNotificationHandler(BaseHandler): if member in self._member_typing_timer: self.clock.cancel_call_later(self._member_typing_timer[member]) + def _cb(): + logger.debug( + "%s has timed out in %s", target_user.to_string(), room_id + ) + self._stopped_typing(member) + self._member_typing_until[member] = until self._member_typing_timer[member] = self.clock.call_later( - timeout / 1000, lambda: self._stopped_typing(member) + timeout / 1000, _cb ) if was_present: -- cgit 1.4.1 From 9c804bc3fd23a2bafe5d6f7368c90a7fba99bcf7 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 12 Jan 2015 18:31:48 +0000 Subject: Check that setting typing notification still works after explicit timeout at REST layer - SYN-230 --- tests/rest/test_typing.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/rest/test_typing.py b/tests/rest/test_typing.py index c550294d59..18138af1b5 100644 --- 
a/tests/rest/test_typing.py +++ b/tests/rest/test_typing.py @@ -21,7 +21,7 @@ from twisted.internet import defer import synapse.rest.room from synapse.server import HomeServer -from ..utils import MockHttpResource, SQLiteMemoryDbPool, MockKey +from ..utils import MockHttpResource, MockClock, SQLiteMemoryDbPool, MockKey from .utils import RestTestCase from mock import Mock, NonCallableMock @@ -36,6 +36,8 @@ class RoomTypingTestCase(RestTestCase): @defer.inlineCallbacks def setUp(self): + self.clock = MockClock() + self.mock_resource = MockHttpResource(prefix=PATH_PREFIX) self.auth_user_id = self.user_id @@ -47,6 +49,7 @@ class RoomTypingTestCase(RestTestCase): hs = HomeServer( "red", + clock=self.clock, db_pool=db_pool, http_client=None, replication_layer=Mock(), @@ -77,6 +80,30 @@ class RoomTypingTestCase(RestTestCase): return defer.succeed(None) hs.get_datastore().insert_client_ip = _insert_client_ip + def get_room_members(room_id): + if room_id == self.room_id: + return defer.succeed([hs.parse_userid(self.user_id)]) + else: + return defer.succeed([]) + + @defer.inlineCallbacks + def fetch_room_distributions_into(room_id, localusers=None, + remotedomains=None, ignore_user=None): + + members = yield get_room_members(room_id) + for member in members: + if ignore_user is not None and member == ignore_user: + continue + + if hs.is_mine(member): + if localusers is not None: + localusers.add(member) + else: + if remotedomains is not None: + remotedomains.add(member.domain) + hs.get_handlers().room_member_handler.fetch_room_distributions_into = ( + fetch_room_distributions_into) + synapse.rest.room.register_servlets(hs, self.mock_resource) self.room_id = yield self.create_room_as(self.user_id) @@ -113,3 +140,25 @@ class RoomTypingTestCase(RestTestCase): '{"typing": false}' ) self.assertEquals(200, code) + + @defer.inlineCallbacks + def test_typing_timeout(self): + (code, _) = yield self.mock_resource.trigger("PUT", + "/rooms/%s/typing/%s" % (self.room_id, self.user_id), + '{"typing": true, "timeout": 30000}' + ) + self.assertEquals(200, code) + + self.assertEquals(self.event_source.get_current_key(), 1) + + self.clock.advance_time(31); + + self.assertEquals(self.event_source.get_current_key(), 2) + + (code, _) = yield self.mock_resource.trigger("PUT", + "/rooms/%s/typing/%s" % (self.room_id, self.user_id), + '{"typing": true, "timeout": 30000}' + ) + self.assertEquals(200, code) + + self.assertEquals(self.event_source.get_current_key(), 3) -- cgit 1.4.1 From 02ffbb20d00dbda213ba9321537ac12e347dcc35 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 12 Jan 2015 19:09:14 +0000 Subject: Use float rather than integer divisions to turn msec into sec - so timeouts under 1000msec will actually work --- synapse/handlers/typing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 15039ff0da..22ce7873d0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -91,7 +91,7 @@ class TypingNotificationHandler(BaseHandler): self._member_typing_until[member] = until self._member_typing_timer[member] = self.clock.call_later( - timeout / 1000, _cb + timeout / 1000.0, _cb ) if was_present: -- cgit 1.4.1 From 39585bf5560e64bf6fca7d043cc3357b5dba59de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jan 2015 09:57:32 +0000 Subject: Insert 'age' into top level when returning events to clients --- synapse/events/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/synapse/events/utils.py b/synapse/events/utils.py index 4f4914467c..4ad37188ba 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -98,7 +98,7 @@ def serialize_event(hs, e): d = {k: v for k, v in e.get_dict().items()} if "age_ts" in d["unsigned"]: now = int(hs.get_clock().time_msec()) - d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] + d["age"] = now - d["unsigned"]["age_ts"] del d["unsigned"]["age_ts"] d["user_id"] = d.pop("sender", None) -- cgit 1.4.1 From 1d3d37937d680e460d1931dd36f36bf59c606561 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jan 2015 09:59:47 +0000 Subject: Bump version --- VERSION | 2 +- synapse/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 8b9dea59dc..3b3e723172 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.1a +0.6.1b diff --git a/synapse/__init__.py b/synapse/__init__.py index 2195ad3177..e7e27b06ec 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a synapse home server. """ -__version__ = "0.6.1a" +__version__ = "0.6.1b" -- cgit 1.4.1 From 895fcb377e3ebc43d67df1ac66413d92aacffca1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Jan 2015 14:14:21 +0000 Subject: Fix stream token ordering --- synapse/storage/stream.py | 173 +++++++++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 72 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 744c821dfe..563c8e3bbb 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,6 +39,8 @@ from ._base import SQLBaseStore from synapse.api.errors import SynapseError from synapse.util.logutils import log_function +from collections import namedtuple + import logging @@ -52,58 +54,76 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" -def _parse_stream_token(string): - try: - if string[0] != 's': - raise - return int(string[1:]) - except: - raise SynapseError(400, "Invalid token") - - -def _parse_topological_token(string): - try: - if string[0] != 't': - raise - parts = string[1:].split('-', 1) - return (int(parts[0]), int(parts[1])) - except: - raise SynapseError(400, "Invalid token") - - -def is_stream_token(string): - try: - _parse_stream_token(string) - return True - except: - return False - - -def is_topological_token(string): - try: - _parse_topological_token(string) - return True - except: - return False - - -def _get_token_bound(token, comparison): - try: - s = _parse_stream_token(token) - return "%s %s %d" % ("stream_ordering", comparison, s) - except: - pass - - try: - top, stream = _parse_topological_token(token) - return "%s %s %d AND %s %s %d" % ( - "topological_ordering", comparison, top, - "stream_ordering", comparison, stream, - ) - except: - pass +class _StreamToken(namedtuple("_StreamToken", "topological stream")): + """Tokens are positions between events. The token "s1" comes after event 1. + + s0 s1 + | | + [0] V [1] V [2] + + Tokens can either be a point in the live event stream or a cursor going + through historic events. + + When traversing the live event stream events are ordered by when they + arrived at the homeserver. + + When traversing historic events the events are ordered by their depth in + the event graph "topological_ordering" and then by when they arrived at the + homeserver "stream_ordering". + + Live tokens start with an "s" followed by the "stream_ordering" id of the + event it comes after. 
Historic tokens start with a "t" followed by the + "topological_ordering" id of the event it comes after, follewed by "-", + followed by the "stream_ordering" id of the event it comes after. + """ + __slots__ = [] + + @classmethod + def parse(cls, string): + try: + if string[0] == 's': + return cls(None, int(string[1:])) + if string[0] == 't': + parts = string[1:].split('-', 1) + return cls(int(parts[1]), int(parts[0])) + except: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + @classmethod + def parse_stream_token(cls, string): + try: + if string[0] == 's': + return cls(None, int(string[1:])) + except: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + def __str__(self): + if self.topological is not None: + return "t%d-%d" % (self.topological, self.stream) + else: + return "s%d" % (self.stream,) + + def lower_bound(self): + if self.topological is None: + return "(%d < %s)" % (self.stream, "stream_ordering") + else: + return "(%d < %s OR (%d == %s AND %d < %s))" % ( + self.topological, "topological_ordering", + self.topological, "topological_ordering", + self.stream, "stream_ordering", + ) - raise SynapseError(400, "Invalid token") + def upper_bound(self): + if self.topological is None: + return "(%d >= %s)" % (self.stream, "stream_ordering") + else: + return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + self.topological, "topological_ordering", + self.topological, "topological_ordering", + self.stream, "stream_ordering", + ) class StreamStore(SQLBaseStore): @@ -162,8 +182,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_id = _parse_stream_token(from_key) - to_id = _parse_stream_token(to_key) + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: return defer.succeed(([], to_key)) @@ -181,7 +201,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id, to_id,)) + txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -211,17 +231,21 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - from_comp = '<=' if direction == 'b' else '>' - to_comp = '>' if direction == 'b' else '<=' - order = "DESC" if direction == 'b' else "ASC" - args = [room_id] - - bounds = _get_token_bound(from_key, from_comp) - if to_key: - bounds = "%s AND %s" % ( - bounds, _get_token_bound(to_key, to_comp) - ) + if direction == 'b': + order = "DESC" + bounds = _StreamToken.parse(from_key).upper_bound() + if to_key: + bounds = "%s AND %s" % ( + bounds, _StreamToken.parse(to_key).lower_bound() + ) + else: + order = "ASC" + bounds = _StreamToken.parse(from_key).lower_bound() + if to_key: + bounds = "%s AND %s" % ( + bounds, _StreamToken.parse(to_key).upper_bound() + ) if int(limit) > 0: args.append(int(limit)) @@ -249,9 +273,13 @@ class StreamStore(SQLBaseStore): topo = rows[-1]["topological_ordering"] toke = rows[-1]["stream_ordering"] if direction == 'b': - topo -= 1 + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. 
toke -= 1 - next_token = "t%s-%s" % (topo, toke) + next_token = str(_StreamToken(topo, toke)) else: # TODO (erikj): We should work out what to do here instead. next_token = to_key if to_key else from_key @@ -284,13 +312,14 @@ class StreamStore(SQLBaseStore): rows.reverse() # As we selected with reverse ordering if rows: - # XXX: Always subtract 1 since the start token always goes - # backwards (parity with paginate_room_events). It isn't - # obvious that this is correct; we should clarify the algorithm - # used here. - topo = rows[0]["topological_ordering"] - 1 + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # since we are going backwards so we subtract one from the + # stream part. + topo = rows[0]["topological_ordering"] toke = rows[0]["stream_ordering"] - 1 - start_token = "t%s-%s" % (topo, toke) + start_token = str(_StreamToken(topo, toke)) token = (start_token, end_token) else: -- cgit 1.4.1 From fda63064fc21881c8aabbfef36916b3b00af5299 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Jan 2015 14:43:26 +0000 Subject: get_room_events isn't called anywhere --- synapse/storage/stream.py | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 563c8e3bbb..8ac2adab05 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -127,36 +127,6 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): - @log_function - def get_room_events(self, user_id, from_key, to_key, room_id, limit=0, - direction='f', with_feedback=False): - # We deal with events request in two different ways depending on if - # this looks like an /events request or a pagination request. 
- is_events = ( - direction == 'f' - and user_id - and is_stream_token(from_key) - and to_key and is_stream_token(to_key) - ) - - if is_events: - return self.get_room_events_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - room_id=room_id, - limit=limit, - with_feedback=with_feedback, - ) - else: - return self.paginate_room_events( - from_key=from_key, - to_key=to_key, - room_id=room_id, - limit=limit, - with_feedback=with_feedback, - ) - @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): -- cgit 1.4.1 From 3891597eb3666bbcbe325e38798b9d78b5d70bcc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Jan 2015 15:57:26 +0000 Subject: Remove unused functions --- synapse/events/builder.py | 6 ------ synapse/storage/room.py | 7 ------- synapse/storage/state.py | 6 ------ tests/storage/test_room.py | 11 ----------- 4 files changed, 30 deletions(-) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index d4cb602ebb..a9b1b99a10 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -33,12 +33,6 @@ class EventBuilder(EventBase): unsigned=unsigned ) - def update_event_key(self, key, value): - self._event_dict[key] = value - - def update_event_keys(self, other_dict): - self._event_dict.update(other_dict) - def build(self): return FrozenEvent.from_event(self) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 978b2c4a48..6542f8e4f8 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -58,13 +58,6 @@ class RoomStore(SQLBaseStore): logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") - def store_room_config(self, room_id, visibility): - return self._simple_update_one( - table=RoomsTable.table_name, - keyvalues={"room_id": room_id}, - updatevalues={"is_public": visibility} - ) - def get_room(self, room_id): """Retrieve a room. 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5327517704..71db16d0e5 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -78,12 +78,6 @@ class StateStore(SQLBaseStore): f, ) - def store_state_groups(self, event): - return self.runInteraction( - "store_state_groups", - self._store_state_groups_txn, event - ) - def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: return diff --git a/tests/storage/test_room.py b/tests/storage/test_room.py index 11761fe29a..e7739776ec 100644 --- a/tests/storage/test_room.py +++ b/tests/storage/test_room.py @@ -56,17 +56,6 @@ class RoomStoreTestCase(unittest.TestCase): (yield self.store.get_room(self.room.to_string())) ) - @defer.inlineCallbacks - def test_store_room_config(self): - yield self.store.store_room_config(self.room.to_string(), - visibility=False - ) - - self.assertObjectHasAttributes( - {"is_public": False}, - (yield self.store.get_room(self.room.to_string())) - ) - @defer.inlineCallbacks def test_get_rooms(self): # get_rooms does an INNER JOIN on the room_aliases table :( -- cgit 1.4.1 From c2e7c84e5887de9b622a7cd2cc861f5eef3866c1 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 13 Jan 2015 16:57:28 +0000 Subject: Don't try to cancel already-expired timers - SYN-230 --- synapse/handlers/typing.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 22ce7873d0..cd9638dd04 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -120,6 +120,10 @@ class TypingNotificationHandler(BaseHandler): member = RoomMember(room_id=room_id, user=target_user) + if member in self._member_typing_timer: + self.clock.cancel_call_later(self._member_typing_timer[member]) + del self._member_typing_timer[member] + yield self._stopped_typing(member) @defer.inlineCallbacks @@ -142,8 +146,10 @@ class TypingNotificationHandler(BaseHandler): del self._member_typing_until[member] - self.clock.cancel_call_later(self._member_typing_timer[member]) - del self._member_typing_timer[member] + if member in self._member_typing_timer: + # Don't cancel it - either it already expired, or the real + # stopped_typing() will cancel it + del self._member_typing_timer[member] @defer.inlineCallbacks def _push_update(self, room_id, user, typing): -- cgit 1.4.1 From cf7e723808fa5882f33b01274fb2b94e5abe9eca Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 13 Jan 2015 16:57:55 +0000 Subject: Have MockClock detect attempts to cancel expired timers, to prevent a repeat of SYN-230 --- tests/utils.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/utils.py b/tests/utils.py index 731e03f517..97fa8d8181 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -138,7 +138,8 @@ class MockClock(object): now = 1000 def __init__(self): - # list of tuples of (absolute_time, callback) in no particular order + # list of lists of [absolute_time, callback, expired] in no particular + # order self.timers = [] def time(self): @@ -154,11 +155,16 @@ class MockClock(object): LoggingContext.thread_local.current_context = current_context callback() - t = (self.now + delay, wrapped_callback) + t = [self.now + delay, wrapped_callback, False] self.timers.append(t) + return t def cancel_call_later(self, timer): + if timer[2]: + raise Exception("Cannot cancel an expired timer") + + timer[2] = True self.timers = [t for t in self.timers if t != timer] # For unit 
testing @@ -168,11 +174,17 @@ class MockClock(object): timers = self.timers self.timers = [] - for time, callback in timers: + for t in timers: + time, callback, expired = t + + if expired: + raise Exception("Timer already expired") + if self.now >= time: + t[2] = True callback() else: - self.timers.append((time, callback)) + self.timers.append(t) class SQLiteMemoryDbPool(ConnectionPool, object): -- cgit 1.4.1 From 34a5fbe2b7162c545ab5ae9403147bdd925a58f9 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 13 Jan 2015 17:29:24 +0000 Subject: Have /join/:room_id return the room ID in response anyway, for consistency of clients (SYN-234) --- synapse/rest/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index caafa959e6..48bba2a5f3 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -246,7 +246,7 @@ class JoinRoomAliasServlet(RestServlet): } ) - defer.returnValue((200, {})) + defer.returnValue((200, {"room_id": identifier.to_string()})) @defer.inlineCallbacks def on_PUT(self, request, room_identifier, txn_id): -- cgit 1.4.1 From 2bdee982695345e676cc1667c8011b88e0a4cf66 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 16 Jan 2015 19:00:40 +0000 Subject: Remove temporary debug logging that was accidentally committed --- synapse/handlers/events.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index c9ade253dd..103bc67c42 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -70,10 +70,8 @@ class EventStreamHandler(BaseHandler): pagin_config.from_token = None rm_handler = self.hs.get_handlers().room_member_handler - logger.debug("BETA") room_ids = yield rm_handler.get_rooms_for_user(auth_user) - logger.debug("ALPHA") with PreserveLoggingContext(): events, tokens = yield self.notifier.get_events_for( auth_user, room_ids, pagin_config, timeout -- cgit 1.4.1 From 602684eac5b7acf61e10d7fabed4977635d3fb46 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 16 Jan 2015 13:21:14 +0000 Subject: Split transport layer into client and server parts --- synapse/federation/transport.py | 598 ------------------------------- synapse/federation/transport/__init__.py | 62 ++++ synapse/federation/transport/client.py | 257 +++++++++++++ synapse/federation/transport/server.py | 328 +++++++++++++++++ 4 files changed, 647 insertions(+), 598 deletions(-) delete mode 100644 synapse/federation/transport.py create mode 100644 synapse/federation/transport/__init__.py create mode 100644 synapse/federation/transport/client.py create mode 100644 synapse/federation/transport/server.py diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py deleted file mode 100644 index 1f0f06e0fe..0000000000 --- a/synapse/federation/transport.py +++ /dev/null @@ -1,598 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""The transport layer is responsible for both sending transactions to remote -home servers and receiving a variety of requests from other home servers. - -Typically, this is done over HTTP (and all home servers are required to -support HTTP), however individual pairings of servers may decide to communicate -over a different (albeit still reliable) protocol. -""" - -from twisted.internet import defer - -from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError -from synapse.util.logutils import log_function - -import logging -import json -import re - - -logger = logging.getLogger(__name__) - - -class TransportLayer(object): - """This is a basic implementation of the transport layer that translates - transactions and other requests to/from HTTP. - - Attributes: - server_name (str): Local home server host - - server (synapse.http.server.HttpServer): the http server to - register listeners on - - client (synapse.http.client.HttpClient): the http client used to - send requests - - request_handler (TransportRequestHandler): The handler to fire when we - receive requests for data. - - received_handler (TransportReceivedHandler): The handler to fire when - we receive data. - """ - - def __init__(self, homeserver, server_name, server, client): - """ - Args: - server_name (str): Local home server host - server (synapse.protocol.http.HttpServer): the http server to - register listeners on - client (synapse.protocol.http.HttpClient): the http client used to - send requests - """ - self.keyring = homeserver.get_keyring() - self.server_name = server_name - self.server = server - self.client = client - self.request_handler = None - self.received_handler = None - - @log_function - def get_context_state(self, destination, context, event_id=None): - """ Requests all state for a given context (i.e. room) from the - given server. - - Args: - destination (str): The host name of the remote home server we want - to get the state from. - context (str): The name of the context we want the state of - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug("get_context_state dest=%s, context=%s", - destination, context) - - subpath = "/state/%s/" % context - - args = {} - if event_id: - args["event_id"] = event_id - - return self._do_request_for_transaction( - destination, subpath, args=args - ) - - @log_function - def get_event(self, destination, event_id): - """ Requests the pdu with give id and origin from the given server. - - Args: - destination (str): The host name of the remote home server we want - to get the state from. - event_id (str): The id of the event being requested. - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug("get_pdu dest=%s, event_id=%s", - destination, event_id) - - subpath = "/event/%s/" % (event_id, ) - - return self._do_request_for_transaction(destination, subpath) - - @log_function - def backfill(self, dest, context, event_tuples, limit): - """ Requests `limit` previous PDUs in a given context before list of - PDUs. - - Args: - dest (str) - context (str) - event_tuples (list) - limt (int) - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug( - "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", - dest, context, repr(event_tuples), str(limit) - ) - - if not event_tuples: - # TODO: raise? 
- return - - subpath = "/backfill/%s/" % (context,) - - args = { - "v": event_tuples, - "limit": [str(limit)], - } - - return self._do_request_for_transaction( - dest, - subpath, - args=args, - ) - - @defer.inlineCallbacks - @log_function - def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to its destination - - Args: - transaction (Transaction) - - Returns: - Deferred: Results of the deferred is a tuple in the form of - (response_code, response_body) where the response_body is a - python dict decoded from json - """ - logger.debug( - "send_data dest=%s, txid=%s", - transaction.destination, transaction.transaction_id - ) - - if transaction.destination == self.server_name: - raise RuntimeError("Transport layer cannot send to itself!") - - # FIXME: This is only used by the tests. The actual json sent is - # generated by the json_data_callback. - json_data = transaction.get_dict() - - code, response = yield self.client.put_json( - transaction.destination, - path=PREFIX + "/send/%s/" % transaction.transaction_id, - data=json_data, - json_data_callback=json_data_callback, - ) - - logger.debug( - "send_data dest=%s, txid=%s, got response: %d", - transaction.destination, transaction.transaction_id, code - ) - - defer.returnValue((code, response)) - - @defer.inlineCallbacks - @log_function - def make_query(self, destination, query_type, args, retry_on_dns_fail): - path = PREFIX + "/query/%s" % query_type - - response = yield self.client.get_json( - destination=destination, - path=path, - args=args, - retry_on_dns_fail=retry_on_dns_fail, - ) - - defer.returnValue(response) - - @defer.inlineCallbacks - @log_function - def make_join(self, destination, context, user_id, retry_on_dns_fail=True): - path = PREFIX + "/make_join/%s/%s" % (context, user_id,) - - response = yield self.client.get_json( - destination=destination, - path=path, - retry_on_dns_fail=retry_on_dns_fail, - ) - - defer.returnValue(response) - - @defer.inlineCallbacks - @log_function - def send_join(self, destination, context, event_id, content): - path = PREFIX + "/send_join/%s/%s" % ( - context, - event_id, - ) - - code, content = yield self.client.put_json( - destination=destination, - path=path, - data=content, - ) - - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_join", code) - - defer.returnValue(json.loads(content)) - - @defer.inlineCallbacks - @log_function - def send_invite(self, destination, context, event_id, content): - path = PREFIX + "/invite/%s/%s" % ( - context, - event_id, - ) - - code, content = yield self.client.put_json( - destination=destination, - path=path, - data=content, - ) - - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) - - @defer.inlineCallbacks - @log_function - def get_event_auth(self, destination, context, event_id): - path = PREFIX + "/event_auth/%s/%s" % ( - context, - event_id, - ) - - response = yield self.client.get_json( - destination=destination, - path=path, - ) - - defer.returnValue(response) - - @defer.inlineCallbacks - def _authenticate_request(self, request): - json_request = { - "method": request.method, - "uri": request.uri, - "destination": self.server_name, - "signatures": {}, - } - - content = None - origin = None - - if request.method == "PUT": - # TODO: Handle other method types? other content types? 
- try: - content_bytes = request.content.read() - content = json.loads(content_bytes) - json_request["content"] = content - except: - raise SynapseError(400, "Unable to parse JSON", Codes.BAD_JSON) - - def parse_auth_header(header_str): - try: - params = auth.split(" ")[1].split(",") - param_dict = dict(kv.split("=") for kv in params) - - def strip_quotes(value): - if value.startswith("\""): - return value[1:-1] - else: - return value - - origin = strip_quotes(param_dict["origin"]) - key = strip_quotes(param_dict["key"]) - sig = strip_quotes(param_dict["sig"]) - return (origin, key, sig) - except: - raise SynapseError( - 400, "Malformed Authorization header", Codes.UNAUTHORIZED - ) - - auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") - - if not auth_headers: - raise SynapseError( - 401, "Missing Authorization headers", Codes.UNAUTHORIZED, - ) - - for auth in auth_headers: - if auth.startswith("X-Matrix"): - (origin, key, sig) = parse_auth_header(auth) - json_request["origin"] = origin - json_request["signatures"].setdefault(origin, {})[key] = sig - - if not json_request["signatures"]: - raise SynapseError( - 401, "Missing Authorization headers", Codes.UNAUTHORIZED, - ) - - yield self.keyring.verify_json_for_server(origin, json_request) - - defer.returnValue((origin, content)) - - def _with_authentication(self, handler): - @defer.inlineCallbacks - def new_handler(request, *args, **kwargs): - try: - (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) - except: - logger.exception("_authenticate_request failed") - raise - defer.returnValue(response) - return new_handler - - @log_function - def register_received_handler(self, handler): - """ Register a handler that will be fired when we receive data. - - Args: - handler (TransportReceivedHandler) - """ - self.received_handler = handler - - # This is when someone is trying to send us a bunch of data. - self.server.register_path( - "PUT", - re.compile("^" + PREFIX + "/send/([^/]*)/$"), - self._with_authentication(self._on_send_request) - ) - - @log_function - def register_request_handler(self, handler): - """ Register a handler that will be fired when we get asked for data. - - Args: - handler (TransportRequestHandler) - """ - self.request_handler = handler - - # TODO(markjh): Namespace the federation URI paths - - # This is for when someone asks us for everything since version X - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/pull/$"), - self._with_authentication( - lambda origin, content, query: - handler.on_pull_request(query["origin"][0], query["v"]) - ) - ) - - # This is when someone asks for a data item for a given server - # data_id pair. - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/event/([^/]*)/$"), - self._with_authentication( - lambda origin, content, query, event_id: - handler.on_pdu_request(origin, event_id) - ) - ) - - # This is when someone asks for all data for a given context. 
- self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/state/([^/]*)/$"), - self._with_authentication( - lambda origin, content, query, context: - handler.on_context_state_request( - origin, - context, - query.get("event_id", [None])[0], - ) - ) - ) - - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/backfill/([^/]*)/$"), - self._with_authentication( - lambda origin, content, query, context: - self._on_backfill_request( - origin, context, query["v"], query["limit"] - ) - ) - ) - - # This is when we receive a server-server Query - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/query/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, query_type: - handler.on_query_request( - query_type, - {k: v[0].decode("utf-8") for k, v in query.items()} - ) - ) - ) - - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, user_id: - self._on_make_join_request( - origin, content, query, context, user_id - ) - ) - ) - - self.server.register_path( - "GET", - re.compile("^" + PREFIX + "/event_auth/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, event_id: - handler.on_event_auth( - origin, context, event_id, - ) - ) - ) - - self.server.register_path( - "PUT", - re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, event_id: - self._on_send_join_request( - origin, content, query, - ) - ) - ) - - self.server.register_path( - "PUT", - re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"), - self._with_authentication( - lambda origin, content, query, context, event_id: - self._on_invite_request( - origin, content, query, - ) - ) - ) - - @defer.inlineCallbacks - @log_function - def _on_send_request(self, origin, content, query, transaction_id): - """ Called on PUT /send// - - Args: - request (twisted.web.http.Request): The HTTP request. - transaction_id (str): The transaction_id associated with this - request. This is *not* None. - - Returns: - Deferred: Results in a tuple of `(code, response)`, where - `response` is a python dict to be converted into JSON that is - used as the response body. - """ - # Parse the request - try: - transaction_data = content - - logger.debug( - "Decoded %s: %s", - transaction_id, str(transaction_data) - ) - - # We should ideally be getting this from the security layer. - # origin = body["origin"] - - # Add some extra data to the transaction dict that isn't included - # in the request body. - transaction_data.update( - transaction_id=transaction_id, - destination=self.server_name - ) - - except Exception as e: - logger.exception(e) - defer.returnValue((400, {"error": "Invalid transaction"})) - return - - try: - handler = self.received_handler - code, response = yield handler.on_incoming_transaction( - transaction_data - ) - except: - logger.exception("on_incoming_transaction failed") - raise - - defer.returnValue((code, response)) - - @defer.inlineCallbacks - @log_function - def _do_request_for_transaction(self, destination, subpath, args={}): - """ - Args: - destination (str) - path (str) - args (dict): This is parsed directly to the HttpClient. - - Returns: - Deferred: Results in a dict. 
- """ - - data = yield self.client.get_json( - destination, - path=PREFIX + subpath, - args=args, - ) - - # Add certain keys to the JSON, ready for decoding as a Transaction - data.update( - origin=destination, - destination=self.server_name, - transaction_id=None - ) - - defer.returnValue(data) - - @log_function - def _on_backfill_request(self, origin, context, v_list, limits): - if not limits: - return defer.succeed( - (400, {"error": "Did not include limit param"}) - ) - - limit = int(limits[-1]) - - versions = v_list - - return self.request_handler.on_backfill_request( - origin, context, versions, limit - ) - - @defer.inlineCallbacks - @log_function - def _on_make_join_request(self, origin, content, query, context, user_id): - content = yield self.request_handler.on_make_join_request( - context, user_id, - ) - defer.returnValue((200, content)) - - @defer.inlineCallbacks - @log_function - def _on_send_join_request(self, origin, content, query): - content = yield self.request_handler.on_send_join_request( - origin, content, - ) - - defer.returnValue((200, content)) - - @defer.inlineCallbacks - @log_function - def _on_invite_request(self, origin, content, query): - content = yield self.request_handler.on_invite_request( - origin, content, - ) - - defer.returnValue((200, content)) diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py new file mode 100644 index 0000000000..6800ac46c5 --- /dev/null +++ b/synapse/federation/transport/__init__.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The transport layer is responsible for both sending transactions to remote +home servers and receiving a variety of requests from other home servers. + +By default this is done over HTTPS (and all home servers are required to +support HTTPS), however individual pairings of servers may decide to +communicate over a different (albeit still reliable) protocol. +""" + +from .server import TransportLayerServer +from .client import TransportLayerClient + + +class TransportLayer(TransportLayerServer, TransportLayerClient): + """This is a basic implementation of the transport layer that translates + transactions and other requests to/from HTTP. + + Attributes: + server_name (str): Local home server host + + server (synapse.http.server.HttpServer): the http server to + register listeners on + + client (synapse.http.client.HttpClient): the http client used to + send requests + + request_handler (TransportRequestHandler): The handler to fire when we + receive requests for data. + + received_handler (TransportReceivedHandler): The handler to fire when + we receive data. 
+ """ + + def __init__(self, homeserver, server_name, server, client): + """ + Args: + server_name (str): Local home server host + server (synapse.protocol.http.HttpServer): the http server to + register listeners on + client (synapse.protocol.http.HttpClient): the http client used to + send requests + """ + self.keyring = homeserver.get_keyring() + self.server_name = server_name + self.server = server + self.client = client + self.request_handler = None + self.received_handler = None diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py new file mode 100644 index 0000000000..604ade683b --- /dev/null +++ b/synapse/federation/transport/client.py @@ -0,0 +1,257 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.urls import FEDERATION_PREFIX as PREFIX +from synapse.util.logutils import log_function + +import logging +import json + + +logger = logging.getLogger(__name__) + + +class TransportLayerClient(object): + """Sends federation HTTP requests to other servers""" + + @log_function + def get_context_state(self, destination, context, event_id=None): + """ Requests all state for a given context (i.e. room) from the + given server. + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_context_state dest=%s, context=%s", + destination, context) + + subpath = "/state/%s/" % context + + args = {} + if event_id: + args["event_id"] = event_id + + return self._do_request_for_transaction( + destination, subpath, args=args + ) + + @log_function + def get_event(self, destination, event_id): + """ Requests the pdu with give id and origin from the given server. + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + event_id (str): The id of the event being requested. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_pdu dest=%s, event_id=%s", + destination, event_id) + + subpath = "/event/%s/" % (event_id, ) + + return self._do_request_for_transaction(destination, subpath) + + @log_function + def backfill(self, dest, context, event_tuples, limit): + """ Requests `limit` previous PDUs in a given context before list of + PDUs. + + Args: + dest (str) + context (str) + event_tuples (list) + limt (int) + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug( + "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", + dest, context, repr(event_tuples), str(limit) + ) + + if not event_tuples: + # TODO: raise? 
+ return + + subpath = "/backfill/%s/" % (context,) + + args = { + "v": event_tuples, + "limit": [str(limit)], + } + + return self._do_request_for_transaction( + dest, + subpath, + args=args, + ) + + @defer.inlineCallbacks + @log_function + def send_transaction(self, transaction, json_data_callback=None): + """ Sends the given Transaction to its destination + + Args: + transaction (Transaction) + + Returns: + Deferred: Results of the deferred is a tuple in the form of + (response_code, response_body) where the response_body is a + python dict decoded from json + """ + logger.debug( + "send_data dest=%s, txid=%s", + transaction.destination, transaction.transaction_id + ) + + if transaction.destination == self.server_name: + raise RuntimeError("Transport layer cannot send to itself!") + + # FIXME: This is only used by the tests. The actual json sent is + # generated by the json_data_callback. + json_data = transaction.get_dict() + + code, response = yield self.client.put_json( + transaction.destination, + path=PREFIX + "/send/%s/" % transaction.transaction_id, + data=json_data, + json_data_callback=json_data_callback, + ) + + logger.debug( + "send_data dest=%s, txid=%s, got response: %d", + transaction.destination, transaction.transaction_id, code + ) + + defer.returnValue((code, response)) + + @defer.inlineCallbacks + @log_function + def make_query(self, destination, query_type, args, retry_on_dns_fail): + path = PREFIX + "/query/%s" % query_type + + response = yield self.client.get_json( + destination=destination, + path=path, + args=args, + retry_on_dns_fail=retry_on_dns_fail, + ) + + defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def make_join(self, destination, context, user_id, retry_on_dns_fail=True): + path = PREFIX + "/make_join/%s/%s" % (context, user_id,) + + response = yield self.client.get_json( + destination=destination, + path=path, + retry_on_dns_fail=retry_on_dns_fail, + ) + + defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def send_join(self, destination, context, event_id, content): + path = PREFIX + "/send_join/%s/%s" % ( + context, + event_id, + ) + + code, content = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_join", code) + + defer.returnValue(json.loads(content)) + + @defer.inlineCallbacks + @log_function + def send_invite(self, destination, context, event_id, content): + path = PREFIX + "/invite/%s/%s" % ( + context, + event_id, + ) + + code, content = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) + + @defer.inlineCallbacks + @log_function + def get_event_auth(self, destination, context, event_id): + path = PREFIX + "/event_auth/%s/%s" % ( + context, + event_id, + ) + + response = yield self.client.get_json( + destination=destination, + path=path, + ) + + defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def _do_request_for_transaction(self, destination, subpath, args={}): + """ + Args: + destination (str) + path (str) + args (dict): This is parsed directly to the HttpClient. + + Returns: + Deferred: Results in a dict. 
+ """ + + data = yield self.client.get_json( + destination, + path=PREFIX + subpath, + args=args, + ) + + # Add certain keys to the JSON, ready for decoding as a Transaction + data.update( + origin=destination, + destination=self.server_name, + transaction_id=None + ) + + defer.returnValue(data) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py new file mode 100644 index 0000000000..34b50def7d --- /dev/null +++ b/synapse/federation/transport/server.py @@ -0,0 +1,328 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.urls import FEDERATION_PREFIX as PREFIX +from synapse.api.errors import Codes, SynapseError +from synapse.util.logutils import log_function + +import logging +import json +import re + + +logger = logging.getLogger(__name__) + + +class TransportLayerServer(object): + """Handles incoming federation HTTP requests""" + + @defer.inlineCallbacks + def _authenticate_request(self, request): + json_request = { + "method": request.method, + "uri": request.uri, + "destination": self.server_name, + "signatures": {}, + } + + content = None + origin = None + + if request.method == "PUT": + # TODO: Handle other method types? other content types? 
+ try: + content_bytes = request.content.read() + content = json.loads(content_bytes) + json_request["content"] = content + except: + raise SynapseError(400, "Unable to parse JSON", Codes.BAD_JSON) + + def parse_auth_header(header_str): + try: + params = auth.split(" ")[1].split(",") + param_dict = dict(kv.split("=") for kv in params) + + def strip_quotes(value): + if value.startswith("\""): + return value[1:-1] + else: + return value + + origin = strip_quotes(param_dict["origin"]) + key = strip_quotes(param_dict["key"]) + sig = strip_quotes(param_dict["sig"]) + return (origin, key, sig) + except: + raise SynapseError( + 400, "Malformed Authorization header", Codes.UNAUTHORIZED + ) + + auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") + + if not auth_headers: + raise SynapseError( + 401, "Missing Authorization headers", Codes.UNAUTHORIZED, + ) + + for auth in auth_headers: + if auth.startswith("X-Matrix"): + (origin, key, sig) = parse_auth_header(auth) + json_request["origin"] = origin + json_request["signatures"].setdefault(origin, {})[key] = sig + + if not json_request["signatures"]: + raise SynapseError( + 401, "Missing Authorization headers", Codes.UNAUTHORIZED, + ) + + yield self.keyring.verify_json_for_server(origin, json_request) + + defer.returnValue((origin, content)) + + def _with_authentication(self, handler): + @defer.inlineCallbacks + def new_handler(request, *args, **kwargs): + try: + (origin, content) = yield self._authenticate_request(request) + response = yield handler( + origin, content, request.args, *args, **kwargs + ) + except: + logger.exception("_authenticate_request failed") + raise + defer.returnValue(response) + return new_handler + + @log_function + def register_received_handler(self, handler): + """ Register a handler that will be fired when we receive data. + + Args: + handler (TransportReceivedHandler) + """ + self.received_handler = handler + + # This is when someone is trying to send us a bunch of data. + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/send/([^/]*)/$"), + self._with_authentication(self._on_send_request) + ) + + @log_function + def register_request_handler(self, handler): + """ Register a handler that will be fired when we get asked for data. + + Args: + handler (TransportRequestHandler) + """ + self.request_handler = handler + + # This is for when someone asks us for everything since version X + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/pull/$"), + self._with_authentication( + lambda origin, content, query: + handler.on_pull_request(query["origin"][0], query["v"]) + ) + ) + + # This is when someone asks for a data item for a given server + # data_id pair. + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/event/([^/]*)/$"), + self._with_authentication( + lambda origin, content, query, event_id: + handler.on_pdu_request(origin, event_id) + ) + ) + + # This is when someone asks for all data for a given context. 
+ self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/state/([^/]*)/$"), + self._with_authentication( + lambda origin, content, query, context: + handler.on_context_state_request( + origin, + context, + query.get("event_id", [None])[0], + ) + ) + ) + + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/backfill/([^/]*)/$"), + self._with_authentication( + lambda origin, content, query, context: + self._on_backfill_request( + origin, context, query["v"], query["limit"] + ) + ) + ) + + # This is when we receive a server-server Query + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/query/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, query_type: + handler.on_query_request( + query_type, + {k: v[0].decode("utf-8") for k, v in query.items()} + ) + ) + ) + + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, user_id: + self._on_make_join_request( + origin, content, query, context, user_id + ) + ) + ) + + self.server.register_path( + "GET", + re.compile("^" + PREFIX + "/event_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + handler.on_event_auth( + origin, context, event_id, + ) + ) + ) + + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + self._on_send_join_request( + origin, content, query, + ) + ) + ) + + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + self._on_invite_request( + origin, content, query, + ) + ) + ) + + @defer.inlineCallbacks + @log_function + def _on_send_request(self, origin, content, query, transaction_id): + """ Called on PUT /send// + + Args: + request (twisted.web.http.Request): The HTTP request. + transaction_id (str): The transaction_id associated with this + request. This is *not* None. + + Returns: + Deferred: Results in a tuple of `(code, response)`, where + `response` is a python dict to be converted into JSON that is + used as the response body. + """ + # Parse the request + try: + transaction_data = content + + logger.debug( + "Decoded %s: %s", + transaction_id, str(transaction_data) + ) + + # We should ideally be getting this from the security layer. + # origin = body["origin"] + + # Add some extra data to the transaction dict that isn't included + # in the request body. 
+ transaction_data.update( + transaction_id=transaction_id, + destination=self.server_name + ) + + except Exception as e: + logger.exception(e) + defer.returnValue((400, {"error": "Invalid transaction"})) + return + + try: + handler = self.received_handler + code, response = yield handler.on_incoming_transaction( + transaction_data + ) + except: + logger.exception("on_incoming_transaction failed") + raise + + defer.returnValue((code, response)) + + + @log_function + def _on_backfill_request(self, origin, context, v_list, limits): + if not limits: + return defer.succeed( + (400, {"error": "Did not include limit param"}) + ) + + limit = int(limits[-1]) + + versions = v_list + + return self.request_handler.on_backfill_request( + origin, context, versions, limit + ) + + @defer.inlineCallbacks + @log_function + def _on_make_join_request(self, origin, content, query, context, user_id): + content = yield self.request_handler.on_make_join_request( + context, user_id, + ) + defer.returnValue((200, content)) + + @defer.inlineCallbacks + @log_function + def _on_send_join_request(self, origin, content, query): + content = yield self.request_handler.on_send_join_request( + origin, content, + ) + + defer.returnValue((200, content)) + + @defer.inlineCallbacks + @log_function + def _on_invite_request(self, origin, content, query): + content = yield self.request_handler.on_invite_request( + origin, content, + ) + + defer.returnValue((200, content)) -- cgit 1.4.1 From 2408c4b0a49bf21f1a84f3ac6549d30fd53bc5d4 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 16 Jan 2015 18:20:19 +0000 Subject: Fold _do_request_for_transaction into the methods that called it since it was a trivial wrapper around client.get_json --- synapse/federation/transport/client.py | 55 +++++++--------------------------- 1 file changed, 11 insertions(+), 44 deletions(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 604ade683b..d61ff192eb 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -29,7 +29,7 @@ class TransportLayerClient(object): """Sends federation HTTP requests to other servers""" @log_function - def get_context_state(self, destination, context, event_id=None): + def get_context_state(self, destination, context, event_id): """ Requests all state for a given context (i.e. room) from the given server. @@ -37,6 +37,7 @@ class TransportLayerClient(object): destination (str): The host name of the remote home server we want to get the state from. context (str): The name of the context we want the state of + event_id (str): The event we want the context at. Returns: Deferred: Results in a dict received from the remote homeserver. 
@@ -44,14 +45,9 @@ class TransportLayerClient(object): logger.debug("get_context_state dest=%s, context=%s", destination, context) - subpath = "/state/%s/" % context - - args = {} - if event_id: - args["event_id"] = event_id - - return self._do_request_for_transaction( - destination, subpath, args=args + path = PREFIX + "/state/%s/" % context + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, ) @log_function @@ -69,9 +65,8 @@ class TransportLayerClient(object): logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id) - subpath = "/event/%s/" % (event_id, ) - - return self._do_request_for_transaction(destination, subpath) + path = PREFIX + "/event/%s/" % (event_id, ) + return self.client.get_json(destination, path=path) @log_function def backfill(self, dest, context, event_tuples, limit): @@ -96,16 +91,16 @@ class TransportLayerClient(object): # TODO: raise? return - subpath = "/backfill/%s/" % (context,) + path = PREFIX + "/backfill/%s/" % (context,) args = { "v": event_tuples, "limit": [str(limit)], } - return self._do_request_for_transaction( - dest, - subpath, + return self.client.get_json( + destination, + path=path, args=args, ) @@ -227,31 +222,3 @@ class TransportLayerClient(object): ) defer.returnValue(response) - - @defer.inlineCallbacks - @log_function - def _do_request_for_transaction(self, destination, subpath, args={}): - """ - Args: - destination (str) - path (str) - args (dict): This is parsed directly to the HttpClient. - - Returns: - Deferred: Results in a dict. - """ - - data = yield self.client.get_json( - destination, - path=PREFIX + subpath, - args=args, - ) - - # Add certain keys to the JSON, ready for decoding as a Transaction - data.update( - origin=destination, - destination=self.server_name, - transaction_id=None - ) - - defer.returnValue(data) -- cgit 1.4.1 From 5fed04264056263e10b920a917a3a40f88e7e820 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 16 Jan 2015 18:59:04 +0000 Subject: Finish renaming "context" to "room_id" in federation codebase --- synapse/federation/replication.py | 94 +++++++++++++--------------------- synapse/federation/transport/client.py | 47 +++++++---------- synapse/federation/transport/server.py | 1 - synapse/handlers/_base.py | 4 +- synapse/handlers/federation.py | 10 ++-- synapse/http/matrixfederationclient.py | 1 - tests/handlers/test_room.py | 4 +- 7 files changed, 62 insertions(+), 99 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a4c29b484b..6620532a60 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -256,23 +256,21 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_state_for_context(self, destination, context, event_id): - """Requests all of the `current` state PDUs for a given context from + def get_state_for_room(self, destination, room_id, event_id): + """Requests all of the `current` state PDUs for a given room from a remote home server. Args: destination (str): The remote homeserver to query for the state. - context (str): The context we're interested in. + room_id (str): The id of the room we're interested in. event_id (str): The id of the event we want the state at. Returns: Deferred: Results in a list of PDUs. 
""" - result = yield self.transport_layer.get_context_state( - destination, - context, - event_id=event_id, + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, ) pdus = [ @@ -288,9 +286,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def get_event_auth(self, destination, context, event_id): + def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth( - destination, context, event_id, + destination, room_id, event_id, ) auth_chain = [ @@ -304,9 +302,9 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_backfill_request(self, origin, context, versions, limit): + def on_backfill_request(self, origin, room_id, versions, limit): pdus = yield self.handler.on_backfill_request( - origin, context, versions, limit + origin, room_id, versions, limit ) defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) @@ -380,12 +378,10 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def on_context_state_request(self, origin, context, event_id): + def on_context_state_request(self, origin, room_id, event_id): if event_id: pdus = yield self.handler.get_state_for_pdu( - origin, - context, - event_id, + origin, room_id, event_id, ) auth_chain = yield self.store.get_auth_chain( [pdu.event_id for pdu in pdus] @@ -413,7 +409,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function def on_pull_request(self, origin, versions): - raise NotImplementedError("Pull transacions not implemented") + raise NotImplementedError("Pull transactions not implemented") @defer.inlineCallbacks def on_query_request(self, query_type, args): @@ -422,30 +418,21 @@ class ReplicationLayer(object): defer.returnValue((200, response)) else: defer.returnValue( - (404, "No handler for Query type '%s'" % (query_type, )) + (404, "No handler for Query type '%s'" % (query_type,)) ) @defer.inlineCallbacks - def on_make_join_request(self, context, user_id): - pdu = yield self.handler.on_make_join_request(context, user_id) + def on_make_join_request(self, room_id, user_id): + pdu = yield self.handler.on_make_join_request(room_id, user_id) time_now = self._clock.time_msec() - defer.returnValue({ - "event": pdu.get_pdu_json(time_now), - }) + defer.returnValue({"event": pdu.get_pdu_json(time_now)}) @defer.inlineCallbacks def on_invite_request(self, origin, content): pdu = self.event_from_pdu_json(content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) time_now = self._clock.time_msec() - defer.returnValue( - ( - 200, - { - "event": ret_pdu.get_pdu_json(time_now), - } - ) - ) + defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -462,26 +449,17 @@ class ReplicationLayer(object): })) @defer.inlineCallbacks - def on_event_auth(self, origin, context, event_id): + def on_event_auth(self, origin, room_id, event_id): time_now = self._clock.time_msec() auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue( - ( - 200, - { - "auth_chain": [ - a.get_pdu_json(time_now) for a in auth_pdus - ], - } - ) - ) + defer.returnValue((200, { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + })) @defer.inlineCallbacks - def make_join(self, destination, context, user_id): + def make_join(self, destination, room_id, user_id): ret = yield self.transport_layer.make_join( - destination=destination, - context=context, - user_id=user_id, + 
destination, room_id, user_id ) pdu_dict = ret["event"] @@ -494,10 +472,10 @@ class ReplicationLayer(object): def send_join(self, destination, pdu): time_now = self._clock.time_msec() _, content = yield self.transport_layer.send_join( - destination, - pdu.room_id, - pdu.event_id, - pdu.get_pdu_json(time_now), + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), ) logger.debug("Got content: %s", content) @@ -507,9 +485,6 @@ class ReplicationLayer(object): for p in content.get("state", []) ] - # FIXME: We probably want to do something with the auth_chain given - # to us - auth_chain = [ self.event_from_pdu_json(p, outlier=True) for p in content.get("auth_chain", []) @@ -523,11 +498,11 @@ class ReplicationLayer(object): }) @defer.inlineCallbacks - def send_invite(self, destination, context, event_id, pdu): + def send_invite(self, destination, room_id, event_id, pdu): time_now = self._clock.time_msec() code, content = yield self.transport_layer.send_invite( destination=destination, - context=context, + room_id=room_id, event_id=event_id, content=pdu.get_pdu_json(time_now), ) @@ -657,7 +632,7 @@ class ReplicationLayer(object): "_handle_new_pdu getting state for %s", pdu.room_id ) - state, auth_chain = yield self.get_state_for_context( + state, auth_chain = yield self.get_state_for_room( origin, pdu.room_id, pdu.event_id, ) @@ -816,7 +791,7 @@ class _TransactionQueue(object): logger.info("TX [%s] is ready for retry", destination) logger.info("TX [%s] _attempt_new_transaction", destination) - + if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -830,14 +805,15 @@ class _TransactionQueue(object): pending_failures = self.pending_failures_by_dest.pop(destination, []) if pending_pdus: - logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) + logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) if not pending_pdus and not pending_edus and not pending_failures: return logger.debug( - "TX [%s] Attempting new transaction " - "(pdus: %d, edus: %d, failures: %d)", + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", destination, len(pending_pdus), len(pending_edus), diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d61ff192eb..e634a3a213 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -29,9 +29,9 @@ class TransportLayerClient(object): """Sends federation HTTP requests to other servers""" @log_function - def get_context_state(self, destination, context, event_id): - """ Requests all state for a given context (i.e. room) from the - given server. + def get_room_state(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Args: destination (str): The host name of the remote home server we want @@ -42,10 +42,10 @@ class TransportLayerClient(object): Returns: Deferred: Results in a dict received from the remote homeserver. 
""" - logger.debug("get_context_state dest=%s, context=%s", - destination, context) + logger.debug("get_room_state dest=%s, room=%s", + destination, room_id) - path = PREFIX + "/state/%s/" % context + path = PREFIX + "/state/%s/" % room_id return self.client.get_json( destination, path=path, args={"event_id": event_id}, ) @@ -69,13 +69,13 @@ class TransportLayerClient(object): return self.client.get_json(destination, path=path) @log_function - def backfill(self, dest, context, event_tuples, limit): + def backfill(self, destination, room_id, event_tuples, limit): """ Requests `limit` previous PDUs in a given context before list of PDUs. Args: dest (str) - context (str) + room_id (str) event_tuples (list) limt (int) @@ -83,15 +83,15 @@ class TransportLayerClient(object): Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, context=%s, event_tuples=%s, limit=%s", - dest, context, repr(event_tuples), str(limit) + "backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s", + destination, room_id, repr(event_tuples), str(limit) ) if not event_tuples: # TODO: raise? return - path = PREFIX + "/backfill/%s/" % (context,) + path = PREFIX + "/backfill/%s/" % (room_id,) args = { "v": event_tuples, @@ -159,8 +159,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def make_join(self, destination, context, user_id, retry_on_dns_fail=True): - path = PREFIX + "/make_join/%s/%s" % (context, user_id,) + def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): + path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) response = yield self.client.get_json( destination=destination, @@ -172,11 +172,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_join(self, destination, context, event_id, content): - path = PREFIX + "/send_join/%s/%s" % ( - context, - event_id, - ) + def send_join(self, destination, room_id, event_id, content): + path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) code, content = yield self.client.put_json( destination=destination, @@ -191,11 +188,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_invite(self, destination, context, event_id, content): - path = PREFIX + "/invite/%s/%s" % ( - context, - event_id, - ) + def send_invite(self, destination, room_id, event_id, content): + path = PREFIX + "/invite/%s/%s" % (room_id, event_id) code, content = yield self.client.put_json( destination=destination, @@ -210,11 +204,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def get_event_auth(self, destination, context, event_id): - path = PREFIX + "/event_auth/%s/%s" % ( - context, - event_id, - ) + def get_event_auth(self, destination, room_id, event_id): + path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) response = yield self.client.get_json( destination=destination, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 34b50def7d..a380a6910b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -285,7 +285,6 @@ class TransportLayerServer(object): defer.returnValue((code, response)) - @log_function def _on_backfill_request(self, origin, context, v_list, limits): if not limits: diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 38af034b4d..f33d17a31e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -144,7 +144,5 @@ class BaseHandler(object): yield 
self.notifier.on_new_room_event(event, extra_users=extra_users) yield federation_handler.handle_new_event( - event, - None, - destinations=destinations, + event, destinations=destinations, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 195f7c618a..81203bf1a3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -75,14 +75,14 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def handle_new_event(self, event, snapshot, destinations): + def handle_new_event(self, event, destinations): """ Takes in an event from the client to server side, that has already been authed and handled by the state module, and sends it to any remote home servers that may be interested. Args: - event - snapshot (.storage.Snapshot): THe snapshot the event happened after + event: The event to send + destinations: A list of destinations to send it to Returns: Deferred: Resolved when it has successfully been queued for @@ -154,7 +154,7 @@ class FederationHandler(BaseHandler): replication = self.replication_layer if not state: - state, auth_chain = yield replication.get_state_for_context( + state, auth_chain = yield replication.get_state_for_room( origin, context=event.room_id, event_id=event.event_id, ) @@ -281,7 +281,7 @@ class FederationHandler(BaseHandler): """ pdu = yield self.replication_layer.send_invite( destination=target_host, - context=event.room_id, + room_id=event.room_id, event_id=event.event_id, pdu=event ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index aa14782b0f..1dda3ba2c7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -72,7 +72,6 @@ class MatrixFederationHttpClient(object): requests. """ - def __init__(self, hs): self.hs = hs self.signing_key = hs.config.signing_key[0] diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 0cb8aa4fbc..d3253b48b8 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -223,7 +223,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): yield room_handler.change_membership(event, context) self.federation.handle_new_event.assert_called_once_with( - event, None, destinations=set() + event, destinations=set() ) self.datastore.persist_event.assert_called_once_with( @@ -301,7 +301,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): yield room_handler.change_membership(event, context) self.federation.handle_new_event.assert_called_once_with( - event, None, destinations=set(['red']) + event, destinations=set(['red']) ) self.datastore.persist_event.assert_called_once_with( -- cgit 1.4.1 From 3e85e52b3f0e9330f29ec3d0f572db7b122c88b0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Jan 2015 15:26:19 +0000 Subject: Allow ':memory:' as the database path for sqlite3 --- synapse/app/homeserver.py | 8 +++++++- synapse/config/database.py | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 43b5c26144..61ad53fbb2 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -247,7 +247,13 @@ def setup(): logger.info("Database prepared in %s.", db_name) - hs.get_db_pool() + db_pool = hs.get_db_pool() + + if db_name == ":memory:" + # Memory databases will need to be setup each time they are opened. 
+ reactor.callWhenRunning( + hs.get_db_pool().runWithConnection, prepare_database + ) if config.manhole: f = twisted.manhole.telnet.ShellFactory() diff --git a/synapse/config/database.py b/synapse/config/database.py index 0d33583a7d..daa161c952 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -20,7 +20,10 @@ import os class DatabaseConfig(Config): def __init__(self, args): super(DatabaseConfig, self).__init__(args) - self.database_path = self.abspath(args.database_path) + if args.database_path == ":memory:": + self.database_path = ":memory:" + else: + self.database_path = self.abspath(args.database_path) @classmethod def add_arguments(cls, parser): -- cgit 1.4.1 From 00e9c08609eb498d783c3812d9415f2706029559 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Jan 2015 15:30:48 +0000 Subject: Fix syntax --- synapse/app/homeserver.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 61ad53fbb2..f00b06aa7f 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -241,7 +241,8 @@ def setup(): except UpgradeDatabaseException: sys.stderr.write( "\nFailed to upgrade database.\n" - "Have you checked for version specific instructions in UPGRADES.rst?\n" + "Have you checked for version specific instructions in" + " UPGRADES.rst?\n" ) sys.exit(1) @@ -249,7 +250,7 @@ def setup(): db_pool = hs.get_db_pool() - if db_name == ":memory:" + if db_name == ":memory:": # Memory databases will need to be setup each time they are opened. reactor.callWhenRunning( hs.get_db_pool().runWithConnection, prepare_database -- cgit 1.4.1 From 42529cbcedbea7c7f7347c793f113e2cbc7c73eb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Jan 2015 15:33:04 +0000 Subject: Fix pyflakes errors --- synapse/app/homeserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index f00b06aa7f..afe3d19760 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -253,7 +253,7 @@ def setup(): if db_name == ":memory:": # Memory databases will need to be setup each time they are opened. reactor.callWhenRunning( - hs.get_db_pool().runWithConnection, prepare_database + db_pool.runWithConnection, prepare_database ) if config.manhole: -- cgit 1.4.1 From dc70d1fef8c2f2f68c598c75e9808b6bed0873f6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Jan 2015 16:24:54 +0000 Subject: Only start the notifier timeout once we've had a chance to check for updates. 
Otherwise the timeout could fire while we are waiting for the database to return any updates it might have --- synapse/notifier.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index b9d52d0c4c..3aec1d4af2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -244,14 +244,14 @@ class Notifier(object): ) if timeout: - self.clock.call_later(timeout/1000.0, _timeout_listener) - self._register_with_keys(listener) yield self._check_for_updates(listener) if not timeout: _timeout_listener() + else: + self.clock.call_later(timeout/1000.0, _timeout_listener) return -- cgit 1.4.1 From dbe71e670c9f7068591b61f0975fdf5225a2cf3f Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 21 Jan 2015 16:58:16 +0000 Subject: Use common base class for two Presence unit-tests, avoiding boilerplate copypasta --- tests/handlers/test_presence.py | 79 +++++++++++++---------------------------- 1 file changed, 25 insertions(+), 54 deletions(-) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index b85a89052a..e96b73f977 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -59,23 +59,29 @@ class JustPresenceHandlers(object): def __init__(self, hs): self.presence_handler = PresenceHandler(hs) -class PresenceStateTestCase(unittest.TestCase): - """ Tests presence management. """ +class PresenceTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): db_pool = SQLiteMemoryDbPool() yield db_pool.prepare() + self.clock = MockClock() + self.mock_config = NonCallableMock() self.mock_config.signing_key = [MockKey()] + self.mock_federation_resource = MockHttpResource() + + self.mock_http_client = Mock(spec=[]) + self.mock_http_client.put_json = DeferredMockCallable() + hs = HomeServer("test", - clock=MockClock(), + clock=self.clock, db_pool=db_pool, handlers=None, - resource_for_federation=Mock(), - http_client=None, + resource_for_federation=self.mock_federation_resource, + http_client=self.mock_http_client, config=self.mock_config, keyring=Mock(), ) @@ -92,11 +98,19 @@ class PresenceStateTestCase(unittest.TestCase): self.u_banana = hs.parse_userid("@banana:test") self.u_clementine = hs.parse_userid("@clementine:test") - yield self.store.create_presence(self.u_apple.localpart) + for u in self.u_apple, self.u_banana, self.u_clementine: + yield self.store.create_presence(u.localpart) + yield self.store.set_presence_state( self.u_apple.localpart, {"state": ONLINE, "status_msg": "Online"} ) + # ID of a local user that does not exist + self.u_durian = hs.parse_userid("@durian:test") + + # A remote user + self.u_cabbage = hs.parse_userid("@cabbage:elsewhere") + self.handler = hs.get_handlers().presence_handler self.room_members = [] @@ -128,6 +142,10 @@ class PresenceStateTestCase(unittest.TestCase): self.handler.start_polling_presence = self.mock_start self.handler.stop_polling_presence = self.mock_stop + +class PresenceStateTestCase(PresenceTestCase): + """ Tests presence management. """ + @defer.inlineCallbacks def test_get_my_state(self): state = yield self.handler.get_state( @@ -206,56 +224,9 @@ class PresenceStateTestCase(unittest.TestCase): self.mock_stop.assert_called_with(self.u_apple) -class PresenceInvitesTestCase(unittest.TestCase): +class PresenceInvitesTestCase(PresenceTestCase): """ Tests presence management. 
""" - @defer.inlineCallbacks - def setUp(self): - self.mock_http_client = Mock(spec=[]) - self.mock_http_client.put_json = DeferredMockCallable() - - self.mock_federation_resource = MockHttpResource() - - db_pool = SQLiteMemoryDbPool() - yield db_pool.prepare() - - self.mock_config = NonCallableMock() - self.mock_config.signing_key = [MockKey()] - - hs = HomeServer("test", - clock=MockClock(), - db_pool=db_pool, - handlers=None, - resource_for_client=Mock(), - resource_for_federation=self.mock_federation_resource, - http_client=self.mock_http_client, - config=self.mock_config, - keyring=Mock(), - ) - hs.handlers = JustPresenceHandlers(hs) - - self.store = hs.get_datastore() - - # Some local users to test with - self.u_apple = hs.parse_userid("@apple:test") - self.u_banana = hs.parse_userid("@banana:test") - yield self.store.create_presence(self.u_apple.localpart) - yield self.store.create_presence(self.u_banana.localpart) - - # ID of a local user that does not exist - self.u_durian = hs.parse_userid("@durian:test") - - # A remote user - self.u_cabbage = hs.parse_userid("@cabbage:elsewhere") - - self.handler = hs.get_handlers().presence_handler - - self.mock_start = Mock() - self.mock_stop = Mock() - - self.handler.start_polling_presence = self.mock_start - self.handler.stop_polling_presence = self.mock_stop - @defer.inlineCallbacks def test_invite_local(self): # TODO(paul): This test will likely break if/when real auth permissions -- cgit 1.4.1 From 73315ce9de1612ad936b490fe164d9eb61a51b34 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 21 Jan 2015 20:01:57 +0000 Subject: Abstract out the room ID from presence tests, so it's stored in self --- tests/handlers/test_presence.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index e96b73f977..c309fbb054 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -113,17 +113,18 @@ class PresenceTestCase(unittest.TestCase): self.handler = hs.get_handlers().presence_handler + self.room_id = "a-room" self.room_members = [] def get_rooms_for_user(user): if user in self.room_members: - return defer.succeed(["a-room"]) + return defer.succeed([self.room_id]) else: return defer.succeed([]) room_member_handler.get_rooms_for_user = get_rooms_for_user def get_room_members(room_id): - if room_id == "a-room": + if room_id == self.room_id: return defer.succeed(self.room_members) else: return defer.succeed([]) @@ -529,24 +530,25 @@ class PresencePushTestCase(unittest.TestCase): ]) self.room_member_handler = hs.handlers.room_member_handler + self.room_id = "a-room" self.room_members = [] def get_rooms_for_user(user): if user in self.room_members: - return defer.succeed(["a-room"]) + return defer.succeed([self.room_id]) else: return defer.succeed([]) self.room_member_handler.get_rooms_for_user = get_rooms_for_user def get_room_members(room_id): - if room_id == "a-room": + if room_id == self.room_id: return defer.succeed(self.room_members) else: return defer.succeed([]) self.room_member_handler.get_room_members = get_room_members def get_room_hosts(room_id): - if room_id == "a-room": + if room_id == self.room_id: hosts = set([u.domain for u in self.room_members]) return defer.succeed(hosts) else: @@ -882,7 +884,7 @@ class PresencePushTestCase(unittest.TestCase): ) yield self.distributor.fire("user_joined_room", self.u_clementine, - "a-room" + self.room_id ) self.room_members.append(self.u_clementine) @@ -945,7 
+947,7 @@ class PresencePushTestCase(unittest.TestCase): self.room_members = [self.u_apple, self.u_banana] yield self.distributor.fire("user_joined_room", self.u_potato, - "a-room" + self.room_id ) yield put_json.await_calls() @@ -974,7 +976,7 @@ class PresencePushTestCase(unittest.TestCase): self.room_members.append(self.u_potato) yield self.distributor.fire("user_joined_room", self.u_clementine, - "a-room" + self.room_id ) put_json.await_calls() -- cgit 1.4.1 From f4ce61ed36e7b639550953882b6ea14171108784 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 15 Jan 2015 16:57:00 +0000 Subject: Move scripts into scripts --- UPGRADE.rst | 4 ++-- database-prepare-for-0.0.1.sh | 21 --------------------- database-prepare-for-0.5.0.sh | 21 --------------------- database-save.sh | 16 ---------------- nuke-room-from-db.sh | 24 ------------------------ scripts/database-prepare-for-0.0.1.sh | 21 +++++++++++++++++++++ scripts/database-prepare-for-0.5.0.sh | 21 +++++++++++++++++++++ scripts/database-save.sh | 16 ++++++++++++++++ scripts/nuke-room-from-db.sh | 24 ++++++++++++++++++++++++ scripts/sphinx_api_docs.sh | 1 + sphinx_api_docs.sh | 1 - 11 files changed, 85 insertions(+), 85 deletions(-) delete mode 100755 database-prepare-for-0.0.1.sh delete mode 100755 database-prepare-for-0.5.0.sh delete mode 100755 database-save.sh delete mode 100755 nuke-room-from-db.sh create mode 100755 scripts/database-prepare-for-0.0.1.sh create mode 100755 scripts/database-prepare-for-0.5.0.sh create mode 100755 scripts/database-save.sh create mode 100755 scripts/nuke-room-from-db.sh create mode 100644 scripts/sphinx_api_docs.sh delete mode 100644 sphinx_api_docs.sh diff --git a/UPGRADE.rst b/UPGRADE.rst index 9618ad2d57..0f81f3e11f 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -52,7 +52,7 @@ resulting conflicts during the upgrade process. Before running the command the homeserver should be first completely shutdown. To run it, simply specify the location of the database, e.g.: - ./database-prepare-for-0.5.0.sh "homeserver.db" + ./scripts/database-prepare-for-0.5.0.sh "homeserver.db" Once this has successfully completed it will be safe to restart the homeserver. You may notice that the homeserver takes a few seconds longer to @@ -147,7 +147,7 @@ rooms the home server was a member of and room alias mappings. Before running the command the homeserver should be first completely shutdown. To run it, simply specify the location of the database, e.g.: - ./database-prepare-for-0.0.1.sh "homeserver.db" + ./scripts/database-prepare-for-0.0.1.sh "homeserver.db" Once this has successfully completed it will be safe to restart the homeserver. You may notice that the homeserver takes a few seconds longer to diff --git a/database-prepare-for-0.0.1.sh b/database-prepare-for-0.0.1.sh deleted file mode 100755 index 43d759a5cd..0000000000 --- a/database-prepare-for-0.0.1.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash - -# This is will prepare a synapse database for running with v0.0.1 of synapse. -# It will store all the user information, but will *delete* all messages and -# room data. 
- -set -e - -cp "$1" "$1.bak" - -DUMP=$(sqlite3 "$1" << 'EOF' -.dump users -.dump access_tokens -.dump presence -.dump profiles -EOF -) - -rm "$1" - -sqlite3 "$1" <<< "$DUMP" diff --git a/database-prepare-for-0.5.0.sh b/database-prepare-for-0.5.0.sh deleted file mode 100755 index e824cb583e..0000000000 --- a/database-prepare-for-0.5.0.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash - -# This is will prepare a synapse database for running with v0.5.0 of synapse. -# It will store all the user information, but will *delete* all messages and -# room data. - -set -e - -cp "$1" "$1.bak" - -DUMP=$(sqlite3 "$1" << 'EOF' -.dump users -.dump access_tokens -.dump presence -.dump profiles -EOF -) - -rm "$1" - -sqlite3 "$1" <<< "$DUMP" diff --git a/database-save.sh b/database-save.sh deleted file mode 100755 index 040c8a4943..0000000000 --- a/database-save.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -# This script will write a dump file of local user state if you want to splat -# your entire server database and start again but preserve the identity of -# local users and their access tokens. -# -# To restore it, use -# -# $ sqlite3 homeserver.db < table-save.sql - -sqlite3 "$1" <<'EOF' >table-save.sql -.dump users -.dump access_tokens -.dump presence -.dump profiles -EOF diff --git a/nuke-room-from-db.sh b/nuke-room-from-db.sh deleted file mode 100755 index 58c036c896..0000000000 --- a/nuke-room-from-db.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash - -## CAUTION: -## This script will remove (hopefully) all trace of the given room ID from -## your homeserver.db - -## Do not run it lightly. - -ROOMID="$1" - -sqlite3 homeserver.db <table-save.sql +.dump users +.dump access_tokens +.dump presence +.dump profiles +EOF diff --git a/scripts/nuke-room-from-db.sh b/scripts/nuke-room-from-db.sh new file mode 100755 index 0000000000..58c036c896 --- /dev/null +++ b/scripts/nuke-room-from-db.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +## CAUTION: +## This script will remove (hopefully) all trace of the given room ID from +## your homeserver.db + +## Do not run it lightly. 
+ +ROOMID="$1" + +sqlite3 homeserver.db < Date: Thu, 15 Jan 2015 16:51:52 +0000 Subject: Remove jsfiddles --- jsfiddles/create_room_send_msg/demo.css | 17 -- jsfiddles/create_room_send_msg/demo.html | 30 --- jsfiddles/create_room_send_msg/demo.js | 113 ----------- jsfiddles/event_stream/demo.css | 17 -- jsfiddles/event_stream/demo.html | 23 --- jsfiddles/event_stream/demo.js | 145 -------------- jsfiddles/example_app/demo.css | 43 ---- jsfiddles/example_app/demo.details | 7 - jsfiddles/example_app/demo.html | 56 ------ jsfiddles/example_app/demo.js | 327 ------------------------------- jsfiddles/register_login/demo.css | 7 - jsfiddles/register_login/demo.html | 20 -- jsfiddles/register_login/demo.js | 79 -------- jsfiddles/room_memberships/demo.css | 17 -- jsfiddles/room_memberships/demo.html | 37 ---- jsfiddles/room_memberships/demo.js | 141 ------------- 16 files changed, 1079 deletions(-) delete mode 100644 jsfiddles/create_room_send_msg/demo.css delete mode 100644 jsfiddles/create_room_send_msg/demo.html delete mode 100644 jsfiddles/create_room_send_msg/demo.js delete mode 100644 jsfiddles/event_stream/demo.css delete mode 100644 jsfiddles/event_stream/demo.html delete mode 100644 jsfiddles/event_stream/demo.js delete mode 100644 jsfiddles/example_app/demo.css delete mode 100644 jsfiddles/example_app/demo.details delete mode 100644 jsfiddles/example_app/demo.html delete mode 100644 jsfiddles/example_app/demo.js delete mode 100644 jsfiddles/register_login/demo.css delete mode 100644 jsfiddles/register_login/demo.html delete mode 100644 jsfiddles/register_login/demo.js delete mode 100644 jsfiddles/room_memberships/demo.css delete mode 100644 jsfiddles/room_memberships/demo.html delete mode 100644 jsfiddles/room_memberships/demo.js diff --git a/jsfiddles/create_room_send_msg/demo.css b/jsfiddles/create_room_send_msg/demo.css deleted file mode 100644 index 48a55f372d..0000000000 --- a/jsfiddles/create_room_send_msg/demo.css +++ /dev/null @@ -1,17 +0,0 @@ -.loggedin { - visibility: hidden; -} - -p { - font-family: monospace; -} - -table -{ - border-spacing:5px; -} - -th,td -{ - padding:5px; -} diff --git a/jsfiddles/create_room_send_msg/demo.html b/jsfiddles/create_room_send_msg/demo.html deleted file mode 100644 index 088ff7ac0f..0000000000 --- a/jsfiddles/create_room_send_msg/demo.html +++ /dev/null @@ -1,30 +0,0 @@ -
-This room creation / message sending demo requires a home server to be running on http://localhost:8008
-Room ID | My state | Room Alias | Latest message
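For orientation, the flow implemented by the deleted demo.js below — log in with a password, create a room, then send an m.room.message with a client-generated transaction ID — can be sketched in Python against the same historical v1 endpoints. This is only a rough stand-in for the demo's jQuery AJAX calls: the homeserver URL, credentials and room alias are placeholders.

    import time
    import requests
    from urllib.parse import quote

    BASE = "http://localhost:8008/_matrix/client/api/v1"  # assumed local homeserver

    # Password login, mirroring the demo's m.login.password flow.
    login = requests.post(f"{BASE}/login", json={
        "type": "m.login.password", "user": "@alice:localhost", "password": "secret",
    }).json()
    token = login["access_token"]

    # Create a room, then send a text message into it (PUT with a txn id).
    room = requests.post(f"{BASE}/createRoom", params={"access_token": token},
                         json={"room_alias_name": "demo"}).json()
    txn_id = str(int(time.time()))
    requests.put(
        f"{BASE}/rooms/{quote(room['room_id'])}/send/m.room.message/{txn_id}",
        params={"access_token": token},
        json={"msgtype": "m.text", "body": "hello"},
    )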
- diff --git a/jsfiddles/create_room_send_msg/demo.js b/jsfiddles/create_room_send_msg/demo.js deleted file mode 100644 index 9c346e2f64..0000000000 --- a/jsfiddles/create_room_send_msg/demo.js +++ /dev/null @@ -1,113 +0,0 @@ -var accountInfo = {}; - -var showLoggedIn = function(data) { - accountInfo = data; - getCurrentRoomList(); - $(".loggedin").css({visibility: "visible"}); -}; - -$('.login').live('click', function() { - var user = $("#userLogin").val(); - var password = $("#passwordLogin").val(); - $.ajax({ - url: "http://localhost:8008/_matrix/client/api/v1/login", - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify({ user: user, password: password, type: "m.login.password" }), - dataType: "json", - success: function(data) { - showLoggedIn(data); - }, - error: function(err) { - var errMsg = "To try this, you need a home server running!"; - var errJson = $.parseJSON(err.responseText); - if (errJson) { - errMsg = JSON.stringify(errJson); - } - alert(errMsg); - } - }); -}); - -var getCurrentRoomList = function() { - var url = "http://localhost:8008/_matrix/client/api/v1/initialSync?access_token=" + accountInfo.access_token + "&limit=1"; - $.getJSON(url, function(data) { - var rooms = data.rooms; - for (var i=0; i 0) { - data.room_alias_name = roomAlias; - } - $.ajax({ - url: "http://localhost:8008/_matrix/client/api/v1/createRoom?access_token="+accountInfo.access_token, - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify(data), - dataType: "json", - success: function(data) { - data.membership = "join"; // you are automatically joined into every room you make. - data.latest_message = ""; - addRoom(data); - }, - error: function(err) { - alert(JSON.stringify($.parseJSON(err.responseText))); - } - }); -}); - -var addRoom = function(data) { - row = "" + - ""+data.room_id+"" + - ""+data.membership+"" + - ""+data.room_alias+"" + - ""+data.latest_message+"" + - ""; - $("#rooms").append(row); -}; - -$('.sendMessage').live('click', function() { - var roomId = $("#roomId").val(); - var body = $("#messageBody").val(); - var msgId = $.now(); - - if (roomId.length === 0 || body.length === 0) { - return; - } - - var url = "http://localhost:8008/_matrix/client/api/v1/rooms/$roomid/send/m.room.message?access_token=$token"; - url = url.replace("$token", accountInfo.access_token); - url = url.replace("$roomid", encodeURIComponent(roomId)); - - var data = { - msgtype: "m.text", - body: body - }; - - $.ajax({ - url: url, - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify(data), - dataType: "json", - success: function(data) { - $("#messageBody").val(""); - // wipe the table and reload it. Using the event stream would be the best - // solution but that is out of scope of this fiddle. - $("#rooms").find("tr:gt(0)").remove(); - getCurrentRoomList(); - }, - error: function(err) { - alert(JSON.stringify($.parseJSON(err.responseText))); - } - }); -}); diff --git a/jsfiddles/event_stream/demo.css b/jsfiddles/event_stream/demo.css deleted file mode 100644 index 48a55f372d..0000000000 --- a/jsfiddles/event_stream/demo.css +++ /dev/null @@ -1,17 +0,0 @@ -.loggedin { - visibility: hidden; -} - -p { - font-family: monospace; -} - -table -{ - border-spacing:5px; -} - -th,td -{ - padding:5px; -} diff --git a/jsfiddles/event_stream/demo.html b/jsfiddles/event_stream/demo.html deleted file mode 100644 index 7657780d28..0000000000 --- a/jsfiddles/event_stream/demo.html +++ /dev/null @@ -1,23 +0,0 @@ -
-This event stream demo requires a home server to be running on http://localhost:8008
-Room ID | Latest message
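The deleted demo.js below long-polls GET /events from the browser, resuming from the `end` token returned in each response. A minimal Python sketch of the same pattern (placeholder access token, assumed local homeserver) is:

    import requests

    BASE = "http://localhost:8008/_matrix/client/api/v1"  # assumed local homeserver
    token = "ACCESS_TOKEN"  # placeholder, obtained from /login

    from_token = "END"
    while True:  # long-poll loop: each call blocks until events arrive or the timeout fires
        data = requests.get(f"{BASE}/events", params={
            "access_token": token, "from": from_token, "timeout": 30000,
        }).json()
        for event in data.get("chunk", []):
            print(event.get("type"), event.get("content"))
        from_token = data.get("end", from_token)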
- diff --git a/jsfiddles/event_stream/demo.js b/jsfiddles/event_stream/demo.js deleted file mode 100644 index acba8391fa..0000000000 --- a/jsfiddles/event_stream/demo.js +++ /dev/null @@ -1,145 +0,0 @@ -var accountInfo = {}; - -var eventStreamInfo = { - from: "END" -}; - -var roomInfo = []; - -var longpollEventStream = function() { - var url = "http://localhost:8008/_matrix/client/api/v1/events?access_token=$token&from=$from"; - url = url.replace("$token", accountInfo.access_token); - url = url.replace("$from", eventStreamInfo.from); - - $.getJSON(url, function(data) { - eventStreamInfo.from = data.end; - - var hasNewLatestMessage = false; - for (var i=0; i"+roomList[i].room_id+"" + - ""+roomList[i].latest_message+"" + - ""; - rows += row; - } - - $("#rooms").append(rows); -}; - diff --git a/jsfiddles/example_app/demo.css b/jsfiddles/example_app/demo.css deleted file mode 100644 index 4c1e157cc8..0000000000 --- a/jsfiddles/example_app/demo.css +++ /dev/null @@ -1,43 +0,0 @@ -.roomListDashboard, .roomContents, .sendMessageForm { - visibility: hidden; -} - -.roomList { - background-color: #909090; -} - -.messageWrapper { - background-color: #EEEEEE; - height: 400px; - overflow: scroll; -} - -.membersWrapper { - background-color: #EEEEEE; - height: 200px; - width: 50%; - overflow: scroll; -} - -.textEntry { - width: 100% -} - -p { - font-family: monospace; -} - -table -{ - border-spacing:5px; -} - -th,td -{ - padding:5px; -} - -.roomList tr:not(:first-child):hover { - background-color: orange; - cursor: pointer; -} diff --git a/jsfiddles/example_app/demo.details b/jsfiddles/example_app/demo.details deleted file mode 100644 index 3f96d3e744..0000000000 --- a/jsfiddles/example_app/demo.details +++ /dev/null @@ -1,7 +0,0 @@ - name: Example Matrix Client - description: Includes login, live event streaming, creating rooms, sending messages and viewing member lists. - authors: - - matrix.org - resources: - - http://matrix.org - normalize_css: no \ No newline at end of file diff --git a/jsfiddles/example_app/demo.html b/jsfiddles/example_app/demo.html deleted file mode 100644 index 7a9dffddd0..0000000000 --- a/jsfiddles/example_app/demo.html +++ /dev/null @@ -1,56 +0,0 @@ - - -
-Room | My state | Latest message
-Select a room
-Member list:
- diff --git a/jsfiddles/example_app/demo.js b/jsfiddles/example_app/demo.js deleted file mode 100644 index 13c9c2b339..0000000000 --- a/jsfiddles/example_app/demo.js +++ /dev/null @@ -1,327 +0,0 @@ -var accountInfo = {}; - -var eventStreamInfo = { - from: "END" -}; - -var roomInfo = []; -var memberInfo = []; -var viewingRoomId; - -// ************** Event Streaming ************** -var longpollEventStream = function() { - var url = "http://localhost:8008/_matrix/client/api/v1/events?access_token=$token&from=$from"; - url = url.replace("$token", accountInfo.access_token); - url = url.replace("$from", eventStreamInfo.from); - - $.getJSON(url, function(data) { - eventStreamInfo.from = data.end; - - var hasNewLatestMessage = false; - var updatedMemberList = false; - var i=0; - var j=0; - for (i=0; i 0) { - data.room_alias_name = roomAlias; - } - $.ajax({ - url: "http://localhost:8008/_matrix/client/api/v1/createRoom?access_token="+accountInfo.access_token, - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify(data), - dataType: "json", - success: function(response) { - $("#roomAlias").val(""); - response.membership = "join"; // you are automatically joined into every room you make. - response.latest_message = ""; - - roomInfo.push(response); - setRooms(roomInfo); - }, - error: function(err) { - alert(JSON.stringify($.parseJSON(err.responseText))); - } - }); -}); - -// ************** Getting current state ************** -var getCurrentRoomList = function() { - var url = "http://localhost:8008/_matrix/client/api/v1/initialSync?access_token=" + accountInfo.access_token + "&limit=1"; - $.getJSON(url, function(data) { - var rooms = data.rooms; - for (var i=0; i=0; --i) { - addMessage(data.chunk[i]); - } - }); -}; - -var getMemberList = function(roomId) { - $("#members").empty(); - memberInfo = []; - var url = "http://localhost:8008/_matrix/client/api/v1/rooms/" + - encodeURIComponent(roomId) + "/members?access_token=" + accountInfo.access_token; - $.getJSON(url, function(data) { - for (var i=0; i"+roomList[i].room_id+"" + - ""+roomList[i].membership+"" + - ""+roomList[i].latest_message+"" + - ""; - rows += row; - } - - $("#rooms").append(rows); - - $('#rooms').find("tr").click(function(){ - var roomId = $(this).find('td:eq(0)').text(); - var membership = $(this).find('td:eq(1)').text(); - if (membership !== "join") { - console.log("Joining room " + roomId); - var url = "http://localhost:8008/_matrix/client/api/v1/rooms/$roomid/join?access_token=$token"; - url = url.replace("$token", accountInfo.access_token); - url = url.replace("$roomid", encodeURIComponent(roomId)); - $.ajax({ - url: url, - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify({membership: "join"}), - dataType: "json", - success: function(data) { - loadRoomContent(roomId); - getCurrentRoomList(); - }, - error: function(err) { - alert(JSON.stringify($.parseJSON(err.responseText))); - } - }); - } - else { - loadRoomContent(roomId); - } - }); -}; - -var addMessage = function(data) { - - var msg = data.content.body; - if (data.type === "m.room.member") { - if (data.content.membership === undefined) { - return; - } - if (data.content.membership === "invite") { - msg = "invited " + data.state_key + " to the room"; - } - else if (data.content.membership === "join") { - msg = "joined the room"; - } - else if (data.content.membership === "leave") { - msg = "left the room"; - } - else if (data.content.membership === "ban") { - msg = "was banned from the room"; - } - } - if 
(msg === undefined) { - return; - } - - var row = "" + - ""+data.user_id+"" + - ""+msg+"" + - ""; - $("#messages").append(row); -}; - -var addMember = function(data) { - var row = "" + - ""+data.state_key+"" + - ""+data.content.membership+"" + - ""; - $("#members").append(row); -}; - diff --git a/jsfiddles/register_login/demo.css b/jsfiddles/register_login/demo.css deleted file mode 100644 index 11781c250f..0000000000 --- a/jsfiddles/register_login/demo.css +++ /dev/null @@ -1,7 +0,0 @@ -.loggedin { - visibility: hidden; -} - -p { - font-family: monospace; -} diff --git a/jsfiddles/register_login/demo.html b/jsfiddles/register_login/demo.html deleted file mode 100644 index fcac453ac2..0000000000 --- a/jsfiddles/register_login/demo.html +++ /dev/null @@ -1,20 +0,0 @@ -
-This registration/login demo requires a home server to be running on http://localhost:8008
- diff --git a/jsfiddles/register_login/demo.js b/jsfiddles/register_login/demo.js deleted file mode 100644 index 2e6957b631..0000000000 --- a/jsfiddles/register_login/demo.js +++ /dev/null @@ -1,79 +0,0 @@ -var accountInfo = {}; - -var showLoggedIn = function(data) { - accountInfo = data; - $(".loggedin").css({visibility: "visible"}); - $("#welcomeText").text("Welcome " + accountInfo.user_id+". Your access token is: " + - accountInfo.access_token); -}; - -$('.register').live('click', function() { - var user = $("#user").val(); - var password = $("#password").val(); - $.ajax({ - url: "http://localhost:8008/_matrix/client/api/v1/register", - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify({ user: user, password: password, type: "m.login.password" }), - dataType: "json", - success: function(data) { - showLoggedIn(data); - }, - error: function(err) { - var errMsg = "To try this, you need a home server running!"; - var errJson = $.parseJSON(err.responseText); - if (errJson) { - errMsg = JSON.stringify(errJson); - } - alert(errMsg); - } - }); -}); - -var login = function(user, password) { - $.ajax({ - url: "http://localhost:8008/_matrix/client/api/v1/login", - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify({ user: user, password: password, type: "m.login.password" }), - dataType: "json", - success: function(data) { - showLoggedIn(data); - }, - error: function(err) { - var errMsg = "To try this, you need a home server running!"; - var errJson = $.parseJSON(err.responseText); - if (errJson) { - errMsg = JSON.stringify(errJson); - } - alert(errMsg); - } - }); -}; - -$('.login').live('click', function() { - var user = $("#userLogin").val(); - var password = $("#passwordLogin").val(); - $.getJSON("http://localhost:8008/_matrix/client/api/v1/login", function(data) { - if (data.flows[0].type !== "m.login.password") { - alert("I don't know how to login with this type: " + data.type); - return; - } - login(user, password); - }); -}); - -$('.logout').live('click', function() { - accountInfo = {}; - $("#imSyncText").text(""); - $(".loggedin").css({visibility: "hidden"}); -}); - -$('.testToken').live('click', function() { - var url = "http://localhost:8008/_matrix/client/api/v1/initialSync?access_token=" + accountInfo.access_token + "&limit=1"; - $.getJSON(url, function(data) { - $("#imSyncText").text(JSON.stringify(data, undefined, 2)); - }).fail(function(err) { - $("#imSyncText").text(JSON.stringify($.parseJSON(err.responseText))); - }); -}); diff --git a/jsfiddles/room_memberships/demo.css b/jsfiddles/room_memberships/demo.css deleted file mode 100644 index 48a55f372d..0000000000 --- a/jsfiddles/room_memberships/demo.css +++ /dev/null @@ -1,17 +0,0 @@ -.loggedin { - visibility: hidden; -} - -p { - font-family: monospace; -} - -table -{ - border-spacing:5px; -} - -th,td -{ - padding:5px; -} diff --git a/jsfiddles/room_memberships/demo.html b/jsfiddles/room_memberships/demo.html deleted file mode 100644 index e6f39df5aa..0000000000 --- a/jsfiddles/room_memberships/demo.html +++ /dev/null @@ -1,37 +0,0 @@ -
-This room membership demo requires a home server to be running on http://localhost:8008
-Room ID | My state | Room Alias
- diff --git a/jsfiddles/room_memberships/demo.js b/jsfiddles/room_memberships/demo.js deleted file mode 100644 index 8a7b1aa88e..0000000000 --- a/jsfiddles/room_memberships/demo.js +++ /dev/null @@ -1,141 +0,0 @@ -var accountInfo = {}; - -var showLoggedIn = function(data) { - accountInfo = data; - getCurrentRoomList(); - $(".loggedin").css({visibility: "visible"}); - $("#membership").change(function() { - if ($("#membership").val() === "invite") { - $("#targetUser").css({visibility: "visible"}); - } - else { - $("#targetUser").css({visibility: "hidden"}); - } -}); -}; - -$('.login').live('click', function() { - var user = $("#userLogin").val(); - var password = $("#passwordLogin").val(); - $.ajax({ - url: "http://localhost:8008/_matrix/client/api/v1/login", - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify({ user: user, password: password, type: "m.login.password" }), - dataType: "json", - success: function(data) { - $("#rooms").find("tr:gt(0)").remove(); - showLoggedIn(data); - }, - error: function(err) { - var errMsg = "To try this, you need a home server running!"; - var errJson = $.parseJSON(err.responseText); - if (errJson) { - errMsg = JSON.stringify(errJson); - } - alert(errMsg); - } - }); -}); - -var getCurrentRoomList = function() { - $("#roomId").val(""); - // wipe the table and reload it. Using the event stream would be the best - // solution but that is out of scope of this fiddle. - $("#rooms").find("tr:gt(0)").remove(); - - var url = "http://localhost:8008/_matrix/client/api/v1/initialSync?access_token=" + accountInfo.access_token + "&limit=1"; - $.getJSON(url, function(data) { - var rooms = data.rooms; - for (var i=0; i"+data.room_id+"" + - ""+data.membership+"" + - ""+data.room_alias+"" + - ""; - $("#rooms").append(row); -}; - -$('.changeMembership').live('click', function() { - var roomId = $("#roomId").val(); - var member = $("#targetUser").val(); - var membership = $("#membership").val(); - - if (roomId.length === 0) { - return; - } - - var url = "http://localhost:8008/_matrix/client/api/v1/rooms/$roomid/$membership?access_token=$token"; - url = url.replace("$token", accountInfo.access_token); - url = url.replace("$roomid", encodeURIComponent(roomId)); - url = url.replace("$membership", membership); - - var data = {}; - - if (membership === "invite") { - data = { - user_id: member - }; - } - - $.ajax({ - url: url, - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify(data), - dataType: "json", - success: function(data) { - getCurrentRoomList(); - }, - error: function(err) { - alert(JSON.stringify($.parseJSON(err.responseText))); - } - }); -}); - -$('.joinAlias').live('click', function() { - var roomAlias = $("#roomAlias").val(); - var url = "http://localhost:8008/_matrix/client/api/v1/join/$roomalias?access_token=$token"; - url = url.replace("$token", accountInfo.access_token); - url = url.replace("$roomalias", encodeURIComponent(roomAlias)); - $.ajax({ - url: url, - type: "POST", - contentType: "application/json; charset=utf-8", - data: JSON.stringify({}), - dataType: "json", - success: function(data) { - getCurrentRoomList(); - }, - error: function(err) { - alert(JSON.stringify($.parseJSON(err.responseText))); - } - }); -}); -- cgit 1.4.1 From d3d0713de587c97c3c23200a15215eb0807ff87e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 15 Jan 2015 17:06:12 +0000 Subject: Move experiments, graph and cmdclient into contrib --- cmdclient/console.py | 747 ---------------------------------- 
cmdclient/http.py | 217 ---------- contrib/cmdclient/console.py | 747 ++++++++++++++++++++++++++++++++++ contrib/cmdclient/http.py | 217 ++++++++++ contrib/experiments/cursesio.py | 168 ++++++++ contrib/experiments/test_messaging.py | 394 ++++++++++++++++++ contrib/graph/graph.py | 151 +++++++ contrib/graph/graph2.py | 156 +++++++ experiments/cursesio.py | 168 -------- experiments/test_messaging.py | 394 ------------------ graph/graph.py | 151 ------- graph/graph2.py | 156 ------- 12 files changed, 1833 insertions(+), 1833 deletions(-) delete mode 100755 cmdclient/console.py delete mode 100644 cmdclient/http.py create mode 100755 contrib/cmdclient/console.py create mode 100644 contrib/cmdclient/http.py create mode 100644 contrib/experiments/cursesio.py create mode 100644 contrib/experiments/test_messaging.py create mode 100644 contrib/graph/graph.py create mode 100644 contrib/graph/graph2.py delete mode 100644 experiments/cursesio.py delete mode 100644 experiments/test_messaging.py delete mode 100644 graph/graph.py delete mode 100644 graph/graph2.py diff --git a/cmdclient/console.py b/cmdclient/console.py deleted file mode 100755 index d9c6ec6a70..0000000000 --- a/cmdclient/console.py +++ /dev/null @@ -1,747 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Starts a synapse client console. """ - -from twisted.internet import reactor, defer, threads -from http import TwistedHttpClient - -import argparse -import cmd -import getpass -import json -import shlex -import sys -import time -import urllib -import urlparse - -import nacl.signing -import nacl.encoding - -from syutil.crypto.jsonsign import verify_signed_json, SignatureVerifyException - -CONFIG_JSON = "cmdclient_config.json" - -TRUSTED_ID_SERVERS = [ - 'localhost:8001' -] - -class SynapseCmd(cmd.Cmd): - - """Basic synapse command-line processor. - - This processes commands from the user and calls the relevant HTTP methods. 
- """ - - def __init__(self, http_client, server_url, identity_server_url, username, token): - cmd.Cmd.__init__(self) - self.http_client = http_client - self.http_client.verbose = True - self.config = { - "url": server_url, - "identityServerUrl": identity_server_url, - "user": username, - "token": token, - "verbose": "on", - "complete_usernames": "on", - "send_delivery_receipts": "on" - } - self.path_prefix = "/_matrix/client/api/v1" - self.event_stream_token = "END" - self.prompt = ">>> " - - def do_EOF(self, line): # allows CTRL+D quitting - return True - - def emptyline(self): - pass # else it repeats the previous command - - def _usr(self): - return self.config["user"] - - def _tok(self): - return self.config["token"] - - def _url(self): - return self.config["url"] + self.path_prefix - - def _identityServerUrl(self): - return self.config["identityServerUrl"] - - def _is_on(self, config_name): - if config_name in self.config: - return self.config[config_name] == "on" - return False - - def _domain(self): - if "user" not in self.config or not self.config["user"]: - return None - return self.config["user"].split(":")[1] - - def do_config(self, line): - """ Show the config for this client: "config" - Edit a key value mapping: "config key value" e.g. "config token 1234" - Config variables: - user: The username to auth with. - token: The access token to auth with. - url: The url of the server. - verbose: [on|off] The verbosity of requests/responses. - complete_usernames: [on|off] Auto complete partial usernames by - assuming they are on the same homeserver as you. - E.g. name >> @name:yourhost - send_delivery_receipts: [on|off] Automatically send receipts to - messages when performing a 'stream' command. - Additional key/values can be added and can be substituted into requests - by using $. E.g. 'config roomid room1' then 'raw get /rooms/$roomid'. - """ - if len(line) == 0: - print json.dumps(self.config, indent=4) - return - - try: - args = self._parse(line, ["key", "val"], force_keys=True) - - # make sure restricted config values are checked - config_rules = [ # key, valid_values - ("verbose", ["on", "off"]), - ("complete_usernames", ["on", "off"]), - ("send_delivery_receipts", ["on", "off"]) - ] - for key, valid_vals in config_rules: - if key == args["key"] and args["val"] not in valid_vals: - print "%s value must be one of %s" % (args["key"], - valid_vals) - return - - # toggle the http client verbosity - if args["key"] == "verbose": - self.http_client.verbose = "on" == args["val"] - - # assign the new config - self.config[args["key"]] = args["val"] - print json.dumps(self.config, indent=4) - - save_config(self.config) - except Exception as e: - print e - - def do_register(self, line): - """Registers for a new account: "register " - : The desired user ID - : Do not automatically clobber config values. - """ - args = self._parse(line, ["userid", "noupdate"]) - - password = None - pwd = None - pwd2 = "_" - while pwd != pwd2: - pwd = getpass.getpass("Type a password for this user: ") - pwd2 = getpass.getpass("Retype the password: ") - if pwd != pwd2 or len(pwd) == 0: - print "Password mismatch." 
- pwd = None - else: - password = pwd - - body = { - "type": "m.login.password" - } - if "userid" in args: - body["user"] = args["userid"] - if password: - body["password"] = password - - reactor.callFromThread(self._do_register, body, - "noupdate" not in args) - - @defer.inlineCallbacks - def _do_register(self, data, update_config): - # check the registration flows - url = self._url() + "/register" - json_res = yield self.http_client.do_request("GET", url) - print json.dumps(json_res, indent=4) - - passwordFlow = None - for flow in json_res["flows"]: - if flow["type"] == "m.login.recaptcha" or ("stages" in flow and "m.login.recaptcha" in flow["stages"]): - print "Unable to register: Home server requires captcha." - return - if flow["type"] == "m.login.password" and "stages" not in flow: - passwordFlow = flow - break - - if not passwordFlow: - return - - json_res = yield self.http_client.do_request("POST", url, data=data) - print json.dumps(json_res, indent=4) - if update_config and "user_id" in json_res: - self.config["user"] = json_res["user_id"] - self.config["token"] = json_res["access_token"] - save_config(self.config) - - def do_login(self, line): - """Login as a specific user: "login @bob:localhost" - You MAY be prompted for a password, or instructed to visit a URL. - """ - try: - args = self._parse(line, ["user_id"], force_keys=True) - can_login = threads.blockingCallFromThread( - reactor, - self._check_can_login) - if can_login: - p = getpass.getpass("Enter your password: ") - user = args["user_id"] - if self._is_on("complete_usernames") and not user.startswith("@"): - domain = self._domain() - if domain: - user = "@" + user + ":" + domain - - reactor.callFromThread(self._do_login, user, p) - #print " got %s " % p - except Exception as e: - print e - - @defer.inlineCallbacks - def _do_login(self, user, password): - path = "/login" - data = { - "user": user, - "password": password, - "type": "m.login.password" - } - url = self._url() + path - json_res = yield self.http_client.do_request("POST", url, data=data) - print json_res - - if "access_token" in json_res: - self.config["user"] = user - self.config["token"] = json_res["access_token"] - save_config(self.config) - print "Login successful." - - @defer.inlineCallbacks - def _check_can_login(self): - path = "/login" - # ALWAYS check that the home server can handle the login request before - # submitting! - url = self._url() + path - json_res = yield self.http_client.do_request("GET", url) - print json_res - - if "flows" not in json_res: - print "Failed to find any login flows." - defer.returnValue(False) - - flow = json_res["flows"][0] # assume first is the one we want. - if ("type" not in flow or "m.login.password" != flow["type"] or - "stages" in flow): - fallback_url = self._url() + "/login/fallback" - print ("Unable to login via the command line client. Please visit " - "%s to login." % fallback_url) - defer.returnValue(False) - defer.returnValue(True) - - def do_emailrequest(self, line): - """Requests the association of a third party identifier -
The email address) - A string of characters generated when requesting an email that you'll supply in subsequent calls to identify yourself - The number of times the user has requested an email. Leave this the same between requests to retry the request at the transport level. Increment it to request that the email be sent again. - """ - args = self._parse(line, ['address', 'clientSecret', 'sendAttempt']) - - postArgs = {'email': args['address'], 'clientSecret': args['clientSecret'], 'sendAttempt': args['sendAttempt']} - - reactor.callFromThread(self._do_emailrequest, postArgs) - - @defer.inlineCallbacks - def _do_emailrequest(self, args): - url = self._identityServerUrl()+"/_matrix/identity/api/v1/validate/email/requestToken" - - json_res = yield self.http_client.do_request("POST", url, data=urllib.urlencode(args), jsonreq=False, - headers={'Content-Type': ['application/x-www-form-urlencoded']}) - print json_res - if 'sid' in json_res: - print "Token sent. Your session ID is %s" % (json_res['sid']) - - def do_emailvalidate(self, line): - """Validate and associate a third party ID - The session ID (sid) given to you in the response to requestToken - The token sent to your third party identifier address - The same clientSecret you supplied in requestToken - """ - args = self._parse(line, ['sid', 'token', 'clientSecret']) - - postArgs = { 'sid' : args['sid'], 'token' : args['token'], 'clientSecret': args['clientSecret'] } - - reactor.callFromThread(self._do_emailvalidate, postArgs) - - @defer.inlineCallbacks - def _do_emailvalidate(self, args): - url = self._identityServerUrl()+"/_matrix/identity/api/v1/validate/email/submitToken" - - json_res = yield self.http_client.do_request("POST", url, data=urllib.urlencode(args), jsonreq=False, - headers={'Content-Type': ['application/x-www-form-urlencoded']}) - print json_res - - def do_3pidbind(self, line): - """Validate and associate a third party ID - The session ID (sid) given to you in the response to requestToken - The same clientSecret you supplied in requestToken - """ - args = self._parse(line, ['sid', 'clientSecret']) - - postArgs = { 'sid' : args['sid'], 'clientSecret': args['clientSecret'] } - postArgs['mxid'] = self.config["user"] - - reactor.callFromThread(self._do_3pidbind, postArgs) - - @defer.inlineCallbacks - def _do_3pidbind(self, args): - url = self._identityServerUrl()+"/_matrix/identity/api/v1/3pid/bind" - - json_res = yield self.http_client.do_request("POST", url, data=urllib.urlencode(args), jsonreq=False, - headers={'Content-Type': ['application/x-www-form-urlencoded']}) - print json_res - - def do_join(self, line): - """Joins a room: "join " """ - try: - args = self._parse(line, ["roomid"], force_keys=True) - self._do_membership_change(args["roomid"], "join", self._usr()) - except Exception as e: - print e - - def do_joinalias(self, line): - try: - args = self._parse(line, ["roomname"], force_keys=True) - path = "/join/%s" % urllib.quote(args["roomname"]) - reactor.callFromThread(self._run_and_pprint, "POST", path, {}) - except Exception as e: - print e - - def do_topic(self, line): - """"topic [set|get] []" - Set the topic for a room: topic set - Get the topic for a room: topic get - """ - try: - args = self._parse(line, ["action", "roomid", "topic"]) - if "action" not in args or "roomid" not in args: - print "Must specify set|get and a room ID." 
- return - if args["action"].lower() not in ["set", "get"]: - print "Must specify set|get, not %s" % args["action"] - return - - path = "/rooms/%s/topic" % urllib.quote(args["roomid"]) - - if args["action"].lower() == "set": - if "topic" not in args: - print "Must specify a new topic." - return - body = { - "topic": args["topic"] - } - reactor.callFromThread(self._run_and_pprint, "PUT", path, body) - elif args["action"].lower() == "get": - reactor.callFromThread(self._run_and_pprint, "GET", path) - except Exception as e: - print e - - def do_invite(self, line): - """Invite a user to a room: "invite " """ - try: - args = self._parse(line, ["userid", "roomid"], force_keys=True) - - user_id = args["userid"] - - reactor.callFromThread(self._do_invite, args["roomid"], user_id) - except Exception as e: - print e - - @defer.inlineCallbacks - def _do_invite(self, roomid, userstring): - if (not userstring.startswith('@') and - self._is_on("complete_usernames")): - url = self._identityServerUrl()+"/_matrix/identity/api/v1/lookup" - - json_res = yield self.http_client.do_request("GET", url, qparams={'medium':'email','address':userstring}) - - mxid = None - - if 'mxid' in json_res and 'signatures' in json_res: - url = self._identityServerUrl()+"/_matrix/identity/api/v1/pubkey/ed25519" - - pubKey = None - pubKeyObj = yield self.http_client.do_request("GET", url) - if 'public_key' in pubKeyObj: - pubKey = nacl.signing.VerifyKey(pubKeyObj['public_key'], encoder=nacl.encoding.HexEncoder) - else: - print "No public key found in pubkey response!" - - sigValid = False - - if pubKey: - for signame in json_res['signatures']: - if signame not in TRUSTED_ID_SERVERS: - print "Ignoring signature from untrusted server %s" % (signame) - else: - try: - verify_signed_json(json_res, signame, pubKey) - sigValid = True - print "Mapping %s -> %s correctly signed by %s" % (userstring, json_res['mxid'], signame) - break - except SignatureVerifyException as e: - print "Invalid signature from %s" % (signame) - print e - - if sigValid: - print "Resolved 3pid %s to %s" % (userstring, json_res['mxid']) - mxid = json_res['mxid'] - else: - print "Got association for %s but couldn't verify signature" % (userstring) - - if not mxid: - mxid = "@" + userstring + ":" + self._domain() - - self._do_membership_change(roomid, "invite", mxid) - - def do_leave(self, line): - """Leaves a room: "leave " """ - try: - args = self._parse(line, ["roomid"], force_keys=True) - self._do_membership_change(args["roomid"], "leave", self._usr()) - except Exception as e: - print e - - def do_send(self, line): - """Sends a message. "send " """ - args = self._parse(line, ["roomid", "body"]) - txn_id = "txn%s" % int(time.time()) - path = "/rooms/%s/send/m.room.message/%s" % (urllib.quote(args["roomid"]), - txn_id) - body_json = { - "msgtype": "m.text", - "body": args["body"] - } - reactor.callFromThread(self._run_and_pprint, "PUT", path, body_json) - - def do_list(self, line): - """List data about a room. - "list members [query]" - List all the members in this room. - "list messages [query]" - List all the messages in this room. - - Where [query] will be directly applied as query parameters, allowing - you to use the pagination API. E.g. the last 3 messages in this room: - "list messages from=END&to=START&limit=3" - """ - args = self._parse(line, ["type", "roomid", "qp"]) - if not "type" in args or not "roomid" in args: - print "Must specify type and room ID." 
- return - if args["type"] not in ["members", "messages"]: - print "Unrecognised type: %s" % args["type"] - return - room_id = args["roomid"] - path = "/rooms/%s/%s" % (urllib.quote(room_id), args["type"]) - - qp = {"access_token": self._tok()} - if "qp" in args: - for key_value_str in args["qp"].split("&"): - try: - key_value = key_value_str.split("=") - qp[key_value[0]] = key_value[1] - except: - print "Bad query param: %s" % key_value - return - - reactor.callFromThread(self._run_and_pprint, "GET", path, - query_params=qp) - - def do_create(self, line): - """Creates a room. - "create [public|private] " - Create a room with the - specified visibility. - "create " - Create a room with default visibility. - "create [public|private]" - Create a room with specified visibility. - "create" - Create a room with default visibility. - """ - args = self._parse(line, ["vis", "roomname"]) - # fixup args depending on which were set - body = {} - if "vis" in args and args["vis"] in ["public", "private"]: - body["visibility"] = args["vis"] - - if "roomname" in args: - room_name = args["roomname"] - body["room_alias_name"] = room_name - elif "vis" in args and args["vis"] not in ["public", "private"]: - room_name = args["vis"] - body["room_alias_name"] = room_name - - reactor.callFromThread(self._run_and_pprint, "POST", "/createRoom", body) - - def do_raw(self, line): - """Directly send a JSON object: "raw " - : Required. One of "PUT", "GET", "POST", "xPUT", "xGET", - "xPOST". Methods with 'x' prefixed will not automatically append the - access token. - : Required. E.g. "/events" - : Optional. E.g. "{ "msgtype":"custom.text", "body":"abc123"}" - """ - args = self._parse(line, ["method", "path", "data"]) - # sanity check - if "method" not in args or "path" not in args: - print "Must specify path and method." - return - - args["method"] = args["method"].upper() - valid_methods = ["PUT", "GET", "POST", "DELETE", - "XPUT", "XGET", "XPOST", "XDELETE"] - if args["method"] not in valid_methods: - print "Unsupported method: %s" % args["method"] - return - - if "data" not in args: - args["data"] = None - else: - try: - args["data"] = json.loads(args["data"]) - except Exception as e: - print "Data is not valid JSON. %s" % e - return - - qp = {"access_token": self._tok()} - if args["method"].startswith("X"): - qp = {} # remove access token - args["method"] = args["method"][1:] # snip the X - else: - # append any query params the user has set - try: - parsed_url = urlparse.urlparse(args["path"]) - qp.update(urlparse.parse_qs(parsed_url.query)) - args["path"] = parsed_url.path - except: - pass - - reactor.callFromThread(self._run_and_pprint, args["method"], - args["path"], - args["data"], - query_params=qp) - - def do_stream(self, line): - """Stream data from the server: "stream " """ - args = self._parse(line, ["timeout"]) - timeout = 5000 - if "timeout" in args: - try: - timeout = int(args["timeout"]) - except ValueError: - print "Timeout must be in milliseconds." 
- return - reactor.callFromThread(self._do_event_stream, timeout) - - @defer.inlineCallbacks - def _do_event_stream(self, timeout): - res = yield self.http_client.get_json( - self._url() + "/events", - { - "access_token": self._tok(), - "timeout": str(timeout), - "from": self.event_stream_token - }) - print json.dumps(res, indent=4) - - if "chunk" in res: - for event in res["chunk"]: - if (event["type"] == "m.room.message" and - self._is_on("send_delivery_receipts") and - event["user_id"] != self._usr()): # not sent by us - self._send_receipt(event, "d") - - # update the position in the stram - if "end" in res: - self.event_stream_token = res["end"] - - def _send_receipt(self, event, feedback_type): - path = ("/rooms/%s/messages/%s/%s/feedback/%s/%s" % - (urllib.quote(event["room_id"]), event["user_id"], event["msg_id"], - self._usr(), feedback_type)) - data = {} - reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data, - alt_text="Sent receipt for %s" % event["msg_id"]) - - def _do_membership_change(self, roomid, membership, userid): - path = "/rooms/%s/state/m.room.member/%s" % (urllib.quote(roomid), urllib.quote(userid)) - data = { - "membership": membership - } - reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data) - - def do_displayname(self, line): - """Get or set my displayname: "displayname [new_name]" """ - args = self._parse(line, ["name"]) - path = "/profile/%s/displayname" % (self.config["user"]) - - if "name" in args: - data = {"displayname": args["name"]} - reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data) - else: - reactor.callFromThread(self._run_and_pprint, "GET", path) - - def _do_presence_state(self, state, line): - args = self._parse(line, ["msgstring"]) - path = "/presence/%s/status" % (self.config["user"]) - data = {"state": state} - if "msgstring" in args: - data["status_msg"] = args["msgstring"] - - reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data) - - def do_offline(self, line): - """Set my presence state to OFFLINE""" - self._do_presence_state(0, line) - - def do_away(self, line): - """Set my presence state to AWAY""" - self._do_presence_state(1, line) - - def do_online(self, line): - """Set my presence state to ONLINE""" - self._do_presence_state(2, line) - - def _parse(self, line, keys, force_keys=False): - """ Parses the given line. - - Args: - line : The line to parse - keys : A list of keys to map onto the args - force_keys : True to enforce that the line has a value for every key - Returns: - A dict of key:arg - """ - line_args = shlex.split(line) - if force_keys and len(line_args) != len(keys): - raise IndexError("Must specify all args: %s" % keys) - - # do $ substitutions - for i, arg in enumerate(line_args): - for config_key in self.config: - if ("$" + config_key) in arg: - arg = arg.replace("$" + config_key, - self.config[config_key]) - line_args[i] = arg - - return dict(zip(keys, line_args)) - - @defer.inlineCallbacks - def _run_and_pprint(self, method, path, data=None, - query_params={"access_token": None}, alt_text=None): - """ Runs an HTTP request and pretty prints the output. 
- - Args: - method: HTTP method - path: Relative path - data: Raw JSON data if any - query_params: dict of query parameters to add to the url - """ - url = self._url() + path - if "access_token" in query_params: - query_params["access_token"] = self._tok() - - json_res = yield self.http_client.do_request(method, url, - data=data, - qparams=query_params) - if alt_text: - print alt_text - else: - print json.dumps(json_res, indent=4) - - -def save_config(config): - with open(CONFIG_JSON, 'w') as out: - json.dump(config, out) - - -def main(server_url, identity_server_url, username, token, config_path): - print "Synapse command line client" - print "===========================" - print "Server: %s" % server_url - print "Type 'help' to get started." - print "Close this console with CTRL+C then CTRL+D." - if not username or not token: - print "- 'register ' - Register an account" - print "- 'stream' - Connect to the event stream" - print "- 'create ' - Create a room" - print "- 'send ' - Send a message" - http_client = TwistedHttpClient() - - # the command line client - syn_cmd = SynapseCmd(http_client, server_url, identity_server_url, username, token) - - # load synapse.json config from a previous session - global CONFIG_JSON - CONFIG_JSON = config_path # bit cheeky, but just overwrite the global - try: - with open(config_path, 'r') as config: - syn_cmd.config = json.load(config) - try: - http_client.verbose = "on" == syn_cmd.config["verbose"] - except: - pass - print "Loaded config from %s" % config_path - except: - pass - - # Twisted-specific: Runs the command processor in Twisted's event loop - # to maintain a single thread for both commands and event processing. - # If using another HTTP client, just call syn_cmd.cmdloop() - reactor.callInThread(syn_cmd.cmdloop) - reactor.run() - - -if __name__ == '__main__': - parser = argparse.ArgumentParser("Starts a synapse client.") - parser.add_argument( - "-s", "--server", dest="server", default="http://localhost:8008", - help="The URL of the home server to talk to.") - parser.add_argument( - "-i", "--identity-server", dest="identityserver", default="http://localhost:8090", - help="The URL of the identity server to talk to.") - parser.add_argument( - "-u", "--username", dest="username", - help="Your username on the server.") - parser.add_argument( - "-t", "--token", dest="token", - help="Your access token.") - parser.add_argument( - "-c", "--config", dest="config", default=CONFIG_JSON, - help="The location of the config.json file to read from.") - args = parser.parse_args() - - if not args.server: - print "You must supply a server URL to communicate with." - parser.print_help() - sys.exit(1) - - server = args.server - if not server.startswith("http://"): - server = "http://" + args.server - - main(server, args.identityserver, args.username, args.token, args.config) diff --git a/cmdclient/http.py b/cmdclient/http.py deleted file mode 100644 index 869f782ec1..0000000000 --- a/cmdclient/http.py +++ /dev/null @@ -1,217 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.web.client import Agent, readBody -from twisted.web.http_headers import Headers -from twisted.internet import defer, reactor - -from pprint import pformat - -import json -import urllib - - -class HttpClient(object): - """ Interface for talking json over http - """ - - def put_json(self, url, data): - """ Sends the specifed json data using PUT - - Args: - url (str): The URL to PUT data to. - data (dict): A dict containing the data that will be used as - the request body. This will be encoded as JSON. - - Returns: - Deferred: Succeeds when we get *any* HTTP response. - - The result of the deferred is a tuple of `(code, response)`, - where `response` is a dict representing the decoded JSON body. - """ - pass - - def get_json(self, url, args=None): - """ Get's some json from the given host homeserver and path - - Args: - url (str): The URL to GET data from. - args (dict): A dictionary used to create query strings, defaults to - None. - **Note**: The value of each key is assumed to be an iterable - and *not* a string. - - Returns: - Deferred: Succeeds when we get *any* HTTP response. - - The result of the deferred is a tuple of `(code, response)`, - where `response` is a dict representing the decoded JSON body. - """ - pass - - -class TwistedHttpClient(HttpClient): - """ Wrapper around the twisted HTTP client api. - - Attributes: - agent (twisted.web.client.Agent): The twisted Agent used to send the - requests. - """ - - def __init__(self): - self.agent = Agent(reactor) - - @defer.inlineCallbacks - def put_json(self, url, data): - response = yield self._create_put_request( - url, - data, - headers_dict={"Content-Type": ["application/json"]} - ) - body = yield readBody(response) - defer.returnValue((response.code, body)) - - @defer.inlineCallbacks - def get_json(self, url, args=None): - if args: - # generates a list of strings of form "k=v". 
- qs = urllib.urlencode(args, True) - url = "%s?%s" % (url, qs) - response = yield self._create_get_request(url) - body = yield readBody(response) - defer.returnValue(json.loads(body)) - - def _create_put_request(self, url, json_data, headers_dict={}): - """ Wrapper of _create_request to issue a PUT request - """ - - if "Content-Type" not in headers_dict: - raise defer.error( - RuntimeError("Must include Content-Type header for PUTs")) - - return self._create_request( - "PUT", - url, - producer=_JsonProducer(json_data), - headers_dict=headers_dict - ) - - def _create_get_request(self, url, headers_dict={}): - """ Wrapper of _create_request to issue a GET request - """ - return self._create_request( - "GET", - url, - headers_dict=headers_dict - ) - - @defer.inlineCallbacks - def do_request(self, method, url, data=None, qparams=None, jsonreq=True, headers={}): - if qparams: - url = "%s?%s" % (url, urllib.urlencode(qparams, True)) - - if jsonreq: - prod = _JsonProducer(data) - headers['Content-Type'] = ["application/json"]; - else: - prod = _RawProducer(data) - - if method in ["POST", "PUT"]: - response = yield self._create_request(method, url, - producer=prod, - headers_dict=headers) - else: - response = yield self._create_request(method, url) - - body = yield readBody(response) - defer.returnValue(json.loads(body)) - - @defer.inlineCallbacks - def _create_request(self, method, url, producer=None, headers_dict={}): - """ Creates and sends a request to the given url - """ - headers_dict["User-Agent"] = ["Synapse Cmd Client"] - - retries_left = 5 - print "%s to %s with headers %s" % (method, url, headers_dict) - if self.verbose and producer: - if "password" in producer.data: - temp = producer.data["password"] - producer.data["password"] = "[REDACTED]" - print json.dumps(producer.data, indent=4) - producer.data["password"] = temp - else: - print json.dumps(producer.data, indent=4) - - while True: - try: - response = yield self.agent.request( - method, - url.encode("UTF8"), - Headers(headers_dict), - producer - ) - break - except Exception as e: - print "uh oh: %s" % e - if retries_left: - yield self.sleep(2 ** (5 - retries_left)) - retries_left -= 1 - else: - raise e - - if self.verbose: - print "Status %s %s" % (response.code, response.phrase) - print pformat(list(response.headers.getAllRawHeaders())) - defer.returnValue(response) - - def sleep(self, seconds): - d = defer.Deferred() - reactor.callLater(seconds, d.callback, seconds) - return d - -class _RawProducer(object): - def __init__(self, data): - self.data = data - self.body = data - self.length = len(self.body) - - def startProducing(self, consumer): - consumer.write(self.body) - return defer.succeed(None) - - def pauseProducing(self): - pass - - def stopProducing(self): - pass - -class _JsonProducer(object): - """ Used by the twisted http client to create the HTTP body from json - """ - def __init__(self, jsn): - self.data = jsn - self.body = json.dumps(jsn).encode("utf8") - self.length = len(self.body) - - def startProducing(self, consumer): - consumer.write(self.body) - return defer.succeed(None) - - def pauseProducing(self): - pass - - def stopProducing(self): - pass \ No newline at end of file diff --git a/contrib/cmdclient/console.py b/contrib/cmdclient/console.py new file mode 100755 index 0000000000..d9c6ec6a70 --- /dev/null +++ b/contrib/cmdclient/console.py @@ -0,0 +1,747 @@ +#!/usr/bin/env python + +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this 
file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Starts a synapse client console. """ + +from twisted.internet import reactor, defer, threads +from http import TwistedHttpClient + +import argparse +import cmd +import getpass +import json +import shlex +import sys +import time +import urllib +import urlparse + +import nacl.signing +import nacl.encoding + +from syutil.crypto.jsonsign import verify_signed_json, SignatureVerifyException + +CONFIG_JSON = "cmdclient_config.json" + +TRUSTED_ID_SERVERS = [ + 'localhost:8001' +] + +class SynapseCmd(cmd.Cmd): + + """Basic synapse command-line processor. + + This processes commands from the user and calls the relevant HTTP methods. + """ + + def __init__(self, http_client, server_url, identity_server_url, username, token): + cmd.Cmd.__init__(self) + self.http_client = http_client + self.http_client.verbose = True + self.config = { + "url": server_url, + "identityServerUrl": identity_server_url, + "user": username, + "token": token, + "verbose": "on", + "complete_usernames": "on", + "send_delivery_receipts": "on" + } + self.path_prefix = "/_matrix/client/api/v1" + self.event_stream_token = "END" + self.prompt = ">>> " + + def do_EOF(self, line): # allows CTRL+D quitting + return True + + def emptyline(self): + pass # else it repeats the previous command + + def _usr(self): + return self.config["user"] + + def _tok(self): + return self.config["token"] + + def _url(self): + return self.config["url"] + self.path_prefix + + def _identityServerUrl(self): + return self.config["identityServerUrl"] + + def _is_on(self, config_name): + if config_name in self.config: + return self.config[config_name] == "on" + return False + + def _domain(self): + if "user" not in self.config or not self.config["user"]: + return None + return self.config["user"].split(":")[1] + + def do_config(self, line): + """ Show the config for this client: "config" + Edit a key value mapping: "config key value" e.g. "config token 1234" + Config variables: + user: The username to auth with. + token: The access token to auth with. + url: The url of the server. + verbose: [on|off] The verbosity of requests/responses. + complete_usernames: [on|off] Auto complete partial usernames by + assuming they are on the same homeserver as you. + E.g. name >> @name:yourhost + send_delivery_receipts: [on|off] Automatically send receipts to + messages when performing a 'stream' command. + Additional key/values can be added and can be substituted into requests + by using $. E.g. 'config roomid room1' then 'raw get /rooms/$roomid'. 
+ """ + if len(line) == 0: + print json.dumps(self.config, indent=4) + return + + try: + args = self._parse(line, ["key", "val"], force_keys=True) + + # make sure restricted config values are checked + config_rules = [ # key, valid_values + ("verbose", ["on", "off"]), + ("complete_usernames", ["on", "off"]), + ("send_delivery_receipts", ["on", "off"]) + ] + for key, valid_vals in config_rules: + if key == args["key"] and args["val"] not in valid_vals: + print "%s value must be one of %s" % (args["key"], + valid_vals) + return + + # toggle the http client verbosity + if args["key"] == "verbose": + self.http_client.verbose = "on" == args["val"] + + # assign the new config + self.config[args["key"]] = args["val"] + print json.dumps(self.config, indent=4) + + save_config(self.config) + except Exception as e: + print e + + def do_register(self, line): + """Registers for a new account: "register " + : The desired user ID + : Do not automatically clobber config values. + """ + args = self._parse(line, ["userid", "noupdate"]) + + password = None + pwd = None + pwd2 = "_" + while pwd != pwd2: + pwd = getpass.getpass("Type a password for this user: ") + pwd2 = getpass.getpass("Retype the password: ") + if pwd != pwd2 or len(pwd) == 0: + print "Password mismatch." + pwd = None + else: + password = pwd + + body = { + "type": "m.login.password" + } + if "userid" in args: + body["user"] = args["userid"] + if password: + body["password"] = password + + reactor.callFromThread(self._do_register, body, + "noupdate" not in args) + + @defer.inlineCallbacks + def _do_register(self, data, update_config): + # check the registration flows + url = self._url() + "/register" + json_res = yield self.http_client.do_request("GET", url) + print json.dumps(json_res, indent=4) + + passwordFlow = None + for flow in json_res["flows"]: + if flow["type"] == "m.login.recaptcha" or ("stages" in flow and "m.login.recaptcha" in flow["stages"]): + print "Unable to register: Home server requires captcha." + return + if flow["type"] == "m.login.password" and "stages" not in flow: + passwordFlow = flow + break + + if not passwordFlow: + return + + json_res = yield self.http_client.do_request("POST", url, data=data) + print json.dumps(json_res, indent=4) + if update_config and "user_id" in json_res: + self.config["user"] = json_res["user_id"] + self.config["token"] = json_res["access_token"] + save_config(self.config) + + def do_login(self, line): + """Login as a specific user: "login @bob:localhost" + You MAY be prompted for a password, or instructed to visit a URL. + """ + try: + args = self._parse(line, ["user_id"], force_keys=True) + can_login = threads.blockingCallFromThread( + reactor, + self._check_can_login) + if can_login: + p = getpass.getpass("Enter your password: ") + user = args["user_id"] + if self._is_on("complete_usernames") and not user.startswith("@"): + domain = self._domain() + if domain: + user = "@" + user + ":" + domain + + reactor.callFromThread(self._do_login, user, p) + #print " got %s " % p + except Exception as e: + print e + + @defer.inlineCallbacks + def _do_login(self, user, password): + path = "/login" + data = { + "user": user, + "password": password, + "type": "m.login.password" + } + url = self._url() + path + json_res = yield self.http_client.do_request("POST", url, data=data) + print json_res + + if "access_token" in json_res: + self.config["user"] = user + self.config["token"] = json_res["access_token"] + save_config(self.config) + print "Login successful." 
+ + @defer.inlineCallbacks + def _check_can_login(self): + path = "/login" + # ALWAYS check that the home server can handle the login request before + # submitting! + url = self._url() + path + json_res = yield self.http_client.do_request("GET", url) + print json_res + + if "flows" not in json_res: + print "Failed to find any login flows." + defer.returnValue(False) + + flow = json_res["flows"][0] # assume first is the one we want. + if ("type" not in flow or "m.login.password" != flow["type"] or + "stages" in flow): + fallback_url = self._url() + "/login/fallback" + print ("Unable to login via the command line client. Please visit " + "%s to login." % fallback_url) + defer.returnValue(False) + defer.returnValue(True) + + def do_emailrequest(self, line): + """Requests the association of a third party identifier +
The email address) + A string of characters generated when requesting an email that you'll supply in subsequent calls to identify yourself + The number of times the user has requested an email. Leave this the same between requests to retry the request at the transport level. Increment it to request that the email be sent again. + """ + args = self._parse(line, ['address', 'clientSecret', 'sendAttempt']) + + postArgs = {'email': args['address'], 'clientSecret': args['clientSecret'], 'sendAttempt': args['sendAttempt']} + + reactor.callFromThread(self._do_emailrequest, postArgs) + + @defer.inlineCallbacks + def _do_emailrequest(self, args): + url = self._identityServerUrl()+"/_matrix/identity/api/v1/validate/email/requestToken" + + json_res = yield self.http_client.do_request("POST", url, data=urllib.urlencode(args), jsonreq=False, + headers={'Content-Type': ['application/x-www-form-urlencoded']}) + print json_res + if 'sid' in json_res: + print "Token sent. Your session ID is %s" % (json_res['sid']) + + def do_emailvalidate(self, line): + """Validate and associate a third party ID + The session ID (sid) given to you in the response to requestToken + The token sent to your third party identifier address + The same clientSecret you supplied in requestToken + """ + args = self._parse(line, ['sid', 'token', 'clientSecret']) + + postArgs = { 'sid' : args['sid'], 'token' : args['token'], 'clientSecret': args['clientSecret'] } + + reactor.callFromThread(self._do_emailvalidate, postArgs) + + @defer.inlineCallbacks + def _do_emailvalidate(self, args): + url = self._identityServerUrl()+"/_matrix/identity/api/v1/validate/email/submitToken" + + json_res = yield self.http_client.do_request("POST", url, data=urllib.urlencode(args), jsonreq=False, + headers={'Content-Type': ['application/x-www-form-urlencoded']}) + print json_res + + def do_3pidbind(self, line): + """Validate and associate a third party ID + The session ID (sid) given to you in the response to requestToken + The same clientSecret you supplied in requestToken + """ + args = self._parse(line, ['sid', 'clientSecret']) + + postArgs = { 'sid' : args['sid'], 'clientSecret': args['clientSecret'] } + postArgs['mxid'] = self.config["user"] + + reactor.callFromThread(self._do_3pidbind, postArgs) + + @defer.inlineCallbacks + def _do_3pidbind(self, args): + url = self._identityServerUrl()+"/_matrix/identity/api/v1/3pid/bind" + + json_res = yield self.http_client.do_request("POST", url, data=urllib.urlencode(args), jsonreq=False, + headers={'Content-Type': ['application/x-www-form-urlencoded']}) + print json_res + + def do_join(self, line): + """Joins a room: "join " """ + try: + args = self._parse(line, ["roomid"], force_keys=True) + self._do_membership_change(args["roomid"], "join", self._usr()) + except Exception as e: + print e + + def do_joinalias(self, line): + try: + args = self._parse(line, ["roomname"], force_keys=True) + path = "/join/%s" % urllib.quote(args["roomname"]) + reactor.callFromThread(self._run_and_pprint, "POST", path, {}) + except Exception as e: + print e + + def do_topic(self, line): + """"topic [set|get] []" + Set the topic for a room: topic set + Get the topic for a room: topic get + """ + try: + args = self._parse(line, ["action", "roomid", "topic"]) + if "action" not in args or "roomid" not in args: + print "Must specify set|get and a room ID." 
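Aside (not part of the patch): the set/get branches that follow simply PUT or GET the room's /topic path. A hedged sketch of the payloads involved, with a placeholder room ID:

    # Illustrative only: the request shapes behind "topic set" and "topic get".
    import json
    import urllib

    room_id = "!abc123:localhost"              # placeholder room ID
    path = "/rooms/%s/topic" % urllib.quote(room_id)
    set_body = {"topic": "Weekly sync notes"}  # body sent with PUT for "topic set"
    print "PUT %s %s" % (path, json.dumps(set_body))
    print "GET %s" % path                      # "topic get" reads the topic back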
+ return + if args["action"].lower() not in ["set", "get"]: + print "Must specify set|get, not %s" % args["action"] + return + + path = "/rooms/%s/topic" % urllib.quote(args["roomid"]) + + if args["action"].lower() == "set": + if "topic" not in args: + print "Must specify a new topic." + return + body = { + "topic": args["topic"] + } + reactor.callFromThread(self._run_and_pprint, "PUT", path, body) + elif args["action"].lower() == "get": + reactor.callFromThread(self._run_and_pprint, "GET", path) + except Exception as e: + print e + + def do_invite(self, line): + """Invite a user to a room: "invite " """ + try: + args = self._parse(line, ["userid", "roomid"], force_keys=True) + + user_id = args["userid"] + + reactor.callFromThread(self._do_invite, args["roomid"], user_id) + except Exception as e: + print e + + @defer.inlineCallbacks + def _do_invite(self, roomid, userstring): + if (not userstring.startswith('@') and + self._is_on("complete_usernames")): + url = self._identityServerUrl()+"/_matrix/identity/api/v1/lookup" + + json_res = yield self.http_client.do_request("GET", url, qparams={'medium':'email','address':userstring}) + + mxid = None + + if 'mxid' in json_res and 'signatures' in json_res: + url = self._identityServerUrl()+"/_matrix/identity/api/v1/pubkey/ed25519" + + pubKey = None + pubKeyObj = yield self.http_client.do_request("GET", url) + if 'public_key' in pubKeyObj: + pubKey = nacl.signing.VerifyKey(pubKeyObj['public_key'], encoder=nacl.encoding.HexEncoder) + else: + print "No public key found in pubkey response!" + + sigValid = False + + if pubKey: + for signame in json_res['signatures']: + if signame not in TRUSTED_ID_SERVERS: + print "Ignoring signature from untrusted server %s" % (signame) + else: + try: + verify_signed_json(json_res, signame, pubKey) + sigValid = True + print "Mapping %s -> %s correctly signed by %s" % (userstring, json_res['mxid'], signame) + break + except SignatureVerifyException as e: + print "Invalid signature from %s" % (signame) + print e + + if sigValid: + print "Resolved 3pid %s to %s" % (userstring, json_res['mxid']) + mxid = json_res['mxid'] + else: + print "Got association for %s but couldn't verify signature" % (userstring) + + if not mxid: + mxid = "@" + userstring + ":" + self._domain() + + self._do_membership_change(roomid, "invite", mxid) + + def do_leave(self, line): + """Leaves a room: "leave " """ + try: + args = self._parse(line, ["roomid"], force_keys=True) + self._do_membership_change(args["roomid"], "leave", self._usr()) + except Exception as e: + print e + + def do_send(self, line): + """Sends a message. "send " """ + args = self._parse(line, ["roomid", "body"]) + txn_id = "txn%s" % int(time.time()) + path = "/rooms/%s/send/m.room.message/%s" % (urllib.quote(args["roomid"]), + txn_id) + body_json = { + "msgtype": "m.text", + "body": args["body"] + } + reactor.callFromThread(self._run_and_pprint, "PUT", path, body_json) + + def do_list(self, line): + """List data about a room. + "list members [query]" - List all the members in this room. + "list messages [query]" - List all the messages in this room. + + Where [query] will be directly applied as query parameters, allowing + you to use the pagination API. E.g. the last 3 messages in this room: + "list messages from=END&to=START&limit=3" + """ + args = self._parse(line, ["type", "roomid", "qp"]) + if not "type" in args or not "roomid" in args: + print "Must specify type and room ID." 
+ return + if args["type"] not in ["members", "messages"]: + print "Unrecognised type: %s" % args["type"] + return + room_id = args["roomid"] + path = "/rooms/%s/%s" % (urllib.quote(room_id), args["type"]) + + qp = {"access_token": self._tok()} + if "qp" in args: + for key_value_str in args["qp"].split("&"): + try: + key_value = key_value_str.split("=") + qp[key_value[0]] = key_value[1] + except: + print "Bad query param: %s" % key_value + return + + reactor.callFromThread(self._run_and_pprint, "GET", path, + query_params=qp) + + def do_create(self, line): + """Creates a room. + "create [public|private] " - Create a room with the + specified visibility. + "create " - Create a room with default visibility. + "create [public|private]" - Create a room with specified visibility. + "create" - Create a room with default visibility. + """ + args = self._parse(line, ["vis", "roomname"]) + # fixup args depending on which were set + body = {} + if "vis" in args and args["vis"] in ["public", "private"]: + body["visibility"] = args["vis"] + + if "roomname" in args: + room_name = args["roomname"] + body["room_alias_name"] = room_name + elif "vis" in args and args["vis"] not in ["public", "private"]: + room_name = args["vis"] + body["room_alias_name"] = room_name + + reactor.callFromThread(self._run_and_pprint, "POST", "/createRoom", body) + + def do_raw(self, line): + """Directly send a JSON object: "raw " + : Required. One of "PUT", "GET", "POST", "xPUT", "xGET", + "xPOST". Methods with 'x' prefixed will not automatically append the + access token. + : Required. E.g. "/events" + : Optional. E.g. "{ "msgtype":"custom.text", "body":"abc123"}" + """ + args = self._parse(line, ["method", "path", "data"]) + # sanity check + if "method" not in args or "path" not in args: + print "Must specify path and method." + return + + args["method"] = args["method"].upper() + valid_methods = ["PUT", "GET", "POST", "DELETE", + "XPUT", "XGET", "XPOST", "XDELETE"] + if args["method"] not in valid_methods: + print "Unsupported method: %s" % args["method"] + return + + if "data" not in args: + args["data"] = None + else: + try: + args["data"] = json.loads(args["data"]) + except Exception as e: + print "Data is not valid JSON. %s" % e + return + + qp = {"access_token": self._tok()} + if args["method"].startswith("X"): + qp = {} # remove access token + args["method"] = args["method"][1:] # snip the X + else: + # append any query params the user has set + try: + parsed_url = urlparse.urlparse(args["path"]) + qp.update(urlparse.parse_qs(parsed_url.query)) + args["path"] = parsed_url.path + except: + pass + + reactor.callFromThread(self._run_and_pprint, args["method"], + args["path"], + args["data"], + query_params=qp) + + def do_stream(self, line): + """Stream data from the server: "stream " """ + args = self._parse(line, ["timeout"]) + timeout = 5000 + if "timeout" in args: + try: + timeout = int(args["timeout"]) + except ValueError: + print "Timeout must be in milliseconds." 
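Aside (not part of the patch): _do_event_stream, which the line below hands the timeout to, performs one long-poll GET of /events and remembers the returned "end" token for the next poll. A rough blocking equivalent, with a placeholder server URL and access token:

    # Illustrative only: a single long-poll of /events, as _do_event_stream does.
    import json
    import urllib
    import urllib2

    base = "http://localhost:8008/_matrix/client/api/v1"  # placeholder homeserver
    qs = urllib.urlencode({
        "access_token": "placeholder_token",
        "timeout": "5000",   # milliseconds, as validated above
        "from": "END",       # resume token; the response's "end" feeds the next poll
    })
    res = json.load(urllib2.urlopen("%s/events?%s" % (base, qs)))
    for event in res.get("chunk", []):
        print json.dumps(event, indent=4)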
+ return + reactor.callFromThread(self._do_event_stream, timeout) + + @defer.inlineCallbacks + def _do_event_stream(self, timeout): + res = yield self.http_client.get_json( + self._url() + "/events", + { + "access_token": self._tok(), + "timeout": str(timeout), + "from": self.event_stream_token + }) + print json.dumps(res, indent=4) + + if "chunk" in res: + for event in res["chunk"]: + if (event["type"] == "m.room.message" and + self._is_on("send_delivery_receipts") and + event["user_id"] != self._usr()): # not sent by us + self._send_receipt(event, "d") + + # update the position in the stram + if "end" in res: + self.event_stream_token = res["end"] + + def _send_receipt(self, event, feedback_type): + path = ("/rooms/%s/messages/%s/%s/feedback/%s/%s" % + (urllib.quote(event["room_id"]), event["user_id"], event["msg_id"], + self._usr(), feedback_type)) + data = {} + reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data, + alt_text="Sent receipt for %s" % event["msg_id"]) + + def _do_membership_change(self, roomid, membership, userid): + path = "/rooms/%s/state/m.room.member/%s" % (urllib.quote(roomid), urllib.quote(userid)) + data = { + "membership": membership + } + reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data) + + def do_displayname(self, line): + """Get or set my displayname: "displayname [new_name]" """ + args = self._parse(line, ["name"]) + path = "/profile/%s/displayname" % (self.config["user"]) + + if "name" in args: + data = {"displayname": args["name"]} + reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data) + else: + reactor.callFromThread(self._run_and_pprint, "GET", path) + + def _do_presence_state(self, state, line): + args = self._parse(line, ["msgstring"]) + path = "/presence/%s/status" % (self.config["user"]) + data = {"state": state} + if "msgstring" in args: + data["status_msg"] = args["msgstring"] + + reactor.callFromThread(self._run_and_pprint, "PUT", path, data=data) + + def do_offline(self, line): + """Set my presence state to OFFLINE""" + self._do_presence_state(0, line) + + def do_away(self, line): + """Set my presence state to AWAY""" + self._do_presence_state(1, line) + + def do_online(self, line): + """Set my presence state to ONLINE""" + self._do_presence_state(2, line) + + def _parse(self, line, keys, force_keys=False): + """ Parses the given line. + + Args: + line : The line to parse + keys : A list of keys to map onto the args + force_keys : True to enforce that the line has a value for every key + Returns: + A dict of key:arg + """ + line_args = shlex.split(line) + if force_keys and len(line_args) != len(keys): + raise IndexError("Must specify all args: %s" % keys) + + # do $ substitutions + for i, arg in enumerate(line_args): + for config_key in self.config: + if ("$" + config_key) in arg: + arg = arg.replace("$" + config_key, + self.config[config_key]) + line_args[i] = arg + + return dict(zip(keys, line_args)) + + @defer.inlineCallbacks + def _run_and_pprint(self, method, path, data=None, + query_params={"access_token": None}, alt_text=None): + """ Runs an HTTP request and pretty prints the output. 
+ + Args: + method: HTTP method + path: Relative path + data: Raw JSON data if any + query_params: dict of query parameters to add to the url + """ + url = self._url() + path + if "access_token" in query_params: + query_params["access_token"] = self._tok() + + json_res = yield self.http_client.do_request(method, url, + data=data, + qparams=query_params) + if alt_text: + print alt_text + else: + print json.dumps(json_res, indent=4) + + +def save_config(config): + with open(CONFIG_JSON, 'w') as out: + json.dump(config, out) + + +def main(server_url, identity_server_url, username, token, config_path): + print "Synapse command line client" + print "===========================" + print "Server: %s" % server_url + print "Type 'help' to get started." + print "Close this console with CTRL+C then CTRL+D." + if not username or not token: + print "- 'register ' - Register an account" + print "- 'stream' - Connect to the event stream" + print "- 'create ' - Create a room" + print "- 'send ' - Send a message" + http_client = TwistedHttpClient() + + # the command line client + syn_cmd = SynapseCmd(http_client, server_url, identity_server_url, username, token) + + # load synapse.json config from a previous session + global CONFIG_JSON + CONFIG_JSON = config_path # bit cheeky, but just overwrite the global + try: + with open(config_path, 'r') as config: + syn_cmd.config = json.load(config) + try: + http_client.verbose = "on" == syn_cmd.config["verbose"] + except: + pass + print "Loaded config from %s" % config_path + except: + pass + + # Twisted-specific: Runs the command processor in Twisted's event loop + # to maintain a single thread for both commands and event processing. + # If using another HTTP client, just call syn_cmd.cmdloop() + reactor.callInThread(syn_cmd.cmdloop) + reactor.run() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser("Starts a synapse client.") + parser.add_argument( + "-s", "--server", dest="server", default="http://localhost:8008", + help="The URL of the home server to talk to.") + parser.add_argument( + "-i", "--identity-server", dest="identityserver", default="http://localhost:8090", + help="The URL of the identity server to talk to.") + parser.add_argument( + "-u", "--username", dest="username", + help="Your username on the server.") + parser.add_argument( + "-t", "--token", dest="token", + help="Your access token.") + parser.add_argument( + "-c", "--config", dest="config", default=CONFIG_JSON, + help="The location of the config.json file to read from.") + args = parser.parse_args() + + if not args.server: + print "You must supply a server URL to communicate with." + parser.print_help() + sys.exit(1) + + server = args.server + if not server.startswith("http://"): + server = "http://" + args.server + + main(server, args.identityserver, args.username, args.token, args.config) diff --git a/contrib/cmdclient/http.py b/contrib/cmdclient/http.py new file mode 100644 index 0000000000..869f782ec1 --- /dev/null +++ b/contrib/cmdclient/http.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.web.client import Agent, readBody +from twisted.web.http_headers import Headers +from twisted.internet import defer, reactor + +from pprint import pformat + +import json +import urllib + + +class HttpClient(object): + """ Interface for talking json over http + """ + + def put_json(self, url, data): + """ Sends the specifed json data using PUT + + Args: + url (str): The URL to PUT data to. + data (dict): A dict containing the data that will be used as + the request body. This will be encoded as JSON. + + Returns: + Deferred: Succeeds when we get *any* HTTP response. + + The result of the deferred is a tuple of `(code, response)`, + where `response` is a dict representing the decoded JSON body. + """ + pass + + def get_json(self, url, args=None): + """ Get's some json from the given host homeserver and path + + Args: + url (str): The URL to GET data from. + args (dict): A dictionary used to create query strings, defaults to + None. + **Note**: The value of each key is assumed to be an iterable + and *not* a string. + + Returns: + Deferred: Succeeds when we get *any* HTTP response. + + The result of the deferred is a tuple of `(code, response)`, + where `response` is a dict representing the decoded JSON body. + """ + pass + + +class TwistedHttpClient(HttpClient): + """ Wrapper around the twisted HTTP client api. + + Attributes: + agent (twisted.web.client.Agent): The twisted Agent used to send the + requests. + """ + + def __init__(self): + self.agent = Agent(reactor) + + @defer.inlineCallbacks + def put_json(self, url, data): + response = yield self._create_put_request( + url, + data, + headers_dict={"Content-Type": ["application/json"]} + ) + body = yield readBody(response) + defer.returnValue((response.code, body)) + + @defer.inlineCallbacks + def get_json(self, url, args=None): + if args: + # generates a list of strings of form "k=v". 
+ qs = urllib.urlencode(args, True) + url = "%s?%s" % (url, qs) + response = yield self._create_get_request(url) + body = yield readBody(response) + defer.returnValue(json.loads(body)) + + def _create_put_request(self, url, json_data, headers_dict={}): + """ Wrapper of _create_request to issue a PUT request + """ + + if "Content-Type" not in headers_dict: + raise defer.error( + RuntimeError("Must include Content-Type header for PUTs")) + + return self._create_request( + "PUT", + url, + producer=_JsonProducer(json_data), + headers_dict=headers_dict + ) + + def _create_get_request(self, url, headers_dict={}): + """ Wrapper of _create_request to issue a GET request + """ + return self._create_request( + "GET", + url, + headers_dict=headers_dict + ) + + @defer.inlineCallbacks + def do_request(self, method, url, data=None, qparams=None, jsonreq=True, headers={}): + if qparams: + url = "%s?%s" % (url, urllib.urlencode(qparams, True)) + + if jsonreq: + prod = _JsonProducer(data) + headers['Content-Type'] = ["application/json"]; + else: + prod = _RawProducer(data) + + if method in ["POST", "PUT"]: + response = yield self._create_request(method, url, + producer=prod, + headers_dict=headers) + else: + response = yield self._create_request(method, url) + + body = yield readBody(response) + defer.returnValue(json.loads(body)) + + @defer.inlineCallbacks + def _create_request(self, method, url, producer=None, headers_dict={}): + """ Creates and sends a request to the given url + """ + headers_dict["User-Agent"] = ["Synapse Cmd Client"] + + retries_left = 5 + print "%s to %s with headers %s" % (method, url, headers_dict) + if self.verbose and producer: + if "password" in producer.data: + temp = producer.data["password"] + producer.data["password"] = "[REDACTED]" + print json.dumps(producer.data, indent=4) + producer.data["password"] = temp + else: + print json.dumps(producer.data, indent=4) + + while True: + try: + response = yield self.agent.request( + method, + url.encode("UTF8"), + Headers(headers_dict), + producer + ) + break + except Exception as e: + print "uh oh: %s" % e + if retries_left: + yield self.sleep(2 ** (5 - retries_left)) + retries_left -= 1 + else: + raise e + + if self.verbose: + print "Status %s %s" % (response.code, response.phrase) + print pformat(list(response.headers.getAllRawHeaders())) + defer.returnValue(response) + + def sleep(self, seconds): + d = defer.Deferred() + reactor.callLater(seconds, d.callback, seconds) + return d + +class _RawProducer(object): + def __init__(self, data): + self.data = data + self.body = data + self.length = len(self.body) + + def startProducing(self, consumer): + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + +class _JsonProducer(object): + """ Used by the twisted http client to create the HTTP body from json + """ + def __init__(self, jsn): + self.data = jsn + self.body = json.dumps(jsn).encode("utf8") + self.length = len(self.body) + + def startProducing(self, consumer): + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass \ No newline at end of file diff --git a/contrib/experiments/cursesio.py b/contrib/experiments/cursesio.py new file mode 100644 index 0000000000..95d87a1fda --- /dev/null +++ b/contrib/experiments/cursesio.py @@ -0,0 +1,168 @@ +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in 
compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import curses +import curses.wrapper +from curses.ascii import isprint + +from twisted.internet import reactor + + +class CursesStdIO(): + def __init__(self, stdscr, callback=None): + self.statusText = "Synapse test app -" + self.searchText = '' + self.stdscr = stdscr + + self.logLine = '' + + self.callback = callback + + self._setup() + + def _setup(self): + self.stdscr.nodelay(1) # Make non blocking + + self.rows, self.cols = self.stdscr.getmaxyx() + self.lines = [] + + curses.use_default_colors() + + self.paintStatus(self.statusText) + self.stdscr.refresh() + + def set_callback(self, callback): + self.callback = callback + + def fileno(self): + """ We want to select on FD 0 """ + return 0 + + def connectionLost(self, reason): + self.close() + + def print_line(self, text): + """ add a line to the internal list of lines""" + + self.lines.append(text) + self.redraw() + + def print_log(self, text): + self.logLine = text + self.redraw() + + def redraw(self): + """ method for redisplaying lines + based on internal list of lines """ + + self.stdscr.clear() + self.paintStatus(self.statusText) + i = 0 + index = len(self.lines) - 1 + while i < (self.rows - 3) and index >= 0: + self.stdscr.addstr(self.rows - 3 - i, 0, self.lines[index], + curses.A_NORMAL) + i = i + 1 + index = index - 1 + + self.printLogLine(self.logLine) + + self.stdscr.refresh() + + def paintStatus(self, text): + if len(text) > self.cols: + raise RuntimeError("TextTooLongError") + + self.stdscr.addstr( + self.rows - 2, 0, + text + ' ' * (self.cols - len(text)), + curses.A_STANDOUT) + + def printLogLine(self, text): + self.stdscr.addstr( + 0, 0, + text + ' ' * (self.cols - len(text)), + curses.A_STANDOUT) + + def doRead(self): + """ Input is ready! 
""" + curses.noecho() + c = self.stdscr.getch() # read a character + + if c == curses.KEY_BACKSPACE: + self.searchText = self.searchText[:-1] + + elif c == curses.KEY_ENTER or c == 10: + text = self.searchText + self.searchText = '' + + self.print_line(">> %s" % text) + + try: + if self.callback: + self.callback.on_line(text) + except Exception as e: + self.print_line(str(e)) + + self.stdscr.refresh() + + elif isprint(c): + if len(self.searchText) == self.cols - 2: + return + self.searchText = self.searchText + chr(c) + + self.stdscr.addstr(self.rows - 1, 0, + self.searchText + (' ' * ( + self.cols - len(self.searchText) - 2))) + + self.paintStatus(self.statusText + ' %d' % len(self.searchText)) + self.stdscr.move(self.rows - 1, len(self.searchText)) + self.stdscr.refresh() + + def logPrefix(self): + return "CursesStdIO" + + def close(self): + """ clean up """ + + curses.nocbreak() + self.stdscr.keypad(0) + curses.echo() + curses.endwin() + + +class Callback(object): + + def __init__(self, stdio): + self.stdio = stdio + + def on_line(self, text): + self.stdio.print_line(text) + + +def main(stdscr): + screen = CursesStdIO(stdscr) # create Screen object + + callback = Callback(screen) + + screen.set_callback(callback) + + stdscr.refresh() + reactor.addReader(screen) + reactor.run() + screen.close() + + +if __name__ == '__main__': + curses.wrapper(main) diff --git a/contrib/experiments/test_messaging.py b/contrib/experiments/test_messaging.py new file mode 100644 index 0000000000..fedf786cec --- /dev/null +++ b/contrib/experiments/test_messaging.py @@ -0,0 +1,394 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" This is an example of using the server to server implementation to do a +basic chat style thing. It accepts commands from stdin and outputs to stdout. + +It assumes that ucids are of the form @, and uses as +the address of the remote home server to hit. + +Usage: + python test_messaging.py + +Currently assumes the local address is localhost: + +""" + + +from synapse.federation import ( + ReplicationHandler +) + +from synapse.federation.units import Pdu + +from synapse.util import origin_from_ucid + +from synapse.app.homeserver import SynapseHomeServer + +#from synapse.util.logutils import log_function + +from twisted.internet import reactor, defer +from twisted.python import log + +import argparse +import json +import logging +import os +import re + +import cursesio +import curses.wrapper + + +logger = logging.getLogger("example") + + +def excpetion_errback(failure): + logging.exception(failure) + + +class InputOutput(object): + """ This is responsible for basic I/O so that a user can interact with + the example app. + """ + + def __init__(self, screen, user): + self.screen = screen + self.user = user + + def set_home_server(self, server): + self.server = server + + def on_line(self, line): + """ This is where we process commands. 
+ """ + + try: + m = re.match("^join (\S+)$", line) + if m: + # The `sender` wants to join a room. + room_name, = m.groups() + self.print_line("%s joining %s" % (self.user, room_name)) + self.server.join_room(room_name, self.user, self.user) + #self.print_line("OK.") + return + + m = re.match("^invite (\S+) (\S+)$", line) + if m: + # `sender` wants to invite someone to a room + room_name, invitee = m.groups() + self.print_line("%s invited to %s" % (invitee, room_name)) + self.server.invite_to_room(room_name, self.user, invitee) + #self.print_line("OK.") + return + + m = re.match("^send (\S+) (.*)$", line) + if m: + # `sender` wants to message a room + room_name, body = m.groups() + self.print_line("%s send to %s" % (self.user, room_name)) + self.server.send_message(room_name, self.user, body) + #self.print_line("OK.") + return + + m = re.match("^backfill (\S+)$", line) + if m: + # we want to backfill a room + room_name, = m.groups() + self.print_line("backfill %s" % room_name) + self.server.backfill(room_name) + return + + self.print_line("Unrecognized command") + + except Exception as e: + logger.exception(e) + + def print_line(self, text): + self.screen.print_line(text) + + def print_log(self, text): + self.screen.print_log(text) + + +class IOLoggerHandler(logging.Handler): + + def __init__(self, io): + logging.Handler.__init__(self) + self.io = io + + def emit(self, record): + if record.levelno < logging.WARN: + return + + msg = self.format(record) + self.io.print_log(msg) + + +class Room(object): + """ Used to store (in memory) the current membership state of a room, and + which home servers we should send PDUs associated with the room to. + """ + def __init__(self, room_name): + self.room_name = room_name + self.invited = set() + self.participants = set() + self.servers = set() + + self.oldest_server = None + + self.have_got_metadata = False + + def add_participant(self, participant): + """ Someone has joined the room + """ + self.participants.add(participant) + self.invited.discard(participant) + + server = origin_from_ucid(participant) + self.servers.add(server) + + if not self.oldest_server: + self.oldest_server = server + + def add_invited(self, invitee): + """ Someone has been invited to the room + """ + self.invited.add(invitee) + self.servers.add(origin_from_ucid(invitee)) + + +class HomeServer(ReplicationHandler): + """ A very basic home server implentation that allows people to join a + room and then invite other people. 
+ """ + def __init__(self, server_name, replication_layer, output): + self.server_name = server_name + self.replication_layer = replication_layer + self.replication_layer.set_handler(self) + + self.joined_rooms = {} + + self.output = output + + def on_receive_pdu(self, pdu): + """ We just received a PDU + """ + pdu_type = pdu.pdu_type + + if pdu_type == "sy.room.message": + self._on_message(pdu) + elif pdu_type == "sy.room.member" and "membership" in pdu.content: + if pdu.content["membership"] == "join": + self._on_join(pdu.context, pdu.state_key) + elif pdu.content["membership"] == "invite": + self._on_invite(pdu.origin, pdu.context, pdu.state_key) + else: + self.output.print_line("#%s (unrec) %s = %s" % + (pdu.context, pdu.pdu_type, json.dumps(pdu.content)) + ) + + #def on_state_change(self, pdu): + ##self.output.print_line("#%s (state) %s *** %s" % + ##(pdu.context, pdu.state_key, pdu.pdu_type) + ##) + + #if "joinee" in pdu.content: + #self._on_join(pdu.context, pdu.content["joinee"]) + #elif "invitee" in pdu.content: + #self._on_invite(pdu.origin, pdu.context, pdu.content["invitee"]) + + def _on_message(self, pdu): + """ We received a message + """ + self.output.print_line("#%s %s %s" % + (pdu.context, pdu.content["sender"], pdu.content["body"]) + ) + + def _on_join(self, context, joinee): + """ Someone has joined a room, either a remote user or a local user + """ + room = self._get_or_create_room(context) + room.add_participant(joinee) + + self.output.print_line("#%s %s %s" % + (context, joinee, "*** JOINED") + ) + + def _on_invite(self, origin, context, invitee): + """ Someone has been invited + """ + room = self._get_or_create_room(context) + room.add_invited(invitee) + + self.output.print_line("#%s %s %s" % + (context, invitee, "*** INVITED") + ) + + if not room.have_got_metadata and origin is not self.server_name: + logger.debug("Get room state") + self.replication_layer.get_state_for_context(origin, context) + room.have_got_metadata = True + + @defer.inlineCallbacks + def send_message(self, room_name, sender, body): + """ Send a message to a room! + """ + destinations = yield self.get_servers_for_context(room_name) + + try: + yield self.replication_layer.send_pdu( + Pdu.create_new( + context=room_name, + pdu_type="sy.room.message", + content={"sender": sender, "body": body}, + origin=self.server_name, + destinations=destinations, + ) + ) + except Exception as e: + logger.exception(e) + + @defer.inlineCallbacks + def join_room(self, room_name, sender, joinee): + """ Join a room! + """ + self._on_join(room_name, joinee) + + destinations = yield self.get_servers_for_context(room_name) + + try: + pdu = Pdu.create_new( + context=room_name, + pdu_type="sy.room.member", + is_state=True, + state_key=joinee, + content={"membership": "join"}, + origin=self.server_name, + destinations=destinations, + ) + yield self.replication_layer.send_pdu(pdu) + except Exception as e: + logger.exception(e) + + @defer.inlineCallbacks + def invite_to_room(self, room_name, sender, invitee): + """ Invite someone to a room! 
+ """ + self._on_invite(self.server_name, room_name, invitee) + + destinations = yield self.get_servers_for_context(room_name) + + try: + yield self.replication_layer.send_pdu( + Pdu.create_new( + context=room_name, + is_state=True, + pdu_type="sy.room.member", + state_key=invitee, + content={"membership": "invite"}, + origin=self.server_name, + destinations=destinations, + ) + ) + except Exception as e: + logger.exception(e) + + def backfill(self, room_name, limit=5): + room = self.joined_rooms.get(room_name) + + if not room: + return + + dest = room.oldest_server + + return self.replication_layer.backfill(dest, room_name, limit) + + def _get_room_remote_servers(self, room_name): + return [i for i in self.joined_rooms.setdefault(room_name,).servers] + + def _get_or_create_room(self, room_name): + return self.joined_rooms.setdefault(room_name, Room(room_name)) + + def get_servers_for_context(self, context): + return defer.succeed( + self.joined_rooms.setdefault(context, Room(context)).servers + ) + + +def main(stdscr): + parser = argparse.ArgumentParser() + parser.add_argument('user', type=str) + parser.add_argument('-v', '--verbose', action='count') + args = parser.parse_args() + + user = args.user + server_name = origin_from_ucid(user) + + ## Set up logging ## + + root_logger = logging.getLogger() + + formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)d - ' + '%(levelname)s - %(message)s') + if not os.path.exists("logs"): + os.makedirs("logs") + fh = logging.FileHandler("logs/%s" % user) + fh.setFormatter(formatter) + + root_logger.addHandler(fh) + root_logger.setLevel(logging.DEBUG) + + # Hack: The only way to get it to stop logging to sys.stderr :( + log.theLogPublisher.observers = [] + observer = log.PythonLoggingObserver() + observer.start() + + ## Set up synapse server + + curses_stdio = cursesio.CursesStdIO(stdscr) + input_output = InputOutput(curses_stdio, user) + + curses_stdio.set_callback(input_output) + + app_hs = SynapseHomeServer(server_name, db_name="dbs/%s" % user) + replication = app_hs.get_replication_layer() + + hs = HomeServer(server_name, replication, curses_stdio) + + input_output.set_home_server(hs) + + ## Add input_output logger + io_logger = IOLoggerHandler(input_output) + io_logger.setFormatter(formatter) + root_logger.addHandler(io_logger) + + ## Start! ## + + try: + port = int(server_name.split(":")[1]) + except: + port = 12345 + + app_hs.get_http_server().start_listening(port) + + reactor.addReader(curses_stdio) + + reactor.run() + + +if __name__ == "__main__": + curses.wrapper(main) diff --git a/contrib/graph/graph.py b/contrib/graph/graph.py new file mode 100644 index 0000000000..b2acadcf5e --- /dev/null +++ b/contrib/graph/graph.py @@ -0,0 +1,151 @@ +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import sqlite3 +import pydot +import cgi +import json +import datetime +import argparse +import urllib2 + + +def make_name(pdu_id, origin): + return "%s@%s" % (pdu_id, origin) + + +def make_graph(pdus, room, filename_prefix): + pdu_map = {} + node_map = {} + + origins = set() + colors = set(("red", "green", "blue", "yellow", "purple")) + + for pdu in pdus: + origins.add(pdu.get("origin")) + + color_map = {color: color for color in colors if color in origins} + colors -= set(color_map.values()) + + color_map[None] = "black" + + for o in origins: + if o in color_map: + continue + try: + c = colors.pop() + color_map[o] = c + except: + print "Run out of colours!" + color_map[o] = "black" + + graph = pydot.Dot(graph_name="Test") + + for pdu in pdus: + name = make_name(pdu.get("pdu_id"), pdu.get("origin")) + pdu_map[name] = pdu + + t = datetime.datetime.fromtimestamp( + float(pdu["ts"]) / 1000 + ).strftime('%Y-%m-%d %H:%M:%S,%f') + + label = ( + "<" + "%(name)s
" + "Type: %(type)s
" + "State key: %(state_key)s
" + "Content: %(content)s
" + "Time: %(time)s
" + "Depth: %(depth)s
" + ">" + ) % { + "name": name, + "type": pdu.get("pdu_type"), + "state_key": pdu.get("state_key"), + "content": cgi.escape(json.dumps(pdu.get("content")), quote=True), + "time": t, + "depth": pdu.get("depth"), + } + + node = pydot.Node( + name=name, + label=label, + color=color_map[pdu.get("origin")] + ) + node_map[name] = node + graph.add_node(node) + + for pdu in pdus: + start_name = make_name(pdu.get("pdu_id"), pdu.get("origin")) + for i, o in pdu.get("prev_pdus", []): + end_name = make_name(i, o) + + if end_name not in node_map: + print "%s not in nodes" % end_name + continue + + edge = pydot.Edge(node_map[start_name], node_map[end_name]) + graph.add_edge(edge) + + # Add prev_state edges, if they exist + if pdu.get("prev_state_id") and pdu.get("prev_state_origin"): + prev_state_name = make_name( + pdu.get("prev_state_id"), pdu.get("prev_state_origin") + ) + + if prev_state_name in node_map: + state_edge = pydot.Edge( + node_map[start_name], node_map[prev_state_name], + style='dotted' + ) + graph.add_edge(state_edge) + + graph.write('%s.dot' % filename_prefix, format='raw', prog='dot') +# graph.write_png("%s.png" % filename_prefix, prog='dot') + graph.write_svg("%s.svg" % filename_prefix, prog='dot') + + +def get_pdus(host, room): + transaction = json.loads( + urllib2.urlopen( + "http://%s/_matrix/federation/v1/context/%s/" % (host, room) + ).read() + ) + + return transaction["pdus"] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Generate a PDU graph for a given room by talking " + "to the given homeserver to get the list of PDUs. \n" + "Requires pydot." + ) + parser.add_argument( + "-p", "--prefix", dest="prefix", + help="String to prefix output files with" + ) + parser.add_argument('host') + parser.add_argument('room') + + args = parser.parse_args() + + host = args.host + room = args.room + prefix = args.prefix if args.prefix else "%s_graph" % (room) + + pdus = get_pdus(host, room) + + make_graph(pdus, room, prefix) diff --git a/contrib/graph/graph2.py b/contrib/graph/graph2.py new file mode 100644 index 0000000000..6b551d42e5 --- /dev/null +++ b/contrib/graph/graph2.py @@ -0,0 +1,156 @@ +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import sqlite3 +import pydot +import cgi +import json +import datetime +import argparse + +from synapse.events import FrozenEvent + + +def make_graph(db_name, room_id, file_prefix, limit): + conn = sqlite3.connect(db_name) + + sql = ( + "SELECT json FROM event_json as j " + "INNER JOIN events as e ON e.event_id = j.event_id " + "WHERE j.room_id = ?" + ) + + args = [room_id] + + if limit: + sql += ( + " ORDER BY topological_ordering DESC, stream_ordering DESC " + "LIMIT ?" 
+ ) + + args.append(limit) + + c = conn.execute(sql, args) + + events = [FrozenEvent(json.loads(e[0])) for e in c.fetchall()] + + events.sort(key=lambda e: e.depth) + + node_map = {} + state_groups = {} + + graph = pydot.Dot(graph_name="Test") + + for event in events: + c = conn.execute( + "SELECT state_group FROM event_to_state_groups " + "WHERE event_id = ?", + (event.event_id,) + ) + + res = c.fetchone() + state_group = res[0] if res else None + + if state_group is not None: + state_groups.setdefault(state_group, []).append(event.event_id) + + t = datetime.datetime.fromtimestamp( + float(event.origin_server_ts) / 1000 + ).strftime('%Y-%m-%d %H:%M:%S,%f') + + content = json.dumps(event.get_dict()["content"]) + + label = ( + "<" + "%(name)s
" + "Type: %(type)s
" + "State key: %(state_key)s
" + "Content: %(content)s
" + "Time: %(time)s
" + "Depth: %(depth)s
" + "State group: %(state_group)s
" + ">" + ) % { + "name": event.event_id, + "type": event.type, + "state_key": event.get("state_key", None), + "content": cgi.escape(content, quote=True), + "time": t, + "depth": event.depth, + "state_group": state_group, + } + + node = pydot.Node( + name=event.event_id, + label=label, + ) + + node_map[event.event_id] = node + graph.add_node(node) + + for event in events: + for prev_id, _ in event.prev_events: + try: + end_node = node_map[prev_id] + except: + end_node = pydot.Node( + name=prev_id, + label="<%s>" % (prev_id,), + ) + + node_map[prev_id] = end_node + graph.add_node(end_node) + + edge = pydot.Edge(node_map[event.event_id], end_node) + graph.add_edge(edge) + + for group, event_ids in state_groups.items(): + if len(event_ids) <= 1: + continue + + cluster = pydot.Cluster( + str(group), + label="" % (str(group),) + ) + + for event_id in event_ids: + cluster.add_node(node_map[event_id]) + + graph.add_subgraph(cluster) + + graph.write('%s.dot' % file_prefix, format='raw', prog='dot') + graph.write_svg("%s.svg" % file_prefix, prog='dot') + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Generate a PDU graph for a given room by talking " + "to the given homeserver to get the list of PDUs. \n" + "Requires pydot." + ) + parser.add_argument( + "-p", "--prefix", dest="prefix", + help="String to prefix output files with", + default="graph_output" + ) + parser.add_argument( + "-l", "--limit", + help="Only retrieve the last N events.", + ) + parser.add_argument('db') + parser.add_argument('room') + + args = parser.parse_args() + + make_graph(args.db, args.room, args.prefix, args.limit) diff --git a/experiments/cursesio.py b/experiments/cursesio.py deleted file mode 100644 index 95d87a1fda..0000000000 --- a/experiments/cursesio.py +++ /dev/null @@ -1,168 +0,0 @@ -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import curses -import curses.wrapper -from curses.ascii import isprint - -from twisted.internet import reactor - - -class CursesStdIO(): - def __init__(self, stdscr, callback=None): - self.statusText = "Synapse test app -" - self.searchText = '' - self.stdscr = stdscr - - self.logLine = '' - - self.callback = callback - - self._setup() - - def _setup(self): - self.stdscr.nodelay(1) # Make non blocking - - self.rows, self.cols = self.stdscr.getmaxyx() - self.lines = [] - - curses.use_default_colors() - - self.paintStatus(self.statusText) - self.stdscr.refresh() - - def set_callback(self, callback): - self.callback = callback - - def fileno(self): - """ We want to select on FD 0 """ - return 0 - - def connectionLost(self, reason): - self.close() - - def print_line(self, text): - """ add a line to the internal list of lines""" - - self.lines.append(text) - self.redraw() - - def print_log(self, text): - self.logLine = text - self.redraw() - - def redraw(self): - """ method for redisplaying lines - based on internal list of lines """ - - self.stdscr.clear() - self.paintStatus(self.statusText) - i = 0 - index = len(self.lines) - 1 - while i < (self.rows - 3) and index >= 0: - self.stdscr.addstr(self.rows - 3 - i, 0, self.lines[index], - curses.A_NORMAL) - i = i + 1 - index = index - 1 - - self.printLogLine(self.logLine) - - self.stdscr.refresh() - - def paintStatus(self, text): - if len(text) > self.cols: - raise RuntimeError("TextTooLongError") - - self.stdscr.addstr( - self.rows - 2, 0, - text + ' ' * (self.cols - len(text)), - curses.A_STANDOUT) - - def printLogLine(self, text): - self.stdscr.addstr( - 0, 0, - text + ' ' * (self.cols - len(text)), - curses.A_STANDOUT) - - def doRead(self): - """ Input is ready! """ - curses.noecho() - c = self.stdscr.getch() # read a character - - if c == curses.KEY_BACKSPACE: - self.searchText = self.searchText[:-1] - - elif c == curses.KEY_ENTER or c == 10: - text = self.searchText - self.searchText = '' - - self.print_line(">> %s" % text) - - try: - if self.callback: - self.callback.on_line(text) - except Exception as e: - self.print_line(str(e)) - - self.stdscr.refresh() - - elif isprint(c): - if len(self.searchText) == self.cols - 2: - return - self.searchText = self.searchText + chr(c) - - self.stdscr.addstr(self.rows - 1, 0, - self.searchText + (' ' * ( - self.cols - len(self.searchText) - 2))) - - self.paintStatus(self.statusText + ' %d' % len(self.searchText)) - self.stdscr.move(self.rows - 1, len(self.searchText)) - self.stdscr.refresh() - - def logPrefix(self): - return "CursesStdIO" - - def close(self): - """ clean up """ - - curses.nocbreak() - self.stdscr.keypad(0) - curses.echo() - curses.endwin() - - -class Callback(object): - - def __init__(self, stdio): - self.stdio = stdio - - def on_line(self, text): - self.stdio.print_line(text) - - -def main(stdscr): - screen = CursesStdIO(stdscr) # create Screen object - - callback = Callback(screen) - - screen.set_callback(callback) - - stdscr.refresh() - reactor.addReader(screen) - reactor.run() - screen.close() - - -if __name__ == '__main__': - curses.wrapper(main) diff --git a/experiments/test_messaging.py b/experiments/test_messaging.py deleted file mode 100644 index fedf786cec..0000000000 --- a/experiments/test_messaging.py +++ /dev/null @@ -1,394 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -""" This is an example of using the server to server implementation to do a -basic chat style thing. It accepts commands from stdin and outputs to stdout. - -It assumes that ucids are of the form @, and uses as -the address of the remote home server to hit. - -Usage: - python test_messaging.py - -Currently assumes the local address is localhost: - -""" - - -from synapse.federation import ( - ReplicationHandler -) - -from synapse.federation.units import Pdu - -from synapse.util import origin_from_ucid - -from synapse.app.homeserver import SynapseHomeServer - -#from synapse.util.logutils import log_function - -from twisted.internet import reactor, defer -from twisted.python import log - -import argparse -import json -import logging -import os -import re - -import cursesio -import curses.wrapper - - -logger = logging.getLogger("example") - - -def excpetion_errback(failure): - logging.exception(failure) - - -class InputOutput(object): - """ This is responsible for basic I/O so that a user can interact with - the example app. - """ - - def __init__(self, screen, user): - self.screen = screen - self.user = user - - def set_home_server(self, server): - self.server = server - - def on_line(self, line): - """ This is where we process commands. - """ - - try: - m = re.match("^join (\S+)$", line) - if m: - # The `sender` wants to join a room. - room_name, = m.groups() - self.print_line("%s joining %s" % (self.user, room_name)) - self.server.join_room(room_name, self.user, self.user) - #self.print_line("OK.") - return - - m = re.match("^invite (\S+) (\S+)$", line) - if m: - # `sender` wants to invite someone to a room - room_name, invitee = m.groups() - self.print_line("%s invited to %s" % (invitee, room_name)) - self.server.invite_to_room(room_name, self.user, invitee) - #self.print_line("OK.") - return - - m = re.match("^send (\S+) (.*)$", line) - if m: - # `sender` wants to message a room - room_name, body = m.groups() - self.print_line("%s send to %s" % (self.user, room_name)) - self.server.send_message(room_name, self.user, body) - #self.print_line("OK.") - return - - m = re.match("^backfill (\S+)$", line) - if m: - # we want to backfill a room - room_name, = m.groups() - self.print_line("backfill %s" % room_name) - self.server.backfill(room_name) - return - - self.print_line("Unrecognized command") - - except Exception as e: - logger.exception(e) - - def print_line(self, text): - self.screen.print_line(text) - - def print_log(self, text): - self.screen.print_log(text) - - -class IOLoggerHandler(logging.Handler): - - def __init__(self, io): - logging.Handler.__init__(self) - self.io = io - - def emit(self, record): - if record.levelno < logging.WARN: - return - - msg = self.format(record) - self.io.print_log(msg) - - -class Room(object): - """ Used to store (in memory) the current membership state of a room, and - which home servers we should send PDUs associated with the room to. 
- """ - def __init__(self, room_name): - self.room_name = room_name - self.invited = set() - self.participants = set() - self.servers = set() - - self.oldest_server = None - - self.have_got_metadata = False - - def add_participant(self, participant): - """ Someone has joined the room - """ - self.participants.add(participant) - self.invited.discard(participant) - - server = origin_from_ucid(participant) - self.servers.add(server) - - if not self.oldest_server: - self.oldest_server = server - - def add_invited(self, invitee): - """ Someone has been invited to the room - """ - self.invited.add(invitee) - self.servers.add(origin_from_ucid(invitee)) - - -class HomeServer(ReplicationHandler): - """ A very basic home server implentation that allows people to join a - room and then invite other people. - """ - def __init__(self, server_name, replication_layer, output): - self.server_name = server_name - self.replication_layer = replication_layer - self.replication_layer.set_handler(self) - - self.joined_rooms = {} - - self.output = output - - def on_receive_pdu(self, pdu): - """ We just received a PDU - """ - pdu_type = pdu.pdu_type - - if pdu_type == "sy.room.message": - self._on_message(pdu) - elif pdu_type == "sy.room.member" and "membership" in pdu.content: - if pdu.content["membership"] == "join": - self._on_join(pdu.context, pdu.state_key) - elif pdu.content["membership"] == "invite": - self._on_invite(pdu.origin, pdu.context, pdu.state_key) - else: - self.output.print_line("#%s (unrec) %s = %s" % - (pdu.context, pdu.pdu_type, json.dumps(pdu.content)) - ) - - #def on_state_change(self, pdu): - ##self.output.print_line("#%s (state) %s *** %s" % - ##(pdu.context, pdu.state_key, pdu.pdu_type) - ##) - - #if "joinee" in pdu.content: - #self._on_join(pdu.context, pdu.content["joinee"]) - #elif "invitee" in pdu.content: - #self._on_invite(pdu.origin, pdu.context, pdu.content["invitee"]) - - def _on_message(self, pdu): - """ We received a message - """ - self.output.print_line("#%s %s %s" % - (pdu.context, pdu.content["sender"], pdu.content["body"]) - ) - - def _on_join(self, context, joinee): - """ Someone has joined a room, either a remote user or a local user - """ - room = self._get_or_create_room(context) - room.add_participant(joinee) - - self.output.print_line("#%s %s %s" % - (context, joinee, "*** JOINED") - ) - - def _on_invite(self, origin, context, invitee): - """ Someone has been invited - """ - room = self._get_or_create_room(context) - room.add_invited(invitee) - - self.output.print_line("#%s %s %s" % - (context, invitee, "*** INVITED") - ) - - if not room.have_got_metadata and origin is not self.server_name: - logger.debug("Get room state") - self.replication_layer.get_state_for_context(origin, context) - room.have_got_metadata = True - - @defer.inlineCallbacks - def send_message(self, room_name, sender, body): - """ Send a message to a room! - """ - destinations = yield self.get_servers_for_context(room_name) - - try: - yield self.replication_layer.send_pdu( - Pdu.create_new( - context=room_name, - pdu_type="sy.room.message", - content={"sender": sender, "body": body}, - origin=self.server_name, - destinations=destinations, - ) - ) - except Exception as e: - logger.exception(e) - - @defer.inlineCallbacks - def join_room(self, room_name, sender, joinee): - """ Join a room! 
- """ - self._on_join(room_name, joinee) - - destinations = yield self.get_servers_for_context(room_name) - - try: - pdu = Pdu.create_new( - context=room_name, - pdu_type="sy.room.member", - is_state=True, - state_key=joinee, - content={"membership": "join"}, - origin=self.server_name, - destinations=destinations, - ) - yield self.replication_layer.send_pdu(pdu) - except Exception as e: - logger.exception(e) - - @defer.inlineCallbacks - def invite_to_room(self, room_name, sender, invitee): - """ Invite someone to a room! - """ - self._on_invite(self.server_name, room_name, invitee) - - destinations = yield self.get_servers_for_context(room_name) - - try: - yield self.replication_layer.send_pdu( - Pdu.create_new( - context=room_name, - is_state=True, - pdu_type="sy.room.member", - state_key=invitee, - content={"membership": "invite"}, - origin=self.server_name, - destinations=destinations, - ) - ) - except Exception as e: - logger.exception(e) - - def backfill(self, room_name, limit=5): - room = self.joined_rooms.get(room_name) - - if not room: - return - - dest = room.oldest_server - - return self.replication_layer.backfill(dest, room_name, limit) - - def _get_room_remote_servers(self, room_name): - return [i for i in self.joined_rooms.setdefault(room_name,).servers] - - def _get_or_create_room(self, room_name): - return self.joined_rooms.setdefault(room_name, Room(room_name)) - - def get_servers_for_context(self, context): - return defer.succeed( - self.joined_rooms.setdefault(context, Room(context)).servers - ) - - -def main(stdscr): - parser = argparse.ArgumentParser() - parser.add_argument('user', type=str) - parser.add_argument('-v', '--verbose', action='count') - args = parser.parse_args() - - user = args.user - server_name = origin_from_ucid(user) - - ## Set up logging ## - - root_logger = logging.getLogger() - - formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)d - ' - '%(levelname)s - %(message)s') - if not os.path.exists("logs"): - os.makedirs("logs") - fh = logging.FileHandler("logs/%s" % user) - fh.setFormatter(formatter) - - root_logger.addHandler(fh) - root_logger.setLevel(logging.DEBUG) - - # Hack: The only way to get it to stop logging to sys.stderr :( - log.theLogPublisher.observers = [] - observer = log.PythonLoggingObserver() - observer.start() - - ## Set up synapse server - - curses_stdio = cursesio.CursesStdIO(stdscr) - input_output = InputOutput(curses_stdio, user) - - curses_stdio.set_callback(input_output) - - app_hs = SynapseHomeServer(server_name, db_name="dbs/%s" % user) - replication = app_hs.get_replication_layer() - - hs = HomeServer(server_name, replication, curses_stdio) - - input_output.set_home_server(hs) - - ## Add input_output logger - io_logger = IOLoggerHandler(input_output) - io_logger.setFormatter(formatter) - root_logger.addHandler(io_logger) - - ## Start! ## - - try: - port = int(server_name.split(":")[1]) - except: - port = 12345 - - app_hs.get_http_server().start_listening(port) - - reactor.addReader(curses_stdio) - - reactor.run() - - -if __name__ == "__main__": - curses.wrapper(main) diff --git a/graph/graph.py b/graph/graph.py deleted file mode 100644 index b2acadcf5e..0000000000 --- a/graph/graph.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import sqlite3 -import pydot -import cgi -import json -import datetime -import argparse -import urllib2 - - -def make_name(pdu_id, origin): - return "%s@%s" % (pdu_id, origin) - - -def make_graph(pdus, room, filename_prefix): - pdu_map = {} - node_map = {} - - origins = set() - colors = set(("red", "green", "blue", "yellow", "purple")) - - for pdu in pdus: - origins.add(pdu.get("origin")) - - color_map = {color: color for color in colors if color in origins} - colors -= set(color_map.values()) - - color_map[None] = "black" - - for o in origins: - if o in color_map: - continue - try: - c = colors.pop() - color_map[o] = c - except: - print "Run out of colours!" - color_map[o] = "black" - - graph = pydot.Dot(graph_name="Test") - - for pdu in pdus: - name = make_name(pdu.get("pdu_id"), pdu.get("origin")) - pdu_map[name] = pdu - - t = datetime.datetime.fromtimestamp( - float(pdu["ts"]) / 1000 - ).strftime('%Y-%m-%d %H:%M:%S,%f') - - label = ( - "<" - "%(name)s
" - "Type: %(type)s
" - "State key: %(state_key)s
" - "Content: %(content)s
" - "Time: %(time)s
" - "Depth: %(depth)s
" - ">" - ) % { - "name": name, - "type": pdu.get("pdu_type"), - "state_key": pdu.get("state_key"), - "content": cgi.escape(json.dumps(pdu.get("content")), quote=True), - "time": t, - "depth": pdu.get("depth"), - } - - node = pydot.Node( - name=name, - label=label, - color=color_map[pdu.get("origin")] - ) - node_map[name] = node - graph.add_node(node) - - for pdu in pdus: - start_name = make_name(pdu.get("pdu_id"), pdu.get("origin")) - for i, o in pdu.get("prev_pdus", []): - end_name = make_name(i, o) - - if end_name not in node_map: - print "%s not in nodes" % end_name - continue - - edge = pydot.Edge(node_map[start_name], node_map[end_name]) - graph.add_edge(edge) - - # Add prev_state edges, if they exist - if pdu.get("prev_state_id") and pdu.get("prev_state_origin"): - prev_state_name = make_name( - pdu.get("prev_state_id"), pdu.get("prev_state_origin") - ) - - if prev_state_name in node_map: - state_edge = pydot.Edge( - node_map[start_name], node_map[prev_state_name], - style='dotted' - ) - graph.add_edge(state_edge) - - graph.write('%s.dot' % filename_prefix, format='raw', prog='dot') -# graph.write_png("%s.png" % filename_prefix, prog='dot') - graph.write_svg("%s.svg" % filename_prefix, prog='dot') - - -def get_pdus(host, room): - transaction = json.loads( - urllib2.urlopen( - "http://%s/_matrix/federation/v1/context/%s/" % (host, room) - ).read() - ) - - return transaction["pdus"] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Generate a PDU graph for a given room by talking " - "to the given homeserver to get the list of PDUs. \n" - "Requires pydot." - ) - parser.add_argument( - "-p", "--prefix", dest="prefix", - help="String to prefix output files with" - ) - parser.add_argument('host') - parser.add_argument('room') - - args = parser.parse_args() - - host = args.host - room = args.room - prefix = args.prefix if args.prefix else "%s_graph" % (room) - - pdus = get_pdus(host, room) - - make_graph(pdus, room, prefix) diff --git a/graph/graph2.py b/graph/graph2.py deleted file mode 100644 index 6b551d42e5..0000000000 --- a/graph/graph2.py +++ /dev/null @@ -1,156 +0,0 @@ -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import sqlite3 -import pydot -import cgi -import json -import datetime -import argparse - -from synapse.events import FrozenEvent - - -def make_graph(db_name, room_id, file_prefix, limit): - conn = sqlite3.connect(db_name) - - sql = ( - "SELECT json FROM event_json as j " - "INNER JOIN events as e ON e.event_id = j.event_id " - "WHERE j.room_id = ?" - ) - - args = [room_id] - - if limit: - sql += ( - " ORDER BY topological_ordering DESC, stream_ordering DESC " - "LIMIT ?" 
- ) - - args.append(limit) - - c = conn.execute(sql, args) - - events = [FrozenEvent(json.loads(e[0])) for e in c.fetchall()] - - events.sort(key=lambda e: e.depth) - - node_map = {} - state_groups = {} - - graph = pydot.Dot(graph_name="Test") - - for event in events: - c = conn.execute( - "SELECT state_group FROM event_to_state_groups " - "WHERE event_id = ?", - (event.event_id,) - ) - - res = c.fetchone() - state_group = res[0] if res else None - - if state_group is not None: - state_groups.setdefault(state_group, []).append(event.event_id) - - t = datetime.datetime.fromtimestamp( - float(event.origin_server_ts) / 1000 - ).strftime('%Y-%m-%d %H:%M:%S,%f') - - content = json.dumps(event.get_dict()["content"]) - - label = ( - "<" - "%(name)s
" - "Type: %(type)s
" - "State key: %(state_key)s
" - "Content: %(content)s
" - "Time: %(time)s
" - "Depth: %(depth)s
" - "State group: %(state_group)s
" - ">" - ) % { - "name": event.event_id, - "type": event.type, - "state_key": event.get("state_key", None), - "content": cgi.escape(content, quote=True), - "time": t, - "depth": event.depth, - "state_group": state_group, - } - - node = pydot.Node( - name=event.event_id, - label=label, - ) - - node_map[event.event_id] = node - graph.add_node(node) - - for event in events: - for prev_id, _ in event.prev_events: - try: - end_node = node_map[prev_id] - except: - end_node = pydot.Node( - name=prev_id, - label="<%s>" % (prev_id,), - ) - - node_map[prev_id] = end_node - graph.add_node(end_node) - - edge = pydot.Edge(node_map[event.event_id], end_node) - graph.add_edge(edge) - - for group, event_ids in state_groups.items(): - if len(event_ids) <= 1: - continue - - cluster = pydot.Cluster( - str(group), - label="" % (str(group),) - ) - - for event_id in event_ids: - cluster.add_node(node_map[event_id]) - - graph.add_subgraph(cluster) - - graph.write('%s.dot' % file_prefix, format='raw', prog='dot') - graph.write_svg("%s.svg" % file_prefix, prog='dot') - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Generate a PDU graph for a given room by talking " - "to the given homeserver to get the list of PDUs. \n" - "Requires pydot." - ) - parser.add_argument( - "-p", "--prefix", dest="prefix", - help="String to prefix output files with", - default="graph_output" - ) - parser.add_argument( - "-l", "--limit", - help="Only retrieve the last N events.", - ) - parser.add_argument('db') - parser.add_argument('room') - - args = parser.parse_args() - - make_graph(args.db, args.room, args.prefix, args.limit) -- cgit 1.4.1