/* * Copyright 2015 Google Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main import ( "encoding/json" "flag" "fmt" "./third_party/websocket" "log" "net/http" "net/url" "reflect" "./cherry" "./rtdb" "./service" "regexp" "time" "strconv" "io" "io/ioutil" "strings" ) var rtdbServer *rtdb.Server var testRunner *cherry.TestRunner var rpcServices *service.Library var objectModel *rtdb.ObjectModel // \todo [petri] is there a better place for this? func EncodeJSONRPC (method string, params interface{}) ([]byte, error) { type Message struct { JsonRPC string `json:"jsonrpc"` Method string `json:"method"` Params []interface{} `json:"params"` } return json.Marshal(Message{ JsonRPC: "2.0", Method: method, Params: []interface{}{ params }, }) } func debugStr (s string) string { if len(s) > 100 { return s[0:97] + "..." } else { return s } } // Websocket handling. func wsHandler (w http.ResponseWriter, r *http.Request) { conn, err := websocket.Upgrade(w, r, nil, 1024, 1024) if _, ok := err.(websocket.HandshakeError); ok { http.Error(w, "Not a websocket handshake", 400) return } else if err != nil { log.Println(err) return } log.Println("[socket] connection established") defer conn.Close() // Channels. type msgReceived struct { content []byte err error } type sendMessage struct { content []byte } connQueue := make(chan interface{}, 1000) // \note A too small capacity causes deadlock; this is a temporary fix. // RTDB listener callbacks. onSubscribeObject := func(objects []rtdb.Object) { // Send initial object to client. log.Printf("[socket] send %d subscribed object to client\n", len(objects)) encoded, err := EncodeJSONRPC("rtdb.InitObjects", objects) if err != nil { panic(err) } connQueue <- sendMessage{ encoded } } onUpdateObjects := func(changes []rtdb.Update) { // \todo [petri] currently handled inside server thread -- move to client thread instead? // \todo [petri] we should be sending {id:ops}, but changes instead contains values getObjIds := func() []string { names := make([]string, len(changes)) for ndx, change := range changes { names[ndx] = change.ObjId } return names } log.Printf("[socket] update client objects: %q\n", getObjIds()) type UpdateObjects struct { Changes []rtdb.Update `json:"changes"` } update := UpdateObjects { Changes: changes, } encoded, err := EncodeJSONRPC("rtdb.UpdateObjects", update) // \todo [petri] what to do on error?! if err == nil { connQueue <- sendMessage{ encoded } } } // Create client. listener := rtdbServer.NewListener(onSubscribeObject, onUpdateObjects) rpcClient := cherry.NewRPCClient(listener) // Handle message reading in separate goroutine. connectionClosed := make(chan struct{}) go func() { // Handle incoming messages in this thread. for { select { case <-connectionClosed: return default: _, input, err := conn.ReadMessage() connQueue <- msgReceived{ input, err } if err != nil { <-connectionClosed; return } } } }() // Handle socket connection. func() { for { op := <- connQueue switch op.(type) { case msgReceived: msg := op.(msgReceived) if msg.err != nil { log.Printf("[socket] error reading message: %s\n", msg.err) return } result, err := rpcServices.CallJSON(rpcClient, msg.content) if err == nil { log.Printf("[socket] result: %s\n", debugStr(string(result))) err = conn.WriteMessage(websocket.TextMessage, result) if err != nil { log.Printf("[socket] WARNING: unable to write message: %s\n", err); return } } else { log.Printf("[socket] WARNING: invalid rpc call '%s': '%s'\n", string(msg.content), err) // \todo [petri] return error! } case sendMessage: // Send message to client. msg := op.(sendMessage) err = conn.WriteMessage(websocket.TextMessage, msg.content) if err != nil { log.Printf("[socket] WARNING: unable to write message: %s\n", err); return } } } }() connectionClosed <- struct{}{} // Cleanup. log.Printf("[socket] destroy listener\n") rtdbServer.DestroyListener(listener) // Close channel and eat all messages (to avoid blocking senders). close(connQueue) for _ = range connQueue {} log.Println("[socket] client disconnected") } // Helper for handling GET requests of the form /uriDirectoryName/param . func handleSingleParamGETRequest (response http.ResponseWriter, request *http.Request, uriDirectoryName string, writeResponse func(param string) error) { if request.Method != "GET" { http.Error(response, "Invalid request", 400) return } uri, err := url.ParseRequestURI(request.RequestURI) if err != nil { log.Printf("[getrequest] Error when parsing URI: %v\n", err) http.Error(response, err.Error(), 404) return } re, _ := regexp.Compile(fmt.Sprintf("/%s/(.*)", uriDirectoryName)) param := re.FindStringSubmatch(uri.Path)[1] err = writeResponse(param) if err != nil { log.Printf("[getrequest] Finished with error: %v\n", err) http.Error(response, err.Error(), 404) return } log.Printf("[getrequest] Finished successfully\n") } // Batch execution log export request handler. func executionLogExportHandler (response http.ResponseWriter, request *http.Request) { handleSingleParamGETRequest(response, request, "executionLog", func(batchResultId string) error { log.Printf("[execlog] Handling request for batch '%s'\n", batchResultId) str, err := testRunner.QueryBatchExecutionLog(batchResultId) if err != nil { return err } _, err = io.WriteString(response, str) return err }) } type batchResultExportHttpResponse struct { response http.ResponseWriter } func (w batchResultExportHttpResponse) Write (p []byte) (int, error) { return w.response.Write(p) } func (w batchResultExportHttpResponse) SetFilename (name string) { w.response.Header().Add("Content-Disposition", "attachment; filename=" + strconv.Quote(name)) } // QPA export request handler. func exportHandler (response http.ResponseWriter, request *http.Request) { handleSingleParamGETRequest(response, request, "export", func(batchResultId string) error { log.Printf("[export] Handling request for batch '%s'\n", batchResultId) return cherry.WriteBatchResultExport(batchResultExportHttpResponse{response}, rtdbServer, batchResultId) }) } // QPA import request handler. func importHandler (response http.ResponseWriter, request *http.Request) { if request.Method != "POST" || request.RequestURI != "/import/" { http.Error(response, "Invalid request", 400) return } parts, err := request.MultipartReader() if err != nil { http.Error(response, "Expected a multipart upload", 400) return } log.Printf("[import] Received request with Content-Length %d\n", request.ContentLength) anyImportSucceeded := false anyImportFailed := false for { file, err := parts.NextPart(); if err != nil { break } startTime := time.Now() batchResultDefaultName := "Import-" + startTime.Format("2006-Jan-02 15:04:05") err = testRunner.ImportBatch(batchResultDefaultName, file, request.ContentLength) if err != nil { log.Printf("[import] Import failed with error %v\n", err) anyImportFailed = true } else { log.Printf("[import] Single import finished\n") anyImportSucceeded = true } file.Close() } request.Body.Close() if !anyImportFailed { // no failures response.WriteHeader(http.StatusOK) } else if !anyImportSucceeded { // no successes http.Error(response, "Import failed", 500) } else { // both failures and successes http.Error(response, "Partial failure", 207) } } // Garbage in, strings split by newlines and comments removed out. func splitAndTrimTestSetFilters (setFilters string) []string { cleanFilters := make([]string, 0) commentEraser := regexp.MustCompile(`#.*$`) for _, filter := range strings.Split(strings.Replace(setFilters,"\r\n","\n", -1), "\n") { noComments := commentEraser.ReplaceAllString(filter, "") clean := strings.TrimSpace(noComments) if len(clean) > 0 { cleanFilters = append(cleanFilters, clean) } } return cleanFilters } func importTestSetHandler (response http.ResponseWriter, request *http.Request) { if request.Method != "POST" || request.RequestURI != "/importTestSet/" { http.Error(response, "Invalid request", 400) return } parts, err := request.MultipartReader() if err != nil { http.Error(response, "Expected a multipart upload", 400) return } log.Printf("[test set import] Received request with Content-Length %d\n", request.ContentLength) anyImportFailed := false var setName string var setFilters string for { part, err := parts.NextPart(); if err != nil { break } data, err := ioutil.ReadAll(part) if err != nil { log.Printf("[test set import] Reading upload failed with error %v\n", err) anyImportFailed = true } else { switch part.FormName() { case "set-name": setName = string(data) case "set-filters": setFilters = string(data) } } part.Close() } request.Body.Close() filterList := splitAndTrimTestSetFilters(setFilters) if !anyImportFailed && len(setName) > 0 && len(filterList) > 0 { err = cherry.AddTestSet(rtdbServer, setName, filterList) if err != nil { log.Printf("[test set import] Import failed with error %v\n", err) http.Error(response, "Import failed", 500) } else { log.Printf("[test set import] Imported test set %s with %d filters\n", setName, len(filterList)) response.WriteHeader(http.StatusOK) } } else { log.Printf("[test set import] Tried to import empty filter set or empty set name or other failure.\n") http.Error(response, "Import failed", 500) } } // Mapping of third party locations to desired server locations // This allows us to remap the locations for release packages or when versions change func setUpFileMappings() { type ServerFileMapping struct { SourceRootDir string ServerPrefix string } serverFileMappings := []ServerFileMapping{ {"third_party/ui-bootstrap", "/ui-bootstrap/"}, {"third_party/angular", "/lib/angular/"}, {"third_party/ui-router", "/lib/ui-router/"}, {"third_party/jquery", "/lib/jquery/"}, {"third_party/spin", "/lib/spin/"}, {"third_party/angular-spinner", "/lib/angular-spinner/"}, {"third_party/bootstrap", "/lib/bootstrap/"}, {"third_party/sax", "/lib/sax/"}, {"third_party/underscore", "/lib/underscore/"}, {"third_party/angular-tree-control", "/lib/angular-tree-control/"}} // Special case the main license http.HandleFunc("/LICENSE", func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "LICENSE") }) // The client hierarchy served as-is fs := http.Dir("client") fileHandler := http.FileServer(fs) http.Handle("/", fileHandler) // Re-map third-party to point to the listed directories for _, mapping := range serverFileMappings { bootStrapFs := http.Dir(mapping.SourceRootDir) bootStrapFileHandler := http.StripPrefix(mapping.ServerPrefix, http.FileServer(bootStrapFs)) http.Handle(mapping.ServerPrefix, bootStrapFileHandler) } } // Main func main () { port := flag.Int("port", 8080, "port to serve on") dbName := flag.String("db", "Cherry.db", "database file name (use :memory: for in-memory)") // \todo [2014-10-02 pyry] Temporary workaround to make cherry usable on non-SSD systems. // Real fix is to write better DB backend. dbSyncIo := flag.Bool("db-sync-io", true, "use synchronous IO in DB backend") flag.Parse() // Initialize Cherry object model. objectModel := rtdb.NewObjectModel(cherry.GetObjectTypes()) // Create RTDB instance. log.Println("[main] create sql backend") rtdbBackend, err := rtdb.NewSQLiteBackend(*dbName, *dbSyncIo) if err != nil { log.Println(err); return } log.Println("[main] create rtdb") rtdbServer, err = rtdb.NewServer(objectModel, rtdbBackend) if err != nil { log.Println(err); return } // Initialize database data model. cherry.InitDB(rtdbServer) cherry.StartADBDeviceListPoller(rtdbServer, 2 * time.Second) // Initialize RPC services. testRunner = cherry.NewTestRunner(rtdbServer) rpcHandler := cherry.NewRPCHandler(rtdbServer, testRunner) rpcServices = service.NewServiceLibrary( reflect.TypeOf((*cherry.RPCClient)(nil)), []service.HandlerSpec{ service.HandlerSpec{ "rtdb", rpcHandler }, }) // Setup http handling. setUpFileMappings() http.HandleFunc("/ws", wsHandler) http.HandleFunc("/executionLog/", executionLogExportHandler) http.HandleFunc("/export/", exportHandler) http.HandleFunc("/import/", importHandler) http.HandleFunc("/importTestSet/", importTestSetHandler) // Fire up http server! addr := fmt.Sprintf("127.0.0.1:%d", *port) log.Printf("Listening on port %d\n", *port) err = http.ListenAndServe(addr, nil) log.Println(err.Error()) }