io = $io;
    $this->decoder = new AvroIOBinaryDecoder($this->io);
    $this->datum_reader = $datum_reader;
    $this->read_header();

    $codec = AvroUtil::array_value($this->metadata,
                                   AvroDataIO::METADATA_CODEC_ATTR);
    if ($codec && !AvroDataIO::is_valid_codec($codec))
      throw new AvroDataIOException(sprintf('Uknown codec: %s', $codec));

    $this->block_count = 0;
    // FIXME: Seems unsanitary to set writers_schema here.
    // Can't constructor take it as an argument?
    $this->datum_reader->set_writers_schema(
      AvroSchema::parse($this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR]));
  }

  /**
   * Reads header of object container.
   *
   * Rewinds to the start of the container, checks the magic bytes, then
   * reads the metadata map (decoded with the metadata schema passed for both
   * schema arguments of read_data()) followed by the SYNC_SIZE-byte sync
   * marker, storing them in $this->metadata and $this->sync_marker.
   *
   * @throws AvroDataIOException if the file is not an Avro data file.
   */
  private function read_header()
  {
    $this->seek(0, AvroIO::SEEK_SET);

    $magic = $this->read(AvroDataIO::magic_size());

    if (strlen($magic) < AvroDataIO::magic_size())
      throw new AvroDataIOException(
        'Not an Avro data file: shorter than the Avro magic block');

    // Strict comparison: these are raw byte strings and must match
    // byte-for-byte, never via PHP's loose-comparison type juggling.
    if (AvroDataIO::magic() !== $magic)
      throw new AvroDataIOException(
        sprintf('Not an Avro data file: %s does not match %s',
                $magic, AvroDataIO::magic()));

    $this->metadata = $this->datum_reader->read_data(AvroDataIO::metadata_schema(),
                                                     AvroDataIO::metadata_schema(),
                                                     $this->decoder);
    $this->sync_marker = $this->read(AvroDataIO::SYNC_SIZE);
  }

  /**
   * Reads every remaining datum from the object container.
   *
   * Whenever the current block is exhausted, this checks for end of input,
   * skips a sync marker if one is present, and reads the next block header
   * before decoding further data.
   *
   * @internal Would be nice to implement data() as an iterator, I think
   * @returns array of data from object container.
   */
  public function data()
  {
    $data = array();
    while (true)
    {
      if (0 == $this->block_count)
      {
        if ($this->is_eof())
          break;

        if ($this->skip_sync())
          if ($this->is_eof())
            break;

        $this->read_block_header();

        // A block that declares zero objects holds nothing to decode.
        // Go around again so its trailing sync marker is skipped, instead
        // of decoding a datum from it and pushing block_count below zero
        // (after which the "0 == block_count" check would never fire).
        if (0 == $this->block_count)
          continue;
      }
      $data []= $this->datum_reader->read($this->decoder);
      $this->block_count -= 1;
    }
    return $data;
  }

  /**
   * Closes this reader (and its AvroIO object.)
* @uses AvroIO::close()
   */
  public function close()
  {
    return $this->io->close();
  }

  /**
   * Moves the position of the underlying AvroIO object.
   * @param int $offset
   * @param int $whence one of the AvroIO::SEEK_* constants
   * @returns mixed value of $io->seek()
   * @uses AvroIO::seek()
   */
  private function seek($offset, $whence)
  {
    return $this->io->seek($offset, $whence);
  }

  /**
   * Reads from the underlying AvroIO object.
   * @param int $len number of bytes requested
   * @returns mixed value of $io->read()
   * @uses AvroIO::read()
   */
  private function read($len)
  {
    return $this->io->read($len);
  }

  /**
   * @returns mixed value of $io->is_eof()
   * @uses AvroIO::is_eof()
   */
  private function is_eof()
  {
    return $this->io->is_eof();
  }

  /**
   * Consumes the next AvroDataIO::SYNC_SIZE bytes if they are this
   * container's sync marker; otherwise restores the read position.
   *
   * @returns boolean true if a sync marker was consumed, false if the
   *                  bytes read were not the sync marker (in which case
   *                  the position is moved back over them).
   */
  private function skip_sync()
  {
    $proposed_sync_marker = $this->read(AvroDataIO::SYNC_SIZE);

    // Strict comparison: sync markers are arbitrary binary strings, and a
    // loose comparison would treat two different numeric-looking strings
    // as equal.
    if ($proposed_sync_marker !== $this->sync_marker)
    {
      // Rewind by the number of bytes actually obtained, so that a short
      // read near the end of the input does not move the position to
      // before where this method started.
      $bytes_read = strlen((string) $proposed_sync_marker);
      $this->seek(-$bytes_read, AvroIO::SEEK_CUR);
      return false;
    }
    return true;
  }

  /**
   * Reads the block header (which includes the count of items in the block
   * and the length in bytes of the block)
   *
   * The item count is stored in $this->block_count.
   * @returns int length in bytes of the block.
   */
  private function read_block_header()
  {
    $this->block_count = $this->decoder->read_long();
    return $this->decoder->read_long();
  }

}

/**
 * Writes Avro data to an AvroIO source using an AvroSchema
 * @package Avro
 */
class AvroDataIOWriter
{
  /**
   * @returns string a new, unique sync marker.
*/
  private static function generate_sync_marker()
  {
    // From http://php.net/manual/en/function.mt-rand.php comments
    // Eight unsigned 16-bit values are packed into a 16-byte string.
    return pack('S8',
                mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff),
                mt_rand(0, 0xffff) | 0x4000,
                mt_rand(0, 0xffff) | 0x8000,
                mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff));
  }

  /**
   * @var AvroIO object container where data is written
   */
  private $io;

  /**
   * @var AvroIOBinaryEncoder encoder for object container
   */
  private $encoder;

  /**
   * @var AvroDatumWriter
   */
  private $datum_writer;

  /**
   * @var AvroStringIO buffer for writing
   */
  private $buffer;

  /**
   * @var AvroIOBinaryEncoder encoder for buffer
   */
  private $buffer_encoder; // AvroIOBinaryEncoder

  /**
   * @var int count of items written to block
   */
  private $block_count;

  /**
   * @var array map of object container metadata
   */
  private $metadata;

  /**
   * Declared explicitly because it was previously created implicitly on
   * first assignment (a dynamic property, deprecated as of PHP 8.2). It is
   * public to keep the visibility that the implicit property had.
   * @var string sync marker written after the header and after each block
   */
  public $sync_marker;

  /**
   * Sets up a writer over the given container.
   *
   * When a writers schema is supplied, a fresh sync marker is generated and
   * a new header is written. Otherwise the existing header is read back
   * from $io, its sync marker, codec and schema are reused, and the
   * position is moved to the end of $io.
   *
   * @param AvroIO $io
   * @param AvroIODatumWriter $datum_writer
   * @param AvroSchema $writers_schema
   * @throws AvroDataIOException if $io is not an instance of AvroIO
   */
  public function __construct($io, $datum_writer, $writers_schema=null)
  {
    if (!($io instanceof AvroIO))
      throw new AvroDataIOException('io must be instance of AvroIO');

    $this->io = $io;
    $this->encoder = new AvroIOBinaryEncoder($this->io);
    $this->datum_writer = $datum_writer;
    $this->buffer = new AvroStringIO();
    $this->buffer_encoder = new AvroIOBinaryEncoder($this->buffer);
    $this->block_count = 0;
    $this->metadata = array();

    if ($writers_schema)
    {
      $this->sync_marker = self::generate_sync_marker();
      $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = AvroDataIO::NULL_CODEC;
      $this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = strval($writers_schema);
      $this->write_header();
    }
    else
    {
      $dfr = new AvroDataIOReader($this->io, new AvroIODatumReader());
      $this->sync_marker = $dfr->sync_marker;

      // The codec entry is optional in the container metadata; a file
      // without it is treated as using the null codec rather than raising
      // an undefined-index error and storing a null codec.
      $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] =
        isset($dfr->metadata[AvroDataIO::METADATA_CODEC_ATTR])
        ? $dfr->metadata[AvroDataIO::METADATA_CODEC_ATTR]
        : AvroDataIO::NULL_CODEC;

      $schema_from_file = $dfr->metadata[AvroDataIO::METADATA_SCHEMA_ATTR];
      $this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = $schema_from_file;
      $this->datum_writer->writers_schema =
AvroSchema::parse($schema_from_file);
      $this->seek(0, SEEK_END);
    }
  }

  /**
   * Encodes a datum into the in-memory buffer and writes the buffered
   * block out once the buffer reaches AvroDataIO::SYNC_INTERVAL bytes.
   * @param mixed $datum
   */
  public function append($datum)
  {
    $this->datum_writer->write($datum, $this->buffer_encoder);
    $this->block_count++;

    if ($this->buffer->length() >= AvroDataIO::SYNC_INTERVAL)
      $this->write_block();
  }

  /**
   * Flushes buffer to AvroIO object container and closes it.
   * @return mixed value of $io->close()
   * @see AvroIO::close()
   */
  public function close()
  {
    $this->flush();
    return $this->io->close();
  }

  /**
   * Flushes buffer to AvroIO object container.
   * @returns mixed value of $io->flush()
   * @see AvroIO::flush()
   */
  private function flush()
  {
    $this->write_block();
    return $this->io->flush();
  }

  /**
   * Writes a block of data to the AvroIO object container.
   *
   * Does nothing when no items have been buffered. Otherwise writes the
   * item count, the byte length of the buffered data, the data itself and
   * the sync marker, then empties the buffer and resets the item count.
   *
   * @throws AvroDataIOException if the codec provided by the encoder
   *         is not supported
   * @internal Should the codec check happen in the constructor?
   *           Why wait until we're writing data?
   */
  private function write_block()
  {
    if (!($this->block_count > 0))
      return;

    // Validate the codec before emitting anything, so that an unsupported
    // codec cannot leave a partial block header (count and length) in the
    // container.
    $codec = $this->metadata[AvroDataIO::METADATA_CODEC_ATTR];
    if (!AvroDataIO::is_valid_codec($codec))
      throw new AvroDataIOException(
        sprintf('codec %s is not supported', $codec));

    $to_write = strval($this->buffer);

    $this->encoder->write_long($this->block_count);
    $this->encoder->write_long(strlen($to_write));
    $this->write($to_write);
    $this->write($this->sync_marker);

    $this->buffer->truncate();
    $this->block_count = 0;
  }

  /**
   * Writes the header of the AvroIO object container:
   * the magic bytes, the metadata map and the sync marker.
   */
  private function write_header()
  {
    $this->write(AvroDataIO::magic());
    $this->datum_writer->write_data(AvroDataIO::metadata_schema(),
                                    $this->metadata, $this->encoder);
    $this->write($this->sync_marker);
  }

  /**
   * @param string $bytes
   * @returns mixed value of $io->write()
   * @uses AvroIO::write()
   */
  private function write($bytes)
  {
    return $this->io->write($bytes);
  }

  /**
   * @param int $offset
   * @param int $whence
   * @returns mixed value of $io->seek()
   * @uses AvroIO::seek()
   */
  private function seek($offset, $whence)
  {
    return $this->io->seek($offset, $whence);
  }
}