Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dataset/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<stringType>Utf8</stringType>
<createSetters>false</createSetters>
<fieldVisibility>private</fieldVisibility>
<imports>
<import>src/main/avro/standard_event.avsc</import>
</imports>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
39 changes: 39 additions & 0 deletions dataset/src/main/avro/standard_event.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"name": "StandardEvent",
"namespace": "org.kitesdk.data.event",
"type": "record",
"doc": "A standard event type for logging, based on the paper 'The Unified Logging Infrastructure for Data Analytics at Twitter' by Lee et al, http://vldb.org/pvldb/vol5/p1771_georgelee_vldb2012.pdf",
"fields": [
{
"name": "event_initiator",
"type": "string",
"doc": "Where the event was triggered from in the format {client,server}_{user,app}, e.g. 'client_user'. Required."
},
{
"name": "event_name",
"type": "string",
"doc": "A hierarchical name for the event, with parts separated by ':'. Required."
},
{
"name": "user_id",
"type": "long",
"doc": "A unique identifier for the user. Required."
},
{
"name": "session_id",
"type": "string",
"doc": "A unique identifier for the session. Required."
},
{
"name": "ip",
"type": "string",
"doc": "The IP address of the host where the event originated. Required."
},
{
"name": "timestamp",
"type": "long",
"doc": "The point in time when the event occurred, represented as the number of milliseconds since January 1, 1970, 00:00:00 GMT. Required."
}

]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2014 Cloudera, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kitesdk.examples.data;

import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;

public abstract class BaseEventsTool extends Configured implements Tool {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like any of the code in this class is used, so it would be better to remove it and make GenerateEvents implement Tool directly.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent. Done.


protected String uri = "dataset:hive:events";

@Override
public int run(String[] args) throws Exception {
int start = 0;
if (args.length >= 1 && !args[0].startsWith("--")) {
uri = args[0];
start = 1;
}

List<String> argsList = new LinkedList<String>();
for (int i = start; i < args.length; i++) {
argsList.add(args[i]);
}

return run(argsList);
}

public abstract int run(List<String> args) throws Exception;
}
118 changes: 118 additions & 0 deletions dataset/src/main/java/org/kitesdk/examples/data/GenerateEvents.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2015 Cloudera, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kitesdk.examples.data;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.View;
import org.kitesdk.data.event.StandardEvent;

public class GenerateEvents extends BaseEventsTool {
protected Random random;
protected long baseTimestamp;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specific to a single run, so I think it makes sense to move it into run as a local variable.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

baseTimestamp is also used in the randomTimestamp method. Are you suggesting that I pass it as a variable each time? Does it really make a difference?

protected long counter;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be removed because it is now a local variable.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the declaration for the global counter variable.


public GenerateEvents() {
random = new Random();
baseTimestamp = System.currentTimeMillis();
counter = 0l;
}

@Override
public int run(List<String> args) throws Exception {
return 0;
}

public int run(String[] args) throws Exception {

View<StandardEvent> events = Datasets.load(
"dataset:hive:events", StandardEvent.class);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make this URI a default and use args[0] if it is defined?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gladly - finally getting the syntax correct for passing variables from the command line to the run() method was the big win in this exercise.


DatasetWriter<StandardEvent> writer = events.newWriter();
try {
Utf8 sessionId = new Utf8("sessionId");
long userId = 0;
Utf8 ip = new Utf8("ip");
int randomEventCount = 0;
while (System.currentTimeMillis() - baseTimestamp < 36000) {
sessionId = randomSessionId();
userId = randomUserId();
ip = randomIp();
randomEventCount = random.nextInt(25);
for (int i=0; i < randomEventCount; i++) {
writer.write(generateRandomEvent(sessionId, userId, ip));
}
}
} finally {
writer.close();
}

System.out.println("Generated " + counter + " events");

return 0;
}

public StandardEvent generateRandomEvent(Utf8 sessionId, long userId, Utf8 ip) {
return StandardEvent.newBuilder()
.setEventInitiator(new Utf8("client_user"))
.setEventName(randomEventName())
.setUserId(userId)
.setSessionId(sessionId)
.setIp(ip)
.setTimestamp(randomTimestamp())
.build();
}

public Utf8 randomEventName() {
return new Utf8("event"+counter++);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is no longer random, it would make more sense to move it into the loop in run and keep track of the counter as a local variable.

}

public long randomUserId() {
return random.nextInt(10);
}

public Utf8 randomSessionId() {
return new Utf8(UUID.randomUUID().toString());
}

public Utf8 randomIp() {
return new Utf8("192.168." + (random.nextInt(254) + 1) + "."
+ (random.nextInt(254) + 1));
}

public long randomTimestamp() {
long delta = System.currentTimeMillis() - baseTimestamp;
delta = delta*1000l+random.nextInt(5000);
return baseTimestamp+delta;
}

public static void main(String... args) throws Exception {
int rc = ToolRunner.run(new Configuration(), new GenerateEvents(), args);

System.exit(rc);
}

}