一、应用场景
api层通过metaData方式向rpc传公共参数,如token、访问来源ip、地址接口地址等等,这些信息传到rpc后,存日志等操作。
context.Context说明:
gRPC中的context.Context 也符合Go语言的使用习惯:通常情况下我们在函数首个参数放置context.Context用来传递一次RPC中有关的上下文,借助context.WithValue()或ctx.Value()往context添加变量或读取变量
数据流:
metadata就是gRPC中可以传递的上下文信息之一,所以metadata的使用方式就是:metadata记录到context,从context读取metadata
二、api层
2.1 封装一个全局的上下文 RootCtx
作用:用来存储上下文件需要传递的数据
common/utils/helper.go
package utils
import (
"bytes"
"context"
)
type KeyValueContext struct {
context.Context
values map[string]interface{}
mutex *sync.Mutex // 用于保护values的互斥锁
}
func NewKeyValueContext(ctx context.Context) *KeyValueContext {
return &KeyValueContext{
Context: ctx,
values: make(map[string]interface{}),
mutex: &sync.Mutex{}, // 初始化互斥锁
}
}
func (kvc *KeyValueContext) Set(key string, value interface{}) {
kvc.mutex.Lock() // 加锁
defer kvc.mutex.Unlock() // 确保在函数结束时解锁
kvc.values[key] = value // 设置值
}
func (kvc *KeyValueContext) Get(key string) (interface{}, bool) {
kvc.mutex.Lock() // 加锁
defer kvc.mutex.Unlock() // 确保在函数结束时解锁
value, ok := kvc.values[key] // 获取值
return value, ok
}
2.2 编写中间件headerMiddleware.go
作用:请求头中需要的数据,存入上下文件中,示如:token
common/middleware/headerMiddleware.go
package middleware
import (
"context"
"net/http"
"ytss_go_zero/common/consts"
"ytss_go_zero/common/utils"
)
// HeaderMiddleware 请求处理header头信息中间件
type HeaderMiddleware struct {
rootCtx *utils.KeyValueContext
}
// NewHeaderMiddleware ...
func NewHeaderMiddleware(rootCtx *utils.KeyValueContext) *HeaderMiddleware {
return &HeaderMiddleware{
rootCtx: rootCtx,
}
}
// Handle 请求处理
func (m *HeaderMiddleware) Handle(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
//fmt.Println("HeaderMiddleware start")
// 上下文设置更多键值对 header信息
setContextWithHeader(&(m.rootCtx.Context), r)
// 处理请求
//next(w, r)
next(w, r.WithContext(m.rootCtx.Context))
//fmt.Println("HeaderMiddleware end")
}
}
// 设置上下文 header信息
func setContextWithHeader(ctx *context.Context, r *http.Request) {
// 上下文设置更多键值对
*ctx = context.WithValue(*ctx, consts.Authorization, r.Header.Get(consts.Authorization))
*ctx = context.WithValue(*ctx, consts.Platform, r.Header.Get(consts.Platform))
*ctx = context.WithValue(*ctx, consts.ClientVersion, r.Header.Get(consts.ClientVersion))
*ctx = context.WithValue(*ctx, consts.ClientIp, utils.GetClientIP(r))
*ctx = context.WithValue(*ctx, consts.Os, r.Header.Get(consts.Os))
*ctx = context.WithValue(*ctx, consts.UserAgent, r.Header.Get(consts.UserAgent))
*ctx = context.WithValue(*ctx, consts.Method, r.Method)
*ctx = context.WithValue(*ctx, consts.ContentType, r.Header.Get(consts.ContentType))
*ctx = context.WithValue(*ctx, consts.RequestURI, r.RequestURI)
}
2.3 api的main.go中使用全局中间件调用上面的中间件
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"ytss_go_zero/app/api/admin/internal/config"
"ytss_go_zero/app/api/admin/internal/handler"
"ytss_go_zero/app/api/admin/internal/svc"
"ytss_go_zero/common/middleware"
"ytss_go_zero/common/utils"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/rest"
"github.com/zeromicro/go-zero/rest/httpx"
)
var configFile = flag.String("f", "etc/admin-api-dev.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
server := rest.MustNewServer(
c.RestConf,
rest.WithCors(c.Cors.Domains...), // 跨域 允许跨域的域名
// rest.WithCorsHeaders("Access-Control-Allow-Headers", "OS"),
rest.WithCustomCors(func(header http.Header) {
// 允许跨域的请求头 key不区分大小写
header.Add("Access-Control-Allow-Headers", c.Cors.AccessControlAllowHeaders)
header.Set("Access-Control-Allow-Methods", c.Cors.AccessControlAllowMethods)
header.Set("Access-Control-Expose-Headers", "Content-Length, Content-Type")
}, nil, "*"),
// 在 `/static/public` 目录下有需要对外提供的文件目录,比如有个文件 `1.txt`,
// 访问地: http://127.0.0.1:8888/public/1.txt 即可访问/static/public/1.txt文件
rest.WithFileServer("/public", http.Dir("./static/public")),
)
logx.MustSetup(c.Log) // 设置日志配置
logx.AddWriter(logx.NewWriter(os.Stdout)) // 添加控制台输出 文件和控制台同时输出
defer server.Stop()
// 创建一个根Context
rootCtx := utils.NewKeyValueContext(context.Background())
//c.CompanyRpcConf.Token = "123123"
ctx := svc.NewServiceContext(c, rootCtx)
handler.RegisterHandlers(server, ctx)
// print routes
server.PrintRoutes()
// 设置成功处理代码
httpx.SetOkHandler(utils.HttpOkHandler)
// 设置错误处理代码
httpx.SetErrorHandlerCtx(func(ctx context.Context, err error) (int, any) {
//http状态码, 返回内容
return http.StatusOK, utils.WrapErrorRes(err)
})
// 全局中间件 设置header头信息存入上下文 主要看这里
server.Use(middleware.NewHeaderMiddleware(rootCtx).Handle)
fmt.Printf("Starting admin-api server at %s:%d...\n", c.Host, c.Port)
server.Start()
}
2.4 封装metadata使用的函数
作用:
- 组装api调用rpc之前需要传递的metadata信息
- rpc在上下文件中获取api传递过来的metadata信息
common/token/erp_token_user.go
package token
import (
"context"
"encoding/json"
"errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"strconv"
"ytss_go_zero/common/consts"
"ytss_go_zero/common/errorx"
"ytss_go_zero/common/utils"
"github.com/zeromicro/go-zero/core/logx"
"go.uber.org/zap"
)
// HeaderStreamInterceptor api请求rpc之前,设置metaData要传过去的信息
func HeaderStreamInterceptor(ctx context.Context, methed string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
//fmt.Println("发送rpc请求之前")
//authorization := ctx.Value(consts.Authorization)
//fmt.Println("admin HeaderStreamInterceptor Authorization--->", authorization)
//
//platform := ctx.Value(consts.Platform)
//fmt.Println("admin HeaderStreamInterceptor platform--->", platform)
//
//clientVersion := ctx.Value(consts.ClientVersion)
//fmt.Println("admin HeaderStreamInterceptor clientVersion--->", clientVersion)
//
//clientIp := ctx.Value(consts.ClientIp)
//fmt.Println("admin HeaderStreamInterceptor clientIp--->", clientIp)
//
//os := ctx.Value(consts.Os)
//fmt.Println("admin HeaderStreamInterceptor os--->", os)
//
//
md := metadata.New(map[string]string{
consts.Authorization: ctx.Value(consts.Authorization).(string),
consts.Platform: ctx.Value(consts.Platform).(string),
consts.ClientVersion: ctx.Value(consts.ClientVersion).(string),
consts.ClientIp: ctx.Value(consts.ClientIp).(string),
consts.Os: ctx.Value(consts.Os).(string),
consts.UserAgent: ctx.Value(consts.UserAgent).(string),
consts.Method: ctx.Value(consts.Method).(string),
consts.RequestURI: ctx.Value(consts.RequestURI).(string),
consts.ContentType: ctx.Value(consts.ContentType).(string),
})
ctx = metadata.NewOutgoingContext(ctx, md)
err := invoker(ctx, methed, req, reply, cc, opts...)
if err != nil {
return err
}
//fmt.Println("发送rpc请求之后")
//请求之后
return nil
}
type MetaData struct {
Authorization string `json:"authorization"`
Platform string `json:"PLATFORM"` // App(手机端)/ Pad (PAD端) / Web(WEB端)
ClientVersion string `json:"version"` // 客户端版本:3.3.3
ClientIp string `json:"clientIp"` // 客户端版本:3.3.3
Os string `json:"Os"` // Android / Ios
UserAgent string `json:"userAgent"` // 客户端版本:3.3.3
Method string `json:"method"` // 客户端版本:3.3.3
ContextType string `json:"contextType"` //客户端版本 传参方式
RequestURI string `json:"requestURI"` // 客户端版本:3.3.3
}
// rpc从metadata信息中获取api传传过来的数据,返回接收到的metadata信息
func NewMetaDataWithContext(ctx context.Context) *MetaData {
var m MetaData
if _, ok := metadata.FromIncomingContext(ctx); ok {
m.Authorization = GetMetaDataWithKey(ctx, consts.Authorization)
m.Platform = GetMetaDataWithKey(ctx, consts.Platform)
m.ClientVersion = GetMetaDataWithKey(ctx, consts.ClientVersion)
m.ClientIp = GetMetaDataWithKey(ctx, consts.ClientIp)
m.Os = GetMetaDataWithKey(ctx, consts.Os)
m.UserAgent = GetMetaDataWithKey(ctx, consts.UserAgent)
m.Method = GetMetaDataWithKey(ctx, consts.Method)
m.ContextType = GetMetaDataWithKey(ctx, consts.ContentType)
m.RequestURI = GetMetaDataWithKey(ctx, consts.RequestURI)
}
return &m
}
// GetMetaData rpc中获取上下文件中的metadata信息
func GetMetaDataWithContext(ctx context.Context) (metaData *MetaData) {
// 获取metadata信息
return ctx.Value(consts.MetaData).(*MetaData)
}
// GetUserTokenWithContext rpc中获取用户信息 从token中获取用户信息,并解析为结构造体
func GetUserTokenWithContext(ctx context.Context) (userToken TokenUser, err error) {
// 获取metadata信息
metaData := GetMetaDataWithContext(ctx)
// step1 解析token信息
return AnalyzeToken(metaData.Authorization)
}
2.5 调用rpc时传入metadata参数
app/api/admin/internal/svc/service_context.go
package svc
import (
"ytss_go_zero/app/api/admin/internal/config"
"ytss_go_zero/app/api/admin/internal/middleware"
"ytss_go_zero/app/rpc/company/client/testservice"
"ytss_go_zero/common/token"
"ytss_go_zero/common/utils"
"ytss_go_zero/models"
"github.com/jinzhu/copier"
"github.com/redis/go-redis/v9"
"github.com/zeromicro/go-zero/rest"
"github.com/zeromicro/go-zero/zrpc"
"gorm.io/gorm"
)
type ServiceContext struct {
Config config.Config
RootCtx *utils.KeyValueContext
Mysql *gorm.DB
RDB *redis.Client
TokenAuth rest.Middleware
FaAreaService faareaservice.FaAreaService
}
func NewServiceContext(c config.Config, rootCtx *utils.KeyValueContext) *ServiceContext {
// 初始化业务redis驱动
var rdbConf redis.Options
copier.Copy(&rdbConf, &c.RedisDB)
return &ServiceContext{
Config: c,
TokenAuth: middleware.NewTokenAuthMiddleware(c, rootCtx).Handle,
RootCtx: rootCtx,
RDB: models.InitRedis(rdbConf),
// 测试服务 主要看这里 zrpc.WithUnaryClientInterceptor(token.HeaderStreamInterceptor)
CoreRpcTestService: testservice.NewTestService(zrpc.MustNewClient(c.CompanyRpcConf, zrpc.WithUnaryClientInterceptor(token.HeaderStreamInterceptor))),
}
}
三、rpc层
这里以order rpc为例说明
3.1 main.go主函数中,全局中间获取metadata信息
在orderrpc.go主函数中,全局中间件中获取metadata信息,并存入上下文件中
package main
import (
"flag"
"fmt"
"os"
"ytss_go_zero/app/rpc/order/internal/config"
fadeliveryorderserviceServer "ytss_go_zero/app/rpc/order/internal/server/fadeliveryorderservice"
"ytss_go_zero/app/rpc/order/internal/svc"
"ytss_go_zero/app/rpc/order/pb"
"ytss_go_zero/common/interceptor/rpcserver"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var configFile = flag.String("f", "etc/orderrpc.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
logx.MustSetup(c.Log) // 设置日志配置
logx.AddWriter(logx.NewWriter(os.Stdout)) // 添加控制台输出 文件和控制台同时输出
ctx := svc.NewServiceContext(c)
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
pb.RegisterFaDeliveryOrderServiceServer(grpcServer, fadeliveryorderserviceServer.NewFaDeliveryOrderServiceServer(ctx))
if c.Mode == service.DevMode || c.Mode == service.TestMode {
reflection.Register(grpcServer)
}
})
//rpc log
s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
//获取metaData信息 主要看这里
s.AddUnaryInterceptors(rpcserver.HeaderInterceptor)
defer s.Stop()
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start()
}
3.2 编写中间件 headerInterceptor.go
作用:
从上下文件中获取api传过来的matadata信息,并存入上下文件中
common/interceptor/rpcserver/headerInterceptor.go
package rpcserver
import (
"context"
"ytss_go_zero/common/consts"
"ytss_go_zero/common/token"
"google.golang.org/grpc"
)
/**
* @Description rpc service header interceptor
* @Author haima
* @Date 2024/11/24 13:35
* @Version 1.0
**/
func HeaderInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
//fmt.Println("HeaderInterceptor start")
metaData := token.NewMetaDataWithContext(ctx)
ctx = context.WithValue(ctx, consts.MetaData, metaData)
resp, err = handler(ctx, req)
//fmt.Println("HeaderInterceptor end")
return resp, err
}
3.3 在逻辑中获取metadata信息
app/rpc/company/internal/logic/testservice/pong_logic.go
package testservicelogic
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"ytss_go_zero/app/rpc/company/internal/svc"
"ytss_go_zero/app/rpc/company/pb"
"ytss_go_zero/common/token"
"ytss_go_zero/common/utils"
)
type PongLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewPongLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PongLogic {
return &PongLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *PongLogic) Pong(in *pb.PongReq) (*pb.PongResp, error) {
var err error
metaData := token.GetMetaDataWithContext(l.ctx) # 主要看这里
fmt.Println("value123:", metaData.UserAgent)
if in.Id == 0 {
err = errors.New("req.id is required")
if err != nil {
l.Logger.Errorf("id不能为空: req:%+v,", in)
return nil, errors.New("id不能为空")
}
return nil, err
}
dataBytes, _ := json.Marshal(data)
return &pb.PongResp{
Msg: fmt.Sprintf("data:%v", string(dataBytes)),
}, nil
}
到此完成api向rpc传递公共数据。感谢阅读到这里的同学,如果错误,请留言,看到后我看更正。
编辑者:海马
源创文章,转载请注明来源
https://www.cnblogs.com/haima/p/18566176
参考文档
写给go开发者的gRPC教程-metadata
https://juejin.cn/post/7202409558592782373
go-zero实现metadata从当前RPC服务传递到下游RPC服务上下文context
https://blog.csdn.net/qq_40726812/article/details/117525474
最后编辑:海马 更新时间:2024-12-22 19:32