Skip to content

Commit

Permalink
add support for create log store
Browse files Browse the repository at this point in the history
  • Loading branch information
谭林华 committed Apr 12, 2017
1 parent 04d5937 commit 126c95b
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ _testmain.go
*.test
*.prof
bin/
docker-image/pilot
docker-images/pilot
2 changes: 1 addition & 1 deletion docker-images/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN apk update && apk upgrade && \
gem install fluentd -v "~> 0.12.0" --no-ri --no-rdoc && \
gem install fluent-plugin-elasticsearch --no-ri --no-rdoc && \
gem install gelf -v "~> 3.0.0" --no-ri --no-rdoc && \
gem install aliyun_sls_sdk -v ">=0.0.5" --no-ri --no-rdoc && \
gem install aliyun_sls_sdk -v ">=0.0.9" --no-ri --no-rdoc && \
apk del build-base ruby-dev && \
rm -rf /root/.gem

Expand Down
13 changes: 8 additions & 5 deletions docker-images/config.default
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,14 @@ cat >> $FLUENTD_CONFIG << EOF
<match docker.**>
@type aliyun_sls
project $ALIYUNSLS_PROJECT
region_endpoint $ALIYUNSLS_REGION_ENDPOINT
access_key_id $ALIYUNSLS_ACCESS_KEY_ID
access_key_secret $ALIYUNSLS_ACCESS_KEY_SECRET
ssl_verify $SSL_VERIFY
project $ALIYUNSLS_PROJECT
region_endpoint $ALIYUNSLS_REGION_ENDPOINT
access_key_id $ALIYUNSLS_ACCESS_KEY_ID
access_key_secret $ALIYUNSLS_ACCESS_KEY_SECRET
ssl_verify $SSL_VERIFY
need_create_logstore $ALIYUNSLS_NEED_CREATE_LOGSTORE
create_logstore_ttl $CREATE_LOGSTORE_TTL
create_logstore_shard_count $CREATE_LOGSTORE_SHARD_COUNT
flush_interval 3s
</match>
Expand Down
84 changes: 59 additions & 25 deletions docker-images/plugins/out_aliyun_sls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ class AliyunSlsOutput < BufferedOutput
config_param :access_key_id, :string, :default => nil
config_param :access_key_secret, :string, :default => nil
config_param :ssl_verify, :bool, :default => false
config_param :need_create_logstore, :bool, :default => false
config_param :create_logstore_ttl, :integer, :default => 1
config_param :create_logstore_shard_count, :integer, :default => 2

def initialize
super
require "aliyun_sls_sdk/protobuf"
require "aliyun_sls_sdk/connection"
require "aliyun_sls_sdk"
@log_store_created = false
end

def configure(conf)
Expand All @@ -38,29 +42,75 @@ def format(tag, time, record)

def client
@topic = `hostname`.strip
@_sls_con ||= AliyunSlsSdk::Connection.new(@project, @region_endpoint, @access_key_id, @access_key_secret)
@_sls_con ||= AliyunSlsSdk::LogClient.new(@region_endpoint, @access_key_id, @access_key_secret, @ssl_verify)
end

def createLogStore(logstore_name)
begin
getStoreResp = client.get_logstore(@project, logstore_name)
rescue AliyunSlsSdk::LogException => e
if e.errorCode == "LogStoreNotExist"
retries = 2
begin
createLogStoreResp = logClient.create_logstore(@project, logstore_name, @create_logstore_ttl, @create_logstore_shard_count)
rescue AliyunSlsSdk::LogException => e
if e.errorCode == "LogstoreAlreadyExist"
log.warn "logstore #{logstore_name} already exist"
else
raise
end
rescue => e
if retries > 0
log.warn "Error caught when creating logs store: #{e}"
retries -= 1
retry
end
end
end
end
end

def getLogItem(record)
contents = {}
record.each { |k, v|
contents[k] = v
}
AliyunSlsSdk::LogItem.new(nil, contents)
end

def write(chunk)
log_list_hash = {}
chunk.msgpack_each do |tag, time, record|
if record and record["@target"]
logStoreName = record["@target"]
if not @log_store_created
if @need_create_logstore
createLogStore(logStoreName)
@log_store_created = true
else
@log_store_created = true
end
end
record.delete("@target")
if not log_list_hash[logStoreName]
log_list = AliyunSlsSdk::Protobuf::LogGroup.new(:logs => [], :topic => @topic, :source => @source)
log_list_hash[logStoreName] = log_list
logItems = []
log_list_hash[logStoreName] = logItems
end
log = AliyunSlsSdk::Protobuf::Log.new(:time => Time.now.to_i, :contents => [])
pack_log_item(log_list_hash[logStoreName], log, record)
log_list_hash[logStoreName] << getLogItem(record)
else
log.warn "no @target key in record: #{record}, tag: #{tag}, time: #{time}"
end
end
log_list_hash.each do |storeName, log_list|

puts log_list_hash.to_s
log_list_hash.each do |storeName, logitems|
puts storeName
puts logitems
putLogRequest = AliyunSlsSdk::PutLogsRequest.new(@project, storeName, @topic, nil, logitems, nil, true)
retries = 2
begin
client.puts_logs(storeName, log_list, @ssl_verify)
rescue Exception => e
client.put_logs(putLogRequest)
rescue => e
if retries > 0
log.warn "\tCaught in puts logs: #{e}"
client.http.shutdown
Expand All @@ -72,21 +122,5 @@ def write(chunk)
end
end
end

private

def pack_log_item(log_list, log, record)
pack_hash_log_item(log, record)
log_list.logs << log
end

def pack_hash_log_item(log, hash)
if hash
hash.each { |k, v|
log_item = AliyunSlsSdk::Protobuf::Log::Content.new(:key => k, :value => v || "")
log.contents << log_item
}
end
end
end
end

0 comments on commit 126c95b

Please sign in to comment.