Skip to content

Commit

Permalink
deep symbolize keys on messages
Browse files Browse the repository at this point in the history
  • Loading branch information
svenfuchs committed Jul 17, 2017
1 parent c1b0fd1 commit 574486e
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 27 deletions.
5 changes: 3 additions & 2 deletions lib/travis/hub/handler.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
require 'travis/hub/helper/context'
require 'travis/hub/helper/hash'
require 'travis/hub/helper/string'
require 'travis/hub/service'

module Travis
module Hub
class Handler
include Helper::Context, Helper::String
include Helper::Context, Helper::Hash, Helper::String

attr_reader :context, :type, :event, :payload, :object

Expand Down Expand Up @@ -46,7 +47,7 @@ def normalize_event(event)
end

def normalize_payload(payload)
payload = payload.symbolize_keys
payload = deep_symbolize_keys(payload)
payload = normalize_state(payload)
normalize_timestamps(payload)
end
Expand Down
21 changes: 21 additions & 0 deletions lib/travis/hub/helper/hash.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Travis
module Hub
module Helper
module Hash
def deep_symbolize_keys(hash)
hash.map do |key, obj|
obj = case obj
when Array
obj.map { |obj| deep_symbolize_keys(obj) }
when ::Hash
deep_symbolize_keys(obj)
else
obj
end
[key.to_sym, obj]
end.to_h
end
end
end
end
end
53 changes: 34 additions & 19 deletions lib/travis/hub/service/state_update.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
module Travis
module Hub
module Service
class StateUpdate < Struct.new(:event, :data)
class StateUpdate < Struct.new(:event, :data, :block)
class Counter < Struct.new(:job_id, :redis)
TTL = 3600 * 12
TTL = 3600 * 24

def count
@count ||= redis.get(key).to_i
end

def increment
count = redis.incr(key)
def store(count)
redis.set(key, count)
redis.expire(key, TTL)
count
end

private
Expand All @@ -29,47 +28,63 @@ def key
MSGS = {
missing: 'Received state update (%p) with no count for job id=%p, last known count: %p.',
ordered: 'Received state update %p (%p) for job id=%p, last known count: %p',
unordered: 'Received state update %p (%p) for job id=%p, last known count: %p. %s',
skip: 'Skipping the message.'
unordered: 'Received state update %p (%p) for job id=%p, last known count: %p. Skipping the message.',
}

def apply?
return true if out_of_band? # TODO we need to increment the counter here, don't we?
return missing unless given?
apply = ordered? ? ordered : unordered
return true unless ENV['UPDATE_COUNT']
apply
def apply
if !enabled? || out_of_band?
call
elsif missing?
missing
elsif ordered?
ordered
else
unordered
end
end

private

def call
block.call
end

def enabled?
!!ENV['UPDATE_COUNT']
end

def out_of_band?
OUT_OF_BAND.include?(event)
end

def given?
!count.nil?
def missing?
count.nil?
end

def missing
warn :missing, event, job_id, counter.count
true
call
store
end

def ordered
info :ordered, count, event, job_id, counter.count
true
call
store
end

def unordered
warn :unordered, count, event, job_id, counter.count, ENV['UPDATE_COUNT'] ? MSGS[:skip] : ''
false
warn :unordered, count, event, job_id, counter.count
end

def ordered?
count >= counter.count
end

def store
counter.store(count)
end

def counter
@counter ||= Counter.new(job_id, redis)
end
Expand Down
11 changes: 6 additions & 5 deletions lib/travis/hub/service/update_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ class UpdateJob < Struct.new(:event, :data)
def run
exclusive do
validate
update_job
notify
with_state_update do
update_job
notify
end
end
end
instrument :run
Expand All @@ -38,7 +40,6 @@ def job
private

def update_job
return skipped unless apply_state_update?
return error_job if event == :reset && resets.limited? && !job.finished?
return recancel if recancel?
return skipped if skip_canceled?
Expand Down Expand Up @@ -74,8 +75,8 @@ def skipped
warn :skipped, event, job.id, job.state, data[:state], data
end

def apply_state_update?
StateUpdate.new(context, event, data).apply?
def with_state_update(&block)
StateUpdate.new(context, event, data, block).apply
end

def resets
Expand Down
1 change: 0 additions & 1 deletion spec/travis/hub/service/update_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ def recieve(msg)

it { expect(job.reload.state).to eq :received }
it { expect(log).to include "W Received state update 2 (:start) for job id=#{job.id}, last known count: 3. Skipping the message." }
it { expect(log).to include "W Skipped event job:start for <Job id=#{job.id}> trying to update state from :received to :started" }
end
end
end
Expand Down

0 comments on commit 574486e

Please sign in to comment.