Skip to content
11 changes: 11 additions & 0 deletions app/policy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func defaultPolicy() *Policy {
Buffer: &Policy_Buffer{
Connection: p.Buffer.PerConnection,
},
Restriction: &Policy_Restriction{
MaxIPs: p.Restriction.MaxIPs,
},
}
}

Expand Down Expand Up @@ -58,6 +61,11 @@ func (p *Policy) overrideWith(another *Policy) {
Connection: another.Buffer.Connection,
}
}
if another.Restriction != nil {
p.Restriction = &Policy_Restriction{
MaxIPs: another.Restriction.MaxIPs,
}
}
}

// ToCorePolicy converts this Policy to policy.Session.
Expand All @@ -77,6 +85,9 @@ func (p *Policy) ToCorePolicy() policy.Session {
if p.Buffer != nil {
cp.Buffer.PerConnection = p.Buffer.Connection
}
if p.Restriction != nil {
cp.Restriction.MaxIPs = p.Restriction.MaxIPs
}
return cp
}

Expand Down
238 changes: 157 additions & 81 deletions app/policy/config.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions app/policy/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ message Policy {
int32 connection = 1;
}

message Restriction {
int32 maxIPs = 1;
}

Timeout timeout = 1;
Stats stats = 2;
Buffer buffer = 3;
Restriction restriction = 4;
}

message SystemPolicy {
Expand Down
44 changes: 41 additions & 3 deletions app/proxyman/inbound/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/xtls/xray-core/features/stats"
"github.com/xtls/xray-core/proxy"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/restriction"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tcp"
"github.com/xtls/xray-core/transport/internet/udp"
Expand All @@ -32,6 +33,8 @@ type worker interface {
}

type tcpWorker struct {
sync.Mutex

address net.Address
port net.Port
proxy proxy.Inbound
Expand All @@ -42,6 +45,7 @@ type tcpWorker struct {
sniffingConfig *proxyman.SniffingConfig
uplinkCounter stats.Counter
downlinkCounter stats.Counter
ipLimitPool map[session.ID]*restriction.UserMaxIp

hub internet.Listener

Expand Down Expand Up @@ -104,9 +108,22 @@ func (w *tcpWorker) callback(conn stat.Connection) {
}
ctx = session.ContextWithContent(ctx, content)

if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil {
// Add this IP address to the pool for futher IP limit check
w.Lock()
w.ipLimitPool[sid] = &restriction.UserMaxIp{
IpAddress: net.IP(conn.RemoteAddr().Network()),
}
w.Unlock()

if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher, &w.ipLimitPool, w.ipLimitPool[sid]); err != nil {
newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx))
}

// Deletes the IP address from the pool after the connection ends
w.Lock()
delete(w.ipLimitPool, sid)
w.Unlock()

cancel()
conn.Close()
}
Expand All @@ -116,6 +133,9 @@ func (w *tcpWorker) Proxy() proxy.Inbound {
}

func (w *tcpWorker) Start() error {
if len(w.ipLimitPool) == 0 {
w.ipLimitPool = make(map[session.ID]*restriction.UserMaxIp)
}
ctx := context.Background()
hub, err := internet.ListenTCP(ctx, w.address, w.port, w.stream, func(conn stat.Connection) {
go w.callback(conn)
Expand Down Expand Up @@ -244,6 +264,7 @@ type udpWorker struct {
sniffingConfig *proxyman.SniffingConfig
uplinkCounter stats.Counter
downlinkCounter stats.Counter
ipLimitPool map[session.ID]*restriction.UserMaxIp

checker *task.Periodic
activeConn map[connID]*udpConn
Expand Down Expand Up @@ -326,9 +347,23 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest
content.SniffingRequest.RouteOnly = w.sniffingConfig.RouteOnly
}
ctx = session.ContextWithContent(ctx, content)
if err := w.proxy.Process(ctx, net.Network_UDP, conn, w.dispatcher); err != nil {

// Add this IP address to the pool for futher IP limit check
w.Lock()
w.ipLimitPool[sid] = &restriction.UserMaxIp{
IpAddress: net.IP(conn.RemoteAddr().Network()),
}
w.Unlock()

if err := w.proxy.Process(ctx, net.Network_UDP, conn, w.dispatcher, &w.ipLimitPool, w.ipLimitPool[sid]); err != nil {
newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx))
}

// Deletes the IP address from the pool after the connection ends
w.Lock()
delete(w.ipLimitPool, sid)
w.Unlock()

conn.Close()
// conn not removed by checker TODO may be lock worker here is better
if !conn.inactive {
Expand Down Expand Up @@ -379,6 +414,9 @@ func (w *udpWorker) clean() error {
}

func (w *udpWorker) Start() error {
if len(w.ipLimitPool) == 0 {
w.ipLimitPool = make(map[session.ID]*restriction.UserMaxIp)
}
w.activeConn = make(map[connID]*udpConn, 16)
ctx := context.Background()
h, err := udp.ListenUDP(ctx, w.address, w.port, w.stream, udp.HubCapacity(256))
Expand Down Expand Up @@ -478,7 +516,7 @@ func (w *dsWorker) callback(conn stat.Connection) {
}
ctx = session.ContextWithContent(ctx, content)

if err := w.proxy.Process(ctx, net.Network_UNIX, conn, w.dispatcher); err != nil {
if err := w.proxy.Process(ctx, net.Network_UNIX, conn, w.dispatcher, nil, nil); err != nil {
newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx))
}
cancel()
Expand Down
22 changes: 18 additions & 4 deletions features/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type Buffer struct {
PerConnection int32
}

// Buffer contains settings for restriction such as ip restriction.
type Restriction struct {
// Maximum allowed ips, -1 for unlimited
MaxIPs int32
}

// SystemStats contains stat policy settings on system level.
type SystemStats struct {
// Whether or not to enable stat counter for uplink traffic in inbound handlers.
Expand All @@ -55,9 +61,10 @@ type System struct {

// Session is session based settings for controlling Xray requests. It contains various settings (or limits) that may differ for different users in the context.
type Session struct {
Timeouts Timeout // Timeout settings
Stats Stats
Buffer Buffer
Timeouts Timeout // Timeout settings
Stats Stats
Buffer Buffer
Restriction Restriction
}

// Manager is a feature that provides Policy for the given user by its id or level.
Expand Down Expand Up @@ -109,6 +116,12 @@ func defaultBufferPolicy() Buffer {
}
}

func defaultRestrictionPolicy() Restriction {
return Restriction{
MaxIPs: -1,
}
}

// SessionDefault returns the Policy when user is not specified.
func SessionDefault() Session {
return Session{
Expand All @@ -124,7 +137,8 @@ func SessionDefault() Session {
UserUplink: false,
UserDownlink: false,
},
Buffer: defaultBufferPolicy(),
Buffer: defaultBufferPolicy(),
Restriction: defaultRestrictionPolicy(),
}
}

Expand Down
4 changes: 2 additions & 2 deletions infra/conf/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ func (list *PortList) UnmarshalJSON(data []byte) error {
}

type User struct {
EmailString string `json:"email"`
LevelByte byte `json:"level"`
EmailString string `json:"email"`
LevelByte byte `json:"level"`
}

func (v *User) Build() *protocol.User {
Expand Down
7 changes: 7 additions & 0 deletions infra/conf/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Policy struct {
StatsUserUplink bool `json:"statsUserUplink"`
StatsUserDownlink bool `json:"statsUserDownlink"`
BufferSize *int32 `json:"bufferSize"`
MaxIPs *int32 `json:"maxIPs"`
}

func (t *Policy) Build() (*policy.Policy, error) {
Expand Down Expand Up @@ -47,6 +48,12 @@ func (t *Policy) Build() (*policy.Policy, error) {
}
}

if t.MaxIPs != nil {
p.Restriction = &policy.Policy_Restriction{
MaxIPs: (*t.MaxIPs),
}
}

return p, nil
}

Expand Down
3 changes: 2 additions & 1 deletion proxy/dokodemo/dokodemo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport/internet/restriction"
"github.com/xtls/xray-core/transport/internet/stat"
)

Expand Down Expand Up @@ -76,7 +77,7 @@ type hasHandshakeAddress interface {
}

// Process implements proxy.Inbound.
func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher, _ *map[session.ID]*restriction.UserMaxIp, _ *restriction.UserMaxIp) error {
newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WriteToLog(session.ExportIDToError(ctx))
dest := net.Destination{
Network: network,
Expand Down
3 changes: 2 additions & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/reality"
"github.com/xtls/xray-core/transport/internet/restriction"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ type Inbound interface {
Network() []net.Network

// Process processes a connection of given network. If necessary, the Inbound can dispatch the connection to an Outbound.
Process(context.Context, net.Network, stat.Connection, routing.Dispatcher) error
Process(context.Context, net.Network, stat.Connection, routing.Dispatcher, *map[session.ID]*restriction.UserMaxIp, *restriction.UserMaxIp) error
}

// An Outbound process outbound connections.
Expand Down
29 changes: 28 additions & 1 deletion proxy/trojan/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/xtls/xray-core/common"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport/internet/reality"
"github.com/xtls/xray-core/transport/internet/restriction"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
"github.com/xtls/xray-core/transport/internet/udp"
Expand All @@ -35,6 +37,8 @@ func init() {

// Server is an inbound connection handler that handles messages in trojan protocol.
type Server struct {
sync.Mutex

policyManager policy.Manager
validator *Validator
fallbacks map[string]map[string]map[string]*Fallback // or nil
Expand Down Expand Up @@ -131,7 +135,7 @@ func (s *Server) Network() []net.Network {
}

// Process implements proxy.Inbound.Process().
func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher, usrIpRstrct *map[session.ID]*restriction.UserMaxIp, connIp *restriction.UserMaxIp) error {
sid := session.ExportIDToError(ctx)

iConn := conn
Expand Down Expand Up @@ -219,6 +223,29 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Con
inbound.User = user
sessionPolicy = s.policyManager.ForLevel(user.Level)

if sessionPolicy.Restriction.MaxIPs > 0 {
addr := conn.RemoteAddr().(*net.TCPAddr)

uniqueIps := make(map[string]bool)

s.Lock()
// Iterate through the connections and find unique used IP addresses withing last 30 seconds.
for _, conn := range *usrIpRstrct {
if conn.User == user.Email && !conn.IpAddress.Equal(addr.IP) && ((time.Now().Unix() - conn.Time) < 30) {
uniqueIps[conn.IpAddress.String()] = true
}
}
s.Unlock()

if len(uniqueIps) >= int(sessionPolicy.Restriction.MaxIPs) {
return newError("User ", user, " has exceeded their allowed IPs.").AtWarning()
}

connIp.IpAddress = addr.IP
connIp.User = user.Email
connIp.Time = time.Now().Unix()
}

if destination.Network == net.Network_UDP { // handle udp request
return s.handleUDPPayload(ctx, &PacketReader{Reader: clientReader}, &PacketWriter{Writer: conn}, dispatcher)
}
Expand Down
Loading