I waited and waited and waited, and when no message came, I knew it must have been from you.
- Ashleigh Brilliant
In this chapter, we build on the lessons learned in the preceding chapter and implement two layers on top of sockets. The first is an asynchronous message-passing system, Msg, which takes advantage of nonblocking I/O where available. We then build a remote procedure call module, RPC, on top of the Msg substrate. RPC offers the convenience of synchronous procedure calls and accounts for exceptions, wantarray , parameter marshalling, and so on.
Before we proceed, let us get one basic definition out of the way. In Chapter 12, Networking with Sockets , we glossed over the definition of a " message." A socket connection is simply a stream of bytes and leaves it to the application to define message boundaries, so the receiver can tell when one message ends and another starts. Some protocols insert an end-of-message character, an arbitrarily chosen byte such as ASCII 4 (Ctrl-D), or a lone period on a line, and some prepend message lengths so that the receiver knows how much to expect. We use the latter option in this chapter.
In this section, we implement a module called Msg, an event-driven, client-server, messaging framework,[ 1 ] using the IO::Select and IO::Socket modules. These are its key characteristics:
You can instruct Msg to either send a message right away or queue it for later delivery.
Msg checks to see whether your system supports POSIX and, if so, uses its nonblocking I/O support (as shown in Chapter 12 ). On systems with nonblocking I/O support but no POSIX compliance, you can inherit from Msg and override two methods to set a filehandle's blocking properties. On systems with no support at all for this facility, a send or a receive will block, but because select is used to determine a good time when messages can be sent out or received, it minimizes the chances that these calls might block (or block for long).
Msg simply prepends every outgoing buffer with 4 bytes containing the message's length. The receiving side knows to expect at least 4 bytes and subsequently knows how long a message to expect.
Msg doesn't bother to look inside your message; this means that you have to be careful about sending binary messages to some other architecture. One simple solution is to encode all your messages in ASCII (using sprintf or pack ). The RPC module, described later in this chapter, uses the FreezeThaw library to obtain a network-transparent encoding.
[1] Talk about being fully buzzword-compliant!
The following code shows a client that uses Msg:
use Msg; $conn = Msg->connect('localhost', 8080); die "Error: Could not connect\n" unless $conn; $conn->send_now("Message $i"); ($msg, $err) = $conn->rcv_now();
connect is a static method that creates a connection object (one of its attributes is a socket connection). The send_now method pumps out messages on that connection, and a corresponding method called rcv_now blocks until it receives a message from the other side. We'll look at deferred (or queued) messaging shortly.
The following code shows a server built using Msg:
use Msg; use strict; my $host = 'localhost'; my $port = 8080; Msg->new_server($host, $port, \&login_proc); print "Server created. Waiting for events"; Msg->event_loop(); #--------------------------------------------------------------- sub login_proc { # Unconditionally accept an incoming connection request return \&rcvd_msg_from_client; } sub rcvd_msg_from_client { my ($conn, $msg, $err) = @_; if (defined $msg) { print "$msg\n"; } }
The script calls new_server to create a listening socket (the program's network address) and then calls event_loop , an event dispatcher, which is a thin wrapper over select .
When a client process attempts to connect, Msg creates a local connection object and calls the login procedure supplied by you (to new_server ), with the connection object as an argument. In the login procedure, you can query the remote host and port if you want and refuse a connection request by returning undef . To accept the connection, you return the reference to a subroutine ( rcvd_msg_from_client in this example), which will be called on every subsequent message received on that connection. Different connections can have different receiving procedures if you so wish.
This is how you send or receive messages in a deferred fashion:
$conn = Msg->connect($remote_host, $remote_port, \&msg_from_server); $conn->send_later($msg); Msg->event_loop();
The connect method takes a reference to a subroutine exactly like new_server . event_loop sends queued outgoing messages when the connection becomes writable, and is responsible for dispatching incoming messages to corresponding local subroutines (deferred receive). Note that if a client wants to use deferred messages, it has to call event_loop .
Do you see the lines between a "client" and "server" blurring? Both have event loops (although the client requires it only for deferred messages) and respond to incoming messages. In a traditional client/server setup such as a database connection, the client initiates the conversation ( connect ) and asks the questions. The server never initiates a request. In a peer-peer setup, as in a telephone conversation, one process initiates the conversation, but once the connection is established, either process can send messages. Msg supports this peer-peer model.
Other filehandles can be incorporated into the event loop, like this:
Msg->set_event_handler (\*STDIN, "read" => \&kbd_input);
The process can now respond to keyboard events and still keep an ear out for incoming messages, or send outgoing queued messages in the "background."
All event-driven frameworks support timer events to periodically trigger a background task. If you have a time-consuming task, you are expected to split it up into more manageable pieces and use a timer (with a 0 second time-out) to trigger the next subtask. This way, you can keep returning to the event loop after every subtask is over and get a chance to process other messages that might have trickled in since then. Since this chapter is about networking, I have not taken the trouble to add timer support to Msg. It is a rather trivial addition, because select supports a millisecond resolution time-out facility.
Msg exhibits the public interface shown in Table 13.1 .
Method |
Description |
---|---|
connect(host, port, [rcv_cb]) |
Connects to a server at the remote host and port and returns a connection object. rcv_callback is the reference to a user-defined subroutine, which is called as follows when the remote process sends a message (at any time): rcv_callback($conn, $msg, $err) conn is the connection object, used to send messages or issue a disconnect. msg is the received message; it is undef if the connection is closed on the other side. ( Msg automatically closes the connection on this side if this happens.) err , if present, contains the last error value on a sysread . |
$conn->send_now($msg) |
Sends the message right away and blocks if it needs to. If there are queued messages, it sends them first before attempting to send |
$conn->send_later($msg) |
Puts the message in a queue associated with the connection object and leaves it to event_loop (described later) to dispatch it when the socket becomes writable. That is, you have to call event_loop at some point; otherwise, the message never goes out. |
$conn->disconnect() |
Closes the connection. |
($msg, $err) = $conn->rcv_now() |
Blocks until it receives a full message. It does not call the callback function given to connect . In a scalar context, it returns only the message; otherwise it returns the error code, if any. |
new_server($thishost, $thisport, [login_proc]) |
A static method that creates a listening socket at
thishost
,
thisport
. When a remote socket attempts to |
set_event_handler( $handle, ["read" => rd_cb], ["write" => wt_cb]) |
|
event_loop ([count]) |
Executes the select loop |
The Msg implementation is divided into four logical parts:
Send routines . For connecting to a remote process and sending messages to it.
Receive routines . For receiving notification when a message or a connection request comes in.
Support for nonblocking I/O . Routines to make a socket blocking or non-blocking if the platform supports the POSIX module.
Let's start with the send-side routines:
package Msg; use strict; use IO::Select; use IO::Socket; use Carp; use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles); %rd_callbacks = (); %wt_callbacks = (); $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); my $blocking_supported = 0;
sub connect { my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_; # Create a new internet socket my $sock = IO::Socket::INET->new ( PeerAddr => $to_host, PeerPort => $to_port, Proto => 'tcp'); return undef unless $sock; # Create a connection end-point object my $conn = bless { sock => $sock, rcvd_notification_proc => $rcvd_notification_proc, }, $pkg; if ($rcvd_notification_proc) { # Bundle _rcv and $conn together in a closure my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } $conn; }
connect sets up a client socket and creates the connection object mentioned earlier. The connection object is a communications endpoint and has the following attributes:
sock
The socket connection
rcvd_notification_proc
A callback function to call on receipt of a message
queue
A reference to a list of buffered messages
send_offset
In nonblocking mode, Msg allows partial writes. If the socket blocks, we note down how much of the topmost message in the queue we have already sent.
msg
In nonblocking mode,
msg
contains a partial incoming message...
bytes_to_read
....and
bytes_to_read
contains the bytes still expected.
Once the connection is established, each side can use its local connection object to talk to the other side.
If the user specifies a callback ( $rcvd_notification_proc ), we set up our event handler to call a private routine _rcv , which in turn calls this callback when an entire message has been received:
sub disconnect { my $conn = shift; my $sock = delete $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "read" => undef, "write" => undef); close($sock); undef $!; # Should ideally process errors from close } sub send_now { my ($conn, $msg) = @_; _enqueue ($conn, $msg); $conn->_send (1); # 1 ==> flush }
send_now enqueues the message and tells _send to flush this message and other previous messages hanging around in the queue, if any.
sub send_later { my ($conn, $msg) = @_; _enqueue($conn, $msg); my $sock = $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "write" => sub {$conn->_send(0)}); }
send_later enqueues the message and registers a "write" callback. This is invoked later on when event_loop is called, and the file descriptor is writable.
sub _enqueue { my ($conn, $msg) = @_; # prepend length (encoded as network long) my $len = length($msg); $msg = pack ('N', $len) . $msg; push (@{$conn->{queue}}, $msg); }
_enqueue prepends each message with a length and pushes it into a queue associated with the connection. The length is encoded as a "network-independent long" (a 32-bit number) so that the receiving side knows to read exactly four bytes to obtain this length. As was mentioned earlier, the message itself is assumed to be independent of byte-ordering issues.
sub _send { my ($conn, $flush) = @_; my $sock = $conn->{sock}; return unless defined($sock); my ($rq) = $conn->{queue}; # rq -> ref. to queue. # If $flush is set, set the socket to blocking, and send all # messages in the queue - return only if there's an error # If $flush is 0 (deferred mode) make the socket non-blocking, and # return to the event loop only after every message, or if it # is likely to block in the middle of a message. $flush ? $conn->set_blocking() : $conn->set_non_blocking(); my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0; while (@$rq) { my $msg = $rq->[0]; my $bytes_to_write = length($msg) - $offset; my $bytes_written = 0; while ($bytes_to_write) { $bytes_written = syswrite ($sock, $msg, $bytes_to_write, $offset); if (!defined($bytes_written)) { if (_err_will_block($!)) { # Should happen only in deferred mode. Record how # much we have already sent. $conn->{send_offset} = $offset; # Event handler should already be set, so we will # be called back eventually, and will resume sending return 1; } else { # Uh, oh $conn->handle_send_err($!); return 0; # fail. Message remains in queue .. } } $offset += $bytes_written; $bytes_to_write -= $bytes_written; } delete $conn->{send_offset}; $offset = 0; shift @$rq; last unless $flush; # Go back to select and wait # for it to fire again. } # Call me back if queue has not been drained. if (@$rq) { set_event_handler ($sock, "write" => sub {$conn->_send(0)}); } else { set_event_handler ($sock, "write" => undef); } 1; # Success }
_send does the real work of sending the message and is called either directly from send_now or as a callback from the event loop. If called from send_now , it sets the socket to blocking mode and flushes all messages in the queue. If called from the event loop, it sets the socket to nonblocking mode and flushes at most one message at a time before returning to the event loop. This way, other connections get their share of time. If syswrite says it will block, _send notes down how much of the message has already been sent (in the send_offset attribute) and returns to the event loop. In all cases, it accounts for the fact that syswrite might write only a part of the buffer.
sub handle_send_err { # For more meaningful handling of send errors, subclass Msg and # rebless $conn. my ($conn, $err_msg) = @_; warn "Error while sending: $err_msg \n"; set_event_handler ($conn->{sock}, "write" => undef); }
This is a wishy-washy error-handling procedure that doesn't do anything other than turn off the event loop notification. It does not touch the connection object in any way, so you can potentially resume from where you left off. To do this, you must override this method in an inherited class (look at the RPC module described later, for an example).
The procedures in this section implement the listening side:
my ($g_login_proc, $g_pkg); # The prefix g_ stands for global my $main_socket = 0; sub new_server { @_ == 4 || die "new_server (myhost, myport, login_proc)\n"; my ($pkg, $my_host, $my_port, $login_proc) = @_; $main_socket = IO::Socket::INET->new ( LocalAddr => $my_host, LocalPort => $my_port, Listen => 5, Proto => 'tcp', Reuse => 1); die "Could not create socket: $! \n" unless $main_socket; set_event_handler ($main_socket, "read" => \&_new_client); $g_login_proc = $login_proc; $g_pkg = $pkg; }
new_server is somewhat similar to connect . It creates a listening socket and registers the user-defined login procedure with the event handler. (Client programs that have no wish to send or receive deferred messages don't need to call new_server or event_loop .) This login procedure will not be called until the server calls event_loop and a connection request comes in. Unlike connect , new_server doesn't create a connection object yet; that is the job of _new_client :
sub _new_client { my $sock = $main_socket->accept(); my $conn = bless { 'sock' => $sock, 'state' => 'connected' }, $g_pkg; my $rcvd_notification_proc = &$g_login_proc ($conn); if ($rcvd_notification_proc) { $conn->{rcvd_notification_proc} = $rcvd_notification_proc; my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } else { # Login failed $conn->disconnect(); } }
_new_client
is called when a connection request is received. After doing an
accept
, it gives the user-defined login procedure a chance to accept or reject the connection. If the login procedure accepts the request, it returns a reference to a subroutine. This code reference is promptly associated with the newly created connection object and will be called when a message arrives on that connection.
_
rcv
is registered as the standard callback to process all incoming messages (for all connections) and accumulates an entire message before calling the code reference mentioned above.
sub _rcv { # Complement to _send my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush # Find out how much has already been received, if at all my ($msg, $offset, $bytes_to_read, $bytes_read); my $sock = $conn->{sock}; return unless defined($sock); if (exists $conn->{msg}) { $msg = $conn->{msg}; delete $conn->{'msg'}; # Have made a copy. $offset = length($msg) - 1; # sysread appends to it. $bytes_to_read = $conn->{bytes_to_read}; } else { # The typical case ... $msg = ""; # Otherwise -w complains $offset = 0 ; $bytes_to_read = 0 ; # Will get set soon } # We want to read the message length in blocking mode. Quite # unlikely that we'll get blocked too long reading 4 bytes if (!$bytes_to_read) { # Get new length my $buf; $conn->set_blocking(); $bytes_read = sysread($sock, $buf, 4); if ($! || ($bytes_read != 4)) { goto FINISH; } $bytes_to_read = unpack ('N', $buf); } $conn->set_non_blocking() unless $rcv_now; while ($bytes_to_read) { $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset); if (defined ($bytes_read)) { if ($bytes_read == 0) { last; } $bytes_to_read -= $bytes_read; $offset += $bytes_read; } else { if (_err_will_block($!)) { # Should come here only in non-blocking mode $conn->{msg} = $msg; $conn->{bytes_to_read} = $bytes_to_read; return ; # .. to event loop; _rcv will be called # later when socket is readable again. } else { last; } } } # Message successfully read. FINISH: if (length($msg) == 0) { $conn->disconnect(); } if ($rcv_now) { return ($msg, $!); } else { &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); } }
_read is the complement to _send and does the grunt work of reading from a socket. Unlike _send , it does not know how much data it is going to deal with, but does know that the first four bytes of any message contain the encoded length (of the rest of the message). To simplify matters, it sets the mode to blocking before attempting to read these four bytes with the (reasonable) hope that if it blocks at all, it wouldn't be for too long. Once the length has been decoded, it sets the mode back to nonblocking, if required, and proceeds to read the socket. Like _send , it accounts for the fact that sysread may return less data than asked for, or it might return an error saying it may block. If the socket will block, _rcv copies the partial message into the connection object, notes down the number of bytes still left to read, and returns, waiting for it to be triggered by the event loop again. If there's an error, it disconnects the connection automatically.
sub rcv_now { my ($conn) = @_; my ($msg, $err) = _rcv ($conn, 1); # 1 means receive immediately return wantarray ? ($msg, $err) : $msg; }
BEGIN { eval { require POSIX; POSIX->import(qw(F_SETFL O_NONBLOCK EAGAIN)); }; $blocking_supported = 1 unless $@; }
BEGIN tests to see whether it can load the POSIX module and, if so, sets $blocking_supported , which is used by the following routines:
sub _err_will_block { if ($blocking_supported) { return ($_[0] == EAGAIN()); } return 0; } sub set_non_blocking { if ($blocking_supported) { # preserve other fcntl flags my $flags = fcntl ($_[0], F_GETFL(), 0); fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK()); } } sub set_blocking { if ($blocking_supported) { my $flags = fcntl ($_[0], F_GETFL(), 0); $flags &= ~O_NONBLOCK(); # Clear blocking, but preserve others fcntl ($_[0], F_SETFL(), $flags); } }
set_blocking and set_non_blocking both call fcntl as explained in the last chapter. The F_SETFL sets the file descriptor's flags to the bitmask you supplied, so we take care not to obliterate the flags that may already have been set.
The event loop support routines use IO::Select to manage collections of filehandles and socket handles. The send and receive side routines described earlier call these routines, but because these procedures do not make any assumptions about who is calling them, they are at a logically lower level. This means that for this module to coexist with another event-driven toolkit, you will just have to rewrite the routines shown below (while preserving the interface). For example, to make Msg work with Tk, you can have set_event_handler (described below) simply delegate its functionality to an equivalent Tk procedure called fileevent (described in Section 14.1, "Introduction to GUIs, Tk, and Perl/Tk" ); similarly, event_loop can simply call Tk's run method, instead of calling IO::Select.
sub set_event_handler { shift unless ref($_[0]); # shift if first arg is package name my ($handle, %args) = @_; my $callback; if (exists $args{'write'}) { $callback = $args{'write'}; if ($callback) { $wt_callbacks{$handle} = $callback; $wt_handles->add($handle); } else { delete $wt_callbacks{$handle}; $wt_handles->remove($handle); } } if (exists $args{'read'}) { $callback = $args{'read'}; if ($callback) { $rd_callbacks{$handle} = $callback; $rd_handles->add($handle); } else { delete $rd_callbacks{$handle}; $rd_handles->remove($handle); } } }
set_event_handler simply keeps track of read and write callbacks by using the handle as a hash index. To remove a callback, you call set_event_handler with a callback value of undef :
sub event_loop { my ($pkg, $loop_count) = @_; my ($conn, $r, $w, $rset, $wset); while (1) { # Quit the loop if no handles left to process last unless ($rd_handles->count() || $wt_handles->count()); ($rset, $wset) = IO::Select->select ($rd_handles, $wt_handles, undef, undef); foreach $r (@$rset) { &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r}; } foreach $w (@$wset) { &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; } if (defined($loop_count)) { last unless --$loop_count; } } }
event_loop is normally an infinite loop but can be instructed to cycle a limited number of times. The idea of giving this a loop count is to be able to dispatch other events without forfeiting control to an infinite loop. Take a look at the RPC implementation, described in the following section, which uses a count of 1 to dispatch messages in a controlled fashion.
Copyright © 2001 O'Reilly & Associates. All rights reserved.