delayed_job is a process-based gem for asynchronous task processing that runs in the background. It forks the specified number of worker processes to execute tasks asynchronously. Task status is usually stored in the database, so the gem integrates easily into a Rails application that needs asynchronous job execution.
Normally, when a job fails or an error occurs, delayed_job saves the error into the last_error column in the database. Ideally the gem handles all of this itself and we do not need to worry about anything. In some extreme cases, however, we want to change last_error before it is stored. For example, last_error may contain characters the database cannot handle, and the resulting database error kills the delayed_job process. How can we update last_error before it is saved to the database? This post presents a solution that uses a delayed_job plugin.
When delayed_job is started, it forks a worker process in the background that polls the delayed_jobs database table and works off any available job. When a job is reserved, the worker locks the corresponding record in the database and starts running the job. The record is deleted if the job is performed successfully. If an error occurs, the job is requeued to be worked on later and its error is written to the database. The error normally contains the reason the job failed, the stack trace if there is one, and any error output from low-level commands the job executed. Because the error text can come from various sources, it may contain bytes whose encoding does not match the database's, and such special characters cause a database error, which in turn causes the delayed_job worker to die.
When last_error is being saved to the database, the error looks like this:
default_worker.10 | ArgumentError: string contains null byte
There are a few ways to keep the worker from dying. One is to change the encoding of the database, which is risky. Another is to process the error message before it is saved. The challenge with the second approach is that delayed_job works by itself and needs no code from us beyond the normal initialization and configuration. Fortunately, delayed_job supports plugins, which are invoked while jobs are being processed and affect the behavior of all jobs globally. A plugin can define callbacks and hooks that are called when various events happen.
The callbacks and hooks cover the whole lifecycle of a job, which gives us a chance to inject our own logic at each step. According to the source code, the lifecycle has the following events:
    EVENTS = {
      :enqueue    => [:job],
      :execute    => [:worker],
      :loop       => [:worker],
      :perform    => [:worker, :job],
      :error      => [:worker, :job],
      :failure    => [:worker, :job],
      :invoke_job => [:job]
    }.freeze
The events that interest us here are error and failure. Digging further into the source code, we find the following:
    def run(job)
      job_say job, 'RUNNING'
      runtime = Benchmark.realtime do
        Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
        job.destroy
      end
      job_say job, format('COMPLETED after %.4f', runtime)
      return true # did work
    rescue DeserializationError => error
      job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'

      job.error = error
      failed(job)
    rescue Exception => error # rubocop:disable RescueException
      self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
      return false # work failed
    end

    # Reschedule the job in the future (when a job fails).
    # Uses an exponential scale depending on the number of failed attempts.
    def reschedule(job, time = nil)
      if (job.attempts += 1) < max_attempts(job)
        time ||= job.reschedule_at
        job.run_at = time
        job.unlock
        job.save!
      else
        job_say job, "REMOVED permanently because of #{job.attempts} consecutive failures", 'error'
        failed(job)
      end
    end

    def failed(job)
      self.class.lifecycle.run_callbacks(:failure, self, job) do
        begin
          job.hook(:failure)
        rescue => error
          say "Error when running failure callback: #{error}", 'error'
          say error.backtrace.join("\n"), 'error'
        ensure
          job.destroy_failed_jobs? ? job.destroy : job.fail!
        end
      end
    end
In run(), a failing job either goes straight to failed() (for a DeserializationError) or runs the callbacks for the error event. The error callback reschedules the job, and reschedule() finally calls failed() once the maximum number of attempts has been reached. Hence we should focus on the failure event. Its default behavior is to save the error into the database (job.fail!), unless the job is configured to be destroyed on failure.
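The retry bookkeeping described above can be tried out in isolation. The following is only a toy model of the branch in reschedule(): ToyJob, handle_failure and the limit of 3 attempts are made up for illustration and are not part of delayed_job.

```ruby
# Toy model of the attempts check in reschedule(); not delayed_job code.
# ToyJob and handle_failure are hypothetical names used for illustration.
ToyJob = Struct.new(:attempts, :outcome)

def handle_failure(job, max_attempts)
  # Same condition as in reschedule(): increment first, then compare.
  if (job.attempts += 1) < max_attempts
    job.outcome = :rescheduled # the real worker would set run_at and save!
  else
    job.outcome = :failed      # the real worker would call failed(job)
  end
  job.outcome
end

job = ToyJob.new(0, nil)
# Simulate three consecutive failures with a hypothetical limit of 3 attempts.
outcomes = Array.new(3) { handle_failure(job, 3) }
puts outcomes.inspect
```

With a limit of 3, the first two failures only reschedule the job. The failure event, and with it the final save of last_error, is reached on the third one.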
So we need to do something while the failure callback is running. There are two options. One is to write a job-specific hook, which is invoked when job.hook(:failure) is called. The other is a global option, which uses a delayed_job plugin. We will work on the global option. Looking at how run_callbacks() is implemented, we find that each event also supports three kinds of hooks: before, around, and after. This means there are three places where we can do something for an event:
- Before the event behavior is performed
- Around the event behavior
- After the event behavior is performed
The Callback code is shown below:
    class Callback
      def initialize
        @before = []
        @after = []

        # Identity proc. Avoids special cases when there is no existing around chain.
        @around = lambda { |*args, &block| block.call(*args) }
      end

      def execute(*args, &block)
        @before.each { |c| c.call(*args) }
        result = @around.call(*args, &block)
        @after.each { |c| c.call(*args) }
        result
      end

      def add(type, &callback)
        case type
        when :before
          @before << callback
        when :after
          @after << callback
        when :around
          chain = @around # use a local variable so that the current chain is closed over in the following lambda
          @around = lambda { |*a, &block| chain.call(*a) { |*b| callback.call(*b, &block) } }
        else
          raise InvalidCallback, "Invalid callback type: #{type}"
        end
      end
    end
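To see the order in which the three hook types fire, the class can be exercised on its own. The sketch below uses a condensed copy named MiniCallback (a made-up name) and raises ArgumentError instead of InvalidCallback so that it runs without delayed_job. The string 'job-1' and the :saved return value are placeholders.

```ruby
# Condensed stand-in for the Callback class shown above (hypothetical name).
class MiniCallback
  def initialize
    @before = []
    @after = []
    # Identity proc: simply runs the default behavior.
    @around = lambda { |*args, &block| block.call(*args) }
  end

  def execute(*args, &block)
    @before.each { |c| c.call(*args) }
    result = @around.call(*args, &block)
    @after.each { |c| c.call(*args) }
    result
  end

  def add(type, &callback)
    case type
    when :before then @before << callback
    when :after  then @after << callback
    when :around
      chain = @around # close over the current chain
      @around = lambda { |*a, &block| chain.call(*a) { |*b| callback.call(*b, &block) } }
    else
      raise ArgumentError, "Invalid callback type: #{type}"
    end
  end
end

log = []
cb = MiniCallback.new
cb.add(:before) { |job| log << "before #{job}" }
cb.add(:around) do |job, &block|
  log << "around (pre) #{job}"
  result = block.call(job) # proceed with the default behavior
  log << "around (post) #{job}"
  result
end
cb.add(:after) { |job| log << "after #{job}" }

# The block passed to execute plays the role of the default behavior.
result = cb.execute('job-1') do |job|
  log << "default #{job}"
  :saved
end

puts log
```

The around hook receives the default behavior as a block and decides when to call it, so anything it changes before calling the block is visible to the default behavior.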
To update the error message, we choose the around hook. The job, which carries the error message, is available when the around hook is invoked, so we can update the error there and then proceed with the original behavior. In effect, we intercept the job, modify it, and let processing continue.
The plugin looks something like this:
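    require 'delayed_job'

    class ErrorDelayedJobPlugin < Delayed::Plugin
      # Cleans job.last_error so that it can be stored in the database.
      def self.update_last_error(event, job)
        return if job.last_error.nil?

        # Drop byte sequences that are not valid UTF-8 first, so that the
        # null-byte removal below runs on a valid string.
        cleaned = job.last_error.encode('UTF-8', invalid: :replace, undef: :replace, replace: '')
        job.last_error = cleaned.gsub("\u0000", '') # Remove null bytes
      rescue StandardError
        # Swallowed on purpose: a problem while cleaning the message
        # should not stop the failure handling itself.
      end

      callbacks do |lifecycle|
        lifecycle.around(:failure) do |worker, job, *args, &block|
          update_last_error(:around_failure, job)
          block.call(worker, job) # Continue with the default failure behavior
        end
      end
    end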
In the code above, the do block takes four arguments. The job is the running job that carries the error, and &block is a reference to the default behavior to be run, which saves the data to the database. When the failure event callback runs, the plugin first cleans last_error by removing null bytes and dropping byte sequences that cannot be represented as valid UTF-8. It then continues with the normal behavior of saving the error into the database.
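The cleaning step can be checked without a running worker. The sketch below pulls the same two string operations into a standalone helper. The name sanitize_error and the sample strings are made up for illustration, and the helper assumes the input is a string tagged as UTF-8.

```ruby
# Standalone version of the cleaning step; sanitize_error is a
# hypothetical helper name, not part of delayed_job.
def sanitize_error(message)
  return nil if message.nil?

  # Drop byte sequences that are not valid UTF-8, then remove null bytes.
  message
    .encode('UTF-8', invalid: :replace, undef: :replace, replace: '')
    .gsub("\u0000", '')
end

with_null_byte = "boom\u0000bang"
with_bad_byte  = "bad\xFFbyte" # \xFF is never valid in UTF-8

puts sanitize_error(with_null_byte)
puts sanitize_error(with_bad_byte)
```

Keeping the logic in a small pure function like this makes it easy to cover with unit tests before wiring it into the plugin.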
To enable the plugin, register it in config/initializers/delayed_job.rb by adding the following lines to that file:
    require 'delayed_job/error_delayed_job_plugin' # Path of the plugin file
    Delayed::Worker.plugins << ErrorDelayedJobPlugin
In this way, we can control what gets stored in the delayed_jobs table. The same technique can be used to update other columns or to do other things with delayed_job plugins.
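For comparison, the job-specific option mentioned earlier could look roughly like the sketch below. It relies on failed() calling job.hook(:failure) before job.fail!, as in the worker source shown above. ReportJob and FakeJob are hypothetical names, and FakeJob only stands in for a real job record so that the hook can be run without a database.

```ruby
# Hypothetical custom job with a job-specific failure hook.
class ReportJob
  def perform
    # The real work of the job would go here.
  end

  # Invoked through job.hook(:failure) once the job has permanently failed.
  def failure(job)
    return if job.last_error.nil?

    job.last_error = job.last_error
                        .encode('UTF-8', invalid: :replace, undef: :replace, replace: '')
                        .gsub("\u0000", '')
  end
end

# Stand-in for a job record; only the last_error attribute is modeled.
FakeJob = Struct.new(:last_error)

job = FakeJob.new("boom\u0000 in ReportJob")
ReportJob.new.failure(job)
puts job.last_error
```

Unlike the plugin, this hook only protects the job classes that define it, which is why the global plugin is the better fit when the problematic error text can come from any job.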