One way to think of Akka HTTP is as an extensible web server. Another is as a tool for building applications directly from HTTP primitives. Yet another is as an extension of Akka Streams over HTTP. It works well in each of these roles because it provides a rich set of HTTP-level building blocks that are as easy to work with as an application-level framework, yet stay so true to the protocol that working with them feels like working with HTTP directly.

It can work on both levels thanks to an extension construct called directives. Directives let us define extensions to the Akka HTTP API and make higher-level work feasible, while we still know exactly what is going on at the protocol level.
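
To make the idea concrete, here is a minimal sketch of a custom directive built from a stock one. The header name X-Request-Id, the "unknown" fallback, the /hello path and the names RequestIdDirectives and requestId are all made up for illustration; only optionalHeaderValueByName, path, get and complete come from Akka HTTP itself.

```scala
import akka.http.scaladsl.server.{Directive1, Route}
import akka.http.scaladsl.server.Directives._

object RequestIdDirectives {
  // Hypothetical directive: extracts the value of the X-Request-Id header,
  // falling back to "unknown" when the header is absent.
  def requestId: Directive1[String] =
    optionalHeaderValueByName("X-Request-Id").map(_.getOrElse("unknown"))

  // The custom directive is used exactly like a built-in one.
  val route: Route =
    path("hello") {
      get {
        requestId { id =>
          complete(s"request id: $id")
        }
      }
    }
}
```

The new directive composes with path and get like any other, and it is still clear which part of the HTTP request it reads.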

I recently needed a directive that could extract files as well as regular form fields from multipart forms without storing them. That way we could, for instance, run a security scan on the files before storing them, and only allow storage if the scan gave us the green light. This example also demonstrates the streaming nature of Akka HTTP, because the files are extracted from the multipart form as Akka Streams.

Extract all fields without storing

At first this component looks similar to the stock fileUpload directive, but it extracts every part of the form, files and regular fields alike. We removed the filtering by field name because we want them all, so unlike the standard fileUpload, this directive takes no arguments.
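
The directive's result type, UploadedFieldsAndFiles, and the two types it wraps are not defined in this post. A minimal sketch of what they might look like, reconstructed from how the values are built and read in the code here: only the member names uploadedFiles and bytes actually appear in the post, so the remaining field names (name, value, fileInfo, uploadedFields) are assumptions.

```scala
import akka.http.scaladsl.server.directives.FileInfo

// Assumed shape: a regular (non-file) form field, decoded as UTF-8 text.
final case class UploadedField(name: String, value: String)

// Assumed shape: an uploaded file held fully in memory.
final case class UploadedFile(fileInfo: FileInfo, bytes: Array[Byte])

// Assumed shape: everything extracted from one multipart form.
final case class UploadedFieldsAndFiles(
  uploadedFields: Seq[UploadedField],
  uploadedFiles: Seq[UploadedFile])
```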

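import akka.http.scaladsl.model.Multipart
import akka.http.scaladsl.server.Directive1
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.directives.FileInfo
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

def fileUploadMultiField: Directive1[UploadedFieldsAndFiles] =
  entity(as[Multipart.FormData]).flatMap { formData =>
    extractRequestContext.flatMap { ctx =>
      implicit val mat: Materializer = ctx.materializer
      implicit val ec: ExecutionContext = ctx.executionContext

      // Read every part fully into memory. A part that carries a
      // filename is a file; any other part is a regular form field.
      val strictParts = formData.parts.map { part =>
        part.entity.dataBytes
          .runFold(ByteString.empty)(_ ++ _)
          .map { partBytes =>
            part.filename match {
              case Some(filename) =>
                val info = FileInfo(part.name, filename,
                  part.entity.contentType)
                (None, Some(UploadedFile(info, partBytes.toArray)))
              case None =>
                (Some(UploadedField(part.name,
                  partBytes.utf8String)), None)
            }
          }
      }.runWith(Sink.seq[Future[(Option[UploadedField],
        Option[UploadedFile])]])

      // Future[Seq[Future[pair]]] => Future[Seq[pair]], then split
      // the pairs into a list of fields and a list of files.
      val fieldsAndFiles = strictParts
        .flatMap(Future.sequence(_))
        .map { optionPairs =>
          val (uploadedFields, uploadedFiles) = optionPairs.unzip
          UploadedFieldsAndFiles(uploadedFields.flatten,
            uploadedFiles.flatten)
        }

      onSuccess(fieldsAndFiles)
    }
  }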
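
The last step of the directive, turning a future of a list of futures of option pairs into two plain lists, is easier to follow in isolation. The sketch below uses only the Scala standard library; the String and Int stand-ins for fields and files, and the name UnzipSketch, are made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object UnzipSketch {
  // Each element fills exactly one slot: a "field" (left) or a "file" (right).
  val nested: Future[List[Future[(Option[String], Option[Int])]]] =
    Future.successful(List(
      Future.successful((Some("title"), None)),
      Future.successful((None, Some(42))),
      Future.successful((Some("author"), None))))

  // Future.sequence collapses the inner futures, preserving order.
  val pairs: List[(Option[String], Option[Int])] =
    Await.result(nested.flatMap(Future.sequence(_)), 3.seconds)

  // unzip splits the pairs into two lists of options; flatten drops the Nones.
  val (fieldOptions, fileOptions) = pairs.unzip
  val fields: List[String] = fieldOptions.flatten // List("title", "author")
  val files: List[Int] = fileOptions.flatten      // List(42)
}
```

Await.result is used here only to keep the sketch short; the directive itself never blocks and hands the future to onSuccess instead.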

We can test that this works by creating a route test using Akka HTTP Testkit:

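import java.nio.charset.StandardCharsets

import akka.http.scaladsl.model.{ContentTypes, HttpEntity, Multipart, StatusCodes}
import akka.http.scaladsl.server.Directives._

// The code below lives inside a ScalaTest flat spec that mixes in
// Matchers and Akka HTTP Testkit's ScalatestRouteTest.

// Echo the content of every uploaded file, comma-separated.
def route =
  post {
    fileUploadMultiField { fieldsAndFiles =>
      val fileContents = fieldsAndFiles
        .uploadedFiles
        .map(file => new String(file.bytes,
          StandardCharsets.UTF_8))
      complete(fileContents.mkString(", "))
    }
  }

"the fileUploadMultiField directive" should "extract the uploaded files" in {
  val str1 = "some data"
  val str2 = "some data 2"
  // The "filename" parameter is what makes each part count as a file.
  val multipartForm =
    Multipart.FormData(
      Multipart.FormData.BodyPart.Strict(
        "field1",
        HttpEntity(ContentTypes.`text/plain(UTF-8)`, str1),
        Map("filename" -> "data1.txt")),
      Multipart.FormData.BodyPart.Strict(
        "field2",
        HttpEntity(ContentTypes.`text/plain(UTF-8)`, str2),
        Map("filename" -> "data2.txt")))

  Post("/", multipartForm) ~> route ~> check {
    status shouldEqual StatusCodes.OK
    responseAs[String] shouldEqual Seq(str1, str2).mkString(", ")
  }
}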