381 lines
9.5 KiB
Go
381 lines
9.5 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/tomasen/realip"
|
|
"github.com/unrolled/render"
|
|
|
|
"git.mills.io/prologic/fbox/store"
|
|
)
|
|
|
|
var r *render.Render
|
|
|
|
func init() {
|
|
r = render.New()
|
|
}
|
|
|
|
func configHandler(w http.ResponseWriter, req *http.Request) {
|
|
r.JSON(w, http.StatusOK, map[string]interface{}{
|
|
"commit": Commit,
|
|
"version": Version,
|
|
"dataShards": dataShards,
|
|
"parityShards": parityShards,
|
|
"software": fmt.Sprintf("fbox v%s", FullVersion()),
|
|
})
|
|
}
|
|
|
|
func clusterHandler(w http.ResponseWriter, req *http.Request) {
|
|
// TODO: Use real types here for values
|
|
stats := make(map[string]interface{})
|
|
|
|
getNodeStatus := func(uri string) (map[string]interface{}, error) {
|
|
status := make(map[string]interface{})
|
|
|
|
res, err := request(http.MethodGet, uri, nil, nil)
|
|
if err != nil {
|
|
log.WithError(err).Error("error making node status request")
|
|
return nil, fmt.Errorf("error making node status request: %w", err)
|
|
}
|
|
defer res.Body.Close()
|
|
|
|
data, err := ioutil.ReadAll(res.Body)
|
|
if err != nil {
|
|
log.WithError(err).Error("error reading response body")
|
|
return nil, fmt.Errorf("error reading response body: %w", err)
|
|
}
|
|
|
|
if err := json.Unmarshal(data, &status); err != nil {
|
|
log.WithError(err).Error("error parsing json data")
|
|
return nil, fmt.Errorf("error parsing json data: %w", err)
|
|
}
|
|
|
|
return status, nil
|
|
}
|
|
|
|
for i, node := range nodes {
|
|
// TODO: Hard-coded schema (FIXME)
|
|
uri := fmt.Sprintf("http://%s/status", node.Addr)
|
|
status, err := getNodeStatus(uri)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error getting node status for %s: %s", node, err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
stats[i] = status
|
|
}
|
|
|
|
r.JSON(w, http.StatusOK, map[string]interface{}{
|
|
"stats": stats,
|
|
"nodes": nodes,
|
|
})
|
|
|
|
}
|
|
|
|
func statusHandlerFactory(store store.Store) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
stats, err := store.Stats()
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error getting blob store stats: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
r.JSON(w, http.StatusOK, map[string]interface{}{
|
|
"freeBytes": stats.Free(),
|
|
"usedBytes": stats.Used(),
|
|
"blobCount": stats.Blobs(),
|
|
})
|
|
return
|
|
}
|
|
}
|
|
|
|
func joinHandler(w http.ResponseWriter, req *http.Request) {
|
|
data := map[string]string{}
|
|
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
|
|
msg := fmt.Sprintf("error decoding join request: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
nodeID, ok := data["id"]
|
|
if nodeID == "" || !ok {
|
|
msg := fmt.Sprintf("error missing or empty node id")
|
|
log.Error(msg)
|
|
http.Error(w, msg, http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
remoteAddr, ok := data["addr"]
|
|
if !ok {
|
|
remoteAddr = realip.FromRequest(req)
|
|
}
|
|
|
|
addr, err := net.ResolveTCPAddr("tcp4", remoteAddr)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error resolving remoteAddr %s: %s", remoteAddr, err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if addr.IP == nil {
|
|
reqAddr, err := net.ResolveTCPAddr("tcp4", req.RemoteAddr)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error resolving reqAddr %s: %s", req.RemoteAddr, err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
addr.IP = reqAddr.IP
|
|
}
|
|
|
|
if addr.Port == 0 {
|
|
// TODO: Make this a constant
|
|
addr.Port = 8000
|
|
}
|
|
|
|
remoteAddr = addr.String()
|
|
|
|
log.Infof("node joined from %s", remoteAddr)
|
|
nodes[nodeID] = Node{ID: nodeID, Addr: remoteAddr}
|
|
}
|
|
|
|
func nodesHandler(w http.ResponseWriter, req *http.Request) {
|
|
r.JSON(w, http.StatusOK, nodes)
|
|
}
|
|
|
|
func filesHandler(w http.ResponseWriter, req *http.Request) {
|
|
files, err := getAllMetadata()
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error reading all metadata: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
r.JSON(w, http.StatusOK, files)
|
|
}
|
|
|
|
func metadataHandler(w http.ResponseWriter, r *http.Request) {
|
|
var logger *log.Entry
|
|
|
|
status, body := func() (int, []byte) {
|
|
name := r.URL.Path
|
|
|
|
logger = log.WithFields(log.Fields{
|
|
"op": r.Method,
|
|
"name": name,
|
|
})
|
|
|
|
switch r.Method {
|
|
case http.MethodDelete:
|
|
ok, err := deleteMetadata(name)
|
|
if err != nil {
|
|
logger.WithField("err", err).Error()
|
|
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
|
|
}
|
|
if !ok {
|
|
logger.WithField("err", err).Debug("Not found")
|
|
return http.StatusNotFound, nil
|
|
}
|
|
logger.Debug("Success")
|
|
return http.StatusOK, nil
|
|
case http.MethodGet:
|
|
metadata, ok, err := getMetadata(name)
|
|
if err != nil {
|
|
logger.WithField("err", err).Error()
|
|
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
|
|
}
|
|
|
|
if !ok {
|
|
logger.WithField("err", err).Debug("Not found")
|
|
return http.StatusNotFound, nil
|
|
}
|
|
|
|
data, err := metadata.Bytes()
|
|
if err != nil {
|
|
logger.WithField("err", err).Error()
|
|
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
|
|
}
|
|
logger.Debug("Success")
|
|
return http.StatusOK, data
|
|
case http.MethodPost:
|
|
value, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
logger.WithField("err", err).Error()
|
|
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
|
|
}
|
|
metadata, err := loadMetadata(value)
|
|
if err != nil {
|
|
logger.WithField("err", err).Error()
|
|
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
|
|
}
|
|
|
|
if err := setMetadata(name, metadata); err != nil {
|
|
logger.WithField("err", err).Error()
|
|
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
|
|
}
|
|
logger.Debug("Success")
|
|
return http.StatusOK, nil
|
|
default:
|
|
logger.Warn("Bad request")
|
|
return http.StatusBadRequest, []byte(
|
|
fmt.Sprintf("%q: invalid method, expecting DELETE, GET, or PUT",
|
|
r.Method,
|
|
))
|
|
}
|
|
}()
|
|
|
|
w.WriteHeader(status)
|
|
if body != nil {
|
|
if _, err := w.Write(body); err != nil {
|
|
logger.WithField("err", err).Error("Failed writing response")
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func uploadHandler(w http.ResponseWriter, req *http.Request) {
|
|
name := req.URL.Path
|
|
if len(name) > maxPathLength {
|
|
msg := fmt.Sprintf("error name %d length exceeds allowed maximum of %d", len(name), maxPathLength)
|
|
log.Warn(msg)
|
|
http.Error(w, msg, http.StatusUnprocessableEntity)
|
|
return
|
|
}
|
|
|
|
tf, err := receiveFile(req.Body)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error receiving file: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
hash, err := hashReader(tf)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error hashing file: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if _, err := tf.Seek(0, io.SeekStart); err != nil {
|
|
msg := fmt.Sprintf("error seeking file: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
stat, err := os.Stat(tf.Name())
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error getting file size: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
shards, err := createShards(tf, stat.Size())
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error creating shards: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
shardMap, err := storeShards(shards)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error storing shards: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
metadata := NewMetadata(name, hash, stat.Size(), parityShards, shardMap)
|
|
|
|
if err := setMetadata(name, metadata); err != nil {
|
|
msg := fmt.Sprintf("error storing metadata: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
|
|
func downloadHandler(w http.ResponseWriter, req *http.Request) {
|
|
name := req.URL.Path
|
|
|
|
metadata, ok, err := getMetadata(name)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error getting metdata for %s: %s", name, err)
|
|
log.Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
if !ok {
|
|
msg := fmt.Sprintf("error file not found: %s", name)
|
|
log.Error(msg)
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
f, err := readShards(metadata)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error reading file: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
if _, err := io.Copy(w, f); err != nil {
|
|
msg := fmt.Sprintf("error sending file: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
|
|
func deleteHandler(w http.ResponseWriter, req *http.Request) {
|
|
name := req.URL.Path
|
|
|
|
metadata, ok, err := getMetadata(name)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("error getting metdata for %s: %s", name, err)
|
|
log.Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
if !ok {
|
|
msg := fmt.Sprintf("error file not found: %s", name)
|
|
log.Error(msg)
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// TODO: Do this in the background asynchronously
|
|
if err := deleteShards(metadata); err != nil {
|
|
msg := fmt.Sprintf("error deleting file: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
if _, err := deleteMetadata(name); err != nil {
|
|
msg := fmt.Sprintf("error deleting metadata: %s", err)
|
|
log.WithError(err).Error(msg)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|