[Catalyst-commits] r7074 - in trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP: . Sprocket

andyg at dev.catalyst.perl.org andyg at dev.catalyst.perl.org
Sat Oct 27 03:58:51 GMT 2007


Author: andyg
Date: 2007-10-27 03:58:50 +0100 (Sat, 27 Oct 2007)
New Revision: 7074

Modified:
   trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket.pm
   trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm
   trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm
Log:
Sprocket engine: start_servers, min_spare, max_spare, idle_timeout params, watchdog to start/kill children as needed

Modified: trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm
===================================================================
--- trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm	2007-10-26 18:20:36 UTC (rev 7073)
+++ trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Server.pm	2007-10-27 02:58:50 UTC (rev 7074)
@@ -19,8 +19,9 @@
 
 use Catalyst::Engine::HTTP::Sprocket::Worker;
 
-sub DEBUG ()   { $ENV{CATALYST_POE_DEBUG} || 0 }
-sub HAS_AIO () { Sprocket::AIO::HAS_AIO() }
+sub DEBUG ()         { $ENV{CATALYST_POE_DEBUG} || 0 }
+sub HAS_AIO ()       { Sprocket::AIO::HAS_AIO() }
+sub WATCHDOG_TIME () { 5 }
 
 my $DONE_MAGIC = 'PmlTpdeB3BGukGz8eb99Wg';
 my $DONE_REGEX = qr{$DONE_MAGIC$};
@@ -28,8 +29,11 @@
 sub new {
     my ( $class, $config ) = @_;
     
-    # Number of children to fork
-    $config->{children} ||= 1;
+    # Process control params
+    $config->{start_servers} ||= 5;  # number of children to start on startup
+    $config->{min_spare}     ||= 5;  # min number of idle children
+    $config->{max_spare}     ||= 10; # max number of idle children
+    $config->{idle_timeout}  ||= 30; # kill idle children > min_spare after this time
     
     if ( HAS_AIO ) {
         # Try to serve everything under /static using AIO
@@ -46,9 +50,16 @@
     $config->{child_done} = $DONE_MAGIC;
     
     my $self = $class->SUPER::new(
-        name   => 'Catalyst Server',
-        config => $config,
-        mime   => MIME::Types->new( only_complete => 1 ),
+        name        => 'Catalyst Server',
+        config      => $config,
+        mime        => MIME::Types->new( only_complete => 1 ),
+        children    => {}, # child wheels
+        child_busy  => {}, # empty for idle children, otherwise contains $con
+        stats       => {
+            started  => time(),
+            num_reqs => 0,
+        },
+        stats_child => {}, # per-child stats
     );
     
     # preload the type index hash so it's not built on the first request
@@ -59,6 +70,10 @@
             $self => [ qw(
                 _start
                 
+                spawn_child
+                kill_child
+                watchdog
+                
                 sig_int
                 sig_child
                 dump_state
@@ -88,45 +103,70 @@
     }
     
     # Fork child proc(s)
-    $self->{children}   = {};
-    $self->{child_busy} = {};
-    
-    for ( 1 .. $self->{config}->{children} ) {
-        my $wheel = POE::Wheel::Run->new(
-            Program      => \&Catalyst::Engine::HTTP::Sprocket::Worker::run,
-            ProgramArgs  => [ $self->{config} ],
-            CloseOnCall  => ($^O eq 'MSWin32' ? 0 : 1),
-            ErrorEvent   => 'child_error',
-            CloseEvent   => 'child_closed',
-            StdoutEvent  => 'child_stdout',
-            StderrEvent  => 'child_stderr',
-            StdinFilter  => POE::Filter::Stream->new(),
-            StdoutFilter => POE::Filter::Stream->new(),
-            StderrFilter => POE::Filter::Line->new(),
-        );
-        
-        # Check for errors
-        if ( !defined $wheel ) {
-            die "Unable to create worker process";
-        }
-        
-        # Setup CHLD handler
-        if ( $kernel->can('sig_child') ) {
-            $kernel->sig_child( $wheel->PID, 'sig_child' );
-        }
-        else {
-            $kernel->sig( CHLD => 'sig_child' );
-        }
-        
-        $self->{children}->{ $wheel->ID } = $wheel;
-        
-        # Set child's status to inactive
-        $self->{child_busy}->{ $wheel->ID } = 0;
+    for ( 1 .. $self->{config}->{start_servers} ) {   
+        $kernel->yield( 'spawn_child' );
     }
     
+    # watchdog process, starts/kills children as necessary
+    $kernel->delay_set( watchdog => WATCHDOG_TIME );
+    
     return $self;
 }
 
+sub spawn_child {
+    my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
+    
+    my $wheel = POE::Wheel::Run->new(
+        Program      => \&Catalyst::Engine::HTTP::Sprocket::Worker::run,
+        ProgramArgs  => [ $self->{config} ],
+        CloseOnCall  => ($^O eq 'MSWin32' ? 0 : 1),
+        ErrorEvent   => 'child_error',
+        CloseEvent   => 'child_closed',
+        StdoutEvent  => 'child_stdout',
+        StderrEvent  => 'child_stderr',
+        StdinFilter  => POE::Filter::Stream->new(),
+        StdoutFilter => POE::Filter::Stream->new(),
+        StderrFilter => POE::Filter::Line->new(),
+    );
+    
+    # Check for errors
+    if ( !defined $wheel ) {
+        warn "ERROR: Unable to create worker process: $!\n";
+        return;
+    }
+    
+    # Setup CHLD handler
+    if ( $kernel->can('sig_child') ) {
+        $kernel->sig_child( $wheel->PID, 'sig_child' );
+    }
+    else {
+        $kernel->sig( CHLD => 'sig_child' );
+    }
+    
+    $self->{children}->{ $wheel->ID } = $wheel;
+    
+    # Set child's status to inactive
+    $self->{child_busy}->{ $wheel->ID } = 0;
+    
+    $self->{stats_child}->{ $wheel->ID } = {
+        started  => time(),
+        num_reqs => 0,
+        last_req => 0, # last time child handled a request
+    };
+    
+    DEBUG && warn 'Worker process ' . $wheel->ID . ' (' . $wheel->PID . ") started\n";
+}
+
+sub kill_child {
+    my ( $kernel, $self, $wheel_id ) = @_[ KERNEL, OBJECT, ARG0 ];
+    
+    $self->{children}->{ $wheel_id }->kill();
+    
+    delete $self->{children}->{ $wheel_id };
+    delete $self->{child_busy}->{ $wheel_id };
+    delete $self->{stats_child}->{ $wheel_id };
+}
+
 sub sig_int {
     my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
     
@@ -142,9 +182,9 @@
 }
 
 sub sig_child {
-    my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
+    my ( $kernel, $self, $id ) = @_[ KERNEL, OBJECT, ARG1 ];
     
-    # XXX: remove child from children hash
+    DEBUG && warn "Got SIGCHLD for $id\n";
     
     $kernel->sig_handled();
 }
@@ -155,6 +195,67 @@
     warn dump($self) . "\n";
 }
 
+sub watchdog {
+    my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
+    
+    my $min_spare = $self->{config}->{min_spare};
+    my $max_spare = $self->{config}->{max_spare};
+    
+    # Check that we have at least min_spare idle children
+    my $idle = grep { !$_ } values %{ $self->{child_busy} };
+    
+    if ( $idle < $min_spare ) {
+        DEBUG && warn "watchdog: $idle idle workers, starting more...\n";
+        
+        # Spawn one child per second
+        my $spawn_time = 1;
+        for ( 1 .. ( $min_spare - $idle ) ) {
+            $kernel->delay_set( spawn_child => $spawn_time++ );
+        }
+    }
+    
+    # Check if we have too many idle servers
+    elsif ( $idle > $max_spare ) {
+        my $to_kill = $idle - $max_spare;
+        
+        DEBUG && warn "watchdog: too many idle workers ($idle), killing $to_kill\n";
+        
+        my $killed = 1;
+        for my $wheel_id ( keys %{ $self->{child_busy} } ) {
+            if ( !$self->{child_busy}->{ $wheel_id } ) {
+                $kernel->call( $_[SESSION], kill_child => $wheel_id );
+                
+                DEBUG && warn "watchdog: killed worker $wheel_id\n";
+                
+                last if $killed++ == $to_kill;
+            }
+        }
+    }
+    
+    # Check if we have more than min_spare and some are idle
+    elsif ( $idle > $min_spare ) {
+        my $time = time();
+        my $idle_timeout = $self->{config}->{idle_timeout};
+        my $to_kill      = $idle - $min_spare;
+        
+        my $killed = 1;
+        for my $wheel_id ( keys %{ $self->{child_busy} } ) {
+            if ( !$self->{child_busy}->{ $wheel_id } ) {
+                my $last_req = $self->{stats_child}->{ $wheel_id }->{last_req};
+                if ( ( $time - $last_req ) > $idle_timeout ) {
+                    $kernel->call( $_[SESSION], kill_child => $wheel_id );
+                    
+                    DEBUG && warn "watchdog: killing idle worker $wheel_id\n";
+                    
+                    last if $killed++ == $to_kill;
+                }
+            }
+        }
+    }
+    
+    $kernel->delay_set( watchdog => WATCHDOG_TIME );
+}
+
 ### Sprocket server methods
 
 sub local_connected {
@@ -202,6 +303,9 @@
     
     DEBUG && warn "[$con] local_receive: " . dump($input) . "\n";
     
+    # Update stats
+    $self->{stats}->{num_reqs}++;
+    
     if ( $self->{config}->{aio_static} && $input =~ m{^GET /static/([^ \?]+)} ) {
         my $file = $self->{config}->{aio_static_path} . '/' . uri_unescape($1);
         DEBUG && warn "[$con] directly serving static file $file\n";
@@ -256,6 +360,10 @@
     # Mark the child as busy handling this connection
     $self->{child_busy}->{ $wheel_id } = $con;
     
+    # Update stats
+    $self->{stats_child}->{ $wheel_id }->{num_reqs}++;
+    $self->{stats_child}->{ $wheel_id }->{last_req} = time();
+    
     # Link this child to the connection
     $con->x->{handler_id} = $wheel_id;
     
@@ -282,10 +390,7 @@
     
     warn "Child $wheel_id generated $op error $errnum: $errstr\n";
     
-    delete $self->{children}->{ $wheel_id };
-    delete $self->{child_busy}->{ $wheel_id };
-    
-    # XXX: start another child
+    $kernel->call( $_[SESSION], kill_child => $wheel_id );
 }
 
 sub child_closed {
@@ -293,10 +398,7 @@
     
     DEBUG && warn "Child closed: $wheel_id\n";
     
-    delete $self->{children}->{ $wheel_id };
-    delete $self->{child_busy}->{ $wheel_id };
-    
-    # XXX: start another child
+    $kernel->call( $_[SESSION], kill_child => $wheel_id );
 }
 
 sub child_stdout {
@@ -352,6 +454,8 @@
     
     my $wheel_id = delete $con->x->{handler_id};
     
+    return unless $wheel_id;
+    
     $self->{child_busy}->{ $wheel_id } = 0;
 }
 

Modified: trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm
===================================================================
--- trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm	2007-10-26 18:20:36 UTC (rev 7073)
+++ trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket/Worker.pm	2007-10-27 02:58:50 UTC (rev 7074)
@@ -42,6 +42,9 @@
     my $self = __PACKAGE__->new( $config );
     $config->{appclass}->engine( $self );
     
+    # Set process name
+    $0 = $config->{appclass} . ' [Catalyst POE worker]';
+    
     my %copy_of_env = %ENV;
     
     my $inputbuf = '';

Modified: trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket.pm
===================================================================
--- trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket.pm	2007-10-26 18:20:36 UTC (rev 7073)
+++ trunk/Catalyst-Engine-HTTP-Sprocket/lib/Catalyst/Engine/HTTP/Sprocket.pm	2007-10-27 02:58:50 UTC (rev 7074)
@@ -41,11 +41,12 @@
     };
     
     Sprocket::Server->spawn(
-        LogLevel      => ( DEBUG ? 4 : 1 ),
-        Name          => 'Catalyst',
-        ListenPort    => $port,
-        ListenAddress => $addr,
-        Plugins       => [
+        LogLevel       => ( DEBUG ? 4 : 1 ),
+        MaxConnections => 10000,
+        Name           => 'Catalyst',
+        ListenPort     => $port,
+        ListenAddress  => $addr,
+        Plugins        => [
             {
                 Plugin => Catalyst::Engine::HTTP::Sprocket::Server->new(
                     $config,
@@ -54,6 +55,11 @@
         ],
     );
     
+    my $url = "http://$host";
+    $url .= ":$port" unless $port == 80;
+
+    print "You can connect to your server at $url\n";
+    
     POE::Kernel->run;
 }
 




More information about the Catalyst-commits mailing list