Scala framework for Object Storage
Usage of the Benji module for S3 storage, e.g. Amazon S3 (compatible up to V4) or CEPH.
The first step is to add the dependency in your build.sbt.
libraryDependencies += "com.zengularity" %% "benji-s3" % "2.2.0"
// If Play WS is not yet provided:
libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-ahc-ws-standalone" % "1.1.3",
"com.typesafe.play" %% "play-ws-standalone-xml" % "1.1.3")
Then, the S3 client can be used as follows in your code.
import java.nio.file.Paths
import scala.util.{ Failure, Success }
import scala.concurrent.{ ExecutionContext, Future }
import akka.util.ByteString
import akka.stream.Materializer
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import play.api.libs.ws.ahc.StandaloneAhcWSClient
import play.api.libs.ws.DefaultBodyWritables._
import com.zengularity.benji.s3._
def sample1(implicit m: Materializer): Unit = {
implicit def ec: ExecutionContext = m.executionContext
// WSClient must be available in the implicit scope;
// Here a default/standalone instance is declared
implicit val ws: StandaloneAhcWSClient = StandaloneAhcWSClient()
val s3: WSS3 = S3("accessKey", "secretKey", "http", "hostAndPort")
// See "S3 Client configuration" section to see
// how to create and configure a WSS3
val bucket: WSS3BucketRef = s3.bucket("aBucket")
// Upload
/* input */
val path = Paths.get("/path/to/local/file")
lazy val data: Source[ByteString, _] = FileIO.fromPath(path)
/* target object */
val newObj = bucket.obj("newObject.ext")
/* declare the upload pipeline */
val upload: Sink[ByteString, Future[Long]] =
newObj.put[ByteString, Long](0L) { (acc, chunk) =>
println(s"uploading ${chunk.size.toString} bytes")
Future.successful(acc + chunk.size)
}
(data runWith upload).onComplete {
case Failure(e) => println(s"Upload failed: ${e.getMessage}")
case Success(_) => println("Upload ok")
}
/* Get objects list */
val objects: Future[List[com.zengularity.benji.Object]] = bucket.objects.collect[List]()
objects.map(_.foreach(obj => println(s"- ${obj.name}")))
/* Get the object list with a specified batch size; the default is 1000 */
val allObjects: Future[List[com.zengularity.benji.Object]] = bucket.objects.withBatchSize(100).collect[List]()
allObjects.map(_.foreach(obj => println(s"- ${obj.name}")))
// Take care to release the underlying resources
// (in real code, only once the asynchronous operations above have completed)
ws.close()
}
To run the compliance tests for this module, you have to go through the following steps.
#1 Copy the file src/test/resources/local.conf.sample as src/test/resources/local.conf.
#2 Edit the settings in src/test/resources/local.conf to match your environment:
ceph.s3.host: The host name or inet address of the S3 gateway for CEPH
ceph.s3.accessKey: The S3 access key (ID) for the CEPH service
ceph.s3.secretKey: The S3 secret key for the CEPH service
ceph.s3.protocol: Either http or https
aws.s3.accessKey: The access key (ID) for Amazon S3
aws.s3.secretKey: The secret key for Amazon S3
#3 Make sure a Google Cloud credential is available as JSON in src/test/resources/gcs-test.json. Also edit src/test/resources/local.conf to set the project ID as google.storage.projectId.
#4 Finally, the test suite can be executed using SBT.
sbt test
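As an illustration of steps #2 and #3, a local.conf could look like the fragment below. This is only a sketch: it assumes the usual HOCON syntax of .conf files, and every value is a placeholder. The local.conf.sample file shipped with the project remains the reference for the exact layout.

```hocon
# Placeholder values only; replace them with the settings of your environment
ceph.s3.host = "ceph-gateway.example.org"
ceph.s3.accessKey = "cephAccessKey"
ceph.s3.secretKey = "cephSecretKey"
ceph.s3.protocol = "http" # either http or https

aws.s3.accessKey = "awsAccessKey"
aws.s3.secretKey = "awsSecretKey"

google.storage.projectId = "my-project-id"
```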
There are several factories to create an S3 ObjectStorage client, either passing parameters separately or using a configuration URI.
import akka.stream.Materializer
import play.api.libs.ws.ahc.StandaloneAhcWSClient
import com.zengularity.benji.s3._
def sample2(implicit m: Materializer): Unit = {
implicit val ws: StandaloneAhcWSClient = StandaloneAhcWSClient()
// Creating a "path" style WSS3:
S3("accessKey", "secretKey", "httpProto", "hostAndPort")
// equivalent to
S3("s3:httpProto://accessKey:secretKey@hostAndPort/?style=path")
// Creating a "virtualHost" style WSS3:
S3.virtualHost("accessKey", "secretKey", "httpProto", "hostAndPort")
// equivalent to
S3("s3:httpProto://accessKey:secretKey@hostAndPort/?style=virtualHost")
// Creating a "virtualHost" style WSS3 for AWS/V4:
S3.virtualHostAwsV4(
"accessKey", "secretKey", "httpProto", "hostAndPort", "region")
// equivalent to
S3("s3:httpProto://accessKey:secretKey@hostAndPort/?style=virtualHost&awsRegion=region")
()
}
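The main settings are:
accessKey: The S3 access key (e.g. 905C97B16AA34C7D8E97, for AWS).
secretKey: The S3 secret key.
httpProto: The HTTP protocol, either http or https.
hostAndPort: The host, and optionally the port, of the S3 service (reached using httpProto; e.g. s3.amazonaws.com).
style: The request style; the supported values are the path style and virtualHost.
awsRegion: The AWS region; when it is set, the style must be virtualHost.
Even when provided in a URI, the accessKey and secretKey must be provided as-is (not URI encoded).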
The format for the configuration URIs is the following:
s3:${httpProto}://${accessKey}:${secretKey}@${hostAndPort}/?style=${style}
The optional parameter requestTimeout can also be specified in the query string of such a URI:
...?style=${style}&requestTimeout=${timeInMilliseconds}
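To make the URI format concrete, here is a minimal sketch that assembles such a configuration URI from its parts. The helper configUri and its parameter requestTimeoutMs are hypothetical (they are not part of Benji); the keys are inserted as-is, without URI encoding.

```scala
object S3UriSketch {
  // Hypothetical helper, not part of Benji: builds a configuration URI
  // following the format s3:httpProto://accessKey:secretKey@hostAndPort/?style=...
  // The access and secret keys are deliberately NOT URI encoded.
  def configUri(
    httpProto: String,
    accessKey: String,
    secretKey: String,
    hostAndPort: String,
    style: String,
    requestTimeoutMs: Option[Long] = None): String = {
    val base =
      s"s3:${httpProto}://${accessKey}:${secretKey}@${hostAndPort}/?style=${style}"

    // Append the optional request timeout (in milliseconds) when provided
    requestTimeoutMs.fold(base)(ms => s"${base}&requestTimeout=${ms}")
  }
}
```

The resulting string has the same shape as the URIs passed to the S3(...) factory in the previous examples.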
When using AWS, the virtualHost style is recommended. Otherwise (depending on your AWS settings), the following error can occur with the path style.
java.lang.NullPointerException: originalUrl
at com.ning.http.client.uri.UriParser.parse(UriParser.java:X)
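One way to apply this recommendation is to pick the style from the host name before creating the client. The sketch below is only an illustration: recommendedStyle is a hypothetical helper (not part of Benji), and treating every host under amazonaws.com as AWS, with path as the fallback, is an assumption.

```scala
object StyleSketch {
  // Hypothetical helper, not part of Benji.
  // Assumption: hosts under amazonaws.com are AWS endpoints and should use
  // the virtualHost style; any other host falls back to the path style.
  def recommendedStyle(hostAndPort: String): String = {
    // Drop the optional ":port" suffix before inspecting the host name
    val host = hostAndPort.takeWhile(_ != ':').toLowerCase(java.util.Locale.ROOT)

    if (host == "amazonaws.com" || host.endsWith(".amazonaws.com")) "virtualHost"
    else "path"
  }
}
```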
Jet lag: An S3 client must run with a correct system time. Otherwise, S3 can reject the requests with the RequestTimeTooSkewed error.
java.lang.IllegalStateException: Could not update the contents of the object [...]. Response (403 / Forbidden): <?xml version="1.0" encoding="UTF-8"?><Error><Code>RequestTimeTooSkewed</Code></Error>
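To diagnose this kind of error, the local clock can be compared with the Date header of any HTTP response from the S3 service. The following is a minimal sketch using only java.time; the ClockSkewSketch object is hypothetical, and the 15 minutes threshold is an assumption about the tolerance applied by S3.

```scala
import java.time.{ Duration, Instant, ZonedDateTime }
import java.time.format.DateTimeFormatter

object ClockSkewSketch {
  // Assumption: requests are rejected beyond 15 minutes of difference
  val MaxSkew: Duration = Duration.ofMinutes(15)

  // Absolute difference between the server time, read from an HTTP `Date`
  // header (RFC 1123 format), and the given local time
  def skew(serverDateHeader: String, localNow: Instant): Duration = {
    val serverTime = ZonedDateTime.parse(
      serverDateHeader, DateTimeFormatter.RFC_1123_DATE_TIME).toInstant

    Duration.between(serverTime, localNow).abs()
  }

  // True when the difference exceeds the assumed tolerance
  def isTooSkewed(serverDateHeader: String, localNow: Instant): Boolean =
    skew(serverDateHeader, localNow).compareTo(MaxSkew) > 0
}
```

In practice, localNow would be Instant.now(), and keeping the host synchronized (e.g. with NTP) avoids the issue altogether.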
Versioning: When using versioning with S3, you may obtain the version ID "null" (the string, not a null reference); this is the version assigned to objects on non-versioned buckets.
See "Managing Objects in a Versioning-Enabled Bucket" in the S3 documentation for more details.
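Application code that stores or compares version IDs may want to treat this special value explicitly. Here is a minimal sketch, in which the VersionIdSketch object and its realVersion function are hypothetical helpers (not part of Benji):

```scala
object VersionIdSketch {
  // The literal string "null", as returned for objects without a real version
  val UnversionedId = "null"

  // Keeps only real version IDs: both a null reference and
  // the "null" string are mapped to None
  def realVersion(versionId: String): Option[String] =
    Option(versionId).filterNot(_ == UnversionedId)
}
```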
Naming Restrictions: S3 bucket naming restrictions apply (3-63 characters long, only lowercase letters, numbers and hyphens, etc.). It's recommended to use DNS-compliant bucket names.
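Bucket names can be checked before calling the storage. The sketch below covers only the rules listed above, plus the assumption that a name must start and end with a letter or a number; it is not an exhaustive implementation of the S3 rules, and BucketNameSketch is a hypothetical helper (not part of Benji).

```scala
object BucketNameSketch {
  // 3 to 63 characters; lowercase letters, digits and hyphens only;
  // first and last characters must be a letter or a digit (assumption)
  private val ValidName = "^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$".r

  def isValid(name: String): Boolean =
    ValidName.pattern.matcher(name).matches()
}
```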