Skip to content

Commit a65e874

Browse files
authored
feat: Accounting support (#3)
* Add access log support. Just support access log for accounting. * Add `ganted2radius` to parse and send accounting data to radius. Use crond to do it hourly. * Use github.com/robfig/cron instead of crond. * Remove hard-coded paths, and go fmt. * Upgrade cron to v3, defer write the access log, rotate the log files * Just rotate when accounting, and archive every 24 logfiles(daily) * rename archive file, make sure err during removing could be handled Delete `logFiles = logFiles[:maxBackup-1]` * Requested changes of #3 1. defer the close of `archiveFile` and `src` 2. use `log.Printf` to replace `fmt.Printf` and `fmt.Fprintf` 3. `panic()` if the log directory does not exist and can not be created either 4. fetching the previous writer and close it if valid * Format error message, use `log.Fatalf` for panic message of `Mkdir` * fix: the issue of ensuring file existence and related error return. add some comments, and fix some error message typo. * chore: `os.IsNotExist(err)` to `errors.Is(err, os.ErrNotExist)`
1 parent 7da2431 commit a65e874

File tree

6 files changed

+412
-15
lines changed

6 files changed

+412
-15
lines changed

Dockerfile

+3-1
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ WORKDIR /app
99
COPY --from=builder /usr/src/app/ganted ./
1010
ENV GANTED_LISTEN=:6626 \
1111
RADIUS_SERVER=light-freeradius:1812 \
12+
RADIUS_ACCOUNTING_SERVER=light-freeradius:1813 \
1213
RADIUS_SECRET=testing123 \
1314
GANTED_ACL=91.108.4.0/22,91.108.8.0/21,91.108.16.0/21,91.108.36.0/22,91.108.56.0/22,149.154.160.0/20,2001:67c:4e8::/48,2001:b28:f23c::/46 \
1415
GANTED_BIND_OUTPUT=0.0.0.0 \
1516
GANTED_AUTH_CACHE_RETENTION=10m \
16-
GANTED_AUTH_CACHE_GC=10m
17+
GANTED_AUTH_CACHE_GC=10m \
18+
GANTED_LOG_DIR=/var/log/ganted
1719
CMD ["./ganted"]

src/ganted2radius.go

+264
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"io/fs"
9+
"log"
10+
"os"
11+
"path/filepath"
12+
"regexp"
13+
"sort"
14+
"strconv"
15+
"strings"
16+
"time"
17+
18+
"github.com/klauspost/compress/zstd"
19+
"golang.org/x/net/context"
20+
"layeh.com/radius"
21+
"layeh.com/radius/rfc2865"
22+
"layeh.com/radius/rfc2866"
23+
)
24+
25+
// compressFile compresses the given file using zstd compression.
26+
// if the compressed file exists or any operation fails,
27+
// it returns an error.
28+
func compressFile(filepath string) error {
29+
// open the file
30+
file, err := os.Open(filepath)
31+
if err != nil {
32+
return err
33+
}
34+
defer file.Close()
35+
36+
compressedFilepath := filepath + ".zst"
37+
// Check if the file already exists first
38+
if _, err := os.Stat(compressedFilepath); err == nil {
39+
return fmt.Errorf("file %s exists", compressedFilepath)
40+
} else if !errors.Is(err, os.ErrNotExist) {
41+
// If the error is something other than "file does not exist", return it
42+
return err
43+
}
44+
// Create the file if it does not exist
45+
compressedFile, err := os.Create(compressedFilepath)
46+
if err != nil {
47+
return err
48+
}
49+
defer compressedFile.Close()
50+
// create the zstd writer
51+
zw, err := zstd.NewWriter(compressedFile)
52+
if err != nil {
53+
return err
54+
}
55+
// copy the file to the zstd writer
56+
_, err = io.Copy(zw, file)
57+
if err != nil {
58+
return err
59+
}
60+
// flush the zstd writer
61+
if err := zw.Close(); err != nil {
62+
return err
63+
}
64+
// remove the original file
65+
if err := os.Remove(filepath); err != nil {
66+
return err
67+
}
68+
return nil
69+
}
70+
71+
func findLogs(logDir string, logPattern *regexp.Regexp) ([]string, error) {
72+
var logFiles []string
73+
err := filepath.WalkDir(logDir, func(path string, d fs.DirEntry, err error) error {
74+
if err != nil {
75+
return err
76+
}
77+
if !d.IsDir() && logPattern.MatchString(d.Name()) {
78+
logFiles = append(logFiles, path)
79+
}
80+
return nil
81+
})
82+
return logFiles, err
83+
}
84+
85+
func archiveLogs(logDir string, maxBackup int) error {
86+
logPattern := regexp.MustCompile(`^access-\d{14}\.log$`)
87+
logFiles, err := findLogs(logDir, logPattern)
88+
if err != nil {
89+
return err
90+
}
91+
if len(logFiles) >= maxBackup {
92+
sort.Strings(logFiles)
93+
// create archive log file with current date
94+
date := time.Now().Format("20060102")
95+
archiveFileName := fmt.Sprintf("archived-access-%s.log", date)
96+
archiveFilePath := filepath.Join(logDir, archiveFileName)
97+
// check if access-<date>.log.zst exists first
98+
if _, err := os.Stat(archiveFilePath + ".zst"); err == nil {
99+
return fmt.Errorf("file %s exists", archiveFileName+".zst")
100+
} else if !errors.Is(err, os.ErrNotExist) {
101+
// If the error is something other than "file does not exist", return it
102+
return err
103+
}
104+
archiveFile, err := os.Create(archiveFilePath)
105+
if err != nil {
106+
return err
107+
}
108+
defer archiveFile.Close()
109+
// concatenate `maxBackup` access-<datetime>.log files to access-<date>.log
110+
for _, logFile := range logFiles {
111+
src, err := os.Open(logFile)
112+
if err != nil {
113+
return err
114+
}
115+
defer src.Close()
116+
_, err = io.Copy(archiveFile, src)
117+
if err != nil {
118+
return err
119+
}
120+
121+
}
122+
// compress access-<date>.log
123+
if err := compressFile(archiveFilePath); err != nil {
124+
return err
125+
}
126+
// If the archiveFile exists, some error occur, and archiveFile need to delete
127+
// As the compressFile will remove the original archiveFile
128+
// Else, everything is ok, delete the original log files
129+
if _, err := os.Stat(archiveFilePath); !errors.Is(err, os.ErrNotExist) {
130+
if err := os.Remove(archiveFilePath); err != nil {
131+
return fmt.Errorf("err when removing file %s", archiveFilePath)
132+
}
133+
} else if err == nil {
134+
for _, logFile := range logFiles {
135+
if err := os.Remove(logFile); err != nil {
136+
return fmt.Errorf("err when removing file %s", logFile)
137+
}
138+
}
139+
} else {
140+
return fmt.Errorf("Unknown err when clear original log file")
141+
}
142+
}
143+
return nil
144+
}
145+
146+
func parseLogFile(filename string) (map[string]int, error) {
147+
file, err := os.Open(filename)
148+
if err != nil {
149+
return nil, err
150+
}
151+
defer file.Close()
152+
153+
stats := make(map[string]int)
154+
scanner := bufio.NewScanner(file)
155+
156+
for scanner.Scan() {
157+
line := scanner.Text()
158+
fields := strings.Fields(line)
159+
if len(fields) != 8 {
160+
log.Printf("Skipping malformed line: %s\n", line)
161+
continue
162+
}
163+
164+
identity := fields[3]
165+
bytesIn, err := strconv.Atoi(fields[6])
166+
if err != nil {
167+
log.Printf("Error parsing bytes in: %v\n", err)
168+
continue
169+
}
170+
bytesOut, err := strconv.Atoi(fields[7])
171+
if err != nil {
172+
log.Printf("Error parsing bytes out: %v\n", err)
173+
continue
174+
}
175+
totalBytes := bytesIn + bytesOut
176+
177+
stats[identity] += totalBytes
178+
}
179+
180+
return stats, scanner.Err()
181+
}
182+
183+
func (r *RadiusCredentials) sendAccountingData(identity string, bytes int) error {
184+
// send an CodeAccessRequest for test
185+
sessionID := strconv.FormatInt(time.Now().Unix(), 10)
186+
log.Printf("Sending accounting data for identity %s, session ID %s, bytes %d\n", identity, sessionID, bytes)
187+
188+
// Send start accounting packet
189+
startPacket := radius.New(radius.CodeAccountingRequest, []byte(r.Secret))
190+
rfc2865.UserName_SetString(startPacket, identity)
191+
rfc2865.NASIdentifier_SetString(startPacket, r.NASIdentifier)
192+
rfc2866.AcctSessionID_Set(startPacket, []byte(sessionID))
193+
rfc2866.AcctStatusType_Set(startPacket, rfc2866.AcctStatusType_Value_Start)
194+
// log.Printf("Sending start packet\n")
195+
196+
startReply, err := radius.Exchange(context.Background(), startPacket, r.AccountingServer)
197+
if err != nil {
198+
return err
199+
}
200+
if startReply.Code != radius.CodeAccountingResponse {
201+
return fmt.Errorf("unexpected response from RADIUS server")
202+
}
203+
// log.Printf("Received start reply\n")
204+
205+
// Send stop accounting packet
206+
stopPacket := radius.New(radius.CodeAccountingRequest, r.Secret)
207+
rfc2865.UserName_SetString(stopPacket, identity)
208+
rfc2865.NASIdentifier_SetString(stopPacket, r.NASIdentifier)
209+
rfc2866.AcctSessionID_SetString(stopPacket, sessionID)
210+
rfc2866.AcctStatusType_Set(stopPacket, rfc2866.AcctStatusType_Value_Stop)
211+
rfc2866.AcctOutputOctets_Set(stopPacket, rfc2866.AcctOutputOctets(bytes))
212+
// log.Printf("Sending stop packet\n")
213+
214+
stopReply, err := radius.Exchange(context.Background(), stopPacket, r.AccountingServer)
215+
if err != nil {
216+
return err
217+
}
218+
if stopReply.Code != radius.CodeAccountingResponse {
219+
return fmt.Errorf("unexpected response from RADIUS server")
220+
}
221+
// log.Printf("Received stop reply\n")
222+
223+
return nil
224+
}
225+
226+
func (r *RadiusCredentials) accounting(accessLogger *log.Logger) error {
227+
// Get the log directory
228+
accessLogFileHandler, ok := accessLogger.Writer().(*os.File)
229+
if !ok {
230+
return fmt.Errorf("access log file is not a file")
231+
}
232+
accessLogFile := accessLogFileHandler.Name()
233+
logDir := filepath.Dir(accessLogFile)
234+
// rename the access.log file to access-<datetime>.log
235+
now := time.Now()
236+
dotIndex := strings.LastIndex(accessLogFile, ".")
237+
accountingLogFile := accessLogFile[:dotIndex] + "-" + now.Format("20060102150405") + accessLogFile[dotIndex:]
238+
if err := os.Rename(accessLogFile, accountingLogFile); err != nil {
239+
return err
240+
}
241+
// ask accessLogger to reopen the access.log file
242+
if err := setFileLoggerOutput(accessLogger, accessLogFile); err != nil {
243+
return err
244+
}
245+
stats, err := parseLogFile(accountingLogFile)
246+
if err != nil {
247+
log.Printf("[ERR] Failed to parse log file %s: %v\n", accountingLogFile, err)
248+
return err
249+
}
250+
// Sending accounting data
251+
for identity, bytes := range stats {
252+
err := r.sendAccountingData(identity, bytes)
253+
if err != nil {
254+
log.Printf("[ERR] Failed to send accounting data for identity %s: %v\n", identity, err)
255+
} else {
256+
log.Printf("Sent accounting data for identity %s\n", identity)
257+
}
258+
}
259+
// Compress all access-<datetime>.log files in the log directory
260+
if err := archiveLogs(logDir, 24); err != nil {
261+
return err
262+
}
263+
return nil
264+
}

src/go-socks5/socks5.go

+53-3
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"log"
77
"net"
88
"os"
9+
"time"
910

1011
"golang.org/x/net/context"
12+
"sync/atomic"
1113
)
1214

1315
const (
@@ -46,10 +48,41 @@ type Config struct {
4648
// Defaults to stdout.
4749
Logger *log.Logger
4850

51+
// AccessLogger can be used to provide a custom access log target.
52+
// Defaults to stdout.
53+
AccessLogger *log.Logger
54+
55+
// ErrorLogger can be used to provide a custom error log target.
56+
// Defaults to stdout.
57+
ErrorLogger *log.Logger
58+
4959
// Optional function for dialing out
5060
Dial func(ctx context.Context, network, addr string) (net.Conn, error)
5161
}
5262

63+
// ConnWrapper is a wrapper around a net.Conn that provides a way to log read/write bytes
64+
type ConnWrapper struct {
65+
net.Conn
66+
ReadBytes int64
67+
WriteBytes int64
68+
}
69+
70+
// Read reads data from the connection
71+
func (c *ConnWrapper) Read(b []byte) (int, error) {
72+
n, err := c.Conn.Read(b)
73+
// c.readBytes += int64(n) is not atomic
74+
atomic.AddInt64(&c.ReadBytes, int64(n))
75+
return n, err
76+
}
77+
78+
// Write writes data to the connection
79+
func (c *ConnWrapper) Write(b []byte) (int, error) {
80+
n, err := c.Conn.Write(b)
81+
// c.writeBytes += int64(n) is not atomic
82+
atomic.AddInt64(&c.WriteBytes, int64(n))
83+
return n, err
84+
}
85+
5386
// Server is reponsible for accepting connections and handling
5487
// the details of the SOCKS5 protocol
5588
type Server struct {
@@ -120,11 +153,15 @@ func (s *Server) Serve(l net.Listener) error {
120153
// ServeConn is used to serve a single connection.
121154
func (s *Server) ServeConn(conn net.Conn) error {
122155
defer conn.Close()
156+
157+
// Wrap the connection to log read/write bytes
158+
wrappedConn := &ConnWrapper{Conn: conn}
159+
123160
remoteAddr, ok := conn.RemoteAddr().(*net.TCPAddr)
124161
if !ok {
125162
return fmt.Errorf("Invalid remote address type: %T", conn.RemoteAddr())
126163
}
127-
bufConn := bufio.NewReader(conn)
164+
bufConn := bufio.NewReader(wrappedConn)
128165

129166
// Read the version byte
130167
version := []byte{0}
@@ -151,7 +188,7 @@ func (s *Server) ServeConn(conn net.Conn) error {
151188
request, err := NewRequest(bufConn)
152189
if err != nil {
153190
if err == unrecognizedAddrType {
154-
if err := sendReply(conn, addrTypeNotSupported, nil); err != nil {
191+
if err := sendReply(wrappedConn, addrTypeNotSupported, nil); err != nil {
155192
return fmt.Errorf("Failed to send reply: %v", err)
156193
}
157194
}
@@ -160,8 +197,21 @@ func (s *Server) ServeConn(conn net.Conn) error {
160197
request.AuthContext = authContext
161198
request.RemoteAddr = &AddrSpec{IP: remoteAddr.IP, Port: remoteAddr.Port}
162199

200+
// log access
201+
// remoteAddr, identity, time_now, request, bytes_in, bytes_out
202+
defer func() {
203+
s.config.AccessLogger.Printf("%s %s %s %s %d %d",
204+
remoteAddr,
205+
authContext.Payload["Username"],
206+
time.Now().Format(time.RFC3339),
207+
request.DestAddr.String(),
208+
wrappedConn.ReadBytes,
209+
wrappedConn.WriteBytes,
210+
)
211+
}()
212+
163213
// Process the client request
164-
if err := s.handleRequest(request, conn); err != nil {
214+
if err := s.handleRequest(request, wrappedConn); err != nil {
165215
err = fmt.Errorf("Failed to handle request: %v", err)
166216
s.config.Logger.Printf("[ERR] socks %s: %v", remoteAddr, err)
167217
return err

src/go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ replace github.com/armon/go-socks5 => ./go-socks5
77
require (
88
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
99
github.com/kisom/netallow v0.0.0-20200609175051-08f6b004e41a
10+
github.com/klauspost/compress v1.17.9
11+
github.com/robfig/cron/v3 v3.0.1
12+
golang.org/x/net v0.25.0
1013
layeh.com/radius v0.0.0-20231213012653-1006025d24f8
1114
)
12-
13-
require golang.org/x/net v0.25.0 // indirect

0 commit comments

Comments
 (0)