Skip to content

Latest commit

 

History

History
402 lines (311 loc) · 7.98 KB

File metadata and controls

402 lines (311 loc) · 7.98 KB

7.3 gRPC 与 API 设计

📍 导航:返回目录 | 上一节:DDD


gRPC 核心概念

四种RPC模式

  1. Unary RPC(一元RPC):一请求一响应
  2. Server Streaming:一请求多响应
  3. Client Streaming:多请求一响应
  4. Bidirectional Streaming:双向流

Protobuf 定义

syntax = "proto3";

package user;
option go_package = "github.com/example/user";

// UserService is the user service; its four methods demonstrate the four
// gRPC call types, one each.
service UserService {
    // Unary RPC: one GetUserRequest in, one GetUserResponse out.
    rpc GetUser (GetUserRequest) returns (GetUserResponse);
    
    // Server streaming: one ListUsersRequest in, a stream of User out.
    rpc ListUsers (ListUsersRequest) returns (stream User);
    
    // Client streaming: a stream of User in, one CreateUsersResponse out.
    rpc CreateUsers (stream User) returns (CreateUsersResponse);
    
    // Bidirectional streaming: a stream of Message in each direction.
    rpc Chat (stream Message) returns (stream Message);
}

// GetUserRequest is the request message of UserService.GetUser.
message GetUserRequest {
    // ID of the user to fetch.
    int64 user_id = 1;
}

// GetUserResponse is the response message of UserService.GetUser.
message GetUserResponse {
    // The user that was looked up.
    User user = 1;
}

// User is the user record. It is streamed by ListUsers, accepted by
// CreateUsers, and wrapped in GetUserResponse.
message User {
    // Identifier; the example Go server uses it as the key of its user map.
    int64 id = 1;
    string name = 2;
    string email = 3;
    int32 age = 4;
}

// ListUsersRequest is the request message of UserService.ListUsers.
//
// NOTE(review): the example Go server in this document streams every stored
// user and never reads these fields; pagination semantics are therefore not
// defined here.
message ListUsersRequest {
    // Presumably the page number to return (0- or 1-based?) - TODO confirm.
    int32 page = 1;
    // Presumably the maximum number of users per page - TODO confirm.
    int32 page_size = 2;
}

// CreateUsersResponse is the single response of the client-streaming
// UserService.CreateUsers call.
message CreateUsersResponse {
    // The example Go server sets this to the number of User messages it
    // received on the stream.
    int32 count = 1;
}

// Message is the payload exchanged in both directions by UserService.Chat.
message Message {
    // Message text.
    string content = 1;
    // The example Go server fills this with time.Now().Unix(), i.e. seconds
    // since the Unix epoch; the unit expected from clients is not specified.
    int64 timestamp = 2;
}

Go 服务端实现

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "sync"
    "time"

    "google.golang.org/grpc"

    pb "github.com/example/user"
)

// UserServer is an in-memory implementation of the UserService gRPC service.
//
// gRPC invokes service handlers from multiple goroutines, so every access to
// users must hold mu.
type UserServer struct {
    pb.UnimplementedUserServiceServer

    mu    sync.RWMutex
    users map[int64]*pb.User // keyed by User.Id; guarded by mu
}

// NewUserServer returns a UserServer with an empty user store.
func NewUserServer() *UserServer {
    return &UserServer{
        users: make(map[int64]*pb.User),
    }
}

// GetUser handles the unary RPC: it returns the stored user whose ID equals
// req.UserId, or an error if no such user exists.
//
// NOTE(review): a plain error reaches the client as codes.Unknown; consider
// status.Errorf(codes.NotFound, ...) so callers can branch on the code.
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    s.mu.RLock()
    user, ok := s.users[req.UserId]
    s.mu.RUnlock()

    if !ok {
        return nil, fmt.Errorf("user not found: %d", req.UserId)
    }
    return &pb.GetUserResponse{User: user}, nil
}

// ListUsers handles the server-streaming RPC: it sends every stored user on
// stream and returns the first send error, if any. The paging fields of req
// are not consulted, and the order of users is unspecified (map iteration).
func (s *UserServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    // Copy the user pointers under the read lock so that the lock is not held
    // while blocking on network sends.
    s.mu.RLock()
    snapshot := make([]*pb.User, 0, len(s.users))
    for _, user := range s.users {
        snapshot = append(snapshot, user)
    }
    s.mu.RUnlock()

    for _, user := range snapshot {
        if err := stream.Send(user); err != nil {
            return err
        }
    }
    return nil
}

// CreateUsers handles the client-streaming RPC: it stores each received user
// under its Id (overwriting any existing entry) and, once the client closes
// its side, replies with the number of users received. Users stored before a
// receive error are kept.
func (s *UserServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
    count := 0

    for {
        user, err := stream.Recv()
        if err == io.EOF {
            // The client finished sending; answer with the final tally.
            return stream.SendAndClose(&pb.CreateUsersResponse{
                Count: int32(count),
            })
        }
        if err != nil {
            return err
        }

        s.mu.Lock()
        s.users[user.Id] = user
        s.mu.Unlock()
        count++
    }
}

// Chat handles the bidirectional-streaming RPC: every received message is
// echoed back with an "Echo: " prefix and the server's current Unix time in
// seconds. It returns nil when the client closes its side.
func (s *UserServer) Chat(stream pb.UserService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        // Echo the message back to the sender.
        response := &pb.Message{
            Content:   "Echo: " + msg.Content,
            Timestamp: time.Now().Unix(),
        }

        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

// main listens on TCP port 50051, registers a fresh UserServer on a new gRPC
// server and serves until Serve returns; a listen or serve failure is fatal.
func main() {
    lis, listenErr := net.Listen("tcp", ":50051")
    if listenErr != nil {
        log.Fatalf("Failed to listen: %v", listenErr)
    }

    srv := grpc.NewServer()
    pb.RegisterUserServiceServer(srv, NewUserServer())

    log.Println("gRPC server started on :50051")
    serveErr := srv.Serve(lis)
    if serveErr != nil {
        log.Fatalf("Failed to serve: %v", serveErr)
    }
}

Go 客户端实现

package main

import (
    "context"
    "google.golang.org/grpc"
    "log"
    "time"
    
    pb "github.com/example/user"
)

func main() {
    // 连接服务器
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()
    
    client := pb.NewUserServiceClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // 1. Unary RPC
    resp, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: 1})
    if err != nil {
        log.Printf("GetUser error: %v", err)
    } else {
        log.Printf("User: %+v", resp.User)
    }
    
    // 2. Server Streaming
    stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{Page: 1, PageSize: 10})
    if err != nil {
        log.Fatalf("ListUsers error: %v", err)
    }
    
    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("Receive error: %v", err)
        }
        log.Printf("User: %+v", user)
    }
}

gRPC vs REST

| 特性 | gRPC | REST |
| --- | --- | --- |
| 协议 | HTTP/2 | HTTP/1.1 |
| 数据格式 | Protobuf (二进制) | JSON (文本) |
| 性能 | 高 | 中 |
| 浏览器支持 | 需要grpc-web | 原生支持 |
| 流式传输 | ✅ 支持 | ❌ 不支持 |
| 代码生成 | 自动 | 手动 |
| 学习曲线 | 陡峭 | 平缓 |

中间件

拦截器(Interceptor)

// UnaryInterceptor is a unary server interceptor that logs the full method
// name before invoking the handler and, once the handler has returned, logs
// the method name again together with the elapsed time. The handler's
// response and error are passed through unchanged.
func UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    method := info.FullMethod
    began := time.Now()

    // Pre-processing.
    log.Printf("开始处理: %s", method)

    // Invoke the actual handler.
    reply, handlerErr := handler(ctx, req)

    // Post-processing.
    log.Printf("处理完成: %s, 耗时: %v", method, time.Since(began))

    return reply, handlerErr
}

// Register the interceptor: pass it as a server option when the gRPC server
// is constructed.
grpcServer := grpc.NewServer(
    grpc.UnaryInterceptor(UnaryInterceptor),
)

最佳实践

1. 超时控制

// Give the call a 3-second deadline; the deferred cancel releases the
// context's resources as soon as the surrounding function returns.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

// The RPC is made with the deadline-bound context.
resp, err := client.GetUser(ctx, req)

2. 错误处理

import "google.golang.org/grpc/status"
import "google.golang.org/grpc/codes"

// Server side: report a missing user with an explicit gRPC status code
// (NotFound) instead of a plain error.
if user == nil {
    return nil, status.Errorf(codes.NotFound, "user %d not found", req.UserId)
}

// Client side: when the call fails, extract the gRPC status from the error
// and log its code and message. Errors that carry no status are not logged.
resp, err := client.GetUser(ctx, req)
if err != nil {
    if st, ok := status.FromError(err); ok {
        log.Printf("错误码: %s, 消息: %s", st.Code(), st.Message())
    }
}

3. 连接池

// Shared client connection, reused by every caller of GetConnection.
var (
    connMu sync.Mutex       // serializes the lazy initialization of conn
    conn   *grpc.ClientConn // nil until the first successful dial
)

// GetConnection returns the shared connection to localhost:50051, dialing it
// on first use. It is safe for concurrent use: connMu guarantees that at most
// one dial is in flight and that no connection is created twice. A failed
// dial leaves conn nil, so the next call tries again.
//
// NOTE(review): grpc.WithBlock makes Dial wait until the connection is up and
// no deadline is set here, so an unreachable server blocks the caller (and
// everyone queued on connMu) - confirm this is intended.
func GetConnection() (*grpc.ClientConn, error) {
    connMu.Lock()
    defer connMu.Unlock()

    if conn != nil {
        return conn, nil
    }

    c, err := grpc.Dial("localhost:50051",
        grpc.WithInsecure(),
        grpc.WithBlock(),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                10 * time.Second,
            Timeout:             3 * time.Second,
            PermitWithoutStream: true,
        }),
    )
    if err != nil {
        return nil, err
    }

    conn = c
    return conn, nil
}

性能优化

1. 连接复用

// ✅ Correct: dial once and reuse the connection for every call.
// (Pseudo-code: the Dial arguments are elided and its error is ignored for
// brevity; real code must check it.)
conn, _ := grpc.Dial(...)
defer conn.Close()

client := pb.NewUserServiceClient(conn)
for i := 0; i < 1000; i++ {
    client.GetUser(ctx, req)  // every call goes over the same connection
}

// ❌ Wrong: a new connection is dialed and closed on every iteration, so
// each call pays the connection setup cost again.
for i := 0; i < 1000; i++ {
    conn, _ := grpc.Dial(...)
    client := pb.NewUserServiceClient(conn)
    client.GetUser(ctx, req)
    conn.Close()
}

2. 使用Streaming

// Use streaming to transfer large amounts of data.
stream, err := client.ListUsers(ctx, req)
if err != nil {
    // Without this check a failed call would leave stream unusable below.
    log.Fatalf("ListUsers error: %v", err)
}
for {
    user, err := stream.Recv()
    if err == io.EOF {
        break // the server has sent everything
    }
    if err != nil {
        // Any other error ends the stream; retrying Recv would only return
        // the error again, and user must not be processed.
        log.Fatalf("Receive error: %v", err)
    }
    process(user)
}

本章小结

关键要点

  • ✅ gRPC基于HTTP/2,性能优于REST
  • ✅ Protobuf序列化效率高
  • ✅ 支持四种RPC模式
  • ✅ 适合微服务内部通信

扩展阅读


💡 思考题

  1. gRPC相比REST的优势是什么?
  2. 什么场景适合用Server Streaming?
  3. 如何实现gRPC的认证?

⏮️ 上一节:DDD | ⏏️ 返回目录