fbox/handlers.go
2021-07-13 00:19:01 +10:00

381 lines
9.5 KiB
Go

package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
log "github.com/sirupsen/logrus"
"github.com/tomasen/realip"
"github.com/unrolled/render"
"git.mills.io/prologic/fbox/store"
)
var r *render.Render
func init() {
r = render.New()
}
func configHandler(w http.ResponseWriter, req *http.Request) {
r.JSON(w, http.StatusOK, map[string]interface{}{
"commit": Commit,
"version": Version,
"dataShards": dataShards,
"parityShards": parityShards,
"software": fmt.Sprintf("fbox v%s", FullVersion()),
})
}
func clusterHandler(w http.ResponseWriter, req *http.Request) {
// TODO: Use real types here for values
stats := make(map[string]interface{})
getNodeStatus := func(uri string) (map[string]interface{}, error) {
status := make(map[string]interface{})
res, err := request(http.MethodGet, uri, nil, nil)
if err != nil {
log.WithError(err).Error("error making node status request")
return nil, fmt.Errorf("error making node status request: %w", err)
}
defer res.Body.Close()
data, err := ioutil.ReadAll(res.Body)
if err != nil {
log.WithError(err).Error("error reading response body")
return nil, fmt.Errorf("error reading response body: %w", err)
}
if err := json.Unmarshal(data, &status); err != nil {
log.WithError(err).Error("error parsing json data")
return nil, fmt.Errorf("error parsing json data: %w", err)
}
return status, nil
}
for i, node := range nodes {
// TODO: Hard-coded schema (FIXME)
uri := fmt.Sprintf("http://%s/status", node.Addr)
status, err := getNodeStatus(uri)
if err != nil {
msg := fmt.Sprintf("error getting node status for %s: %s", node, err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
stats[i] = status
}
r.JSON(w, http.StatusOK, map[string]interface{}{
"stats": stats,
"nodes": nodes,
})
}
func statusHandlerFactory(store store.Store) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
stats, err := store.Stats()
if err != nil {
msg := fmt.Sprintf("error getting blob store stats: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
r.JSON(w, http.StatusOK, map[string]interface{}{
"freeBytes": stats.Free(),
"usedBytes": stats.Used(),
"blobCount": stats.Blobs(),
})
return
}
}
func joinHandler(w http.ResponseWriter, req *http.Request) {
data := map[string]string{}
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
msg := fmt.Sprintf("error decoding join request: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusBadRequest)
return
}
nodeID, ok := data["id"]
if nodeID == "" || !ok {
msg := fmt.Sprintf("error missing or empty node id")
log.Error(msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
remoteAddr, ok := data["addr"]
if !ok {
remoteAddr = realip.FromRequest(req)
}
addr, err := net.ResolveTCPAddr("tcp4", remoteAddr)
if err != nil {
msg := fmt.Sprintf("error resolving remoteAddr %s: %s", remoteAddr, err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusBadRequest)
return
}
if addr.IP == nil {
reqAddr, err := net.ResolveTCPAddr("tcp4", req.RemoteAddr)
if err != nil {
msg := fmt.Sprintf("error resolving reqAddr %s: %s", req.RemoteAddr, err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusBadRequest)
return
}
addr.IP = reqAddr.IP
}
if addr.Port == 0 {
// TODO: Make this a constant
addr.Port = 8000
}
remoteAddr = addr.String()
log.Infof("node joined from %s", remoteAddr)
nodes[nodeID] = Node{ID: nodeID, Addr: remoteAddr}
}
func nodesHandler(w http.ResponseWriter, req *http.Request) {
r.JSON(w, http.StatusOK, nodes)
}
func filesHandler(w http.ResponseWriter, req *http.Request) {
files, err := getAllMetadata()
if err != nil {
msg := fmt.Sprintf("error reading all metadata: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
r.JSON(w, http.StatusOK, files)
}
func metadataHandler(w http.ResponseWriter, r *http.Request) {
var logger *log.Entry
status, body := func() (int, []byte) {
name := r.URL.Path
logger = log.WithFields(log.Fields{
"op": r.Method,
"name": name,
})
switch r.Method {
case http.MethodDelete:
ok, err := deleteMetadata(name)
if err != nil {
logger.WithField("err", err).Error()
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
}
if !ok {
logger.WithField("err", err).Debug("Not found")
return http.StatusNotFound, nil
}
logger.Debug("Success")
return http.StatusOK, nil
case http.MethodGet:
metadata, ok, err := getMetadata(name)
if err != nil {
logger.WithField("err", err).Error()
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
}
if !ok {
logger.WithField("err", err).Debug("Not found")
return http.StatusNotFound, nil
}
data, err := metadata.Bytes()
if err != nil {
logger.WithField("err", err).Error()
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
}
logger.Debug("Success")
return http.StatusOK, data
case http.MethodPost:
value, err := ioutil.ReadAll(r.Body)
if err != nil {
logger.WithField("err", err).Error()
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
}
metadata, err := loadMetadata(value)
if err != nil {
logger.WithField("err", err).Error()
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
}
if err := setMetadata(name, metadata); err != nil {
logger.WithField("err", err).Error()
return http.StatusInternalServerError, []byte(fmt.Sprintf("%q: %v", name, err))
}
logger.Debug("Success")
return http.StatusOK, nil
default:
logger.Warn("Bad request")
return http.StatusBadRequest, []byte(
fmt.Sprintf("%q: invalid method, expecting DELETE, GET, or PUT",
r.Method,
))
}
}()
w.WriteHeader(status)
if body != nil {
if _, err := w.Write(body); err != nil {
logger.WithField("err", err).Error("Failed writing response")
}
}
}
func uploadHandler(w http.ResponseWriter, req *http.Request) {
name := req.URL.Path
if len(name) > maxPathLength {
msg := fmt.Sprintf("error name %d length exceeds allowed maximum of %d", len(name), maxPathLength)
log.Warn(msg)
http.Error(w, msg, http.StatusUnprocessableEntity)
return
}
tf, err := receiveFile(req.Body)
if err != nil {
msg := fmt.Sprintf("error receiving file: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
hash, err := hashReader(tf)
if err != nil {
msg := fmt.Sprintf("error hashing file: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := tf.Seek(0, io.SeekStart); err != nil {
msg := fmt.Sprintf("error seeking file: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
stat, err := os.Stat(tf.Name())
if err != nil {
msg := fmt.Sprintf("error getting file size: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
shards, err := createShards(tf, stat.Size())
if err != nil {
msg := fmt.Sprintf("error creating shards: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
shardMap, err := storeShards(shards)
if err != nil {
msg := fmt.Sprintf("error storing shards: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
metadata := NewMetadata(name, hash, stat.Size(), parityShards, shardMap)
if err := setMetadata(name, metadata); err != nil {
msg := fmt.Sprintf("error storing metadata: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func downloadHandler(w http.ResponseWriter, req *http.Request) {
name := req.URL.Path
metadata, ok, err := getMetadata(name)
if err != nil {
msg := fmt.Sprintf("error getting metdata for %s: %s", name, err)
log.Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
if !ok {
msg := fmt.Sprintf("error file not found: %s", name)
log.Error(msg)
w.WriteHeader(http.StatusNotFound)
return
}
f, err := readShards(metadata)
if err != nil {
msg := fmt.Sprintf("error reading file: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := io.Copy(w, f); err != nil {
msg := fmt.Sprintf("error sending file: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func deleteHandler(w http.ResponseWriter, req *http.Request) {
name := req.URL.Path
metadata, ok, err := getMetadata(name)
if err != nil {
msg := fmt.Sprintf("error getting metdata for %s: %s", name, err)
log.Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
if !ok {
msg := fmt.Sprintf("error file not found: %s", name)
log.Error(msg)
w.WriteHeader(http.StatusNotFound)
return
}
// TODO: Do this in the background asynchronously
if err := deleteShards(metadata); err != nil {
msg := fmt.Sprintf("error deleting file: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := deleteMetadata(name); err != nil {
msg := fmt.Sprintf("error deleting metadata: %s", err)
log.WithError(err).Error(msg)
w.WriteHeader(http.StatusInternalServerError)
return
}
}