diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..9c837ba --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file + +version: 2 +updates: + - package-ecosystem: "maven" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md deleted file mode 100644 index c71d3b3..0000000 --- a/.github/pull_request_template.md +++ /dev/null @@ -1,11 +0,0 @@ -# Description of change -(write a short description here or paste a link to JIRA) - -# QA steps - - [ ] automated tests passing - - [ ] manual qa steps passing (list below) - -# Risks - -# Rollback steps - - revert this branch diff --git a/.github/workflows/maven-unit-tests.yml b/.github/workflows/maven-unit-tests.yml new file mode 100644 index 0000000..83a23cb --- /dev/null +++ b/.github/workflows/maven-unit-tests.yml @@ -0,0 +1,29 @@ +name: Maven Unit Tests + +on: + push: + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + java: [11, 17] + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: ${{ matrix.java }} + cache: maven + + - name: Run unit tests + run: mvn -B -ntp test diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml new file mode 100644 index 0000000..1554e33 --- /dev/null +++ b/.github/workflows/maven.yml @@ -0,0 +1,34 @@ +# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-maven + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Java CI with Maven + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + +permissions: + contents: read + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + cache: maven + - name: Build with Maven + run: mvn -B package --file pom.xml \ No newline at end of file diff --git a/pom.xml b/pom.xml index ff56f01..f4b2327 100644 --- a/pom.xml +++ b/pom.xml @@ -1,25 +1,16 @@ 4.0.0 Java Stitch Client - com.stitchdata + de.spendit java-stitch-client Java Stitch Client - 0.5.6-SNAPSHOT - https://github.com/stitchdata/java-stitch-client + 1.0.0. + https://github.com/Spendit-AG/java-stitch-client - https://github.com/stitchdata/java-stitch-client - scm:git:git@github.com:stitchdata/java-stitch-client.git + https://github.com/Spendit-AG/java-stitch-client + scm:git:git@github.com:Spendit-AG/java-stitch-client.git \ - - - miked@stitchdata.com - Mike DeLaurentis - miked@stitchdata.com - Stitch - http://stitchdata.com - - ossrh @@ -31,7 +22,7 @@ org.apache.maven.plugins maven-source-plugin - 2.2.1 + 3.4.0 attach-sources @@ -44,7 +35,10 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.9.1 + 3.12.0 + + false + attach-javadocs @@ -53,11 +47,11 @@ - + org.apache.maven.plugins maven-gpg-plugin - 1.5 + 3.2.8 sign-artifacts @@ -71,7 +65,7 @@ org.sonatype.plugins nexus-staging-maven-plugin - 1.6.7 + 1.7.0 true ossrh @@ -82,7 +76,7 @@ org.apache.maven.plugins maven-release-plugin - 2.5 + 3.3.1 true false @@ -118,28 +112,23 @@ com.cognitect transit-java - 0.8.311 - - - org.apache.httpcomponents - fluent-hc - 4.5.2 + 1.1.403 org.glassfish - javax.json - 1.0.4 + jakarta.json + 2.0.1 junit junit - 4.5 + 4.13.2 test - 1.7 - 1.7 + 11 + 11 diff --git a/src/main/java/com/stitchdata/client/examples/CallbackExample.java b/src/main/java/com/stitchdata/client/examples/CallbackExample.java deleted file mode 100644 index e854a5e..0000000 --- a/src/main/java/com/stitchdata/client/examples/CallbackExample.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.stitchdata.client.examples; - -import java.io.IOException; -import com.stitchdata.client.StitchClient; -import com.stitchdata.client.StitchClientBuilder; -import com.stitchdata.client.StitchException; -import com.stitchdata.client.StitchMessage; -import com.stitchdata.client.FlushHandler; -import java.util.List; -import java.util.Map; -import java.util.HashMap; - -public class CallbackExample { - - private static Map makePerson(int id, String name) { - Map result = new HashMap(); - result.put("id", id); - result.put("name", name); - return result; - } - - public static void exitWithError(String message) { - System.err.println(message); - System.exit(-1); - } - - public static void main(String ...args) { - if (args.length != 3) { - System.err.println("Usage: CLIENT_ID TOKEN NAMESPACE"); - System.exit(-1); - } - - Integer clientId = Integer.parseInt(args[0]); - String token = args[1]; - String namespace = args[2]; - - Map[] people = new Map[] { - makePerson(1, "Jerry Garcia"), - makePerson(2, "Omar Rodgriguez Lopez"), - makePerson(3, "Nina Simone"), - makePerson(4, "Joni Mitchell"), - makePerson(5, "David Bowie") - }; - - try (StitchClient stitch = new StitchClientBuilder() - .withClientId(clientId) - .withToken(token) - .withNamespace(namespace) - .withTableName("people") - .withKeyNames("id") - .withFlushHandler(new FlushHandler() { - public void onFlush(List names) { - for (Object name : names) { - System.out.println(name); - } - } - }) - .build()) { - for (Map person : people) { - StitchMessage message = StitchMessage.newUpsert() - .withSequence(System.currentTimeMillis()) - .withData(person); - stitch.push(message, person.get("name")); - } - } - catch (StitchException e) { - exitWithError("Stitch error " + e.getMessage()); - } - catch (IOException e) { - exitWithError(e.getMessage()); - } - } -} diff --git a/src/main/java/com/stitchdata/client/examples/SimpleExample.java b/src/main/java/com/stitchdata/client/examples/SimpleExample.java deleted file mode 100644 index 5f1f253..0000000 --- a/src/main/java/com/stitchdata/client/examples/SimpleExample.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.stitchdata.client.examples; - -import java.io.IOException; -import com.stitchdata.client.StitchClient; -import com.stitchdata.client.StitchClientBuilder; -import com.stitchdata.client.StitchException; -import com.stitchdata.client.StitchMessage; -import java.util.Map; -import java.util.HashMap; - -public class SimpleExample { - - private static Map makePerson(int id, String name) { - Map result = new HashMap(); - result.put("id", id); - result.put("name", name); - return result; - } - - public static void exitWithError(String message) { - System.err.println(message); - System.exit(-1); - } - - public static void main(String ...args) { - if (args.length != 3) { - System.err.println("Usage: CLIENT_ID TOKEN NAMESPACE"); - System.exit(-1); - } - - Integer clientId = Integer.parseInt(args[0]); - String token = args[1]; - String namespace = args[2]; - - Map[] people = new Map[] { - makePerson(1, "Jerry Garcia"), - makePerson(2, "Omar Rodgriguez Lopez"), - makePerson(3, "Nina Simone"), - makePerson(4, "Joni Mitchell"), - makePerson(5, "David Bowie") - }; - - try (StitchClient stitch = new StitchClientBuilder() - .withClientId(clientId) - .withToken(token) - .withNamespace(namespace) - .withTableName("people") - .withKeyNames("id") - .build()) { - for (Map person : people) { - stitch.push( - StitchMessage.newUpsert() - .withSequence(System.currentTimeMillis()) - .withData(person)); - } - } - catch (StitchException e) { - exitWithError("Stitch error " + e.getMessage()); - } - catch (IOException e) { - exitWithError(e.getMessage()); - } - } -} diff --git a/src/main/java/com/stitchdata/client/Buffer.java b/src/main/java/de/spendit/stitchdata/client/Buffer.java similarity index 71% rename from src/main/java/com/stitchdata/client/Buffer.java rename to src/main/java/de/spendit/stitchdata/client/Buffer.java index fa37308..340dc84 100644 --- a/src/main/java/com/stitchdata/client/Buffer.java +++ b/src/main/java/de/spendit/stitchdata/client/Buffer.java @@ -1,16 +1,9 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Queue; -import java.util.LinkedList; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import com.cognitect.transit.Writer; -import com.cognitect.transit.TransitFactory; -import com.cognitect.transit.Reader; public class Buffer { @@ -31,9 +24,9 @@ synchronized List take(int batchSizeBytes, int batchDelayMillis) { } boolean ready = - availableBytes >= batchSizeBytes || - queue.size() >= MAX_MESSAGES_PER_BATCH || - System.currentTimeMillis() - queue.peek().entryTime >= batchDelayMillis; + availableBytes >= batchSizeBytes || + queue.size() >= MAX_MESSAGES_PER_BATCH || + System.currentTimeMillis() - queue.peek().entryTime >= batchDelayMillis; if (!ready) { return null; @@ -44,7 +37,7 @@ synchronized List take(int batchSizeBytes, int batchDelayMillis) { // Start size at 2 to allow for opening and closing brackets int size = 2; while (!queue.isEmpty() && - size + queue.peek().bytes.length < MAX_BATCH_SIZE_BYTES) { + size + queue.peek().bytes.length < MAX_BATCH_SIZE_BYTES) { Entry entry = queue.remove(); // Add size of record plus the comma delimiter size += entry.bytes.length + 1; @@ -69,8 +62,8 @@ static class Entry { // We need two extra bytes for the [ and ] wrapping the record. if (bytes.length > MAX_BATCH_SIZE_BYTES - 2) { throw new IllegalArgumentException( - "Can't accept a record larger than " + (MAX_BATCH_SIZE_BYTES - 2) - + " bytes"); + "Can't accept a record larger than " + (MAX_BATCH_SIZE_BYTES - 2) + + " bytes"); } } } diff --git a/src/main/java/com/stitchdata/client/FlushHandler.java b/src/main/java/de/spendit/stitchdata/client/FlushHandler.java similarity index 92% rename from src/main/java/com/stitchdata/client/FlushHandler.java rename to src/main/java/de/spendit/stitchdata/client/FlushHandler.java index 4302ee0..5bc0e54 100644 --- a/src/main/java/com/stitchdata/client/FlushHandler.java +++ b/src/main/java/de/spendit/stitchdata/client/FlushHandler.java @@ -1,4 +1,4 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; import java.util.List; diff --git a/src/main/java/com/stitchdata/client/StitchClient.java b/src/main/java/de/spendit/stitchdata/client/StitchClient.java similarity index 72% rename from src/main/java/com/stitchdata/client/StitchClient.java rename to src/main/java/de/spendit/stitchdata/client/StitchClient.java index 228b87f..07a2fe1 100644 --- a/src/main/java/com/stitchdata/client/StitchClient.java +++ b/src/main/java/de/spendit/stitchdata/client/StitchClient.java @@ -1,38 +1,31 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; -import java.io.Closeable; -import java.io.EOFException; -import java.io.Flushable; -import java.io.IOException; -import java.io.ByteArrayOutputStream; -import java.io.ByteArrayInputStream; -import java.io.UnsupportedEncodingException; - -import java.util.List; +import com.cognitect.transit.Reader; +import com.cognitect.transit.TransitFactory; +import com.cognitect.transit.WriteHandler; +import com.cognitect.transit.Writer; +import jakarta.json.Json; +import jakarta.json.JsonObject; +import jakarta.json.JsonReader; + +import java.io.*; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; -import java.util.Map; import java.util.HashMap; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.client.ClientProtocolException; -import org.apache.http.entity.ContentType; -import org.apache.http.StatusLine; -import org.apache.http.HttpResponse; -import org.apache.http.HttpEntity; -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonReader; -import com.cognitect.transit.Writer; -import com.cognitect.transit.WriteHandler; -import com.cognitect.transit.TransitFactory; -import com.cognitect.transit.Reader; +import java.util.List; +import java.util.Map; /** * Client for Stitch. * *

Callers should use {@link StitchClientBuilder} to construct * instances of {@link StitchClient}.

- * + *

* A StitchClient takes messages (instances of {@link StitchMessage}) * and submits them to Stitch in batches. A call to {@link * StitchClient#push(StitchMessage)} adds a message to the current @@ -46,7 +39,7 @@ * batchSizeBytes to 0 will effectively disable batching and cause * each call to {@link #push(StitchMessage)} to send the message * immediatley. - * + *

* You should open the client in a try-with-resources statement to * ensure that it is closed, otherwise you will lose any messages that * have been added to the buffer but not yet delivered. @@ -72,7 +65,7 @@ * } * } * - * + *

* Instances of StitchClient are thread-safe. If buffering is enabled * (which it is by default), then multiple threads will accumulate * records into the same batch. When one of those threads makes a call @@ -88,14 +81,14 @@ public class StitchClient implements Flushable, Closeable { // HTTP constants public static final String PUSH_URL - = "https://api.stitchdata.com/v2/import/push"; + = "https://api.stitchdata.com/v2/import/push"; private static final int HTTP_CONNECT_TIMEOUT = 1000 * 60 * 2; - private static final ContentType CONTENT_TYPE = - ContentType.create("application/transit+json"); + private static final String CONTENT_TYPE = "application/transit+json"; // HTTP properties private final int connectTimeout = HTTP_CONNECT_TIMEOUT; private final String stitchUrl; + private final HttpClient httpClient; // Client-specific message values private final int clientId; @@ -111,7 +104,7 @@ public class StitchClient implements Flushable, Closeable { private final Buffer buffer; private final FlushHandler flushHandler; - private final Map> writeHandlers; + private final Map> writeHandlers; private static void putWithDefault(Map map, String key, Object value, Object defaultValue) { map.put(key, value != null ? value : defaultValue); @@ -128,15 +121,16 @@ private byte[] messageToBytes(StitchMessage message) { HashMap map = new HashMap(); switch (message.getAction()) { - case UPSERT: - map.put("action", "upsert"); - putWithDefault(map, "key_names", message.getKeyNames(), keyNames); - putIfNotNull(map, "data", message.getData()); - break; - case SWITCH_VIEW: - map.put("action", "switch_view"); - break; - default: throw new IllegalArgumentException("Action must not be null"); + case UPSERT: + map.put("action", "upsert"); + putWithDefault(map, "key_names", message.getKeyNames(), keyNames); + putIfNotNull(map, "data", message.getData()); + break; + case SWITCH_VIEW: + map.put("action", "switch_view"); + break; + default: + throw new IllegalArgumentException("Action must not be null"); } map.put("client_id", clientId); @@ -153,17 +147,16 @@ private byte[] messageToBytes(StitchMessage message) { } StitchClient( - String stitchUrl, - int clientId, - String token, - String namespace, - String tableName, - List keyNames, - int batchSizeBytes, - int batchDelayMillis, - FlushHandler flushHandler, - Map> writeHandlers) - { + String stitchUrl, + int clientId, + String token, + String namespace, + String tableName, + List keyNames, + int batchSizeBytes, + int batchDelayMillis, + FlushHandler flushHandler, + Map> writeHandlers) { this.stitchUrl = stitchUrl; this.clientId = clientId; this.token = token; @@ -175,6 +168,10 @@ private byte[] messageToBytes(StitchMessage message) { this.buffer = new Buffer(); this.flushHandler = flushHandler; this.writeHandlers = TransitFactory.writeHandlerMap(writeHandlers); + this.httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofMillis(connectTimeout)) + .version(HttpClient.Version.HTTP_1_1) + .build(); } /** @@ -192,8 +189,8 @@ private byte[] messageToBytes(StitchMessage message) { * @param message the message * @throws StitchException if Stitch rejected or was unable to * process the message - * @throws IOException if there was an error communicating with - * Stitch + * @throws IOException if there was an error communicating with + * Stitch */ public void push(StitchMessage message) throws StitchException, IOException { push(message, message); @@ -217,13 +214,13 @@ public void push(StitchMessage message) throws StitchException, IOException { * sent immediately and this function will block until it is * delivered.

* - * @param message the message - * @param callbackArg flush handler will be invoked with this as + * @param message the message + * @param callbackArg flush handler will be invoked with this as * one of the callbackArgs. * @throws StitchException if Stitch rejected or was unable to * process the message - * @throws IOException if there was an error communicating with - * Stitch + * @throws IOException if there was an error communicating with + * Stitch */ public void push(StitchMessage message, Object callbackArg) throws StitchException, IOException { buffer.put(new Buffer.Entry(messageToBytes(message), callbackArg)); @@ -234,28 +231,45 @@ public void push(StitchMessage message, Object callbackArg) throws StitchExcepti } StitchResponse sendToStitch(String body) throws IOException { - Request request = Request.Post(stitchUrl) - .connectTimeout(connectTimeout) - .addHeader("Authorization", "Bearer " + token) - .bodyString(body, CONTENT_TYPE); - - HttpResponse response = request.execute().returnResponse(); - int statusCode = response.getStatusLine().getStatusCode(); - String reasonPhrase = response.getStatusLine().getReasonPhrase(); - ContentType contentType = ContentType.get(response.getEntity()); + HttpRequest request = HttpRequest.newBuilder(URI.create(stitchUrl)) + .header("Authorization", "Bearer " + token) + .header("Content-Type", CONTENT_TYPE) + .POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8)) + .build(); + + HttpResponse response; + try { + response = httpClient.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while sending request to Stitch", e); + } + + int statusCode = response.statusCode(); + String reasonPhrase = ""; + String contentType = response.headers().firstValue("Content-Type").orElse(null); JsonObject content = null; // Don't attempt to parse body for 5xx responses or if the // Content-Type doesn't explicitly state application/json. - if (statusCode < 500 && - contentType != null && - ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) { - JsonReader rdr = Json.createReader(response.getEntity().getContent()); + if (statusCode < 500 && isJsonContentType(contentType)) { + JsonReader rdr = Json.createReader(new StringReader(response.body())); content = rdr.readObject(); } return new StitchResponse(statusCode, reasonPhrase, content); } + private static boolean isJsonContentType(String contentTypeHeader) { + if (contentTypeHeader == null) { + return false; + } + int separator = contentTypeHeader.indexOf(';'); + String mimeType = separator >= 0 + ? contentTypeHeader.substring(0, separator).trim() + : contentTypeHeader.trim(); + return "application/json".equalsIgnoreCase(mimeType); + } + void sendBatch(List batch) throws IOException { String body = serializeEntries(batch); @@ -285,7 +299,7 @@ static String serializeEntries(List entries) throws UnsupportedEnc for (Buffer.Entry entry : entries) { ByteArrayInputStream bais = new ByteArrayInputStream(entry.bytes); Reader reader = TransitFactory.reader(TransitFactory.Format.JSON, bais); - messages.add((Map)reader.read()); + messages.add((Map) reader.read()); } ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -299,8 +313,8 @@ static String serializeEntries(List entries) throws UnsupportedEnc * * @throws StitchException if Stitch rejected or was unable to * process the message - * @throws IOException if there was an error communicating with - * Stitch + * @throws IOException if there was an error communicating with + * Stitch */ public void flush() throws IOException { while (true) { @@ -317,8 +331,8 @@ public void flush() throws IOException { * * @throws StitchException if Stitch rejected or was unable to * process the message - * @throws IOException if there was an error communicating with - * Stitch + * @throws IOException if there was an error communicating with + * Stitch */ public void close() throws IOException { flush(); diff --git a/src/main/java/com/stitchdata/client/StitchClientBuilder.java b/src/main/java/de/spendit/stitchdata/client/StitchClientBuilder.java similarity index 86% rename from src/main/java/com/stitchdata/client/StitchClientBuilder.java rename to src/main/java/de/spendit/stitchdata/client/StitchClientBuilder.java index 46c030b..49e837b 100644 --- a/src/main/java/com/stitchdata/client/StitchClientBuilder.java +++ b/src/main/java/de/spendit/stitchdata/client/StitchClientBuilder.java @@ -1,41 +1,17 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; + +import com.cognitect.transit.WriteHandler; -import java.io.ByteArrayOutputStream; -import java.io.ByteArrayInputStream; -import java.io.UnsupportedEncodingException; -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.InputStreamReader; -import java.io.IOException; -import java.util.Collection; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.HashMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import com.cognitect.transit.Writer; -import com.cognitect.transit.WriteHandler; -import com.cognitect.transit.TransitFactory; -import com.cognitect.transit.Reader; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.client.ClientProtocolException; -import org.apache.http.entity.ContentType; -import org.apache.http.StatusLine; -import org.apache.http.HttpResponse; -import org.apache.http.HttpEntity; - -import javax.json.Json; -import javax.json.JsonReader; /** * Use this to build instances of StitchClient. * *

Basic usage

- * + *

* Every client must have a client id, access token, and * namespace. You should have gotten these parameters when you set up * the integration at http://stitchdata.com. You must set them with @@ -53,7 +29,7 @@ * * *

Optionally set message defaults

- * + *

* If your application will send messages into only one table, you can * set the table name and key names here with {@link * #withTableName(String)} and {@link #withKeyNames(List)}, {@link @@ -75,7 +51,7 @@ * * *

Optionally tune batch parameters

- * + *

* A StitchClient takes records (instances of {@link StitchMessage}) * and submits them to Stitch in batches. A call to {@link * StitchClient#push(StitchMessage)} adds a record to the current @@ -133,7 +109,8 @@ public class StitchClientBuilder { private int batchDelayMillis = DEFAULT_BATCH_DELAY_MILLIS; private FlushHandler flushHandler = null; private String pushUrl = StitchClient.PUSH_URL; - private Map> writeHandlers = null; + private Map> writeHandlers = null; + /** * Specify your Stitch client id. This is a required setting. * @@ -219,7 +196,7 @@ public StitchClientBuilder withKeyNames(String... keyNames) { * key name here. Otherwise, you can set it individually on each * message with {@link StitchMessage#withKeyNames}. * - * @param keyName key names + * @param keyName key names * @return this object */ public StitchClientBuilder withKeyName(String keyName) { @@ -277,7 +254,7 @@ public StitchClientBuilder withPushUrl(String pushUrl) { * @param writeHandlers the write handlers * @return this object */ - public StitchClientBuilder withWriteHandlers(Map> writeHandlers) { + public StitchClientBuilder withWriteHandlers(Map> writeHandlers) { this.writeHandlers = writeHandlers; return this; } @@ -289,11 +266,11 @@ public StitchClientBuilder withWriteHandlers(Map> writeH */ public StitchClient build() { return new StitchClient( - pushUrl, clientId, token, namespace, - tableName, keyNames, - batchSizeBytes, - batchDelayMillis, - flushHandler, - writeHandlers); + pushUrl, clientId, token, namespace, + tableName, keyNames, + batchSizeBytes, + batchDelayMillis, + flushHandler, + writeHandlers); } } diff --git a/src/main/java/com/stitchdata/client/StitchException.java b/src/main/java/de/spendit/stitchdata/client/StitchException.java similarity index 94% rename from src/main/java/com/stitchdata/client/StitchException.java rename to src/main/java/de/spendit/stitchdata/client/StitchException.java index 471041b..538f7dc 100644 --- a/src/main/java/com/stitchdata/client/StitchException.java +++ b/src/main/java/de/spendit/stitchdata/client/StitchException.java @@ -1,4 +1,4 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; import java.io.IOException; diff --git a/src/main/java/com/stitchdata/client/StitchMessage.java b/src/main/java/de/spendit/stitchdata/client/StitchMessage.java similarity index 99% rename from src/main/java/com/stitchdata/client/StitchMessage.java rename to src/main/java/de/spendit/stitchdata/client/StitchMessage.java index de1d488..a5084a8 100644 --- a/src/main/java/com/stitchdata/client/StitchMessage.java +++ b/src/main/java/de/spendit/stitchdata/client/StitchMessage.java @@ -1,4 +1,4 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; import java.util.Arrays; import java.util.List; diff --git a/src/main/java/com/stitchdata/client/StitchResponse.java b/src/main/java/de/spendit/stitchdata/client/StitchResponse.java similarity index 91% rename from src/main/java/com/stitchdata/client/StitchResponse.java rename to src/main/java/de/spendit/stitchdata/client/StitchResponse.java index 0d287c7..654cacd 100644 --- a/src/main/java/com/stitchdata/client/StitchResponse.java +++ b/src/main/java/de/spendit/stitchdata/client/StitchResponse.java @@ -1,7 +1,7 @@ -package com.stitchdata.client; -import javax.json.JsonReader; -import javax.json.Json; -import javax.json.JsonObject; +package de.spendit.stitchdata.client; + + +import jakarta.json.JsonObject; /** * Encapsulates a response received from Stitch. diff --git a/src/test/java/com/stitchdata/client/BufferTest.java b/src/test/java/de/spendit/stitchdata/client/BufferTest.java similarity index 89% rename from src/test/java/com/stitchdata/client/BufferTest.java rename to src/test/java/de/spendit/stitchdata/client/BufferTest.java index 84f81c2..7451597 100644 --- a/src/test/java/com/stitchdata/client/BufferTest.java +++ b/src/test/java/de/spendit/stitchdata/client/BufferTest.java @@ -1,20 +1,22 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; +import com.cognitect.transit.TransitFactory; +import com.cognitect.transit.Writer; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.io.ByteArrayOutputStream; -import com.cognitect.transit.Writer; -import com.cognitect.transit.WriteHandler; -import com.cognitect.transit.TransitFactory; -import com.cognitect.transit.Reader; import java.util.Arrays; -import java.util.List; import java.util.HashMap; +import java.util.List; import java.util.Map; -import org.junit.*; + import static org.junit.Assert.*; -public class BufferTest { +public class BufferTest { static final Map tinyRecord = new HashMap(); static final Map bigRecord = new HashMap(); @@ -50,7 +52,7 @@ public void putMessage(Map record) { } public String takeBatchBody(int batchSizeBytes, int batchDelayMillis) - throws UnsupportedEncodingException { + throws UnsupportedEncodingException { List entries = buffer.take(batchSizeBytes, batchDelayMillis); return entries == null ? null : StitchClient.serializeEntries(entries); } @@ -69,8 +71,8 @@ public void testWithholdUntilBytesAvailable() throws IOException { assertNull(takeBatchBody(36, Integer.MAX_VALUE)); putMessage(tinyRecord); assertEquals( - "[" + tinyResult + "," + tinyResult + "," + tinyResult + "]", - takeBatchBody(36, Integer.MAX_VALUE)); + "[" + tinyResult + "," + tinyResult + "," + tinyResult + "]", + takeBatchBody(36, Integer.MAX_VALUE)); } @Test @@ -99,7 +101,7 @@ public void testDoesNotExceedMaxBatchSize() throws IOException { assertNull(batch3); } - @Test(expected=IllegalArgumentException.class) + @Test(expected = IllegalArgumentException.class) public void assertCantPutRecordLargerThanMaxMessageSize() { putMessage(hugeRecord); } diff --git a/src/test/java/de/spendit/stitchdata/client/CallbackExampleTest.java b/src/test/java/de/spendit/stitchdata/client/CallbackExampleTest.java new file mode 100644 index 0000000..9e5cf46 --- /dev/null +++ b/src/test/java/de/spendit/stitchdata/client/CallbackExampleTest.java @@ -0,0 +1,87 @@ +package de.spendit.stitchdata.client; + +import com.cognitect.transit.Reader; +import com.cognitect.transit.TransitFactory; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; + +import static org.junit.Assert.assertEquals; + +public class CallbackExampleTest { + + private static Map makePerson(int id, String name) { + Map result = new HashMap(); + result.put("id", id); + result.put("name", name); + return result; + } + + private static class CollectingFlushHandler implements FlushHandler { + final List names = new ArrayList(); + + public void onFlush(List arg) { + names.addAll(arg); + } + } + + private static class DummyStitchClient extends StitchClient { + final List receivedPeople = new ArrayList(); + + DummyStitchClient(FlushHandler flushHandler) { + super("", 1, "token", "namespace", "people", Arrays.asList(new String[]{"id"}), + StitchClientBuilder.DEFAULT_BATCH_SIZE_BYTES, + StitchClientBuilder.DEFAULT_BATCH_DELAY_MILLIS, + flushHandler, + null); + } + + @Override + StitchResponse sendToStitch(String body) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)); + Reader reader = TransitFactory.reader(TransitFactory.Format.JSON, bais); + List records = (List) reader.read(); + + for (Object record : records) { + Map data = (Map) ((Map) record).get("data"); + receivedPeople.add(data); + } + + return new StitchResponse(200, "ok", null); + } + } + + @Test + public void callbackExampleFlowShouldPushPeopleAndInvokeCallbackWithNames() throws IOException { + Map[] people = new Map[]{ + makePerson(1, "Jerry Garcia"), + makePerson(2, "Omar Rodgriguez Lopez"), + makePerson(3, "Nina Simone"), + makePerson(4, "Joni Mitchell"), + makePerson(5, "David Bowie") + }; + + CollectingFlushHandler flushHandler = new CollectingFlushHandler(); + DummyStitchClient stitch = new DummyStitchClient(flushHandler); + + try (stitch) { + for (Map person : people) { + StitchMessage message = StitchMessage.newUpsert() + .withSequence(System.currentTimeMillis()) + .withData(person); + stitch.push(message, person.get("name")); + } + } + + assertEquals(people.length, stitch.receivedPeople.size()); + assertEquals(Arrays.asList( + "Jerry Garcia", + "Omar Rodgriguez Lopez", + "Nina Simone", + "Joni Mitchell", + "David Bowie"), flushHandler.names); + } +} diff --git a/src/test/java/de/spendit/stitchdata/client/SimpleExampleTest.java b/src/test/java/de/spendit/stitchdata/client/SimpleExampleTest.java new file mode 100644 index 0000000..f347c8f --- /dev/null +++ b/src/test/java/de/spendit/stitchdata/client/SimpleExampleTest.java @@ -0,0 +1,78 @@ +package de.spendit.stitchdata.client; + +import com.cognitect.transit.Reader; +import com.cognitect.transit.TransitFactory; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; + +import static org.junit.Assert.assertEquals; + +public class SimpleExampleTest { + + private static Map makePerson(int id, String name) { + Map result = new HashMap<>(); + result.put("id", id); + result.put("name", name); + return result; + } + + private static class DummyStitchClient extends StitchClient { + final List> receivedPeople = new ArrayList>(); + + DummyStitchClient() { + super("", 1, "token", "namespace", "people", Arrays.asList("id"), + StitchClientBuilder.DEFAULT_BATCH_SIZE_BYTES, + StitchClientBuilder.DEFAULT_BATCH_DELAY_MILLIS, + null, + null); + } + + @Override + StitchResponse sendToStitch(String body) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)); + Reader reader = TransitFactory.reader(TransitFactory.Format.JSON, bais); + List records = (List) reader.read(); + + for (Object record : records) { + Map row = (Map) record; + Map data = (Map) row.get("data"); + receivedPeople.add(data); + } + + return new StitchResponse(200, "ok", null); + } + } + + @Test + public void simpleExampleFlowShouldPushAllPeople() throws IOException { + Map[] people = new Map[]{ + makePerson(1, "Jerry Garcia"), + makePerson(2, "Omar Rodgriguez Lopez"), + makePerson(3, "Nina Simone"), + makePerson(4, "Joni Mitchell"), + makePerson(5, "David Bowie") + }; + + DummyStitchClient stitch = new DummyStitchClient(); + try (DummyStitchClient ignored = stitch) { + for (Map person : people) { + stitch.push( + StitchMessage.newUpsert() + .withSequence(System.currentTimeMillis()) + .withData(person)); + } + } + + assertEquals(people.length, stitch.receivedPeople.size()); + for (int i = 0; i < people.length; i++) { + Number expectedId = (Number) people[i].get("id"); + Number actualId = (Number) stitch.receivedPeople.get(i).get("id"); + assertEquals(expectedId.longValue(), actualId.longValue()); + assertEquals(people[i].get("name"), stitch.receivedPeople.get(i).get("name")); + } + } +} \ No newline at end of file diff --git a/src/test/java/com/stitchdata/client/StitchClientTest.java b/src/test/java/de/spendit/stitchdata/client/StitchClientTest.java similarity index 84% rename from src/test/java/com/stitchdata/client/StitchClientTest.java rename to src/test/java/de/spendit/stitchdata/client/StitchClientTest.java index 75a33ab..1e86316 100644 --- a/src/test/java/com/stitchdata/client/StitchClientTest.java +++ b/src/test/java/de/spendit/stitchdata/client/StitchClientTest.java @@ -1,19 +1,19 @@ -package com.stitchdata.client; +package de.spendit.stitchdata.client; +import com.cognitect.transit.Reader; +import com.cognitect.transit.TransitFactory; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.List; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicInteger; +import java.lang.reflect.Field; +import java.net.http.HttpClient; +import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import com.cognitect.transit.TransitFactory; -import com.cognitect.transit.Reader; -import org.junit.*; -import static org.junit.Assert.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; /** * Attempts to exercise concurrent calls to {@link @@ -26,7 +26,7 @@ * some debug print statements that we have used to show that batches * at least contain records from multiple threads. */ -public class StitchClientTest { +public class StitchClientTest { private static final int NUM_THREADS = 4; private static final int NUM_RECORDS_PER_THREAD = 10000; @@ -43,7 +43,7 @@ public class StitchClientTest { private class DummyStitchClient extends StitchClient { DummyStitchClient(FlushHandler flushHandler) { - super("", 0, null, null, null, Arrays.asList(new String[] { "id" }), StitchClientBuilder.DEFAULT_BATCH_SIZE_BYTES, 60000000, flushHandler, null); + super("", 0, null, null, null, Arrays.asList(new String[]{"id"}), StitchClientBuilder.DEFAULT_BATCH_SIZE_BYTES, 60000000, flushHandler, null); } @Override @@ -57,8 +57,8 @@ StitchResponse sendToStitch(String body) throws IOException { counts[i] = 0; } for (Object record : records) { - Map data = (Map) ((Map)record).get("data"); - int threadId = ((Long) ((Map)data).get("threadId")).intValue(); + Map data = (Map) ((Map) record).get("data"); + int threadId = ((Long) ((Map) data).get("threadId")).intValue(); counts[threadId]++; numRecordsByThreadId.get(threadId).incrementAndGet(); } @@ -106,16 +106,14 @@ public void run() { record.put("recordId", recordId); try { StitchMessage message = StitchMessage.newUpsert() - .withSequence(0) - .withData(record); + .withSequence(0) + .withData(record); if (useCallback) { stitch.push(message, String.format("thread-%d-record-%d", threadId, recordId)); - } - else { + } else { stitch.push(message); } - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -153,9 +151,21 @@ public void testConcurrentPushesWithoutCallback() throws IOException { } + @Test + public void testUsesHttp11Client() throws Exception { + try (StitchClient stitch = new DummyStitchClient(null)) { + Field httpClientField = StitchClient.class.getDeclaredField("httpClient"); + httpClientField.setAccessible(true); + HttpClient httpClient = (HttpClient) httpClientField.get(stitch); + + assertEquals(HttpClient.Version.HTTP_1_1, httpClient.version()); + } + } + private static class SetFlushHandler implements FlushHandler { final ConcurrentSkipListSet callbackArgsReceived = - new ConcurrentSkipListSet(); + new ConcurrentSkipListSet(); + public void onFlush(List arg) { callbackArgsReceived.addAll(arg); }