- Unary RPC(一元RPC):一请求一响应
- Server Streaming:一请求多响应
- Client Streaming:多请求一响应
- Bidirectional Streaming(双向流):多请求多响应
syntax = "proto3";
package user;
option go_package = "github.com/example/user";
// UserService is the user service; it declares one RPC for each of the four
// gRPC call types.
service UserService {
// Unary RPC: one GetUserRequest in, one GetUserResponse out.
rpc GetUser (GetUserRequest) returns (GetUserResponse);
// Server streaming: one ListUsersRequest in, a stream of User messages out.
rpc ListUsers (ListUsersRequest) returns (stream User);
// Client streaming: a stream of User messages in, one CreateUsersResponse out.
rpc CreateUsers (stream User) returns (CreateUsersResponse);
// Bidirectional streaming: Message streams flow in both directions.
rpc Chat (stream Message) returns (stream Message);
}
// GetUserRequest is the request message of the GetUser RPC.
message GetUserRequest {
// ID of the user to look up.
int64 user_id = 1;
}
// GetUserResponse is the response message of the GetUser RPC.
message GetUserResponse {
// The user that was looked up.
User user = 1;
}
// User is a single user record. It is the GetUser payload and the element type
// of the ListUsers and CreateUsers streams.
message User {
// Identifier of the user.
int64 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
}
// ListUsersRequest is the request message of the ListUsers RPC.
message ListUsersRequest {
// Page to return. NOTE(review): presumably 1-based (the example client sends
// page 1) - confirm against the server implementation.
int32 page = 1;
// Presumably the maximum number of users per page - confirm.
int32 page_size = 2;
}
// CreateUsersResponse is the single response sent when a CreateUsers client
// stream ends.
message CreateUsersResponse {
// Number of User messages the server received on the stream.
int32 count = 1;
}
// Message is one chat message carried on the Chat stream in either direction.
message Message {
  // Text of the message.
  string content = 1;
  // Time the message was produced. NOTE(review): the example server fills this
  // with Unix seconds; confirm clients use the same unit.
  int64 timestamp = 2;
}

package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "github.com/example/user"
)
// UserServer implements pb.UserServiceServer on top of an in-memory user store.
//
// gRPC-Go invokes each RPC handler on its own goroutine, so every access to
// users must hold mu.
type UserServer struct {
	pb.UnimplementedUserServiceServer

	mu    sync.RWMutex
	users map[int64]*pb.User // keyed by User.Id
}

// NewUserServer returns a UserServer with an empty user store.
func NewUserServer() *UserServer {
	return &UserServer{
		users: make(map[int64]*pb.User),
	}
}

// GetUser is the unary RPC: it looks up a single user by req.UserId.
//
// It returns a gRPC status error with codes.NotFound when no user is stored
// under that ID, so clients can branch on the status code instead of parsing
// the message text.
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
	id := req.UserId

	s.mu.RLock()
	user, ok := s.users[id]
	s.mu.RUnlock()

	if !ok {
		return nil, status.Error(codes.NotFound, fmt.Sprintf("user not found: %d", id))
	}
	return &pb.GetUserResponse{User: user}, nil
}

// ListUsers is the server-streaming RPC: it sends every stored user on stream.
//
// NOTE: req.Page and req.PageSize are not applied; all users are sent, in map
// iteration order (unspecified). The first Send error aborts the stream and is
// returned.
func (s *UserServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
	// Snapshot under the read lock so a slow receiver cannot hold the lock
	// (and block CreateUsers) for the duration of the stream.
	s.mu.RLock()
	users := make([]*pb.User, 0, len(s.users))
	for _, u := range s.users {
		users = append(users, u)
	}
	s.mu.RUnlock()

	for _, u := range users {
		if err := stream.Send(u); err != nil {
			return err
		}
	}
	return nil
}

// CreateUsers is the client-streaming RPC: it stores every user received on
// stream (overwriting any existing entry with the same Id) and, once the
// client closes its side, replies with the number of users received.
//
// Any receive error other than io.EOF is returned as-is; users stored before
// the error remain in the store.
func (s *UserServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
	var count int32
	for {
		user, err := stream.Recv()
		if err == io.EOF {
			// The client finished sending; answer once and end the RPC.
			return stream.SendAndClose(&pb.CreateUsersResponse{Count: count})
		}
		if err != nil {
			return err
		}

		s.mu.Lock()
		s.users[user.Id] = user
		s.mu.Unlock()
		count++
	}
}
// Chat is the bidirectional-streaming RPC. Each message received from the
// client is answered with an echo whose content is prefixed with "Echo: " and
// whose timestamp is the server's current Unix time. The RPC ends cleanly when
// the client closes its send side, and with the underlying error otherwise.
func (s *UserServer) Chat(stream pb.UserService_ChatServer) error {
	for {
		in, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// Client is done sending.
			return nil
		case recvErr != nil:
			return recvErr
		}

		// Echo the message back.
		echo := &pb.Message{
			Content:   "Echo: " + in.Content,
			Timestamp: time.Now().Unix(),
		}
		if sendErr := stream.Send(echo); sendErr != nil {
			return sendErr
		}
	}
}
func main() {
listener, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterUserServiceServer(grpcServer, NewUserServer())
log.Println("gRPC server started on :50051")
if err := grpcServer.Serve(listener); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}package main
import (
"context"
"google.golang.org/grpc"
"log"
"time"
pb "github.com/example/user"
)
func main() {
// 连接服务器
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 1. Unary RPC
resp, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: 1})
if err != nil {
log.Printf("GetUser error: %v", err)
} else {
log.Printf("User: %+v", resp.User)
}
// 2. Server Streaming
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{Page: 1, PageSize: 10})
if err != nil {
log.Fatalf("ListUsers error: %v", err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Receive error: %v", err)
}
log.Printf("User: %+v", user)
}
}| 特性 | gRPC | REST |
|---|---|---|
| 协议 | HTTP/2 | 通常为 HTTP/1.1(也可运行在 HTTP/2 上) |
| 数据格式 | Protobuf (二进制) | JSON (文本) |
| 性能 | 高 | 中 |
| 浏览器支持 | 需要grpc-web | 原生支持 |
| 流式传输 | ✅ 原生支持 | ⚠️ 有限(需借助 SSE、WebSocket 等) |
| 代码生成 | 自动 | 手动 |
| 学习曲线 | 陡 | 平缓 |
// UnaryInterceptor is a server-side unary interceptor that logs the full
// method name before invoking the handler and logs it again, together with the
// elapsed time, after the handler returns. The handler's response and error
// are passed through unchanged.
func UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	began := time.Now()

	// Pre-processing: announce the call.
	log.Printf("开始处理: %s", info.FullMethod)

	// Delegate to the actual RPC handler.
	result, callErr := handler(ctx, req)

	// Post-processing: report how long the call took.
	elapsed := time.Since(began)
	log.Printf("处理完成: %s, 耗时: %v", info.FullMethod, elapsed)

	return result, callErr
}
// Register the interceptor when constructing the server.
grpcServer := grpc.NewServer(
	grpc.UnaryInterceptor(UnaryInterceptor),
)

// Bound a single call with a 3-second timeout.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
resp, err := client.GetUser(ctx, req)

import "google.golang.org/grpc/status"
import "google.golang.org/grpc/codes"

// Server side: report a missing user with the NotFound status code.
if user == nil {
	return nil, status.Errorf(codes.NotFound, "user %d not found", req.UserId)
}

// Client side: recover the status code and message from the returned error.
resp, err := client.GetUser(ctx, req)
if err != nil {
	st, ok := status.FromError(err)
	if ok {
		log.Printf("错误码: %s, 消息: %s", st.Code(), st.Message())
	}
}

// Reuse the connection
// dialTimeout bounds how long GetConnection waits for the blocking dial.
const dialTimeout = 5 * time.Second

var (
	connMu sync.Mutex       // guards conn
	conn   *grpc.ClientConn // shared connection, dialed lazily by GetConnection
)

// GetConnection returns the shared client connection to localhost:50051,
// dialing it on first use and handing back the same *grpc.ClientConn on every
// later call.
//
// It is safe for concurrent use: the check-and-dial sequence runs under
// connMu, so two callers can never dial twice or observe a half-assigned
// connection. A failed dial is not cached; the next call tries again.
func GetConnection() (*grpc.ClientConn, error) {
	connMu.Lock()
	defer connMu.Unlock()

	if conn != nil {
		return conn, nil
	}

	// grpc.WithBlock makes the dial wait until the connection is up. Without a
	// deadline that wait never ends when the server is unreachable, so bound
	// it with dialTimeout.
	ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
	defer cancel()

	c, err := grpc.DialContext(ctx, "localhost:50051",
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second, // ping after 10s without activity
			Timeout:             3 * time.Second,  // wait 3s for the ping ack
			PermitWithoutStream: true,             // ping even with no active RPCs
		}),
	)
	if err != nil {
		return nil, err
	}

	conn = c
	return conn, nil
}

// ✅ Correct: reuse the connection
// Illustrative snippet: `...` stands for elided dial arguments, and the dial
// error is discarded with `_` for brevity.
// One connection and one client are created up front and shared by all calls.
conn, _ := grpc.Dial(...)
defer conn.Close()
client := pb.NewUserServiceClient(conn)
for i := 0; i < 1000; i++ {
client.GetUser(ctx, req) // reuses the same connection
}
// ❌ Wrong: a new connection is dialed and closed on every iteration
for i := 0; i < 1000; i++ {
conn, _ := grpc.Dial(...)
client := pb.NewUserServiceClient(conn)
client.GetUser(ctx, req)
conn.Close()
}// Use streaming when transferring large amounts of data
// Open the server stream; a failed call leaves stream unusable, so stop here.
stream, err := client.ListUsers(ctx, req)
if err != nil {
	log.Fatalf("ListUsers error: %v", err)
}
for {
	user, err := stream.Recv()
	if err == io.EOF {
		// The server finished the stream.
		break
	}
	if err != nil {
		// Any other error is terminal for the stream. Without this check the
		// loop would spin forever, handing a nil user to process each time.
		log.Fatalf("Receive error: %v", err)
	}
	process(user)
}

关键要点:
- ✅ gRPC基于HTTP/2,性能优于REST
- ✅ Protobuf序列化效率高
- ✅ 支持四种RPC模式
- ✅ 适合微服务内部通信
💡 思考题:
- gRPC相比REST的优势是什么?
- 什么场景适合用Server Streaming?
- 如何实现gRPC的认证?