[Catalyst-commits] r6850 - in
trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst:
Engine/JobQueue Helper/JobQueue JobQueue
kixx at dev.catalyst.perl.org
kixx at dev.catalyst.perl.org
Thu Sep 6 15:54:57 GMT 2007
Author: kixx
Date: 2007-09-06 15:54:56 +0100 (Thu, 06 Sep 2007)
New Revision: 6850
Modified:
trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm
trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm
trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm
Log:
switch Job object to Class::C3
add support for different schedule types (cron implemented, "at" planned)
factor out schedule line parsing and scheduler initialization to Job object
Modified: trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm
===================================================================
--- trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm 2007-09-06 13:11:46 UTC (rev 6849)
+++ trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Engine/JobQueue/POE.pm 2007-09-06 14:54:56 UTC (rev 6850)
@@ -2,6 +2,9 @@
use warnings;
use strict;
+
+use 5.008_001;
+
use base 'Catalyst::Engine::CGI';
use Carp;
use Data::Dumper;
@@ -12,9 +15,6 @@
use Scalar::Util qw/refaddr/;
use POE;
-use POE::Component::Cron;
-use DateTime::Event::Cron;
-use DateTime::Event::Random;
use Catalyst::Exception;
use Catalyst::JobQueue::Job;
@@ -38,7 +38,7 @@
sub CONFIG_DEFAULTS {
{
render => { to => [qw/log/] },
- schedule_file => 'crontab',
+ schedule_file => { cron => 'crontab' },
}
}
@@ -129,35 +129,29 @@
DEBUG && print "Job Queue started\n";
DEBUG && print Dumper($self->{config});
- my $schedule_file = exists $self->{config}->{schedule_file} ? $self->{config}{schedule_file} : CONFIG_DEFAULTS()->{schedule_file};
- my $file = substr($schedule_file, 0, 1) eq "/" ? $schedule_file : File::Spec->catfile($self->{config}->{home}, $schedule_file);
- DEBUG && print "Parsing cron file $file\n";
- if (-e $file) {
- my $job_list = _parse_crontab($file);
- $self->{jobs} = { map { $_->ID => $_ } @{$job_list} };
+
+ # Instantiate job objects
+ my $schedule_file_cfg = exists $self->{config}->{schedule_file} ? $self->{config}{schedule_file} : CONFIG_DEFAULTS()->{schedule_file};
+ foreach my $sched_type (keys %{$schedule_file_cfg}) {
+ my $schedule_file = $schedule_file_cfg->{$sched_type};
+ my $file = substr($schedule_file, 0, 1) eq "/" ? $schedule_file : File::Spec->catfile($self->{config}->{home}, $schedule_file);
+ DEBUG && print "Parsing $sched_type file $file\n";
+ if (-e $file) {
+ my $job_list = _parse_schedule_file( $sched_type, $file);
+ # $self->{jobs} = { map { $_->ID => $_ } @{$job_list} };
+ $self->{jobs}->{ $_->ID } = $_ foreach @{$job_list};
+ }
+ else {
+ Catalyst::Exception->throw( message => qq/Cannot find schedule file "$file"/ );
+ }
}
- else {
- Catalyst::Exception->throw( message => qq/Cannot find schedule file "$file"/ );
- }
+ DEBUG && print Dumper($self->{jobs});
- DEBUG && print Dumper($self->{jobs});
+ # Start jobs
foreach my $jobid (keys %{$self->{jobs}}) {
DEBUG && print "Starting job $jobid\n";
my $job = $self->{jobs}->{$jobid};
- $job->scheduler(
- POE::Component::Cron->add(
- $session,
- 'run_job',
- DateTime::Event::Cron->from_cron($job->cronspec)->iterator(
- span =>
- DateTime::Span->from_datetimes(
- start => DateTime->now,
- end => DateTime::Infinite::Future->new,
- ),
- ),
- $job->ID,
- )
- );
+ $job->start( $session, 'run_job' );
DEBUG && print "Job ID: ", $job->ID, "\n Data: " , Dumper($job);
}
@@ -226,8 +220,7 @@
$poe_kernel->yield( 'prepare_done', $ID );
- # Wait until all prepare processing has completed, or we will return too
- # early
+ # Wait until all prepare processing has completed, or we will return too early
while ( !$job->flags->{_prepare_done} ) {
$poe_kernel->run_one_timeslice();
}
@@ -266,7 +259,8 @@
DEBUG && warn "[Job $ID] - $method\n";
my $job = $self->{jobs}->{$ID};
-
+
+ # TODO integrate NEXT localization from C::E::H::POE for subrequests if necessary
{
local (*ENV) = $job->env;
$job->context->$method();
@@ -422,6 +416,18 @@
}
+sub _parse_schedule_file
+{
+ my ($type, $filename) = @_;
+
+ if ($type eq 'cron') {
+ return _parse_crontab($filename);
+ }
+ else {
+ Catalyst::Exception->throw( message => qq/Unsupported schedule type $type/ );
+ }
+}
+
sub _parse_crontab
{
my $filename = shift;
@@ -437,12 +443,7 @@
$line =~ s/^\s+//;
$line =~ s/\s+$//;
next unless length $line;
- my @cron_line = split(/\s+/, $line);
- $job = Catalyst::JobQueue::Job->new({
- cronspec => join(' ', splice (@cron_line, 0, 5)),
- user => shift @cron_line,
- request => \@cron_line,
- });
+ $job = Catalyst::JobQueue::Job->new({ spec => $line, type => 'cron' });
push @cron_entries, $job;
}
return \@cron_entries;
Modified: trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm
===================================================================
--- trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm 2007-09-06 13:11:46 UTC (rev 6849)
+++ trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/Helper/JobQueue/POE.pm 2007-09-06 14:54:56 UTC (rev 6850)
@@ -116,6 +116,8 @@
require [% app %];
+Class::C3::initialize();
+
[% app %]->run( {
argv => \@argv,
'fork' => $fork,
Modified: trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm
===================================================================
--- trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm 2007-09-06 13:11:46 UTC (rev 6849)
+++ trunk/Catalyst-Engine-JobQueue-POE/lib/Catalyst/JobQueue/Job.pm 2007-09-06 14:54:56 UTC (rev 6850)
@@ -3,29 +3,66 @@
use strict;
use warnings;
+use DateTime::Event::Cron;
+use DateTime::Set;
+use DateTime::Span;
+use POE::Component::Cron;
+
+use Class::C3;
use base qw/Class::Accessor::Fast/;
__PACKAGE__->mk_accessors(
- qw/ID cronspec user request last_status context env flags scheduler/
+ qw/ID user request last_status context env flags scheduler iterator schedule type spec/
);
use Carp;
-use NEXT;
use Scalar::Util qw/refaddr/;
-use version; our $VERSION = qv('0.0.1');
+use version; our $VERSION = qv('0.1.0');
sub DEBUG { $ENV{CATALYST_DEBUG} || 0; }
sub new {
my ($self, @args) = @_;
- $self = $self->NEXT::new( @args );
+ $self = $self->next::method( @args );
$self->ID( refaddr($self) );
$self->flags( {} ) unless $self->flags;
+
+ if ($self->type and $self->spec) {
+ $self->parse_spec($self->type, $self->spec);
+ }
+
return $self;
}
+sub parse_spec
+{
+ my ($self, $type, $spec) = @_;
+
+ $self->type($type);
+ $self->spec($spec);
+
+ my $parser = "_parse_${type}_spec";
+ if ($self->can($parser)) {
+ return $self->$parser($spec);
+ }
+ else {
+ # TODO use Catalyst::Exception
+ die "No parser defined for $type";
+ }
+
+}
+
+sub start
+{
+ my ($self, $session, $handler_event) = @_;
+
+ $self->scheduler(
+ POE::Component::Cron->add($session, $handler_event, $self->iterator, $self->ID)
+ );
+}
+
sub cleanup
{
my $self = shift;
@@ -37,6 +74,28 @@
$self->flags( {} );
}
+sub _parse_cron_spec
+{
+ my ($self, $spec) = @_;
+
+ my @cron_spec = split(/\s+/, $spec);
+ $self->schedule( join(' ', splice (@cron_spec, 0, 5)) );
+ $self->user( shift @cron_spec );
+ $self->request( \@cron_spec );
+
+ # build iterator
+ $self->iterator(
+ DateTime::Event::Cron->from_cron( $self->schedule )->iterator(
+ span =>
+ DateTime::Span->from_datetimes(
+ start => DateTime->now,
+ end => DateTime::Infinite::Future->new,
+ ),
+ ),
+ );
+
+}
+
1; # Magic true value required at end of module
__END__
More information about the Catalyst-commits
mailing list