Envoy XDS

前言

该篇文章所展示代码只展示大概思路,部分代码因为涉及到公司内部业务,所以没有展示出来。

代码

  • main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import (
"context"
"flag"
"log"
"net"
"path/filepath"

"google.golang.org/grpc"

"github.com/google/uuid"

"github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

var (
_addr string
_config string

VERSION string
BIDTIME string
)

func run(ctx context.Context) {
log.Printf("version: %s, bidtime: %s", VERSION, BIDTIME)

cfg, err := initConfig(_config)
if err != nil {
log.Fatalf("init config error: %s", err)
}
printConfig(cfg)

grpcServer := newGRPCServer()
envoyCache, envoyServer := newEnvoyServer(ctx)

version, err := uuid.NewRandom()
if err != nil {
log.Fatalf("generate version error: %s", err)
}

snapshot := genSnapshot(cfg, version.String())
err = snapshot.Consistent()
if err != nil {
log.Fatalf("snapshot inconsistent: %s", err)
}

err = envoyCache.SetSnapshot(ctx, cfg.Node, snapshot)
if err != nil {
log.Fatalf("set snapshot error: %s", err)
}

log.Printf("snapshot version: %s", version)

go func() { runServer(_addr, grpcServer, envoyServer) }()

s := <-sig
log.Printf("receive signal: %s", s)

grpcServer.Stop()

log.Printf("bye")

close(cls)
}

func runServer(address string, grpcServer *grpc.Server, envoyServer server.Server) {
ln, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("listen error: %s", err)
}

registerServer(grpcServer, envoyServer)

log.Printf("start grpc server success on %s", address)

err = grpcServer.Serve(ln)
if err != nil {
log.Fatalf("serve error: %s", err)
}
}
  • config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

type Config struct {
Node string `json:"node,omitempty" yaml:"node"`
Include []string `json:"include,omitempty" yaml:"include"`
Cors *iCors `json:"cors,omitempty" yaml:"cors"`
Routes []*iRoute `json:"routes,omitempty" yaml:"routes"`
Clusters []*iCluster `json:"clusters,omitempty" yaml:"clusters"`
TerminalDomain string `json:"terminal_domain" yaml:"terminal_domain"`
}

type iCors struct {
AllowMethods []string `json:"allow_methods,omitempty" yaml:"allow_methods"`
AllowHeaders []string `json:"allow_headers,omitempty" yaml:"allow_headers"`
ExposeHeaders []string `json:"expose_headers,omitempty" yaml:"expose_headers"`
}

type iRoute struct {
Match *iRouteMatch `json:"match,omitempty" yaml:"match"`
Route *iRouteRoute `json:"route,omitempty" yaml:"route"`
RequestHeadersToAdd []*iRouteHeader `json:"request_headers_to_add,omitempty" yaml:"request_headers_to_add"`
}

type iRouteMatch struct {
Prefix string `json:"prefix,omitempty" yaml:"prefix"`
Header *iRouteMatchHeader `json:"header,omitempty" yaml:"header"`
}

type iRouteMatchHeader struct {
Name string `json:"name,omitempty" yaml:"name"`
ExactMatch string `json:"exact_match,omitempty" yaml:"exact_match"`
Re2Match string `json:"re2_match,omitempty" yaml:"re2_match"`
}

type iRouteRoute struct {
Cluster string `json:"cluster,omitempty" yaml:"cluster"`
Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout"`
PrefixRewrite string `json:"prefix_rewrite,omitempty" yaml:"prefix_rewrite"`
HostRewriteLiteral string `json:"host_rewrite_literal,omitempty" yaml:"host_rewrite_literal"`
}

type iRouteHeader struct {
Header *iKeyValue `json:"header,omitempty" yaml:"header"`
}

type iKeyValue struct {
Key string `json:"key,omitempty" yaml:"key"`
Value string `json:"value,omitempty" yaml:"value"`
}

type iCluster struct {
Name string `json:"name,omitempty" yaml:"name"`
Endpoint string `json:"endpoint,omitempty" yaml:"endpoint"`
Http2 bool `json:"http2,omitempty" yaml:"http2"`
Https bool `json:"https,omitempty" yaml:"https"`

Host string `json:"host,omitempty" yaml:"host"`
Port uint32 `json:"port,omitempty" yaml:"port"`
}
  • server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"context"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)

func newGRPCServer() *grpc.Server {
grpcServer := grpc.NewServer(
grpc.MaxConcurrentStreams(1000000),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 30 * time.Second,
Timeout: 3 * time.Second,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 30 * time.Second,
PermitWithoutStream: true,
}),
)
return grpcServer
}

func newEnvoyServer(ctx context.Context) (cache.SnapshotCache, server.Server) {
envoyCache := cache.NewSnapshotCache(false, cache.IDHash{}, &envoyLogger{})
envoyServer := server.NewServer(ctx, envoyCache, nil)
return envoyCache, envoyServer
}

func registerServer(grpcServer *grpc.Server, envoyServer server.Server) {
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, envoyServer)
}
  • resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
"log"
"strconv"
"strings"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"

accesslog "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v3"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"

file "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3"
gzip "github.com/envoyproxy/go-control-plane/envoy/extensions/compression/gzip/compressor/v3"
compressor "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/compressor/v3"
cors "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cors/v3"
lua "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3"
router "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
http "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"

matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
)

func genSnapshot(cfg *Config, version string) *cache.Snapshot {
virtualHosts := genVirtualHosts(cfg)
resources := map[resource.Type][]types.Resource{
resource.ListenerType: {
makeListener(ListenerHTTPPort, false, virtualHosts),
makeListener(ListenerHTTPSPort, true, virtualHosts),
},
resource.ClusterType: genClusters(cfg.Clusters),
}
snap, err := cache.NewSnapshot(version, resources)
if err != nil {
log.Fatalf("failed to create snapshot: %s", err)
}
return snap
}

...

envoy.yaml

1
docker run -d --name envoy -v `pwd`/envoy.yaml:/etc/envoy/envoy.yaml -p 9901:9901 -p 10000:10000 envoyproxy/envoy:v1.23-latest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address: { address: 0.0.0.0, port_value: 9901 }

node:
id: arc-os
cluster: arc-os

dynamic_resources:
ads_config:
api_type: GRPC
transport_api_version: V3
grpc_services:
- envoy_grpc:
cluster_name: ads_cluster
cds_config:
resource_api_version: V3
ads: { }
lds_config:
resource_api_version: V3
ads: { }

static_resources:
clusters:
- name: ads_cluster
connect_timeout: 30s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
http2_protocol_options: { }
load_assignment:
cluster_name: ads_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 172.17.0.1
port_value: 9777

Ref

  • https://github.com/envoyproxy/go-control-plane
  • https://github.com/envoyproxy/go-control-plane/blob/v0.11.0/internal/example/main/main.go