[Catalyst-commits] r6850 - in trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst: Engine/JobQueue Helper/JobQueue JobQueue

kixx at dev.catalyst.perl.org kixx at dev.catalyst.perl.org
Thu Sep 6 15:54:57 GMT 2007


Author: kixx
Date: 2007-09-06 15:54:56 +0100 (Thu, 06 Sep 2007)
New Revision: 6850

Modified:
   trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm
   trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm
   trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm
Log:
switch Job object to Class::C3
add support for different schedule types ('cron' implemented, 'at' planned)
factor out schedule line parsing and scheduler initialization to Job object


Modified: trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm
===================================================================
--- trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm	2007-09-06 13:11:46 UTC (rev 6849)
+++ trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm	2007-09-06 14:54:56 UTC (rev 6850)
@@ -2,6 +2,9 @@
 
 use warnings;
 use strict;
+
+use 5.008_001;
+
 use base 'Catalyst::Engine::CGI';
 use Carp;
 use Data::Dumper;
@@ -12,9 +15,6 @@
 use Scalar::Util qw/refaddr/;
 
 use POE;
-use POE::Component::Cron;
-use DateTime::Event::Cron;
-use DateTime::Event::Random;
 
 use Catalyst::Exception;
 use Catalyst::JobQueue::Job;
@@ -38,7 +38,7 @@
 sub CONFIG_DEFAULTS {
     {
         render => { to => [qw/log/] },
-        schedule_file => 'crontab',
+        schedule_file => { cron => 'crontab' },
     }
 }
 
@@ -129,35 +129,29 @@
 
     DEBUG && print "Job Queue started\n";
     DEBUG && print Dumper($self->{config});
-    my $schedule_file = exists $self->{config}->{schedule_file} ?  $self->{config}{schedule_file} : CONFIG_DEFAULTS()->{schedule_file};
-    my $file = substr($schedule_file, 0, 1) eq "/" ? $schedule_file : File::Spec->catfile($self->{config}->{home}, $schedule_file);
-    DEBUG && print "Parsing cron file $file\n";
-    if (-e $file) {
-        my $job_list = _parse_crontab($file);
-        $self->{jobs} = { map { $_->ID => $_ } @{$job_list} };
+    
+    # Instantiate job objects
+    my $schedule_file_cfg = exists $self->{config}->{schedule_file} ?  $self->{config}{schedule_file} : CONFIG_DEFAULTS()->{schedule_file};
+    foreach my $sched_type (keys %{$schedule_file_cfg}) {
+        my $schedule_file = $schedule_file_cfg->{$sched_type};
+        my $file = substr($schedule_file, 0, 1) eq "/" ? $schedule_file : File::Spec->catfile($self->{config}->{home}, $schedule_file);
+        DEBUG && print "Parsing $sched_type file $file\n";
+        if (-e $file) {
+            my $job_list = _parse_schedule_file( $sched_type, $file);
+            # $self->{jobs} = { map { $_->ID => $_ } @{$job_list} };
+            $self->{jobs}->{ $_->ID } = $_ foreach @{$job_list};
+        }
+        else {
+            Catalyst::Exception->throw( message => qq/Cannot find schedule file "$file"/ );
+        }
     }
-    else {
-        Catalyst::Exception->throw( message => qq/Cannot find schedule file "$file"/ );
-    }
+    DEBUG && print Dumper($self->{jobs});
 
-    DEBUG && print Dumper($self->{jobs});
+    # Start jobs
     foreach my $jobid (keys %{$self->{jobs}}) {
         DEBUG && print "Starting job $jobid\n";
         my $job = $self->{jobs}->{$jobid};
-        $job->scheduler( 
-            POE::Component::Cron->add(
-                $session,
-                'run_job',
-                DateTime::Event::Cron->from_cron($job->cronspec)->iterator( 
-                    span =>
-                    DateTime::Span->from_datetimes( 
-                        start => DateTime->now,
-                        end   => DateTime::Infinite::Future->new,
-                    ),
-                ),
-                $job->ID,
-            )
-        );
+        $job->start( $session, 'run_job' );
         DEBUG && print "Job ID: ", $job->ID, "\n Data: " , Dumper($job);
     }
     
@@ -226,8 +220,7 @@
 
     $poe_kernel->yield( 'prepare_done', $ID );
 
-    # Wait until all prepare processing has completed, or we will return too
-    # early
+    # Wait until all prepare processing has completed, or we will return too early
     while ( !$job->flags->{_prepare_done} ) {
         $poe_kernel->run_one_timeslice();
     } 
@@ -266,7 +259,8 @@
 
     DEBUG && warn "[Job $ID] - $method\n";
     my $job = $self->{jobs}->{$ID};
-    
+   
+    # TODO integrate NEXT localization from C::E::H::POE for subrequests if necessary
     {
         local (*ENV) = $job->env;
         $job->context->$method();
@@ -422,6 +416,18 @@
 
 }
 
+sub _parse_schedule_file
+{
+	my ($type, $filename) = @_;
+
+	if ($type eq 'cron') {
+		return _parse_crontab($filename);
+	}
+	else {
+		Catalyst::Exception->throw( message => qq/Unsupported schedule type $type/ );
+	}
+}
+
 sub _parse_crontab
 {
     my $filename = shift;
@@ -437,12 +443,7 @@
         $line =~ s/^\s+//;
         $line =~ s/\s+$//;
         next unless length $line;
-        my @cron_line = split(/\s+/, $line);
-        $job = Catalyst::JobQueue::Job->new({
-            cronspec => join(' ', splice (@cron_line, 0, 5)),
-            user     => shift @cron_line,
-            request  => \@cron_line,
-        });
+        $job = Catalyst::JobQueue::Job->new({ spec => $line, type => 'cron' });
         push @cron_entries, $job;
     }
     return \@cron_entries;

Modified: trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm
===================================================================
--- trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm	2007-09-06 13:11:46 UTC (rev 6849)
+++ trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm	2007-09-06 14:54:56 UTC (rev 6850)
@@ -116,6 +116,8 @@
 
 require [% app %];
 
+Class::C3::initialize();
+
 [% app %]->run( {
     argv        => \@argv,
     'fork'      => $fork,

Modified: trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm
===================================================================
--- trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm	2007-09-06 13:11:46 UTC (rev 6849)
+++ trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm	2007-09-06 14:54:56 UTC (rev 6850)
@@ -3,29 +3,66 @@
 use strict;
 use warnings;
 
+use DateTime::Event::Cron;
+use DateTime::Set;
+use DateTime::Span;
+use POE::Component::Cron;
+
+use Class::C3;
 use base qw/Class::Accessor::Fast/;
 
 __PACKAGE__->mk_accessors( 
-    qw/ID cronspec user request last_status context env flags scheduler/ 
+    qw/ID user request last_status context env flags scheduler iterator schedule type spec/ 
 );
 
 use Carp;
-use NEXT;
 use Scalar::Util qw/refaddr/;
 
-use version; our $VERSION = qv('0.0.1');
+use version; our $VERSION = qv('0.1.0');
 
 sub DEBUG { $ENV{CATALYST_DEBUG} || 0; }
 
 sub new {
     my ($self, @args) = @_;
 
-    $self = $self->NEXT::new( @args );
+    $self = $self->next::method( @args );
     $self->ID( refaddr($self) );
     $self->flags( {} ) unless $self->flags;
+
+    if ($self->type and $self->spec) {
+    	$self->parse_spec($self->type, $self->spec);
+    }
+    
     return $self;
 }
 
+sub parse_spec
+{
+    my ($self, $type, $spec) = @_;
+    
+    $self->type($type);
+    $self->spec($spec);
+
+    my $parser = "_parse_${type}_spec";
+    if ($self->can($parser)) {
+        return $self->$parser($spec);
+    }
+    else {
+        # TODO use Catalyst::Exception
+    	die "No parser defined for $type";
+    }
+
+}
+
+sub start
+{
+	my ($self, $session, $handler_event) = @_;
+
+    $self->scheduler(
+        POE::Component::Cron->add($session, $handler_event, $self->iterator, $self->ID)
+    );
+}
+
 sub cleanup
 {
     my $self = shift;
@@ -37,6 +74,28 @@
     $self->flags( {} );
 }
 
+sub _parse_cron_spec
+{
+	my ($self, $spec) = @_;
+
+    my @cron_spec = split(/\s+/, $spec);
+    $self->schedule( join(' ', splice (@cron_spec, 0, 5)) );
+    $self->user( shift @cron_spec );
+    $self->request( \@cron_spec );
+
+    # build iterator
+    $self->iterator(
+        DateTime::Event::Cron->from_cron( $self->schedule )->iterator(
+            span =>
+                DateTime::Span->from_datetimes( 
+                    start => DateTime->now,
+                    end   => DateTime::Infinite::Future->new,
+                ),
+        ),
+    );
+	
+}
+
 1; # Magic true value required at end of module
 __END__
 




More information about the Catalyst-commits mailing list