Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions pilot/pkg/model/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,12 @@ type Proxy struct {
// The merged gateways associated with the proxy if this is a Router
MergedGateway *MergedGateway

// service instances associated with the proxy
ServiceInstances []*ServiceInstance
// ServiceTargets contains a list of all Services associated with the proxy, contextualized for this particular proxy.
// These are unique to this proxy, as the port information is specific to it - while a ServicePort is shared with the
// service, the target port may be distinct per-endpoint. So this maintains a view specific to this proxy.
// ServiceTargets will maintain a list entry for each Service-port, so if we have 2 services each with 3 ports, we
// would have 6 entries.
ServiceTargets []ServiceTarget

// Istio version associated with the Proxy
IstioVersion *IstioVersion
Expand Down Expand Up @@ -918,16 +922,16 @@ func (node *Proxy) SetSidecarScope(ps *PushContext) {
// proxy and caches the merged object in the proxy Node. This is a convenience hack so that
// callers can simply call push.MergedGateways(node) instead of having to
// fetch all the gateways and invoke the merge call in multiple places (lds/rds).
// Must be called after ServiceInstances are set
// Must be called after ServiceTargets are set
func (node *Proxy) SetGatewaysForProxy(ps *PushContext) {
	// Merged gateways are only computed for Router proxies; every other proxy
	// type leaves MergedGateway untouched.
	if node.Type == Router {
		node.MergedGateway = ps.mergeGateways(node)
	}
}

func (node *Proxy) SetServiceInstances(serviceDiscovery ServiceDiscovery) {
instances := serviceDiscovery.GetProxyServiceInstances(node)
func (node *Proxy) SetServiceTargets(serviceDiscovery ServiceDiscovery) {
instances := serviceDiscovery.GetProxyServiceTargets(node)

// Keep service targets in order of creation/hostname.
sort.SliceStable(instances, func(i, j int) bool {
Expand All @@ -941,7 +945,7 @@ func (node *Proxy) SetServiceInstances(serviceDiscovery ServiceDiscovery) {
return true
})

node.ServiceInstances = instances
node.ServiceTargets = instances
}

// SetWorkloadLabels will set the node.Labels.
Expand Down
14 changes: 7 additions & 7 deletions pilot/pkg/model/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func Test_parseIstioVersion(t *testing.T) {

func TestSetServiceInstances(t *testing.T) {
tnow := time.Now()
instances := []*model.ServiceInstance{
instances := []model.ServiceTarget{
{
Service: &model.Service{
CreationTime: tnow.Add(1 * time.Second),
Expand All @@ -587,19 +587,19 @@ func TestSetServiceInstances(t *testing.T) {
}

serviceDiscovery := memory.NewServiceDiscovery()
serviceDiscovery.WantGetProxyServiceInstances = instances
serviceDiscovery.WantGetProxyServiceTargets = instances

env := &model.Environment{
ServiceDiscovery: serviceDiscovery,
}

proxy := &model.Proxy{}
proxy.SetServiceInstances(env)
proxy.SetServiceTargets(env)

assert.Equal(t, len(proxy.ServiceInstances), 3)
assert.Equal(t, proxy.ServiceInstances[0].Service.Hostname, "test2.com")
assert.Equal(t, proxy.ServiceInstances[1].Service.Hostname, "test3.com")
assert.Equal(t, proxy.ServiceInstances[2].Service.Hostname, "test1.com")
assert.Equal(t, len(proxy.ServiceTargets), 3)
assert.Equal(t, proxy.ServiceTargets[0].Service.Hostname, "test2.com")
assert.Equal(t, proxy.ServiceTargets[1].Service.Hostname, "test3.com")
assert.Equal(t, proxy.ServiceTargets[2].Service.Hostname, "test1.com")
}

func TestGlobalUnicastIP(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions pilot/pkg/model/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,9 @@ func MergeGateways(gateways []gatewayWithInstances, proxy *Proxy, ps *PushContex
}
}

func udpSupportedPort(number uint32, instances []*ServiceInstance) bool {
func udpSupportedPort(number uint32, instances []ServiceTarget) bool {
for _, w := range instances {
if int(number) == w.ServicePort.Port && w.ServicePort.Protocol == protocol.UDP {
if int(number) == w.Port.Port && w.Port.Protocol == protocol.UDP {
return true
}
}
Expand All @@ -362,7 +362,7 @@ func udpSupportedPort(number uint32, instances []*ServiceInstance) bool {
// When legacyGatewaySelector=true things are a bit more complex, as we support referencing a Service
// port and translating to the targetPort in addition to just directly referencing a port. In this
// case, we just make a best effort guess by picking the first match.
func resolvePorts(number uint32, instances []*ServiceInstance, legacyGatewaySelector bool) []uint32 {
func resolvePorts(number uint32, instances []ServiceTarget, legacyGatewaySelector bool) []uint32 {
ports := sets.New[uint32]()
for _, w := range instances {
if _, disablePortTranslation := w.Service.Attributes.Labels[DisableGatewayPortTranslationLabel]; disablePortTranslation && legacyGatewaySelector {
Expand All @@ -371,15 +371,15 @@ func resolvePorts(number uint32, instances []*ServiceInstance, legacyGatewaySele
// referencing the Service port, and references are un-ambiguous.
continue
}
if w.ServicePort.Port == int(number) && w.Endpoint != nil {
if w.Port.Port == int(number) {
if legacyGatewaySelector {
// When we are using legacy gateway label selection, we only resolve to a single port
// This has pros and cons; we don't allow merging of routes when it would be desirable, but
// we also avoid accidentally merging routes when we didn't intend to. While neither option is great,
// picking the first one here preserves backwards compatibility.
return []uint32{w.Endpoint.EndpointPort}
return []uint32{w.Port.TargetPort}
}
ports.Insert(w.Endpoint.EndpointPort)
ports.Insert(w.Port.TargetPort)
}
}
ret := ports.UnsortedList()
Expand Down
14 changes: 7 additions & 7 deletions pilot/pkg/model/push_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,14 +601,14 @@ var (
// This can be normal - for workloads that act only as client, or are not covered by a Service.
// It can also be an error, for example in cases where the Endpoint list of a service was not updated by the time
// the sidecar calls.
// Updated by GetProxyServiceInstances
// Updated by GetProxyServiceTargets
ProxyStatusNoService = monitoring.NewGauge(
"pilot_no_ip",
"Pods not found in the endpoint table, possibly invalid.",
)

// ProxyStatusEndpointNotReady represents proxies found not to be ready.
// Updated by GetProxyServiceInstances. Normal condition when starting
// Updated by GetProxyServiceTargets. Normal condition when starting
// an app with readiness, error if it doesn't change to 0.
ProxyStatusEndpointNotReady = monitoring.NewGauge(
"pilot_endpoint_not_ready",
Expand Down Expand Up @@ -1997,7 +1997,7 @@ type gatewayWithInstances struct {
// If true, ports that are not present in any instance will be used directly (without targetPort translation)
// This supports the legacy behavior of selecting gateways by pod label selector
legacyGatewaySelector bool
instances []*ServiceInstance
instances []ServiceTarget
}

func (ps *PushContext) mergeGateways(proxy *Proxy) *MergedGateway {
Expand All @@ -2019,8 +2019,8 @@ func (ps *PushContext) mergeGateways(proxy *Proxy) *MergedGateway {
if gwsvcstr, f := cfg.Annotations[InternalGatewayServiceAnnotation]; f {
gwsvcs := strings.Split(gwsvcstr, ",")
known := sets.New[string](gwsvcs...)
matchingInstances := make([]*ServiceInstance, 0, len(proxy.ServiceInstances))
for _, si := range proxy.ServiceInstances {
matchingInstances := make([]ServiceTarget, 0, len(proxy.ServiceTargets))
for _, si := range proxy.ServiceTargets {
if _, f := known[string(si.Service.Hostname)]; f && si.Service.Attributes.Namespace == cfg.Namespace {
matchingInstances = append(matchingInstances, si)
}
Expand All @@ -2031,11 +2031,11 @@ func (ps *PushContext) mergeGateways(proxy *Proxy) *MergedGateway {
}
} else if gw.GetSelector() == nil {
// no selector. Applies to all workloads asking for the gateway
gatewayInstances = append(gatewayInstances, gatewayWithInstances{cfg, true, proxy.ServiceInstances})
gatewayInstances = append(gatewayInstances, gatewayWithInstances{cfg, true, proxy.ServiceTargets})
} else {
gatewaySelector := labels.Instance(gw.GetSelector())
if gatewaySelector.SubsetOf(proxy.Labels) {
gatewayInstances = append(gatewayInstances, gatewayWithInstances{cfg, true, proxy.ServiceInstances})
gatewayInstances = append(gatewayInstances, gatewayWithInstances{cfg, true, proxy.ServiceTargets})
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/model/push_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2941,7 +2941,7 @@ func (l *localServiceDiscovery) InstancesByPort(*Service, int) []*ServiceInstanc
return l.serviceInstances
}

func (l *localServiceDiscovery) GetProxyServiceInstances(*Proxy) []*ServiceInstance {
func (l *localServiceDiscovery) GetProxyServiceTargets(*Proxy) []ServiceTarget {
// Stub: this test double does not implement the lookup; invoking it panics.
panic("implement me")
}

Expand Down
33 changes: 31 additions & 2 deletions pilot/pkg/model/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,35 @@ type ServiceInstance struct {
Endpoint *IstioEndpoint `json:"endpoint,omitempty"`
}

// ServiceTarget includes a Service object, along with a specific service port
// and target port. This is basically a smaller version of ServiceInstance,
// intended to avoid the need to have the full object when only port information
// is needed.
type ServiceTarget struct {
// Service is the service this target refers to.
Service *Service
// Port pairs the service port with the target port for this particular target.
Port ServiceInstancePort
}

type (
// ServicePort is an alias for a pointer to Port.
ServicePort = *Port
// ServiceInstancePort defines a port that has both a port and targetPort (which distinguishes it from model.Port)
// Note: ServiceInstancePort only makes sense in the context of a specific ServiceInstance, because TargetPort depends on a specific instance.
ServiceInstancePort struct {
// ServicePort is embedded, so the fields of the underlying Port (for
// example Port and Protocol) are reachable directly on a ServiceInstancePort.
ServicePort
// TargetPort is the target port number; it depends on the specific instance.
TargetPort uint32
}
)

// ServiceInstanceToTarget converts a ServiceInstance into a ServiceTarget,
// carrying over the Service, the ServicePort, and the endpoint's port as the
// target port.
// NOTE(review): e and e.Endpoint are dereferenced unconditionally, so a nil
// value for either panics — confirm callers never pass such an instance.
func ServiceInstanceToTarget(e *ServiceInstance) ServiceTarget {
	port := ServiceInstancePort{
		ServicePort: e.ServicePort,
		TargetPort:  e.Endpoint.EndpointPort,
	}
	return ServiceTarget{Service: e.Service, Port: port}
}

// DeepCopy creates a copy of ServiceInstance.
func (instance *ServiceInstance) DeepCopy() *ServiceInstance {
return &ServiceInstance{
Expand Down Expand Up @@ -776,7 +805,7 @@ type ServiceDiscovery interface {
// Consult istio-dev before using this for anything else (except debugging/tools)
InstancesByPort(svc *Service, servicePort int) []*ServiceInstance

// GetProxyServiceInstances returns the service instances that co-located with a given Proxy
// GetProxyServiceTargets returns the service targets that are co-located with a given Proxy
//
// Co-located generally means running in the same network namespace and security context.
//
Expand All @@ -793,7 +822,7 @@ type ServiceDiscovery interface {
// though with a different ServicePort and IstioEndpoint for each. If any of these overlapping
// services are not HTTP or H2-based, behavior is undefined, since the listener may not be able to
// determine the intended destination of a connection without a Host header on the request.
GetProxyServiceInstances(*Proxy) []*ServiceInstance
GetProxyServiceTargets(*Proxy) []ServiceTarget
GetProxyWorkloadLabels(*Proxy) labels.Instance

// MCSServices returns information about the services that have been exported/imported via the
Expand Down
45 changes: 18 additions & 27 deletions pilot/pkg/networking/core/v1alpha3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (configgen *ConfigGeneratorImpl) buildClusters(proxy *model.Proxy, req *mod
resources := model.Resources{}
envoyFilterPatches := req.Push.EnvoyFilters(proxy)
cb := NewClusterBuilder(proxy, req, configgen.Cache)
instances := proxy.ServiceInstances
instances := proxy.ServiceTargets
cacheStats := cacheStats{}
switch proxy.Type {
case model.SidecarProxy:
Expand Down Expand Up @@ -455,12 +455,12 @@ func buildInboundLocalityLbEndpoints(bind string, port uint32) []*endpoint.Local
}

func buildInboundClustersFromServiceInstances(cb *ClusterBuilder, proxy *model.Proxy,
instances []*model.ServiceInstance, cp clusterPatcher,
instances []model.ServiceTarget, cp clusterPatcher,
enableSidecarServiceInboundListenerMerge bool,
) []*cluster.Cluster {
clusters := make([]*cluster.Cluster, 0)
_, actualLocalHosts := getWildcardsAndLocalHost(proxy.GetIPMode())
clustersToBuild := make(map[int][]*model.ServiceInstance)
clustersToBuild := make(map[int][]model.ServiceTarget)

ingressPortListSet := sets.New[int]()
sidecarScope := proxy.SidecarScope
Expand All @@ -472,7 +472,7 @@ func buildInboundClustersFromServiceInstances(cb *ClusterBuilder, proxy *model.P
// we still need to capture all the instances on this port, as its required to populate telemetry metadata
// The first instance will be used as the "primary" instance; this means if we have any conflicts between
// Services the first one wins
port := int(instance.Endpoint.EndpointPort)
port := int(instance.Port.TargetPort)
clustersToBuild[port] = append(clustersToBuild[port], instance)
}

Expand All @@ -482,23 +482,14 @@ func buildInboundClustersFromServiceInstances(cb *ClusterBuilder, proxy *model.P
}
// For each workload port, we will construct a cluster
for epPort, instances := range clustersToBuild {
if ingressPortListSet.Contains(int(instances[0].Endpoint.EndpointPort)) {
if ingressPortListSet.Contains(int(instances[0].Port.TargetPort)) {
// here if port is declared in service and sidecar ingress both, we continue to take the one on sidecar + other service ports
// e.g. 1,2, 3 in service and 3,4 in sidecar ingress,
// this will still generate listeners for 1,2,3,4 where 3 is picked from sidecar ingress
// port present in sidecarIngress listener so let sidecar take precedence
continue
}
services := slices.Map(instances, func(e *model.ServiceInstance) ServiceTarget {
return ServiceTarget{
Service: e.Service,
Port: ServiceInstancePort{
ServicePort: e.ServicePort,
TargetPort: e.Endpoint.EndpointPort,
},
}
})
localCluster := cb.buildInboundCluster(epPort, bind, proxy, services[0], services)
localCluster := cb.buildInboundCluster(epPort, bind, proxy, instances[0], instances)
// If inbound cluster match has service, we should see if it matches with any host name across all instances.
hosts := make([]host.Name, 0, len(instances))
for _, si := range instances {
Expand All @@ -509,7 +500,7 @@ func buildInboundClustersFromServiceInstances(cb *ClusterBuilder, proxy *model.P
return clusters
}

func (configgen *ConfigGeneratorImpl) buildInboundClusters(cb *ClusterBuilder, proxy *model.Proxy, instances []*model.ServiceInstance,
func (configgen *ConfigGeneratorImpl) buildInboundClusters(cb *ClusterBuilder, proxy *model.Proxy, instances []model.ServiceTarget,
cp clusterPatcher,
) []*cluster.Cluster {
clusters := make([]*cluster.Cluster, 0)
Expand Down Expand Up @@ -542,7 +533,7 @@ func (configgen *ConfigGeneratorImpl) buildInboundClusters(cb *ClusterBuilder, p
}

func buildInboundClustersFromSidecar(cb *ClusterBuilder, proxy *model.Proxy,
instances []*model.ServiceInstance, cp clusterPatcher,
instances []model.ServiceTarget, cp clusterPatcher,
) []*cluster.Cluster {
clusters := make([]*cluster.Cluster, 0)
_, actualLocalHosts := getWildcardsAndLocalHost(proxy.GetIPMode())
Expand Down Expand Up @@ -606,9 +597,9 @@ func buildInboundClustersFromSidecar(cb *ClusterBuilder, proxy *model.Proxy,
// for a service instance that matches this ingress port as this will allow us
// to generate the right cluster name that LDS expects inbound|portNumber|portName|Hostname
svc := findOrCreateService(instances, ingressListener, sidecarScope.Name, sidecarScope.Namespace)
endpoint := ServiceTarget{
endpoint := model.ServiceTarget{
Service: svc,
Port: ServiceInstancePort{
Port: model.ServiceInstancePort{
ServicePort: listenPort,
TargetPort: uint32(port),
},
Expand All @@ -619,11 +610,11 @@ func buildInboundClustersFromSidecar(cb *ClusterBuilder, proxy *model.Proxy,
return clusters
}

func findOrCreateService(instances []*model.ServiceInstance,
func findOrCreateService(instances []model.ServiceTarget,
ingressListener *networking.IstioIngressListener, sidecar string, sidecarns string,
) *model.Service {
for _, realInstance := range instances {
if realInstance.Endpoint.EndpointPort == ingressListener.Port.Number {
if realInstance.Port.TargetPort == ingressListener.Port.Number {
return realInstance.Service
}
}
Expand Down Expand Up @@ -672,12 +663,12 @@ const (
)

type buildClusterOpts struct {
mesh *meshconfig.MeshConfig
mutable *clusterWrapper
policy *networking.TrafficPolicy
port *model.Port
serviceAccounts []string
serviceInstances []*model.ServiceInstance
mesh *meshconfig.MeshConfig
mutable *clusterWrapper
policy *networking.TrafficPolicy
port *model.Port
serviceAccounts []string
serviceTargets []model.ServiceTarget
// Used for traffic across multiple network clusters
// the east-west gateway in a remote cluster will use this value to route
// traffic to the appropriate service
Expand Down
Loading