diff --git a/Gemfile b/Gemfile index 81a1a74..e405f36 100644 --- a/Gemfile +++ b/Gemfile @@ -11,3 +11,5 @@ gem "rspec", "~> 3.0" gem "rubocop", "~> 1.21" gem "async", "~> 2.9" + +gem "wampproto", github: "xconnio/wampproto.rb", branch: :main, require: true diff --git a/lib/wamp.rb b/lib/wamp.rb index 3d68ad1..f3ebcd0 100644 --- a/lib/wamp.rb +++ b/lib/wamp.rb @@ -1,12 +1,11 @@ # frozen_string_literal: true require_relative "wamp/version" -require_relative "wamp/connection/base" +require_relative "wamp/message_handler" +require_relative "wamp/connection/session" +require_relative "wamp/connection/websocket_connection" require_relative "wamp/serializer" require_relative "wamp/router" -require_relative "wamp/message" -require_relative "wamp/manager" -require_relative "wamp/auth" module Wamp class Error < StandardError; end diff --git a/lib/wamp/auth.rb b/lib/wamp/auth.rb deleted file mode 100644 index 97f330c..0000000 --- a/lib/wamp/auth.rb +++ /dev/null @@ -1,11 +0,0 @@ -# frozen_string_literal: true - -require_relative "auth/cra" -require_relative "auth/cryptosign" -require_relative "auth/ticket" - -module Wamp - # Auth classes - module Auth - end -end diff --git a/lib/wamp/auth/anonymous.rb b/lib/wamp/auth/anonymous.rb deleted file mode 100644 index 29c7622..0000000 --- a/lib/wamp/auth/anonymous.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Auth - # generates wampcra authentication signature - class Anonymous - def initialize(details = {}) - @details = details - end - - def details - {}.tap do |hsh| - hsh[:authid] = "anonymous" - hsh[:authmethods] = ["anonymous"] - end - end - - def authenticate(_challenge); end - end - end -end diff --git a/lib/wamp/auth/cra.rb b/lib/wamp/auth/cra.rb deleted file mode 100644 index 0075886..0000000 --- a/lib/wamp/auth/cra.rb +++ /dev/null @@ -1,49 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Auth - # generates wampcra authentication signature - class Cra - attr_reader :secret - - def initialize(secret, details = {}) - @secret = secret - @details = details - end - - def details - {}.tap do |hsh| - hsh[:authid] = @details.fetch(:authid) - hsh[:authmethods] = ["wampcra"] - hsh[:authextra] = @details.fetch(:authextra, {}) - end - end - - def authenticate(challenge) - signature = create_signature(challenge) - Wamp::Message::Authenticate.new(signature) - end - - private - - def create_signature(challenge) - extra = challenge.extra - hmac = OpenSSL::HMAC.new(create_drived_secret(extra), "SHA256") if extra.key?("salt") - hmac ||= OpenSSL::HMAC.new(secret, "SHA256") - - hmac.update(extra["challenge"]) - - Base64.encode64(hmac.digest).gsub("\n", "") - end - - def create_drived_secret(extra) - salt = extra["salt"] - length = extra["keylen"] - iterations = extra["iterations"] - - key = OpenSSL::KDF.pbkdf2_hmac(secret, salt: salt, iterations: iterations, length: length, hash: "SHA256") - key.unpack1("H*") - end - end - end -end diff --git a/lib/wamp/auth/cryptosign.rb b/lib/wamp/auth/cryptosign.rb deleted file mode 100644 index eb432b2..0000000 --- a/lib/wamp/auth/cryptosign.rb +++ /dev/null @@ -1,81 +0,0 @@ -# frozen_string_literal: true - -require "ed25519" - -module Wamp - module Auth - # generates wampcra authentication signature - class Cryptosign - attr_reader :private_key - - def initialize(private_key, details = {}) - @private_key = private_key - @details = details - end - - def details - {}.tap do |hsh| - hsh[:authid] = @details.fetch(:authid) - hsh[:authmethods] = ["cryptosign"] - hsh[:authextra] = @details.fetch(:authextra, {}).merge(pubkey: public_key) - end - end - - def authenticate(challenge) - signature = create_signature(challenge) - Wamp::Message::Authenticate.new(signature) - end - - private - - def key_pair - @key_pair ||= Ed25519::SigningKey.new(hex_to_binary(private_key)) - end - - def public_key - binary_to_hex(key_pair.verify_key.to_bytes) - end - - def create_signature(challenge) - extra = challenge.extra - return handle_channel_binding(extra) if extra[:channel_id] - - handle_without_channel_binding(extra) - end - - def handle_without_channel_binding(extra) - hex_challenge = extra[:challenge] - binary_challenge = hex_to_binary(hex_challenge) - binary_signature = key_pair.sign(binary_challenge) - signature = binary_to_hex(binary_signature) - - "#{signature}#{hex_challenge}" - end - - def handle_channel_binding(extra) - channel_id = hex_to_binary(extra[:channel_id]) - challenge = hex_to_binary(extra[:challenge]) - xored_challenge = xored_strings(channel_id, challenge) - binary_signed_challenge = key_pair.sign(xored_challenge) - signature = binary_to_hex(binary_signed_challenge) - hex_xored_challenge = binary_to_hex(xored_challenge) - "#{signature}#{hex_xored_challenge}" - end - - def xored_strings(channel_id, challenge_str) - channel_id_bytes = channel_id.bytes - challenge_bytes = challenge_str.bytes - xored = channel_id_bytes.zip(challenge_bytes).map { |byte1, byte2| byte1 ^ byte2 } - xored.pack("C*") - end - - def hex_to_binary(hex_string) - [hex_string].pack("H*") - end - - def binary_to_hex(binary_string) - binary_string.unpack1("H*") - end - end - end -end diff --git a/lib/wamp/auth/ticket.rb b/lib/wamp/auth/ticket.rb deleted file mode 100644 index 032e947..0000000 --- a/lib/wamp/auth/ticket.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Auth - # generates ticket authentication signature - class Ticket - attr_reader :secret - - def initialize(secret, details = {}) - @secret = secret - @details = details - end - - def details - {}.tap do |hsh| - hsh[:authid] = @details.fetch(:authid) - hsh[:authmethods] = ["ticket"] - hsh[:authextra] = @details.fetch(:authextra, {}) - end - end - - def authenticate(challenge) - signature = create_signature(challenge) - Wamp::Message::Authenticate.new(signature) - end - - private - - def create_signature(_challenge) - secret - end - end - end -end diff --git a/lib/wamp/connection/base.rb b/lib/wamp/connection/base.rb deleted file mode 100644 index bfe804a..0000000 --- a/lib/wamp/connection/base.rb +++ /dev/null @@ -1,88 +0,0 @@ -# frozen_string_literal: true - -require_relative "websocket_client" -require_relative "../auth/anonymous" -require_relative "../message/validate" - -module Wamp - module Connection - # class to start accepting connnections - class Base - class UnsupportedSerializer < StandardError; end - - include WebSocket::Driver::EventEmitter - attr_reader :websocket, :url - - def initialize(url = "ws://localhost:8080/ws", realm = "realm1", options = {}) - super() - @url = url - @realm = realm - @options = Message::Validate.options!(options, %i[serializer auth]) - - @websocket = Wamp::Connection::WebsocketClient.new(self, protocols) - end - - def run - websocket.run - ensure - p "Close" - close - end - - def transmit(wamp_message) - websocket.transmit encode(wamp_message) - end - - def on_open; end - - def on_message(data) - coder.decode(data) - end - - def on_close(reason, code) - p [:on_close, reason, code] - end - - def on_error(message); end - - def close(code = 3000, reason = "Reason") - websocket.close(code, reason) - end - - def auth - @options.fetch(:auth, Auth::Anonymous.new) - end - - private - - def serializer - @options.fetch(:serializer, :json).to_sym - end - - def protocols - protocol = { json: "wamp.2.json", cbor: "wamp.2.cbor", msgpack: "wamp.2.msgpack" }[serializer] - raise UnsupportedSerializer unless protocol - - [protocol] - end - - def encode(wamp_message) - coder.encode wamp_message - end - - def decode(websocket_message) - coder.decode websocket_message - end - - def coder - @coder ||= case serializer - when :json then Wamp::Serializer::JSON - when :msgpack then Wamp::Serializer::MessagePack - when :cbor then Wamp::Serializer::Cbor - else - raise "Unsupported protocol #{websocket.protocol}" - end - end - end - end -end diff --git a/lib/wamp/connection/session.rb b/lib/wamp/connection/session.rb new file mode 100644 index 0000000..5ca3ad0 --- /dev/null +++ b/lib/wamp/connection/session.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +module Wamp + module Connection + # Client Session + class Session + attr_reader :joiner, :session, :store, :api + attr_accessor :executor, :stream + + def initialize(joiner = Wampproto::Joiner.new("realm1")) + @joiner = joiner + @session = Wampproto::Session.new(joiner.serializer) + @api = MessageHandler::Api.new(self) + @store = {} + end + + def on_join(&block) + self.executor = block + end + + def on_open + stream.on_message joiner.send_hello + end + + def on_message(data) + handler = MessageHandler.resolve(data, self) + handler.handle + end + + def transmit(data) + stream.on_message data + end + end + end +end diff --git a/lib/wamp/connection/websocket_connection.rb b/lib/wamp/connection/websocket_connection.rb new file mode 100644 index 0000000..3e1b75e --- /dev/null +++ b/lib/wamp/connection/websocket_connection.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +require "wampproto" +require_relative "websocket_client" + +# extending the class +class Wampproto::Joiner # rubocop:disable Style/ClassAndModuleChildren + def joined? + state == STATE_JOINED + end +end + +module Wamp + module Connection + # Conn + class WebSocketConnection < Session + attr_reader :url, :websocket + + def initialize(url = "ws://localhost:8080/ws", joiner = Wampproto::Joiner.new("realm1")) + super(joiner) + @url = url + @store = {} + @websocket = Wamp::Connection::WebsocketClient.new(self, protocols) + end + + def run + websocket.run + ensure + p "Close" + websocket.close + end + + def on_open + transmit joiner.send_hello + end + + def transmit(data) + websocket.transmit data + end + + def on_close(reason, code) + p [:on_close, reason, code] + end + + def on_error; end + + private + + def protocols + case joiner.serializer.name + when Wampproto::Serializer::JSON.name then "wamp.2.json" + when Wampproto::Serializer::Msgpack.name then "wamp.2.msgpack" + when Wampproto::Serializer::Cbor.name then "wamp.2.cbor" + end.then do |protocol| + [protocol] + end + end + end + end +end diff --git a/lib/wamp/message.rb b/lib/wamp/message.rb deleted file mode 100644 index 48ae6ce..0000000 --- a/lib/wamp/message.rb +++ /dev/null @@ -1,113 +0,0 @@ -# frozen_string_literal: true - -require_relative "message/hello" -require_relative "message/welcome" -require_relative "message/abort" -require_relative "message/challenge" -require_relative "message/authenticate" - -require_relative "message/goodbye" -require_relative "message/error" - -require_relative "message/subscribe" -require_relative "message/subscribed" -require_relative "message/unsubscribe" -require_relative "message/unsubscribed" -require_relative "message/event" - -require_relative "message/publish" -require_relative "message/published" - -require_relative "message/call" -require_relative "message/cancel" -require_relative "message/result" - -require_relative "message/register" -require_relative "message/registered" -require_relative "message/unregister" -require_relative "message/unregistered" - -require_relative "message/invocation" -require_relative "message/interrupt" -require_relative "message/yield" - -module Wamp - # message root - module Message - module Type - HELLO = 1 - WELCOME = 2 - ABORT = 3 - CHALLENGE = 4 - AUTHENTICATE = 5 - GOODBYE = 6 - - ERROR = 8 - - PUBLISH = 16 - PUBLISHED = 17 - - SUBSCRIBE = 32 - SUBSCRIBED = 33 - UNSUBSCRIBE = 34 - UNSUBSCRIBED = 35 - EVENT = 36 - - CALL = 48 - CANCEL = 49 - RESULT = 50 - - REGISTER = 64 - REGISTERED = 65 - UNREGISTER = 66 - UNREGISTERED = 67 - INVOCATION = 68 - INTERRUPT = 69 - YIELD = 70 - end - - HANDLER = { - Type::HELLO => Hello, - Type::WELCOME => Welcome, - Type::ABORT => Abort, - Type::CHALLENGE => Challenge, - Type::AUTHENTICATE => Authenticate, - Type::GOODBYE => Goodbye, - - Type::ERROR => Error, - - Type::SUBSCRIBE => Subscribe, - Type::SUBSCRIBED => Subscribed, - Type::UNSUBSCRIBE => Unsubscribe, - Type::UNSUBSCRIBED => Unsubscribed, - - Type::PUBLISH => Publish, - Type::PUBLISHED => Published, - Type::EVENT => Event, - - Type::CALL => Call, - Type::CANCEL => Cancel, - Type::RESULT => Result, - - Type::REGISTER => Register, - Type::REGISTERED => Registered, - Type::UNREGISTER => Unregister, - Type::UNREGISTERED => Unregistered, - - Type::INTERRUPT => Interrupt, - Type::INVOCATION => Invocation, - - Type::YIELD => Yield - }.freeze - - def self.resolve(wamp_message) - type, = Validate.array!("Wamp Message", wamp_message) - begin - HANDLER[type].parse(wamp_message) - rescue StandardError => e - p wamp_message - raise e - end - end - end -end diff --git a/lib/wamp/message/abort.rb b/lib/wamp/message/abort.rb deleted file mode 100644 index 30aef93..0000000 --- a/lib/wamp/message/abort.rb +++ /dev/null @@ -1,33 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # abort message - class Abort - attr_reader :details, :reason, :args, :kwargs - - def initialize(details, reason, *args, **kwargs) - @details = Validate.hash!("Details", details) - @reason = Validate.string!("Reason", reason) - @args = Validate.array!("Arguments", args) - @kwargs = Validate.hash!("Keyword Arguments", kwargs) - end - - def payload - @payload = [Type::ABORT, @details, @reason] - @payload << @args if @kwargs.any? || @args.any? - @payload << @kwargs if @kwargs.any? - @payload - end - - def self.parse(wamp_message) - _type, details, reason, args, kwargs = wamp_message - args ||= [] - kwargs ||= {} - new(details, reason, *args, **kwargs) - end - end - end -end diff --git a/lib/wamp/message/authenticate.rb b/lib/wamp/message/authenticate.rb deleted file mode 100644 index 4fe1d54..0000000 --- a/lib/wamp/message/authenticate.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # Wamp Authenticate message - class Authenticate - attr_reader :signature, :extra - - def initialize(signature, extra = {}) - @signature = Validate.string!("Signature", signature) - @extra = Validate.hash!("Extra", extra) - end - - def payload - [Type::AUTHENTICATE, signature, extra] - end - - def self.parse(wamp_message) - _type, signature, extra = wamp_message - new(signature, extra) - end - end - end -end diff --git a/lib/wamp/message/call.rb b/lib/wamp/message/call.rb deleted file mode 100644 index 0e8ae41..0000000 --- a/lib/wamp/message/call.rb +++ /dev/null @@ -1,32 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Message - # wamp call message - class Call - attr_reader :request_id, :options, :procedure, :args, :kwargs - - def initialize(request_id, options, procedure, *args, **kwargs) - @request_id = Validate.int!("Request Id", request_id) - @options = Validate.hash!("Options", options) - @procedure = Validate.string!("Procedure", procedure) - @args = Validate.array!("Arguments", args) - @kwargs = Validate.hash!("Keyword Arguments", kwargs) - end - - def payload - @payload = [Type::CALL, @request_id, @options, @procedure] - @payload << @args if @kwargs.any? || @args.any? - @payload << @kwargs if @kwargs.any? - @payload - end - - def self.parse(wamp_message) - _type, request_id, options, procedure, args, kwargs = wamp_message - args ||= [] - kwargs ||= {} - new(request_id, options, procedure, *args, **kwargs) - end - end - end -end diff --git a/lib/wamp/message/cancel.rb b/lib/wamp/message/cancel.rb deleted file mode 100644 index 21a857a..0000000 --- a/lib/wamp/message/cancel.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # Wamp Cancel message - class Cancel - attr_reader :request_id, :options - - def initialize(request_id, options = {}) - @request_id = Validate.int!("Request Id", request_id) - @options = Validate.hash!("Options", options) - end - - def payload - [Type::CANCEL, request_id, options] - end - - def self.parse(wamp_message) - _type, request_id, options = wamp_message - new(request_id, options) - end - end - end -end diff --git a/lib/wamp/message/challenge.rb b/lib/wamp/message/challenge.rb deleted file mode 100644 index e8d2387..0000000 --- a/lib/wamp/message/challenge.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # Wamp Challenge message - class Challenge - attr_reader :auth_method, :extra - - def initialize(auth_method, extra = {}) - @auth_method = Validate.string!("AuthMethod", auth_method) - @extra = Validate.hash!("Extra", extra) - end - - def payload - [Type::CHALLENGE, auth_method, extra] - end - - def self.parse(wamp_message) - _type, auth_method, extra = wamp_message - new(auth_method, extra) - end - end - end -end diff --git a/lib/wamp/message/error.rb b/lib/wamp/message/error.rb deleted file mode 100644 index e2628e2..0000000 --- a/lib/wamp/message/error.rb +++ /dev/null @@ -1,28 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # abort message - class Error - attr_reader :message_type, :request_id, :details, :error - - def initialize(message_type, request_id, details, error) - @message_type = Validate.int!("Message Type", message_type) - @request_id = Validate.int!("Request Id", request_id) - @details = Validate.hash!("Details", details) - @error = Validate.string!("Error", error) - end - - def payload - [Type::ERROR, @message_type, @request_id, @details, @error] - end - - def self.parse(wamp_message) - _type, message_type, request_id, details, error = Validate.length!(wamp_message, 5) - new(message_type, request_id, details, error) - end - end - end -end diff --git a/lib/wamp/message/event.rb b/lib/wamp/message/event.rb deleted file mode 100644 index ed87471..0000000 --- a/lib/wamp/message/event.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # event message - class Event - attr_reader :subscription_id, :publication_id, :details, :args, :kwargs - - def initialize(subscription_id, publication_id, details, *args, **kwargs) - @subscription_id = Validate.int!("Subscription Id", subscription_id) - @publication_id = Validate.int!("Publication Id", publication_id) - @details = Validate.hash!("details", details) - @args = Validate.array!("Arguments", args) - @kwargs = Validate.hash!("Keyword Arguments", kwargs) - end - - def payload - @payload = [Type::EVENT, subscription_id, publication_id, details] - @payload << @args if @kwargs.any? || @args.any? - @payload << @kwargs if @kwargs.any? - @payload - end - - def self.parse(wamp_message) - _type, subscription_id, publication_id, details, args, kwargs = wamp_message - args ||= [] - kwargs ||= {} - new(subscription_id, publication_id, details, *args, **kwargs) - end - end - end -end diff --git a/lib/wamp/message/goodbye.rb b/lib/wamp/message/goodbye.rb deleted file mode 100644 index 6922fbe..0000000 --- a/lib/wamp/message/goodbye.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # abort message - class Goodbye - attr_reader :details, :reason - - def initialize(details, reason) - @details = Validate.hash!("Details", details) - @reason = Validate.string!("Reason", reason) - end - - def payload - [Type::GOODBYE, @details, @reason] - end - - def self.parse(wamp_message) - _type, details, reason = Validate.greater_than_equal!(wamp_message, 3) - new(details, reason) - end - end - end -end diff --git a/lib/wamp/message/hello.rb b/lib/wamp/message/hello.rb deleted file mode 100644 index e9b663e..0000000 --- a/lib/wamp/message/hello.rb +++ /dev/null @@ -1,57 +0,0 @@ -# frozen_string_literal: true - -require_relative "../../wamp/version" -require_relative "validate" - -module Wamp - module Message - # Wamp Hello message - class Hello - attr_reader :realm, :details - - def initialize(realm, details = {}) - @realm = Validate.string!("Realm", realm) - @details = default_details.merge(parse_details(Validate.hash!("Details", details))).merge(additional_details) - end - - def payload - [Type::HELLO, @realm, @details] - end - - def parse_details(hsh = {}) - details = {} - details[:roles] = hsh.fetch(:roles, default_roles) - details[:authid] = hsh.fetch(:authid, "") - details[:authmethods] = [*hsh.fetch(:authmethods, "anonymous")] - details[:authextra] = hsh.fetch(:authextra) if hsh[:authextra] - details - end - - def self.parse(wamp_message) - _type, realm, details = wamp_message - new(realm, details) - end - - private - - def default_details - { roles: default_roles } - end - - def default_roles - { caller: { - features: { call_canceling: true, caller_identification: true, progressive_call_results: true } - }, publisher: {}, subscriber: {}, - callee: { - - features: { call_canceling: true, progressive_call_results: true, registration_revocation: true, - caller_identification: true } - } } - end - - def additional_details - { agent: "Ruby-Wamp-Client-#{Wamp::VERSION}" } - end - end - end -end diff --git a/lib/wamp/message/interrupt.rb b/lib/wamp/message/interrupt.rb deleted file mode 100644 index bde595d..0000000 --- a/lib/wamp/message/interrupt.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # Wamp Interrupt message - class Interrupt - attr_reader :request_id, :options - - def initialize(request_id, options = {}) - @request_id = Validate.int!("Request Id", request_id) - @options = Validate.hash!("Options", options) - end - - def payload - [Type::INTERRUPT, request_id, options] - end - - def self.parse(wamp_message) - _type, request_id, options = wamp_message - new(request_id, options) - end - end - end -end diff --git a/lib/wamp/message/invocation.rb b/lib/wamp/message/invocation.rb deleted file mode 100644 index 974676f..0000000 --- a/lib/wamp/message/invocation.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # abort message - class Invocation - attr_reader :request_id, :registration_id, :details, :args, :kwargs - - def initialize(request_id, registration_id, details, *args, **kwargs) - @request_id = Validate.int!("Request Id", request_id) - @registration_id = Validate.int!("Registration Id", registration_id) - @details = Validate.hash!("Details", details) - @args = Validate.array!("Arguments", args) - @kwargs = Validate.hash!("Keyword Arguments", kwargs) - end - - def payload - @payload = [Type::INVOCATION, request_id, registration_id, details] - @payload << args if kwargs.any? || args.any? - @payload << kwargs if kwargs.any? - @payload - end - - def self.parse(wamp_message) - _type, request_id, registration_id, details, args, kwargs = wamp_message - args ||= [] - kwargs ||= {} - new(request_id, registration_id, details, *args, **kwargs) - end - end - end -end diff --git a/lib/wamp/message/publish.rb b/lib/wamp/message/publish.rb deleted file mode 100644 index db85228..0000000 --- a/lib/wamp/message/publish.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # publish message - class Publish - attr_reader :request_id, :options, :topic, :args, :kwargs - - def initialize(request_id, options, topic, *args, **kwargs) - @request_id = Validate.int!("Request Id", request_id) - @options = Validate.hash!("Options", options) - @topic = Validate.string!("Topic", topic) - @args = Validate.array!("Arguments", args) - @kwargs = Validate.hash!("Keyword Arguments", kwargs) - end - - def payload - @payload = [Type::PUBLISH, @request_id, @options, @topic] - @payload << @args if @kwargs.any? || @args.any? - @payload << @kwargs if @kwargs.any? - @payload - end - - def self.parse(wamp_message) - _type, request_id, options, topic, args, kwargs = wamp_message - args ||= [] - kwargs ||= {} - new(request_id, options, topic, *args, **kwargs) - end - end - end -end diff --git a/lib/wamp/message/published.rb b/lib/wamp/message/published.rb deleted file mode 100644 index f7e3a44..0000000 --- a/lib/wamp/message/published.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # published message - class Published - attr_reader :request_id, :publication_id - - def initialize(request_id, publication_id) - @request_id = Validate.int!("Request Id", request_id) - @publication_id = Validate.int!("Publication Id", publication_id) - end - - def payload - [Type::PUBLISHED, request_id, publication_id] - end - - def self.parse(wamp_message) - _type, request_id, publication_id = Validate.length!(wamp_message, 3) - new(request_id, publication_id) - end - end - end -end diff --git a/lib/wamp/message/register.rb b/lib/wamp/message/register.rb deleted file mode 100644 index 524e06a..0000000 --- a/lib/wamp/message/register.rb +++ /dev/null @@ -1,27 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # publish message - class Register - attr_reader :request_id, :options, :procedure - - def initialize(request_id, options, procedure) - @request_id = Validate.int!("Request Id", request_id) - @options = Validate.hash!("Options", options) - @procedure = Validate.string!("Procedure", procedure) - end - - def payload - [Type::REGISTER, @request_id, @options, @procedure] - end - - def self.parse(wamp_message) - _type, request_id, options, procedure = wamp_message - new(request_id, options, procedure) - end - end - end -end diff --git a/lib/wamp/message/registered.rb b/lib/wamp/message/registered.rb deleted file mode 100644 index 0b41409..0000000 --- a/lib/wamp/message/registered.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # publish message - class Registered - attr_reader :request_id, :registration_id - - def initialize(request_id, registration_id) - @request_id = Validate.int!("Request Id", request_id) - @registration_id = Validate.int!("Registration Id", registration_id) - end - - def payload - [Type::REGISTERED, @request_id, @registration_id] - end - - def self.parse(wamp_message) - _type, request_id, registration_id = wamp_message - new(request_id, registration_id) - end - end - end -end diff --git a/lib/wamp/message/result.rb b/lib/wamp/message/result.rb deleted file mode 100644 index 98c374b..0000000 --- a/lib/wamp/message/result.rb +++ /dev/null @@ -1,33 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # wamp call message - class Result - attr_reader :request_id, :details, :procedure, :args, :kwargs - - def initialize(request_id, details, *args, **kwargs) - @request_id = Validate.int!("Request Id", request_id) - @details = Validate.hash!("Details", details) - @args = Validate.array!("Arguments", args) - @kwargs = Validate.hash!("Keyword Arguments", kwargs) - end - - def payload - @payload = [Type::RESULT, @request_id, @details] - @payload << @args if @kwargs.any? || @args.any? - @payload << @kwargs if @kwargs.any? - @payload - end - - def self.parse(wamp_message) - _type, request_id, details, args, kwargs = wamp_message - args ||= [] - kwargs ||= {} - new(request_id, details, *args, **kwargs) - end - end - end -end diff --git a/lib/wamp/message/subscribe.rb b/lib/wamp/message/subscribe.rb deleted file mode 100644 index 0d63911..0000000 --- a/lib/wamp/message/subscribe.rb +++ /dev/null @@ -1,22 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # abort message - class Subscribe - attr_reader :request_id, :options, :topic - - def initialize(request_id, options, topic) - @request_id = Validate.int!("Request Id", request_id) - @options = Validate.hash!("Options", options) - @topic = Validate.string!("Topic", topic) - end - - def payload - [Type::SUBSCRIBE, @request_id, @options, @topic] - end - end - end -end diff --git a/lib/wamp/message/subscribed.rb b/lib/wamp/message/subscribed.rb deleted file mode 100644 index 396e954..0000000 --- a/lib/wamp/message/subscribed.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # abort message - class Subscribed - attr_reader :request_id, :subscription_id - - def initialize(request_id, subscription_id) - @request_id = Validate.int!("Request Id", request_id) - @subscription_id = Validate.int!("Subscription Id", subscription_id) - end - - def payload - [Type::SUBSCRIBED, @request_id, @subscription_id] - end - - def self.parse(wamp_message) - _type, request_id, subscription_id = Validate.length!(wamp_message, 3) - new(request_id, subscription_id) - end - end - end -end diff --git a/lib/wamp/message/unregister.rb b/lib/wamp/message/unregister.rb deleted file mode 100644 index 065f98f..0000000 --- a/lib/wamp/message/unregister.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # unregister message - class Unregister - attr_reader :request_id, :registration_id - - def initialize(request_id, registration_id) - @request_id = Validate.int!("Request Id", request_id) - @registration_id = Validate.int!("Registration Id", registration_id) - end - - def payload - [Type::REGISTERED, @request_id, @registration_id] - end - - def self.parse(wamp_message) - _type, request_id, registration_id = wamp_message - new(request_id, registration_id) - end - end - end -end diff --git a/lib/wamp/message/unregistered.rb b/lib/wamp/message/unregistered.rb deleted file mode 100644 index 53dc920..0000000 --- a/lib/wamp/message/unregistered.rb +++ /dev/null @@ -1,25 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # unregistered message - class Unregistered - attr_reader :request_id - - def initialize(request_id) - @request_id = Validate.int!("Request Id", request_id) - end - - def payload - [Type::UNREGISTERED, @request_id] - end - - def self.parse(wamp_message) - _type, request_id = wamp_message - new(request_id) - end - end - end -end diff --git a/lib/wamp/message/unsubscribe.rb b/lib/wamp/message/unsubscribe.rb deleted file mode 100644 index d700c4c..0000000 --- a/lib/wamp/message/unsubscribe.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # abort message - class Unsubscribe - attr_reader :request_id, :subscription_id - - def initialize(request_id, subscription_id) - @request_id = Validate.int!("Request Id", request_id) - @subscription_id = Validate.int!("Subscription Id", subscription_id) - end - - def payload - [Type::UNSUBSCRIBE, @request_id, @subscription_id] - end - - def self.parse(wamp_message) - _type, request_id, subscription_id = Validate.length!(wamp_message, 3) - new(request_id, subscription_id) - end - end - end -end diff --git a/lib/wamp/message/unsubscribed.rb b/lib/wamp/message/unsubscribed.rb deleted file mode 100644 index 70d5a02..0000000 --- a/lib/wamp/message/unsubscribed.rb +++ /dev/null @@ -1,25 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # Unsubscribed message - class Unsubscribed - attr_reader :request_id - - def initialize(request_id) - @request_id = Validate.int!("Request Id", request_id) - end - - def payload - [Type::UNSUBSCRIBED, @request_id] - end - - def self.parse(wamp_message) - _type, request_id = Validate.length!(wamp_message, 2) - new(request_id) - end - end - end -end diff --git a/lib/wamp/message/validate.rb b/lib/wamp/message/validate.rb deleted file mode 100644 index d468e49..0000000 --- a/lib/wamp/message/validate.rb +++ /dev/null @@ -1,54 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Message - # validation - class Validate - class << self - def int!(name, value) - return value if value.is_a?(Integer) - - raise ArgumentError, "The #{name} argument should be an integer." - end - - def string!(name, value) - return value if value.is_a?(String) - - raise ArgumentError, "The #{name} argument should be a string." - end - - def hash!(name, value) - return value.transform_keys(&:to_sym) if value.is_a?(Hash) - - raise ArgumentError, "The #{name} argument should be a dictionary." - end - - def array!(name, value) - return value if value.is_a?(Array) - - raise ArgumentError, "The #{name} argument should be a list." - end - - def length!(array, expected_length) - return array if array.length == expected_length - - raise ArgumentError, "The response message length should be #{expected_length} but got #{array.length} " - end - - def greater_than_equal!(array, expected_length) - return array if array.length >= expected_length - - raise ArgumentError, "The response message length is #{array.length} but it should be #{expected_length} " - end - - def options!(options, valid_keys) - options.each_key do |key| - raise ArgumentError, "Unrecognized option: #{key.inspect}" unless valid_keys.include?(key) - end - - options - end - end - end - end -end diff --git a/lib/wamp/message/welcome.rb b/lib/wamp/message/welcome.rb deleted file mode 100644 index 2fcc497..0000000 --- a/lib/wamp/message/welcome.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # welcome message - class Welcome - attr_reader :session_id, :details - - def initialize(session_id, details = {}) - @session_id = Validate.int!("Session Id", session_id) - @details = Validate.hash!("Details", details) - end - - def payload - [Type::WELCOME, @session_id, @details] - end - - def self.parse(wamp_message) - _type, session_id, details = wamp_message - new(session_id, details) - end - end - end -end diff --git a/lib/wamp/message/yield.rb b/lib/wamp/message/yield.rb deleted file mode 100644 index a933f73..0000000 --- a/lib/wamp/message/yield.rb +++ /dev/null @@ -1,33 +0,0 @@ -# frozen_string_literal: true - -require_relative "validate" - -module Wamp - module Message - # yield message - class Yield - attr_reader :request_id, :options, :args, :kwargs - - def initialize(request_id, options, *args, **kwargs) - @request_id = Validate.int!("Request Id", request_id) - @options = Validate.hash!("Options", options) - @args = Validate.array!("Arguments", args) - @kwargs = Validate.hash!("Keyword Arguments", kwargs) - end - - def payload - @payload = [Type::YIELD, request_id, options] - @payload << args if kwargs.any? || args.any? - @payload << kwargs if kwargs.any? - @payload - end - - def self.parse(wamp_message) - _type, request_id, options, args, kwargs = wamp_message - args ||= [] - kwargs ||= {} - new(request_id, options, *args, **kwargs) - end - end - end -end diff --git a/lib/wamp/message_handler.rb b/lib/wamp/message_handler.rb new file mode 100644 index 0000000..9d835b8 --- /dev/null +++ b/lib/wamp/message_handler.rb @@ -0,0 +1,75 @@ +# frozen_string_literal: true + +require_relative "message_handler/base" + +require_relative "message_handler/hello" +require_relative "message_handler/welcome" +require_relative "message_handler/challenge" +require_relative "message_handler/goodbye" + +require_relative "message_handler/subscribe" +require_relative "message_handler/subscribed" +require_relative "message_handler/unsubscribe" +require_relative "message_handler/unsubscribed" + +require_relative "message_handler/publish" +require_relative "message_handler/published" +require_relative "message_handler/event" + +require_relative "message_handler/call" +require_relative "message_handler/result" + +require_relative "message_handler/register" +require_relative "message_handler/registered" +require_relative "message_handler/unregister" +require_relative "message_handler/unregistered" + +require_relative "message_handler/invocation" + +require_relative "message_handler/error" + +require_relative "message_handler/api" + +module Wamp + # routes messages + module MessageHandler + # instantiate correct handler + module ClassMethods + def resolve(data, connection) + # return handle_when_not_joined(data, connection) unless connection.joiner.joined? + + message = connection.joiner.serializer.deserialize(data) + klass_name = demodulize(message.class.name) + klass = constantize("Wamp::MessageHandler::#{klass_name}") + klass.new(message, connection) + end + + def handle_when_not_joined(data, connection) + authenticate = connection.joiner.receive(data) # maybe welcome message then state should be joined + connection.transmit authenticate unless connection.joiner.joined? + connection.executor.call(connection.api) if connection.joiner.joined? + Struct.new(:handle).new + end + + def from(message, connection) + klass_name = demodulize(message.class.name) + klass = constantize("Wamp::MessageHandler::#{klass_name}") + klass.new(message, connection) + end + + def demodulize(path) + path = path.to_s + if i = path.rindex("::") # rubocop:disable Lint/AssignmentInCondition + path[(i + 2), path.length] + else + path + end + end + + def constantize(camel_cased_word) + Object.const_get(camel_cased_word) + end + end + extend ClassMethods + end +end diff --git a/lib/wamp/message_handler/api.rb b/lib/wamp/message_handler/api.rb new file mode 100644 index 0000000..96df3d4 --- /dev/null +++ b/lib/wamp/message_handler/api.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +require "forwardable" + +module Wamp + module MessageHandler + # handles session + class Api + extend Forwardable + + attr_reader :connection, :session_id, :id_gen + + def initialize(connection) + @connection = connection + @id_gen = Wampproto::IdGenerator.new + end + + def subscribe(topic, handler, options = {}, &block) + message = Wampproto::Message::Subscribe.new(next_request_id, options, topic) + action = MessageHandler::Subscribe.new(message, connection) + action.send_message(handler, &block) + end + + def unsubscribe(subscription_id, &block) + subscription_id = connection.store[subscription_id] if connection.store.include?(subscription_id) + + message = Wampproto::Message::Unsubscribe.new(next_request_id, subscription_id.to_i) + action = MessageHandler::Unsubscribe.new(message, connection) + action.send_message(&block) + end + + def publish(topic, options = {}, *args, **kwargs, &block) + options = options.merge({ acknowledge: true }) if block_given? + message = Wampproto::Message::Publish.new(next_request_id, options, topic, *args, **kwargs) + + action = MessageHandler::Publish.new(message, connection) + action.send_message(&block) + end + + def call(procedure, options = {}, *args, **kwargs, &handler) + message = Wampproto::Message::Call.new(next_request_id, options, procedure, *args, **kwargs) + + MessageHandler::Call.new(message, connection).send_message(handler) + end + + def register(procedure, handler, options = {}, &block) + message = Wampproto::Message::Register.new(next_request_id, options, procedure) + action = MessageHandler::Register.new(message, connection) + action.send_message(handler, &block) + end + + def unregister(registration_id, &block) + registration_id = connection.store[registration_id] if connection.store.include?(registration_id) + + message = Wampproto::Message::Unregister.new(next_request_id, registration_id.to_i) + action = MessageHandler::Unregister.new(message, connection) + action.send_message(&block) + end + + private + + def next_request_id + id_gen.next + end + end + end +end diff --git a/lib/wamp/message_handler/base.rb b/lib/wamp/message_handler/base.rb new file mode 100644 index 0000000..f900a00 --- /dev/null +++ b/lib/wamp/message_handler/base.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require "forwardable" + +module Wamp + module MessageHandler + # Result + class Base + extend Forwardable + attr_reader :message, :connection + + def initialize(message, connection) + @message = message + @connection = connection + end + + def handle + raise NotImplementedError + end + + def send_message + raise NotImplementedError + end + + private + + def stored_data + @stored_data ||= store.delete(store_key) || {} + end + + def store_key + "#{prefix}_#{identity}" + end + + def prefix + "request" + end + + def identity + message.request_id + end + + def deliver_response + callback = stored_data.fetch(:callback, proc {}) + return unless callback + + callback.call(message) + end + + def validate_received_message + connection.session.receive_message(message) + end + + def send_serialized(message) + connection.transmit session.send_message(message) + end + + def_delegators :@connection, :store, :joiner, :session + # def_delegator :auth, :authenticate + # def_delegator :@connection, :remove_all_listeners, :off + end + end +end diff --git a/lib/wamp/message_handler/call.rb b/lib/wamp/message_handler/call.rb new file mode 100644 index 0000000..7c99beb --- /dev/null +++ b/lib/wamp/message_handler/call.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Call + class Call < Base + def send_message(handler) + store[store_key] = { handler: handler, callback: handler } + + send_serialized message + end + end + end +end diff --git a/lib/wamp/message_handler/challenge.rb b/lib/wamp/message_handler/challenge.rb new file mode 100644 index 0000000..74bfb4f --- /dev/null +++ b/lib/wamp/message_handler/challenge.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Challenge + class Challenge < Hello + def handle + connection.transmit connection.joiner.receive(connection.joiner.serializer.serialize(message)) + end + end + end +end diff --git a/lib/wamp/message_handler/error.rb b/lib/wamp/message_handler/error.rb new file mode 100644 index 0000000..ebfd884 --- /dev/null +++ b/lib/wamp/message_handler/error.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Call handler with error message + class Error < Base + def handle + validate_received_message + + stored_data[:callback].call(message) + end + end + end +end diff --git a/lib/wamp/message_handler/event.rb b/lib/wamp/message_handler/event.rb new file mode 100644 index 0000000..d9f51c1 --- /dev/null +++ b/lib/wamp/message_handler/event.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # publish event to subscriber + class Event < Base + def handle + validate_received_message + + store[alt_store_key].fetch(:handler).call(message) + end + + def alt_store_key + "subscription_#{message.subscription_id}" + end + end + end +end diff --git a/lib/wamp/message_handler/goodbye.rb b/lib/wamp/message_handler/goodbye.rb new file mode 100644 index 0000000..8d5729b --- /dev/null +++ b/lib/wamp/message_handler/goodbye.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # send unregister message + class Goodbye < Base + def send_message(&callback) + store[store_key] = { callback: callback } + + send_serialized message + end + + def handle + goodbye = Wampproto::Message::Goodbye.new({}, "wamp.close.goodbye_and_out") + send_serialized goodbye + end + end + end +end diff --git a/lib/wamp/message_handler/hello.rb b/lib/wamp/message_handler/hello.rb new file mode 100644 index 0000000..24c3c65 --- /dev/null +++ b/lib/wamp/message_handler/hello.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Hello + class Hello < Base + def handle + msg, is_welcome = connection.acceptor.receive(connection.serializer.serialize(message)) + connection.transmit msg + connection.router.attach_client(connection) if is_welcome + end + + def send_message + connection.transmit connection.joiner.send_hello + end + end + end +end diff --git a/lib/wamp/message_handler/invocation.rb b/lib/wamp/message_handler/invocation.rb new file mode 100644 index 0000000..23a4f08 --- /dev/null +++ b/lib/wamp/message_handler/invocation.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Call + class Invocation < Base + def handle + connection.session.receive_message(message) + data = store.fetch(alt_store_key) + + send_yield_message data.fetch(:handler) + end + + def alt_store_key + "registration_#{message.registration_id}" + end + + private + + def send_yield_message(handler) + result = handler.call(message) + yield_message = result if result.instance_of?(Wampproto::Message::Yield) + yield_message ||= Wampproto::Message::Yield.new(message.request_id, {}, result) + send_serialized yield_message + end + end + end +end diff --git a/lib/wamp/message_handler/publish.rb b/lib/wamp/message_handler/publish.rb new file mode 100644 index 0000000..d18c49b --- /dev/null +++ b/lib/wamp/message_handler/publish.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Publish message + class Publish < Base + def send_message(&callback) + store[store_key] = { callback: callback } if message.options[:acknowledge] + + send_serialized message + end + end + end +end diff --git a/lib/wamp/message_handler/published.rb b/lib/wamp/message_handler/published.rb new file mode 100644 index 0000000..f5c3386 --- /dev/null +++ b/lib/wamp/message_handler/published.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Published confirmation message + class Published < Base + def handle + validate_received_message + + deliver_response + end + end + end +end diff --git a/lib/wamp/message_handler/register.rb b/lib/wamp/message_handler/register.rb new file mode 100644 index 0000000..6e6bc20 --- /dev/null +++ b/lib/wamp/message_handler/register.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # publish event to subscriber + class Register < Base + def send_message(handler, &callback) + store[store_key] = { handler: handler, callback: callback, procedure: message.procedure } + + send_serialized message + end + end + end +end diff --git a/lib/wamp/message_handler/registered.rb b/lib/wamp/message_handler/registered.rb new file mode 100644 index 0000000..5168ace --- /dev/null +++ b/lib/wamp/message_handler/registered.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Registered callback + class Registered < Base + def handle + validate_received_message + + store[alt_store_key] = { handler: stored_data.fetch(:handler), procedure: stored_data.fetch(:procedure) } + store_procedure + + deliver_response + end + + def alt_store_key + "registration_#{message.registration_id}" + end + + def store_procedure + procedure = stored_data.fetch(:procedure) + store[procedure] = message.registration_id + end + end + end +end diff --git a/lib/wamp/message_handler/result.rb b/lib/wamp/message_handler/result.rb new file mode 100644 index 0000000..7165cf8 --- /dev/null +++ b/lib/wamp/message_handler/result.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Result + class Result < Base + def handle + validate_received_message + stored_data.fetch(:handler).call(message) + end + end + end +end diff --git a/lib/wamp/message_handler/subscribe.rb b/lib/wamp/message_handler/subscribe.rb new file mode 100644 index 0000000..924672d --- /dev/null +++ b/lib/wamp/message_handler/subscribe.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Request subscription + class Subscribe < Base + def send_message(handler, &callback) + store[store_key] = { handler: handler, callback: callback, topic: message.topic } + + send_serialized message + end + end + end +end diff --git a/lib/wamp/message_handler/subscribed.rb b/lib/wamp/message_handler/subscribed.rb new file mode 100644 index 0000000..206efb6 --- /dev/null +++ b/lib/wamp/message_handler/subscribed.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Receive subscribed + class Subscribed < Base + def handle + validate_received_message + + store[alt_store_key] = { handler: stored_data.fetch(:handler), topic: stored_data.fetch(:topic) } + store_topic + + deliver_response + end + + def alt_store_key + "subscription_#{message.subscription_id}" + end + + def store_topic + topic = stored_data.fetch(:topic) + store[topic] = message.subscription_id + end + end + end +end diff --git a/lib/wamp/message_handler/unregister.rb b/lib/wamp/message_handler/unregister.rb new file mode 100644 index 0000000..346872d --- /dev/null +++ b/lib/wamp/message_handler/unregister.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # send unregister message + class Unregister < Base + def send_message(&callback) + store[store_key] = { callback: callback, registration_id: message.registration_id } + + send_serialized message + end + end + end +end diff --git a/lib/wamp/message_handler/unregistered.rb b/lib/wamp/message_handler/unregistered.rb new file mode 100644 index 0000000..96af562 --- /dev/null +++ b/lib/wamp/message_handler/unregistered.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # callback for unregister message + class Unregistered < Base + def handle + validate_received_message + + delete_procedure store.delete(alt_store_key) + + deliver_response + end + + def alt_store_key + "registration_#{registration_id}" + end + + def delete_procedure(data) + store.delete data.fetch(:procedure) + end + + def registration_id + stored_data.fetch(:registration_id) + end + end + end +end diff --git a/lib/wamp/message_handler/unsubscribe.rb b/lib/wamp/message_handler/unsubscribe.rb new file mode 100644 index 0000000..38c888e --- /dev/null +++ b/lib/wamp/message_handler/unsubscribe.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Request subscription + class Unsubscribe < Base + def send_message(&callback) + store[store_key] = { callback: callback, subscription_id: message.subscription_id } + + send_serialized message + end + end + end +end diff --git a/lib/wamp/message_handler/unsubscribed.rb b/lib/wamp/message_handler/unsubscribed.rb new file mode 100644 index 0000000..7d49b2a --- /dev/null +++ b/lib/wamp/message_handler/unsubscribed.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Receive unsubscribed + class Unsubscribed < Base + def handle + validate_received_message + delete_topic store.delete(alt_store_key) + + deliver_response + end + + private + + def alt_store_key + "subscription_#{subscription_id}" + end + + def delete_topic(data) + store.delete data.fetch(:topic) + end + + def subscription_id + stored_data.fetch(:subscription_id) + end + end + end +end diff --git a/lib/wamp/message_handler/welcome.rb b/lib/wamp/message_handler/welcome.rb new file mode 100644 index 0000000..118ed54 --- /dev/null +++ b/lib/wamp/message_handler/welcome.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Welcome + class Welcome < Base + def handle + connection.executor&.call(connection.api) + end + end + end +end diff --git a/lib/wamp/router.rb b/lib/wamp/router.rb index 3a3d72f..23510c4 100644 --- a/lib/wamp/router.rb +++ b/lib/wamp/router.rb @@ -1,8 +1,9 @@ # frozen_string_literal: true +require_relative "router/realm" +require_relative "router/base" +require_relative "router/client" require_relative "router/server" -require_relative "router/session/base" -require_relative "router/registrations" module Wamp # Router side of wamp diff --git a/lib/wamp/router/base.rb b/lib/wamp/router/base.rb new file mode 100644 index 0000000..8b9e69e --- /dev/null +++ b/lib/wamp/router/base.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +require_relative "realm" + +module Wamp + module Router + # Router + class Base + attr_reader :realms + + def initialize + @realms = {} + end + + def add_realm(name) + realms[name] = Realm.new(name) + end + + def remove_realm(name) + realms.delete(name) + end + + def attach_client(client) + error_message = "cannot attach client to non-existent realm #{client.realm}" + raise Wampproto::ValueError, error_message unless realms.include?(client.realm) + + realms[client.realm].attach_client(client) + end + + def detach_client(client) + error_message = "cannot attach client to non-existent realm #{client.realm}" + raise Wampproto::ValueError, error_message unless realms.include?(client.realm) + + realms[client.realm].detach_client(client) + end + + def receive_message(client, message) + error_message = "cannot attach client to non-existent realm #{client.realm}" + raise Wampproto::ValueError, error_message unless realms.include?(client.realm) + + realms[client.realm].receive_message(client.session_id, message) + end + end + end +end diff --git a/lib/wamp/router/client.rb b/lib/wamp/router/client.rb new file mode 100644 index 0000000..d897881 --- /dev/null +++ b/lib/wamp/router/client.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +require_relative "realm" + +module Wamp + module Router + # Auth + class Authenticator + def self.authenticate(request) + Wampproto::Acceptor::Response.new(request.authid, "role", "secret") + end + end + + # Server Session + class Client + attr_accessor :router, :connection + attr_reader :acceptor, :serializer + + def initialize(serializer = Wampproto::Serializer::JSON, authenticator = Authenticator) + @serializer = serializer + @acceptor = Wampproto::Acceptor.new(serializer, authenticator) + end + + def realm + acceptor.session_details&.realm + end + + def send_message(message) + transmit(message) + end + + def session_id + acceptor.session_details&.session_id + end + + def transmit(data) + case data + when Wampproto::Message::Base + connection.on_message serializer.serialize(data) + else + connection.on_message data + end + end + + def on_message(data) + unless acceptor.accepted? + msg, is_welcome = acceptor.receive(data) + transmit msg + router.attach_client(self) if is_welcome + end + router.receive_message(self, serializer.deserialize(data)) + end + end + end +end diff --git a/lib/wamp/router/connection.rb b/lib/wamp/router/connection.rb index b198700..31e6999 100644 --- a/lib/wamp/router/connection.rb +++ b/lib/wamp/router/connection.rb @@ -2,17 +2,31 @@ require "websocket/driver" +# extending the class +class Wampproto::Acceptor # rubocop:disable Style/ClassAndModuleChildren + def accepted? + state == STATE_WELCOME_SENT + end +end + +# Testing +class Authenticator + def self.authenticate(request) + Wampproto::Acceptor::Response.new(request.authid, "role") + end +end + module Wamp module Router # TOP Level Doc - class Connection + class Connection < Client include WebSocket::Driver::EventEmitter CONNECTING = 0 OPEN = 1 CLOSING = 2 CLOSED = 3 - attr_reader :socket, :session + attr_reader :socket, :session, :acceptor def initialize(socket, &cleanup) super() @@ -23,8 +37,17 @@ def initialize(socket, &cleanup) @driver.on(:message) { on_message(_1.data) } @driver.on(:close) { |evt| begin_close(evt.reason, evt.code) } @driver.on(:connect) { on_connect } - @session = Wamp::Manager::Session.new(self) - @ready_state = OPEN + @ready_state = CONNECTING + end + + def on_connect + @driver.start if WebSocket::Driver.websocket?(@driver.env) + choose_serializer_from @driver.env["HTTP_SEC_WEBSOCKET_PROTOCOL"] + @acceptor = Wampproto::Acceptor.new(serializer, Authenticator) + end + + def connection + self end def begin_close(reason, code) @@ -33,7 +56,7 @@ def begin_close(reason, code) @ready_state = CLOSING @close_params = [reason, code] - @cleanup&.call(session) + @cleanup&.call(self) finalize_close end @@ -41,23 +64,19 @@ def finalize_close return if @ready_state == CLOSED @ready_state = CLOSED - socket.close @driver.close - end - - def on_connect - @driver.start if WebSocket::Driver.websocket?(@driver.env) + socket.close end def listen(&block) - return unless @ready_state == OPEN + return unless [CONNECTING, OPEN].include?(@ready_state) data = socket.read_nonblock(4096, exception: false) case data when :wait_readable # do nothing when nil - block.call + block&.call @driver.close else receive_data(data) @@ -66,7 +85,7 @@ def listen(&block) # triggers on_message def receive_data(data) - return unless @ready_state == OPEN + return unless [OPEN, CONNECTING].include?(@ready_state) @driver.parse(data) end @@ -77,7 +96,15 @@ def write(data) end def transmit(message) - @driver.text(encode(message)) + # return false if @ready_state > OPEN + + case message + when Wampproto::Message::Base then transmit(serializer.serialize(message)) + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) + else false + end end private @@ -88,12 +115,6 @@ def on_open(_evt) @ready_state = OPEN end - def on_message(data) - msg = Wamp::Message.resolve(coder.decode(data)) - manager = Wamp::Manager::Event.resolve(msg, session) - manager.emit_event(msg) - end - def on_close(message) return if @ready_state == CLOSED @@ -114,8 +135,18 @@ def decode(websocket_message) coder.decode websocket_message end - def coder - @coder ||= Wamp::Serializer::JSON + attr_reader :serializer + + def choose_serializer_from(protocols) + @serializer = if protocols.include?("wamp.2.msgpack") + Wampproto::Serializer::Msgpack + elsif protocols.include?("wamp.2.cbor") + Wampproto::Serializer::Cbor + elsif protocols.include?("wamp.2.json") + Wampproto::Serializer::JSON + else + close + end end end end diff --git a/lib/wamp/router/realm.rb b/lib/wamp/router/realm.rb new file mode 100644 index 0000000..d7832b3 --- /dev/null +++ b/lib/wamp/router/realm.rb @@ -0,0 +1,91 @@ +# frozen_string_literal: true + +module Wamp + module Router + # Realm + class Realm + attr_reader :broker, :dealer, :name, :clients + + DEALER_MESSAGES = [ + Wampproto::Message::Call, + Wampproto::Message::Yield, + Wampproto::Message::Register, + Wampproto::Message::Unregister + ].freeze + + BROKER_MESSAGES = [ + Wampproto::Message::Publish, + Wampproto::Message::Subscribe, + Wampproto::Message::Unsubscribe + ].freeze + + GOODBYE_MESSAGE = Wampproto::Message::Goodbye + + def initialize(name) + @name = name + @broker = Wampproto::Broker.new(id_gen) + @dealer = Wampproto::Dealer.new(id_gen) + @clients = {} + end + + def attach_client(client) + session_id = client.session_id + + clients[session_id] = client + broker.add_session(session_id) + dealer.add_session(session_id) + end + + def detach_client(client) + remove_client(client.session_id) + end + + def clear + clients.each { |client| remove_client(client.session_id) } + end + + def receive_message(session_id, message) + case message + when *DEALER_MESSAGES then handle_dealer(session_id, message) + when *BROKER_MESSAGES then handle_broker(session_id, message) + when GOODBYE_MESSAGE then handle_goodbye(session_id, message) + end + end + + private + + def handle_dealer(session_id, message) + send_message dealer.receive_message(session_id, message) + end + + def handle_broker(session_id, message) + send_message broker.receive_message(session_id, message) + end + + def handle_goodbye(session_id, _message) + goodbye = Wampproto::Message::Goodbye.new({}, "wamp.close.goodbye_and_out") + send_message Wampproto::MessageWithRecipient.new(goodbye, session_id) + end + + def send_message(message_with_receipient) + Array(message_with_receipient).each do |object| + client = clients[object.recipient] + + next unless client + + client.send_message object.message + end + end + + def id_gen + @id_gen ||= Wampproto::IdGenerator.new + end + + def remove_client(session_id) + broker.remove_session(session_id) + dealer.remove_session(session_id) + clients.delete(session_id) + end + end + end +end diff --git a/lib/wamp/router/registrations.rb b/lib/wamp/router/registrations.rb deleted file mode 100644 index 28246e6..0000000 --- a/lib/wamp/router/registrations.rb +++ /dev/null @@ -1,122 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Router - # Handle Procedure Registrations - class Registrations - @registrations = {} - @registration_ids = {} - class << self - def register(message, session) - return procedure_already_registered(message) if check_registered?(message) - - registration_id = create_or_update_registration(message, session) - - Wamp::Message::Registered.new(message.request_id, registration_id) - end - - def check_registered?(message) - invocation_policy = message.options.fetch(:invoke, :single) - return @registrations.include?(message.procedure) if invocation_policy == "single" - - registration = @registrations[message.procedure] - return false unless registration - - return true if registration[:message].options.fetch(:invoke, :single) != invocation_policy - - false - end - - def clean_registrations(session) - @registrations.each_key { |procedure| clean_registration(procedure, session) } - end - - def clean_registration(procedure, session) - registration = @registrations[procedure] - sessions = registration[:sessions] - if sessions.one? && sessions.include?(session) - puts "Removing Registration #{registration[:registration_id]}, procedure: #{procedure}" - @registrations.delete(procedure) - elsif sessions.include?(session) - puts "Removing Session #{session.session_id}, procedure: #{procedure}" - sessions.delete(session) - end - end - - def clean_registration_by_id(registration_id, session) - procedure = @registration_ids[registration_id] - return unless procedure - - clean_registration(procedure, session) - end - - def create_or_update_registration(message, session) - registration = @registrations[message.procedure] || {} - registration.empty? ? create_registration(message, session) : update_registration(registration, session) - end - - def create_registration(message, session) - registration_id = create_registration_id(message.procedure) - @registrations[message.procedure] = { - message: message, - registration_id: registration_id, - sessions: [session] - } - registration_id - end - - def update_registration(registration, session) - registration[:sessions] << session - registration.fetch(:registration_id) - end - - def invoke(message, caller_session) - unless @registrations.include?(message.procedure) - return Manager::Event.resolve(no_such_procedure(message), caller_session) - end - - registration = @registrations.fetch(message.procedure) - registration_id = registration[:registration_id] - callee_session = find_session(registration) - - Wamp::Message::Invocation.new(message.request_id * 2000, registration_id, {}, *message.args, **message.kwargs) - .then { |msg| Manager::Event.resolve(msg, callee_session) } - end - - def find_session(registration) - sessions = registration.fetch(:sessions) - index = find_session_index(registration, sessions.length) - sessions[index] - end - - def find_session_index(registration, session_length) - invocation_policy = registration.fetch(:message).options.fetch(:invoke, :single).intern - index = { single: 0, first: 0, last: -1, random: rand(0..(session_length - 1)) }[invocation_policy] - return index if index - - cycle_index = registration.fetch(:cycle_index, 0) - registration[:cycle_index] = cycle_index < session_length - 1 ? cycle_index + 1 : 0 - cycle_index - end - - def procedure_already_registered(message) - Message::Error.new(Message::Type::REGISTER, message.request_id, {}, "wamp.error.procedure_already_exists") - end - - def no_such_procedure(message) - Message::Error.new(Message::Type::CALL, message.request_id, {}, "wamp.error.no_such_procedure") - end - - def create_registration_id(procedure) - id = rand(100_000..(2**53)) - if @registration_ids.include?(id) - create_registration_id(procedure) - else - @registration_ids[id] = procedure - id - end - end - end - end - end -end diff --git a/lib/wamp/router/server.rb b/lib/wamp/router/server.rb index 2fe5d03..a62fc4a 100644 --- a/lib/wamp/router/server.rb +++ b/lib/wamp/router/server.rb @@ -13,6 +13,8 @@ class Server def initialize @selector = NIO::Selector.new + @router = Wamp::Router::Base.new + @router.add_realm("realm1") end def run @@ -44,10 +46,11 @@ def accept_connection def create_connection(client) monitor = selector.register(client, :r) - connection = Connection.new(client) do |session| + connection = Connection.new(client) do |conn| selector.deregister(monitor) - Registrations.clean_registrations(session) + @router.detach_client(conn) end + connection.router = @router monitor.value = proc do connection.listen end diff --git a/lib/wamp/router/session/base.rb b/lib/wamp/router/session/base.rb deleted file mode 100644 index 6260509..0000000 --- a/lib/wamp/router/session/base.rb +++ /dev/null @@ -1,72 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Router - module Session - # handle session establishment - class Base - REALMS = ["realm1"].freeze - attr_reader :hello - - def initialize(hello) - @hello = hello - end - - def authenticate(message) - return protocol_violation if message.instance_of?(Message::Hello) - - return unless auth_method == "ticket" - - return welcome_message if message.signature == "hello" - - send_abort - end - - def send_abort - Message::Abort.new({ message: "Not Authorized" }, "wamp.error.not_authorized") - end - - def protocol_violation - Message::Abort.new( - { message: "Received HELLO message after session was established" }, - "wamp.error.protocol_violation" - ) - end - - def handle_auth - realm = find_realm(hello.realm) - return realm_missing unless realm - - handle_correct_auth - end - - def handle_correct_auth - if auth_method == "ticket" - Message::Challenge.new("ticket", {}) - else - welcome_message - end - end - - def auth_method - authmethods = [*hello.details[:authmethods]] - authmethods.first - end - - def welcome_message - Message::Welcome.new(Router.create_identifier, { roles: { broker: {} } }) - end - - def realm_missing - Wamp::Message::Abort.new( - { message: "The realm does not exists." }, "wamp.error.no_such_realm" - ) - end - - def find_realm(realm) - realm if REALMS.include?(realm) - end - end - end - end -end diff --git a/spec/wamp/integration/registration_spec.rb b/spec/wamp/integration/registration_spec.rb new file mode 100644 index 0000000..d8cdc07 --- /dev/null +++ b/spec/wamp/integration/registration_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +RSpec.describe "registrations" do + def create_client_session(router) + session = Wamp::Connection::Session.new + + server_session = Wamp::Router::Client.new + server_session.router = router + server_session.connection = session + + session.stream = server_session + session + end + + let(:realm) { "realm1" } + let(:router) { Wamp::Router::Base.new } + let(:procedure) { "com.hello.world" } + before { router.add_realm(realm) } + + context "registers" do + before { client.on_open } + let(:client) { create_client_session(router) } + it "to a procedure" do + counter = 0 + handler = proc {} + + expect do + client.api.register(procedure, handler) do |response| + counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Registered) + end + end.to change { counter }.by(1) + end + + context "calls" do + before { client2.on_open } + before { client.api.register(procedure, proc {}) } + let(:client2) { create_client_session(router) } + + it "to a registered procedure" do + counter = 0 + + expect do + client2.api.call(procedure) do |response| + counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Result) + end + end.to change { counter }.by(1) + end + end + + context "unregisters" do + before { client.api.register(procedure, proc {}) } + + it "a registered procedure" do + counter = 0 + + expect do + client.api.unregister(procedure) do |response| + counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Unregistered) + end + end.to change { counter }.by(1) + end + end + + context "non existant procedure" do + it "returns an error" do + counter = 0 + + expect do + client.api.unregister(procedure) do |response| + counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Error) + end + end.to change { counter }.by(1) + end + end + end +end diff --git a/spec/wamp/integration/subscription_spec.rb b/spec/wamp/integration/subscription_spec.rb new file mode 100644 index 0000000..23a9bdc --- /dev/null +++ b/spec/wamp/integration/subscription_spec.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +RSpec.describe "subscriptions" do + def create_client_session(router) + session = Wamp::Connection::Session.new + + server_session = Wamp::Router::Client.new + server_session.router = router + server_session.connection = session + + session.stream = server_session + session + end + + let(:realm) { "realm1" } + let(:router) { Wamp::Router::Base.new } + let(:topic) { "com.hello.world" } + before { router.add_realm(realm) } + + context "subscribes" do + before { client.on_open } + let(:client) { create_client_session(router) } + it "to a topic" do + subscription_counter = 0 + handler = proc {} + + expect do + client.api.subscribe(topic, handler) do |response| + subscription_counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Subscribed) + end + end.to change { subscription_counter }.by(1) + end + + context "publishes" do + before { client2.on_open } + let(:client2) { create_client_session(router) } + + before do + handler = proc { 100 } + client.api.subscribe(topic, handler) + expect(handler).to receive(:call).and_call_original + end + + it "to a topic" do + counter = 0 + expect do + client2.api.publish(topic) do |response| + counter += 1 + expect(response).to be_instance_of(Wampproto::Message::Published) + end + end.to change { counter }.by(1) + end + end + + context "unsubscribes" do + before { client.api.subscribe(topic, proc {}) } + + it "from a topic" do + counter = 0 + expect do + client.api.unsubscribe(topic) do |response| + counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Unsubscribed) + end + end.to change { counter }.by(1) + end + + context "when subscription is missing" do + it "returns error" do + counter = 0 + expect do + client.api.unsubscribe(129_876) do |response| + counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Error) + end + end.to change { counter }.by(1) + end + end + end + end +end diff --git a/spec/wamp/manager/event/register_spec.rb b/spec/wamp/manager/event/register_spec.rb deleted file mode 100644 index c0be316..0000000 --- a/spec/wamp/manager/event/register_spec.rb +++ /dev/null @@ -1,129 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Wamp::Manager::Event::Register do - let(:connection) { Wamp::Manager::Base.new } - let(:args) { [2, 3] } - let(:sum) { 5 } - let(:session) { connection.session } - let(:request_id) { 1 } - let(:registration_id) { 100 } - let(:register_message) { Wamp::Message::Register.new(request_id, {}, "com.ruby.method") } - let(:register_event) { Wamp::Manager::Event::Register.new(register_message, session) } - let(:registered_message) { Wamp::Message::Registered.new(request_id, registration_id) } - let(:registered_event) { Wamp::Manager::Event::Registered.new(registered_message, session) } - - let(:call_message) { Wamp::Message::Call.new(request_id + 1, {}, register_message.procedure, *args) } - let(:call_event) { Wamp::Manager::Event::Call.new(call_message, session) } - - let(:invocation_message) { Wamp::Message::Invocation.new(request_id + 2, registration_id, {}, *call_message.args) } - let(:yield_message) { Wamp::Message::Yield.new(request_id + 2, {}, sum) } - - let(:result_message) { Wamp::Message::Result.new(request_id + 1, {}, *yield_message.args) } - - context "success" do - it "register and call" do - expect(connection).to receive(:transmit).with(register_message.payload) - handler = lambda do |m, n| - m + n - end - session.register(register_message.procedure, handler, {}) - - expect(session).to receive(:emit).with(register_event.listen_event_name, - Wamp::Message::Registered).and_call_original - session.on_message(registered_message) # router sends this message on successful registration - - expect(connection).to receive(:transmit).with(call_message.payload) - session.call(call_message.procedure, {}, *call_message.args) do |result| - expect(result.args).to include(sum) - end - - expect(connection).to receive(:transmit).with(yield_message.payload) - expect(handler).to receive(:call).with(*call_message.args).and_call_original - - expect(session).to receive(:emit).with(registered_event.listen_event_name, - Wamp::Message::Invocation).and_call_original - session.on_message(invocation_message) # invocation event trasmits yield message and also calls the handler - - expect(session).to receive(:emit).with(call_event.listen_event_name, Wamp::Message::Result).and_call_original - session.on_message(result_message) - end - - context "unregister a registered procedure" do - let(:unregister_message) { Wamp::Message::Unregister.new(request_id + 1, registered_message.registration_id) } - let(:unregister_event) { Wamp::Manager::Event::Unregister.new(unregister_message, session) } - - let(:unregistered_message) { Wamp::Message::Unregistered.new(request_id + 1) } - let(:unregistered_event) { Wamp::Manager::Event::Unregistered.new(unregistered_message, session) } - - it "succeed" do - expect(connection).to receive(:transmit).with(register_message.payload) - handler = lambda do |m, n| - m + n - end - session.register(register_message.procedure, handler, {}) - - expect(session).to receive(:emit) - .with(register_event.listen_event_name, Wamp::Message::Registered).and_call_original - session.on_message(registered_message) # router sends this message on successful registration - - expect(connection).to receive(:transmit).with(unregister_message.payload) - session.unregister(unregister_message.registration_id) do |unregistered| - expect(unregistered).to eq unregistered_message - end - - expect(session).to receive(:emit) - .with(unregister_event.listen_event_name, Wamp::Message::Unregistered).and_call_original - session.on_message(unregistered_message) - end - end - end - - context "failure" do - context "already registered" do - let(:error_message) do - Wamp::Message::Error.new(Wamp::Message::Type::REGISTER, request_id + 1, {}, - "wamp.error.procedure_already_exists") - end - let(:error_event) { Wamp::Manager::Event::Error.new(error_message, session) } - - it "returns error" do - expect(connection).to receive(:transmit).with(register_message.payload) - handler = lambda do |m, n| - m + n - end - session.register(register_message.procedure, handler, {}) - - expect(session).to receive(:emit).with(register_event.listen_event_name, - Wamp::Message::Registered).and_call_original - session.on_message(registered_message) # router sends this message on successful registration - - # trying to register same procedure second time - register_message.instance_eval { @request_id = 2 } - expect(connection).to receive(:transmit).with(register_message.payload) - session.register(register_message.procedure, handler, {}) - - expect(session).to receive(:emit).with(error_event.listen_event_name, nil, Wamp::Message::Error) - expect { session.on_message(error_message) }.to output("Error: wamp.error.procedure_already_exists\n").to_stdout - end - end - - context "called unregistered procedure" do - let(:error_message) do - Wamp::Message::Error.new(Wamp::Message::Type::CALL, request_id, {}, "wamp.error.no_such_procedure") - end - let(:error_event) { Wamp::Manager::Event::Error.new(error_message, session) } - - it "returns error" do - call_message.instance_eval { @request_id = 1 } - expect(connection).to receive(:transmit).with(call_message.payload) - session.call(call_message.procedure, {}, *call_message.args) do |_result, error| - expect(error.error).to eq(error_message.error) - end - - expect(session).to receive(:emit).with(call_event.listen_event_name, nil, - Wamp::Message::Error).and_call_original - expect { session.on_message(error_message) }.to output("Error: wamp.error.no_such_procedure\n").to_stdout - end - end - end -end diff --git a/spec/wamp/manager/event/subscribe_spec.rb b/spec/wamp/manager/event/subscribe_spec.rb deleted file mode 100644 index 518e7da..0000000 --- a/spec/wamp/manager/event/subscribe_spec.rb +++ /dev/null @@ -1,72 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Wamp::Manager::Event::Subscribe do - let(:session_id) { 1_234_567 } - let(:request_id) { 1 } - let(:details) { {} } - let(:subscribe_message) { Wamp::Message::Subscribe.new(request_id, {}, "com.rspec.sum") } - let(:subscribe_event) { Wamp::Manager::Event::Subscribe.new(subscribe_message, session) } - let(:subscription_id) { 100 } - let(:next_request_id) { request_id + 1 } - let(:publication_id) { 200 } - - let(:subscribed_message) { Wamp::Message::Subscribed.new(request_id, subscription_id) } - let(:subscribed_event) { Wamp::Manager::Event::Subscribed.new(subscribed_message, session) } - - let(:publish_message) do - Wamp::Message::Publish.new(next_request_id, {}, subscribe_message.topic, 2, 3) - end - let(:publish_event) { Wamp::Manager::Event::Publish.new(publish_message, session) } - - let(:event_message) do - Wamp::Message::Event.new(subscription_id, publication_id, {}, *publish_message.args) - end - - let(:unsubscribe_message) { Wamp::Message::Unsubscribe.new(next_request_id, subscribed_message.subscription_id) } - let(:unsubscribed_message) { Wamp::Message::Unsubscribed.new(next_request_id) } - - let(:connection) { Wamp::Manager::Base.new } - let(:session) { connection.session } - - context "success" do - context "subscribe message sent" do - it "gets subscribed" do - expect(connection).to receive(:transmit).with(subscribe_message.payload) - handler = lambda do |n, m| - n + m - end - session.subscribe(subscribe_message.topic, handler, subscribe_message.options) - - connection.on_message(subscribed_message) - - expect(connection).to receive(:transmit).with(publish_message.payload) - session.publish(subscribe_message.topic, {}, *publish_message.args) - - expect(handler).to receive(:call).with(*publish_message.args).and_call_original - connection.on_message(event_message) - end - - it "gets unsubscribed after subscribing" do - expect(connection).to receive(:transmit).with(subscribe_message.payload) - handler = lambda do |n, m| - n + m - end - session.subscribe(subscribe_message.topic, handler, subscribe_message.options) - - connection.on_message(subscribed_message) - - expect(connection).to receive(:transmit).with(unsubscribe_message.payload) - session.unsubscribe(unsubscribe_message.subscription_id) - - connection.on_message(unsubscribed_message) - - publish_message.instance_eval { @request_id = 3 } - expect(connection).to receive(:transmit).with(publish_message.payload) - session.publish(subscribe_message.topic, {}, *publish_message.args, **publish_message.kwargs) - - expect(handler).not_to receive(:call).with(*publish_message.args, **publish_message.kwargs) - connection.on_message(event_message) - end - end - end -end diff --git a/spec/wamp/manager/event/welcome_spec.rb b/spec/wamp/manager/event/welcome_spec.rb deleted file mode 100644 index 69df76d..0000000 --- a/spec/wamp/manager/event/welcome_spec.rb +++ /dev/null @@ -1,85 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Wamp::Manager::Event::Welcome do - let(:session_id) { 1_234_567 } - let(:details) { {} } - let(:realm) { "realm1" } - let(:welcome_message) { Wamp::Message::Welcome.new(session_id, details) } - let(:welcome_event) { Wamp::Manager::Event::Welcome.new(welcome_message, connection.session) } - - context "success" do - let(:connection) { Wamp::Manager::Base.new } - let(:hello_event) { Wamp::Manager::Event::Hello.new(hello, connection.session) } - context "hello message sent" do - let(:hello) { Wamp::Message::Hello.new(realm, {}) } - it "receives welcome message" do - expect(connection).to receive(:transmit).with(hello.payload) - hello_event.add_event_listener # transmits hello message - - connection.on(welcome_event.emit_event_name) do |session| - expect(session).to eq(connection.session) - end - - connection.on_message(welcome_message) - end - end - - context "hello message including cryptosign auth details sent" do - let(:connection) { Wamp::Manager::Base.new(auth: cryptosign) } - let(:hello) { Wamp::Message::Hello.new(realm, cryptosign.details) } - JSON.load_file("./spec/cryptosign_spec_cases.json").each.with_index(1) do |test_case, index| - context index do - let(:private_key) { test_case["private_key"] } - let(:challenge) { test_case["challenge"] } - let(:signature) { test_case["signature"] } - let(:channel_id) { test_case["channel_id"].to_s.empty? ? nil : test_case["channel_id"] } - let(:authextra) { { "channel_binding" => channel_id ? "tls-unique" : nil } } - let(:cryptosign) { Wamp::Auth::Cryptosign.new(private_key, { authid: "joe", authextra: authextra }) } - let(:challenge_message) do - Wamp::Message::Challenge.new("cryptosign", - { "challenge" => challenge, "channel_id" => channel_id, - "channel_binding" => "tls-unique" }) - end - let(:challenge_event) { Wamp::Manager::Event::Challenge.new(challenge_message, connection.session) } - let(:authenticate_message) { Wamp::Message::Authenticate.new(signature, {}) } - it "receives welcome message" do - expect(connection).to receive(:transmit).with(hello.payload) - hello_event.add_event_listener - - expect(connection).to receive(:transmit).with(authenticate_message.payload).and_call_original - expect(connection).to receive(:emit).with(challenge_event.emit_event_name, - Wamp::Message::Challenge).and_call_original - connection.on_message(challenge_message) - - expect(connection).to receive(:emit).with(welcome_event.emit_event_name, - Wamp::Manager::Session).and_call_original - connection.on_message(welcome_message) - end - end - end - end - end - - context "failure" do - # [3, {}, "wamp.error.no_such_realm"] - let(:abort_message) { Wamp::Message::Abort.new({}, "wamp.error.no_such_realm") } - let(:abort_event) { Wamp::Manager::Event::Abort.new(abort_message, connection.session) } - - context "hello message sent" do - let(:connection) { Wamp::Manager::Base.new } - let(:hello) { Wamp::Message::Hello.new("INVALID_REALM", {}) } - let(:hello_event) { Wamp::Manager::Event::Hello.new(hello, connection.session) } - it "receives abort" do - expect(connection).to receive(:transmit).with(hello.payload) - hello_event.add_event_listener # transmits hello message - - expect(connection).to receive(:close).with(1000, abort_event.reason) - connection.on(abort_event.emit_event_name) do |message| - expect(message).to be_instance_of(Wamp::Message::Abort) - end - - connection.on_message(abort_message) - end - end - end -end diff --git a/spec/wamp/message/hello_spec.rb b/spec/wamp/message/hello_spec.rb deleted file mode 100644 index 3f3126e..0000000 --- a/spec/wamp/message/hello_spec.rb +++ /dev/null @@ -1,31 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Wamp::Message::Hello do - describe "#initialize" do - let(:subject) { described_class.new(*args) } - context "valid arguments" do - let(:args) { ["realm1"] } - it "should create a hello message instance" do - expect { subject }.not_to raise_error - expect(subject.payload).to include(1) - expect(subject.payload).to include("realm1") - end - end - - context "invalid" do - let(:args) { [1] } - context "realm" do - it "should raise validation exception" do - expect { subject }.to raise_error(ArgumentError) - end - end - - context "details" do - let(:args) { ["realm1", nil] } - it "should raise validation exception" do - expect { subject }.to raise_error(ArgumentError) - end - end - end - end -end diff --git a/spec/wamp/message/welcome_spec.rb b/spec/wamp/message/welcome_spec.rb deleted file mode 100644 index 856c076..0000000 --- a/spec/wamp/message/welcome_spec.rb +++ /dev/null @@ -1,23 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Wamp::Message::Welcome do - describe ".parse" do - let(:subject) { described_class.parse(wamp_message) } - context "valid" do - let(:wamp_message) { [2, 123, {}] } - it { is_expected.to be_instance_of(described_class) } - end - - context "invalid" do - context "empty message" do - let(:wamp_message) { [] } - it { expect { subject }.to raise_error(ArgumentError) } - end - - context "wrong session_id type" do - let(:wamp_message) { [1, "abc", {}] } - it { expect { subject }.to raise_error(ArgumentError) } - end - end - end -end diff --git a/spec/wamp/message_handler/api_spec.rb b/spec/wamp/message_handler/api_spec.rb new file mode 100644 index 0000000..b9ec671 --- /dev/null +++ b/spec/wamp/message_handler/api_spec.rb @@ -0,0 +1,158 @@ +# frozen_string_literal: true + +RSpec.describe Wamp::MessageHandler::Api do + let(:session_id) { 12_345 } + let(:request_id) { 1 } + let(:connection) { Wamp::Connection::WebSocketConnection.new } + + def received_message(message) + connection.on_message(connection.joiner.serializer.serialize(message)) + end + + let(:welcome) { Wampproto::Message::Welcome.new(session_id, {}) } + let(:received_welcome) { received_message(welcome) } + + before do + allow(Wamp::Connection::WebsocketClient).to receive(:new) + .and_return(instance_double(Wamp::Connection::WebsocketClient)) + allow(connection.websocket).to receive(:transmit) + connection.joiner.send_hello + end + + describe "#subscribe" do + let(:topic) { "com.hello.subscribe" } + let(:subscription_id) { 345 } + let(:publication_id) { 999 } + let(:subscribed) { Wampproto::Message::Subscribed.new(request_id, subscription_id) } + let(:received_subscribed) { received_message(subscribed) } + + let(:unsubscribed) { Wampproto::Message::Unsubscribed.new(request_id + 1) } + let(:received_unsubscribed) { received_message(unsubscribed) } + + let(:event) { Wampproto::Message::Event.new(subscription_id, publication_id, {}, 1, 2) } + let(:received_event) { received_message(event) } + + let(:published) { Wampproto::Message::Published.new(request_id, publication_id) } + let(:received_published) { received_message(published) } + + it "subscribes and unsubscribes" do + connection.on_join do |api| + handler = proc { |msg| msg } + api.subscribe(topic, handler) + received_subscribed + expect(connection.store).to include("subscription_#{subscription_id}") + + api.unsubscribe(topic) + received_unsubscribed + expect(connection.store).not_to include("subscription_#{subscription_id}") + end + + expect(connection.api).to receive(:subscribe).and_call_original + received_welcome + end + + it "subscribes and receives event" do + connection.on_join do |api| + handler = proc { |msg| msg } + api.subscribe(topic, handler) + received_subscribed + expect(connection.store).to include("subscription_#{subscription_id}") + + expect(handler).to receive(:call).and_return(event) + received_event + end + + expect(connection.api).to receive(:subscribe).and_call_original + received_welcome + end + + it "publishes an event" do + connection.on_join do |api| + api.publish(topic, { acknowledge: true }, 1, 2) do + expect(connection.store).not_to include("request_#{request_id}") + end + expect(connection.store).to include("request_#{request_id}") + end + received_welcome + expect(connection.store).to include("request_#{request_id}") + end + end + + describe "#register" do + let(:procedure) { "com.hello.register" } + let(:registration_id) { 8877 } + let(:registered) { Wampproto::Message::Registered.new(request_id, registration_id) } + let(:received_registered) { received_message(registered) } + + let(:unregistered) { Wampproto::Message::Unregistered.new(request_id + 1) } + let(:received_unregistered) { received_message(unregistered) } + + let(:invocation) { Wampproto::Message::Invocation.new(request_id + 1, registration_id, {}, 1) } + let(:received_invocation) { received_message(invocation) } + + it "registers and unregisters a procedure" do + connection.on_join do |api| + handler = proc { |msg| msg } + api.register(procedure, handler) do |response| + expect(response).to be_an_instance_of(Wampproto::Message::Registered) + end + received_registered + expect(connection.store).to include("registration_#{registration_id}") + + api.unregister(procedure) do |response| + expect(response).to be_an_instance_of(Wampproto::Message::Unregistered) + end + received_unregistered + expect(connection.store).not_to include("registration_#{registration_id}") + end + expect(connection.api).to receive(:register).and_call_original + + received_welcome + end + + it "registers and receives invocation" do + connection.on_join do |api| + handler = proc { |msg| expect(msg).to be_an_instance_of(Wampproto::Message::Invocation) } + + api.register(procedure, handler) do |response| + expect(response).to be_an_instance_of(Wampproto::Message::Registered) + end + received_registered + expect(connection.store).to include("registration_#{registration_id}") + + expect(handler).to receive(:call) + received_invocation + end + expect(connection.api).to receive(:register).and_call_original + + received_welcome + end + end + + describe "#call" do + let(:procedure) { "com.hello.register" } + # let(:call) { Wampproto::Message::Call.new(request_id, {}, procedure) } + # let(:received_call) { received_message(call) } + + let(:result) { Wampproto::Message::Result.new(request_id, {}, procedure, 4) } + let(:received_result) { received_message(result) } + + it "calls the procedure and receives the result" do + connection.on_join do |api| + counter = 1 + api.call(procedure, {}, 2) do |response| + counter += 1 + expect(response).to be_an_instance_of(Wampproto::Message::Result) + end + + expect(connection.store).to include("request_#{request_id}") + expect { received_result }.to change { counter }.by(1) + + expect(connection.store).not_to include("request_#{request_id}") + end + + expect(connection.api).to receive(:call).and_call_original + received_welcome + end + end +end