[Catalyst-commits] r7047 - in
trunk/Catalyst-Engine-HTTP-Sprocket/lib: .
Catalyst/Engine/HTTP/Sprocket
andyg at dev.catalyst.perl.org
andyg at dev.catalyst.perl.org
Tue Oct 23 04:01:48 GMT 2007
Author: andyg
Date: 2007-10-23 04:01:48 +0100 (Tue, 23 Oct 2007)
New Revision: 7047
Removed:
trunk/Catalyst-Engine-HTTP-Sprocket/lib/POE/
Modified:
trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm
trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm
Log:
Remove all use of filters; header parsing is now done in the child process
Modified: trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm
===================================================================
--- trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm 2007-10-22 18:52:48 UTC (rev 7046)
+++ trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm 2007-10-23 03:01:48 UTC (rev 7047)
@@ -7,9 +7,6 @@
use Data::Dump qw(dump);
use POE;
-use POE::Filter::HTTPD::HeadersOnly;
-use POE::Filter::Reference;
-use POE::Filter::Stream;
use POE::Wheel::Run;
use Catalyst::Engine::HTTP::Sprocket::Worker;
@@ -68,7 +65,7 @@
CloseEvent => 'child_closed',
StdoutEvent => 'child_stdout',
StderrEvent => 'child_stderr',
- StdinFilter => POE::Filter::Reference->new(),
+ StdinFilter => POE::Filter::Stream->new(),
StdoutFilter => POE::Filter::Stream->new(),
StderrFilter => POE::Filter::Line->new(),
);
@@ -110,32 +107,22 @@
$self->take_connection( $con );
- $con->filter->push(
- POE::Filter::HTTPD::HeadersOnly->new(
- headers_only => 1,
- )
- );
-
#$con->set_time_out( 5 );
return OK;
}
sub local_receive {
- my ( $self, $server, $con, $req ) = @_;
+ my ( $self, $server, $con, $input ) = @_;
- # XXX: Static requests should be handled directly with AIO
-
- # XXX: improve performance by parsing headers only in the child?
-
- # If this connection is now handling body data, just pass it along
- if ( my $id = $con->x->{body_mode} ) {
- DEBUG && warn "[$id] Passed along " . length($req) . " bytes of body data\n";
- $self->{children}->{ $id }->put( $req );
+ # If this connection is already being handled, just pass the data along
+ if ( my $id = $con->x->{handler_id} ) {
+ DEBUG && warn "[$id] Passed along " . length($input) . " bytes of body data\n";
+ $self->{children}->{ $id }->put( $input );
return 1;
}
- DEBUG && warn "local_receive: " . dump($req) . "\n";
+ DEBUG && warn "local_receive: " . dump($input) . "\n";
# Find a free child to send the request to
for my $id ( keys %{ $self->{children} } ) {
@@ -144,39 +131,22 @@
# Mark the child as busy handling this connection
$self->{child_busy}->{ $id } = $con;
- # Add connection info to request
- $req->{_con_addr} = $con->peer_ip;
- $req->{_con_host} = $con->peer_hostname;
- $req->{_con_server} = $con->local_ip;
- $req->{_con_sport} = $con->local_port;
+ # Link this child to the connection
+ $con->x->{handler_id} = $id;
- DEBUG && warn "[$id] " . $req->method . ' ' . $req->uri . "\n";
+ # XXX: Static requests should be handled directly with AIO
- # Send the request
- $self->{children}->{ $id }->put( $req );
+ # Pass connection info before the request
+ my $conn = join( "|", (
+ $con->peer_ip,
+ $con->peer_hostname,
+ $con->local_ip,
+ $con->local_port,
+ ) ) . "\x0D\x0A";
- # Change the filter to Stream if this is not a GET/HEAD request
- if ( $req->method !~ /^(?:GET|HEAD)$/ ) {
- DEBUG && warn "[$id] Switched to Stream filter for " . $req->method . " body\n";
-
- $self->{children}->{ $id }->set_stdin_filter( POE::Filter::Stream->new() );
-
- $con->x->{body_mode} = $id;
- # Existing body data in the filter's buffer will be ready immediately,
- # additional data will come in through this method, we will detect this with
- # the body_mode flag
- }
+ # Send the data along to the child for handling
+ $self->{children}->{ $id }->put( $conn . $input );
- # Get pending data out of the HeadersOnly filter and pass it along
- my $pending = $con->filter->get_pending();
- if ( @{$pending} && $pending->[0] ) {
- DEBUG && warn "[$id] Passed along " . length( $pending->[0] ) . " bytes of body data\n";
- $self->{children}->{ $id }->put( $pending->[0] );
- }
-
- # Pop the HeadersOnly filter off, we use Stream for the response
- $con->filter->pop();
-
return OK;
}
@@ -219,53 +189,38 @@
$con->send( $input );
- if ( !$con->x->{seen_length} && $input =~ /Content-Length:\s+(\d+)/i ) {
+ if ( $con->x->{length} ) {
+ # Body chunk(s), count bytes sent
+ my $sent = length $input;
+ $con->x->{sent} += $sent;
+
+ DEBUG && warn "[$wheel_id] Sent $sent bytes of body data\n";
+ }
+ elsif ( $input =~ /Content-Length:\s+(\d+)/i ) {
# The header chunk, remember the body length
$con->x->{length} = $1;
$con->x->{sent} = 0;
DEBUG && warn "[$wheel_id] Starting response with length: " . $con->x->{length} . "\n";
- $con->x->{seen_length} = 1;
-
# Any data in this chunk past \r\n\r\n is body data
if ( $input =~ /\x0D\x0A\x0D\x0A(.+)/s ) {
$con->x->{sent} = length $1;
DEBUG && warn "[$wheel_id] Sent " . $con->x->{sent} . " bytes after header\n";
}
}
- else {
- # Body chunk(s), count bytes sent
- my $sent = length $input;
- $con->x->{sent} += $sent;
-
- DEBUG && warn "[$wheel_id] Sent $sent bytes\n";
- }
-
- if ( $con->x->{seen_length} && $con->x->{sent} >= $con->x->{length} ) {
+
+ if ( $con->x->{length} && $con->x->{sent} >= $con->x->{length} ) {
# All done sending the body, reset for the next request
DEBUG && warn "[$wheel_id] Done sending " . $con->x->{sent} . " bytes, resetting\n";
-
+
+ delete $con->x->{handler_id};
delete $con->x->{sent};
delete $con->x->{length};
- delete $con->x->{seen_length};
-
- # Push the HeadersOnly filter back on for the next request
- $con->filter->push(
- POE::Filter::HTTPD::HeadersOnly->new(
- headers_only => 1,
- )
- );
-
- if ( $con->x->{body_mode} ) {
- delete $con->x->{body_mode};
-
- $self->{children}->{ $id }->set_stdin_filter( POE::Filter::Reference->new() );
- }
-
+
# This child is free again
$self->{child_busy}->{ $wheel_id } = 0;
-
+
# XXX: keep-alive stuff
}
}
Modified: trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm
===================================================================
--- trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm 2007-10-22 18:52:48 UTC (rev 7046)
+++ trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm 2007-10-23 03:01:48 UTC (rev 7047)
@@ -6,16 +6,19 @@
use base qw(Catalyst::Engine::CGI);
use Data::Dump qw(dump);
+use HTTP::Headers;
+use HTTP::Response;
use HTTP::Status qw(status_message);
use NEXT;
use POE::Filter::Reference;
+sub DEBUG () { $ENV{CATALYST_POE_DEBUG} || 0 }
+
sub new {
my ( $class, $config ) = @_;
bless {
config => $config,
- filter => POE::Filter::Reference->new(),
body => '',
}, $class;
}
@@ -41,29 +44,87 @@
my %copy_of_env = %ENV;
- while ( sysread STDIN, my $buf = '', 1024 ) {
- my $req = $self->{filter}->get( [ $buf ] );
+ my $inputbuf = '';
+
+ while ( sysread STDIN, my $input = '', 65536 ) {
- if ( @{$req} ) {
- $req = $req->[0];
+ $inputbuf .= $input;
+
+ # Have we read enough to include all headers?
+ if ( $inputbuf =~ /(\x0D\x0A?\x0D\x0A?|\x0A\x0D?\x0A\x0D?)/s ) {
+
+ # Copy the buffer for header parsing, and remove the header block
+ # from the content buffer.
+ my $buf = $inputbuf;
+ $inputbuf =~ s/.*?(\x0D\x0A?\x0D\x0A?|\x0A\x0D?\x0A\x0D?)//s;
- my ( $path, $query_string ) = split /\?/, $req->uri, 2;
-
+ # Pull out the connection-specific data from the first line
+ my ($con_addr, $con_host, $con_server, $con_sport);
+ $buf =~ s/^([^\|]+)\|([^\|]+)\|([^\|]+)\|([^\x0D]+)\x0D\x0A//;
+ $con_addr = $1;
+ $con_host = $2;
+ $con_server = $3;
+ $con_sport = $4;
+
+ # Parse the request line.
+ if ( $buf !~ s/^(\w+)[ \t]+(\S+)(?:[ \t]+(HTTP\/\d+\.\d+))?[^\012]*\012// ) {
+ # Invalid request
+ DEBUG && warn "[$$] Bad request: $buf\n";
+
+ my $status = 400;
+ my $message = status_message($status);
+ my $response = HTTP::Response->new( $status => $message );
+ $response->content_type( 'text/plain' );
+ $response->content( "$status $message" );
+ syswrite STDOUT, $response->as_string("\x0D\x0A");
+ next;
+ }
+
+ my $method = $1;
+ my $uri = $2;
+ my $proto = $3 || 'HTTP/0.9';
+
+ DEBUG && warn "[$$] $method $uri $proto\n";
+
+ my ( $path, $query_string ) = split /\?/, $uri, 2;
+
# Initialize CGI environment
local %ENV = (
PATH_INFO => $path || '',
QUERY_STRING => $query_string || '',
- REMOTE_ADDR => $req->{_con_addr},
- REMOTE_HOST => $req->{_con_host},
- REQUEST_METHOD => $req->method || '',
- SERVER_NAME => $req->{_con_server},
- SERVER_PORT => $req->{_con_sport},
- SERVER_PROTOCOL => $req->protocol,
+ REMOTE_ADDR => $con_addr,
+ REMOTE_HOST => $con_host,
+ REQUEST_METHOD => $method || '',
+ SERVER_NAME => $con_server,
+ SERVER_PORT => $con_sport,
+ SERVER_PROTOCOL => $proto,
%copy_of_env,
);
+ # Parse headers
+ my $headers = HTTP::Headers->new;
+ my ($key, $val);
+ HEADER:
+ while ( $buf =~ s/^([^\012]*)\012// ) {
+ $_ = $1;
+ s/\015$//;
+ if ( /^([\w\-~]+)\s*:\s*(.*)/ ) {
+ $headers->push_header( $key, $val ) if $key;
+ ($key, $val) = ($1, $2);
+ }
+ elsif ( /^\s+(.*)/ ) {
+ $val .= " $1";
+ }
+ else {
+ last HEADER;
+ }
+ }
+ $headers->push_header( $key, $val ) if $key;
+
+ DEBUG && warn "[$$] " . dump($headers) . "\n";
+
# Convert headers into ENV vars
- $req->headers->scan( sub {
+ $headers->scan( sub {
my ( $key, $val ) = @_;
$key = uc $key;
@@ -79,13 +140,12 @@
$ENV{$key} = $val;
}
} );
-
- # Pull out any pending data from the filter for use by read_chunk
- my $pending = $self->{filter}->get_pending();
- if ( defined $pending && @{$pending} ) {
- $self->{body} = $pending->[0];
- }
-
+
+ # Save inputbuf for use in read_chunk and reset
+ # inputbuf for the next request.
+ $self->{body} = $inputbuf;
+ $inputbuf = '';
+
# Pass flow to Catalyst
$config->{appclass}->handle_request();
}
More information about the Catalyst-commits
mailing list