前からやろうやろうと思ってなかなか手をつけられずにいたのですが、やっと実装しました。
(エラーハンドリングとかまだだけど)
Postfix に限らず SMTP サーバーのログは from と to などが別の行として出力されます。
そして、各行ごとに出力される項目が異なります。from の行には size と nrcpt など。to には relay 先、delay, delays, status など。接続元や message-id はまた別の行です。
そこで IOS ビッグデータ技術ブログ: Postfixのログをfluentdを使ってTreasureDataに送る を見つけてとりあえず fluent-plugin-multi-format-parser を使って elasticsearch に送って配信エラー監視したり kibana で確認したりしてましたが、to の行で bounced になってるのを見て from は何かなと思ったら queue id から from を検索して確認するのがかなり面倒なのでやっぱりこれは1行にまとめなければと。
大雑把には to 以外のデータを redis なり memcached に登録しておいて to でそれを引っ張りだして返せば良いわけだけども、どうやって実装するのかなと。
最初は Filter プラグインで実装しようかと考えていましたが redis なり memcached なりを使うとするとそこでエラーになる可能性があるため、エラーの場合にリトライできるように BufferedOutput で実装を始めました。が、処理した結果をまた fluentd に戻す方法がわからなくて、困ってしまいました。どうしようかなって fluentd のリポジトリを眺めているうちに out_exec_filter というものがあることに気づいてこれだ!!ということで実装しました。
エラーハンドリングがないけど
#!/opt/td-agent/embedded/bin/ruby require 'redis' require 'json' redis = Redis.new while line = STDIN.gets record = JSON.load(line) if record.has_key?('from') # time は to のもので上書きされてしまうので from の時の値を別名で保存しておく record['received_at'] = Time.at(record['time'].to_i).strftime('%Y-%m-%dT%H:%M:%S%:z') end if record.has_key?('qid') key = record['host'] + ':' + record['qid'] stored = redis.hgetall(key) || {} if record.has_key?('to') print JSON.generate(stored.merge(record)) + "\n" elsif record['message'] == 'removed' redis.del(key) else redis.mapped_hmset(key, stored.merge(record)) # postfix の bounce_queue_lifetime の値に合わせて expire を設定する redis.expire(key, 86400) end end end
Postfix のログの parse を fluent-plugin-multi-format-parser ではなく Parser プラグインを書きました。gem 化してないので td-agent では /etc/td-agent/plugin/ ディレクトリに置いて使います。
module Fluent class TextParser class PostfixLogParser < Parser Plugin.register_parser('postfix_log_parser', self) config_param :time_format, :string, :default => nil def configure(conf) super @time_parser = TimeParser.new(@time_format) @addr_keys = ['to', 'from', 'orig_to'] end def parse_message(message) log = {} if m = message.match(/^(?<qid>[A-F0-9]+):\s+/) log['qid'] = m['qid'] message.gsub!(/^[A-F0-9]+:\s+/, '') else log['message'] = message return log end if m = message.match(/, status=(?<status>\S+)\s+(?<message>.*)$/) log['status'] = m['status'] log['message'] = m['message'] message.gsub!(/, status=.*$/, '') end if ! message.match(/^[a-z0-9\-]+=/) log['message'] = message return log end message.split(/, /).each do |kv| (key, value) = kv.split('=') if @addr_keys.include?(key) log[key] = value.gsub!(/^\<(.*)\>$/, '\1') else log[key] = value end end log end # to や from のドメインで集計したいこともあるのでドメイン抽出 def domain(addr) local, domain = addr.split('@', 2) parts = domain.split('.').reverse # 属性型JPドメインでは後ろから3つ分 (ISPとかのサブドメインを無視する) if parts[0] == 'jp' and parts[1].length == 2 return [parts[2], parts[1], parts[0]].join('.') elsif parts.length >= 2 return [parts[1], parts[0]].join('.') else return domain end end def parse(text) m = text.match(/^(?<time>\S+\s+\S+\s+\S+)\s+(?<host>\S+)\s+(?<process>[^\[]+)\[(?<pid>\d+)\]:\s+(?<message>.*)/) time = @time_parser.parse(m['time']) record = parse_message(m['message']) record['host'] = m['host'] record['process'] = m['process'] record['pid'] = m['pid'] # nrcpt は後ろに "(queue active)" とかついてるけど削って数値として扱えるようにしておく if record.has_key?('nrcpt') record['nrcpt'].gsub!(/\s.*$/, '') end if record.has_key?('to') record['to_domain'] = domain(record['to'].downcase) end if record.has_key?('from') && record['from'] != '' record['from_domain'] = domain(record['from'].downcase) end yield time, record end end end end
td-agent.conf はこんな感じ(必要に応じて Buffer まわりの調整を)
<source> type tail format postfix_log_parser time_format %b %e %T path /var/log/maillog pos_file /var/lib/td-agent/mail.pos tag mail.syslog </source> <match mail.syslog> type exec_filter command /etc/td-agent/exec_filter/postfix_log_binder.rb in_format json out_format json tag mail.filtered time_key time </match> <match mail.filtered> type elasticsearch hosts kibana2:9200,kibana3:9200 type_name postfix logstash_format true logstash_prefix postfix flush_interval 5s </match>
/var/log/maillog はそのままでは td-agent ユーザーでは読めないので rsyslog の設定を変更します。
$DirCreateMode 0750 $FileCreateMode 0640 $DirGroup sys $FileGroup sys
td-agent ユーザーを sys グループに所属させます。
sudo usermod -a -G sys td-agent
プラグインの書き方は公式ドキュメントを参照しました http://docs.fluentd.org/articles/plugin-development
昔 fluent-plugin-fortigate-log-parser ってのを書いたけど Parser なのに Output プラグインとして実装してしまったので書き直したいな。