Fixed esuf to make it actually work against bosh lite testing. It probably isn't totally stable yet, though.

This commit is contained in:
Geoff Franks 2016-02-17 17:29:48 -05:00
parent 1cb3932382
commit 5332e7a352
2 changed files with 149 additions and 63 deletions

View File

@ -6,10 +6,14 @@ it to a `green` state.
It iterates over all indices, finding any UNASSIGNED shards, and assigns them to an appropriate
data node, based on the assignments of other shards in the index. It will not assign a shard to
a data node that already has a shard. It will not assign replica shards until the primary
shard is online. It will do its best to balance the primary shard relocations evenly across
data nodes.
a data node that already has a shard. It will not assign replica shards until the corresponding
primary shard is online.
**NOTE:** This takes a heavy-handed approach to bringing things online. If a primary shard is
stuck in an UNASSIGNED state, `esuf` will pass the `allow_primary: true` option to ElasticSearch,
telling it to bring the primary online, even if it does not have the data for it. This
may result in data loss to that shard of the index, but will get the cluster into a working
state again.
### Installation

202
main.go
View File

@ -1,10 +1,12 @@
package main
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/voxelbrain/goptions"
"io"
"io/ioutil"
"net/http"
"net/http/httputil"
@ -14,12 +16,13 @@ import (
)
var debug bool
var trace bool
var SkipSSLValidation bool
type Index struct {
PrimaryShards []Shard
ReplicaShards []Shard
Name string
PrimaryShard Shard
}
type Node struct {
@ -28,7 +31,7 @@ type Node struct {
}
type Shard struct {
Index string
Index int
Node string
Relocating string
Status string
@ -46,6 +49,12 @@ func DEBUG(format string, args ...interface{}) {
}
}
// TRACE forwards a printf-style message to DEBUG, but only when the
// package-level trace flag is set; otherwise it does nothing.
func TRACE(format string, args ...interface{}) {
	if !trace {
		return
	}
	DEBUG(format, args...)
}
// ERROR renders a printf-style message and writes it to standard error,
// terminated by a single newline.
func ERROR(format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	fmt.Fprintln(os.Stderr, msg)
}
@ -66,6 +75,8 @@ func trueString(str string) bool {
func main() {
var options struct {
Debug bool `goptions:"-D, --debug, description='Enable debugging'"`
Trace bool `goptions:"--trace, description='Enable API call tracing'"`
Fix bool `goptions:"-f, --fix, description='Fix any issues detected'"`
Host string `goptions:"-H, --elasticsearch_url, description='ElasticSearch URL. Defaults to http://localhost:9200'"`
SkipSSLVerify bool `goptions:"-k, --skip-ssl-validation, description='Disable SSL certificate checking'"`
Help bool `goptions:"-h, --help"`
@ -84,6 +95,15 @@ func main() {
debug = true
}
if options.Trace {
trace = true
debug = true
}
if trueString(os.Getenv("TRACE")) {
trace = true
debug = true
}
if options.SkipSSLVerify {
SkipSSLValidation = true
}
@ -104,66 +124,105 @@ func main() {
FATAL(err.Error())
}
TRACE("Indices: %v\nNodes: %v", indices, nodes)
for _, index := range indices {
DEBUG("Examining index %s", index.Name)
var availNodeMap map[string]Node
for _, node := range nodes {
availNodeMap[node.Id] = node
fmt.Printf("Checking index %s\n", index.Name)
availNodeMaps := make([]map[string]Node, len(index.PrimaryShards))
for i, _ := range availNodeMaps {
availNodeMaps[i] = map[string]Node{}
for _, node := range nodes {
availNodeMaps[i][node.Id] = node
}
}
for _, shard := range index.PrimaryShards {
delete(availNodeMaps[shard.Index], shard.Node)
delete(availNodeMaps[shard.Index], shard.Relocating)
}
delete(availNodeMap, index.PrimaryShard.Node)
delete(availNodeMap, index.PrimaryShard.Relocating)
for _, shard := range index.ReplicaShards {
delete(availNodeMap, shard.Node)
delete(availNodeMap, shard.Relocating)
delete(availNodeMaps[shard.Index], shard.Node)
delete(availNodeMaps[shard.Index], shard.Relocating)
}
var availNodes []Node
for _, n := range availNodeMap {
availNodes = append(availNodes, n)
availNodesList := make([][]Node, len(index.PrimaryShards))
for i, _ := range availNodesList {
var availNodes []Node
for _, n := range availNodeMaps[i] {
availNodes = append(availNodes, n)
}
availNodesList[i] = availNodes
}
DEBUG("Nodes available to host shards: %v", availNodes)
if index.PrimaryShard.Status == "UNASSIGNED" {
var node Node
node, availNodes = availNodes[0], availNodes[1:]
DEBUG("Primary Shard to be assigned to %s", node.Name)
for _, shard := range index.PrimaryShards {
if shard.Status == "UNASSIGNED" {
availNodes := availNodesList[shard.Index]
if len(availNodes) > 0 {
var node Node
node, availNodes = availNodes[0], availNodes[1:]
fmt.Printf(" %s shard %d primary is UNASSIGNED.\n", index.Name, shard.Index)
DEBUG("Would assign %s as new node", node.Name)
if options.Fix {
fmt.Printf(" - bringing up %s shard %d primary on %s\n", index.Name, shard.Index, node.Name)
err := rerouteShard(options.Host, index.Name, shard.Index, node.Name, true)
if err != nil {
ERROR(err.Error())
}
}
} else {
fmt.Printf(" %s shard %d primary is UNASSIGNED, and no nodes are available to assign it to\n", index.Name, shard.Index)
}
}
}
for _, shard := range index.ReplicaShards {
if shard.Status == "UNASSIGNED" {
var node Node
node, availNodes = availNodes[0], availNodes[1:]
DEBUG("Shard %s to be assigned to %s", shard.Index, node.Name)
availNodes := availNodesList[shard.Index]
if len(availNodes) > 0 {
var node Node
node, availNodes = availNodes[0], availNodes[1:]
fmt.Printf(" %s shard %d replica is UNASSIGNED.\n", index.Name, shard.Index)
DEBUG("Would assign %s as new node", node.Name)
if options.Fix {
fmt.Printf(" - bringing up %s shard %d replica on %s\n", index.Name, shard.Index, node.Name)
err := rerouteShard(options.Host, index.Name, shard.Index, node.Name, false)
if err != nil {
ERROR(err.Error())
}
}
} else {
fmt.Printf(" %s shard %d replica is UNASSIGNED, and no nodes are available to assign it to\n", index.Name, shard.Index)
}
}
}
}
}
func httpRequest(method string, url string) ([]byte, error) {
func httpRequest(method string, url string, data io.Reader) ([]byte, error) {
req, err := http.NewRequest("GET", url, nil)
req, err := http.NewRequest(method, url, data)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", "application/json")
dumpReq, err := httputil.DumpRequest(req, true)
if err != nil {
return nil, err
}
DEBUG("HTTP Request:\n%s", dumpReq)
TRACE("HTTP Request:\n%s", dumpReq)
client := &http.Client{}
client.Transport = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: SkipSSLValidation}}
resp, err := client.Do(req)
dumpResp, dumpErr := httputil.DumpResponse(resp, true)
if dumpErr != nil {
return nil, err
}
DEBUG("HTTP Response:\n%s", dumpResp)
if err != nil {
return nil, err
}
dumpResp, err := httputil.DumpResponse(resp, true)
if err != nil {
return nil, err
}
TRACE("HTTP Response:\n%s", dumpResp)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
@ -171,11 +230,15 @@ func httpRequest(method string, url string) ([]byte, error) {
return nil, err
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("%s returned %d: %s", url, resp.StatusCode, body)
}
return body, nil
}
func getDataNodes(host string) ([]Node, error) {
data, err := httpRequest("GET", host+"/_nodes?pretty")
data, err := httpRequest("GET", host+"/_nodes?pretty", nil)
if err != nil {
return []Node{}, err
}
@ -224,8 +287,16 @@ func getDataNodes(host string) ([]Node, error) {
return nodelist, nil
}
func rerouteShard(host string, index string, shard int, node string, primary bool) error {
data := fmt.Sprintf(`{"commands":[{"allocate":{"index":"%s","shard":%d,"node":"%s","allow_primary":%t}}]}`, index, shard, node, primary)
dataBuf := bytes.NewBuffer([]byte(data))
_, err := httpRequest("POST", host+"/_cluster/reroute", dataBuf)
return err
}
func getIndices(host string) ([]Index, error) {
data, err := httpRequest("GET", host+"/_status?pretty")
data, err := httpRequest("GET", host+"/_cluster/state?pretty", nil)
if err != nil {
return []Index{}, err
}
@ -238,34 +309,44 @@ func getIndices(host string) ([]Index, error) {
var indexlist []Index
if obj, ok := o.(map[string]interface{}); ok {
if indices, ok := obj["indices"].(map[string]interface{}); ok {
for indexName, i := range indices {
if indexData, ok := i.(map[string]interface{}); ok {
index := Index{Name: indexName}
if shards, ok := indexData["shards"].(map[string]interface{}); ok {
for shardIndex, s := range shards {
if shardList, ok := s.([]interface{}); ok {
for i, s := range shardList {
if shard, ok := s.(map[string]interface{}); ok {
if rt, ok := obj["routing_table"].(map[string]interface{}); ok {
// indices
if indices, ok := rt["indices"].(map[string]interface{}); ok {
for indexName, i := range indices {
if indexData, ok := i.(map[string]interface{}); ok {
index := Index{Name: indexName}
if shards, ok := indexData["shards"].(map[string]interface{}); ok {
index.PrimaryShards = make([]Shard, len(shards))
index.ReplicaShards = make([]Shard, len(shards))
for shardKey, s := range shards {
if shardList, ok := s.([]interface{}); ok {
for i, s := range shardList {
if shard, ok := s.(map[string]interface{}); ok {
if routing, ok := shard["routing"].(map[string]interface{}); ok {
var primary bool
var state string
var node string
var relocating string
if primary, ok = routing["primary"].(bool); !ok {
return []Index{}, fmt.Errorf("Could not parse `indices.%s.shards.%s.[%d].routing.primary", indexName, shardIndex, i)
var shardFloat float64
shardFloat = -1.0
if primary, ok = shard["primary"].(bool); !ok {
return []Index{}, fmt.Errorf("Could not parse `routing_table.indices.%s.shards.%s.[%d].primary", indexName, shardKey, i)
}
if state, ok = routing["state"].(string); !ok {
return []Index{}, fmt.Errorf("Could not parse `indices.%s.shards.%s.[%d].routing.state", indexName, shardIndex, i)
if state, ok = shard["state"].(string); !ok {
return []Index{}, fmt.Errorf("Could not parse `routing_table.indices.%s.shards.%s.[%d].state", indexName, shardKey, i)
}
if node, ok = routing["node"].(string); !ok {
return []Index{}, fmt.Errorf("Could not parse `indices.%s.shards.%s.[%d].routing.node", indexName, shardIndex, i)
if node, ok = shard["node"].(string); shard["node"] != nil && !ok {
return []Index{}, fmt.Errorf("Could not parse `routing_table.indices.%s.shards.%s.[%d].node", indexName, shardKey, i)
}
if relocating, ok = routing["relocating_node"].(string); !ok {
return []Index{}, fmt.Errorf("Could not parse `indices.%s.shards.%s.[%d].routing.relocating_node", indexName, shardIndex, i)
if relocating, ok = shard["relocating_node"].(string); shard["relocating"] != nil && !ok {
return []Index{}, fmt.Errorf("Could not parse `routing_table.indices.%s.shards.%s.[%d].relocating_node", indexName, shardKey, i)
}
if shardFloat, ok = shard["shard"].(float64); !ok {
return []Index{}, fmt.Errorf("Could not parse 'routing_table.indices.%s.shards.%s.[%d].shard", indexName, shardKey, i)
}
shardIndex := int(shardFloat)
parsedShard := Shard{
Index: shardIndex,
@ -275,30 +356,31 @@ func getIndices(host string) ([]Index, error) {
}
if primary {
index.PrimaryShard = parsedShard
index.PrimaryShards[shardIndex] = parsedShard
} else {
index.ReplicaShards = append(index.ReplicaShards, parsedShard)
index.ReplicaShards[shardIndex] = parsedShard
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `indices.%s.shards.%s.[%d].routing", indexName, shardIndex, i)
return []Index{}, fmt.Errorf("Unexpected data type for `routing_table.indices.%s.shards.%s.[%d]", indexName, shardKey, i)
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `indices.%s.shards.%s.[%d]", indexName, shardIndex, i)
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `routing_table.indices.%s.shards.%s", indexName, shardKey)
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `indices.%s.shards.%s", indexName, shardIndex)
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `routing_table.indices.%s.shards", indexName)
}
indexlist = append(indexlist, index)
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `indices.%s.shards", indexName)
return []Index{}, fmt.Errorf("Unexpected data type for `routing_table.indices.%s`", indexName)
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `indices.%s`", indexName)
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `routing_table.indices`")
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type for `indices`")
return []Index{}, fmt.Errorf("Unexpected data type for `routing_table`")
}
} else {
return []Index{}, fmt.Errorf("Unexpected data type returned from `GET /_status`")