@@ -6,15 +6,18 @@ import (
66 "fmt"
77 "math"
88 "net"
9+ "net/url"
910 "strconv"
1011 "strings"
1112 "time"
1213
1314 "github.com/Sirupsen/logrus"
1415 "github.com/docker/docker/daemon/logger"
1516 "github.com/docker/docker/daemon/logger/loggerutils"
17+ "github.com/docker/docker/pkg/urlutil"
1618 "github.com/docker/go-units"
1719 "github.com/fluent/fluent-logger-golang/fluent"
20+ "github.com/pkg/errors"
1821)
1922
2023type fluentd struct {
@@ -25,9 +28,17 @@ type fluentd struct {
2528 extra map [string ]string
2629}
2730
31+ type location struct {
32+ protocol string
33+ host string
34+ port int
35+ path string
36+ }
37+
2838const (
2939 name = "fluentd"
3040
41+ defaultProtocol = "tcp"
3142 defaultHost = "127.0.0.1"
3243 defaultPort = 24224
3344 defaultBufferLimit = 1024 * 1024
@@ -60,7 +71,7 @@ func init() {
6071// the context. The supported context configuration variable is
6172// fluentd-address.
6273func New (ctx logger.Context ) (logger.Logger , error ) {
63- host , port , err := parseAddress (ctx .Config [addressKey ])
74+ loc , err := parseAddress (ctx .Config [addressKey ])
6475 if err != nil {
6576 return nil , err
6677 }
@@ -107,12 +118,14 @@ func New(ctx logger.Context) (logger.Logger, error) {
107118 }
108119
109120 fluentConfig := fluent.Config {
110- FluentPort : port ,
111- FluentHost : host ,
112- BufferLimit : bufferLimit ,
113- RetryWait : retryWait ,
114- MaxRetry : maxRetries ,
115- AsyncConnect : asyncConnect ,
121+ FluentPort : loc .port ,
122+ FluentHost : loc .host ,
123+ FluentNetwork : loc .protocol ,
124+ FluentSocketPath : loc .path ,
125+ BufferLimit : bufferLimit ,
126+ RetryWait : retryWait ,
127+ MaxRetry : maxRetries ,
128+ AsyncConnect : asyncConnect ,
116129 }
117130
118131 logrus .WithField ("container" , ctx .ContainerID ).WithField ("config" , fluentConfig ).
@@ -172,29 +185,65 @@ func ValidateLogOpt(cfg map[string]string) error {
172185 }
173186 }
174187
175- if _ , _ , err := parseAddress (cfg ["fluentd-address" ]); err != nil {
188+ if _ , err := parseAddress (cfg ["fluentd-address" ]); err != nil {
176189 return err
177190 }
178191
179192 return nil
180193}
181194
182- func parseAddress (address string ) (string , int , error ) {
195+ func parseAddress (address string ) (* location , error ) {
183196 if address == "" {
184- return defaultHost , defaultPort , nil
197+ return & location {
198+ protocol : defaultProtocol ,
199+ host : defaultHost ,
200+ port : defaultPort ,
201+ path : "" ,
202+ }, nil
203+ }
204+
205+ protocol := defaultProtocol
206+ givenAddress := address
207+ if urlutil .IsTransportURL (address ) {
208+ url , err := url .Parse (address )
209+ if err != nil {
210+ return nil , errors .Wrapf (err , "invalid fluentd-address %s" , givenAddress )
211+ }
212+ // unix and unixgram socket
213+ if url .Scheme == "unix" || url .Scheme == "unixgram" {
214+ return & location {
215+ protocol : url .Scheme ,
216+ host : "" ,
217+ port : 0 ,
218+ path : url .Path ,
219+ }, nil
220+ }
221+ // tcp|udp
222+ protocol = url .Scheme
223+ address = url .Host
185224 }
186225
187226 host , port , err := net .SplitHostPort (address )
188227 if err != nil {
189228 if ! strings .Contains (err .Error (), "missing port in address" ) {
190- return "" , 0 , fmt . Errorf ( "invalid fluentd-address %s: %s " , address , err )
229+ return nil , errors . Wrapf ( err , "invalid fluentd-address %s" , givenAddress )
191230 }
192- return host , defaultPort , nil
231+ return & location {
232+ protocol : protocol ,
233+ host : host ,
234+ port : defaultPort ,
235+ path : "" ,
236+ }, nil
193237 }
194238
195239 portnum , err := strconv .Atoi (port )
196240 if err != nil {
197- return "" , 0 , fmt . Errorf ( "invalid fluentd-address %s: %s " , address , err )
241+ return nil , errors . Wrapf ( err , "invalid fluentd-address %s" , givenAddress )
198242 }
199- return host , portnum , nil
243+ return & location {
244+ protocol : protocol ,
245+ host : host ,
246+ port : portnum ,
247+ path : "" ,
248+ }, nil
200249}
0 commit comments