Skip to content

Commit 30ac897

Browse files
Split TCP and UDP functionality, breaking some clients superficially
This splits the TCP and UDP functionality in different structs implementing a new Writer interface. This will break all clients that declare a variable like: 'var w *gelf.Writer'. Those clients will need to simply remove the '*' in front of the type to fix it.
1 parent 3794a9f commit 30ac897

6 files changed

+141
-70
lines changed

gelf/message.go

+33
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gelf
33
import (
44
"bytes"
55
"encoding/json"
6+
"time"
67
)
78

89
// Message represents the contents of the GELF message. It is gzipped
@@ -102,3 +103,35 @@ func (m *Message) UnmarshalJSON(data []byte) error {
102103
}
103104
return nil
104105
}
106+
107+
func constructMessage(p []byte, hostname string, facility string, file string, line int) (m Message) {
108+
// remove trailing and leading whitespace
109+
p = bytes.TrimSpace(p)
110+
111+
// If there are newlines in the message, use the first line
112+
// for the short message and set the full message to the
113+
// original input. If the input has no newlines, stick the
114+
// whole thing in Short.
115+
short := p
116+
full := []byte("")
117+
if i := bytes.IndexRune(p, '\n'); i > 0 {
118+
short = p[:i]
119+
full = p
120+
}
121+
122+
m = Message{
123+
Version: "1.1",
124+
Host: hostname,
125+
Short: string(short),
126+
Full: string(full),
127+
TimeUnix: float64(time.Now().Unix()),
128+
Level: 6, // info
129+
Facility: facility,
130+
Extra: map[string]interface{}{
131+
"_file": file,
132+
"_line": line,
133+
},
134+
}
135+
136+
return m
137+
}

gelf/tcpwriter.go

+68-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
package gelf
22

33
import (
4+
"bytes"
5+
"fmt"
6+
"io"
47
"net"
58
"os"
69
)
710

8-
func NewTCPWriter(addr string) (*Writer, error) {
11+
type TCPWriter struct {
12+
GelfWriter
13+
}
14+
15+
func NewTCPWriter(addr string) (*TCPWriter, error) {
916
var err error
10-
w := new(Writer)
17+
w := new(TCPWriter)
1118
w.CompressionType = CompressNone
1219
w.proto = "tcp"
1320
w.addr = addr
@@ -21,3 +28,62 @@ func NewTCPWriter(addr string) (*Writer, error) {
2128

2229
return w, nil
2330
}
31+
32+
// WriteMessage sends the specified message to the GELF server
33+
// specified in the call to New(). It assumes all the fields are
34+
// filled out appropriately. In general, clients will want to use
35+
// Write, rather than WriteMessage.
36+
func (w *TCPWriter) WriteMessage(m *Message) (err error) {
37+
mBuf := newBuffer()
38+
defer bufPool.Put(mBuf)
39+
if err = m.MarshalJSONBuf(mBuf); err != nil {
40+
return err
41+
}
42+
mBytes := mBuf.Bytes()
43+
44+
var (
45+
zBuf *bytes.Buffer
46+
zBytes []byte
47+
)
48+
49+
var zw io.WriteCloser
50+
zBytes = mBytes
51+
if zw != nil {
52+
if err != nil {
53+
return
54+
}
55+
if _, err = zw.Write(mBytes); err != nil {
56+
zw.Close()
57+
return
58+
}
59+
zw.Close()
60+
zBytes = zBuf.Bytes()
61+
}
62+
63+
zBytes = append(zBytes, 0)
64+
n, err := w.conn.Write(zBytes)
65+
if err != nil {
66+
var errConn error
67+
if w.conn, errConn = net.Dial("tcp", w.addr); errConn != nil {
68+
return fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn)
69+
}
70+
n, err = w.conn.Write(zBytes)
71+
}
72+
if n != len(zBytes) {
73+
return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
74+
}
75+
76+
return nil
77+
}
78+
79+
func (w *TCPWriter) Write(p []byte) (n int, err error) {
80+
file, line := getCallerIgnoringLogMulti(1)
81+
82+
m := constructMessage(p, w.hostname, w.Facility, file, line)
83+
84+
if err = w.WriteMessage(&m); err != nil {
85+
return 0, err
86+
}
87+
88+
return len(p), nil
89+
}

gelf/tcpwriter_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
func TestNewTCPWriter(t *testing.T) {
1313
w, err := NewTCPWriter("")
14-
if err == nil || w != nil {
14+
if err == nil && w != nil {
1515
t.Errorf("New didn't fail")
1616
return
1717
}
@@ -267,6 +267,7 @@ func sendAndRecvMsgTCP(msg *Message) (*Message, error) {
267267
return nil, fmt.Errorf("Wrong signal received")
268268
}
269269

270+
w.Close()
270271
message, err := r.readMessage()
271272

272273
return message, nil
@@ -315,5 +316,6 @@ func sendAndRecv2MessagesWithDropTCP(msgData1 string, msgData2 string) (*Message
315316
return nil, nil, fmt.Errorf("readmessage: %s", err)
316317
}
317318

319+
w.Close()
318320
return message1, message2, nil
319321
}

gelf/udpwriter.go

+25-17
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ import (
1818
"sync"
1919
)
2020

21+
type UDPWriter struct {
22+
GelfWriter
23+
}
24+
2125
// What compression type the writer should use when sending messages
2226
// to the graylog2 server
2327
type CompressType int
@@ -57,9 +61,9 @@ func numChunks(b []byte) int {
5761
// New returns a new GELF Writer. This writer can be used to send the
5862
// output of the standard Go log functions to a central GELF server by
5963
// passing it to log.SetOutput()
60-
func NewWriter(addr string) (*Writer, error) {
64+
func NewWriter(addr string) (*UDPWriter, error) {
6165
var err error
62-
w := new(Writer)
66+
w := new(UDPWriter)
6367
w.CompressionLevel = flate.BestSpeed
6468

6569
if w.conn, err = net.Dial("udp", addr); err != nil {
@@ -80,7 +84,7 @@ func NewWriter(addr string) (*Writer, error) {
8084
//
8185
// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
8286
// total, chunk-data
83-
func (w *Writer) writeChunked(zBytes []byte) (err error) {
87+
func (w *GelfWriter) writeChunked(zBytes []byte) (err error) {
8488
b := make([]byte, 0, ChunkSize)
8589
buf := bytes.NewBuffer(b)
8690
nChunksI := numChunks(zBytes)
@@ -154,7 +158,7 @@ func newBuffer() *bytes.Buffer {
154158
// specified in the call to New(). It assumes all the fields are
155159
// filled out appropriately. In general, clients will want to use
156160
// Write, rather than WriteMessage.
157-
func (w *Writer) WriteMessage(m *Message) (err error) {
161+
func (w *UDPWriter) WriteMessage(m *Message) (err error) {
158162
mBuf := newBuffer()
159163
defer bufPool.Put(mBuf)
160164
if err = m.MarshalJSONBuf(mBuf); err != nil {
@@ -195,27 +199,31 @@ func (w *Writer) WriteMessage(m *Message) (err error) {
195199
zBytes = zBuf.Bytes()
196200
}
197201

198-
if numChunks(zBytes) > 1 && w.proto != "tcp" {
202+
if numChunks(zBytes) > 1 {
199203
return w.writeChunked(zBytes)
200204
}
201-
if w.proto == "tcp" {
202-
zBytes = append(zBytes, 0)
203-
}
204205
n, err := w.conn.Write(zBytes)
205206
if err != nil {
206-
if w.proto != "tcp" {
207-
return
208-
} else {
209-
var errConn error
210-
if w.conn, errConn = net.Dial("tcp", w.addr); errConn != nil {
211-
return fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn)
212-
}
213-
n, err = w.conn.Write(zBytes)
214-
}
207+
return
215208
}
216209
if n != len(zBytes) {
217210
return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
218211
}
219212

220213
return nil
221214
}
215+
216+
// Write encodes the given string in a GELF message and sends it to
217+
// the server specified in New().
218+
func (w *UDPWriter) Write(p []byte) (n int, err error) {
219+
// 1 for the function that called us.
220+
file, line := getCallerIgnoringLogMulti(1)
221+
222+
m := constructMessage(p, w.hostname, w.Facility, file, line)
223+
224+
if err = w.WriteMessage(&m); err != nil {
225+
return 0, err
226+
}
227+
228+
return len(p), nil
229+
}

gelf/writer.go

+10-50
Original file line numberDiff line numberDiff line change
@@ -5,69 +5,29 @@
55
package gelf
66

77
import (
8-
"bytes"
98
"net"
10-
"sync"
11-
"time"
129
)
1310

11+
type Writer interface {
12+
Close() error
13+
Write([]byte) (int, error)
14+
WriteMessage(*Message) error
15+
}
16+
1417
// Writer implements io.Writer and is used to send both discrete
1518
// messages to a graylog2 server, or data from a stream-oriented
1619
// interface (like the functions in log).
17-
type Writer struct {
20+
type GelfWriter struct {
1821
addr string
19-
mu sync.Mutex
2022
conn net.Conn
2123
hostname string
2224
Facility string // defaults to current process name
23-
CompressionLevel int // one of the consts from compress/flate
24-
CompressionType CompressType
2525
proto string
26+
CompressionLevel int // one of the consts from compress/flate
27+
CompressionType CompressType
2628
}
2729

2830
// Close connection and interrupt blocked Read or Write operations
29-
func (w *Writer) Close() error {
31+
func (w *GelfWriter) Close() error {
3032
return w.conn.Close()
3133
}
32-
33-
// Write encodes the given string in a GELF message and sends it to
34-
// the server specified in New().
35-
func (w *Writer) Write(p []byte) (n int, err error) {
36-
37-
// 1 for the function that called us.
38-
file, line := getCallerIgnoringLogMulti(1)
39-
40-
// remove trailing and leading whitespace
41-
p = bytes.TrimSpace(p)
42-
43-
// If there are newlines in the message, use the first line
44-
// for the short message and set the full message to the
45-
// original input. If the input has no newlines, stick the
46-
// whole thing in Short.
47-
short := p
48-
full := []byte("")
49-
if i := bytes.IndexRune(p, '\n'); i > 0 {
50-
short = p[:i]
51-
full = p
52-
}
53-
54-
m := Message{
55-
Version: "1.1",
56-
Host: w.hostname,
57-
Short: string(short),
58-
Full: string(full),
59-
TimeUnix: float64(time.Now().Unix()),
60-
Level: 6, // info
61-
Facility: w.Facility,
62-
Extra: map[string]interface{}{
63-
"_file": file,
64-
"_line": line,
65-
},
66-
}
67-
68-
if err = w.WriteMessage(&m); err != nil {
69-
return 0, err
70-
}
71-
72-
return len(p), nil
73-
}

gelf/writer_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func sendAndRecv(msgData string, compress CompressType) (*Message, error) {
4141
return nil, fmt.Errorf("w.Write: %s", err)
4242
}
4343

44+
w.Close()
4445
return r.ReadMessage()
4546
}
4647

@@ -60,6 +61,7 @@ func sendAndRecvMsg(msg *Message, compress CompressType) (*Message, error) {
6061
return nil, fmt.Errorf("w.Write: %s", err)
6162
}
6263

64+
w.Close()
6365
return r.ReadMessage()
6466
}
6567

0 commit comments

Comments
 (0)