Skip to content

EM.attach_server patch #93

@tmm1

Description

@tmm1
From a6fe3a20f1777159cae1936c5dbdf9d1b23b9c0b Mon Sep 17 00:00:00 2001
From: Hongli Lai (Phusion) <[email protected]>
Date: Sat, 19 Jun 2010 13:03:13 +0200
Subject: [PATCH] Add EventMachine::attach_server for attaching an arbitrary server IO object or file descriptor to the event loop.

---
 ext/cmain.cpp        |   10 +++++++++
 ext/ed.cpp           |   11 ++++++---
 ext/ed.h             |    5 ++-
 ext/em.cpp           |   56 ++++++++++++++++++++++++++++++++++++++++++++++++++
 ext/em.h             |    1 +
 ext/eventmachine.h   |    1 +
 ext/rubymain.cpp     |   13 +++++++++++
 lib/eventmachine.rb  |   22 +++++++++++++++++++
 tests/test_attach.rb |   22 +++++++++++++++++++
 9 files changed, 135 insertions(+), 6 deletions(-)

diff --git a/ext/cmain.cpp b/ext/cmain.cpp
index 143c06f..993c2a7 100644
--- a/ext/cmain.cpp
+++ b/ext/cmain.cpp
@@ -152,6 +152,16 @@ extern "C" int evma_detach_fd (const unsigned long binding)
            return -1;
 }

+/*********************
+evma_attach_server_fd
+**********************/
+
+extern "C" const unsigned long evma_attach_server_fd (int file_descriptor)
+{
+   ensure_eventmachine("evma_attach_server_fd");
+   return EventMachine->AttachServerFD (file_descriptor);
+}
+
 /************************
 evma_get_file_descriptor
 ************************/
diff --git a/ext/ed.cpp b/ext/ed.cpp
index 06c8f6c..fb70760 100644
--- a/ext/ed.cpp
+++ b/ext/ed.cpp
@@ -50,7 +50,8 @@ bool SetSocketNonblocking (SOCKET sd)
 EventableDescriptor::EventableDescriptor
 ****************************************/

-EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
+EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em, bool autoclose):
+   bAutoClose (autoclose),
    bCloseNow (false),
    bCloseAfterWriting (false),
    MySocket (sd),
@@ -114,7 +115,9 @@ EventableDescriptor::~EventableDescriptor()
        ProxiedFrom->StopProxy();
    }
    StopProxy();
-   Close();
+   if (bAutoClose) {
+       Close();
+   }
 }


@@ -1258,8 +1261,8 @@ void LoopbreakDescriptor::Write()
 AcceptorDescriptor::AcceptorDescriptor
 **************************************/

-AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em):
-   EventableDescriptor (sd, parent_em)
+AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em, bool autoclose):
+   EventableDescriptor (sd, parent_em, autoclose)
 {
    #ifdef HAVE_EPOLL
    EpollEvent.events = EPOLLIN;
diff --git a/ext/ed.h b/ext/ed.h
index bcfe682..1cbf224 100644
--- a/ext/ed.h
+++ b/ext/ed.h
@@ -36,7 +36,7 @@ class EventableDescriptor
 class EventableDescriptor: public Bindable_t
 {
    public:
-       EventableDescriptor (int, EventMachine_t*);
+       EventableDescriptor (int, EventMachine_t*, bool autoclose = true);
        virtual ~EventableDescriptor();

        int GetSocket() {return MySocket;}
@@ -97,6 +97,7 @@ class EventableDescriptor: public Bindable_t
        virtual uint64_t GetNextHeartbeat();

    private:
+       bool bAutoClose;
        bool bCloseNow;
        bool bCloseAfterWriting;

@@ -309,7 +310,7 @@ class AcceptorDescriptor
 class AcceptorDescriptor: public EventableDescriptor
 {
    public:
-       AcceptorDescriptor (int, EventMachine_t*);
+       AcceptorDescriptor (int, EventMachine_t*, bool = true);
        virtual ~AcceptorDescriptor();

        virtual void Read();
diff --git a/ext/em.cpp b/ext/em.cpp
index fff89bc..465e329 100644
--- a/ext/em.cpp
+++ b/ext/em.cpp
@@ -1446,6 +1446,62 @@ int EventMachine_t::DetachFD (EventableDescriptor *ed)
    return fd;
 }

+/******************************
+EventMachine_t::AttachServerFD
+*******************************/
+
+const unsigned long EventMachine_t::AttachServerFD (int sd_accept)
+{
+   unsigned long output_binding = 0;
+   
+   { // set reuseaddr to improve performance on restarts.
+       int oval = 1;
+       if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) {
+           //__warning ("setsockopt failed while creating listener","");
+           goto fail;
+       }
+   }
+
+   { // set CLOEXEC. Only makes sense on Unix
+       #ifdef OS_UNIX
+       int cloexec = fcntl (sd_accept, F_GETFD, 0);
+       assert (cloexec >= 0);
+       cloexec |= FD_CLOEXEC;
+       fcntl (sd_accept, F_SETFD, cloexec);
+       #endif
+   }
+
+   if (listen (sd_accept, 100)) {
+       //__warning ("listen failed");
+       goto fail;
+   }
+
+   {
+       // Set the acceptor non-blocking.
+       // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
+       if (!SetSocketNonblocking (sd_accept)) {
+       //int val = fcntl (sd_accept, F_GETFL, 0);
+       //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
+           goto fail;
+       }
+   }
+
+   { // Looking good.
+       AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this, false);
+       if (!ad)
+           throw std::runtime_error ("unable to allocate acceptor");
+       Add (ad);
+       output_binding = ad->GetBinding();
+   }
+
+   return output_binding;
+
+   fail:
+   if (sd_accept != INVALID_SOCKET)
+       closesocket (sd_accept);
+   return 0;
+}
+
 /************
 name2address
 ************/
diff --git a/ext/em.h b/ext/em.h
index 18f00d6..5ddeb5c 100644
--- a/ext/em.h
+++ b/ext/em.h
@@ -95,6 +95,7 @@ class EventMachine_t

        const unsigned long AttachFD (int, bool);
        int DetachFD (EventableDescriptor*);
+       const unsigned long AttachServerFD (int);

        void ArmKqueueWriter (EventableDescriptor*);
        void ArmKqueueReader (EventableDescriptor*);
diff --git a/ext/eventmachine.h b/ext/eventmachine.h
index 0d900fd..fcdc0ef 100644
--- a/ext/eventmachine.h
+++ b/ext/eventmachine.h
@@ -49,6 +49,7 @@ extern "C" {

    const unsigned long evma_attach_fd (int file_descriptor, int watch_mode);
    int evma_detach_fd (const unsigned long binding);
+   const unsigned long evma_attach_server_fd (int file_descriptor);
    int evma_get_file_descriptor (const unsigned long binding);
    int evma_is_notify_readable (const unsigned long binding);
    void evma_set_notify_readable (const unsigned long binding, int mode);
diff --git a/ext/rubymain.cpp b/ext/rubymain.cpp
index 0a7f3ee..c1f14e8 100644
--- a/ext/rubymain.cpp
+++ b/ext/rubymain.cpp
@@ -558,6 +558,18 @@ static VALUE t_detach_fd (VALUE self, VALUE signature)
    return INT2NUM(evma_detach_fd (NUM2ULONG (signature)));
 }

+/***********
+t_attach_server_fd
+***********/
+
+static VALUE t_attach_server_fd (VALUE self, VALUE file_descriptor, VALUE watch_mode)
+{
+   const unsigned long f = evma_attach_server_fd (NUM2INT(file_descriptor));
+   if (!f)
+       rb_raise (rb_eRuntimeError, "no connection");
+   return ULONG2NUM (f);
+}
+
 /**************
 t_get_sock_opt
 **************/
@@ -1128,6 +1140,7 @@ extern "C" void Init_rubyeventmachine()

    rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 2);
    rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1);
+   rb_define_module_function (EmModule, "attach_server_fd", (VALUE (*)(...))t_attach_server_fd, 1);
    rb_define_module_function (EmModule, "get_sock_opt", (VALUE (*)(...))t_get_sock_opt, 3);
    rb_define_module_function (EmModule, "set_notify_readable", (VALUE (*)(...))t_set_notify_readable, 2);
    rb_define_module_function (EmModule, "set_notify_writable", (VALUE (*)(...))t_set_notify_writable, 2);
diff --git a/lib/eventmachine.rb b/lib/eventmachine.rb
index f7f9432..7db2e4c 100644
--- a/lib/eventmachine.rb
+++ b/lib/eventmachine.rb
@@ -740,6 +740,28 @@ module EventMachine
     block_given? and yield c
     c
   end
+  
+  # Attaches a server IO object or file descriptor to the eventloop.
+  # This function behaves just like start_server but allows you to reuse
+  # an already existing file descriptor instead of having EventMachine create
+  # one. The descriptor must be accept()able and will be set as non-blocking.
+  #
+  # Unlike start_server however, the file descriptor is not closed when
+  # EventMachine is released, so you will have to do any cleanups manually.
+  # If +io+ is an IO object then a reference to it will be kept until
+  # EventMachine is released, so that the file descriptor isn't accidentally
+  # closed by the garbage collector.
+  def EventMachine::attach_server io, handler = nil, *args, &block
+    klass = klass_from_handler(Connection, handler, *args)
+    if io.respond_to?(:fileno)
+      fd = defined?(JRuby) ? JRuby.runtime.getDescriptorByFileno(io.fileno).getChannel : io.fileno
+    else
+      fd = io
+    end
+    s = attach_server_fd(fd)
+    @acceptors[s] = [klass,args,block,io]
+    s
+  end


   # Connect to a given host/port and re-use the provided EventMachine::Connection instance
diff --git a/tests/test_attach.rb b/tests/test_attach.rb
index 2359a96..e4be15d 100644
--- a/tests/test_attach.rb
+++ b/tests/test_attach.rb
@@ -28,6 +28,7 @@ class TestAttach < Test::Unit::TestCase

   class EchoServer < EM::Connection
     def receive_data data
+      $received_data << data
       send_data data
     end
   end
@@ -53,12 +54,14 @@ class TestAttach < Test::Unit::TestCase

   def setup
     $read, $write, $sock, $r, $w, $fd, $sock, $before, $after = nil
+    $received_data = ""
   end

   def teardown
     [$read, $write, $sock, $r, $w, $fd, $sock, $before, $after].each do |io|
       io.close rescue nil
     end
+    $received_data = nil
   end

   def test_attach
@@ -75,6 +78,25 @@ class TestAttach < Test::Unit::TestCase
     assert_equal false, $sock.closed?
     assert_equal $sock.readline, "def\n"
   end
+  
+  def test_attach_server
+    $before = TCPServer.new(Host, Port)
+    EM.run {
+      EM.attach_server $before, EchoServer
+      
+      handler = Class.new(EM::Connection) do
+        def initialize
+          send_data "hello world"
+          close_connection_after_writing
+          EM.add_timer(0.1) { EM.stop }
+        end
+      end
+      EM.connect(Host, Port, handler)
+    }
+    
+    assert_equal false, $before.closed?
+    assert_equal "hello world", $received_data
+  end

   module PipeWatch
     def notify_readable
-- 
1.6.6

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions