-
Notifications
You must be signed in to change notification settings - Fork 630
EM.attach_server patch #93
Copy link
Copy link
Closed
Description
From a6fe3a20f1777159cae1936c5dbdf9d1b23b9c0b Mon Sep 17 00:00:00 2001
From: Hongli Lai (Phusion) <[email protected]>
Date: Sat, 19 Jun 2010 13:03:13 +0200
Subject: [PATCH] Add EventMachine::attach_server for attaching an arbitrary server IO object or file descriptor to the event loop.
---
ext/cmain.cpp | 10 +++++++++
ext/ed.cpp | 11 ++++++---
ext/ed.h | 5 ++-
ext/em.cpp | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++
ext/em.h | 1 +
ext/eventmachine.h | 1 +
ext/rubymain.cpp | 13 +++++++++++
lib/eventmachine.rb | 22 +++++++++++++++++++
tests/test_attach.rb | 22 +++++++++++++++++++
9 files changed, 135 insertions(+), 6 deletions(-)
diff --git a/ext/cmain.cpp b/ext/cmain.cpp
index 143c06f..993c2a7 100644
--- a/ext/cmain.cpp
+++ b/ext/cmain.cpp
@@ -152,6 +152,16 @@ extern "C" int evma_detach_fd (const unsigned long binding)
return -1;
}
+/*********************
+evma_attach_server_fd
+**********************/
+
+extern "C" const unsigned long evma_attach_server_fd (int file_descriptor)
+{
+ ensure_eventmachine("evma_attach_server_fd");
+ return EventMachine->AttachServerFD (file_descriptor);
+}
+
/************************
evma_get_file_descriptor
************************/
diff --git a/ext/ed.cpp b/ext/ed.cpp
index 06c8f6c..fb70760 100644
--- a/ext/ed.cpp
+++ b/ext/ed.cpp
@@ -50,7 +50,8 @@ bool SetSocketNonblocking (SOCKET sd)
EventableDescriptor::EventableDescriptor
****************************************/
-EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
+EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em, bool autoclose):
+ bAutoClose (autoclose),
bCloseNow (false),
bCloseAfterWriting (false),
MySocket (sd),
@@ -114,7 +115,9 @@ EventableDescriptor::~EventableDescriptor()
ProxiedFrom->StopProxy();
}
StopProxy();
- Close();
+ if (bAutoClose) {
+ Close();
+ }
}
@@ -1258,8 +1261,8 @@ void LoopbreakDescriptor::Write()
AcceptorDescriptor::AcceptorDescriptor
**************************************/
-AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em):
- EventableDescriptor (sd, parent_em)
+AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em, bool autoclose):
+ EventableDescriptor (sd, parent_em, autoclose)
{
#ifdef HAVE_EPOLL
EpollEvent.events = EPOLLIN;
diff --git a/ext/ed.h b/ext/ed.h
index bcfe682..1cbf224 100644
--- a/ext/ed.h
+++ b/ext/ed.h
@@ -36,7 +36,7 @@ class EventableDescriptor
class EventableDescriptor: public Bindable_t
{
public:
- EventableDescriptor (int, EventMachine_t*);
+ EventableDescriptor (int, EventMachine_t*, bool autoclose = true);
virtual ~EventableDescriptor();
int GetSocket() {return MySocket;}
@@ -97,6 +97,7 @@ class EventableDescriptor: public Bindable_t
virtual uint64_t GetNextHeartbeat();
private:
+ bool bAutoClose;
bool bCloseNow;
bool bCloseAfterWriting;
@@ -309,7 +310,7 @@ class AcceptorDescriptor
class AcceptorDescriptor: public EventableDescriptor
{
public:
- AcceptorDescriptor (int, EventMachine_t*);
+ AcceptorDescriptor (int, EventMachine_t*, bool = true);
virtual ~AcceptorDescriptor();
virtual void Read();
diff --git a/ext/em.cpp b/ext/em.cpp
index fff89bc..465e329 100644
--- a/ext/em.cpp
+++ b/ext/em.cpp
@@ -1446,6 +1446,62 @@ int EventMachine_t::DetachFD (EventableDescriptor *ed)
return fd;
}
+/******************************
+EventMachine_t::AttachServerFD
+*******************************/
+
+const unsigned long EventMachine_t::AttachServerFD (int sd_accept)
+{
+ unsigned long output_binding = 0;
+
+ { // set reuseaddr to improve performance on restarts.
+ int oval = 1;
+ if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) {
+ //__warning ("setsockopt failed while creating listener","");
+ goto fail;
+ }
+ }
+
+ { // set CLOEXEC. Only makes sense on Unix
+ #ifdef OS_UNIX
+ int cloexec = fcntl (sd_accept, F_GETFD, 0);
+ assert (cloexec >= 0);
+ cloexec |= FD_CLOEXEC;
+ fcntl (sd_accept, F_SETFD, cloexec);
+ #endif
+ }
+
+ if (listen (sd_accept, 100)) {
+ //__warning ("listen failed");
+ goto fail;
+ }
+
+ {
+ // Set the acceptor non-blocking.
+ // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
+ if (!SetSocketNonblocking (sd_accept)) {
+ //int val = fcntl (sd_accept, F_GETFL, 0);
+ //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
+ goto fail;
+ }
+ }
+
+ { // Looking good.
+ AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this, false);
+ if (!ad)
+ throw std::runtime_error ("unable to allocate acceptor");
+ Add (ad);
+ output_binding = ad->GetBinding();
+ }
+
+ return output_binding;
+
+ fail:
+ if (sd_accept != INVALID_SOCKET)
+ closesocket (sd_accept);
+ return 0;
+}
+
/************
name2address
************/
diff --git a/ext/em.h b/ext/em.h
index 18f00d6..5ddeb5c 100644
--- a/ext/em.h
+++ b/ext/em.h
@@ -95,6 +95,7 @@ class EventMachine_t
const unsigned long AttachFD (int, bool);
int DetachFD (EventableDescriptor*);
+ const unsigned long AttachServerFD (int);
void ArmKqueueWriter (EventableDescriptor*);
void ArmKqueueReader (EventableDescriptor*);
diff --git a/ext/eventmachine.h b/ext/eventmachine.h
index 0d900fd..fcdc0ef 100644
--- a/ext/eventmachine.h
+++ b/ext/eventmachine.h
@@ -49,6 +49,7 @@ extern "C" {
const unsigned long evma_attach_fd (int file_descriptor, int watch_mode);
int evma_detach_fd (const unsigned long binding);
+ const unsigned long evma_attach_server_fd (int file_descriptor);
int evma_get_file_descriptor (const unsigned long binding);
int evma_is_notify_readable (const unsigned long binding);
void evma_set_notify_readable (const unsigned long binding, int mode);
diff --git a/ext/rubymain.cpp b/ext/rubymain.cpp
index 0a7f3ee..c1f14e8 100644
--- a/ext/rubymain.cpp
+++ b/ext/rubymain.cpp
@@ -558,6 +558,18 @@ static VALUE t_detach_fd (VALUE self, VALUE signature)
return INT2NUM(evma_detach_fd (NUM2ULONG (signature)));
}
+/***********
+t_attach_server_fd
+***********/
+
+static VALUE t_attach_server_fd (VALUE self, VALUE file_descriptor, VALUE watch_mode)
+{
+ const unsigned long f = evma_attach_server_fd (NUM2INT(file_descriptor));
+ if (!f)
+ rb_raise (rb_eRuntimeError, "no connection");
+ return ULONG2NUM (f);
+}
+
/**************
t_get_sock_opt
**************/
@@ -1128,6 +1140,7 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 2);
rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1);
+ rb_define_module_function (EmModule, "attach_server_fd", (VALUE (*)(...))t_attach_server_fd, 1);
rb_define_module_function (EmModule, "get_sock_opt", (VALUE (*)(...))t_get_sock_opt, 3);
rb_define_module_function (EmModule, "set_notify_readable", (VALUE (*)(...))t_set_notify_readable, 2);
rb_define_module_function (EmModule, "set_notify_writable", (VALUE (*)(...))t_set_notify_writable, 2);
diff --git a/lib/eventmachine.rb b/lib/eventmachine.rb
index f7f9432..7db2e4c 100644
--- a/lib/eventmachine.rb
+++ b/lib/eventmachine.rb
@@ -740,6 +740,28 @@ module EventMachine
block_given? and yield c
c
end
+
+ # Attaches a server IO object or file descriptor to the eventloop.
+ # This function behaves just like start_server but allows you to reuse
+ # an already existing file descriptor instead of having EventMachine create
+ # one. The descriptor must be accept()able and will be set as non-blocking.
+ #
+ # Unlike start_server however, the file descriptor is not closed when
+ # EventMachine is released, so you will have to do any cleanups manually.
+ # If +io+ is an IO object then a reference to it will be kept until
+ # EventMachine is released, so that the file descriptor isn't accidentally
+ # closed by the garbage collector.
+ def EventMachine::attach_server io, handler = nil, *args, &block
+ klass = klass_from_handler(Connection, handler, *args)
+ if io.respond_to?(:fileno)
+ fd = defined?(JRuby) ? JRuby.runtime.getDescriptorByFileno(io.fileno).getChannel : io.fileno
+ else
+ fd = io
+ end
+ s = attach_server_fd(fd)
+ @acceptors[s] = [klass,args,block,io]
+ s
+ end
# Connect to a given host/port and re-use the provided EventMachine::Connection instance
diff --git a/tests/test_attach.rb b/tests/test_attach.rb
index 2359a96..e4be15d 100644
--- a/tests/test_attach.rb
+++ b/tests/test_attach.rb
@@ -28,6 +28,7 @@ class TestAttach < Test::Unit::TestCase
class EchoServer < EM::Connection
def receive_data data
+ $received_data << data
send_data data
end
end
@@ -53,12 +54,14 @@ class TestAttach < Test::Unit::TestCase
def setup
$read, $write, $sock, $r, $w, $fd, $sock, $before, $after = nil
+ $received_data = ""
end
def teardown
[$read, $write, $sock, $r, $w, $fd, $sock, $before, $after].each do |io|
io.close rescue nil
end
+ $received_data = nil
end
def test_attach
@@ -75,6 +78,25 @@ class TestAttach < Test::Unit::TestCase
assert_equal false, $sock.closed?
assert_equal $sock.readline, "def\n"
end
+
+ def test_attach_server
+ $before = TCPServer.new(Host, Port)
+ EM.run {
+ EM.attach_server $before, EchoServer
+
+ handler = Class.new(EM::Connection) do
+ def initialize
+ send_data "hello world"
+ close_connection_after_writing
+ EM.add_timer(0.1) { EM.stop }
+ end
+ end
+ EM.connect(Host, Port, handler)
+ }
+
+ assert_equal false, $before.closed?
+ assert_equal "hello world", $received_data
+ end
module PipeWatch
def notify_readable
--
1.6.6
Reactions are currently unavailable