Creating and querying real-time Customer Profiles

With this use case, we demonstrate how Quick can be used to create real-time customer profiles that are populated with live data and queried by applications. In this example, we build customer profiles for a music streaming service: we compute customer metrics, create charts of each user's most listened-to albums, artists and tracks, and generate recommendations based on the user's listening history.

Listening Events
{"userId": 402, "artistId": 7, "albumId": 17147, "trackId": 44975, "timestamp": 1568052379},
{"userId": 703, "artistId": 64, "albumId": 17148, "trackId": 44982, "timestamp": 1568052379},
{"userId": 4234, "artistId": 3744, "albumId": 34424, "trackId": 105501, "timestamp": 1568052382},
{"userId": 2843, "artistId": 71, "albumId": 315, "trackId": 2425, "timestamp": 1568052383},
{"userId": 1335, "artistId": 13866, "albumId": 29007, "trackId": 83201, "timestamp": 1568052385},
...

Every time a customer listens to a song, a listening event containing the user, artist, album and track ids and a timestamp is emitted to an Apache Kafka topic. These events are then processed with Kafka Streams to build the customer profiles.

You can find the corresponding code in our GitHub repository. The examples are based on the real-world data set LFM-1b. The Kafka Streams applications are written with our streams-bootstrap library.
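To make the event format concrete, here is a small Python sketch (not the code from the repository) that parses one of the events listed above into a typed record and returns it as a key/value pair. Keying the record by userId is an assumption of this sketch: the profile topics are looked up by user id, but the key of the listening events topic is not spelled out here.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ListeningEvent:
    """Typed view of one listening event.

    Field names mirror the JSON keys so a parsed dict can be unpacked directly.
    """
    userId: int
    artistId: int
    albumId: int
    trackId: int
    timestamp: int


def parse_event(line: str) -> ListeningEvent:
    """Parse one event line, stripping the trailing comma used in the listing."""
    raw = json.loads(line.strip().rstrip(","))
    # Raises TypeError if a field is missing or an unknown field is present.
    return ListeningEvent(**raw)


def to_record(event: ListeningEvent) -> tuple[int, ListeningEvent]:
    """Return a (key, value) pair keyed by userId (an assumption of this sketch)."""
    return event.userId, event
```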

GraphQL Schema
type Query {
    getUserProfile(userId: Long!): UserProfile
}

type UserProfile {
    totalListenCount: Long! @topic(name: "counts", keyArgument: "userId")
    firstListenEvent: Long! @topic(name: "firstlisten", keyArgument: "userId")
    lastListenEvent: Long! @topic(name: "lastlisten", keyArgument: "userId")
    artistCharts: NamedArtistCharts! @topic(name: "topartists", keyArgument: "userId")
    albumCharts: NamedAlbumCharts! @topic(name: "topalbums", keyArgument: "userId")
    trackCharts: NamedTrackCharts! @topic(name: "toptracks", keyArgument: "userId")
}

type NamedArtistCharts {
    topK: [NamedArtistCount!]!
}

type NamedAlbumCharts {
    topK: [NamedAlbumCount!]!
}

type NamedTrackCharts {
    topK: [NamedTrackCount!]!
}

type Item {
    id: Long!
    name: String!
}

type NamedArtistCount {
    id: Long!
    artist: Item! @topic(name: "artists", keyField: "id")
    countPlays: Long!
}

type NamedAlbumCount {
    id: Long!
    album: Item! @topic(name: "albums", keyField: "id")
    countPlays: Long!
}

type NamedTrackCount {
    id: Long!
    track: Item! @topic(name: "tracks", keyField: "id")
    countPlays: Long!
}
...

For modeling and querying the data in our example, we first define a schema with GraphQL. Query is one of the root operation types in GraphQL, and with getUserProfile we combine six metrics into the customer profile: the total number of listening events, the first and the last time a user listened to a song, and charts with the user's most listened-to albums, artists and tracks. The @topic directive reads each field of UserProfile from its own topic, using the userId argument as the key (keyArgument). As we will see later, the topics with the charts only contain ids and no names of the corresponding music data. Therefore, our GraphQL schema resolves the id of each entry in the topic topartists to the artist's name from the topic artists (keyField: "id") and calls the corresponding type NamedArtistCount. Albums and tracks are resolved in the same way.
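The id-to-name resolution can be pictured as a lookup join. The following Python sketch uses plain dictionaries as stand-ins for the topartists and artists topics, with sample values taken from the query result later in this section, to show what the gateway effectively does for artistCharts. It is an illustration, not Quick's implementation.

```python
# Plain dicts standing in for the two topics; dict keys play the role of topic keys.
TOP_ARTISTS = {  # topic "topartists", keyed by userId
    3519430: {"topK": [{"id": 924, "countPlays": 116}, {"id": 1825, "countPlays": 89}]},
}
ARTISTS = {  # topic "artists", keyed by artist id
    924: {"id": 924, "name": "The Killers"},
    1825: {"id": 1825, "name": "Neil Young"},
}


def resolve_artist_charts(user_id, charts_topic, artists_topic):
    """Return NamedArtistCharts-shaped data, or None if the user has no charts."""
    charts = charts_topic.get(user_id)  # keyArgument: "userId"
    if charts is None:
        return None
    return {
        "topK": [
            {
                "id": record["id"],
                "countPlays": record["countPlays"],
                # keyField: "id" -> look the entry's id up in the artists topic.
                # A missing artist yields None here; because the schema declares
                # the field as non-null (Item!), a GraphQL server would report an error.
                "artist": artists_topic.get(record["id"]),
            }
            for record in charts["topK"]
        ]
    }
```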

Quick Context and Gateway
quick context create --host $HOST --key $KEY

quick gateway create profiles
quick gateway apply profiles -f schema_user_profile.gql

We are now ready to process and query our data. We start by setting up our Quick instance. First, we create a context for the Quick CLI, which requires the host URL and an API key. Second, we create a new gateway named profiles and apply our GraphQL schema to it.

Creating Topics
type ListeningEvent {
    userId: Long!
    artistId: Long!
    albumId: Long!
    trackId: Long!
    timestamp: Long!
}

quick topic create listeningevents \
--key long \
--value schema -s profiles.ListeningEvent

type Item {
    id: Long!
    name: String!
}

quick topic create albums \
--key long \
--value schema -s profiles.Item

quick topic create artists \
--key long \
--value schema -s profiles.Item

quick topic create tracks \
--key long \
--value schema -s profiles.Item

Then, we create the main topics: one each for the album, artist and track data, and one for storing all listening events. The command expects the topic name as well as the type or schema of the key and the value. Since our values are complex, we define their types in the global GraphQL schema that is applied to the gateway. That way, we do not need to specify a schema file but can reference a type as <name of the gateway>.<name of the type>, for example profiles.Item.
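As a rough illustration of what it means for a value to conform to profiles.ListeningEvent or profiles.Item, the following Python sketch checks a value against the non-null fields of a type. It is not how Quick validates data: the field tables are transcribed by hand from the GraphQL types, and treating unknown fields as errors is a choice of this sketch.

```python
# Field types transcribed by hand from the GraphQL types above.
# "Long!" is modelled as a required int and "String!" as a required str.
LISTENING_EVENT = {"userId": int, "artistId": int, "albumId": int, "trackId": int, "timestamp": int}
ITEM = {"id": int, "name": str}


def validate(value: dict, fields: dict) -> list:
    """Return a list of problems; an empty list means the value conforms."""
    errors = []
    for name, expected in fields.items():
        field_value = value.get(name)
        if field_value is None:
            errors.append(f"missing non-null field {name!r}")
        elif isinstance(field_value, bool) or not isinstance(field_value, expected):
            # bool is a subclass of int in Python, so it is rejected explicitly
            errors.append(f"field {name!r} should be {expected.__name__}")
    for name in sorted(value.keys() - fields.keys()):
        errors.append(f"unknown field {name!r}")
    return errors
```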

Simple Kafka Apps
quick topic create firstlisten --key long --value long
quick topic create lastlisten --key long --value long
quick topic create counts --key long --value long

quick app deploy firstlisten \
--registry us.gcr.io/gcp-bakdata-cluster/profile-store \
--image user-listen-activity \
--tag 1.0-SNAPSHOT \
--args input-topics=listeningevents output-topic=firstlisten kind=FIRST productive=false
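The topics firstlisten, lastlisten and counts each hold one value per user. The following Python sketch shows the per-user fold behind them. That the applications compute the earliest timestamp, the latest timestamp and the number of events is an assumption based on the topic names and kind=FIRST, not on the application code. In Kafka Streams, a comparable result would come from grouping the stream by key and aggregating it into a table.

```python
from dataclasses import dataclass


@dataclass
class ListenActivity:
    first: int  # value for the firstlisten topic
    last: int   # value for the lastlisten topic
    count: int  # value for the counts topic


def aggregate(events):
    """Fold (userId, timestamp) pairs into one ListenActivity per user."""
    activity = {}
    for user_id, timestamp in events:
        current = activity.get(user_id)
        if current is None:
            activity[user_id] = ListenActivity(first=timestamp, last=timestamp, count=1)
        else:
            # min/max instead of "keep the first/last seen" tolerates out-of-order events
            current.first = min(current.first, timestamp)
            current.last = max(current.last, timestamp)
            current.count += 1
    return activity
```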
Charts Kafka Apps
type Charts {
    topK: [ChartRecord!]!
}

type ChartRecord {
    id: Long!
    countPlays: Long!
}

quick topic create topartists --key long \
--value schema -s profiles.Charts

quick topic create topalbums --key long \
--value schema -s profiles.Charts

quick topic create toptracks --key long \
--value schema -s profiles.Charts

quick app deploy topartists \
--registry us.gcr.io/gcp-bakdata-cluster/profile-store \
--image user-charts \
--tag 1.0-SNAPSHOT \
--args input-topics=listeningevents output-topic=topartists productive=false

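Next, we create the output topics for our Kafka Streams applications and deploy the applications themselves. Quick can run containerized applications: we deploy them with the command quick app deploy, which expects the application name, the image registry, the image, the tag (version) and the arguments passed to the application. Here, the arguments name the input and output topics and, for firstlisten, the kind of aggregation (kind=FIRST). The remaining applications are deployed in the same way.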
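The chart applications turn the same event stream into per-user top-k lists shaped like the Charts type. The Python sketch below recomputes the charts from a batch of events with collections.Counter. The default k=10 is a hypothetical value, and a streaming application would instead update the counts incrementally as events arrive.

```python
from collections import Counter


def top_k_charts(events, field, k=10):
    """Build {userId: {"topK": [{"id", "countPlays"}, ...]}} for one event field.

    field is the event attribute to chart, e.g. "artistId", "albumId" or "trackId".
    """
    per_user = {}
    for event in events:
        per_user.setdefault(event["userId"], Counter())[event[field]] += 1
    return {
        user_id: {
            "topK": [
                {"id": item_id, "countPlays": plays}
                for item_id, plays in counts.most_common(k)  # highest counts first
            ]
        }
        for user_id, counts in per_user.items()
    }
```

The same function serves all three charts topics by switching field, which mirrors how topartists, topalbums and toptracks share the Charts type.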

Query User's Profile
query{
  getUserProfile(userId:3519430){
    totalListenCount
    firstListenEvent
    lastListenEvent
    artistCharts{
      topK{
        id
        countPlays
        artist{
          id
          name
        }
      }
    }
  }
}
{
  "data": {
    "getUserProfile": {
      "totalListenCount": 405,
      "firstListenEvent": 1630485969692,
      "lastListenEvent": 1630501116662,
      "artistCharts": {
        "topK": [
          {
            "id": 924,
            "countPlays": 116,
            "artist": {
              "id": 924,
              "name": "The Killers"
            }
          },
          {
            "id": 1825,
            "countPlays": 89,
            "artist": {
              "id": 1825,
              "name": "Neil Young"
            }
          },
         ...
          {
            "id": 135,
            "countPlays": 63,
            "artist": {
              "id": 135,
              "name": "Nirvana"
            }
          }
         ...
        ]
      }
    }
  }
}
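Applications send such queries to the gateway over HTTP. The following Python sketch builds (but does not send) a POST request carrying the profile query with the user id as a variable, and extracts a few fields from a response like the one above. The gateway URL and the header name X-API-Key are assumptions of this sketch. The sample timestamps in the response have 13 digits, so the sketch treats them as epoch milliseconds, unlike the 10-digit second values in the listening events shown earlier.

```python
import json
import urllib.request
from datetime import datetime, timezone

# The query from above, rewritten with a variable for the user id.
PROFILE_QUERY = """
query ($userId: Long!) {
  getUserProfile(userId: $userId) {
    totalListenCount
    firstListenEvent
    lastListenEvent
    artistCharts { topK { id countPlays artist { id name } } }
  }
}
"""


def build_request(gateway_url: str, api_key: str, user_id: int) -> urllib.request.Request:
    """Build (but do not send) a GraphQL-over-HTTP POST request."""
    body = json.dumps({"query": PROFILE_QUERY, "variables": {"userId": user_id}})
    return urllib.request.Request(
        gateway_url,  # hypothetical endpoint of the "profiles" gateway
        data=body.encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-API-Key": api_key,  # assumed header name for the API key
        },
        method="POST",
    )


def summarize(response: dict) -> dict:
    """Pull a few fields out of a getUserProfile response."""
    profile = response["data"]["getUserProfile"]
    # Treat the 13-digit sample values as epoch milliseconds.
    first = datetime.fromtimestamp(profile["firstListenEvent"] / 1000, tz=timezone.utc)
    last = datetime.fromtimestamp(profile["lastListenEvent"] / 1000, tz=timezone.utc)
    return {
        "total": profile["totalListenCount"],
        "first": first,
        "last": last,
        "top_artists": [entry["artist"]["name"] for entry in profile["artistCharts"]["topK"]],
    }
```

Sending the request would be a matter of passing it to urllib.request.urlopen and decoding the JSON body before handing it to summarize.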
Recommendation Schema
type Query {
    getArtistRecommendations(
        userId: Long!,
        field: FieldType! = ARTIST,
        limit: Int,
        walks: Int,
        walkLength: Int,
        resetProbability: Float
    ): Recommendations @rest(url: "http://recommender/recommendation",
        pathParameter: ["userId", "field"],
        queryParameter: ["limit", "walks", "walkLength", "resetProbability"])
}

enum FieldType {
    ARTIST
    ALBUM
    TRACK
}

type Recommendations {
    recommendations: [Recommendation!]!
}

type Recommendation {
    id: Long!
    artist: Item @topic(name: "artists", keyField: "id")
}

Finally, we add recommendations to our customer profiles. To do so, we add the query getArtistRecommendations to our global GraphQL schema. It takes a few parameters: userId is required, and field is non-null but defaults to ARTIST, so it can be omitted as in the query below; ALBUM and TRACK are also possible values. The remaining four parameters come from the underlying recommendation algorithm, SALSA, and fall back to default values when they are omitted.
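SALSA scores items by random walks on the bipartite graph of users and the items they listen to. The following Python sketch is a simplified Monte Carlo variant of personalized SALSA, not the recommender's code: each of walks walks takes walk_length steps, alternating between an artist the current user listened to and another listener of that artist, and jumps back to the start user with probability reset_probability. Artists are ranked by visit count and cut to limit; excluding artists the user already knows is a choice of this sketch, and the default values are hypothetical rather than the service's.

```python
import random
from collections import Counter, defaultdict


def recommend(listens, user_id, limit=10, walks=100, walk_length=10,
              reset_probability=0.3, seed=0):
    """Rank artists for user_id by random-walk visits.

    listens is an iterable of (userId, artistId) pairs, one per listening event.
    """
    if not 0.0 <= reset_probability <= 1.0:
        raise ValueError("reset_probability must be between 0 and 1")
    rng = random.Random(seed)
    user_to_artists = defaultdict(list)
    artist_to_users = defaultdict(list)
    for user, artist in listens:
        user_to_artists[user].append(artist)  # repeated plays become repeated edges
        artist_to_users[artist].append(user)
    if user_id not in user_to_artists:
        return []  # no history, nothing to walk from

    visits = Counter()
    for _ in range(walks):
        user = user_id
        for _ in range(walk_length):
            artist = rng.choice(user_to_artists[user])  # user -> artist
            visits[artist] += 1
            if rng.random() < reset_probability:
                user = user_id  # restart at the start user
            else:
                user = rng.choice(artist_to_users[artist])  # artist -> user

    known = set(user_to_artists[user_id])
    ranked = [artist for artist, _ in visits.most_common() if artist not in known]
    return ranked[:limit]
```

Passing a fixed seed makes the walks reproducible, and because repeated plays appear as repeated edges, the walk is weighted toward frequently played artists.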

Quick provides a custom directive, @rest, which lets us use any REST service in a Quick GraphQL schema. In our example, the recommendation algorithm returns the result for a particular user id via REST as a list of artist ids. Since we want to recommend artist names, we resolve these ids with the names from the artists topic.
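The @rest declaration above lists which arguments go into the URL path and which into the query string. The following Python sketch shows one plausible mapping, assuming that path parameters are appended as path segments in the declared order and that omitted query parameters are left out so the service can apply its defaults. This convention is an assumption of the sketch, not a statement of how Quick builds the URL.

```python
from urllib.parse import quote, urlencode


def build_rest_url(url, path_parameter, query_parameter, args):
    """Build a request URL from GraphQL arguments (assumed @rest convention)."""
    result = url.rstrip("/")
    for name in path_parameter:
        # Each path parameter becomes one URL-escaped path segment.
        result += "/" + quote(str(args[name]), safe="")
    # Only query parameters that were actually provided are sent.
    query = {name: args[name] for name in query_parameter if args.get(name) is not None}
    if query:
        result += "?" + urlencode(query)
    return result
```

For the query below, with userId 32226961 and the default field ARTIST, this yields http://recommender/recommendation/32226961/ARTIST.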

Recommendation Service
quick app deploy recommender \
--registry us.gcr.io/gcp-bakdata-cluster/profile-store \
--image recommender \
--tag 1.0-SNAPSHOT \
--port 8080 \
--args input-topics=listeningevents productive=false

{"recommendations":[{"id":2041},{"id":1825},{"id":1871},{"id":2353},{"id":501},{"id":113},{"id":20},{"id":66},{"id":682},{"id":1773}]}
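Because @rest accepts any REST service, the recommender can be replaced by a stub, for example while developing the schema. The following Python sketch uses only the standard library to serve responses in the format of the sample response above on port 8080, the port passed to quick app deploy. The path layout /recommendation/<userId>/<field> and the handling of limit are assumptions derived from the @rest declaration, not the recommender's documented API, and the returned ids are canned.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

# Hypothetical canned artist ids, copied from the sample response above.
CANNED_ARTIST_IDS = [2041, 1825, 1871, 2353, 501, 113, 20, 66, 682, 1773]
FIELDS = {"ARTIST", "ALBUM", "TRACK"}


def handle(path: str) -> tuple[int, dict]:
    """Map a request path to (HTTP status, JSON body)."""
    parsed = urlparse(path)
    parts = parsed.path.strip("/").split("/")
    # Assumed path layout: /recommendation/<userId>/<field>
    if len(parts) != 3 or parts[0] != "recommendation" or not parts[1].isdigit():
        return 404, {"error": "not found"}
    field = parts[2]
    if field not in FIELDS:
        return 400, {"error": f"unknown field {field}"}
    try:
        limit = int(parse_qs(parsed.query).get("limit", ["10"])[0])
    except ValueError:
        return 400, {"error": "limit must be an integer"}
    if limit < 0:
        return 400, {"error": "limit must not be negative"}
    # This stub only knows artist ids; a real service would compute
    # recommendations for the given user and field.
    ids = CANNED_ARTIST_IDS[:limit] if field == "ARTIST" else []
    return 200, {"recommendations": [{"id": i} for i in ids]}


class RecommendationHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        status, payload = handle(self.path)
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port: int = 8080) -> None:
    """Run the stub until interrupted."""
    HTTPServer(("", port), RecommendationHandler).serve_forever()
```

Keeping the routing in the pure function handle makes the stub testable without opening a socket; calling serve() starts the actual server.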
Query User's Artist Recommendation
query{
  getArtistRecommendations(userId: 32226961){
    recommendations{
      id
      artist{
        name
      }
    }
  }
}
{
  "data": {
    "getArtistRecommendations": {
      "recommendations": [
        {
          "id": 2041,
          "artist": {
            "name": "Leevi and the Leavings"
          }
        },
        {
          "id": 1825,
          "artist": {
            "name": "Neil Young"
          }
        },
        {
          "id": 1871,
          "artist": {
            "name": "Mogwai"
          }
        },
        {
          "id": 2353,
          "artist": {
            "name": "The National"
          }
        },
        ...
      ]
    }
  }
}
}