Scala framework for Object Storage
This guide covers using Benji with Amazon S3 and S3-compatible services like CEPH and MinIO.
Add the S3 module and Play WS to your build.sbt:
libraryDependencies += "com.zengularity" %% "benji-s3" % "2.3.0"
// Play WS standalone
libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-ahc-ws-standalone" % "2.2.6",
"com.typesafe.play" %% "play-ws-standalone-xml" % "2.2.6"
)
import java.nio.file.Paths
import scala.util.{ Failure, Success }
import scala.concurrent.{ ExecutionContext, Future }
import akka.util.ByteString
import akka.stream.Materializer
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import play.api.libs.ws.ahc.StandaloneAhcWSClient
import play.api.libs.ws.DefaultBodyWritables._
import com.zengularity.benji.s3._
def sample1(implicit m: Materializer): Unit = {
implicit def ec: ExecutionContext = m.executionContext
// a WSClient must be available in the implicit scope;
// here a default/standalone instance is declared
implicit val ws: StandaloneAhcWSClient = StandaloneAhcWSClient()
val s3: WSS3 = S3("accessKey", "secretKey", "http", "hostAndPort")
// See "S3 Client configuration" section to see
// how to create and configure a WSS3
val bucket: WSS3BucketRef = s3.bucket("aBucket")
// Upload
/* input */
val path = Paths.get("/path/to/local/file")
lazy val data: Source[ByteString, _] = FileIO.fromPath(path)
/* target object */
val newObj = bucket.obj("newObject.ext")
/* declare the upload pipeline */
val upload: Sink[ByteString, Future[Long]] =
newObj.put[ByteString, Long](0L) { (acc, chunk) =>
println(s"uploading ${chunk.size.toString} bytes")
Future.successful(acc + chunk.size)
}
(data runWith upload).onComplete {
case Failure(e) => println(s"Upload failed: ${e.getMessage}")
case Success(_) => println("Upload ok")
}
/* Get objects list */
val objects: Future[List[com.zengularity.benji.Object]] = bucket.objects.collect[List]()
objects.map(_.foreach(obj => println(s"- ${obj.name}")))
/* Get the object list with a specified batch size (default is 1000) */
val allObjects: Future[List[com.zengularity.benji.Object]] = bucket.objects.withBatchSize(100).collect[List]()
allObjects.map(_.foreach(obj => println(s"- ${obj.name}")))
// Take care to release the underlying resources
ws.close()
}
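The upload sink above folds an accumulator over the incoming chunks: starting from 0L, each chunk's byte count is added to a running total, and that total is what the materialized Future[Long] completes with. The counting logic can be sketched in plain Scala (the object name and the chunk sizes below are illustrative, not part of the Benji API):

```scala
object PutAccumulator {
  // Conceptual sketch of the put[ByteString, Long](0L) accumulator:
  // each chunk's size is added to a running total, so the final
  // value is the number of bytes uploaded.
  def totalSize(chunkSizes: Seq[Int]): Long =
    chunkSizes.foldLeft(0L)((acc, size) => acc + size)

  def main(args: Array[String]): Unit = {
    // e.g. three chunks of 8 KiB, 8 KiB and 1 KiB
    println(totalSize(Seq(8192, 8192, 1024))) // prints 17408
  }
}
```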
To run the compliance tests for this module, follow these steps.
Step 1: Copy the sample config:
cp src/test/resources/local.conf.sample src/test/resources/local.conf
Step 2: Edit src/test/resources/local.conf with your test environment details:
- ceph.s3.host, ceph.s3.accessKey, ceph.s3.secretKey, ceph.s3.protocol — for CEPH
- aws.s3.accessKey, aws.s3.secretKey — for Amazon S3
- google.storage.projectId — for Google Cloud Storage
Step 3: Provide Google Cloud credentials at src/test/resources/gcs-test.json
Step 4: Run tests via SBT:
sbt test
Several factory methods create an S3 ObjectStorage, either with explicit parameters or using a configuration URI:
import akka.stream.Materializer
import play.api.libs.ws.ahc.StandaloneAhcWSClient
import com.zengularity.benji.s3._
def sample2(implicit m: Materializer): Unit = {
implicit val ws: StandaloneAhcWSClient = StandaloneAhcWSClient()
// Creating a "path" style WSS3:
S3("accessKey", "secretKey", "httpProto", "hostAndPort")
// equivalent to
S3("s3:httpProto://accessKey:secretKey@hostAndPort/?style=path")
// Creating a "virtualHost" style WSS3:
S3.virtualHost("accessKey", "secretKey", "httpProto", "hostAndPort")
// equivalent to
S3("s3:httpProto://accessKey:secretKey@hostAndPort/?style=virtualHost")
// Creating a "virtualHost" style WSS3 for AWS/V4:
S3.virtualHostAwsV4(
"accessKey", "secretKey", "httpProto", "hostAndPort", "region")
// equivalent to
S3("s3:httpProto://accessKey:secretKey@hostAndPort/?style=virtualHost&awsRegion=region")
()
}
The main settings are:
- the protocol: http or https
- the host and port (e.g. s3.amazonaws.com)
- the request style: path or virtualHost (default: virtualHost)
Both accessKey and secretKey must be provided as-is in URIs (not URI-encoded).
URI configuration format:
s3:${httpProto}://${accessKey}:${secretKey}@${hostAndPort}/?style=${style}
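As a sketch, the configuration URI can be assembled with a plain string interpolation (the helper S3ConfigUri and the placeholder values are illustrative, not part of the Benji API):

```scala
object S3ConfigUri {
  // Assembles a Benji S3 configuration URI from its parts.
  // Note: accessKey and secretKey go in as-is, without URI-encoding.
  def apply(
      httpProto: String,
      accessKey: String,
      secretKey: String,
      hostAndPort: String,
      style: String): String =
    s"s3:$httpProto://$accessKey:$secretKey@$hostAndPort/?style=$style"

  def main(args: Array[String]): Unit = {
    println(S3ConfigUri(
      "https", "accessKey", "secretKey", "s3.amazonaws.com", "virtualHost"))
    // prints s3:https://accessKey:secretKey@s3.amazonaws.com/?style=virtualHost
  }
}
```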
Optional query parameters:
requestTimeout — Request timeout in milliseconds (e.g., &requestTimeout=30000)
When using AWS S3 with path style, you may see this error:
java.lang.NullPointerException: originalUrl
at com.ning.http.client.uri.UriParser.parse(UriParser.java:X)
Solution: Use virtualHost style instead (recommended for AWS).
S3 requires accurate system time. This error indicates a time mismatch:
java.lang.IllegalStateException: Could not update the contents of the object [...].
Response (403 / Forbidden): <?xml version="1.0" encoding="UTF-8"?>
<Error><Code>RequestTimeTooSkewed</Code></Error>
Solution: Synchronize your system clock with an NTP server.
When using versioning, objects in non-versioned buckets report their version ID as the literal string "null" (not a null reference).
See AWS S3 Versioning documentation for details.
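A sketch of guarding against that sentinel value (the helper isUnversionedId is illustrative, not a Benji API):

```scala
object VersionIds {
  // Non-versioned buckets report the literal string "null" as the
  // version ID, so compare against the string, not against null.
  def isUnversionedId(versionId: String): Boolean =
    versionId == "null"

  def main(args: Array[String]): Unit = {
    println(isUnversionedId("null")) // prints true
    println(isUnversionedId("3HL4kqtJlcpXroDTDmJ")) // prints false
  }
}
```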
Follow AWS bucket naming rules: