diff --git a/lib/travis/hub/handler.rb b/lib/travis/hub/handler.rb index ee2f8bda1..6321490d4 100644 --- a/lib/travis/hub/handler.rb +++ b/lib/travis/hub/handler.rb @@ -1,11 +1,12 @@ require 'travis/hub/helper/context' +require 'travis/hub/helper/hash' require 'travis/hub/helper/string' require 'travis/hub/service' module Travis module Hub class Handler - include Helper::Context, Helper::String + include Helper::Context, Helper::Hash, Helper::String attr_reader :context, :type, :event, :payload, :object @@ -46,7 +47,7 @@ def normalize_event(event) end def normalize_payload(payload) - payload = payload.symbolize_keys + payload = deep_symbolize_keys(payload) payload = normalize_state(payload) normalize_timestamps(payload) end diff --git a/lib/travis/hub/helper/hash.rb b/lib/travis/hub/helper/hash.rb new file mode 100644 index 000000000..78d1507a3 --- /dev/null +++ b/lib/travis/hub/helper/hash.rb @@ -0,0 +1,21 @@ +module Travis + module Hub + module Helper + module Hash + def deep_symbolize_keys(hash) + hash.map do |key, obj| + obj = case obj + when Array + obj.map { |obj| deep_symbolize_keys(obj) } + when ::Hash + deep_symbolize_keys(obj) + else + obj + end + [key.to_sym, obj] + end.to_h + end + end + end + end +end diff --git a/lib/travis/hub/service/state_update.rb b/lib/travis/hub/service/state_update.rb index 82ccebcc3..cd4051429 100644 --- a/lib/travis/hub/service/state_update.rb +++ b/lib/travis/hub/service/state_update.rb @@ -1,18 +1,17 @@ module Travis module Hub module Service - class StateUpdate < Struct.new(:event, :data) + class StateUpdate < Struct.new(:event, :data, :block) class Counter < Struct.new(:job_id, :redis) - TTL = 3600 * 12 + TTL = 3600 * 24 def count @count ||= redis.get(key).to_i end - def increment - count = redis.incr(key) + def store(count) + redis.set(key, count) redis.expire(key, TTL) - count end private @@ -29,47 +28,63 @@ def key MSGS = { missing: 'Received state update (%p) with no count for job id=%p, last known count: %p.', ordered: 'Received state update %p (%p) for job id=%p, last known count: %p', - unordered: 'Received state update %p (%p) for job id=%p, last known count: %p. %s', - skip: 'Skipping the message.' + unordered: 'Received state update %p (%p) for job id=%p, last known count: %p. Skipping the message.', } - def apply? - return true if out_of_band? # TODO we need to increment the counter here, don't we? - return missing unless given? - apply = ordered? ? ordered : unordered - return true unless ENV['UPDATE_COUNT'] - apply + def apply + if !enabled? || out_of_band? + call + elsif missing? + missing + elsif ordered? + ordered + else + unordered + end end private + def call + block.call + end + + def enabled? + !!ENV['UPDATE_COUNT'] + end + def out_of_band? OUT_OF_BAND.include?(event) end - def given? - !count.nil? + def missing? + count.nil? end def missing warn :missing, event, job_id, counter.count - true + call + store end def ordered info :ordered, count, event, job_id, counter.count - true + call + store end def unordered - warn :unordered, count, event, job_id, counter.count, ENV['UPDATE_COUNT'] ? MSGS[:skip] : '' - false + warn :unordered, count, event, job_id, counter.count end def ordered? count >= counter.count end + def store + counter.store(count) + end + def counter @counter ||= Counter.new(job_id, redis) end diff --git a/lib/travis/hub/service/update_job.rb b/lib/travis/hub/service/update_job.rb index 46ffdb964..278123096 100644 --- a/lib/travis/hub/service/update_job.rb +++ b/lib/travis/hub/service/update_job.rb @@ -23,8 +23,10 @@ class UpdateJob < Struct.new(:event, :data) def run exclusive do validate - update_job - notify + with_state_update do + update_job + notify + end end end instrument :run @@ -38,7 +40,6 @@ def job private def update_job - return skipped unless apply_state_update? return error_job if event == :reset && resets.limited? && !job.finished? return recancel if recancel? return skipped if skip_canceled? @@ -74,8 +75,8 @@ def skipped warn :skipped, event, job.id, job.state, data[:state], data end - def apply_state_update? - StateUpdate.new(context, event, data).apply? + def with_state_update(&block) + StateUpdate.new(context, event, data, block).apply end def resets diff --git a/spec/travis/hub/service/update_job_spec.rb b/spec/travis/hub/service/update_job_spec.rb index 96262921d..0f0e77427 100644 --- a/spec/travis/hub/service/update_job_spec.rb +++ b/spec/travis/hub/service/update_job_spec.rb @@ -270,7 +270,6 @@ def recieve(msg) it { expect(job.reload.state).to eq :received } it { expect(log).to include "W Received state update 2 (:start) for job id=#{job.id}, last known count: 3. Skipping the message." } - it { expect(log).to include "W Skipped event job:start for trying to update state from :received to :started" } end end end